diff --git a/test_accuracy_batch_full.py b/test_accuracy_batch_full.py index ffb35d1..e28ed68 100644 --- a/test_accuracy_batch_full.py +++ b/test_accuracy_batch_full.py @@ -68,6 +68,7 @@ try: except ImportError: PADDLEOCRVL_AVAILABLE = False print("Warning: PaddleOCRVL not available. Install with: pip install paddleocr[doc-parser]") + PADDLEOCRVL_TIMEOUT = 60 # Default timeout in seconds, can be overridden by command-line argument try: import paddlex as px PADDLEX_AVAILABLE = True @@ -195,12 +196,19 @@ def load_cma_template_global(): return False -def match_cma_template(page_img, method=cv2.TM_CCOEFF_NORMED): - """Perform template matching for CMA logo""" +def match_cma_template(page_img, method=cv2.TM_CCORR_NORMED): + """Perform template matching for CMA logo (uses TM_CCORR_NORMED for better robustness) + + Includes position filtering to only accept matches in the upper portion of the page. + """ if CMA_LOGO_TEMPLATE is None: if not load_cma_template_global(): return None + # Get page dimensions for position filtering + page_h, page_w = page_img.shape[:2] + max_y_position = int(page_h * 0.6) # Only accept matches in upper 60% of page + # Convert to grayscale if needed if len(page_img.shape) == 3: page_gray = cv2.cvtColor(page_img, cv2.COLOR_BGR2GRAY) @@ -213,9 +221,17 @@ def match_cma_template(page_img, method=cv2.TM_CCOEFF_NORMED): return None _, max_val, _, max_loc = cv2.minMaxLoc(result) - + # Calculate center of match - match_center = (max_loc[0] + CMA_LOGO_TEMPLATE.shape[1] // 2, + match_center_y = max_loc[1] + CMA_LOGO_TEMPLATE.shape[0] // 2 + + # Position filtering: skip matches in the bottom portion of the page + if match_center_y > max_y_position: + print(f" [TM] Match at Y={match_center_y} filtered out (below threshold {max_y_position})") + return None + + # Calculate center of match + match_center = (max_loc[0] + CMA_LOGO_TEMPLATE.shape[1] // 2, max_loc[1] + CMA_LOGO_TEMPLATE.shape[0] // 2) return { @@ -282,9 +298,19 @@ def extract_cma_from_roi(roi_img, 
ocr_engine, output_dir=None): # ocr() API: returns [[box, (text, score)], ...] for line in ocr_data: try: + # Validate line structure + if not isinstance(line, (list, tuple)) or len(line) < 2: + continue + if isinstance(line[1], (list, tuple)): - text = str(line[1][0]) - score = float(line[1][1]) + if len(line[1]) >= 2: + text = str(line[1][0]) + score = float(line[1][1]) + elif len(line[1]) == 1: + text = str(line[1][0]) + score = 0.9 + else: + continue # Empty tuple/list elif isinstance(line[1], str): text = line[1] score = 0.9 @@ -306,23 +332,33 @@ def extract_cma_from_roi(roi_img, ocr_engine, output_dir=None): import re cma_candidates = [] for i, text in enumerate(rec_texts): - numbers = re.findall(r'\d{11,15}', str(text)) + # Clean text: remove spaces, hyphens, and other common separators + cleaned = str(text).replace(" ", "").replace("-", "").replace(":", "").replace(".", "") + + # Find 11-12 digit numbers (CMA code format) + numbers = re.findall(r'\d{11,12}', cleaned) for num in numbers: - # Take first 12 digits if longer - code = num[:12] if len(num) > 12 else num cma_candidates.append({ - 'code': code, - 'confidence': rec_scores[i] + 'code': num, + 'confidence': rec_scores[i] if i < len(rec_scores) else 0.5 }) if cma_candidates: - cma_candidates.sort(key=lambda x: x['confidence'], reverse=True) - best = cma_candidates[0] + # Prioritize candidates starting with '2' (standard CMA code format) + cma_candidates_starting_with_2 = [c for c in cma_candidates if c['code'].startswith('2')] + if cma_candidates_starting_with_2: + cma_candidates_starting_with_2.sort(key=lambda x: x['confidence'], reverse=True) + best = cma_candidates_starting_with_2[0] + print(f" [TM] Best CMA candidate (starts with 2): {best['code']} (conf: {best['confidence']:.2f})") + else: + cma_candidates.sort(key=lambda x: x['confidence'], reverse=True) + best = cma_candidates[0] + print(f" [TM] Best CMA candidate (no '2' prefix): {best['code']} (conf: {best['confidence']:.2f})") + 
result['code'] = best['code'] result['confidence'] = best['confidence'] result['success'] = True - print(f" [TM] Best CMA candidate: {best['code']} (conf: {best['confidence']:.2f})") - + if output_dir: imwrite_safe(os.path.join(output_dir, "cma_template_roi.png"), roi_img) else: @@ -343,8 +379,8 @@ def process_cma_template_extraction(page_img, ocr_engine, output_dir=None): print(" [TM] Template matching returned no result") return {'success': False, 'code': None, 'confidence': 0.0, 'reason': 'No match result'} - print(f" [TM] Match confidence: {match_res['max_val']:.3f} (threshold: 0.4)") - if match_res['max_val'] < 0.4: + print(f" [TM] Match confidence: {match_res['max_val']:.3f} (threshold: 0.30)") + if match_res['max_val'] < 0.30: # Lowered threshold from 0.35 to 0.30 to capture more matches print(" [TM] Match confidence too low, skipping") return {'success': False, 'code': None, 'confidence': 0.0, 'reason': f"Low match confidence: {match_res['max_val']:.3f}"} @@ -352,20 +388,34 @@ def process_cma_template_extraction(page_img, ocr_engine, output_dir=None): img_h, img_w = page_img.shape[:2] print(f" [TM] Logo detected at center ({x}, {y}) in image {img_w}x{img_h}") - # Crop ROI: logo area + region BELOW it (CMA code is typically below the logo) + # Crop ROI: region to the RIGHT and BELOW the logo + # CMA code typically appears below and to the right of the CMA logo template_h, template_w = CMA_LOGO_TEMPLATE.shape[:2] - roi_x1 = max(0, x - template_w * 2) - roi_y1 = max(0, y - template_h) - roi_x2 = min(img_w, x + template_w * 3) - roi_y2 = min(img_h, y + template_h * 4) # Extend downward to capture code number + roi_x1 = max(0, x) # Start from logo center, going right + roi_y1 = max(0, y - template_h // 2) # Vertically centered on logo (extend up a bit) + roi_x2 = min(img_w, x + min(600, img_w - x)) # Extend right up to 600px + roi_y2 = min(img_h, y + template_h * 4) # Extend down significantly to capture CMA code print(f" [TM] ROI: ({roi_x1}, {roi_y1}) -> 
def _run_ocr_vl_wrapper(image_path, result_queue):
    """
    Run PaddleOCRVL on one seal image and report the outcome through a queue.

    This is a module-level function (not nested) so it can be pickled and used
    as a ``multiprocessing.Process`` target. The pipeline is built inside this
    function, so no model object has to cross the process boundary.

    Exactly one dict is put on ``result_queue``:
      * seal block found: ``{'text': ..., 'score': 1.0, 'success': bool}``
      * no output / no JSON / no seal block: ``success=False`` plus
        ``'debug': 'no_seal_block'``
      * any exception: ``success=False`` plus ``'error'`` and ``'traceback'``

    Args:
        image_path: Path to seal image
        result_queue: Queue-like object (needs ``put``) that receives the result
    """
    import shutil
    import sys
    import tempfile
    import traceback

    # Helper to print to console (won't show in main process logs)
    def log(msg):
        print(f"[PaddleOCRVL-Subprocess] {msg}")
        sys.stdout.flush()

    try:
        log(f"Starting PaddleOCRVL for: {image_path}")

        # Import here to avoid pickle issues
        from paddleocr import PaddleOCRVL

        log("Import successful, initializing pipeline...")

        # Re-initialize pipeline in subprocess (required)
        vl_pipeline = PaddleOCRVL(
            use_seal_recognition=True,
            use_ocr_for_image_block=True,
            use_layout_detection=True
        )

        log("Pipeline initialized, starting prediction...")

        output = vl_pipeline.predict(image_path, batch_size=1)

        log(f"Prediction completed, output length: {len(output) if output else 0}")

        seal_text = None  # stays None unless a 'seal' block is found
        if output and len(output) > 0:
            # A unique directory per call: a fixed shared name could expose a
            # stale "<stem>_res.json" from an earlier (or killed) run, and two
            # concurrent workers would delete each other's files.
            temp_output_dir = Path(tempfile.mkdtemp(prefix="temp_paddleocr_vl_"))
            try:
                log(f"Saving JSON to: {temp_output_dir}")

                # Save JSON to extract text
                output[0].save_to_json(save_path=str(temp_output_dir))

                json_file = temp_output_dir / f"{Path(image_path).stem}_res.json"
                log(f"Looking for JSON file: {json_file}")

                if json_file.exists():
                    log("JSON file found, reading...")
                    with open(json_file, 'r', encoding='utf-8') as f:
                        data = json.load(f)

                    blocks = data.get('parsing_res_list', [])
                    log(f"Data loaded, parsing_res_list count: {len(blocks)}")

                    for block in blocks:
                        log(f" Block label: {block.get('block_label')}")
                        if block.get('block_label') == 'seal':
                            # `or ''` guards against an explicit null content
                            seal_text = (block.get('block_content') or '').strip()
                            log(f" *** SEAL FOUND *** Text: '{seal_text}' (length: {len(seal_text)})")
                            break
                    else:
                        log("No seal block found in parsing_res_list")
                else:
                    log(f"JSON file not found: {json_file}")
            finally:
                # Clean up temp files on every path, not only on success
                shutil.rmtree(temp_output_dir, ignore_errors=True)
        else:
            log("No output from predict()")

        if seal_text is not None:
            result_queue.put({
                'text': seal_text,
                'score': 1.0,  # PaddleOCRVL doesn't provide confidence score
                'success': len(seal_text) > 0
            })
            return

        # If no seal block found
        log("Returning empty result")
        result_queue.put({
            'text': '',
            'score': 0.0,
            'success': False,
            'debug': 'no_seal_block'
        })

    except Exception as e:
        log(f"ERROR: {e}")
        log(f"Traceback:\n{traceback.format_exc()}")
        result_queue.put({
            'text': '',
            'score': 0.0,
            'success': False,
            'error': str(e),
            'traceback': traceback.format_exc()
        })


def _await_vl_result(process, result_queue, timeout):
    """Poll the queue until a result arrives, the worker exits, or time runs out.

    Returns ``(result, timed_out)``; ``result`` is None when nothing was received.
    """
    import queue
    import time

    poll_interval = 0.5
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        # Sample liveness BEFORE polling: if the worker was already gone and the
        # poll still finds nothing, no result can arrive any more.
        exited_before_poll = not process.is_alive()

        wait = poll_interval
        if deadline is not None:
            wait = min(wait, max(deadline - time.monotonic(), 0.0))

        try:
            return result_queue.get(timeout=wait), False
        except queue.Empty:
            if exited_before_poll:
                return None, False
            if deadline is not None and time.monotonic() >= deadline:
                return None, True


def _shutdown_vl_process(process, force):
    """Reap the worker; escalate join -> terminate -> kill if it lingers."""
    if not force:
        process.join(timeout=5)  # normal exit after its result was consumed
    if process.is_alive():
        process.terminate()
        process.join(timeout=5)  # Wait up to 5 seconds for cleanup
        if process.is_alive():
            process.kill()  # Force kill if still alive
            process.join(timeout=5)


def run_ocr_recognition_vl(image_path, vl_pipeline, timeout=300):
    """
    Run OCR recognition using PaddleOCRVL on seal image with timeout protection.

    The recognition runs in a child process (``_run_ocr_vl_wrapper``) so a hung
    model call can be terminated. The result is read from the queue *before*
    the child is joined: a process that has put data on a
    ``multiprocessing.Queue`` may not exit until that data is consumed, so
    joining first can turn a finished run into a false timeout.

    Can be used on both unwarp images and crop images (backup mode).

    Args:
        image_path: Path to seal image (unwarp or crop)
        vl_pipeline: Unused. Kept for backward compatibility; the child process
            builds its own pipeline.
        timeout: Maximum seconds to wait for the child (default: 300).
            ``None`` waits until the child produces a result or exits.

    Returns:
        Dict with 'text', 'score', 'success' keys. Failure results may also
        carry 'error', 'debug' and/or 'traceback'.
    """
    import multiprocessing

    result_queue = multiprocessing.Queue()

    # Start subprocess to run PaddleOCRVL
    process = multiprocessing.Process(
        target=_run_ocr_vl_wrapper,
        args=(image_path, result_queue)
    )
    process.start()

    try:
        result, timed_out = _await_vl_result(process, result_queue, timeout)
    except Exception as e:
        _shutdown_vl_process(process, force=True)
        logger.error(f"Failed to get PaddleOCRVL result: {e}")
        return {
            'text': '',
            'score': 0.0,
            'success': False,
            'error': str(e)
        }

    if timed_out:
        # Timeout - force terminate process
        _shutdown_vl_process(process, force=True)
        logger.warning(f"PaddleOCRVL recognition timeout ({timeout}s) for {image_path}")
        return {
            'text': '',
            'score': 0.0,
            'success': False,
            'error': f'timeout after {timeout}s'
        }

    _shutdown_vl_process(process, force=False)

    if result is None:
        # Process finished without returning result (e.g. hard crash)
        logger.error("PaddleOCRVL process completed but returned no result")
        return {
            'text': '',
            'score': 0.0,
            'success': False,
            'error': 'process completed without result'
        }

    # Log the result
    if result.get('error'):
        logger.warning(f"PaddleOCRVL subprocess error: {result.get('error')}")
    elif result.get('debug'):
        logger.info(f"PaddleOCRVL debug: {result.get('debug')}")
    elif result.get('success') and result.get('text'):
        logger.info(f"PaddleOCRVL SUCCESS: '{result['text']}'")
    else:
        logger.warning("PaddleOCRVL returned empty result (no seal detected)")
    return result
extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", v result['processing_time'] = time.time() - start_time return result - # Process each seal - logger.info(f"Processing {len(seal_boxes)} detected seals...") + # ============ SEAL SELECTION AND FILTERING ============ + # Filter seals to prioritize inspection/testing institution seals + # and reject administrative approval seals + logger.info(f"Detected {len(seal_boxes)} seals, applying selection logic...") + + # Score each seal based on criteria + scored_seals = [] + for idx, box in enumerate(seal_boxes): + x1, y1, x2, y2 = [int(v) for v in box] + center_x = (x1 + x2) // 2 + center_y = (y1 + y2) // 2 + width = x2 - x1 + height = y2 - y1 + area = width * height + page_h, page_w = page_img.shape[:2] + + # Calculate position score (prefer upper-right quadrant where CMA logos usually are) + position_score = 0 + if center_y < page_h * 0.5: # Upper half + position_score += 30 + if center_x > page_w * 0.5: # Right half + position_score += 30 + + # Calculate size score (prefer medium-sized seals, not too small or too large) + size_score = 0 + min_dim = min(width, height) + if 100 <= min_dim <= 300: + size_score = 20 + elif 80 <= min_dim < 100 or 300 < min_dim <= 400: + size_score = 10 + + # Calculate aspect ratio score (circular seals should have ~1:1 ratio) + aspect_ratio = width / height if height > 0 else 0 + aspect_score = 0 + if 0.8 <= aspect_ratio <= 1.2: + aspect_score = 20 + + total_score = position_score + size_score + aspect_score + scored_seals.append({ + 'index': idx, + 'box': box, + 'score': total_score, + 'position_score': position_score, + 'size_score': size_score, + 'aspect_score': aspect_score, + 'center': (center_x, center_y), + 'size': (width, height) + }) + logger.info(f" Seal #{idx}: center=({center_x}, {center_y}), size={width}x{height}, score={total_score} (pos={position_score}, size={size_score}, aspect={aspect_score})") + + # Sort by score (highest first) + 
scored_seals.sort(key=lambda x: x['score'], reverse=True) + + # Select top seal(s) - use top 2 to ensure we don't miss the correct one + selected_seals = scored_seals[:min(2, len(scored_seals))] + seal_boxes = [s['box'] for s in selected_seals] + + logger.info(f"Selected {len(seal_boxes)} seal(s) for OCR processing:") + for s in selected_seals: + logger.info(f" - Seal #{s['index']}: score={s['score']}, center={s['center']}, size={s['size']}") + + # Process each selected seal + logger.info(f"Processing {len(seal_boxes)} selected seals...") det_model = SealTextDetection(model_name="PP-OCRv4_server_seal_det") # Initialize OCR model based on selection @@ -915,7 +1138,8 @@ def extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", v # ============ INSUFFICIENT POLYGONS CHECK ============ # If too few text polygons detected, polar unwarping will likely fail # Skip directly to PaddleOCRVL backup in this case - MIN_POLYGONS_FOR_UNWARP = 3 + # FIX: Reduced threshold from 3 to 2 to improve institution name extraction + MIN_POLYGONS_FOR_UNWARP = 2 # Lowered from 3 to allow more seals to use polar unwarping if len(all_polygons) < MIN_POLYGONS_FOR_UNWARP: logger.warning(f" Seal #{i}: Only {len(all_polygons)} text polygons detected (< {MIN_POLYGONS_FOR_UNWARP})") logger.warning(f" Seal #{i}: Skipping polar unwarping (insufficient polygon data)") @@ -926,7 +1150,7 @@ def extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", v # Use PaddleOCRVL directly on crop (no unwarp) if vl_pipeline is not None and PADDLEOCRVL_AVAILABLE: - ocr_result = run_ocr_recognition_vl(crop_path, vl_pipeline) + ocr_result = run_ocr_recognition_vl(crop_path, vl_pipeline, timeout=PADDLEOCRVL_TIMEOUT) logger.info(f" Seal #{i} PaddleOCRVL Result (direct crop):") logger.info(f" - Text: '{ocr_result['text']}'") logger.info(f" - Score: {ocr_result['score']:.4f}") @@ -998,9 +1222,17 @@ def extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", v # 
Calculate arc and unwarp start_theta, extent = calculate_precise_arc(all_polygons, center) + + # IMPROVEMENT: When polygon count is low but >= MIN_POLYGONS_FOR_UNWARP, + # use a wider extent to capture more text + if len(all_polygons) == MIN_POLYGONS_FOR_UNWARP and extent < math.radians(300): + logger.info(f" Seal #{i}: Low polygon count ({len(all_polygons)}), expanding extent from {math.degrees(extent):.1f}° to 300°") + extent = math.radians(300) # Expand to 300 degrees for better coverage + logger.info(f" Seal #{i} Arc Parameters:") logger.info(f" - Start theta: {math.degrees(start_theta):.2f}°") logger.info(f" - Extent: {math.degrees(extent):.2f}° ({math.degrees(extent)*radius:.1f} pixels width)") + logger.info(f" - Polygon count: {len(all_polygons)} (MIN_POLYGONS_FOR_UNWARP={MIN_POLYGONS_FOR_UNWARP})") marked = seal_crop.copy() @@ -1127,7 +1359,7 @@ def extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", v logger.info(f" Seal #{i}: Running OCR ({method_str}, model={ocr_model}) on unwarp image...") if ocr_model == "paddleocr_vl": - ocr_result = run_ocr_recognition_vl(unwarp_path, vl_pipeline) + ocr_result = run_ocr_recognition_vl(unwarp_path, vl_pipeline, timeout=PADDLEOCRVL_TIMEOUT) else: ocr_result = run_ocr_recognition(unwarp_path, rec_model) @@ -1145,7 +1377,7 @@ def extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", v if (not ocr_result['success'] or len(ocr_result['text'].strip()) == 0) and vl_pipeline is not None and PADDLEOCRVL_AVAILABLE: logger.warning(f" Seal #{i}: Unwarp OCR failed (empty result), trying PaddleOCRVL backup on crop image") seal_crop_path = os.path.join(output_dir, f"seal_crop_{i}.png") - backup_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline) + backup_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline, timeout=PADDLEOCRVL_TIMEOUT) logger.info(f" Seal #{i} PaddleOCRVL Backup Result (crop):") logger.info(f" - Text: '{backup_result['text']}'") @@ -1167,7 +1399,7 @@ def 
extract_seals_and_institutions(page_img, output_dir, ocr_model="ppocr_v5", v if vl_pipeline is not None and PADDLEOCRVL_AVAILABLE: logger.info(f" Seal #{i}: Using PaddleOCRVL backup - directly recognize seal crop image") seal_crop_path = os.path.join(output_dir, f"seal_crop_{i}.png") - ocr_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline) + ocr_result = run_ocr_recognition_vl(seal_crop_path, vl_pipeline, timeout=PADDLEOCRVL_TIMEOUT) ocr_method_used = f"{method_used}_crop_backup" logger.info(f" Seal #{i} PaddleOCRVL Backup Result:") logger.info(f" - Text: '{ocr_result['text']}'") @@ -1370,27 +1602,77 @@ def parse_certificates(signature_bytes: bytes) -> List[str]: if not PIKEPDF_AVAILABLE: return [] - try: - certs = pkcs7.load_der_pkcs7_certificates(signature_bytes) - except Exception as e: - logger.error(f"Failed to parse PKCS#7 certificates: {e}") - return [] - candidates = [] - # Usually first cert in bundle is signer's cert - for cert in certs: - # Collect potential organization names from CN, O, OU - def add_if_valid(oid): - val = _get_name_attr(cert.subject, oid) - if val: - clean = val.strip() - if len(clean) >= 4 and clean not in candidates: - candidates.append(clean) + # Method 1: Try PKCS#7 parsing first + try: + certs = pkcs7.load_der_pkcs7_certificates(signature_bytes) - add_if_valid(NameOID.COMMON_NAME) - add_if_valid(NameOID.ORGANIZATION_NAME) - add_if_valid(NameOID.ORGANIZATIONAL_UNIT_NAME) + # Usually first cert in bundle is signer's cert + for cert in certs: + # Collect potential organization names from CN, O, OU + def add_if_valid(oid): + val = _get_name_attr(cert.subject, oid) + if val: + clean = val.strip() + if len(clean) >= 4 and clean not in candidates: + candidates.append(clean) + + add_if_valid(NameOID.COMMON_NAME) + add_if_valid(NameOID.ORGANIZATION_NAME) + add_if_valid(NameOID.ORGANIZATIONAL_UNIT_NAME) + + except Exception as e: + logger.debug(f"PKCS#7 parsing failed: {e}") + + # Method 2: Fallback - search for known institution 
names in binary data + # This handles cases where PKCS#7 parsing fails or certificates are non-standard + if not candidates: + logger.debug("No candidates from PKCS#7 parsing, trying binary search fallback") + + # Known institution names that commonly appear in certificates + # These are UTF-8 encoded and embedded in the certificate data + known_institutions = [ + "广东产品质量监督检验研究院", + "广东产品质量监督检验", + "广东省产品质量监督检验研究院", + "广东省产品质量监督检验", + "质量监督检验研究院", + "产品质量监督检验院", + "质量监督检验中心", + ] + + for inst in known_institutions: + # Encode to UTF-8 and search in binary data + encoded = inst.encode('utf-8') + if encoded in signature_bytes: + # Found the institution name in certificate data + if inst not in candidates: + candidates.append(inst) + logger.info(f"Found institution in binary certificate data: {inst}") + + # Also try to find any UTF-8 encoded Chinese text that looks like an institution + # This is more general but may produce false positives + try: + # Try to decode as UTF-8 with error handling + decoded = signature_bytes.decode('utf-8', errors='ignore') + + # Look for patterns that look like institution names + # Pattern: Chinese characters + optional suffixes + patterns = [ + r'[\u4e00-\u9fff]{4,}(?:研究院|研究所|检测中心|监测站|检验院|检验中心)', + r'[\u4e00-\u9fff]{4,}(?:有限公司|股份公司)', + ] + + for pattern in patterns: + matches = re.findall(pattern, decoded) + for match in matches: + if len(match) >= 4 and match not in candidates: + candidates.append(match) + logger.info(f"Found institution pattern in certificate data: {match}") + + except Exception as e: + logger.debug(f"UTF-8 decoding search failed: {e}") return candidates @@ -1465,6 +1747,25 @@ def extract_institution_from_crt(pdf_path: str) -> List[str]: logger.warning("CRT extraction skipped (pikepdf/cryptography not available)") return [] + # Quick check: if PDF has no /AcroForm, it's likely a scanned PDF + # This avoids expensive parsing for scanned documents + try: + import time + quick_check_start = time.time() + pdf = 
def _extract_crt_wrapper(pdf_path: str) -> List[str]:
    """
    Call ``extract_institution_from_crt`` without letting any exception escape.

    Defined at module level (not nested) so it can be pickled and sent to a
    child process by multiprocessing. If the extraction raises, the exception
    type, message and traceback are printed to stderr and an empty list is
    returned instead.

    Args:
        pdf_path: Path to PDF file

    Returns:
        The list returned by ``extract_institution_from_crt``, or ``[]`` if it
        raised.
    """
    import sys
    import traceback

    try:
        institutions = extract_institution_from_crt(pdf_path)
    except Exception as exc:
        # Written to stderr because the logger might not work in a subprocess.
        summary = "ERROR: {}: {}".format(type(exc).__name__, exc)
        print("[CRT EXTRACTION ERROR in subprocess] " + summary, file=sys.stderr)
        print("Traceback: " + traceback.format_exc(), file=sys.stderr)
        # Return empty list on error
        return []
    return institutions
signature (highest priority) + # Use timeout to prevent hanging on scanned PDFs logger.info(f"Running CRT extraction on {pdf_name}...") print(f" + Running CRT extraction...") crt_start = time.time() - crt_institutions = extract_institution_from_crt(str(pdf_path)) + + # Run CRT extraction directly without multiprocessing + # Reason: multiprocessing on Windows has overhead and complexity + # CRT extraction is fast enough (usually < 1 second) + crt_institutions = [] + try: + crt_institutions = extract_institution_from_crt(str(pdf_path)) + except Exception as crt_err: + logger.warning(f"CRT extraction failed: {crt_err}") + import traceback + logger.warning(f"Traceback: {traceback.format_exc()}") + crt_institutions = [] + result['performance']['crt_time'] = time.time() - crt_start result['extracted']['crt_institutions'] = crt_institutions @@ -2168,15 +2526,32 @@ def main(): parser = argparse.ArgumentParser(description="OCR Test and Bridge Script") parser.add_argument("--pdf", help="Path to single PDF for bridge mode") parser.add_argument("--output-dir", help="Output directory", default="bridge_output") - parser.add_argument("--ocr-model", choices=["ppocr_v5", "paddleocr_vl"], default="ppocr_v5") + parser.add_argument("--ocr-model", choices=["ppocr_v5", "paddleocr_vl"], default="paddleocr_vl") parser.add_argument("--batch", action="store_true", help="Run batch testing mode") parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Number of PDFs to process") parser.add_argument("--pdf-names", help="Comma-separated list of PDF names to process") - + parser.add_argument('--disable-paddleocrvl', action='store_true', + help='Disable PaddleOCRVL backup for seal recognition (faster but less accurate)') + parser.add_argument('--paddleocrvl-timeout', type=int, default=60, + help='Timeout in seconds for PaddleOCRVL recognition (default: 60, recommended: 300 for better results)') + args = parser.parse_args() # Shared model selection ocr_model = args.ocr_model + 
paddleocrvl_timeout = args.paddleocrvl_timeout + + # Check if PaddleOCRVL backup should be disabled + if args.disable_paddleocrvl: + global PADDLEOCRVL_AVAILABLE + PADDLEOCRVL_AVAILABLE = False + logger.info("PaddleOCRVL backup disabled by user command") + print("PaddleOCRVL backup disabled by --disable-paddleocrvl flag") + else: + global PADDLEOCRVL_TIMEOUT + PADDLEOCRVL_TIMEOUT = paddleocrvl_timeout + logger.info(f"PaddleOCRVL timeout set to {PADDLEOCRVL_TIMEOUT} seconds") + print(f"PaddleOCRVL timeout: {PADDLEOCRVL_TIMEOUT} seconds") if args.pdf: # Bridge mode @@ -2239,7 +2614,7 @@ def main(): logger.info("Initializing PaddleOCR engine for CMA recognition...") print("[1/2] Initializing PaddleOCR engine (for CMA extraction)...") print(" - Loading detection model (PP-OCRv4_det)...") - ocr_engine = PaddleOCR(use_textline_orientation=True, lang='ch') + ocr_engine = PaddleOCR(use_angle_cls=True, lang='ch') # Changed from use_textline_orientation to use_angle_cls print(" - Loading recognition model (PP-OCRv4_rec)...") print(" - Loading direction classifier...") logger.info("PaddleOCR initialized successfully") @@ -2247,42 +2622,100 @@ def main(): # Initialize PaddleOCRVL for backup seal recognition (always try if available) # This provides a fallback when polar unwarping fails - if PADDLEOCRVL_AVAILABLE: - logger.info("Initializing PaddleOCRVL for backup seal recognition...") - print("[2/2] Initializing PaddleOCRVL (for seal recognition backup)...") - print(" - This may take 30-60 seconds") - print(" - Loading model from cache: ~/.paddlex/official_models/PaddleOCR-VL-1.5") - print(" - Model size: ~1.9GB (loading into memory)...") - sys.stdout.flush() # Ensure output is displayed immediately + should_init_vl = PADDLEOCRVL_AVAILABLE and ocr_model == "paddleocr_vl" - start_time = time.time() + if should_init_vl: + # Check available memory before loading large model try: - vl_pipeline = PaddleOCRVL( - use_seal_recognition=True, - use_ocr_for_image_block=True, - 
use_layout_detection=True - ) + import psutil + mem = psutil.virtual_memory() + available_gb = mem.available / (1024**3) + required_gb = 3.0 # PaddleOCR-VL needs ~3GB free memory - init_time = time.time() - start_time - print(f" - Initialization completed in {init_time:.1f} seconds") + logger.info(f"Available memory: {available_gb:.1f} GB, Required: {required_gb:.1f} GB") - # Verify initialization - if vl_pipeline is None: - raise RuntimeError("PaddleOCRVL initialization returned None") + if available_gb < required_gb: + logger.warning(f"Insufficient memory for PaddleOCRVL ({available_gb:.1f} GB < {required_gb:.1f} GB)") + print(f"[2/2] PaddleOCRVL initialization skipped - insufficient memory") + print(f" Available: {available_gb:.1f} GB, Required: {required_gb:.1f} GB") + print(f" → Close other applications or restart to free up memory\n") + should_init_vl = False # Skip initialization due to insufficient memory + else: + logger.info("Initializing PaddleOCRVL for backup seal recognition...") + print("[2/2] Initializing PaddleOCRVL (for seal recognition backup)...") + print(" - This may take 30-60 seconds") + print(" - Loading model from cache: ~/.paddlex/official_models/PaddleOCR-VL-1.5") + print(" - Model size: ~1.9GB (loading into memory)...") + print(f" - Available memory: {available_gb:.1f} GB") + sys.stdout.flush() # Ensure output is displayed immediately - logger.info("PaddleOCRVL initialized successfully (backup ready)") - print(" ✓ PaddleOCRVL backup ready - will be used when polar unwarping fails\n") - except Exception as e: - init_time = time.time() - start_time - logger.error(f"Failed to initialize PaddleOCRVL after {init_time:.1f}s: {e}") - logger.error(f"Exception type: {type(e).__name__}") - print(f" ✗ Failed to initialize PaddleOCRVL: {e}") - print(f" Exception type: {type(e).__name__}") - print(" → Polar unwarping failures will skip OCR (no backup available)\n") + start_time = time.time() + try: + vl_pipeline = PaddleOCRVL( + 
use_seal_recognition=True, + use_ocr_for_image_block=True, + use_layout_detection=True + ) + + init_time = time.time() - start_time + print(f" - Initialization completed in {init_time:.1f} seconds") + + # Verify initialization + if vl_pipeline is None: + raise RuntimeError("PaddleOCRVL initialization returned None") + + logger.info("PaddleOCRVL initialized successfully (backup ready)") + print(" ✓ PaddleOCRVL backup ready - will be used when polar unwarping fails\n") + except Exception as e: + init_time = time.time() - start_time + logger.error(f"Failed to initialize PaddleOCRVL after {init_time:.1f}s: {e}") + logger.error(f"Exception type: {type(e).__name__}") + print(f" ✗ Failed to initialize PaddleOCRVL: {e}") + print(f" Exception type: {type(e).__name__}") + print(" → Polar unwarping failures will skip OCR (no backup available)\n") + vl_pipeline = None + except ImportError: + logger.info("psutil not available - skipping memory check") + # Try initialization anyway without memory check + logger.info("Initializing PaddleOCRVL for backup seal recognition...") + print("[2/2] Initializing PaddleOCRVL (for seal recognition backup)...") + print(" - This may take 30-60 seconds") + print(" - Loading model from cache: ~/.paddlex/official_models/PaddleOCR-VL-1.5") + print(" - Model size: ~1.9GB (loading into memory)...") + sys.stdout.flush() + + start_time = time.time() + try: + vl_pipeline = PaddleOCRVL( + use_seal_recognition=True, + use_ocr_for_image_block=True, + use_layout_detection=True + ) + + init_time = time.time() - start_time + print(f" - Initialization completed in {init_time:.1f} seconds") + + if vl_pipeline is None: + raise RuntimeError("PaddleOCRVL initialization returned None") + + logger.info("PaddleOCRVL initialized successfully (backup ready)") + print(" ✓ PaddleOCRVL backup ready - will be used when polar unwarping fails\n") + except Exception as e: + init_time = time.time() - start_time + logger.error(f"Failed to initialize PaddleOCRVL after 
{init_time:.1f}s: {e}") + logger.error(f"Exception type: {type(e).__name__}") + print(f" ✗ Failed to initialize PaddleOCRVL: {e}") + print(f" Exception type: {type(e).__name__}") + print(" → Polar unwarping failures will skip OCR (no backup available)\n") + vl_pipeline = None else: - logger.info("PaddleOCRVL not available - polar unwarping failures will skip OCR") - print("[2/2] PaddleOCRVL not available - skipping") - print(" → Install with: pip install paddleocr[doc-parser]") + if not PADDLEOCRVL_AVAILABLE: + logger.info("PaddleOCRVL not available - polar unwarping failures will skip OCR") + print("[2/2] PaddleOCRVL not available - skipping") + print(" → Install with: pip install paddleocr[doc-parser]") + elif ocr_model != "paddleocr_vl": + logger.info(f"PaddleOCRVL skipped (using {ocr_model.upper()} instead)") + print(f"[2/2] PaddleOCRVL skipped (using {ocr_model.upper()} instead)") print(" → Polar unwarping failures will skip OCR (no backup)\n") # Validate OCR model selection