# report-detect/archive/tools/pdf_processor.py
#
# NOTE: the lines that originally stood here were repository-viewer page chrome
# ("1300 lines", "44 KiB", "Python", "Raw Permalink Blame History") followed by
# the viewer's warning that this file contains ambiguous Unicode characters,
# i.e. characters that might be confused with other characters. They are kept
# only as this comment so that the module remains valid Python.

"""
PDF 处理器 - 独立模块
与 test_accuracy_batch_full.py 使用 PaddleOCR-VL-1.5 时效果100%一致
核心功能:
- PDF 页面提取
- 数字证书提取(机构名称最高优先级)
- 布局检测PP-DocLayout
- 印章检测和提取(含极坐标展开算法)
- 三级 CMA 提取(全页面 OCR + 模板匹配 + 错误处理)
- 机构名称清理
作者: 基于test_accuracy_batch_full.py移植
版本: 1.0
"""
import os
import sys
import re
import json
import time
import logging
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
import numpy as np
# Windows UTF-8: consoles there often use a legacy code page, which cannot
# encode the Chinese text and emoji used in this module's log messages.
if sys.platform == 'win32':
    import codecs

    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name, None)
        if _stream is None:
            # No console stream attached (e.g. GUI launchers); nothing to fix.
            continue
        if hasattr(_stream, 'reconfigure'):
            # Switch the encoding in place so the stream object, and whatever
            # buffering it was created with, stays the same.
            _stream.reconfigure(encoding='utf-8', errors='strict')
        elif hasattr(_stream, 'buffer'):
            # Older or replaced text streams: wrap the underlying byte stream.
            setattr(
                sys,
                _stream_name,
                codecs.getwriter('utf-8')(_stream.buffer, 'strict'),
            )
        # Streams exposing neither API are left untouched instead of making
        # the import of this module fail.

# Configure logging for the whole process and create this module's logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ============================================================================
# 模型管理器
# ============================================================================
class OCRModelManager:
    """Holds every OCR model instance used by this module.

    Each ``init_*`` method fills one pipeline attribute and logs the outcome.
    A failed initialisation is logged and leaves that attribute as ``None``;
    it is never raised to the caller.
    """

    def __init__(self):
        # All pipeline slots start empty; init_all() fills them on demand.
        self._initialized = False
        self.vl_pipeline = None        # PaddleOCRVL
        self.ocr_pipeline = None       # PP-OCRv5
        self.layout_pipeline = None    # PP-DocLayout
        self.seal_det_pipeline = None  # PP-OCRv4_server_seal_det

    def init_all(self):
        """Initialise all models once; later calls return immediately."""
        if self._initialized:
            return

        banner = "=" * 60
        for message in (banner, "初始化所有OCR模型", banner):
            logger.info(message)

        for initializer in (self.init_vl, self.init_ocr, self.init_layout):
            initializer()
        # init_seal_det() (seal text detection model) is optional and is
        # deliberately not run here.
        self._initialized = True

        for message in (banner, "✅ 所有模型初始化完成", banner):
            logger.info(message)

    def init_vl(self):
        """Create the PaddleOCRVL pipeline and store it in ``vl_pipeline``."""
        logger.info("初始化 PaddleOCRVL...")
        try:
            from paddleocr import PaddleOCRVL

            vl_options = {
                'use_seal_recognition': True,
                'use_ocr_for_image_block': True,
                'use_layout_detection': True,
            }
            self.vl_pipeline = PaddleOCRVL(**vl_options)
            logger.info("✅ PaddleOCRVL 初始化成功")
        except Exception as exc:
            logger.error(f"❌ PaddleOCRVL 初始化失败: {exc}")
            self.vl_pipeline = None

    def init_ocr(self):
        """Create the PP-OCRv5 pipeline and store it in ``ocr_pipeline``."""
        logger.info("初始化 PP-OCRv5...")
        try:
            from paddleocr import PaddleOCR

            # No use_gpu argument is passed (per the original author this
            # means CPU by default); show_log was removed because it may not
            # be supported.
            ocr_options = {
                'use_textline_orientation': True,
                'lang': 'ch',
            }
            self.ocr_pipeline = PaddleOCR(**ocr_options)
            logger.info("✅ PP-OCRv5 初始化成功")
        except Exception as exc:
            logger.error(f"❌ PP-OCRv5 初始化失败: {exc}")
            self.ocr_pipeline = None

    def init_layout(self):
        """Mark layout detection as provided by PaddleOCRVL's built-in detector.

        No separate model is created; the method exists for interface
        compatibility and stores the marker string ``"builtin"``.
        """
        logger.info("初始化 PP-DocLayout...")
        try:
            logger.info("✅ PP-DocLayout 使用PaddleOCRVL内置布局检测")
            self.layout_pipeline = "builtin"
        except Exception as exc:
            logger.error(f"❌ PP-DocLayout 初始化失败: {exc}")
            self.layout_pipeline = None

    def init_seal_det(self):
        """Placeholder for the optional PP-OCRv4_server_seal_det model.

        Only logs; ``seal_det_pipeline`` is left untouched (not yet ported
        from the original script).
        """
        logger.info("初始化 PP-OCRv4_server_seal_det...")
# ============================================================================
# PDF 提取行1717-1718
# ============================================================================
def extract_pdf_page(pdf_path: str, page_num: int = 0) -> Optional[np.ndarray]:
    """
    Render one PDF page with PyMuPDF at 2x zoom and return it as an image.

    Args:
        pdf_path: Path of the PDF file.
        page_num: Page index, starting at 0.

    Returns:
        The page as an OpenCV image decoded with ``cv2.IMREAD_COLOR``, or
        ``None`` on any failure (missing dependency, unreadable file, invalid
        page index, undecodable bitmap). Failures are logged, never raised.
    """
    try:
        import fitz
        import cv2

        logger.debug(f"提取PDF: {pdf_path}, 页码: {page_num}")
        # The context manager closes the document even when rendering raises;
        # the original left the file handle open on every call.
        with fitz.open(pdf_path) as doc:
            page = doc[page_num]
            # 2x zoom
            pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
            img_bytes = pix.tobytes("png")

        # Decode the PNG bytes into an OpenCV array.
        img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_COLOR)
        if img is None:
            # Report the decode failure explicitly instead of relying on an
            # AttributeError from ``img.shape`` below.
            logger.error("❌ PDF提取失败: 无法解码渲染后的页面图像")
            return None
        logger.debug(f"✅ PDF页面提取成功: {img.shape}")
        return img
    except Exception as e:
        logger.error(f"❌ PDF提取失败: {e}")
        return None
# ============================================================================
# 数字证书提取(最高优先级)
# ============================================================================
def extract_certificate_from_pdf(pdf_path: str) -> Dict[str, Any]:
    """
    Try to obtain the institution name from the PDF's digital certificate.

    This is meant to be the highest-priority source of the institution name,
    but real certificate parsing is NOT implemented yet (the original author
    points to the Java ``CertUtils.java`` as the reference). The function
    therefore always reports failure; it only opens the document and logs
    whether signature fields are recorded.

    Args:
        pdf_path: Path of the PDF file.

    Returns:
        ``{'success': bool, 'institution_name': str | None,
        'confidence': float}`` - currently always
        ``success=False, institution_name=None, confidence=0.0``.
    """
    not_extracted = {
        'success': False,
        'institution_name': None,
        'confidence': 0.0
    }
    try:
        import fitz

        logger.debug(f"提取数字证书: {pdf_path}")
        # The original called doc.has_permissions() & fitz.PERM_PRINT here.
        # Neither name appears in PyMuPDF's documented API, and print
        # permission says nothing about signatures, so that call is replaced
        # by the signature-flag query. The document is now closed reliably.
        with fitz.open(pdf_path) as doc:
            sig_flags = doc.get_sigflags()
        logger.debug(f"签名标志 (SigFlags): {sig_flags}")

        # TODO: implement the full certificate extraction logic.
        logger.debug("数字证书提取未完全实现")
        return dict(not_extracted)
    except Exception as e:
        logger.debug(f"证书提取失败: {e}")
        return dict(not_extracted)
# ============================================================================
# 布局检测行817
# ============================================================================
def run_layout_detection(pdf_path: str) -> List[Dict]:
    """
    Locate seal regions with PaddleOCRVL's built-in layout detection.

    When the VL pipeline is unavailable, fails, or reports no seal block, the
    whole page (minus a 10 px margin) is returned as a single low-confidence
    seal region so that the rest of the flow can continue.

    Args:
        pdf_path: Path handed to the VL pipeline. Despite the name, the
            caller in this file passes the rendered page PNG, and the
            fallback reads it with ``cv2.imread``, so it must be an image
            file for the fallback to work.

    Returns:
        A list of ``{'box': [x1, y1, x2, y2], 'label': 'seal',
        'score': float}`` dicts. The score is 0.9 for detected seals and 0.5
        for the full-page fallback. The list is empty when nothing could be
        produced.
    """
    try:
        import cv2
        import json
        import tempfile
        from pathlib import Path

        logger.debug(f"运行布局检测: {pdf_path}")

        # Prefer PaddleOCRVL's built-in layout detection when it is loaded.
        if model_manager.vl_pipeline is not None:
            try:
                output = model_manager.vl_pipeline.predict(pdf_path, batch_size=1)
                if output and len(output) > 0:
                    data = None
                    # The result is read back through its JSON export. A
                    # unique temporary directory replaces the fixed
                    # "temp_layout_detection" folder in the working
                    # directory: it cannot collide with a concurrent run and
                    # is removed on every path, including errors.
                    with tempfile.TemporaryDirectory(prefix="temp_layout_detection_") as tmp_dir:
                        output[0].save_to_json(save_path=tmp_dir)
                        json_file = Path(tmp_dir) / f"{Path(pdf_path).stem}_res.json"
                        if json_file.exists():
                            with open(json_file, 'r', encoding='utf-8') as f:
                                data = json.load(f)

                    if data is not None:
                        # Keep only the blocks labelled as seals that carry a
                        # four-value bounding box [x1, y1, x2, y2].
                        seal_regions = []
                        for block in data.get('parsing_res_list', []):
                            if block.get('block_label') != 'seal':
                                continue
                            bbox = block.get('bbox', [])
                            if bbox and len(bbox) == 4:
                                seal_regions.append({
                                    'box': bbox,
                                    'label': 'seal',
                                    'score': 0.9  # fixed high confidence
                                })
                        if seal_regions:
                            logger.debug(f"✅ PaddleOCRVL检测到 {len(seal_regions)} 个印章区域")
                            return seal_regions
            except Exception as e:
                logger.warning(f"PaddleOCRVL布局检测失败: {e}使用fallback")

        # Fallback: treat the whole page as one seal region. Not optimal, but
        # it guarantees the pipeline can continue.
        logger.warning("使用fallback策略将整个页面作为印章区域")
        img = cv2.imread(pdf_path)
        if img is not None:
            h, w = img.shape[:2]
            seal_regions = [{
                'box': [10, 10, w-10, h-10],
                'label': 'seal',
                'score': 0.5  # low confidence
            }]
            logger.debug(f"✅ Fallback: 使用全页面作为印章区域")
            return seal_regions

        logger.warning("无法读取图像,返回空区域")
        return []
    except Exception as e:
        logger.error(f"❌ 布局检测失败: {e}")
        return []
# ============================================================================
# 极坐标展开算法(核心)
# ============================================================================
def detect_seal_center_dual_method(seal_crop: np.ndarray, polygons: List) -> Tuple[float, float, float, str]:
    """
    Estimate the centre and radius of a seal with two strategies.

    Strategy 1 - circle fitting: fit a circle through all vertices of the
    text polygons by least squares, and accept it when the RMSE is below
    3000 and the fitted centre lies within 20% of the crop size from the
    crop centre.

    Strategy 2 - crop centre: use the geometric centre of the crop and half
    of its shorter side as the radius.

    Args:
        seal_crop: Cropped seal image; only its height and width are used.
        polygons: Text polygons, each a sequence of ``(x, y)`` points given
            as lists, tuples or numpy arrays. Fewer than three polygons, or
            fewer than three usable points, skips strategy 1.

    Returns:
        ``(cx, cy, radius, method_used)`` where ``method_used`` is
        ``'circle_fitting'``, ``'crop_center'`` or ``'fallback'`` (an
        exception occurred and the crop centre was used).
    """
    try:
        h, w = seal_crop.shape[:2]
        crop_center_x, crop_center_y = w / 2, h / 2

        # Strategy 1: circle fitting
        if len(polygons) >= 3:
            points = []
            for poly in polygons:
                if not isinstance(poly, (list, tuple, np.ndarray)) or len(poly) == 0:
                    continue
                for point in poly:
                    if isinstance(point, (list, tuple, np.ndarray)) and len(point) >= 2:
                        points.append([float(point[0]), float(point[1])])

            if len(points) >= 3:
                # Imported only when a fit is really attempted, so a missing
                # SciPy cannot turn the plain crop-centre path into
                # 'fallback'. (The original also imported cv2 here without
                # using it.)
                from scipy.optimize import least_squares

                points = np.array(points)

                def circle_residuals(params, pts):
                    """Signed distance of every point to the circle (cx, cy, r)."""
                    cx, cy, r = params
                    return np.hypot(pts[:, 0] - cx, pts[:, 1] - cy) - r

                # Initial guess: centroid of the points and their mean
                # distance to it.
                cx0 = np.mean(points[:, 0])
                cy0 = np.mean(points[:, 1])
                r0 = np.mean(np.hypot(points[:, 0] - cx0, points[:, 1] - cy0))

                fit = least_squares(circle_residuals, [cx0, cy0, r0], args=(points,))
                cx_fit, cy_fit, r_fit = fit.x

                # Fit quality: RMSE of the residuals.
                final_residuals = circle_residuals([cx_fit, cy_fit, r_fit], points)
                rmse = np.sqrt(np.mean(final_residuals ** 2))

                # How far the fitted centre drifts from the crop centre,
                # relative to the crop size.
                offset_x = abs(cx_fit - crop_center_x) / w
                offset_y = abs(cy_fit - crop_center_y) / h
                offset_ratio = max(offset_x, offset_y)

                # NOTE(review): the 3000 RMSE bound is kept from the original;
                # it looks very permissive - confirm the intended unit.
                if rmse < 3000 and offset_ratio < 0.2:
                    logger.debug(f"✅ Circle Fitting 成功: center=({cx_fit:.1f}, {cy_fit:.1f}), r={r_fit:.1f}, rmse={rmse:.1f}")
                    return (float(cx_fit), float(cy_fit), float(r_fit), 'circle_fitting')

        # Strategy 2: crop centre
        logger.debug("使用 Crop Center")
        return (crop_center_x, crop_center_y, min(w, h) / 2, 'crop_center')
    except Exception as e:
        logger.error(f"❌ 中心检测失败: {e}")
        h, w = seal_crop.shape[:2]
        return (w / 2, h / 2, min(w, h) / 2, 'fallback')
def calculate_precise_arc(polygons: List, center: Tuple[float, float]) -> Dict[str, float]:
    """
    Compute the angular arc (start and extent) covered by the text polygons.

    Every polygon vertex is converted to an angle around ``center``. The arc
    is the complement of the widest angular gap between neighbouring
    vertices, so an arc that crosses the +/-pi discontinuity of ``arctan2``
    is measured correctly. The original used min/max of the raw angles,
    which reported almost a full circle for such arcs. For point sets that
    do not cross the discontinuity the result equals the original min/max
    answer.

    Args:
        polygons: Text polygons, each a sequence of ``(x, y)`` points given
            as lists, tuples or numpy arrays.
        center: Seal centre ``(cx, cy)``.

    Returns:
        ``{'start_theta', 'extent', 'end_theta'}`` in radians, with
        ``end_theta = start_theta + extent``. The extent is capped at 350
        degrees to avoid distortion. With no usable points, or on error, the
        default arc (start 135 degrees, extent 270 degrees) is returned.
    """
    # Default arc covering most of the seal.
    default_arc = {
        'start_theta': float(np.radians(135)),
        'extent': float(np.radians(270)),
        'end_theta': float(np.radians(405))
    }
    try:
        cx, cy = center

        # Convert polygon vertices to angles around the centre.
        angles = []
        for poly in polygons:
            if not isinstance(poly, (list, tuple, np.ndarray)) or len(poly) == 0:
                continue
            for point in poly:
                if isinstance(point, (list, tuple, np.ndarray)) and len(point) >= 2:
                    px, py = float(point[0]), float(point[1])
                    angles.append(np.arctan2(py - cy, px - cx))

        if not angles:
            return dict(default_arc)

        two_pi = 2.0 * np.pi
        ordered = np.sort(np.asarray(angles, dtype=float))
        # Gaps between neighbouring angles; the last entry is the wrap-around
        # gap from the largest angle back to the smallest.
        gaps = np.diff(np.append(ordered, ordered[0] + two_pi))
        wrap_gap = gaps[-1]

        if wrap_gap >= gaps.max():
            # The empty sector straddles +/-pi: plain min/max is correct.
            start_theta = float(ordered[0])
            extent = float(ordered[-1] - ordered[0])
        else:
            # The empty sector lies inside (-pi, pi): the arc starts right
            # after it and runs across the +/-pi boundary.
            widest = int(np.argmax(gaps))
            start_theta = float(ordered[widest + 1])
            extent = float(two_pi - gaps[widest])
        end_theta = start_theta + extent

        # Cap the extent at 350 degrees to avoid distortion.
        max_extent = float(np.radians(350))
        if extent > max_extent:
            extent = max_extent
            end_theta = start_theta + extent

        logger.debug(f"弧参数: start={np.degrees(start_theta):.1f}°, extent={np.degrees(extent):.1f}°")
        return {
            'start_theta': start_theta,
            'extent': extent,
            'end_theta': end_theta
        }
    except Exception as e:
        logger.error(f"❌ 计算弧参数失败: {e}")
        # Fallback: fixed angular range. This relies on the module-level
        # ``np``; the original's function-local import could leave ``np``
        # unbound here.
        return dict(default_arc)
def polar_unwarp(
    image: np.ndarray,
    center: Tuple[float, float],
    start_theta: float,
    extent: float
) -> Optional[np.ndarray]:
    """
    Unroll curved seal text into a rectangle with a polar transform.

    The output has 800 columns sampling the angle from ``start_theta`` to
    ``start_theta + extent``, and ``min(h, w)`` rows sampling the radius
    from 0 (the centre) to the largest radius that stays inside the image.

    Args:
        image: Original seal image.
        center: Seal centre ``(cx, cy)`` in pixel coordinates of ``image``.
        start_theta: Start angle in radians.
        extent: Angular range in radians.

    Returns:
        The unrolled image, or ``None`` when the centre is not strictly
        inside the image or any step fails (failures are logged).
    """
    try:
        import cv2

        cx, cy = center
        h, w = image.shape[:2]

        # Largest radius that stays inside the image on every side.
        max_radius = min(cx, cy, w - cx, h - cy)
        if max_radius <= 0:
            # A centre on or outside the border has no valid sampling disc;
            # the original produced a meaningless image in this case.
            logger.warning(f"极坐标展开跳过: 中心 ({cx:.1f}, {cy:.1f}) 不在图像内部")
            return None

        # Polar grid resolution.
        num_angular = 800          # angular resolution
        num_radial = min(h, w)     # radial resolution

        angular_params = np.linspace(0, extent, num_angular)
        radial_params = np.linspace(0, max_radius, num_radial)
        angular_grid, radial_grid = np.meshgrid(angular_params, radial_params)

        # Polar -> Cartesian source coordinates for every output pixel.
        x_coords = cx + radial_grid * np.cos(angular_grid + start_theta)
        y_coords = cy + radial_grid * np.sin(angular_grid + start_theta)

        # Keep the sampling coordinates inside the image.
        x_coords = np.clip(x_coords, 0, w - 1)
        y_coords = np.clip(y_coords, 0, h - 1)

        # cv2.remap needs float32 maps.
        map_x = x_coords.astype(np.float32)
        map_y = y_coords.astype(np.float32)

        # The fifth positional parameter of cv2.remap is ``dst``, so the
        # border mode is passed by keyword (the original passed it
        # positionally, i.e. as ``dst``).
        unwarp_img = cv2.remap(
            image,
            map_x,
            map_y,
            interpolation=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT,
        )
        logger.debug(f"✅ 极坐标展开成功: {unwarp_img.shape}")
        return unwarp_img
    except Exception as e:
        logger.error(f"❌ 极坐标展开失败: {e}")
        return None
def run_ocr_recognition_vl(image_path: str, vl_pipeline) -> Dict[str, Any]:
    """
    Recognise the text of a seal image with PaddleOCRVL.

    The pipeline result is exported to JSON and read back. Among the blocks
    labelled ``'seal'``, the one with the longest content is taken and
    cleaned with ``clean_institution_name``.

    Args:
        image_path: Path of the image file.
        vl_pipeline: PaddleOCRVL pipeline instance.

    Returns:
        ``{'text': str, 'score': float, 'success': bool}``. ``score`` is a
        constant 1.0 whenever a seal block was found (no confidence is read
        from the result) and ``success`` is true when the cleaned text is
        non-empty. Any failure yields
        ``{'text': '', 'score': 0.0, 'success': False}``.
    """
    failure = {
        'text': '',
        'score': 0.0,
        'success': False
    }
    try:
        import json
        import tempfile
        from pathlib import Path

        output = vl_pipeline.predict(image_path, batch_size=1)
        if not output or len(output) == 0:
            return dict(failure)

        # A unique temporary directory replaces the fixed "temp_paddleocr_vl"
        # folder in the working directory. The original left that folder
        # behind on the success and exception paths (cleanup was commented
        # out for debugging).
        with tempfile.TemporaryDirectory(prefix="temp_paddleocr_vl_") as tmp_dir:
            output[0].save_to_json(save_path=tmp_dir)
            json_file = Path(tmp_dir) / f"{Path(image_path).stem}_res.json"
            if not json_file.exists():
                return dict(failure)
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

        # Collect all seal blocks and keep the longest (most complete) one.
        seal_blocks = [
            block for block in data.get('parsing_res_list', [])
            if block.get('block_label') == 'seal'
        ]
        if not seal_blocks:
            return dict(failure)

        # ``or ''`` guards against an explicit null block_content.
        best_block = max(seal_blocks, key=lambda b: len(b.get('block_content') or ''))
        raw_text = (best_block.get('block_content') or '').strip()
        logger.debug(f"🔍 PaddleOCRVL原始block_content: '{raw_text}'")
        logger.debug(f"🔍 找到{len(seal_blocks)}个seal blocks选择最长的")

        # Strip seal suffixes and similar noise.
        text = clean_institution_name(raw_text)
        logger.debug(f"PaddleOCRVL提取的文本: '{text}'")
        logger.debug(f"🔍 清理后文本: '{text}'")
        return {
            'text': text,
            'score': 1.0,  # PaddleOCRVL doesn't provide confidence score
            'success': len(text) > 0
        }
    except Exception as e:
        logger.error(f"PaddleOCRVL识别失败: {e}")
        return dict(failure)
# ============================================================================
# CMA 提取(三级备援)
# ============================================================================
def extract_cma_code_fullpage(page_img: np.ndarray) -> Dict[str, Any]:
    """
    Primary method: find the CMA code by running OCR on the whole page.

    All recognised strings are joined with spaces and the first run of 12
    (preferred) or 11 digits is returned.

    Args:
        page_img: Page image.

    Returns:
        ``{'success': bool, 'code': str | None, 'confidence': float}``. A
        hit is reported with a fixed confidence of 0.9. A missing OCR
        pipeline, no match, or any error gives
        ``{'success': False, 'code': None, 'confidence': 0.0}``.
    """
    def _collect_texts(ocr_output):
        """Return the recognised strings of the first page of an OCR result.

        Two layouts are accepted: the legacy nested list
        ``[[ [box, (text, score)], ... ]]`` and a mapping-style page result
        exposing ``'rec_texts'``.
        NOTE(review): the mapping layout is what PaddleOCR 3.x pipelines are
        documented to return - confirm against the installed version.
        """
        if not ocr_output or not ocr_output[0]:
            return []
        page = ocr_output[0]
        if hasattr(page, 'get'):
            return [str(t) for t in (page.get('rec_texts') or [])]
        texts = []
        for line in page:
            # line: [box, (text, score)]; skip malformed entries.
            if not line or len(line) < 2:
                continue
            payload = line[1]
            texts.append(payload[0] if isinstance(payload, (list, tuple)) else payload)
        return texts

    try:
        if model_manager.ocr_pipeline is None:
            return {'success': False, 'code': None, 'confidence': 0.0}

        logger.debug("运行全页面OCR提取CMA")
        result = model_manager.ocr_pipeline.ocr(page_img)

        # Debug: shape of the raw OCR result. The original sliced
        # ``result[0][:3]`` here, which raises on mapping-style results.
        logger.debug(f"OCR result type: {type(result)}")
        logger.debug(f"OCR result length: {len(result) if result else 0}")

        all_text = _collect_texts(result)
        logger.debug(f"First few lines: {all_text[:3]}")

        text = ' '.join(all_text)
        logger.debug(f"Extracted text length: {len(text)}")
        logger.debug(f"Extracted text preview: {text[:200]}...")

        # CMA code: 11-12 digits (kept consistent with the Java
        # implementation, which uses \d{11} and \d{12}). The 12-digit
        # alternative comes first so that the 11-digit one cannot truncate a
        # 12-digit number.
        cma_pattern = re.compile(r'\d{12}|\d{11}')
        matches = cma_pattern.findall(text)
        if matches:
            # Return the first match.
            logger.debug(f"✅ 找到CMA编号: {matches[0]}")
            return {
                'success': True,
                'code': matches[0],
                'confidence': 0.9
            }

        logger.debug("未找到CMA编号")
        return {'success': False, 'code': None, 'confidence': 0.0}
    except Exception as e:
        logger.error(f"❌ 全页面CMA提取失败: {e}")
        return {'success': False, 'code': None, 'confidence': 0.0}
def extract_cma_template_matching(page_img: np.ndarray) -> Dict[str, Any]:
    """
    Fallback method: locate the CMA logo by template matching, then OCR the
    surrounding region for the CMA code.

    The template is ``resources/CMA_Logo.png`` next to this file. When the
    best normalised match exceeds 0.4, a region around the logo (two logo
    widths to the left, three to the right, one logo height above, four
    below, clipped to the page) is recognised and searched for a 12- or
    11-digit number.

    Args:
        page_img: Page image (colour or grayscale).

    Returns:
        ``{'success': bool, 'code': str | None, 'confidence': float}``. On a
        hit the confidence is the template match score. Otherwise
        ``{'success': False, 'code': None, 'confidence': 0.0}``.
    """
    def _collect_texts(ocr_output):
        """Return the recognised strings of the first page of an OCR result.

        Two layouts are accepted: the legacy nested list
        ``[[ [box, (text, score)], ... ]]`` and a mapping-style page result
        exposing ``'rec_texts'``.
        NOTE(review): the mapping layout is what PaddleOCR 3.x pipelines are
        documented to return - confirm against the installed version.
        """
        if not ocr_output or not ocr_output[0]:
            return []
        page = ocr_output[0]
        if hasattr(page, 'get'):
            return [str(t) for t in (page.get('rec_texts') or [])]
        texts = []
        for line in page:
            # line: [box, (text, score)]; skip malformed entries.
            if not line or len(line) < 2:
                continue
            payload = line[1]
            texts.append(payload[0] if isinstance(payload, (list, tuple)) else payload)
        return texts

    not_found = {'success': False, 'code': None, 'confidence': 0.0}
    try:
        import cv2

        logger.debug("运行CMA模板匹配")

        # Load the CMA logo template.
        resource_dir = Path(__file__).parent / 'resources'
        template_path = resource_dir / 'CMA_Logo.png'
        if not template_path.exists():
            logger.warning(f"CMA logo模板不存在: {template_path}")
            return dict(not_found)

        template = cv2.imread(str(template_path), cv2.IMREAD_GRAYSCALE)
        if template is None:
            logger.error("无法加载CMA logo模板")
            return dict(not_found)

        # Convert the page to grayscale for matching.
        if len(page_img.shape) == 3:
            page_gray = cv2.cvtColor(page_img, cv2.COLOR_BGR2GRAY)
        else:
            page_gray = page_img

        h, w = template.shape
        if h > page_gray.shape[0] or w > page_gray.shape[1]:
            # matchTemplate cannot search a page smaller than the template;
            # handle this explicitly instead of via the generic error path.
            logger.debug("模板匹配未找到CMA编号")
            return dict(not_found)

        # Template matching
        result = cv2.matchTemplate(page_gray, template, cv2.TM_CCOEFF_NORMED)
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        logger.debug(f"CMA logo匹配度: {max_val:.3f}")

        if max_val > 0.4:  # match threshold
            # ROI: the logo plus its surroundings, clipped to the page.
            x, y = max_loc
            roi_x1 = max(0, x - w * 2)
            roi_y1 = max(0, y - h)
            roi_x2 = min(page_gray.shape[1], x + w * 3)
            roi_y2 = min(page_gray.shape[0], y + h * 4)
            roi = page_gray[roi_y1:roi_y2, roi_x1:roi_x2]

            # OCR the ROI for the CMA code.
            if model_manager.ocr_pipeline is not None:
                if roi.ndim == 2:
                    # Hand the OCR engine a 3-channel image with the same
                    # pixel content. NOTE(review): added defensively; confirm
                    # whether the installed OCR accepts 2-D arrays.
                    roi = cv2.cvtColor(roi, cv2.COLOR_GRAY2BGR)
                ocr_result = model_manager.ocr_pipeline.ocr(roi)
                text = ' '.join(_collect_texts(ocr_result))

                # CMA code: 11-12 digits (consistent with the Java
                # implementation). 12 digits are tried first so the 11-digit
                # alternative cannot truncate them.
                cma_pattern = re.compile(r'\d{12}|\d{11}')
                matches = cma_pattern.findall(text)
                if matches:
                    logger.debug(f"✅ 模板匹配找到CMA: {matches[0]}")
                    return {
                        'success': True,
                        'code': matches[0],
                        'confidence': float(max_val)
                    }

        logger.debug("模板匹配未找到CMA编号")
        return dict(not_found)
    except Exception as e:
        logger.error(f"❌ CMA模板匹配失败: {e}")
        return dict(not_found)
# ============================================================================
# 机构名称清理行1219-1264
# ============================================================================
def clean_institution_name(name: str) -> str:
    """
    Strip seal suffixes, line breaks and a trailing number from a name.

    Steps, in order:
    1. Line breaks are replaced by spaces.
    2. Known seal phrases are removed wherever they occur.
    3. A trailing whitespace-separated run of four or more digits
       (e.g. " 8768456") is removed.
    4. Runs of whitespace are collapsed and the ends trimmed.

    Args:
        name: Raw institution name.

    Returns:
        The cleaned name. A falsy input (``None`` or ``''``) is returned
        unchanged.
    """
    if not name:
        return name

    # 1. Line breaks become spaces.
    cleaned = name.replace('\n', ' ').replace('\r', ' ')

    # 2. Remove seal phrases; longer phrases first so that a shorter one
    #    cannot leave a fragment of a longer one behind.
    patterns_to_remove = (
        '检验检测专用章',
        '检验检测专用',
        '检测专用章',
        '检验专用章',
        '专用章',
        '(检验检测)',
        # Full-width parentheses, written as escapes so the entry cannot be
        # confused with the ASCII one above. The original list held the
        # ASCII form twice.
        '\uff08检验检测\uff09',
        '【检验检测】',
        '[检验检测]',
    )
    for pattern in patterns_to_remove:
        cleaned = cleaned.replace(pattern, '')

    # 3. Remove a trailing pure-digit suffix such as "8768456".
    cleaned = re.sub(r'\s+\d{4,}\s*$', '', cleaned)

    # 4. Collapse redundant whitespace.
    cleaned = ' '.join(cleaned.split())
    return cleaned.strip()
# ============================================================================
# 印章检测和提取(完整实现)
# ============================================================================
def extract_seals_and_institutions(
    page_img: np.ndarray,
    output_dir: str,
    ocr_model: str = 'ppocr_v5'
) -> Dict[str, Any]:
    """
    Extract seals from a page image and recognise the institution names.

    Flow: save the page as ``doc_page.png``, run layout detection on it,
    crop every seal box with 40 px padding, then recognise each crop either
    directly with PaddleOCRVL or after polar unwarping. Intermediate images
    (``doc_page.png``, ``doc_layout_viz.png``, ``seal_crop_<i>.png``,
    ``seal_unwarp_<i>.png``) are written to ``output_dir``.

    NOTE(review): the indentation of this function was lost in the available
    source. The nesting below is a reconstruction; the ambiguous places are
    marked inline.

    Args:
        page_img: Input page image.
        output_dir: Directory for the intermediate results.
        ocr_model: OCR model, ``'ppocr_v5'`` or ``'paddleocr_vl'``.

    Returns:
        ``{'seals': list, 'institutions': list, 'processing_time': float}``.
        ``seals`` holds one dict per processed seal and ``institutions``
        holds the cleaned names of the seals whose recognition succeeded.
        Early exits (invalid input, save failure, layout failure, no seal)
        return the lists empty.
    """
    import cv2
    import math
    from pathlib import Path
    start_time = time.time()
    result = {
        'seals': [],
        'institutions': [],
        'processing_time': 0.0
    }
    # Validate the input.
    if page_img is None or not isinstance(page_img, np.ndarray) or page_img.size == 0:
        logger.error("输入图像无效")
        result['processing_time'] = time.time() - start_time
        return result
    logger.info(f"输入图像尺寸: {page_img.shape}")
    # Create the output directory.
    os.makedirs(output_dir, exist_ok=True)
    # Save the page image; layout detection below works on this file.
    doc_path = os.path.join(output_dir, "doc_page.png")
    try:
        # NOTE(review): the return value of cv2.imwrite is ignored - confirm
        # whether a failed write can go unnoticed here.
        cv2.imwrite(doc_path, page_img)
    except Exception as e:
        logger.error(f"保存页面图像失败: {e}")
        result['processing_time'] = time.time() - start_time
        return result
    # Run layout detection.
    logger.info("运行布局检测...")
    try:
        all_regions = run_layout_detection(doc_path)
    except Exception as e:
        logger.error(f"布局检测失败: {e}")
        result['processing_time'] = time.time() - start_time
        return result
    # Collect the seal boxes and draw every accepted region on a copy of the
    # page (seals with colour (0, 0, 255), other regions with (0, 255, 0)).
    seal_boxes = []
    page_viz = page_img.copy()
    for reg in all_regions:
        box = reg.get('box')
        label = reg.get('label')
        score = reg.get('score', 0.0)
        is_seal = (label == 'seal')
        # NOTE(review): a region without 'box' would make the unpacking below
        # raise outside any try block - confirm regions always carry a box.
        if score > 0.2:
            x1, y1, x2, y2 = [int(v) for v in box]
            color = (0, 0, 255) if is_seal else (0, 255, 0)
            cv2.rectangle(page_viz, (x1, y1), (x2, y2), color, 2)
            if is_seal:
                seal_boxes.append(box)
    cv2.imwrite(os.path.join(output_dir, "doc_layout_viz.png"), page_viz)
    if not seal_boxes:
        logger.warning("未检测到印章")
        result['processing_time'] = time.time() - start_time
        return result
    # Process every detected seal.
    logger.info(f"处理 {len(seal_boxes)} 个检测到的印章...")
    # PaddleOCRVL is used only when requested AND actually loaded.
    use_vl = (ocr_model == "paddleocr_vl" and
              model_manager.vl_pipeline is not None)
    for i, box in enumerate(seal_boxes):
        x1, y1, x2, y2 = [int(v) for v in box]
        # Crop with 40 px padding, clipped to the page.
        pad = 40
        y1_p, y2_p = max(0, y1-pad), min(page_img.shape[0], y2+pad)
        x1_p, x2_p = max(0, x1-pad), min(page_img.shape[1], x2+pad)
        seal_crop = page_img[y1_p:y2_p, x1_p:x2_p]
        # Validate the crop.
        if seal_crop.size == 0 or seal_crop.shape[0] == 0 or seal_crop.shape[1] == 0:
            logger.warning(f"印章 {i}: 裁剪尺寸无效 {seal_crop.shape},跳过")
            continue
        crop_path = os.path.join(output_dir, f"seal_crop_{i}.png")
        cv2.imwrite(crop_path, seal_crop)
        # Detect text polygons (meant to use the seal detection model).
        try:
            # Simplified version: no polygon detection is performed, so the
            # list stays empty. The real implementation should use
            # PP-OCRv4_server_seal_det here.
            all_polygons = []  # placeholder
            logger.info(f" 印章 #{i}: 裁剪尺寸 {seal_crop.shape[1]}x{seal_crop.shape[0]}")
            # Dual-strategy centre detection.
            center_x, center_y, radius, method_used = detect_seal_center_dual_method(
                seal_crop, all_polygons
            )
            center = (center_x, center_y)
            logger.info(f" - 中心检测方法: {method_used}")
            logger.info(f" - 中心: ({center_x:.1f}, {center_y:.1f}), 半径: {radius:.1f}")
            # Without enough polygons, go straight to PaddleOCRVL. Because
            # all_polygons is always empty above, this branch is taken for
            # every seal.
            MIN_POLYGONS_FOR_UNWARP = 3
            if len(all_polygons) < MIN_POLYGONS_FOR_UNWARP:
                logger.warning(f" 印章 #{i}: 文本多边形不足 ({len(all_polygons)} < {MIN_POLYGONS_FOR_UNWARP})")
                logger.info(f" 印章 #{i}: 使用PaddleOCRVL备份")
                if use_vl:
                    # Recognise the crop directly with PaddleOCRVL.
                    ocr_result = run_ocr_recognition_vl(crop_path, model_manager.vl_pipeline)
                    seal_data = {
                        'index': i,
                        'box': box,
                        'crop_path': f"seal_crop_{i}.png",
                        'unwarp_path': None,
                        'marked_path': None,
                        'text': ocr_result.get('text', ''),
                        'confidence': float(ocr_result.get('score', 0.0)),
                        'success': bool(ocr_result.get('success', False)),
                        'method_used': f'{method_used}_skip_unwarp',
                        'used_fallback': True
                    }
                    result['seals'].append(seal_data)
                    if ocr_result.get('success'):
                        cleaned_name = clean_institution_name(ocr_result['text'])
                        result['institutions'].append(cleaned_name)
                        logger.info(f" ✓ 印章 #{i} 成功: {cleaned_name[:50]}... (置信度: {ocr_result['score']:.4f})")
                    # NOTE(review): this `continue` is placed inside
                    # `if use_vl:` so that the PP-OCRv5 mode still reaches the
                    # unwarp path below; it may instead have been one level
                    # out. Confirm against the original file.
                    continue
            # Compute the arc parameters.
            arc_params = calculate_precise_arc(all_polygons, center)
            start_theta = arc_params['start_theta']
            extent = arc_params['extent']
            logger.info(f" - 起始角度: {math.degrees(start_theta):.1f}°")
            logger.info(f" - 角度范围: {math.degrees(extent):.1f}°")
            # Polar unwarp.
            unwarp_path = os.path.join(output_dir, f"seal_unwarp_{i}.png")
            unwarp = None
            used_fallback = False
            if extent > 0:
                logger.info(f" 印章 #{i}: 执行极坐标展开...")
                unwarp = polar_unwarp(seal_crop, center, start_theta, extent)
                if unwarp is not None:
                    cv2.imwrite(unwarp_path, unwarp)
                    logger.info(f" - 展开尺寸: {unwarp.shape[1]}x{unwarp.shape[0]}")
                else:
                    logger.warning(f" 印章 #{i}: 极坐标展开返回None")
            # Fallback: fixed angular range, used only when no unwarp was
            # attempted (extent <= 0) and there are no polygons at all.
            if unwarp is None and extent <= 0 and len(all_polygons) == 0:
                logger.warning(f" 印章 #{i}: 未检测到文本多边形使用fallback角度范围")
                used_fallback = True
                fallback_start_theta = math.radians(135)  # 135 degrees
                fallback_extent = math.radians(270)  # 270 degrees
                unwarp = polar_unwarp(seal_crop, center, fallback_start_theta, fallback_extent)
                if unwarp is not None:
                    cv2.imwrite(unwarp_path, unwarp)
                    logger.info(f" - Fallback展开尺寸: {unwarp.shape[1]}x{unwarp.shape[0]}")
                    # NOTE(review): these two assignments may belong one
                    # level out (applied even when the unwarp failed).
                    start_theta = fallback_start_theta
                    extent = fallback_extent
            # OCR recognition.
            ocr_result = {'text': '', 'score': 0.0, 'success': False}
            ocr_method_used = method_used
            if unwarp is not None:
                # Recognise the unwarped image.
                if use_vl:
                    ocr_result = run_ocr_recognition_vl(unwarp_path, model_manager.vl_pipeline)
                else:
                    # Use PP-OCRv5.
                    if model_manager.ocr_pipeline is not None:
                        ocr_output = model_manager.ocr_pipeline.ocr(unwarp)
                        # NOTE(review): the parsing below assumes the result
                        # layout [[ [box, (text, score)], ... ]] - confirm it
                        # matches what the installed PaddleOCR returns.
                        if ocr_output and len(ocr_output) > 0 and ocr_output[0]:
                            texts = []
                            for line in ocr_output[0]:
                                if line and len(line) > 0:
                                    # line: [box, (text, score)]
                                    if isinstance(line[1], (list, tuple)):
                                        text, score = line[1]
                                    else:
                                        text = line[1]
                                    texts.append(text)
                            if texts:
                                # The per-line scores are discarded; a fixed
                                # 0.8 is reported.
                                ocr_result = {
                                    'text': ' '.join(texts),
                                    'score': 0.8,
                                    'success': True
                                }
                ocr_method_used = f"{method_used}_unwarp"
                logger.info(f" 印章 #{i} OCR结果 (展开):")
                logger.info(f" - 文本: '{ocr_result['text']}'")
                logger.info(f" - 置信度: {ocr_result['score']:.4f}")
                # Double verification: if OCR on the unwarped image failed,
                # retry on the plain crop with PaddleOCRVL.
                if (not ocr_result['success'] or len(ocr_result['text'].strip()) == 0) and use_vl:
                    logger.warning(f" 印章 #{i}: 展开OCR失败尝试PaddleOCRVL备份")
                    backup_result = run_ocr_recognition_vl(crop_path, model_manager.vl_pipeline)
                    if backup_result.get('success') and len(backup_result['text'].strip()) > 0:
                        logger.info(f" 印章 #{i}: ** 使用PaddleOCRVL备份结果 **")
                        ocr_result = backup_result
                        ocr_method_used = f"{method_used}_crop_backup"
            else:
                # No unwarped image: use PaddleOCRVL directly on the crop.
                if use_vl:
                    logger.info(f" 印章 #{i}: 使用PaddleOCRVL备份 - 直接识别印章裁剪")
                    ocr_result = run_ocr_recognition_vl(crop_path, model_manager.vl_pipeline)
                    ocr_method_used = f"{method_used}_crop_backup"
                    logger.info(f" 印章 #{i} PaddleOCRVL备份结果:")
                    logger.info(f" - 文本: '{ocr_result['text']}'")
                    logger.info(f" - 置信度: {ocr_result['score']:.4f}")
            # NOTE(review): 'marked_path' names seal_marked_<i>.png, but no
            # such file is written anywhere in this function.
            seal_data = {
                'index': int(i),
                'box': [float(v) for v in box],
                'crop_path': f"seal_crop_{i}.png",
                'unwarp_path': f"seal_unwarp_{i}.png" if unwarp is not None else None,
                'marked_path': f"seal_marked_{i}.png",
                'text': ocr_result['text'],
                'confidence': float(ocr_result['score']),
                'success': bool(ocr_result['success']),
                'method_used': ocr_method_used,
                'used_fallback': used_fallback,
                'debug_info': {
                    'center': center,
                    'radius': radius,
                    'start_theta_deg': float(math.degrees(start_theta)),
                    'extent_deg': float(math.degrees(extent)),
                    'num_polygons': len(all_polygons),
                    'crop_size': (seal_crop.shape[1], seal_crop.shape[0]),
                    'unwarp_size': (unwarp.shape[1], unwarp.shape[0]) if unwarp is not None else None
                }
            }
            result['seals'].append(seal_data)
            if ocr_result['success']:
                cleaned_name = clean_institution_name(ocr_result['text'])
                result['institutions'].append(cleaned_name)
                logger.info(f" ✓ 印章 #{i} 成功: {cleaned_name[:50]}... (置信度: {ocr_result['score']:.4f})")
            else:
                logger.warning(f" ✗ 印章 #{i} 失败: 无法提取机构名称")
        except Exception as e:
            # One failing seal must not abort the remaining ones.
            logger.error(f"处理印章 {i} 时出错: {e}", exc_info=True)
            continue
    result['processing_time'] = time.time() - start_time
    return result
# ============================================================================
# 主处理函数
# ============================================================================
# Global model manager instance shared by the functions in this module.
# Creating it loads no model; that happens in OCRModelManager.init_all().
model_manager = OCRModelManager()
def process_pdf_standalone(
    pdf_path: str,
    output_dir: str,
    ocr_model: str = 'paddleocr_vl'
) -> Dict[str, Any]:
    """
    Process one PDF: extract its CMA code and institution name.

    Main entry point combining all steps:
    1. Render the first page.
    2. Try the digital certificate for the institution name.
    3. Extract the CMA code (full-page OCR, then logo template matching).
    4. If the certificate gave no name, read it from the seals.

    Args:
        pdf_path: Path of the PDF file.
        output_dir: Output directory for intermediate files.
        ocr_model: OCR model, ``'paddleocr_vl'`` or ``'ppocr_v5'``.

    Returns:
        ``{'cma_code': str, 'institution_name': str, 'confidence': float,
        'success': bool, 'error': str | None}``. ``success`` is true
        whenever the pipeline ran to completion, even if a field stayed
        empty. It is false (with ``error`` set) when the page could not be
        rendered or an exception occurred.
    """
    total_start = time.time()
    logger.info("=" * 60)
    logger.info(f"处理PDF: {pdf_path}")
    logger.info(f"OCR模型: {ocr_model}")
    logger.info("=" * 60)
    try:
        # Make sure the models are loaded.
        if not model_manager._initialized:
            model_manager.init_all()

        # Create the output directory.
        os.makedirs(output_dir, exist_ok=True)

        # Step 1: render the first PDF page.
        logger.info("Step 1: 提取PDF页面")
        page_img = extract_pdf_page(pdf_path, page_num=0)
        if page_img is None:
            return {
                'cma_code': '',
                'institution_name': '',
                'confidence': 0.0,
                'success': False,
                'error': 'PDF extraction failed'
            }

        # Step 2: try the digital certificate for the institution name.
        logger.info("Step 2: 提取数字证书")
        cert_result = extract_certificate_from_pdf(pdf_path)
        institution_name = None
        confidence = 0.0
        if cert_result['success']:
            institution_name = cert_result['institution_name']
            confidence = cert_result['confidence']
            logger.info(f"✅ 从证书提取机构名称: {institution_name}")

        # Step 3: CMA code, primary method first, then template matching.
        logger.info("Step 3: 提取CMA编号")
        cma_result = extract_cma_code_fullpage(page_img)
        if not cma_result['success']:
            logger.info("全页面OCR失败尝试模板匹配")
            cma_result = extract_cma_template_matching(page_img)
        cma_code = cma_result['code'] if cma_result['success'] else ''

        # Step 4: no name from the certificate -> read the seals.
        if institution_name is None:
            logger.info("Step 4: 从印章提取机构名称")
            seal_result = extract_seals_and_institutions(
                page_img,
                output_dir,
                ocr_model
            )
            if seal_result['institutions']:
                # Use the first recognised institution name.
                institution_name = seal_result['institutions'][0]
                # 'institutions' only holds successful seals while 'seals'
                # holds all of them, so the matching confidence belongs to
                # the first *successful* seal, not to seals[0].
                first_ok = next(
                    (seal for seal in seal_result['seals'] if seal.get('success')),
                    None
                )
                confidence = first_ok['confidence'] if first_ok is not None else 0.0
                logger.info(f"✅ 从印章提取机构名称: {institution_name}")
            else:
                logger.warning("未能从印章提取机构名称")

        # Clean the institution name.
        if institution_name:
            institution_name = clean_institution_name(institution_name)

        # Overall confidence: the better of the CMA and the name confidence.
        final_confidence = max(
            cma_result.get('confidence', 0.0),
            confidence
        )
        elapsed = time.time() - total_start
        logger.info("=" * 60)
        logger.info("✅ PDF处理完成")
        logger.info(f" CMA: {cma_code}")
        logger.info(f" 机构: {institution_name}")
        logger.info(f" 置信度: {final_confidence:.2f}")
        logger.info(f" 耗时: {elapsed:.1f}")
        logger.info("=" * 60)
        return {
            'cma_code': cma_code,
            'institution_name': institution_name or '',
            'confidence': final_confidence,
            'success': True,
            'error': None
        }
    except Exception as e:
        logger.error(f"❌ PDF处理失败: {e}", exc_info=True)
        return {
            'cma_code': '',
            'institution_name': '',
            'confidence': 0.0,
            'success': False,
            'error': str(e)
        }
# ============================================================================
# 测试入口
# ============================================================================
if __name__ == '__main__':
    # Manual test mode: python pdf_processor.py <pdf_path> [<output_dir>]
    cli_args = sys.argv[1:]
    if not cli_args:
        print("用法: python pdf_processor.py <pdf_path> <output_dir>")
    else:
        pdf_path = cli_args[0]
        # The output directory is optional and defaults to "test_output".
        output_dir = 'test_output' if len(cli_args) < 2 else cli_args[1]

        # Load the models, then process the PDF.
        model_manager.init_all()
        result = process_pdf_standalone(pdf_path, output_dir)

        # Print the result summary.
        separator = "=" * 60
        print()
        print(separator)
        print("测试结果")
        print(separator)
        print(f"PDF: {pdf_path}")
        print(f"CMA: {result['cma_code']}")
        print(f"机构: {result['institution_name']}")
        print(f"置信度: {result['confidence']:.2f}")
        print(f"成功: {result['success']}")
        if result['error']:
            print(f"错误: {result['error']}")
        print(separator)