2097 lines
84 KiB
Python
2097 lines
84 KiB
Python
"""
CMA Code Extraction & Institution Name - Batch Accuracy Testing Script (Enhanced)

This script implements comprehensive batch accuracy testing for BOTH:
1. CMA code extraction
2. Institution name extraction from seals

Uses the complete workflow from v_verify_logic.py including:
- Layout detection (PaddleX PP-DocLayout-L)
- Seal detection and refinement
- Polar unwarping
- OCR text recognition for institution names

Author: Claude Code
Date: 2025-02-05
Version: 2.0 (Enhanced with seal/institution extraction)
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import re
|
|
import math
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Tuple, Optional, Any
|
|
|
|
# IMPORTANT: these flags have to be exported BEFORE any paddle module is
# imported. They prevent slow network checks and enable offline mode.
os.environ.update({
    "DISABLE_MODEL_SOURCE_CHECK": "True",
    "PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK": "True",
    "HUB_DISABLE_MODEL_SOURCE_CHECK": "True",
    "PADDLEHUB_NO_FETCH_LATEST": "True",
})
|
|
|
|
import numpy as np
|
|
|
|
# Set UTF-8 encoding for the Windows console by wrapping the raw byte buffers
# of stdout/stderr in UTF-8 stream writers.
if sys.platform == 'win32':
    import codecs
    try:
        sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'strict')
        sys.stderr = codecs.getwriter('utf-8')(sys.stderr.buffer, 'strict')
    except Exception:
        # Best effort only (e.g. a replaced stream without a ``.buffer``
        # attribute). ``Exception`` instead of a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are no longer swallowed here.
        pass
|
|
|
|
|
|
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to plain Python values."""

    def default(self, obj):
        """Return a JSON-serialisable equivalent of *obj*.

        Handles NumPy arrays (nested lists), integer scalars, floating
        scalars and boolean scalars. Anything else is delegated to the base
        class, which raises ``TypeError``.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        # np.bool_ is not a subclass of np.integer, so it needs its own branch;
        # without it a NumPy boolean makes json.dumps raise TypeError.
        if isinstance(obj, np.bool_):
            return bool(obj)
        return super().default(obj)
|
|
|
|
|
|
try:
|
|
import fitz # PyMuPDF
|
|
import cv2
|
|
from paddleocr import PaddleOCR, SealTextDetection, TextRecognition
|
|
try:
|
|
from paddleocr import PaddleOCRVL
|
|
PADDLEOCRVL_AVAILABLE = True
|
|
except ImportError:
|
|
PADDLEOCRVL_AVAILABLE = False
|
|
print("Warning: PaddleOCRVL not available. Install with: pip install paddleocr[doc-parser]")
|
|
try:
|
|
import paddlex as px
|
|
PADDLEX_AVAILABLE = True
|
|
except ImportError:
|
|
PADDLEX_AVAILABLE = False
|
|
print("Warning: PaddleX not available. Layout detection will be disabled.")
|
|
print(" Install with: pip install paddlex")
|
|
from Levenshtein import distance as levenshtein_distance
|
|
except ImportError as e:
|
|
print(f"Error: Required dependency not found: {e}")
|
|
print("Please install: pip install python-Levenshtein paddleocr paddlex pymupdf-ng opencv-python numpy")
|
|
sys.exit(1)
|
|
|
|
# Note: Import statements above may take 5-10 seconds on first run
|
|
# due to PaddleOCR/PaddleX library initialization
|
|
|
|
# Import CMA extraction module
|
|
try:
|
|
from cma_extraction_robust import extract_cma_code_fullpage
|
|
except ImportError as e:
|
|
print(f"Error: Cannot import cma_extraction_robust.py: {e}")
|
|
sys.exit(1)
|
|
|
|
# Configure logging: records at INFO and above are sent both to a UTF-8
# encoded log file and to stderr.
logging.basicConfig(
    handlers=[
        logging.FileHandler('test_accuracy_full.log', encoding='utf-8'),
        logging.StreamHandler(sys.stderr),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
# Constants: input locations, report directory and batch/similarity settings
_TEST_DATA_DIR = Path("src") / "test" / "resources" / "data"
PDF_DIR = _TEST_DATA_DIR / "pdfs"
RESULTS_JSON = _TEST_DATA_DIR / "results.json"
OUTPUT_DIR = Path("test_reports_full")
BATCH_SIZE = 20
SIMILARITY_THRESHOLD = 85.0

# OCR Model Configuration
# Options: "ppocr_v5" (default), "paddleocr_vl"; overridable via the OCR_MODEL
# environment variable.
OCR_MODEL = os.getenv("OCR_MODEL", "ppocr_v5")

# CMA Template Matching Configuration
# The two template slots start empty and are filled by load_cma_template_global().
CMA_LOGO_PATH = Path("template") / "CMA_Logo.png"
CMA_LOGO_TEMPLATE = CMA_LOGO_TEMPLATE_RGB = None
|
|
|
|
|
|
# ============ Helper Functions ============
|
|
|
|
def imwrite_safe(file_path, img):
    """
    Write an image file, with a fallback for paths cv2.imwrite cannot handle.

    ``cv2.imwrite`` is tried first. If it reports failure (as it does for
    Chinese paths on Windows), the image is encoded in memory with
    ``cv2.imencode`` and the bytes are written with ``ndarray.tofile``.

    Args:
        file_path: Destination path (str or path-like).
        img: Image data (numpy array).

    Returns:
        bool: True if the image was written, False otherwise. Errors are
        logged, never raised.
    """
    try:
        target = os.fspath(file_path)

        # Try standard cv2.imwrite first
        if cv2.imwrite(target, img):
            return True

        # Fallback: encode in memory using the codec that matches the target
        # extension (previously always PNG, which produced PNG bytes inside
        # e.g. a ".jpg" file). Paths without an extension keep the PNG default.
        extension = os.path.splitext(target)[1] or ".png"
        is_success, buffer = cv2.imencode(extension, img)
        if is_success:
            buffer.tofile(target)
            return True
        return False
    except Exception as e:
        logger.error(f"Failed to write image to {file_path}: {e}")
        return False
|
|
|
|
|
|
# ============ CMA Template Matching Functions ============
|
|
|
|
def load_cma_template_global():
    """
    Load the CMA logo template once and cache it in module globals.

    On success ``CMA_LOGO_TEMPLATE`` holds the grayscale template and
    ``CMA_LOGO_TEMPLATE_RGB`` its 3-channel conversion. Later calls return
    immediately once the grayscale template is cached.

    Returns:
        bool: True if the template is available, False if the file is
        missing or could not be read/converted (the reason is logged).
    """
    global CMA_LOGO_TEMPLATE, CMA_LOGO_TEMPLATE_RGB
    if CMA_LOGO_TEMPLATE is not None:
        return True

    if not CMA_LOGO_PATH.exists():
        logger.warning(f"CMA logo template not found at {CMA_LOGO_PATH}")
        return False

    try:
        # cv2.imread signals failure by returning None instead of raising,
        # so check explicitly rather than letting cvtColor fail obscurely.
        template_gray = cv2.imread(str(CMA_LOGO_PATH), cv2.IMREAD_GRAYSCALE)
        if template_gray is None:
            logger.error(
                f"Failed to load CMA logo template: cv2.imread could not read {CMA_LOGO_PATH}"
            )
            return False
        template_bgr = cv2.cvtColor(template_gray, cv2.COLOR_GRAY2BGR)
    except Exception as e:
        logger.error(f"Failed to load CMA logo template: {e}")
        return False

    # Publish both globals only after every step succeeded, so a failed load
    # never leaves a half-initialised cache behind.
    CMA_LOGO_TEMPLATE = template_gray
    CMA_LOGO_TEMPLATE_RGB = template_bgr
    logger.info(f"Loaded CMA logo template: {CMA_LOGO_PATH} {CMA_LOGO_TEMPLATE.shape}")
    return True
|
|
|
|
|
|
def match_cma_template(page_img, method=cv2.TM_CCOEFF_NORMED):
    """
    Locate the CMA logo on a page image by template matching.

    Args:
        page_img: Page image (numpy array); 3-dimensional input is converted
            from BGR to grayscale first.
        method: OpenCV template matching method passed to cv2.matchTemplate.

    Returns:
        dict with 'max_val' (best score as float), 'match_center' ((x, y) of
        the centre of the best match) and 'match_loc' (top-left corner), or
        None when the template cannot be loaded, the page is empty, or the
        page is smaller than the template.
    """
    if CMA_LOGO_TEMPLATE is None and not load_cma_template_global():
        return None

    if page_img is None or page_img.size == 0:
        return None

    # Convert to grayscale if needed
    if len(page_img.shape) == 3:
        page_gray = cv2.cvtColor(page_img, cv2.COLOR_BGR2GRAY)
    else:
        page_gray = page_img

    # cv2.matchTemplate raises when the searched image is smaller than the
    # template; report "no match" instead of crashing the caller.
    template_h, template_w = CMA_LOGO_TEMPLATE.shape[:2]
    if page_gray.shape[0] < template_h or page_gray.shape[1] < template_w:
        logger.warning(
            f"Page image {page_gray.shape[1]}x{page_gray.shape[0]} is smaller than "
            f"CMA logo template {template_w}x{template_h}, skipping template matching"
        )
        return None

    # Execute template matching
    result = cv2.matchTemplate(page_gray, CMA_LOGO_TEMPLATE, method=method)
    if result is None:
        return None

    _, max_val, _, max_loc = cv2.minMaxLoc(result)

    # max_loc is the top-left corner of the best match; shift by half the
    # template size to obtain its centre.
    match_center = (max_loc[0] + template_w // 2,
                    max_loc[1] + template_h // 2)

    return {
        'max_val': float(max_val),
        'match_center': match_center,
        'match_loc': max_loc
    }
|
|
|
|
|
|
def _collect_ocr_lines(ocr_data):
    """Normalise one OCR result into parallel lists of texts and scores."""
    rec_texts = []
    rec_scores = []

    if isinstance(ocr_data, dict) or hasattr(ocr_data, 'get'):
        # predict() API: returns dict-like with rec_texts, rec_scores
        try:
            data_dict = ocr_data if isinstance(ocr_data, dict) else dict(ocr_data)
            rec_texts = list(data_dict.get('rec_texts', []))
            rec_scores = list(data_dict.get('rec_scores', []))
            print(f" [TM] Using predict() API format, found {len(rec_texts)} lines")
        except Exception as e:
            print(f" [TM] Failed to parse predict() result: {e}")
    elif isinstance(ocr_data, list):
        # ocr() API: returns [[box, (text, score)], ...]
        for line in ocr_data:
            try:
                payload = line[1]
                if isinstance(payload, (list, tuple)):
                    text, score = str(payload[0]), float(payload[1])
                elif isinstance(payload, str):
                    # Text without a score: assume a fairly confident read
                    text, score = payload, 0.9
                else:
                    text, score = str(payload), 0.5
            except (IndexError, TypeError, ValueError) as e:
                logger.warning(f"Skipped OCR line due to parse error: {e}")
                continue
            rec_texts.append(text)
            rec_scores.append(score)
        print(f" [TM] Using ocr() API format, found {len(rec_texts)} lines")

    # Guarantee one score per text. A shorter score list used to raise
    # IndexError further down and discard every recognised line.
    missing = len(rec_texts) - len(rec_scores)
    if missing > 0:
        rec_scores = rec_scores + [0.0] * missing

    return rec_texts, rec_scores


def extract_cma_from_roi(roi_img, ocr_engine, output_dir=None):
    """
    Run OCR on the CMA region of interest and pick the best code candidate.

    The engine's legacy ``.ocr()`` method is tried first, then ``.predict()``.
    Every run of 11-15 digits in the recognised text is a candidate (cut to
    its first 12 digits); the candidate from the highest-scoring text line
    wins.

    Args:
        roi_img: Cropped image (numpy array) expected to contain the code.
        ocr_engine: Object exposing ``ocr(img)`` and/or ``predict(img)``.
        output_dir: Optional directory; when a code is found the ROI is
            saved there as "cma_template_roi.png".

    Returns:
        dict with 'code' (str or None), 'confidence' (float) and 'success'
        (bool). Failures are logged/printed and reported via success=False.
    """
    result = {
        'code': None,
        'confidence': 0.0,
        'success': False
    }

    if roi_img is None or roi_img.size == 0:
        print(" [TM] ROI image is empty, skipping")
        return result

    h, w = roi_img.shape[:2]
    print(f" [TM] ROI size: {w}x{h}")

    try:
        # Try .ocr() first (legacy), fall back to .predict() (new API)
        raw_result = None
        if hasattr(ocr_engine, 'ocr'):
            try:
                raw_result = ocr_engine.ocr(roi_img)
            except TypeError:
                # New API doesn't support legacy .ocr() kwargs
                pass
        if raw_result is None and hasattr(ocr_engine, 'predict'):
            try:
                raw_result = ocr_engine.predict(roi_img)
            except Exception as pred_err:
                print(f" [TM] predict() also failed: {pred_err}")
        if raw_result is None:
            print(" [TM] OCR engine could not process ROI")
            return result

        if not raw_result or len(raw_result) == 0 or raw_result[0] is None:
            print(" [TM] OCR returned no results")
            return result

        rec_texts, rec_scores = _collect_ocr_lines(raw_result[0])

        print(f" [TM] OCR found {len(rec_texts)} text lines")
        for i, (text, score) in enumerate(zip(rec_texts, rec_scores)):
            print(f" [TM] Line {i}: '{text}' (score: {score:.2f})")

        cma_candidates = []
        for text, score in zip(rec_texts, rec_scores):
            for num in re.findall(r'\d{11,15}', str(text)):
                # Take first 12 digits if longer
                cma_candidates.append({'code': num[:12], 'confidence': score})

        if cma_candidates:
            # max() keeps the first of equally confident candidates, the same
            # winner a stable descending sort would put first.
            best = max(cma_candidates, key=lambda cand: cand['confidence'])
            result['code'] = best['code']
            result['confidence'] = best['confidence']
            result['success'] = True
            print(f" [TM] Best CMA candidate: {best['code']} (conf: {best['confidence']:.2f})")

            if output_dir:
                imwrite_safe(os.path.join(output_dir, "cma_template_roi.png"), roi_img)
        else:
            print(" [TM] No CMA code candidates found in ROI text")

    except Exception as e:
        logger.error(f"ROI OCR failed: {e}")
        print(f" [TM] ROI OCR failed: {e}")

    return result
|
|
|
|
|
|
def process_cma_template_extraction(page_img, ocr_engine, output_dir=None):
    """
    Template-based CMA extraction: find the logo, crop around it, OCR the crop.

    Args:
        page_img: Full page image (numpy array).
        ocr_engine: OCR engine handed on to extract_cma_from_roi.
        output_dir: Optional directory for the debug ROI image.

    Returns:
        The dict produced by extract_cma_from_roi, or a failure dict with an
        extra 'reason' key when the logo is not found or matches too weakly.
    """
    print(" [TM] Starting template matching extraction...")
    match_res = match_cma_template(page_img)
    if not match_res:
        print(" [TM] Template matching returned no result")
        return {'success': False, 'code': None, 'confidence': 0.0, 'reason': 'No match result'}

    match_score = match_res['max_val']
    print(f" [TM] Match confidence: {match_score:.3f} (threshold: 0.4)")
    if match_score < 0.4:
        print(" [TM] Match confidence too low, skipping")
        return {'success': False, 'code': None, 'confidence': 0.0, 'reason': f"Low match confidence: {match_score:.3f}"}

    logo_cx, logo_cy = match_res['match_center']
    page_h, page_w = page_img.shape[:2]
    print(f" [TM] Logo detected at center ({logo_cx}, {logo_cy}) in image {page_w}x{page_h}")

    # The ROI covers the logo plus the area BELOW it (the CMA code is
    # typically printed under the logo), clamped to the page bounds.
    logo_h, logo_w = CMA_LOGO_TEMPLATE.shape[:2]
    left = max(0, logo_cx - logo_w * 2)
    top = max(0, logo_cy - logo_h)
    right = min(page_w, logo_cx + logo_w * 3)
    bottom = min(page_h, logo_cy + logo_h * 4)  # extend downward to capture the code number

    print(f" [TM] ROI: ({left}, {top}) -> ({right}, {bottom})")
    roi_img = page_img[top:bottom, left:right]

    if output_dir:
        imwrite_safe(os.path.join(output_dir, "cma_template_match_roi.png"), roi_img)

    return extract_cma_from_roi(roi_img, ocr_engine, output_dir)
|
|
|
|
|
|
|
|
# ============ Seal Processing Functions (from v_verify_logic.py) ============
|
|
|
|
def polar_unwarp(img, center, radius, start_theta, angular_extent):
    """
    Unwrap a circular band of a seal image into a straight strip.

    The image is first placed on a white, padded canvas so that seals cut
    off at the crop edge can still be sampled at their full radius. Row 0 of
    the strip lies 20% of the radius outside the circle; the following rows
    move inward until 85% of the radius inside it. Columns sweep the angle
    from ``start_theta`` across ``angular_extent`` (radians). Sampling is
    nearest-pixel by truncation, and anything outside the canvas is white.

    Args:
        img: Seal crop (numpy array). 2-D grayscale input is replicated to
            three channels and extra channels beyond the third are dropped.
        center: (x, y) of the seal centre in ``img`` pixel coordinates.
        radius: Seal radius in pixels.
        start_theta: Start angle in radians.
        angular_extent: Swept angle in radians; must be positive.

    Returns:
        uint8 array of shape (strip_h, strip_w, 3), or None when the extent
        or the resulting strip size is not positive.
    """
    if angular_extent <= 0:
        return None

    strip_w = int(angular_extent * radius)

    # Sampling band: 85% of the radius toward the centre, 20% beyond the rim
    inward_range = int(radius * 0.85)
    outward_range = int(radius * 0.2)
    strip_h = inward_range + outward_range

    if strip_w <= 0 or strip_h <= 0:
        return None

    # The canvas is 3-channel; normalise inputs that would otherwise fail to
    # broadcast into it (grayscale) or carry an extra channel (e.g. alpha).
    if img.ndim == 2:
        img = np.repeat(img[:, :, np.newaxis], 3, axis=2)
    elif img.shape[2] > 3:
        img = img[:, :, :3]

    ch, cw = img.shape[:2]

    # Farthest sampled distance from the centre; pad every side so that this
    # reach stays on the canvas.
    max_distance = radius + outward_range
    pad_top = max(0, max_distance - center[1])
    pad_bottom = max(0, max_distance - (ch - center[1]))
    pad_left = max(0, max_distance - center[0])
    pad_right = max(0, max_distance - (cw - center[0]))

    # White canvas with the original image placed at its padded offset
    padded_h = ch + pad_top + pad_bottom
    padded_w = cw + pad_left + pad_right
    padded_canvas = np.full((padded_h, padded_w, 3), 255, dtype=np.uint8)
    padded_canvas[pad_top:pad_top + ch, pad_left:pad_left + cw] = img

    # Centre expressed in canvas coordinates
    center_x = center[0] + pad_left
    center_y = center[1] + pad_top

    # One radius per strip row (outermost first), one angle per strip column
    radii = max_distance - np.arange(strip_h, dtype=np.float64)
    thetas = start_theta + angular_extent * (np.arange(strip_w, dtype=np.float64) / strip_w)

    src_x = center_x + radii[:, np.newaxis] * np.cos(thetas)[np.newaxis, :]
    src_y = center_y + radii[:, np.newaxis] * np.sin(thetas)[np.newaxis, :]

    # astype truncates toward zero, matching int() on the scalar values
    sample_x = src_x.astype(np.int64)
    sample_y = src_y.astype(np.int64)
    inside = (
        (sample_x >= 0) & (sample_x < padded_w) &
        (sample_y >= 0) & (sample_y < padded_h)
    )

    # Start white so that any sample falling off the canvas stays white
    strip = np.full((strip_h, strip_w, 3), 255, dtype=np.uint8)
    strip[inside] = padded_canvas[sample_y[inside], sample_x[inside]]

    return strip
|
|
|
|
|
|
def calculate_precise_arc(polygons, center):
    """
    Estimate the arc (start angle and angular extent) covered by seal text.

    Steps:
      1. For each polygon, compute the angle of every vertex around
         ``center``, cut the circle at the polygon's widest angular gap and
         split the unwrapped angles into clusters wherever two neighbours
         are more than 15 degrees apart.
      2. Sort all clusters by start angle and merge neighbours whose gap is
         below 45 degrees.
      3. Score each merged arc by its extent times a Gaussian weight that
         peaks when the arc midpoint is at angle -pi/2 (upward in image
         coordinates, where y grows downward), and keep the best one.
      4. Clamp the extent to 350 degrees.

    Args:
        polygons: Iterable of polygons, each an iterable of (x, y) points.
        center: (x, y) reference centre.

    Returns:
        (start_theta, extent) in radians; (0.0, 0.0) if no angles were found.
    """
    two_pi = 2 * math.pi
    gap_thresh = math.radians(15)
    merge_thresh = math.radians(45)

    clusters = []
    for poly in polygons:
        angles = sorted(
            math.atan2(pt[1] - center[1], pt[0] - center[0]) for pt in poly
        )
        if not angles:
            continue

        # Widest gap between consecutive angles, including the wrap-around
        # gap from the last angle back to the first.
        last = len(angles) - 1
        widest_gap, widest_at = 0, -1
        for idx, angle in enumerate(angles):
            if idx == last:
                gap = angles[0] + two_pi - angle
            else:
                gap = angles[idx + 1] - angle
            if gap > widest_gap:
                widest_gap, widest_at = gap, idx

        # Rotate the sequence so it starts right after the widest gap; the
        # angles moved to the back are shifted by one full turn to stay
        # monotonically increasing.
        if widest_at == last:
            unwrapped = angles
        else:
            unwrapped = angles[widest_at + 1:] + [a + two_pi for a in angles[:widest_at + 1]]
        if not unwrapped:
            continue

        # Split into runs wherever neighbours are further apart than gap_thresh
        run_start = run_end = unwrapped[0]
        for previous, current in zip(unwrapped, unwrapped[1:]):
            if current - previous > gap_thresh:
                clusters.append({'start': run_start, 'end': run_end})
                run_start = current
            run_end = current
        clusters.append({'start': run_start, 'end': run_end})

    if not clusters:
        return 0.0, 0.0

    # Merge neighbouring clusters that are closer than merge_thresh
    clusters.sort(key=lambda c: c['start'])
    merged = []
    active = clusters[0]
    for following in clusters[1:]:
        if following['start'] - active['end'] < merge_thresh:
            active['end'] = max(active['end'], following['end'])
        else:
            merged.append(active)
            active = following
    merged.append(active)

    def _scored(arc):
        """Attach extent and position-weighted score to a merged arc."""
        st, en = arc['start'], arc['end']
        ex = en - st
        mid = (st + en) / 2
        dist_to_top = abs(((mid + math.pi / 2 + math.pi) % two_pi) - math.pi)
        weight = math.exp(-0.5 * (dist_to_top / (math.pi / 2)) ** 2)
        return {'start': st, 'end': en, 'extent': ex, 'score': ex * weight}

    ranked = sorted((_scored(arc) for arc in merged), key=lambda c: c['score'], reverse=True)
    best = ranked[0]

    # Limit the extent to 350 degrees: an extent beyond a full turn overlaps
    # itself and distorts the polar unwarp.
    MAX_EXTENT_DEG = 350.0
    start_theta = best['start']
    extent = best['end'] - best['start']

    if math.degrees(extent) > MAX_EXTENT_DEG:
        logger.warning(f"Arc extent {math.degrees(extent):.2f}° exceeds {MAX_EXTENT_DEG}°, clamping to avoid distortion")
        extent = math.radians(MAX_EXTENT_DEG)

    return start_theta, extent
|
|
|
|
|
|
def fit_circle_from_text_polygons(all_polygons):
    """
    Fit a circle to all polygon vertices with linear least squares.

    Circle equation: (x - a)^2 + (y - b)^2 = r^2
    Expanded and rearranged with c = a^2 + b^2 - r^2:
        x^2 + y^2 = 2ax + 2by - c
    which is linear in (a, b, c): [2x, 2y, -1] . [a, b, c]^T = x^2 + y^2

    Args:
        all_polygons: Sequence of polygons, each an iterable of (x, y) points.

    Returns:
        ((center_x, center_y), radius, rmse) with integer-truncated centre and
        radius, where rmse is the root-mean-square residual of the linear
        system above; (None, None, None) if there are no polygons, fewer than
        5 points in total, or the fit fails.
    """
    nothing = (None, None, None)
    if len(all_polygons) == 0:
        return nothing

    # Flatten every polygon vertex into one list of [x, y] floats
    coords = [[float(pt[0]), float(pt[1])] for poly in all_polygons for pt in poly]
    if len(coords) < 5:
        return nothing

    pts = np.array(coords)
    count = len(pts)

    # Linear system: design @ [a, b, c]^T = target
    design = np.column_stack([2 * pts[:, 0], 2 * pts[:, 1], -np.ones(count)])
    target = np.sum(pts ** 2, axis=1)

    try:
        solution, residuals, _rank, _singular_values = np.linalg.lstsq(design, target, rcond=None)
        a, b, c = solution
        radius = np.sqrt(a**2 + b**2 - c)

        if len(residuals) > 0:
            rmse = np.sqrt(residuals[0] / count)
        else:
            # lstsq returned no residual sum; compute the error directly
            rmse = np.sqrt(np.mean((design @ solution - target) ** 2))

        return (int(a), int(b)), int(radius), rmse
    except Exception as e:
        logger.error(f"Circle fitting failed: {e}")
        return None, None, None
|
|
|
|
|
|
def detect_seal_center_dual_method(seal_crop, all_polygons):
    """
    Choose between a fitted circle and the plain crop centre for a seal.

    A circle is fitted to the text polygons. The fit is accepted only when
    its RMSE is below 3000, its centre is less than 20% of the shorter crop
    side away from the crop centre, and at least 3 polygons were supplied.
    Otherwise the geometric crop centre is used, with a radius of half the
    shorter side minus 10 pixels.

    Args:
        seal_crop: Cropped seal image (numpy array).
        all_polygons: Detected text polygons inside the crop.

    Returns:
        (center, radius, method) where method is "circle_fitting" or
        "crop_center".
    """
    crop_h, crop_w = seal_crop.shape[:2]

    # Default: geometric centre of the crop
    fallback_center = [crop_w // 2, crop_h // 2]
    fallback_radius = min(crop_w, crop_h) // 2 - 10

    # Alternative: least-squares circle through the polygon vertices
    fitted_center, fitted_radius, rmse = fit_circle_from_text_polygons(all_polygons)
    if fitted_center is None:
        logger.info(" Circle fitting failed, using crop center")
        return fallback_center, fallback_radius, "crop_center"

    # Distance between both centre candidates, relative to the shorter side
    delta_x = fitted_center[0] - fallback_center[0]
    delta_y = fitted_center[1] - fallback_center[1]
    offset = math.sqrt(delta_x**2 + delta_y**2)
    offset_ratio = offset / min(crop_w, crop_h)

    # Acceptance limits for the fit
    rmse_threshold = 3000
    offset_threshold = 0.2  # 20% of crop size
    min_polygons = 3
    polygon_count = len(all_polygons)

    if rmse < rmse_threshold and offset_ratio < offset_threshold and polygon_count >= min_polygons:
        logger.info(f" Using circle fitting: RMSE={rmse:.2f}, offset_ratio={offset_ratio:.2f}")
        return fitted_center, fitted_radius, "circle_fitting"

    # Collect every reason the fit was rejected, for the log line
    checks = (
        (rmse >= rmse_threshold, f"RMSE too high ({rmse:.2f} >= {rmse_threshold})"),
        (offset_ratio >= offset_threshold, f"offset too large ({offset_ratio:.2f} >= {offset_threshold})"),
        (polygon_count < min_polygons, f"not enough polygons ({polygon_count} < {min_polygons})"),
    )
    reasons = [message for failed, message in checks if failed]
    logger.info(f" Circle fitting unreliable ({', '.join(reasons)}), using crop center")
    return fallback_center, fallback_radius, "crop_center"
|
|
|
|
|
|
# Layout models already created in this process, keyed by model name.
_LAYOUT_MODEL_CACHE = {}


def _get_layout_model(model_name="PP-DocLayout-L"):
    """Create the PaddleX layout model on first use and reuse it afterwards."""
    model = _LAYOUT_MODEL_CACHE.get(model_name)
    if model is None:
        model = px.create_model(model_name)
        # Stored only after successful creation, so a failed attempt is retried
        _LAYOUT_MODEL_CACHE[model_name] = model
    return model


def run_layout_detection(image_path):
    """
    Run PaddleX PP-DocLayout-L layout analysis on an image file.

    The model is created once per process and reused, instead of being
    rebuilt for every page.

    Args:
        image_path: Path of the image to analyse.

    Returns:
        list of dicts with 'label', 'score' and 'box' (the raw 'coordinate'
        value of each detected box). Empty when PaddleX is unavailable or
        detection fails (the failure is logged).
    """
    if not PADDLEX_AVAILABLE:
        logger.warning("PaddleX not available, skipping layout detection")
        return []

    try:
        model = _get_layout_model()
        output = model.predict(image_path, batch_size=1)
        all_regions = []
        for res in output:
            for box in res.get('boxes', []):
                all_regions.append({
                    # Fall back to the 'label' entry when 'label_name' is absent
                    'label': box.get('label_name', box.get('label', 'unknown')),
                    'score': box.get('score', 0.0),
                    'box': box.get('coordinate')
                })
        return all_regions
    except Exception as e:
        logger.error(f"Layout detection failed: {e}")
        return []
|
|
|
|
|
|
def run_ocr_recognition(image_path, rec_model):
    """
    Recognise the text of an unwarped seal image with a recognition model.

    Args:
        image_path: Path of the image to recognise.
        rec_model: Object whose ``predict(input=..., batch_size=1)`` returns a
            sequence of dict-like results with 'rec_text' and 'rec_score'.

    Returns:
        dict with 'text' (stripped), 'score' and 'success' (True when the text
        is non-empty). Empty output or any exception yields an empty-text
        failure dict; exceptions are logged.
    """
    try:
        output = rec_model.predict(input=image_path, batch_size=1)
        if not (output and len(output) > 0):
            return {'text': '', 'score': 0.0, 'success': False}

        # Only the first result is used
        first = output[0]
        text = first.get('rec_text', '').strip()
        score = first.get('rec_score', 0.0)
        return {'text': text, 'score': score, 'success': len(text) > 0}
    except Exception as e:
        logger.error(f"OCR recognition failed: {e}")
        return {'text': '', 'score': 0.0, 'success': False}
|
|
|
|
|
|
def run_ocr_recognition_vl(image_path, vl_pipeline):
    """
    Run OCR recognition using PaddleOCRVL on a seal image.

    Can be used on both unwarp images and crop images (backup mode). The
    first prediction result is saved as JSON into a temporary directory,
    read back, and the content of its first block labelled 'seal' is
    returned. The temporary directory is removed on every exit path.

    Args:
        image_path: Path to seal image (unwarp or crop)
        vl_pipeline: Initialized PaddleOCRVL pipeline

    Returns:
        Dict with 'text', 'score', 'success' keys. 'score' is fixed at 1.0
        when a seal block is found, because no confidence value is read from
        the result. Any failure yields an empty-text dict with success=False.
    """
    import shutil

    # Temp output directory for VL results (relative to the working directory)
    temp_output_dir = Path("temp_paddleocr_vl")

    try:
        temp_output_dir.mkdir(exist_ok=True)

        # Run prediction
        output = vl_pipeline.predict(image_path, batch_size=1)
        if not (output and len(output) > 0):
            return {'text': '', 'score': 0.0, 'success': False}

        # Save JSON to extract text; the file read back is named after the
        # image stem.
        res = output[0]
        res.save_to_json(save_path=str(temp_output_dir))
        json_file = temp_output_dir / f"{Path(image_path).stem}_res.json"
        if not json_file.exists():
            return {'text': '', 'score': 0.0, 'success': False}

        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Find seal block and extract content
        for block in data.get('parsing_res_list', []):
            if block.get('block_label') == 'seal':
                text = block.get('block_content', '').strip()
                return {
                    'text': text,
                    'score': 1.0,  # no confidence score is read from the VL result
                    'success': len(text) > 0
                }

        return {'text': '', 'score': 0.0, 'success': False}

    except Exception as e:
        logger.error(f"PaddleOCRVL recognition failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return {'text': '', 'score': 0.0, 'success': False}

    finally:
        # Single cleanup point: previously the directory was left behind when
        # prediction returned nothing or an exception was raised.
        shutil.rmtree(temp_output_dir, ignore_errors=True)
|
|
|
|
|
|
def extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", vl_pipeline=None):
|
|
"""
|
|
Extract seals and recognize institution names from page image.
|
|
|
|
Args:
|
|
page_img: Input page image
|
|
output_dir: Directory to save intermediate results
|
|
ocr_model: OCR model to use ("ppocr_v5" or "paddleocr_vl")
|
|
vl_pipeline: PaddleOCRVL pipeline (required if ocr_model="paddleocr_vl")
|
|
|
|
Returns:
|
|
Dict with:
|
|
- 'seals': list of seal results
|
|
- 'institutions': list of recognized institution names
|
|
- 'processing_time': time taken
|
|
"""
|
|
start_time = time.time()
|
|
result = {
|
|
'seals': [],
|
|
'institutions': [],
|
|
'processing_time': 0.0
|
|
}
|
|
|
|
# Validate input image
|
|
if page_img is None:
|
|
logger.error("Input page_img is None")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
if not isinstance(page_img, np.ndarray):
|
|
logger.error(f"Input page_img is not numpy array, type: {type(page_img)}")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
if page_img.size == 0:
|
|
logger.error("Input page_img is empty")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
logger.info(f"Input image shape: {page_img.shape}, dtype: {page_img.dtype}")
|
|
|
|
# Create output directory if it doesn't exist
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# Save page image
|
|
doc_path = os.path.join(output_dir, "doc_page.png")
|
|
try:
|
|
success = imwrite_safe(doc_path, page_img)
|
|
if not success:
|
|
logger.error(f"imwrite_safe returned False for {doc_path}")
|
|
# Try alternative save method using PIL
|
|
try:
|
|
from PIL import Image
|
|
img_rgb = cv2.cvtColor(page_img, cv2.COLOR_BGR2RGB)
|
|
pil_img = Image.fromarray(img_rgb)
|
|
pil_img.save(doc_path)
|
|
logger.info(f"Saved using PIL as fallback: {doc_path}")
|
|
|
|
# Verify PIL save worked
|
|
if not os.path.exists(doc_path):
|
|
logger.error(f"PIL save also failed, file not found: {doc_path}")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
except Exception as pil_e:
|
|
logger.error(f"PIL fallback also failed: {pil_e}")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
except Exception as e:
|
|
logger.error(f"Failed to save page image: {e}")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
# Verify file exists before proceeding
|
|
if not os.path.exists(doc_path):
|
|
logger.error(f"Page image file not found after save: {doc_path}")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
# Run layout detection
|
|
logger.info("Running layout detection...")
|
|
all_regions = run_layout_detection(doc_path)
|
|
|
|
# Extract seal boxes
|
|
seal_boxes = []
|
|
page_viz = page_img.copy()
|
|
for reg in all_regions:
|
|
box = reg.get('box')
|
|
label = reg.get('label')
|
|
score = reg.get('score', 0.0)
|
|
is_seal = (label == 'seal')
|
|
|
|
if score > 0.2:
|
|
x1, y1, x2, y2 = [int(v) for v in box]
|
|
color = (0, 0, 255) if is_seal else (0, 255, 0)
|
|
cv2.rectangle(page_viz, (x1, y1), (x2, y2), color, 2)
|
|
|
|
if is_seal:
|
|
seal_boxes.append(box)
|
|
|
|
imwrite_safe(os.path.join(output_dir, "doc_layout_viz.png"), page_viz)
|
|
|
|
if not seal_boxes:
|
|
logger.warning("No seals detected")
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
# Process each seal
|
|
logger.info(f"Processing {len(seal_boxes)} detected seals...")
|
|
det_model = SealTextDetection(model_name="PP-OCRv4_server_seal_det")
|
|
|
|
# Initialize OCR model based on selection
|
|
if ocr_model == "paddleocr_vl":
|
|
if not PADDLEOCRVL_AVAILABLE:
|
|
logger.error("PaddleOCRVL requested but not available. Falling back to PP-OCRv5.")
|
|
ocr_model = "ppocr_v5"
|
|
rec_model = TextRecognition(model_name="PP-OCRv5_server_rec")
|
|
elif vl_pipeline is None:
|
|
logger.error("PaddleOCRVL requested but vl_pipeline is None. Falling back to PP-OCRv5.")
|
|
ocr_model = "ppocr_v5"
|
|
rec_model = TextRecognition(model_name="PP-OCRv5_server_rec")
|
|
else:
|
|
logger.info("Using PaddleOCRVL for seal text recognition")
|
|
rec_model = None # Not used for PaddleOCRVL
|
|
else:
|
|
logger.info("Using PP-OCRv5_server_rec for seal text recognition")
|
|
rec_model = TextRecognition(model_name="PP-OCRv5_server_rec")
|
|
|
|
for i, box in enumerate(seal_boxes):
|
|
x1, y1, x2, y2 = [int(v) for v in box]
|
|
pad = 40
|
|
y1_p, y2_p = max(0, y1-pad), min(page_img.shape[0], y2+pad)
|
|
x1_p, x2_p = max(0, x1-pad), min(page_img.shape[1], x2+pad)
|
|
seal_crop = page_img[y1_p:y2_p, x1_p:x2_p]
|
|
|
|
# Validate crop
|
|
if seal_crop.size == 0 or seal_crop.shape[0] == 0 or seal_crop.shape[1] == 0:
|
|
logger.warning(f"Invalid seal crop dimensions: {seal_crop.shape}, skipping seal {i}")
|
|
continue
|
|
|
|
crop_path = os.path.join(output_dir, f"seal_crop_{i}.png")
|
|
success = imwrite_safe(crop_path, seal_crop)
|
|
if not success:
|
|
# Try PIL fallback
|
|
try:
|
|
from PIL import Image
|
|
crop_rgb = cv2.cvtColor(seal_crop, cv2.COLOR_BGR2RGB)
|
|
pil_img = Image.fromarray(crop_rgb)
|
|
pil_img.save(crop_path)
|
|
logger.info(f"Saved seal crop using PIL fallback: {crop_path}")
|
|
except Exception as pil_e:
|
|
logger.error(f"Failed to save seal crop to {crop_path}: {pil_e}, skipping seal {i}")
|
|
continue
|
|
|
|
# Verify file exists
|
|
if not os.path.exists(crop_path):
|
|
logger.error(f"Seal crop file not found after save: {crop_path}, skipping seal {i}")
|
|
continue
|
|
|
|
# Detect text polygons
|
|
output = det_model.predict(crop_path, batch_size=1)
|
|
all_polygons = []
|
|
for res in output:
|
|
polys = res.get('dt_polys') if isinstance(res, dict) else None
|
|
if polys:
|
|
all_polygons.extend(polys)
|
|
|
|
ch, cw = seal_crop.shape[:2]
|
|
|
|
# ============ DUAL STRATEGY: Choose best center detection method ============
|
|
logger.info(f" Seal #{i} Geometry:")
|
|
logger.info(f" - Crop size: {cw}x{ch}")
|
|
logger.info(f" - Text polygons detected: {len(all_polygons)}")
|
|
|
|
center, radius, method_used = detect_seal_center_dual_method(seal_crop, all_polygons)
|
|
logger.info(f" - Method used: {method_used}")
|
|
logger.info(f" - Center: ({center[0]}, {center[1]})")
|
|
logger.info(f" - Radius: {radius}")
|
|
|
|
# ============ INSUFFICIENT POLYGONS CHECK ============
|
|
# If too few text polygons detected, polar unwarping will likely fail
|
|
# Skip directly to PaddleOCRVL backup in this case
|
|
MIN_POLYGONS_FOR_UNWARP = 3
|
|
if len(all_polygons) < MIN_POLYGONS_FOR_UNWARP:
|
|
logger.warning(f" Seal #{i}: Only {len(all_polygons)} text polygons detected (< {MIN_POLYGONS_FOR_UNWARP})")
|
|
logger.warning(f" Seal #{i}: Skipping polar unwarping (insufficient polygon data)")
|
|
logger.info(f" Seal #{i}: Using PaddleOCRVL backup instead")
|
|
|
|
# Save crop image
|
|
imwrite_safe(crop_path, seal_crop)
|
|
|
|
# Use PaddleOCRVL directly on crop (no unwarp)
|
|
if vl_pipeline is not None and PADDLEOCRVL_AVAILABLE:
|
|
ocr_result = run_ocr_recognition_vl(crop_path, vl_pipeline)
|
|
logger.info(f" Seal #{i} PaddleOCRVL Result (direct crop):")
|
|
logger.info(f" - Text: '{ocr_result['text']}'")
|
|
logger.info(f" - Score: {ocr_result['score']:.4f}")
|
|
logger.info(f" - Success: {ocr_result['success']}")
|
|
logger.info(f" - ** Used PaddleOCRVL (insufficient polygons for unwarping) **")
|
|
|
|
# Create debug info without unwarp
|
|
seal_data = {
|
|
'index': i,
|
|
'box': box,
|
|
'crop_path': Path(crop_path).name,
|
|
'unwarp_path': None, # No unwarp performed
|
|
'marked_path': None, # No marked image
|
|
'polar_viz_path': None, # No polar visualization
|
|
'text': ocr_result['text'],
|
|
'confidence': float(ocr_result['score']),
|
|
'success': bool(ocr_result['success']),
|
|
'method_used': f'{method_used}_skip_unwarp',
|
|
'used_fallback': True,
|
|
'debug_info': {
|
|
'center': center,
|
|
'radius': radius,
|
|
'start_theta_deg': None,
|
|
'extent_deg': None,
|
|
'num_polygons': len(all_polygons),
|
|
'crop_size': (cw, ch),
|
|
'unwarp_size': None,
|
|
'skip_reason': f'Insufficient polygons ({len(all_polygons)} < {MIN_POLYGONS_FOR_UNWARP})'
|
|
}
|
|
}
|
|
result['seals'].append(seal_data)
|
|
|
|
if ocr_result['success']:
|
|
# Clean the institution name before adding
|
|
cleaned_name = clean_institution_name(ocr_result['text'])
|
|
result['institutions'].append(cleaned_name)
|
|
logger.info(f" ✓ Seal #{i} SUCCESS: {cleaned_name[:50]}... (confidence: {ocr_result['score']:.4f})")
|
|
else:
|
|
logger.warning(f" ✗ Seal #{i} FAILED: Could not extract institution name")
|
|
|
|
continue # Skip to next seal
|
|
else:
|
|
logger.error(f" Seal #{i}: PaddleOCRVL not available, cannot extract text")
|
|
seal_data = {
|
|
'index': i,
|
|
'box': box,
|
|
'crop_path': Path(crop_path).name,
|
|
'unwarp_path': None,
|
|
'marked_path': None,
|
|
'polar_viz_path': None,
|
|
'text': '',
|
|
'confidence': 0.0,
|
|
'success': False,
|
|
'method_used': f'{method_used}_skip_unwarp',
|
|
'used_fallback': True,
|
|
'debug_info': {
|
|
'center': center,
|
|
'radius': radius,
|
|
'start_theta_deg': None,
|
|
'extent_deg': None,
|
|
'num_polygons': len(all_polygons),
|
|
'crop_size': (cw, ch),
|
|
'unwarp_size': None,
|
|
'skip_reason': f'Insufficient polygons and no PaddleOCRVL backup'
|
|
}
|
|
}
|
|
result['seals'].append(seal_data)
|
|
continue
|
|
|
|
# Calculate arc and unwarp
|
|
start_theta, extent = calculate_precise_arc(all_polygons, center)
|
|
logger.info(f" Seal #{i} Arc Parameters:")
|
|
logger.info(f" - Start theta: {math.degrees(start_theta):.2f}°")
|
|
logger.info(f" - Extent: {math.degrees(extent):.2f}° ({math.degrees(extent)*radius:.1f} pixels width)")
|
|
|
|
marked = seal_crop.copy()
|
|
|
|
# Draw all text polygons in green
|
|
for p in all_polygons:
|
|
cv2.polylines(marked, [np.array(p, dtype=np.int32)], True, (0, 255, 0), 2)
|
|
|
|
# Draw center point (yellow cross)
|
|
center_x, center_y = int(center[0]), int(center[1])
|
|
cv2.drawMarker(marked, (center_x, center_y), (0, 255, 255),
|
|
markerType=cv2.MARKER_CROSS, markerSize=20, thickness=2)
|
|
cv2.circle(marked, (center_x, center_y), 5, (0, 255, 255), -1)
|
|
|
|
# Draw estimated radius circle (cyan)
|
|
cv2.circle(marked, (center_x, center_y), radius, (255, 255, 0), 2)
|
|
|
|
# Draw polar sampling visualization
|
|
polar_viz = seal_crop.copy()
|
|
cv2.drawMarker(polar_viz, (center_x, center_y), (0, 255, 255),
|
|
markerType=cv2.MARKER_CROSS, markerSize=20, thickness=2)
|
|
cv2.circle(polar_viz, (center_x, center_y), radius, (255, 255, 0), 2)
|
|
|
|
unwarp_path = os.path.join(output_dir, f"seal_unwarp_{i}.png")
|
|
unwarp = None
|
|
used_fallback = False
|
|
|
|
if extent > 0:
|
|
logger.info(f" Seal #{i}: Performing polar unwarping with detected text polygons...")
|
|
unwarp = polar_unwarp(seal_crop, center, radius, start_theta, extent)
|
|
if unwarp is not None:
|
|
imwrite_safe(unwarp_path, unwarp)
|
|
logger.info(f" - Unwarp size: {unwarp.shape[1]}x{unwarp.shape[0]}")
|
|
|
|
def draw_line(m, theta, color):
|
|
x = center[0] + radius * math.cos(theta)
|
|
y = center[1] + radius * math.sin(theta)
|
|
cv2.line(m, (int(center[0]), int(center[1])), (int(x), int(y)), color, 2)
|
|
|
|
# Draw start angle line (blue)
|
|
draw_line(marked, start_theta, (255, 0, 0))
|
|
# Draw end angle line (red)
|
|
draw_line(marked, start_theta + extent, (0, 0, 255))
|
|
|
|
# Draw sampling points on polar_viz (show where polar samples come from)
|
|
num_sample_points = min(50, int(extent * radius)) # Show up to 50 sample points
|
|
for r_idx in range(5): # 5 different radii
|
|
r = radius - r_idx * (radius * 0.6 / 5)
|
|
for theta_idx in range(num_sample_points):
|
|
theta = start_theta + extent * (theta_idx / num_sample_points)
|
|
src_x = center[0] + r * math.cos(theta)
|
|
src_y = center[1] + r * math.sin(theta)
|
|
if 0 <= src_x < cw and 0 <= src_y < ch:
|
|
cv2.circle(polar_viz, (int(src_x), int(src_y)), 1, (255, 0, 255), -1)
|
|
|
|
# Save polar visualization
|
|
polar_viz_path = os.path.join(output_dir, f"seal_polar_viz_{i}.png")
|
|
imwrite_safe(polar_viz_path, polar_viz)
|
|
logger.info(f" - Polar visualization saved: seal_polar_viz_{i}.png")
|
|
else:
|
|
logger.warning(f" Seal #{i}: Polar unwarp returned None")
|
|
|
|
# ============ FALLBACK: Use fixed angle range when no text detected ============
|
|
if unwarp is None and extent <= 0 and len(all_polygons) == 0:
|
|
logger.warning(f" Seal #{i}: No text polygons detected, using fallback angle range (7:30 to 4:30 clockwise)")
|
|
used_fallback = True
|
|
|
|
# 7:30 direction (left-bottom) to 4:30 direction (right-bottom) clockwise
|
|
# In standard math angle (0 = 3 o'clock, CCW):
|
|
# 7:30 = 225 degrees = 3.927 rad
|
|
# 4:30 = 135 degrees = 2.356 rad
|
|
# Clockwise from 7:30 to 4:30 covers 270 degrees
|
|
# We start at 4:30 (135 degrees) and go counter-clockwise 270 degrees
|
|
fallback_start_theta = math.radians(135) # 4:30 position
|
|
fallback_extent = math.radians(270) # 270 degree coverage
|
|
|
|
logger.info(f" Seal #{i}: Fallback - Start: 135.00° (4:30), Extent: 270.00°")
|
|
|
|
unwarp = polar_unwarp(seal_crop, center, radius, fallback_start_theta, fallback_extent)
|
|
if unwarp is not None:
|
|
imwrite_safe(unwarp_path, unwarp)
|
|
logger.info(f" - Fallback unwarp size: {unwarp.shape[1]}x{unwarp.shape[0]}")
|
|
|
|
# Update start_theta and extent for visualization
|
|
start_theta = fallback_start_theta
|
|
extent = fallback_extent
|
|
|
|
def draw_line(m, theta, color):
|
|
x = center[0] + radius * math.cos(theta)
|
|
y = center[1] + radius * math.sin(theta)
|
|
cv2.line(m, (int(center[0]), int(center[1])), (int(x), int(y)), color, 2)
|
|
|
|
# Draw start angle line (blue) - 4:30 position
|
|
draw_line(marked, start_theta, (255, 0, 0))
|
|
# Draw end angle line (red) - 7:30 position
|
|
draw_line(marked, start_theta + extent, (0, 0, 255))
|
|
|
|
# Draw sampling points
|
|
num_sample_points = 50
|
|
for r_idx in range(5):
|
|
r = radius - r_idx * (radius * 0.6 / 5)
|
|
for theta_idx in range(num_sample_points):
|
|
theta = start_theta + extent * (theta_idx / num_sample_points)
|
|
src_x = center[0] + r * math.cos(theta)
|
|
src_y = center[1] + r * math.sin(theta)
|
|
if 0 <= src_x < cw and 0 <= src_y < ch:
|
|
cv2.circle(polar_viz, (int(src_x), int(src_y)), 1, (255, 0, 255), -1)
|
|
|
|
polar_viz_path = os.path.join(output_dir, f"seal_polar_viz_{i}.png")
|
|
imwrite_safe(polar_viz_path, polar_viz)
|
|
logger.info(f" - Fallback polar visualization saved: seal_polar_viz_{i}.png")
|
|
else:
|
|
logger.warning(f" Seal #{i}: Fallback polar unwarp also returned None")
|
|
|
|
marked_path = os.path.join(output_dir, f"seal_marked_{i}.png")
|
|
imwrite_safe(marked_path, marked)
|
|
|
|
# OCR recognition with double verification
|
|
ocr_result = {'text': '', 'score': 0.0, 'success': False}
|
|
ocr_method_used = method_used
|
|
|
|
if unwarp is not None:
|
|
# Standard path: Recognize unwarp image
|
|
method_str = "FALLBACK" if used_fallback else "Standard"
|
|
logger.info(f" Seal #{i}: Running OCR ({method_str}, model={ocr_model}) on unwarp image...")
|
|
|
|
if ocr_model == "paddleocr_vl":
|
|
ocr_result = run_ocr_recognition_vl(unwarp_path, vl_pipeline)
|
|
else:
|
|
ocr_result = run_ocr_recognition(unwarp_path, rec_model)
|
|
|
|
ocr_method_used = f"{method_used}_unwarp"
|
|
logger.info(f" Seal #{i} OCR Result (unwarp):")
|
|
logger.info(f" - Text: '{ocr_result['text']}'")
|
|
logger.info(f" - Score: {ocr_result['score']:.4f}")
|
|
logger.info(f" - Success: {ocr_result['success']}")
|
|
logger.info(f" - Text length: {len(ocr_result['text'])} chars")
|
|
if used_fallback:
|
|
logger.info(f" - ** Used fallback angle range (7:30 to 4:30) **")
|
|
|
|
# ============ DOUBLE VERIFICATION: Try PaddleOCRVL on crop if unwarp OCR fails ============
|
|
# If unwarp OCR failed (empty text or success=False), try PaddleOCRVL backup on crop
|
|
if (not ocr_result['success'] or len(ocr_result['text'].strip()) == 0) and vl_pipeline is not None and PADDLEOCRVL_AVAILABLE:
|
|
logger.warning(f" Seal #{i}: Unwarp OCR failed (empty result), trying PaddleOCRVL backup on crop image")
|
|
seal_crop_path = os.path.join(output_dir, f"seal_crop_{i}.png")
|
|
backup_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline)
|
|
|
|
logger.info(f" Seal #{i} PaddleOCRVL Backup Result (crop):")
|
|
logger.info(f" - Text: '{backup_result['text']}'")
|
|
logger.info(f" - Score: {backup_result['score']:.4f}")
|
|
logger.info(f" - Success: {backup_result['success']}")
|
|
logger.info(f" - Text length: {len(backup_result['text'])} chars")
|
|
|
|
# Use backup result if it's better (non-empty text)
|
|
if backup_result['success'] and len(backup_result['text'].strip()) > 0:
|
|
logger.info(f" Seal #{i}: ** Using PaddleOCRVL backup result (unwarp failed) **")
|
|
ocr_result = backup_result
|
|
ocr_method_used = f"{method_used}_crop_backup"
|
|
else:
|
|
logger.warning(f" Seal #{i}: ** Both unwarp and crop OCR failed **")
|
|
else:
|
|
# ============ BACKUP: Use PaddleOCRVL directly on seal crop ============
|
|
logger.warning(f" Seal #{i}: No unwarp image available (polar unwarp failed)")
|
|
|
|
if vl_pipeline is not None and PADDLEOCRVL_AVAILABLE:
|
|
logger.info(f" Seal #{i}: Using PaddleOCRVL backup - directly recognize seal crop image")
|
|
seal_crop_path = os.path.join(output_dir, f"seal_crop_{i}.png")
|
|
ocr_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline)
|
|
ocr_method_used = f"{method_used}_crop_backup"
|
|
logger.info(f" Seal #{i} PaddleOCRVL Backup Result:")
|
|
logger.info(f" - Text: '{ocr_result['text']}'")
|
|
logger.info(f" - Score: {ocr_result['score']:.4f}")
|
|
logger.info(f" - Success: {ocr_result['success']}")
|
|
logger.info(f" - Text length: {len(ocr_result['text'])} chars")
|
|
logger.info(f" - ** Used PaddleOCRVL backup (direct crop recognition) **")
|
|
else:
|
|
logger.warning(f" Seal #{i}: No backup available (vl_pipeline=None or PaddleOCRVL not installed), skipping OCR")
|
|
|
|
seal_data = {
|
|
'index': int(i),
|
|
'box': [float(v) for v in box],
|
|
'crop_path': f"seal_crop_{i}.png",
|
|
'unwarp_path': f"seal_unwarp_{i}.png" if unwarp is not None else None,
|
|
'marked_path': f"seal_marked_{i}.png",
|
|
'polar_viz_path': f"seal_polar_viz_{i}.png" if unwarp is not None else None,
|
|
'text': ocr_result['text'],
|
|
'confidence': float(ocr_result['score']),
|
|
'success': bool(ocr_result['success']),
|
|
'method_used': ocr_method_used, # Track actual OCR method used
|
|
'used_fallback': used_fallback, # Track if fallback was used
|
|
'debug_info': {
|
|
'center': center,
|
|
'radius': radius,
|
|
'start_theta_deg': float(math.degrees(start_theta)),
|
|
'extent_deg': float(math.degrees(extent)),
|
|
'num_polygons': len(all_polygons),
|
|
'crop_size': (cw, ch),
|
|
'unwarp_size': (unwarp.shape[1], unwarp.shape[0]) if unwarp is not None else None
|
|
}
|
|
}
|
|
result['seals'].append(seal_data)
|
|
|
|
if ocr_result['success']:
|
|
# Clean the institution name before adding
|
|
cleaned_name = clean_institution_name(ocr_result['text'])
|
|
result['institutions'].append(cleaned_name)
|
|
logger.info(f" ✓ Seal #{i} SUCCESS: {cleaned_name[:50]}... (confidence: {ocr_result['score']:.4f})")
|
|
else:
|
|
logger.warning(f" ✗ Seal #{i} FAILED: Could not extract institution name")
|
|
|
|
result['processing_time'] = time.time() - start_time
|
|
return result
|
|
|
|
|
|
# ============ Text Cleaning Functions ============
|
|
|
|
def clean_institution_name(text: str) -> str:
    """
    Strip seal boilerplate from an extracted institution name.

    Seal text usually carries a purpose label (for example "检验检测专用章")
    next to the institution name. Every occurrence of each known label is
    removed and surrounding whitespace is trimmed.

    Args:
        text: Raw extracted institution name.

    Returns:
        The cleaned institution name. Falsy input (``None`` or ``""``) is
        returned unchanged.
    """
    if not text:
        return text

    # Order matters: the most specific labels come first so that a shorter
    # label never leaves a fragment of a longer one behind.
    patterns_to_remove = (
        '检验检测专用章',
        '检验检测专用',
        '检测专用章',
        '检验专用章',
        '专用章',
        # Full-width parentheses, spelled as escapes so this variant cannot be
        # silently folded into the ASCII one below by text normalization.
        '\uff08检验检测\uff09',
        '(检验检测)',
        '【检验检测】',
        '[检验检测]',
    )

    cleaned = text
    for pattern in patterns_to_remove:
        if pattern in cleaned:
            cleaned = cleaned.replace(pattern, '')
            logger.debug(f"Removed pattern '{pattern}' from institution name")

    # Strip whitespace left behind by the removals
    cleaned = cleaned.strip()

    # Log only when cleaning actually changed something
    if cleaned != text:
        logger.info(f"Cleaned institution name: '{text}' → '{cleaned}'")

    return cleaned
|
|
|
|
|
|
# ============ Similarity and Matching Functions ============
|
|
|
|
def calculate_similarity(str1: str, str2: str) -> float:
    """
    Calculate similarity percentage using Levenshtein distance.

    The score is ``(1 - edit_distance / max(len(str1), len(str2))) * 100``,
    rounded to two decimals.

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        The similarity percentage. If either argument is empty or ``None``
        the result is 0.0 -- this includes the case where both are empty.
    """
    # A missing or empty side never counts as similar. Because of this guard
    # both strings are non-empty below, so the longer length is at least 1
    # and the division cannot fail.
    if not str1 or not str2:
        return 0.0

    max_len = max(len(str1), len(str2))
    edit_dist = levenshtein_distance(str1, str2)
    return round((1 - edit_dist / max_len) * 100, 2)
|
|
|
|
|
|
def classify_match(extracted: Optional[str], expected: str) -> Dict[str, Any]:
    """
    Label how closely an extracted value matches the expected one.

    Args:
        extracted: Value produced by extraction, or ``None`` if nothing was
            extracted.
        expected: Ground-truth value.

    Returns:
        A dict with ``match_type`` (``'exact'``, ``'partial'`` or
        ``'no_match'``), ``similarity`` (percentage) and ``edit_distance``.
    """
    # Nothing extracted: report the full length of the expected value as the
    # distance without calling the comparison helpers.
    if extracted is None:
        return dict(match_type='no_match', similarity=0.0, edit_distance=len(expected))

    score = calculate_similarity(extracted, expected)
    distance = levenshtein_distance(extracted, expected)

    if score == 100.0:
        verdict = 'exact'
    else:
        # Scores at or above the module-level threshold still count as a
        # partial hit; everything below is a miss.
        verdict = 'partial' if score >= SIMILARITY_THRESHOLD else 'no_match'

    return dict(match_type=verdict, similarity=score, edit_distance=distance)
|
|
|
|
|
|
# ============ PDF Processing Functions ============
|
|
|
|
def extract_pdf_page(pdf_path: str, page_num: int = 0) -> Optional[np.ndarray]:
    """
    Extract a page from a PDF as an image.

    The page is rendered with a 2x scaling matrix and converted to BGR for
    OpenCV.

    Args:
        pdf_path: Path of the PDF file to open.
        page_num: Zero-based index of the page to render.

    Returns:
        The rendered page as a NumPy array, or ``None`` if anything fails
        (the error is logged).
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            page = doc.load_page(page_num)
            pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
            img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)

            # Convert to BGR format for OpenCV
            if pix.n == 4:  # RGBA
                img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
            elif pix.n == 3:  # RGB
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            elif pix.n == 1:  # Grayscale
                img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
            else:
                logger.warning(f"Unexpected number of channels: {pix.n}")
                # Assume the first three channels are RGB and convert
                if pix.n >= 3:
                    img = img[:, :, :3]
                    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        finally:
            # Always release the document handle, including when rendering
            # or conversion raises; otherwise a batch run leaks one open
            # document per processed PDF.
            doc.close()

        return img
    except Exception as e:
        logger.error(f"Failed to extract page from {pdf_path}: {e}")
        return None
|
|
|
|
|
|
def process_single_pdf(pdf_name: str, expected_cma: str, expected_inst: str,
                       pdf_dir: Path, output_dir: Path, ocr_engine,
                       ocr_model="ppocr_v5", vl_pipeline=None) -> Dict[str, Any]:
    """
    Process a single PDF for CMA and institution extraction.

    The first page is rendered, the CMA code is extracted (with a template
    matching fallback when the primary result fails or has confidence below
    0.6), seals are processed for institution names, and both results are
    compared against the ground truth.

    Args:
        pdf_name: Name of PDF file
        expected_cma: Expected CMA code from ground truth. ``None``, an empty
            string or "无" means there is no expected code.
        expected_inst: Expected institution name from ground truth. ``None``,
            an empty string or "无" means there is no expected name.
        pdf_dir: Directory containing PDFs
        output_dir: Output directory for results; a sub-directory named after
            the PDF is wiped and recreated inside it.
        ocr_engine: OCR instance, passed through to the CMA extraction helpers
        ocr_model: OCR model to use ("ppocr_v5" or "paddleocr_vl")
        vl_pipeline: PaddleOCRVL pipeline (required if ocr_model="paddleocr_vl")

    Returns:
        Result dictionary with extraction and comparison data. ``status`` is
        ``'success'``, ``'file_not_found'`` or ``'extraction_failed'``.
    """
    pdf_path = pdf_dir / pdf_name
    pdf_output_dir = output_dir / pdf_name

    result = {
        'pdf_name': pdf_name,
        'expected': {
            'cma': expected_cma,
            'institution': expected_inst
        },
        'extracted': {
            'cma': None,
            'institution': None,
            'cma_confidence': 0.0,
            'cma_success': False,
            'institutions_from_seals': []
        },
        'comparison': {
            'cma': {},
            'institution': {}
        },
        'performance': {
            'total_time': 0.0,
            'cma_time': 0.0,
            'seal_time': 0.0
        },
        'seal_results': [],
        'status': 'success',
        'error': None,
        'file_size': 0
    }

    # Check file exists
    if not pdf_path.exists():
        result['status'] = 'file_not_found'
        result['error'] = f"PDF file not found: {pdf_path}"
        logger.warning(result['error'])
        return result

    result['file_size'] = pdf_path.stat().st_size

    # Clean output directory to ensure fresh processing
    if pdf_output_dir.exists():
        import shutil
        try:
            shutil.rmtree(pdf_output_dir)
            logger.info(f"Cleaned existing output directory: {pdf_output_dir}")
        except Exception as e:
            # Best effort: stale files are tolerated rather than aborting the PDF
            logger.warning(f"Failed to clean output directory: {e}")

    # Create fresh output directory
    pdf_output_dir.mkdir(parents=True, exist_ok=True)
    total_start = time.time()

    # Extract page
    logger.info(f"Extracting page 1 from {pdf_name}...")
    page_img = extract_pdf_page(str(pdf_path), page_num=0)
    if page_img is None:
        result['status'] = 'extraction_failed'
        result['error'] = "Failed to extract page from PDF"
        # Record the time spent even on failure so timing stats stay honest
        result['performance']['total_time'] = time.time() - total_start
        return result

    # Extract CMA code
    logger.info(f"Running CMA extraction on {pdf_name}...")
    print(f" + Running CMA extraction...")
    cma_start = time.time()
    cma_result = extract_cma_code_fullpage(page_img, ocr_engine, output_dir=str(pdf_output_dir))
    print(f" + Primary CMA result: success={cma_result['success']}, code={cma_result.get('code')}, conf={cma_result.get('confidence', 0):.2f}")

    # Fallback to template matching if primary extraction failed or low confidence
    if not cma_result['success'] or cma_result.get('confidence', 0) < 0.6:
        print(f" + Primary CMA extraction failed/low confidence. Trying template matching fallback...")
        logger.info(f"Primary CMA extraction low confidence ({cma_result.get('confidence', 0):.2f}). Trying template matching fallback...")
        template_res = process_cma_template_extraction(page_img, ocr_engine, output_dir=str(pdf_output_dir))
        if template_res['success']:
            print(f" + Template matching fallback SUCCESS: {template_res['code']} (conf: {template_res['confidence']:.2f})")
            logger.info(f"Template matching fallback SUCCESS: {template_res['code']} (conf: {template_res['confidence']:.2f})")
            cma_result = template_res
            cma_result['extraction_method'] = 'template_matching'
        else:
            # Keep the primary result, even though it was weak
            print(f" + Template matching fallback also failed: {template_res.get('reason', 'no candidate')}")
            logger.info(f"Template matching fallback also failed: {template_res.get('reason', 'no candidate')}")
            cma_result['extraction_method'] = 'robust_ocr'
    else:
        cma_result['extraction_method'] = 'robust_ocr'

    result['performance']['cma_time'] = time.time() - cma_start

    result['extracted']['cma'] = cma_result['code']
    result['extracted']['cma_confidence'] = cma_result['confidence']
    result['extracted']['cma_success'] = cma_result['success']
    result['extracted']['cma_method'] = cma_result['extraction_method']

    # Compare CMA. A missing ground truth (None / empty) is handled like the
    # explicit "无" marker, mirroring the institution check below, instead of
    # being pushed through the string comparison helpers.
    if not expected_cma or expected_cma == "无":
        result['comparison']['cma']['notes'] = "Ground truth marked as 'None'"
    else:
        comparison = classify_match(cma_result['code'], expected_cma)
        result['comparison']['cma'] = comparison

    # Extract seals and institutions
    logger.info(f"Running seal extraction on {pdf_name}...")
    seal_start = time.time()
    seal_result = extract_seals_and_institutions(page_img, str(pdf_output_dir),
                                                 ocr_model=ocr_model, vl_pipeline=vl_pipeline)
    result['performance']['seal_time'] = time.time() - seal_start

    result['seal_results'] = seal_result['seals']
    result['extracted']['institutions_from_seals'] = seal_result['institutions']

    has_expected_inst = bool(expected_inst) and expected_inst != "无"

    # Select best institution match. best_inst is initialised up front so the
    # comparison below is well defined even when no seal produced a name.
    best_inst = None
    best_similarity = 0.0

    if seal_result['institutions']:
        logger.info(f" Institution Extraction:")
        logger.info(f" - Expected: {expected_inst if expected_inst else 'N/A'}")
        logger.info(f" - Found {len(seal_result['institutions'])} institution(s) from seals")

        # Find best matching institution
        for idx, inst in enumerate(seal_result['institutions']):
            if has_expected_inst:
                sim = calculate_similarity(inst, expected_inst)
                logger.info(f" - Inst #{idx+1}: '{inst[:50]}...' → Similarity: {sim:.1f}%")
                if sim > best_similarity:
                    # Log before overwriting so the message shows the
                    # previous best rather than the new value twice.
                    logger.info(f" → New best match! ({sim:.1f}% > {best_similarity:.1f}%)")
                    best_similarity = sim
                    best_inst = inst
            elif not best_inst:
                best_inst = inst
                logger.info(f" - Inst #{idx+1}: '{inst[:50]}...' (no expected value for comparison)")

        # Fallback: if best_inst is still None (all similarities were 0), use first institution
        if best_inst is None:
            best_inst = seal_result['institutions'][0]
            logger.warning(f" - All similarities were 0%, using first institution: '{best_inst[:50]}...'")

        logger.info(f" - Selected: '{best_inst[:50]}...' (similarity: {best_similarity:.1f}%)")
        result['extracted']['institution'] = best_inst

    # Compare institution. With no extracted name, classify_match receives
    # None and records an explicit 'no_match', as the CMA comparison does.
    if has_expected_inst:
        inst_comparison = classify_match(best_inst, expected_inst)
        result['comparison']['institution'] = inst_comparison
    else:
        result['comparison']['institution']['notes'] = "No expected institution"

    result['performance']['total_time'] = time.time() - total_start

    return result
|
|
|
|
|
|
def generate_individual_report(result: Dict[str, Any], output_dir: Path):
    """
    Generate individual HTML report for a single PDF.

    Writes ``index.html`` into ``output_dir`` (created if missing). All text
    taken from the result dictionary -- PDF name, ground truth, extracted
    values and OCR text -- is HTML-escaped before being embedded, since OCR
    output and file names may contain ``<``, ``>`` or ``&``.

    Args:
        result: Result dictionary with the keys ``pdf_name``, ``expected``,
            ``extracted``, ``comparison``, ``performance`` and
            ``seal_results`` as read below.
        output_dir: Directory that receives ``index.html``. Image paths in
            the page are relative, so they resolve against this directory.
    """
    # Local import of the function only: a module-level ``import html`` would
    # be easy to shadow with a variable holding the page markup.
    from html import escape

    def _shown(value) -> str:
        """Escape a value for display, using 'N/A' when it is empty."""
        return escape(str(value)) if value else 'N/A'

    pdf_name = escape(str(result['pdf_name']))
    expected_cma = escape(str(result['expected']['cma']))
    expected_inst = escape(str(result['expected']['institution']))
    extracted_cma = _shown(result['extracted']['cma'])
    extracted_inst = _shown(result['extracted']['institution'])

    # A comparison dict may be empty or hold only 'notes'; fall back to a miss.
    cma_match = result['comparison'].get('cma', {}).get('match_type', 'no_match')
    cma_sim = result['comparison'].get('cma', {}).get('similarity', 0)
    inst_match = result['comparison'].get('institution', {}).get('match_type', 'no_match')
    inst_sim = result['comparison'].get('institution', {}).get('similarity', 0)

    total_time = result['performance']['total_time']

    # Colors: green for exact, orange for partial, red otherwise
    match_colors = {'exact': '#4caf50', 'partial': '#ff9800'}
    cma_color = match_colors.get(cma_match, '#f44336')
    inst_color = match_colors.get(inst_match, '#f44336')

    # Build seals HTML
    seals_html = ""
    if result['seal_results']:
        seal_parts = ["<h2>Detected Seals and Institution Names</h2>"]
        for seal in result['seal_results']:
            status = "[OK]" if seal['success'] else "[FAIL]"
            text = escape(str(seal['text'])) if seal['text'] else "No text recognized"
            marked_src = escape(str(seal['marked_path']))
            if seal.get('unwarp_path'):
                unwarp_src = escape(str(seal['unwarp_path']))
                unwarp_html = f'<img src="{unwarp_src}" style="max-height: 200px; border: 1px solid #ddd;">'
            else:
                unwarp_html = 'N/A'
            seal_parts.append(f"""
        <div style="background: white; padding: 15px; margin-bottom: 20px; border-radius: 6px; border-left: 4px solid #2196F3;">
            <h3>Seal #{escape(str(seal['index']))}</h3>
            <p><strong>Recognized Text:</strong> {text}</p>
            <p><strong>Confidence:</strong> {seal['confidence']:.2%}</p>
            <p><strong>Status:</strong> {status}</p>
            <div style="display: flex; gap: 10px; margin-top: 10px;">
                <div>
                    <p style="margin: 0;">Marked:</p>
                    <img src="{marked_src}" style="max-height: 200px; border: 1px solid #ddd;">
                </div>
                <div>
                    <p style="margin: 0;">Unwarped:</p>
                    {unwarp_html}
                </div>
            </div>
        </div>""")
        seals_html = "".join(seal_parts)

    page = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Extraction Report - {pdf_name}</title>
    <style>
        body {{ font-family: 'Segoe UI', sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }}
        .container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 30px; border-radius: 8px; }}
        h1 {{ color: #333; border-bottom: 3px solid #4caf50; padding-bottom: 10px; }}
        .info-grid {{ display: grid; grid-template-columns: repeat(2, 1fr); gap: 20px; margin: 20px 0; }}
        .info-box {{ background: #f9f9f9; padding: 15px; border-radius: 6px; }}
        .info-box label {{ display: block; font-weight: bold; color: #666; margin-bottom: 5px; }}
        .info-box .value {{ font-size: 18px; }}
        .cma-box {{ border-left: 4px solid {cma_color}; }}
        .inst-box {{ border-left: 4px solid {inst_color}; }}
        .similarity {{ text-align: center; margin: 20px 0; }}
        .similarity .score {{ font-size: 48px; font-weight: bold; }}
    </style>
</head>
<body>
    <div class="container">
        <h1>CMA &amp; Institution Extraction Report</h1>
        <p><strong>PDF:</strong> {pdf_name}</p>
        <p><strong>Processing Time:</strong> {total_time:.2f}s</p>

        <h2>CMA Code Extraction</h2>
        <div class="info-grid">
            <div class="info-box cma-box">
                <label>Expected CMA</label>
                <div class="value">{expected_cma}</div>
            </div>
            <div class="info-box cma-box">
                <label>Extracted CMA</label>
                <div class="value">{extracted_cma}</div>
            </div>
            <div class="info-box">
                <label>Match Type</label>
                <div class="value" style="color: {cma_color};">{cma_match.upper()}</div>
            </div>
            <div class="info-box">
                <label>Similarity</label>
                <div class="value">{cma_sim:.1f}%</div>
            </div>
        </div>

        <h2>Institution Name Extraction</h2>
        <div class="info-grid">
            <div class="info-box inst-box">
                <label>Expected Institution</label>
                <div class="value">{expected_inst}</div>
            </div>
            <div class="info-box inst-box">
                <label>Extracted Institution</label>
                <div class="value">{extracted_inst}</div>
            </div>
            <div class="info-box">
                <label>Match Type</label>
                <div class="value" style="color: {inst_color};">{inst_match.upper()}</div>
            </div>
            <div class="info-box">
                <label>Similarity</label>
                <div class="value">{inst_sim:.1f}%</div>
            </div>
        </div>

        <h2>Performance</h2>
        <div class="info-grid">
            <div class="info-box">
                <label>Total Time</label>
                <div class="value">{total_time:.2f}s</div>
            </div>
            <div class="info-box">
                <label>CMA Extraction Time</label>
                <div class="value">{result['performance']['cma_time']:.2f}s</div>
            </div>
            <div class="info-box">
                <label>Seal Extraction Time</label>
                <div class="value">{result['performance']['seal_time']:.2f}s</div>
            </div>
            <div class="info-box">
                <label>Seals Detected</label>
                <div class="value">{len(result['seal_results'])}</div>
            </div>
        </div>

        {seals_html}

        <h2>Visualizations</h2>
        <div style="background: white; padding: 15px; border-radius: 6px;">
            <p style="margin: 0 0 10px 0;">CMA Detection:</p>
            <img src="cma_detection_fullpage.png" style="max-width: 100%; border: 1px solid #ddd;">
        </div>
        <div style="background: white; padding: 15px; border-radius: 6px; margin-top: 10px;">
            <p style="margin: 0 0 10px 0;">Layout Detection:</p>
            <img src="doc_layout_viz.png" style="max-width: 100%; border: 1px solid #ddd;">
        </div>
    </div>
</body>
</html>"""

    os.makedirs(output_dir, exist_ok=True)
    with open(output_dir / 'index.html', 'w', encoding='utf-8') as f:
        f.write(page)
|
|
|
|
|
|
def generate_summary_report(all_results: List[Dict[str, Any]], output_dir: Path):
    """
    Generate summary HTML report.

    Writes ``summary.html`` into ``output_dir`` (created if missing) with
    exact / partial / no-match counts for CMA codes and institution names,
    the average processing time, and one table row per PDF.

    Results whose expected value is "无" or ``None`` are excluded from the
    corresponding accuracy figures. Accuracy counts exact matches only.

    Args:
        all_results: Per-PDF result dictionaries. May be empty, in which case
            the counts and the average time are reported as zero.
        output_dir: Directory that receives ``summary.html``.
    """
    # Function-only import so the name ``html`` stays free for local use.
    from html import escape

    def _cell(value, limit=None) -> str:
        """Render a table cell: 'N/A' when empty, truncated to ``limit``, escaped."""
        text = str(value) if value else 'N/A'
        # Only add an ellipsis when something was actually cut off.
        if limit is not None and len(text) > limit:
            text = text[:limit] + '...'
        return escape(text)

    def _count(results, field, match_type) -> int:
        """Count results whose comparison for ``field`` has the given match type."""
        return sum(1 for r in results if r['comparison'][field].get('match_type') == match_type)

    # Calculate statistics
    valid_cma = [r for r in all_results if r['expected']['cma'] not in ['无', None]]
    valid_inst = [r for r in all_results if r['expected']['institution'] not in ['无', None]]

    cma_exact = _count(valid_cma, 'cma', 'exact')
    cma_partial = _count(valid_cma, 'cma', 'partial')
    # Anything that is neither exact nor partial, including results without a
    # comparison record, counts as a miss.
    cma_no = len(valid_cma) - cma_exact - cma_partial

    inst_exact = _count(valid_inst, 'institution', 'exact')
    inst_partial = _count(valid_inst, 'institution', 'partial')
    inst_no = len(valid_inst) - inst_exact - inst_partial

    cma_acc = (cma_exact / len(valid_cma) * 100) if valid_cma else 0
    inst_acc = (inst_exact / len(valid_inst) * 100) if valid_inst else 0

    # Guard the empty batch: a mean over no values would yield NaN.
    times = [r['performance']['total_time'] for r in all_results]
    avg_time = (sum(times) / len(times)) if times else 0.0

    header = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Batch Test Summary - CMA &amp; Institution Extraction</title>
    <style>
        body {{ font-family: 'Segoe UI', sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }}
        .container {{ max-width: 1400px; margin: 0 auto; background: white; padding: 30px; border-radius: 8px; }}
        h1 {{ color: #333; }}
        .summary {{ display: grid; grid-template-columns: repeat(4, 1fr); gap: 20px; margin: 20px 0; }}
        .summary-card {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 8px; color: white; text-align: center; }}
        .summary-card .label {{ font-size: 14px; opacity: 0.9; }}
        .summary-card .value {{ font-size: 32px; font-weight: bold; }}
        table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }}
        th, td {{ padding: 12px; text-align: left; border-bottom: 1px solid #ddd; }}
        th {{ background: #f5f5f5; }}
    </style>
</head>
<body>
    <div class="container">
        <h1>CMA &amp; Institution Extraction - Batch Test Summary</h1>
        <p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>

        <h2>CMA Code Results</h2>
        <div class="summary">
            <div class="summary-card" style="background: linear-gradient(135deg, #4caf50 0%, #45a049 100%);">
                <div class="label">Exact Match</div>
                <div class="value">{cma_exact}/{len(valid_cma)}</div>
            </div>
            <div class="summary-card" style="background: linear-gradient(135deg, #ff9800 0%, #f57c00 100%);">
                <div class="label">Partial Match</div>
                <div class="value">{cma_partial}/{len(valid_cma)}</div>
            </div>
            <div class="summary-card" style="background: linear-gradient(135deg, #f44336 0%, #d32f2f 100%);">
                <div class="label">No Match</div>
                <div class="value">{cma_no}/{len(valid_cma)}</div>
            </div>
            <div class="summary-card" style="background: linear-gradient(135deg, #2196F3 0%, #1976D2 100%);">
                <div class="label">Accuracy</div>
                <div class="value">{cma_acc:.1f}%</div>
            </div>
        </div>

        <h2>Institution Name Results</h2>
        <div class="summary">
            <div class="summary-card" style="background: linear-gradient(135deg, #4caf50 0%, #45a049 100%);">
                <div class="label">Exact Match</div>
                <div class="value">{inst_exact}/{len(valid_inst)}</div>
            </div>
            <div class="summary-card" style="background: linear-gradient(135deg, #ff9800 0%, #f57c00 100%);">
                <div class="label">Partial Match</div>
                <div class="value">{inst_partial}/{len(valid_inst)}</div>
            </div>
            <div class="summary-card" style="background: linear-gradient(135deg, #f44336 0%, #d32f2f 100%);">
                <div class="label">No Match</div>
                <div class="value">{inst_no}/{len(valid_inst)}</div>
            </div>
            <div class="summary-card" style="background: linear-gradient(135deg, #2196F3 0%, #1976D2 100%);">
                <div class="label">Accuracy</div>
                <div class="value">{inst_acc:.1f}%</div>
            </div>
        </div>

        <h2>Performance</h2>
        <p>Average processing time: {avg_time:.1f}s per PDF</p>

        <h2>Complete Results</h2>
        <table>
            <thead>
                <tr>
                    <th>PDF</th>
                    <th>Expected CMA</th>
                    <th>Extracted CMA</th>
                    <th>CMA Match</th>
                    <th>Expected Inst</th>
                    <th>Extracted Inst</th>
                    <th>Inst Match</th>
                    <th>Seals</th>
                    <th>Time</th>
                </tr>
            </thead>
            <tbody>"""

    symbols = {'exact': '[OK]', 'partial': '[PARTIAL]', 'no_match': '[FAIL]'}

    # Collect the pieces and join once instead of growing a string in a loop.
    parts = [header]
    for r in all_results:
        cma_symbol = symbols.get(r['comparison'].get('cma', {}).get('match_type', 'no_match'), '[?]')
        inst_symbol = symbols.get(r['comparison'].get('institution', {}).get('match_type', 'no_match'), '[?]')
        seals_count = len(r['seal_results'])

        parts.append(f"""
                <tr>
                    <td>{_cell(r['pdf_name'])}</td>
                    <td>{_cell(r['expected']['cma'])}</td>
                    <td>{_cell(r['extracted']['cma'])}</td>
                    <td>{cma_symbol}</td>
                    <td>{_cell(r['expected']['institution'], 30)}</td>
                    <td>{_cell(r['extracted']['institution'], 30)}</td>
                    <td>{inst_symbol}</td>
                    <td>{seals_count}</td>
                    <td>{r['performance']['total_time']:.1f}s</td>
                </tr>""")

    parts.append("""
            </tbody>
        </table>
    </div>
</body>
</html>""")

    os.makedirs(output_dir, exist_ok=True)
    with open(output_dir / 'summary.html', 'w', encoding='utf-8') as f:
        f.write("".join(parts))
|
|
|
|
|
|
def main():
    """Command-line entry point: single-PDF bridge mode or batch accuracy test.

    Modes, selected by command-line flags:
      * ``--pdf PATH``: bridge mode. Runs ``process_single_pdf_standalone`` on
        one PDF and prints its result as a single JSON document on stdout.
      * ``--batch``: batch mode. Processes entries of the ground-truth file
        (``RESULTS_JSON``), either the first ``--batch-size`` entries or the
        ones named in ``--pdf-names``, then writes ``summary.html`` and
        ``test_report.json`` into ``OUTPUT_DIR`` and prints a final summary.
      * neither flag: prints the argparse help text and returns.

    Returns:
        None. Results are reported through stdout, the logger and files.
    """
    # Parse command line arguments
    import argparse

    parser = argparse.ArgumentParser(description="OCR Test and Bridge Script")
    parser.add_argument("--pdf", help="Path to single PDF for bridge mode")
    parser.add_argument("--output-dir", help="Output directory", default="bridge_output")
    parser.add_argument("--ocr-model", choices=["ppocr_v5", "paddleocr_vl"], default="ppocr_v5")
    parser.add_argument("--batch", action="store_true", help="Run batch testing mode")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Number of PDFs to process")
    parser.add_argument("--pdf-names", help="Comma-separated list of PDF names to process")

    args = parser.parse_args()

    # Shared model selection
    ocr_model = args.ocr_model

    if args.pdf:
        # Bridge mode: stdout carries exactly one JSON document for the caller.
        pdf_path = Path(args.pdf)
        output_dir = Path(args.output_dir)
        res = process_single_pdf_standalone(pdf_path, output_dir, ocr_model)
        print(json.dumps(res, cls=NumpyEncoder, ensure_ascii=False))
        return

    if not args.batch:
        parser.print_help()
        return

    # Batch test mode (original main logic)
    batch_size = args.batch_size
    pdf_names_filter = args.pdf_names

    print("=" * 80)
    print("CMA & INSTITUTION EXTRACTION - BATCH ACCURACY TEST")
    print("=" * 80)
    print(f"OCR Model: {ocr_model.upper()}")
    if not pdf_names_filter:
        # With --pdf-names the selection is announced after the ground truth
        # is loaded; "first N" would be misleading in that case.
        print(f"Processing first {batch_size} PDFs from results.json...")
    print(f"PDF directory: {PDF_DIR}")
    print(f"Output directory: {OUTPUT_DIR}")
    print()

    # Load ground truth
    if not RESULTS_JSON.exists():
        logger.error(f"Ground truth file not found: {RESULTS_JSON}")
        return

    with open(RESULTS_JSON, 'r', encoding='utf-8') as f:
        ground_truth = json.load(f)

    # Filter PDFs: either by name filter or by batch size
    if pdf_names_filter:
        # Split comma-separated names and strip whitespace
        requested_names = [name.strip() for name in pdf_names_filter.split(',')]
        pdf_list = [(name, ground_truth[name]) for name in requested_names if name in ground_truth]
        if not pdf_list:
            logger.error(f"None of the specified PDFs found in results.json: {requested_names}")
            print(f"ERROR: None of the specified PDFs found in results.json: {requested_names}")
            return
        print(f"Processing {len(pdf_list)} specified PDF(s): {[name for name, _ in pdf_list]}")
    else:
        # Get first N PDFs
        pdf_list = list(ground_truth.items())[:batch_size]

    # Initialize OCR engines
    # Note: We ALWAYS initialize ocr_engine for CMA recognition
    # We ALWAYS try to initialize vl_pipeline for backup seal recognition (when unwarp fails)
    ocr_engine = None
    vl_pipeline = None

    print("\n" + "=" * 80)
    print("INITIALIZING OCR MODELS (This may take 1-3 minutes on first run)")
    print("=" * 80)
    print()

    logger.info("Initializing PaddleOCR engine for CMA recognition...")
    print("[1/2] Initializing PaddleOCR engine (for CMA extraction)...")
    print(" - Loading detection model (PP-OCRv4_det)...")
    ocr_engine = PaddleOCR(use_textline_orientation=True, lang='ch')
    print(" - Loading recognition model (PP-OCRv4_rec)...")
    print(" - Loading direction classifier...")
    logger.info("PaddleOCR initialized successfully")
    print(" ✓ PaddleOCR initialized successfully\n")

    # Initialize PaddleOCRVL for backup seal recognition (always try if available)
    # This provides a fallback when polar unwarping fails
    if PADDLEOCRVL_AVAILABLE:
        logger.info("Initializing PaddleOCRVL for backup seal recognition...")
        print("[2/2] Initializing PaddleOCRVL (for seal recognition backup)...")
        print(" - This may take 30-60 seconds")
        print(" - Loading model from cache: ~/.paddlex/official_models/PaddleOCR-VL-1.5")
        print(" - Model size: ~1.9GB (loading into memory)...")
        sys.stdout.flush()  # Ensure output is displayed immediately

        start_time = time.time()
        try:
            vl_pipeline = PaddleOCRVL(
                use_seal_recognition=True,
                use_ocr_for_image_block=True,
                use_layout_detection=True
            )

            init_time = time.time() - start_time
            print(f" - Initialization completed in {init_time:.1f} seconds")

            # Verify initialization
            if vl_pipeline is None:
                raise RuntimeError("PaddleOCRVL initialization returned None")

            logger.info("PaddleOCRVL initialized successfully (backup ready)")
            print(" ✓ PaddleOCRVL backup ready - will be used when polar unwarping fails\n")
        except Exception as e:
            # Best effort: the batch still runs without the VL backup.
            init_time = time.time() - start_time
            logger.error(f"Failed to initialize PaddleOCRVL after {init_time:.1f}s: {e}")
            logger.error(f"Exception type: {type(e).__name__}")
            print(f" ✗ Failed to initialize PaddleOCRVL: {e}")
            print(f" Exception type: {type(e).__name__}")
            print(" → Polar unwarping failures will skip OCR (no backup available)\n")
    else:
        logger.info("PaddleOCRVL not available - polar unwarping failures will skip OCR")
        print("[2/2] PaddleOCRVL not available - skipping")
        print(" → Install with: pip install paddleocr[doc-parser]")
        print(" → Polar unwarping failures will skip OCR (no backup)\n")

    # Validate OCR model selection
    if ocr_model == "paddleocr_vl" and vl_pipeline is None:
        print("WARNING: PaddleOCRVL requested for primary seal recognition but not available!")
        print("Falling back to PP-OCRv5 for seal recognition")
        print("Please install: pip install paddleocr[doc-parser]")
        ocr_model = "ppocr_v5"

    print("=" * 80)
    print("MODEL INITIALIZATION COMPLETE")
    print("=" * 80)
    print()

    # Create output directory (including missing parents)
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Console markers for each match type; anything else prints '[?]'.
    match_symbols = {'exact': '[OK]', 'partial': '[PARTIAL]', 'no_match': '[FAIL]'}

    # Process each PDF
    all_results = []
    start_time = time.time()

    total_pdfs = len(pdf_list)
    for i, (pdf_name, expected_data) in enumerate(pdf_list, 1):
        expected_cma = expected_data.get('CMA', '')
        expected_inst = expected_data.get('机构名', '')

        print(f"\n[{i}/{total_pdfs}] Processing: {pdf_name}")
        print(" + Loading PDF and extracting page...")

        result = process_single_pdf(
            pdf_name, expected_cma, expected_inst,
            PDF_DIR, OUTPUT_DIR, ocr_engine,
            ocr_model=ocr_model, vl_pipeline=vl_pipeline
        )

        all_results.append(result)

        # Print result summary
        if result['status'] == 'file_not_found':
            print(" + [!] File not found, skipping")
        else:
            # .get(..., {}) mirrors the defensive access used by the summary
            # report, so a result without a comparison entry prints '[?]'
            # instead of raising KeyError.
            cma_cmp = result['comparison'].get('cma', {})
            cma_match = cma_cmp.get('match_type', 'unknown')
            cma_sim = cma_cmp.get('similarity', 0)
            cma_symbol = match_symbols.get(cma_match, '[?]')

            print(" + CMA Extraction:")
            print(f" + Extracted: {result['extracted']['cma'] or 'N/A'}")
            print(f" + Expected: {expected_cma}")
            print(f" + Match: {cma_symbol} {cma_match.upper()} ({cma_sim:.1f}%)")

            if result['extracted']['institution']:
                inst_cmp = result['comparison'].get('institution', {})
                inst_match = inst_cmp.get('match_type', 'unknown')
                inst_sim = inst_cmp.get('similarity', 0)
                inst_symbol = match_symbols.get(inst_match, '[?]')
                print(" + Institution Extraction:")
                print(f" + Extracted: {result['extracted']['institution'][:50]}...")
                print(f" + Expected: {expected_inst[:50]}...")
                print(f" + Match: {inst_symbol} {inst_match.upper()} ({inst_sim:.1f}%)")

            print(f" + Seals detected: {len(result['seal_results'])}")
            print(f" + Completed in {result['performance']['total_time']:.2f}s")

            # Generate individual report
            # NOTE(review): placed in the "file found" branch so missing PDFs
            # are skipped as the message above says - confirm against the
            # original layout, which was not recoverable from this view.
            generate_individual_report(result, OUTPUT_DIR / pdf_name)

        # Interim results every 5
        if i % 5 == 0:
            valid_cma, cma_exact, _ = _summarize_matches(all_results, 'cma')
            cma_acc = _percentage_of(cma_exact, len(valid_cma))

            valid_inst, inst_exact, _ = _summarize_matches(
                all_results, 'institution', require_extracted=True
            )
            inst_acc = _percentage_of(inst_exact, len(valid_inst))

            print()
            print("=" * 80)
            # Report progress against the real workload, not the module default.
            print(f"INTERIM RESULTS ({i}/{total_pdfs} completed)")
            print("=" * 80)
            print(f"CMA Accuracy: {cma_acc:.1f}% ({cma_exact}/{len(valid_cma)} exact)")
            print(f"Institution Accuracy: {inst_acc:.1f}% ({inst_exact}/{len(valid_inst)} exact)")
            print("=" * 80)
            print()

    total_time = time.time() - start_time

    # Calculate final statistics
    valid_cma, cma_exact, cma_partial = _summarize_matches(all_results, 'cma')
    cma_total = len(valid_cma)
    cma_no = cma_total - cma_exact - cma_partial
    cma_acc = _percentage_of(cma_exact, cma_total)

    valid_inst, inst_exact, inst_partial = _summarize_matches(
        all_results, 'institution', require_extracted=True
    )
    inst_total = len(valid_inst)
    inst_no = inst_total - inst_exact - inst_partial
    inst_acc = _percentage_of(inst_exact, inst_total)

    # np.mean([]) would yield NaN plus a RuntimeWarning; report 0.0 instead.
    processing_times = [r['performance']['total_time'] for r in all_results]
    avg_processing_time = float(np.mean(processing_times)) if processing_times else 0.0

    # Generate summary report
    print("\nGenerating summary report...")
    generate_summary_report(all_results, OUTPUT_DIR)

    # Save JSON
    json_output = {
        'summary': {
            'total_processed': len(all_results),
            'cma': {
                'exact': cma_exact,
                'partial': cma_partial,
                'no_match': cma_no,
                'accuracy': cma_acc / 100
            },
            'institution': {
                'exact': inst_exact,
                'partial': inst_partial,
                'no_match': inst_no,
                'accuracy': inst_acc / 100
            },
            'avg_processing_time': avg_processing_time
        },
        'results': all_results
    }

    with open(OUTPUT_DIR / 'test_report.json', 'w', encoding='utf-8') as f:
        json.dump(json_output, f, ensure_ascii=False, indent=2, cls=NumpyEncoder)

    # Print final summary (every percentage is guarded against empty groups)
    print("\n" + "=" * 80)
    print("BATCH TEST COMPLETED - FINAL RESULTS")
    print("=" * 80)
    print(f"Total Processed: {len(all_results)}")
    print()
    print("CMA Code Results:")
    print(f" Exact Match: {cma_exact}/{cma_total} ({_percentage_of(cma_exact, cma_total):.1f}%)")
    print(f" Partial Match: {cma_partial}/{cma_total} ({_percentage_of(cma_partial, cma_total):.1f}%)")
    print(f" No Match: {cma_no}/{cma_total} ({_percentage_of(cma_no, cma_total):.1f}%)")
    print(f" ** CMA Accuracy: {cma_acc:.1f}% **")
    print()
    print("Institution Name Results:")
    print(f" Exact Match: {inst_exact}/{inst_total} ({_percentage_of(inst_exact, inst_total):.1f}%)")
    print(f" Partial Match: {inst_partial}/{inst_total} ({_percentage_of(inst_partial, inst_total):.1f}%)")
    print(f" No Match: {inst_no}/{inst_total} ({_percentage_of(inst_no, inst_total):.1f}%)")
    print(f" ** Institution Accuracy: {inst_acc:.1f}% **")
    print()
    print("Performance:")
    print(f" Total Time: {total_time:.1f}s ({total_time/60:.1f}min)")
    avg_wall_time = total_time / len(all_results) if all_results else 0.0
    print(f" Average Time: {avg_wall_time:.1f}s per PDF")
    print()
    print("Reports Generated:")
    print(f" - {OUTPUT_DIR / 'summary.html'}")
    print(f" - {OUTPUT_DIR / 'test_report.json'}")
    print(f" - Individual reports: {OUTPUT_DIR / '{pdf_name}/'}")
    print()
    print("=" * 80)


def _percentage_of(part, whole):
    """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is zero."""
    return (part / whole * 100) if whole else 0.0


def _summarize_matches(results, field, require_extracted=False):
    """Select the results that count toward accuracy for ``field`` and tally them.

    A result counts when its expected value is neither '无' nor None; with
    ``require_extracted`` it must also have a truthy extracted value.

    Returns:
        Tuple ``(scorable_results, exact_count, partial_count)``.
    """
    scorable = [
        r for r in results
        if r['expected'][field] not in ('无', None)
        and (not require_extracted or r['extracted'][field])
    ]
    match_types = [r['comparison'].get(field, {}).get('match_type') for r in scorable]
    return scorable, match_types.count('exact'), match_types.count('partial')
|
|
|
|
|
|
def process_single_pdf_standalone(pdf_path: Path, output_dir: Path, ocr_model: str) -> Dict[str, Any]:
    """Bridge function for Java to call for a single PDF.

    Runs the shared ``process_single_pdf`` pipeline on one file with no
    ground truth and reshapes its result into a flat dictionary for the
    caller. Diagnostics go only to the logger, because the caller prints the
    returned dictionary as JSON on stdout.

    Args:
        pdf_path: Path of the PDF to process; its parent directory and file
            name are passed on as ``pdf_dir`` and ``pdf_name``.
        output_dir: Directory handed to ``process_single_pdf`` for its output.
        ocr_model: ``"ppocr_v5"`` or ``"paddleocr_vl"``. If PaddleOCRVL is
            requested but is not installed or fails to initialize, processing
            falls back to ``"ppocr_v5"``, as the batch mode does.

    Returns:
        Dictionary with keys ``success``, ``cma`` (None when no code was
        extracted), ``seals``, ``institutions`` and ``error``.
    """
    # Initialize engines
    logger.info(f"Initializing engines for standalone processing (Model: {ocr_model})...")

    vl_pipeline = None
    if ocr_model == "paddleocr_vl":
        if PADDLEOCRVL_AVAILABLE:
            try:
                vl_pipeline = PaddleOCRVL(
                    use_seal_recognition=True,
                    use_ocr_for_image_block=True,
                    use_layout_detection=True
                )
            except Exception as e:
                # Best effort, as in batch mode: continue without the VL model
                # so the caller still receives a JSON result.
                logger.error(f"Failed to initialize PaddleOCRVL: {e}")
        if vl_pipeline is None:
            logger.warning("PaddleOCRVL requested but not available - falling back to ppocr_v5")
            ocr_model = "ppocr_v5"

    # Re-use the existing core logic function
    result = process_single_pdf(
        pdf_name=pdf_path.name,
        expected_cma=None,
        expected_inst=None,
        pdf_dir=pdf_path.parent,
        output_dir=output_dir,
        ocr_engine=None,  # Global instance not needed for this path
        ocr_model=ocr_model,
        vl_pipeline=vl_pipeline
    )

    extracted = result["extracted"]
    seal_results = result["seal_results"]
    # Reflects the model actually used after any fallback above.
    seal_method = "vl" if ocr_model == "paddleocr_vl" else "ppocr"

    cma_info = None
    if extracted["cma"]:
        cma_info = {
            "code": extracted["cma"],
            "confidence": extracted.get("cma_confidence"),
            "box": None  # Not captured in current flat result
        }

    # Format for bridge output
    bridge_res = {
        "success": result["status"] == "success",
        "cma": cma_info,
        "seals": [
            {
                "index": s["index"],
                "text": s["text"],
                "confidence": s["confidence"],
                "success": s["success"],
                "method": seal_method
            } for s in seal_results
        ],
        "institutions": [s["text"] for s in seal_results if s["success"] and s["text"]],
        "error": result.get("error")
    }

    return bridge_res
|
|
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|