""" CMA Code Extraction & Institution Name - Batch Accuracy Testing Script (Enhanced) This script implements comprehensive batch accuracy testing for BOTH: 1. CMA code extraction 2. Institution name extraction from seals Uses the complete workflow from v_verify_logic.py including: - Layout detection (Paddlex PP-DocLayout-L) - Seal detection and refinement - Polar unwarping - OCR text recognition for institution names Author: Claude Code Date: 2025-02-05 Version: 2.0 (Enhanced with seal/institution extraction) """ import os import sys import json import time import logging import re import math from pathlib import Path from datetime import datetime from typing import Dict, List, Tuple, Optional, Any import numpy as np # Set UTF-8 encoding for Windows console if sys.platform == 'win32': import codecs try: sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'strict') sys.stderr = codecs.getwriter('utf-8')(sys.stderr.buffer, 'strict') except: pass os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True" class NumpyEncoder(json.JSONEncoder): """Custom JSON encoder for numpy types""" def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super().default(obj) try: import fitz # PyMuPDF import cv2 from paddleocr import PaddleOCR, SealTextDetection, TextRecognition try: from paddleocr import PaddleOCRVL PADDLEOCRVL_AVAILABLE = True except ImportError: PADDLEOCRVL_AVAILABLE = False print("Warning: PaddleOCRVL not available. Install with: pip install paddleocr[doc-parser]") import paddlex as px from Levenshtein import distance as levenshtein_distance except ImportError as e: print(f"Error: Required dependency not found: {e}") print("Please install: pip install python-Levenshtein paddleocr paddlex pymupdf-ng opencv-python numpy") sys.exit(1) # Import CMA extraction module try: from cma_extraction_final import extract_cma_code_fullpage, imread_unicode except ImportError: print("Error: cma_extraction_final.py not found in current directory") sys.exit(1) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('test_accuracy_full.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Constants PDF_DIR = Path(r"src/test/resources/data/pdfs") RESULTS_JSON = Path(r"src/test/resources/data/results.json") OUTPUT_DIR = Path("test_reports_full") BATCH_SIZE = 20 SIMILARITY_THRESHOLD = 85.0 # OCR Model Configuration # Options: "ppocr_v5" (default), "paddleocr_vl" OCR_MODEL = os.environ.get("OCR_MODEL", "ppocr_v5") # ============ Helper Functions ============ def imwrite_safe(file_path, img): """ Write image file safely, handling Chinese paths on Windows. On Windows, cv2.imwrite fails with Chinese paths. This function uses cv2.imencode + tofile as a fallback. Args: file_path: Path to save the image img: Image data (numpy array) Returns: bool: True if successful, False otherwise """ try: # Try standard cv2.imwrite first success = cv2.imwrite(file_path, img) if success: return True # Fallback: Use imencode + tofile for Chinese paths is_success, buffer = cv2.imencode(".png", img) if is_success: buffer.tofile(file_path) return True return False except Exception as e: logger.error(f"Failed to write image to {file_path}: {e}") return False # ============ Seal Processing Functions (from v_verify_logic.py) ============ def polar_unwarp(img, center, radius, start_theta, angular_extent): """ Polar Unwarp with Canvas Padding for Partial Seals Extended version: - Creates a padded canvas to handle partial seals (seals cut off at edges) - Samples both inward (toward center) and outward (away from center) - Uses white padding for areas outside the original image boundary - This ensures we can always sample at the full radius even if seal is cut off """ if angular_extent <= 0: return None strip_w = int(angular_extent * radius) # Extended sampling range: # - Inward: 100% of radius (toward center) - all the way to center # - Outward: 20% beyond radius (away from center) inward_range = int(radius * 0.85) # 向内到圆心 outward_range = int(radius * 0.2) # 向外20% strip_h = inward_range + outward_range if strip_w <= 0 or strip_h <= 0: return None ch, cw = img.shape[:2] # Calculate padding needed to ensure all sampling points are within bounds # Maximum distance from center will be radius + outward_range max_distance = radius + outward_range # Calculate padding needed on each side pad_top = max(0, max_distance - center[1]) pad_bottom = max(0, max_distance - (ch - center[1])) pad_left = max(0, max_distance - center[0]) pad_right = max(0, max_distance - (cw - center[0])) # Create padded canvas with white background padded_h = ch + pad_top + pad_bottom padded_w = cw + pad_left + pad_right padded_canvas = np.ones((padded_h, padded_w, 3), dtype=np.uint8) * 255 # Place original image in center padded_canvas[pad_top:pad_top+ch, pad_left:pad_left+cw] = img # Adjust center position for padded canvas center_padded = [center[0] + pad_left, center[1] + pad_top] strip = np.zeros((strip_h, strip_w, 3), dtype=np.uint8) for y in range(strip_h): # Calculate radius at this row # Start from radius + outward_range (outside) # Move inward toward center r = radius + outward_range - y for x in range(strip_w): theta = start_theta + angular_extent * (x / strip_w) src_x = center_padded[0] + r * math.cos(theta) src_y = center_padded[1] + r * math.sin(theta) # Sample from padded canvas (all points should be within bounds now) sx, sy = int(src_x), int(src_y) if 0 <= sx < padded_w and 0 <= sy < padded_h: strip[y, x] = padded_canvas[sy, sx] else: strip[y, x] = [255, 255, 255] return strip def calculate_precise_arc(polygons, center): """Calculate precise arc parameters for seal text""" initial_clusters = [] gap_thresh = math.radians(15) for poly in polygons: thetas = sorted([math.atan2(p[1] - center[1], p[0] - center[0]) for i, p in enumerate(poly)]) if not thetas: continue max_gap = 0 gap_idx = -1 for i in range(len(thetas)): gap = (thetas[0] + 2*math.pi - thetas[i]) if i == len(thetas)-1 else (thetas[i+1]-thetas[i]) if gap > max_gap: max_gap = gap; gap_idx = i if gap_idx == len(thetas) - 1: t_arc = thetas else: t_arc = thetas[gap_idx+1:] + [t + 2*math.pi for t in thetas[:gap_idx+1]] if not t_arc: continue curr = [t_arc[0]] for i in range(1, len(t_arc)): if t_arc[i] - t_arc[i-1] > gap_thresh: initial_clusters.append({'start': curr[0], 'end': curr[-1]}) curr = [t_arc[i]] else: curr.append(t_arc[i]) initial_clusters.append({'start': curr[0], 'end': curr[-1]}) if not initial_clusters: return 0.0, 0.0 initial_clusters.sort(key=lambda x: x['start']) merged = [] merge_thresh = math.radians(45) if initial_clusters: curr = initial_clusters[0] for i in range(1, len(initial_clusters)): nxt = initial_clusters[i] if nxt['start'] - curr['end'] < merge_thresh: curr['end'] = max(curr['end'], nxt['end']) else: merged.append(curr) curr = nxt merged.append(curr) candidates = [] for m in merged: st, en = m['start'], m['end'] ex = en - st mid = (st + en) / 2 dist_to_top = abs(((mid + math.pi/2 + math.pi) % (2*math.pi)) - math.pi) weight = math.exp(-0.5 * (dist_to_top / (math.pi/2))**2) candidates.append({'start': st, 'end': en, 'extent': ex, 'score': ex * weight}) candidates.sort(key=lambda x: x['score'], reverse=True) best = candidates[0] # FIX: Limit extent to max 350° to avoid overlap and distortion # Extent > 360° causes severe image distortion in polar unwarping MAX_EXTENT_DEG = 350.0 start_theta = best['start'] extent = best['end'] - best['start'] if math.degrees(extent) > MAX_EXTENT_DEG: logger.warning(f"Arc extent {math.degrees(extent):.2f}° exceeds {MAX_EXTENT_DEG}°, clamping to avoid distortion") extent = math.radians(MAX_EXTENT_DEG) return start_theta, extent def fit_circle_from_text_polygons(all_polygons): """ Fit circle from text polygons using least squares method. Equation: (x - a)² + (y - b)² = r² Expanded: x² + y² - 2ax - 2by + (a² + b² - r²) = 0 Let: c = a² + b² - r² Then: x² + y² = 2ax + 2by - c This is a linear system: [2x, 2y, -1] * [a, b, c]ᵀ = x² + y² """ if len(all_polygons) == 0: return None, None, None # Collect all points from polygons points = [] for poly in all_polygons: for p in poly: points.append([float(p[0]), float(p[1])]) if len(points) < 5: return None, None, None points = np.array(points) # Build linear system # A * [a, b, c]ᵀ = b A = np.column_stack([2 * points[:, 0], 2 * points[:, 1], -np.ones(len(points))]) b_vec = np.sum(points ** 2, axis=1) try: # Solve least squares sol, residuals, rank, singular_values = np.linalg.lstsq(A, b_vec, rcond=None) a, b, c = sol center_x = a center_y = b radius = np.sqrt(a**2 + b**2 - c) # Calculate fitting error (RMSE) if len(residuals) > 0: rmse = np.sqrt(residuals[0] / len(points)) else: # Calculate manually predicted = A @ sol errors = predicted - b_vec rmse = np.sqrt(np.mean(errors ** 2)) return (int(center_x), int(center_y)), int(radius), rmse except Exception as e: logger.error(f"Circle fitting failed: {e}") return None, None, None def detect_seal_center_dual_method(seal_crop, all_polygons): """ Dual strategy: Automatically select the best center detection method. Strategy: 1. Try circle fitting 2. Check fitting quality (RMSE, offset distance) 3. If fitting quality is good → use fitted center 4. Otherwise → use crop center Returns: center: [x, y] - detected center radius: int - detected radius method: str - "crop_center" or "circle_fitting" """ ch, cw = seal_crop.shape[:2] # Method 1: Crop center (default method) center_crop = [cw // 2, ch // 2] radius_crop = min(cw, ch) // 2 - 10 # Method 2: Circle fitting center_fit, radius_fit, rmse = fit_circle_from_text_polygons(all_polygons) if center_fit is None: logger.info(" Circle fitting failed, using crop center") return center_crop, radius_crop, "crop_center" # Calculate offset between fitted center and crop center offset = math.sqrt((center_fit[0] - center_crop[0])**2 + (center_fit[1] - center_crop[1])**2) offset_ratio = offset / min(cw, ch) # Quality check criteria # 1. RMSE should be low (good fit) # 2. Offset should not be too large (center should be reasonable) # 3. Need enough polygons for reliable fitting rmse_threshold = 3000 offset_threshold = 0.2 # 20% of crop size min_polygons = 3 is_fit_good = ( rmse < rmse_threshold and offset_ratio < offset_threshold and len(all_polygons) >= min_polygons ) if is_fit_good: logger.info(f" Using circle fitting: RMSE={rmse:.2f}, offset_ratio={offset_ratio:.2f}") return center_fit, radius_fit, "circle_fitting" else: reasons = [] if rmse >= rmse_threshold: reasons.append(f"RMSE too high ({rmse:.2f} >= {rmse_threshold})") if offset_ratio >= offset_threshold: reasons.append(f"offset too large ({offset_ratio:.2f} >= {offset_threshold})") if len(all_polygons) < min_polygons: reasons.append(f"not enough polygons ({len(all_polygons)} < {min_polygons})") logger.info(f" Circle fitting unreliable ({', '.join(reasons)}), using crop center") return center_crop, radius_crop, "crop_center" def run_layout_detection(image_path): """Run Paddlex PP-DocLayout-L for layout analysis""" try: model = px.create_model("PP-DocLayout-L") output = model.predict(image_path, batch_size=1) all_regions = [] for res in output: boxes = res.get('boxes', []) for box in boxes: label_name = box.get('label_name', box.get('label', 'unknown')) score = box.get('score', 0.0) coords = box.get('coordinate') all_regions.append({ 'label': label_name, 'score': score, 'box': coords }) return all_regions except Exception as e: logger.error(f"Layout detection failed: {e}") return [] def run_ocr_recognition(image_path, rec_model): """Run OCR recognition on unwarp seal image""" try: output = rec_model.predict(input=image_path, batch_size=1) if output and len(output) > 0: res = output[0] text = res.get('rec_text', '').strip() score = res.get('rec_score', 0.0) return { 'text': text, 'score': score, 'success': len(text) > 0 } else: return {'text': '', 'score': 0.0, 'success': False} except Exception as e: logger.error(f"OCR recognition failed: {e}") return {'text': '', 'score': 0.0, 'success': False} def run_ocr_recognition_vl(image_path, vl_pipeline): """ Run OCR recognition using PaddleOCRVL on seal image. Can be used on both unwarp images and crop images (backup mode). Args: image_path: Path to seal image (unwarp or crop) vl_pipeline: Initialized PaddleOCRVL pipeline Returns: Dict with 'text', 'score', 'success' keys """ try: # Create temp output directory for VL results temp_output_dir = Path("temp_paddleocr_vl") temp_output_dir.mkdir(exist_ok=True) # Run prediction output = vl_pipeline.predict(image_path) if output and len(output) > 0: res = output[0] # Save JSON to extract text res.save_to_json(save_path=str(temp_output_dir)) # Read JSON to find seal text json_file = temp_output_dir / f"{Path(image_path).stem}_res.json" if json_file.exists(): with open(json_file, 'r', encoding='utf-8') as f: data = json.load(f) # Find seal block and extract content for block in data.get('parsing_res_list', []): if block.get('block_label') == 'seal': text = block.get('block_content', '').strip() # Clean up temp files import shutil if temp_output_dir.exists(): shutil.rmtree(temp_output_dir, ignore_errors=True) return { 'text': text, 'score': 1.0, # PaddleOCRVL doesn't provide confidence score 'success': len(text) > 0 } # Clean up temp files import shutil if temp_output_dir.exists(): shutil.rmtree(temp_output_dir, ignore_errors=True) return {'text': '', 'score': 0.0, 'success': False} else: return {'text': '', 'score': 0.0, 'success': False} except Exception as e: logger.error(f"PaddleOCRVL recognition failed: {e}") import traceback logger.error(traceback.format_exc()) return {'text': '', 'score': 0.0, 'success': False} def extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", vl_pipeline=None): """ Extract seals and recognize institution names from page image. Args: page_img: Input page image output_dir: Directory to save intermediate results ocr_model: OCR model to use ("ppocr_v5" or "paddleocr_vl") vl_pipeline: PaddleOCRVL pipeline (required if ocr_model="paddleocr_vl") Returns: Dict with: - 'seals': list of seal results - 'institutions': list of recognized institution names - 'processing_time': time taken """ start_time = time.time() result = { 'seals': [], 'institutions': [], 'processing_time': 0.0 } # Validate input image if page_img is None: logger.error("Input page_img is None") result['processing_time'] = time.time() - start_time return result if not isinstance(page_img, np.ndarray): logger.error(f"Input page_img is not numpy array, type: {type(page_img)}") result['processing_time'] = time.time() - start_time return result if page_img.size == 0: logger.error("Input page_img is empty") result['processing_time'] = time.time() - start_time return result logger.info(f"Input image shape: {page_img.shape}, dtype: {page_img.dtype}") # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Save page image doc_path = os.path.join(output_dir, "doc_page.png") try: success = imwrite_safe(doc_path, page_img) if not success: logger.error(f"imwrite_safe returned False for {doc_path}") # Try alternative save method using PIL try: from PIL import Image img_rgb = cv2.cvtColor(page_img, cv2.COLOR_BGR2RGB) pil_img = Image.fromarray(img_rgb) pil_img.save(doc_path) logger.info(f"Saved using PIL as fallback: {doc_path}") # Verify PIL save worked if not os.path.exists(doc_path): logger.error(f"PIL save also failed, file not found: {doc_path}") result['processing_time'] = time.time() - start_time return result except Exception as pil_e: logger.error(f"PIL fallback also failed: {pil_e}") result['processing_time'] = time.time() - start_time return result except Exception as e: logger.error(f"Failed to save page image: {e}") result['processing_time'] = time.time() - start_time return result # Verify file exists before proceeding if not os.path.exists(doc_path): logger.error(f"Page image file not found after save: {doc_path}") result['processing_time'] = time.time() - start_time return result # Run layout detection logger.info("Running layout detection...") all_regions = run_layout_detection(doc_path) # Extract seal boxes seal_boxes = [] page_viz = page_img.copy() for reg in all_regions: box = reg.get('box') label = reg.get('label') score = reg.get('score', 0.0) is_seal = (label == 'seal') if score > 0.2: x1, y1, x2, y2 = [int(v) for v in box] color = (0, 0, 255) if is_seal else (0, 255, 0) cv2.rectangle(page_viz, (x1, y1), (x2, y2), color, 2) if is_seal: seal_boxes.append(box) imwrite_safe(os.path.join(output_dir, "doc_layout_viz.png"), page_viz) if not seal_boxes: logger.warning("No seals detected") result['processing_time'] = time.time() - start_time return result # Process each seal logger.info(f"Processing {len(seal_boxes)} detected seals...") det_model = SealTextDetection(model_name="PP-OCRv4_server_seal_det") # Initialize OCR model based on selection if ocr_model == "paddleocr_vl": if not PADDLEOCRVL_AVAILABLE: logger.error("PaddleOCRVL requested but not available. Falling back to PP-OCRv5.") ocr_model = "ppocr_v5" rec_model = TextRecognition(model_name="PP-OCRv5_server_rec") elif vl_pipeline is None: logger.error("PaddleOCRVL requested but vl_pipeline is None. Falling back to PP-OCRv5.") ocr_model = "ppocr_v5" rec_model = TextRecognition(model_name="PP-OCRv5_server_rec") else: logger.info("Using PaddleOCRVL for seal text recognition") rec_model = None # Not used for PaddleOCRVL else: logger.info("Using PP-OCRv5_server_rec for seal text recognition") rec_model = TextRecognition(model_name="PP-OCRv5_server_rec") for i, box in enumerate(seal_boxes): x1, y1, x2, y2 = [int(v) for v in box] pad = 40 y1_p, y2_p = max(0, y1-pad), min(page_img.shape[0], y2+pad) x1_p, x2_p = max(0, x1-pad), min(page_img.shape[1], x2+pad) seal_crop = page_img[y1_p:y2_p, x1_p:x2_p] # Validate crop if seal_crop.size == 0 or seal_crop.shape[0] == 0 or seal_crop.shape[1] == 0: logger.warning(f"Invalid seal crop dimensions: {seal_crop.shape}, skipping seal {i}") continue crop_path = os.path.join(output_dir, f"seal_crop_{i}.png") success = imwrite_safe(crop_path, seal_crop) if not success: # Try PIL fallback try: from PIL import Image crop_rgb = cv2.cvtColor(seal_crop, cv2.COLOR_BGR2RGB) pil_img = Image.fromarray(crop_rgb) pil_img.save(crop_path) logger.info(f"Saved seal crop using PIL fallback: {crop_path}") except Exception as pil_e: logger.error(f"Failed to save seal crop to {crop_path}: {pil_e}, skipping seal {i}") continue # Verify file exists if not os.path.exists(crop_path): logger.error(f"Seal crop file not found after save: {crop_path}, skipping seal {i}") continue # Detect text polygons output = det_model.predict(crop_path, batch_size=1) all_polygons = [] for res in output: polys = res.get('dt_polys') if isinstance(res, dict) else None if polys: all_polygons.extend(polys) ch, cw = seal_crop.shape[:2] # ============ DUAL STRATEGY: Choose best center detection method ============ logger.info(f" Seal #{i} Geometry:") logger.info(f" - Crop size: {cw}x{ch}") logger.info(f" - Text polygons detected: {len(all_polygons)}") center, radius, method_used = detect_seal_center_dual_method(seal_crop, all_polygons) logger.info(f" - Method used: {method_used}") logger.info(f" - Center: ({center[0]}, {center[1]})") logger.info(f" - Radius: {radius}") # ============ INSUFFICIENT POLYGONS CHECK ============ # If too few text polygons detected, polar unwarping will likely fail # Skip directly to PaddleOCRVL backup in this case MIN_POLYGONS_FOR_UNWARP = 3 if len(all_polygons) < MIN_POLYGONS_FOR_UNWARP: logger.warning(f" Seal #{i}: Only {len(all_polygons)} text polygons detected (< {MIN_POLYGONS_FOR_UNWARP})") logger.warning(f" Seal #{i}: Skipping polar unwarping (insufficient polygon data)") logger.info(f" Seal #{i}: Using PaddleOCRVL backup instead") # Save crop image imwrite_safe(crop_path, seal_crop) # Use PaddleOCRVL directly on crop (no unwarp) if vl_pipeline is not None and PADDLEOCRVL_AVAILABLE: ocr_result = run_ocr_recognition_vl(crop_path, vl_pipeline) logger.info(f" Seal #{i} PaddleOCRVL Result (direct crop):") logger.info(f" - Text: '{ocr_result['text']}'") logger.info(f" - Score: {ocr_result['score']:.4f}") logger.info(f" - Success: {ocr_result['success']}") logger.info(f" - ** Used PaddleOCRVL (insufficient polygons for unwarping) **") # Create debug info without unwarp seal_data = { 'index': i, 'box': box, 'crop_path': Path(crop_path).name, 'unwarp_path': None, # No unwarp performed 'marked_path': None, # No marked image 'polar_viz_path': None, # No polar visualization 'text': ocr_result['text'], 'confidence': float(ocr_result['score']), 'success': bool(ocr_result['success']), 'method_used': f'{method_used}_skip_unwarp', 'used_fallback': True, 'debug_info': { 'center': center, 'radius': radius, 'start_theta_deg': None, 'extent_deg': None, 'num_polygons': len(all_polygons), 'crop_size': (cw, ch), 'unwarp_size': None, 'skip_reason': f'Insufficient polygons ({len(all_polygons)} < {MIN_POLYGONS_FOR_UNWARP})' } } result['seals'].append(seal_data) if ocr_result['success']: # Clean the institution name before adding cleaned_name = clean_institution_name(ocr_result['text']) result['institutions'].append(cleaned_name) logger.info(f" ✓ Seal #{i} SUCCESS: {cleaned_name[:50]}... (confidence: {ocr_result['score']:.4f})") else: logger.warning(f" ✗ Seal #{i} FAILED: Could not extract institution name") continue # Skip to next seal else: logger.error(f" Seal #{i}: PaddleOCRVL not available, cannot extract text") seal_data = { 'index': i, 'box': box, 'crop_path': Path(crop_path).name, 'unwarp_path': None, 'marked_path': None, 'polar_viz_path': None, 'text': '', 'confidence': 0.0, 'success': False, 'method_used': f'{method_used}_skip_unwarp', 'used_fallback': True, 'debug_info': { 'center': center, 'radius': radius, 'start_theta_deg': None, 'extent_deg': None, 'num_polygons': len(all_polygons), 'crop_size': (cw, ch), 'unwarp_size': None, 'skip_reason': f'Insufficient polygons and no PaddleOCRVL backup' } } result['seals'].append(seal_data) continue # Calculate arc and unwarp start_theta, extent = calculate_precise_arc(all_polygons, center) logger.info(f" Seal #{i} Arc Parameters:") logger.info(f" - Start theta: {math.degrees(start_theta):.2f}°") logger.info(f" - Extent: {math.degrees(extent):.2f}° ({math.degrees(extent)*radius:.1f} pixels width)") marked = seal_crop.copy() # Draw all text polygons in green for p in all_polygons: cv2.polylines(marked, [np.array(p, dtype=np.int32)], True, (0, 255, 0), 2) # Draw center point (yellow cross) center_x, center_y = int(center[0]), int(center[1]) cv2.drawMarker(marked, (center_x, center_y), (0, 255, 255), markerType=cv2.MARKER_CROSS, markerSize=20, thickness=2) cv2.circle(marked, (center_x, center_y), 5, (0, 255, 255), -1) # Draw estimated radius circle (cyan) cv2.circle(marked, (center_x, center_y), radius, (255, 255, 0), 2) # Draw polar sampling visualization polar_viz = seal_crop.copy() cv2.drawMarker(polar_viz, (center_x, center_y), (0, 255, 255), markerType=cv2.MARKER_CROSS, markerSize=20, thickness=2) cv2.circle(polar_viz, (center_x, center_y), radius, (255, 255, 0), 2) unwarp_path = os.path.join(output_dir, f"seal_unwarp_{i}.png") unwarp = None used_fallback = False if extent > 0: logger.info(f" Seal #{i}: Performing polar unwarping with detected text polygons...") unwarp = polar_unwarp(seal_crop, center, radius, start_theta, extent) if unwarp is not None: imwrite_safe(unwarp_path, unwarp) logger.info(f" - Unwarp size: {unwarp.shape[1]}x{unwarp.shape[0]}") def draw_line(m, theta, color): x = center[0] + radius * math.cos(theta) y = center[1] + radius * math.sin(theta) cv2.line(m, (int(center[0]), int(center[1])), (int(x), int(y)), color, 2) # Draw start angle line (blue) draw_line(marked, start_theta, (255, 0, 0)) # Draw end angle line (red) draw_line(marked, start_theta + extent, (0, 0, 255)) # Draw sampling points on polar_viz (show where polar samples come from) num_sample_points = min(50, int(extent * radius)) # Show up to 50 sample points for r_idx in range(5): # 5 different radii r = radius - r_idx * (radius * 0.6 / 5) for theta_idx in range(num_sample_points): theta = start_theta + extent * (theta_idx / num_sample_points) src_x = center[0] + r * math.cos(theta) src_y = center[1] + r * math.sin(theta) if 0 <= src_x < cw and 0 <= src_y < ch: cv2.circle(polar_viz, (int(src_x), int(src_y)), 1, (255, 0, 255), -1) # Save polar visualization polar_viz_path = os.path.join(output_dir, f"seal_polar_viz_{i}.png") imwrite_safe(polar_viz_path, polar_viz) logger.info(f" - Polar visualization saved: seal_polar_viz_{i}.png") else: logger.warning(f" Seal #{i}: Polar unwarp returned None") # ============ FALLBACK: Use fixed angle range when no text detected ============ if unwarp is None and extent <= 0 and len(all_polygons) == 0: logger.warning(f" Seal #{i}: No text polygons detected, using fallback angle range (7:30 to 4:30 clockwise)") used_fallback = True # 7:30 direction (left-bottom) to 4:30 direction (right-bottom) clockwise # In standard math angle (0 = 3 o'clock, CCW): # 7:30 = 225 degrees = 3.927 rad # 4:30 = 135 degrees = 2.356 rad # Clockwise from 7:30 to 4:30 covers 270 degrees # We start at 4:30 (135 degrees) and go counter-clockwise 270 degrees fallback_start_theta = math.radians(135) # 4:30 position fallback_extent = math.radians(270) # 270 degree coverage logger.info(f" Seal #{i}: Fallback - Start: 135.00° (4:30), Extent: 270.00°") unwarp = polar_unwarp(seal_crop, center, radius, fallback_start_theta, fallback_extent) if unwarp is not None: imwrite_safe(unwarp_path, unwarp) logger.info(f" - Fallback unwarp size: {unwarp.shape[1]}x{unwarp.shape[0]}") # Update start_theta and extent for visualization start_theta = fallback_start_theta extent = fallback_extent def draw_line(m, theta, color): x = center[0] + radius * math.cos(theta) y = center[1] + radius * math.sin(theta) cv2.line(m, (int(center[0]), int(center[1])), (int(x), int(y)), color, 2) # Draw start angle line (blue) - 4:30 position draw_line(marked, start_theta, (255, 0, 0)) # Draw end angle line (red) - 7:30 position draw_line(marked, start_theta + extent, (0, 0, 255)) # Draw sampling points num_sample_points = 50 for r_idx in range(5): r = radius - r_idx * (radius * 0.6 / 5) for theta_idx in range(num_sample_points): theta = start_theta + extent * (theta_idx / num_sample_points) src_x = center[0] + r * math.cos(theta) src_y = center[1] + r * math.sin(theta) if 0 <= src_x < cw and 0 <= src_y < ch: cv2.circle(polar_viz, (int(src_x), int(src_y)), 1, (255, 0, 255), -1) polar_viz_path = os.path.join(output_dir, f"seal_polar_viz_{i}.png") imwrite_safe(polar_viz_path, polar_viz) logger.info(f" - Fallback polar visualization saved: seal_polar_viz_{i}.png") else: logger.warning(f" Seal #{i}: Fallback polar unwarp also returned None") marked_path = os.path.join(output_dir, f"seal_marked_{i}.png") imwrite_safe(marked_path, marked) # OCR recognition with double verification ocr_result = {'text': '', 'score': 0.0, 'success': False} ocr_method_used = method_used if unwarp is not None: # Standard path: Recognize unwarp image method_str = "FALLBACK" if used_fallback else "Standard" logger.info(f" Seal #{i}: Running OCR ({method_str}, model={ocr_model}) on unwarp image...") if ocr_model == "paddleocr_vl": ocr_result = run_ocr_recognition_vl(unwarp_path, vl_pipeline) else: ocr_result = run_ocr_recognition(unwarp_path, rec_model) ocr_method_used = f"{method_used}_unwarp" logger.info(f" Seal #{i} OCR Result (unwarp):") logger.info(f" - Text: '{ocr_result['text']}'") logger.info(f" - Score: {ocr_result['score']:.4f}") logger.info(f" - Success: {ocr_result['success']}") logger.info(f" - Text length: {len(ocr_result['text'])} chars") if used_fallback: logger.info(f" - ** Used fallback angle range (7:30 to 4:30) **") # ============ DOUBLE VERIFICATION: Try PaddleOCRVL on crop if unwarp OCR fails ============ # If unwarp OCR failed (empty text or success=False), try PaddleOCRVL backup on crop if (not ocr_result['success'] or len(ocr_result['text'].strip()) == 0) and vl_pipeline is not None and PADDLEOCRVL_AVAILABLE: logger.warning(f" Seal #{i}: Unwarp OCR failed (empty result), trying PaddleOCRVL backup on crop image") seal_crop_path = os.path.join(output_dir, f"seal_crop_{i}.png") backup_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline) logger.info(f" Seal #{i} PaddleOCRVL Backup Result (crop):") logger.info(f" - Text: '{backup_result['text']}'") logger.info(f" - Score: {backup_result['score']:.4f}") logger.info(f" - Success: {backup_result['success']}") logger.info(f" - Text length: {len(backup_result['text'])} chars") # Use backup result if it's better (non-empty text) if backup_result['success'] and len(backup_result['text'].strip()) > 0: logger.info(f" Seal #{i}: ** Using PaddleOCRVL backup result (unwarp failed) **") ocr_result = backup_result ocr_method_used = f"{method_used}_crop_backup" else: logger.warning(f" Seal #{i}: ** Both unwarp and crop OCR failed **") else: # ============ BACKUP: Use PaddleOCRVL directly on seal crop ============ logger.warning(f" Seal #{i}: No unwarp image available (polar unwarp failed)") if vl_pipeline is not None and PADDLEOCRVL_AVAILABLE: logger.info(f" Seal #{i}: Using PaddleOCRVL backup - directly recognize seal crop image") seal_crop_path = os.path.join(output_dir, f"seal_crop_{i}.png") ocr_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline) ocr_method_used = f"{method_used}_crop_backup" logger.info(f" Seal #{i} PaddleOCRVL Backup Result:") logger.info(f" - Text: '{ocr_result['text']}'") logger.info(f" - Score: {ocr_result['score']:.4f}") logger.info(f" - Success: {ocr_result['success']}") logger.info(f" - Text length: {len(ocr_result['text'])} chars") logger.info(f" - ** Used PaddleOCRVL backup (direct crop recognition) **") else: logger.warning(f" Seal #{i}: No backup available (vl_pipeline=None or PaddleOCRVL not installed), skipping OCR") seal_data = { 'index': int(i), 'box': [float(v) for v in box], 'crop_path': f"seal_crop_{i}.png", 'unwarp_path': f"seal_unwarp_{i}.png" if unwarp is not None else None, 'marked_path': f"seal_marked_{i}.png", 'polar_viz_path': f"seal_polar_viz_{i}.png" if unwarp is not None else None, 'text': ocr_result['text'], 'confidence': float(ocr_result['score']), 'success': bool(ocr_result['success']), 'method_used': ocr_method_used, # Track actual OCR method used 'used_fallback': used_fallback, # Track if fallback was used 'debug_info': { 'center': center, 'radius': radius, 'start_theta_deg': float(math.degrees(start_theta)), 'extent_deg': float(math.degrees(extent)), 'num_polygons': len(all_polygons), 'crop_size': (cw, ch), 'unwarp_size': (unwarp.shape[1], unwarp.shape[0]) if unwarp is not None else None } } result['seals'].append(seal_data) if ocr_result['success']: # Clean the institution name before adding cleaned_name = clean_institution_name(ocr_result['text']) result['institutions'].append(cleaned_name) logger.info(f" ✓ Seal #{i} SUCCESS: {cleaned_name[:50]}... (confidence: {ocr_result['score']:.4f})") else: logger.warning(f" ✗ Seal #{i} FAILED: Could not extract institution name") result['processing_time'] = time.time() - start_time return result # ============ Text Cleaning Functions ============ def clean_institution_name(text: str) -> str: """ Clean extracted institution name by removing unwanted suffixes. Removes common seal-related text that is not part of the institution name: - 检验检测专用章 - 检验检测专用 - 专用章 - 及其他变体 Args: text: Raw extracted institution name Returns: Cleaned institution name """ if not text: return text # Define patterns to remove (order matters: most specific first) patterns_to_remove = [ '检验检测专用章', '检验检测专用', '检测专用章', '检验专用章', '专用章', '(检验检测)', '(检验检测)', '【检验检测】', '[检验检测]', ] cleaned = text for pattern in patterns_to_remove: if pattern in cleaned: cleaned = cleaned.replace(pattern, '') logger.debug(f"Removed pattern '{pattern}' from institution name") # Strip whitespace cleaned = cleaned.strip() # Log if cleaning occurred if cleaned != text: logger.info(f"Cleaned institution name: '{text}' → '{cleaned}'") return cleaned # ============ Similarity and Matching Functions ============ def calculate_similarity(str1: str, str2: str) -> float: """Calculate similarity percentage using Levenshtein distance""" if not str1 or not str2: return 0.0 max_len = max(len(str1), len(str2)) if max_len == 0: return 100.0 edit_dist = levenshtein_distance(str1, str2) similarity = (1 - edit_dist / max_len) * 100 return round(similarity, 2) def classify_match(extracted: Optional[str], expected: str) -> Dict[str, Any]: """Classify match type between extracted and expected values""" if extracted is None: return { 'match_type': 'no_match', 'similarity': 0.0, 'edit_distance': len(expected) } similarity = calculate_similarity(extracted, expected) edit_dist = levenshtein_distance(extracted, expected) if similarity == 100.0: match_type = 'exact' elif similarity >= SIMILARITY_THRESHOLD: match_type = 'partial' else: match_type = 'no_match' return { 'match_type': match_type, 'similarity': similarity, 'edit_distance': edit_dist } # ============ PDF Processing Functions ============ def extract_pdf_page(pdf_path: str, page_num: int = 0) -> Optional[np.ndarray]: """Extract a page from PDF as image""" try: doc = fitz.open(pdf_path) page = doc.load_page(page_num) pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n) # Convert to BGR format for OpenCV if pix.n == 4: # RGBA img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) elif pix.n == 3: # RGB img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) elif pix.n == 1: # Grayscale img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) else: logger.warning(f"Unexpected number of channels: {pix.n}") # Assume RGB and convert if pix.n >= 3: img = img[:, :, :3] img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) return img except Exception as e: logger.error(f"Failed to extract page from {pdf_path}: {e}") return None def process_single_pdf(pdf_name: str, expected_cma: str, expected_inst: str, pdf_dir: Path, output_dir: Path, ocr_engine, ocr_model="ppocr_v5", vl_pipeline=None) -> Dict[str, Any]: """ Process a single PDF for CMA and institution extraction. Args: pdf_name: Name of PDF file expected_cma: Expected CMA code from ground truth expected_inst: Expected institution name from ground truth pdf_dir: Directory containing PDFs output_dir: Output directory for results ocr_engine: Global PaddleOCR instance (not currently used) ocr_model: OCR model to use ("ppocr_v5" or "paddleocr_vl") vl_pipeline: PaddleOCRVL pipeline (required if ocr_model="paddleocr_vl") Returns: Result dictionary with extraction and comparison data """ pdf_path = pdf_dir / pdf_name pdf_output_dir = output_dir / pdf_name result = { 'pdf_name': pdf_name, 'expected': { 'cma': expected_cma, 'institution': expected_inst }, 'extracted': { 'cma': None, 'institution': None, 'cma_confidence': 0.0, 'cma_success': False, 'institutions_from_seals': [] }, 'comparison': { 'cma': {}, 'institution': {} }, 'performance': { 'total_time': 0.0, 'cma_time': 0.0, 'seal_time': 0.0 }, 'seal_results': [], 'status': 'success', 'error': None, 'file_size': 0 } # Check file exists if not pdf_path.exists(): result['status'] = 'file_not_found' result['error'] = f"PDF file not found: {pdf_path}" logger.warning(result['error']) return result result['file_size'] = pdf_path.stat().st_size # Clean output directory to ensure fresh processing if pdf_output_dir.exists(): import shutil try: shutil.rmtree(pdf_output_dir) logger.info(f"Cleaned existing output directory: {pdf_output_dir}") except Exception as e: logger.warning(f"Failed to clean output directory: {e}") # Create fresh output directory pdf_output_dir.mkdir(parents=True, exist_ok=True) total_start = time.time() # Extract page logger.info(f"Extracting page 1 from {pdf_name}...") page_img = extract_pdf_page(str(pdf_path), page_num=0) if page_img is None: result['status'] = 'extraction_failed' result['error'] = "Failed to extract page from PDF" return result # Extract CMA code logger.info(f"Running CMA extraction on {pdf_name}...") cma_start = time.time() cma_result = extract_cma_code_fullpage(page_img, ocr_engine, output_dir=str(pdf_output_dir)) result['performance']['cma_time'] = time.time() - cma_start result['extracted']['cma'] = cma_result['code'] result['extracted']['cma_confidence'] = cma_result['confidence'] result['extracted']['cma_success'] = cma_result['success'] # Compare CMA if expected_cma == "无": result['comparison']['cma']['notes'] = "Ground truth marked as 'None'" else: comparison = classify_match(cma_result['code'], expected_cma) result['comparison']['cma'] = comparison # Extract seals and institutions logger.info(f"Running seal extraction on {pdf_name}...") seal_start = time.time() seal_result = extract_seals_and_institutions(page_img, str(pdf_output_dir), ocr_model=ocr_model, vl_pipeline=vl_pipeline) result['performance']['seal_time'] = time.time() - seal_start result['seal_results'] = seal_result['seals'] result['extracted']['institutions_from_seals'] = seal_result['institutions'] # Select best institution match if seal_result['institutions']: logger.info(f" Institution Extraction:") logger.info(f" - Expected: {expected_inst if expected_inst else 'N/A'}") logger.info(f" - Found {len(seal_result['institutions'])} institution(s) from seals") # Find best matching institution best_inst = None best_similarity = 0.0 for idx, inst in enumerate(seal_result['institutions']): if expected_inst and expected_inst != "无": sim = calculate_similarity(inst, expected_inst) logger.info(f" - Inst #{idx+1}: '{inst[:50]}...' → Similarity: {sim:.1f}%") if sim > best_similarity: best_similarity = sim best_inst = inst logger.info(f" → New best match! ({sim:.1f}% > {best_similarity:.1f}%)") elif not best_inst: best_inst = inst logger.info(f" - Inst #{idx+1}: '{inst[:50]}...' (no expected value for comparison)") # Fallback: if best_inst is still None (all similarities were 0), use first institution if best_inst is None and seal_result['institutions']: best_inst = seal_result['institutions'][0] logger.warning(f" - All similarities were 0%, using first institution: '{best_inst[:50]}...'") logger.info(f" - Selected: '{best_inst[:50]}...' (similarity: {best_similarity:.1f}%)") result['extracted']['institution'] = best_inst # Compare institution if expected_inst and expected_inst != "无": inst_comparison = classify_match(best_inst, expected_inst) result['comparison']['institution'] = inst_comparison else: result['comparison']['institution']['notes'] = "No expected institution" result['performance']['total_time'] = time.time() - total_start return result def generate_individual_report(result: Dict[str, Any], output_dir: Path): """Generate individual HTML report for a single PDF""" pdf_name = result['pdf_name'] expected_cma = result['expected']['cma'] expected_inst = result['expected']['institution'] extracted_cma = result['extracted']['cma'] extracted_inst = result['extracted']['institution'] cma_match = result['comparison'].get('cma', {}).get('match_type', 'no_match') cma_sim = result['comparison'].get('cma', {}).get('similarity', 0) inst_match = result['comparison'].get('institution', {}).get('match_type', 'no_match') inst_sim = result['comparison'].get('institution', {}).get('similarity', 0) total_time = result['performance']['total_time'] # Colors cma_color = '#4caf50' if cma_match == 'exact' else '#ff9800' if cma_match == 'partial' else '#f44336' inst_color = '#4caf50' if inst_match == 'exact' else '#ff9800' if inst_match == 'partial' else '#f44336' # Build seals HTML seals_html = "" if result['seal_results']: seals_html = "
Recognized Text: {text}
Confidence: {seal['confidence']:.2%}
Status: {status}
Marked:
Unwarped:
{f'PDF: {pdf_name}
Processing Time: {total_time:.2f}s
CMA Detection:
Layout Detection:
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Average processing time: {avg_time:.1f}s per PDF
| Expected CMA | Extracted CMA | CMA Match | Expected Inst | Extracted Inst | Inst Match | Seals | Time | |
|---|---|---|---|---|---|---|---|---|
| {r['pdf_name']} | {r['expected']['cma']} | {r['extracted']['cma'] or 'N/A'} | {cma_symbol} | {r['expected']['institution'][:30]}... | {(r['extracted']['institution'] or 'N/A')[:30]}... | {inst_symbol} | {seals_count} | {r['performance']['total_time']:.1f}s |