diff --git a/test_accuracy_batch_full.py b/test_accuracy_batch_full.py index 6b1a1c5..cac56f0 100644 --- a/test_accuracy_batch_full.py +++ b/test_accuracy_batch_full.py @@ -26,6 +26,14 @@ import math from pathlib import Path from datetime import datetime from typing import Dict, List, Tuple, Optional, Any + +# IMPORTANT: Set environment variables BEFORE any paddle imports! +# This prevents slow network checks and enables offline mode +os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True" +os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True" +os.environ["HUB_DISABLE_MODEL_SOURCE_CHECK"] = "True" +os.environ["PADDLEHUB_NO_FETCH_LATEST"] = "True" + import numpy as np # Set UTF-8 encoding for Windows console @@ -37,8 +45,6 @@ if sys.platform == 'win32': except: pass -os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True" - class NumpyEncoder(json.JSONEncoder): """Custom JSON encoder for numpy types""" @@ -62,18 +68,27 @@ try: except ImportError: PADDLEOCRVL_AVAILABLE = False print("Warning: PaddleOCRVL not available. Install with: pip install paddleocr[doc-parser]") - import paddlex as px + try: + import paddlex as px + PADDLEX_AVAILABLE = True + except ImportError: + PADDLEX_AVAILABLE = False + print("Warning: PaddleX not available. Layout detection will be disabled.") + print(" Install with: pip install paddlex") from Levenshtein import distance as levenshtein_distance except ImportError as e: print(f"Error: Required dependency not found: {e}") print("Please install: pip install python-Levenshtein paddleocr paddlex pymupdf-ng opencv-python numpy") sys.exit(1) +# Note: Import statements above may take 5-10 seconds on first run +# due to PaddleOCR/PaddleX library initialization + # Import CMA extraction module try: - from cma_extraction_final import extract_cma_code_fullpage, imread_unicode -except ImportError: - print("Error: cma_extraction_final.py not found in current directory") + from cma_extraction_robust import extract_cma_code_fullpage +except ImportError as e: + print(f"Error: Cannot import cma_extraction_robust.py: {e}") sys.exit(1) # Configure logging @@ -82,7 +97,7 @@ logging.basicConfig( format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('test_accuracy_full.log', encoding='utf-8'), - logging.StreamHandler() + logging.StreamHandler(sys.stderr) ] ) logger = logging.getLogger(__name__) @@ -98,6 +113,11 @@ SIMILARITY_THRESHOLD = 85.0 # Options: "ppocr_v5" (default), "paddleocr_vl" OCR_MODEL = os.environ.get("OCR_MODEL", "ppocr_v5") +# CMA Template Matching Configuration +CMA_LOGO_PATH = Path("template/CMA_Logo.png") +CMA_LOGO_TEMPLATE = None +CMA_LOGO_TEMPLATE_RGB = None + # ============ Helper Functions ============ @@ -132,6 +152,203 @@ def imwrite_safe(file_path, img): return False +# ============ CMA Template Matching Functions ============ + +def load_cma_template_global(): + """Load CMA logo template once globally""" + global CMA_LOGO_TEMPLATE, CMA_LOGO_TEMPLATE_RGB + if CMA_LOGO_TEMPLATE is not None: + return True + + if not CMA_LOGO_PATH.exists(): + logger.warning(f"CMA logo template not found at {CMA_LOGO_PATH}") + return False + + try: + # Read template image (grayscale) + CMA_LOGO_TEMPLATE = cv2.imread(str(CMA_LOGO_PATH), cv2.IMREAD_GRAYSCALE) + CMA_LOGO_TEMPLATE_RGB = cv2.cvtColor(CMA_LOGO_TEMPLATE, cv2.COLOR_GRAY2BGR) + logger.info(f"Loaded CMA logo template: {CMA_LOGO_PATH} {CMA_LOGO_TEMPLATE.shape}") + return True + except Exception as e: + logger.error(f"Failed to load CMA logo template: {e}") + return False + + +def match_cma_template(page_img, method=cv2.TM_CCOEFF_NORMED): + """Perform template matching for CMA logo""" + if CMA_LOGO_TEMPLATE is None: + if not load_cma_template_global(): + return None + + # Convert to grayscale if needed + if len(page_img.shape) == 3: + page_gray = cv2.cvtColor(page_img, cv2.COLOR_BGR2GRAY) + else: + page_gray = page_img + + # Execute template matching + result = cv2.matchTemplate(page_gray, CMA_LOGO_TEMPLATE, method=method) + if result is None: + return None + + _, max_val, _, max_loc = cv2.minMaxLoc(result) + + # Calculate center of match + match_center = (max_loc[0] + CMA_LOGO_TEMPLATE.shape[1] // 2, + max_loc[1] + CMA_LOGO_TEMPLATE.shape[0] // 2) + + return { + 'max_val': float(max_val), + 'match_center': match_center, + 'match_loc': max_loc + } + + +def extract_cma_from_roi(roi_img, ocr_engine, output_dir=None): + """Run OCR specifically on CMA ROI""" + result = { + 'code': None, + 'confidence': 0.0, + 'success': False + } + + if roi_img is None or roi_img.size == 0: + print(" [TM] ROI image is empty, skipping") + return result + + h, w = roi_img.shape[:2] + print(f" [TM] ROI size: {w}x{h}") + + try: + # Use existing OCR functions if possible, or direct engine call + # Try .ocr() first (legacy), fall back to .predict() (new API) + raw_result = None + if hasattr(ocr_engine, 'ocr'): + try: + raw_result = ocr_engine.ocr(roi_img) + except TypeError: + # New API doesn't support legacy .ocr() kwargs + pass + if raw_result is None and hasattr(ocr_engine, 'predict'): + try: + raw_result = ocr_engine.predict(roi_img) + except Exception as pred_err: + print(f" [TM] predict() also failed: {pred_err}") + if raw_result is None: + print(" [TM] OCR engine could not process ROI") + return result + + + if not raw_result or len(raw_result) == 0 or raw_result[0] is None: + print(" [TM] OCR returned no results") + return result + + ocr_data = raw_result[0] + rec_texts = [] + rec_scores = [] + + # Handle different result formats + if isinstance(ocr_data, dict) or hasattr(ocr_data, 'get'): + # predict() API: returns dict-like with rec_texts, rec_scores + try: + data_dict = dict(ocr_data) if not isinstance(ocr_data, dict) else ocr_data + rec_texts = list(data_dict.get('rec_texts', [])) + rec_scores = list(data_dict.get('rec_scores', [])) + print(f" [TM] Using predict() API format, found {len(rec_texts)} lines") + except Exception as e: + print(f" [TM] Failed to parse predict() result: {e}") + elif isinstance(ocr_data, list): + # ocr() API: returns [[box, (text, score)], ...] + for line in ocr_data: + try: + if isinstance(line[1], (list, tuple)): + text = str(line[1][0]) + score = float(line[1][1]) + elif isinstance(line[1], str): + text = line[1] + score = 0.9 + else: + text = str(line[1]) + score = 0.5 + rec_texts.append(text) + rec_scores.append(score) + except (IndexError, TypeError, ValueError) as e: + logger.warning(f"Skipped OCR line due to parse error: {e}") + continue + print(f" [TM] Using ocr() API format, found {len(rec_texts)} lines") + + + print(f" [TM] OCR found {len(rec_texts)} text lines") + for i, t in enumerate(rec_texts): + print(f" [TM] Line {i}: '{t}' (score: {rec_scores[i]:.2f})") + + import re + cma_candidates = [] + for i, text in enumerate(rec_texts): + numbers = re.findall(r'\d{11,15}', str(text)) + for num in numbers: + # Take first 12 digits if longer + code = num[:12] if len(num) > 12 else num + cma_candidates.append({ + 'code': code, + 'confidence': rec_scores[i] + }) + + if cma_candidates: + cma_candidates.sort(key=lambda x: x['confidence'], reverse=True) + best = cma_candidates[0] + result['code'] = best['code'] + result['confidence'] = best['confidence'] + result['success'] = True + print(f" [TM] Best CMA candidate: {best['code']} (conf: {best['confidence']:.2f})") + + if output_dir: + imwrite_safe(os.path.join(output_dir, "cma_template_roi.png"), roi_img) + else: + print(" [TM] No CMA code candidates found in ROI text") + + except Exception as e: + logger.error(f"ROI OCR failed: {e}") + print(f" [TM] ROI OCR failed: {e}") + + return result + + +def process_cma_template_extraction(page_img, ocr_engine, output_dir=None): + """Full workflow for template-based CMA extraction""" + print(" [TM] Starting template matching extraction...") + match_res = match_cma_template(page_img) + if not match_res: + print(" [TM] Template matching returned no result") + return {'success': False, 'code': None, 'confidence': 0.0, 'reason': 'No match result'} + + print(f" [TM] Match confidence: {match_res['max_val']:.3f} (threshold: 0.4)") + if match_res['max_val'] < 0.4: + print(" [TM] Match confidence too low, skipping") + return {'success': False, 'code': None, 'confidence': 0.0, 'reason': f"Low match confidence: {match_res['max_val']:.3f}"} + + x, y = match_res['match_center'] + img_h, img_w = page_img.shape[:2] + print(f" [TM] Logo detected at center ({x}, {y}) in image {img_w}x{img_h}") + + # Crop ROI: logo area + region BELOW it (CMA code is typically below the logo) + template_h, template_w = CMA_LOGO_TEMPLATE.shape[:2] + roi_x1 = max(0, x - template_w * 2) + roi_y1 = max(0, y - template_h) + roi_x2 = min(img_w, x + template_w * 3) + roi_y2 = min(img_h, y + template_h * 4) # Extend downward to capture code number + + print(f" [TM] ROI: ({roi_x1}, {roi_y1}) -> ({roi_x2}, {roi_y2})") + roi_img = page_img[roi_y1:roi_y2, roi_x1:roi_x2] + + if output_dir: + imwrite_safe(os.path.join(output_dir, "cma_template_match_roi.png"), roi_img) + + return extract_cma_from_roi(roi_img, ocr_engine, output_dir) + + + # ============ Seal Processing Functions (from v_verify_logic.py) ============ def polar_unwarp(img, center, radius, start_theta, angular_extent): @@ -385,6 +602,12 @@ def detect_seal_center_dual_method(seal_crop, all_polygons): def run_layout_detection(image_path): """Run Paddlex PP-DocLayout-L for layout analysis""" + global PADDLEX_AVAILABLE + + if not PADDLEX_AVAILABLE: + logger.warning("PaddleX not available, skipping layout detection") + return [] + try: model = px.create_model("PP-DocLayout-L") output = model.predict(image_path, batch_size=1) @@ -445,7 +668,7 @@ def run_ocr_recognition_vl(image_path, vl_pipeline): temp_output_dir.mkdir(exist_ok=True) # Run prediction - output = vl_pipeline.predict(image_path) + output = vl_pipeline.predict(image_path, batch_size=1) if output and len(output) > 0: res = output[0] @@ -1173,13 +1396,35 @@ def process_single_pdf(pdf_name: str, expected_cma: str, expected_inst: str, # Extract CMA code logger.info(f"Running CMA extraction on {pdf_name}...") + print(f" + Running CMA extraction...") cma_start = time.time() cma_result = extract_cma_code_fullpage(page_img, ocr_engine, output_dir=str(pdf_output_dir)) + print(f" + Primary CMA result: success={cma_result['success']}, code={cma_result.get('code')}, conf={cma_result.get('confidence', 0):.2f}") + + # Fallback to template matching if primary extraction failed or low confidence + if not cma_result['success'] or cma_result.get('confidence', 0) < 0.6: + print(f" + Primary CMA extraction failed/low confidence. Trying template matching fallback...") + logger.info(f"Primary CMA extraction low confidence ({cma_result.get('confidence', 0):.2f}). Trying template matching fallback...") + template_res = process_cma_template_extraction(page_img, ocr_engine, output_dir=str(pdf_output_dir)) + if template_res['success']: + print(f" + Template matching fallback SUCCESS: {template_res['code']} (conf: {template_res['confidence']:.2f})") + logger.info(f"Template matching fallback SUCCESS: {template_res['code']} (conf: {template_res['confidence']:.2f})") + cma_result = template_res + cma_result['extraction_method'] = 'template_matching' + else: + print(f" + Template matching fallback also failed: {template_res.get('reason', 'no candidate')}") + logger.info(f"Template matching fallback also failed: {template_res.get('reason', 'no candidate')}") + cma_result['extraction_method'] = 'robust_ocr' + else: + cma_result['extraction_method'] = 'robust_ocr' + + result['performance']['cma_time'] = time.time() - cma_start result['extracted']['cma'] = cma_result['code'] result['extracted']['cma_confidence'] = cma_result['confidence'] result['extracted']['cma_success'] = cma_result['success'] + result['extracted']['cma_method'] = cma_result['extraction_method'] # Compare CMA if expected_cma == "无": @@ -1525,18 +1770,32 @@ def main(): """Main execution function""" # Parse command line arguments import argparse - parser = argparse.ArgumentParser(description='CMA & Institution Extraction - Batch Accuracy Test') - parser.add_argument('--ocr-model', type=str, default=OCR_MODEL, - choices=['ppocr_v5', 'paddleocr_vl'], - help='OCR model to use (default: from OCR_MODEL env var or ppocr_v5)') - parser.add_argument('--batch-size', type=int, default=BATCH_SIZE, - help=f'Number of PDFs to process (default: {BATCH_SIZE})') - parser.add_argument('--pdf-names', type=str, default=None, - help='Comma-separated list of PDF names to process (e.g., "1.pdf,2.pdf"). Overrides --batch-size') + parser = argparse.ArgumentParser(description="OCR Test and Bridge Script") + parser.add_argument("--pdf", help="Path to single PDF for bridge mode") + parser.add_argument("--output-dir", help="Output directory", default="bridge_output") + parser.add_argument("--ocr-model", choices=["ppocr_v5", "paddleocr_vl"], default="ppocr_v5") + parser.add_argument("--batch", action="store_true", help="Run batch testing mode") + parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Number of PDFs to process") + parser.add_argument("--pdf-names", help="Comma-separated list of PDF names to process") + args = parser.parse_args() - # Use command line argument if provided + # Shared model selection ocr_model = args.ocr_model + + if args.pdf: + # Bridge mode + pdf_path = Path(args.pdf) + output_dir = Path(args.output_dir) + res = process_single_pdf_standalone(pdf_path, output_dir, ocr_model) + print(json.dumps(res, cls=NumpyEncoder, ensure_ascii=False)) + return + + if not args.batch: + parser.print_help() + return + + # Batch test mode (original main logic) batch_size = args.batch_size pdf_names_filter = args.pdf_names @@ -1577,17 +1836,31 @@ def main(): ocr_engine = None vl_pipeline = None + print("\n" + "=" * 80) + print("INITIALIZING OCR MODELS (This may take 1-3 minutes on first run)") + print("=" * 80) + print() + logger.info("Initializing PaddleOCR engine for CMA recognition...") - print("Initializing PaddleOCR engine (required for CMA extraction)...") - ocr_engine = PaddleOCR(use_angle_cls=True, lang='ch') + print("[1/2] Initializing PaddleOCR engine (for CMA extraction)...") + print(" - Loading detection model (PP-OCRv4_det)...") + ocr_engine = PaddleOCR(use_textline_orientation=True, lang='ch') + print(" - Loading recognition model (PP-OCRv4_rec)...") + print(" - Loading direction classifier...") logger.info("PaddleOCR initialized successfully") - print("PaddleOCR initialized successfully\n") + print(" ✓ PaddleOCR initialized successfully\n") # Initialize PaddleOCRVL for backup seal recognition (always try if available) # This provides a fallback when polar unwarping fails if PADDLEOCRVL_AVAILABLE: logger.info("Initializing PaddleOCRVL for backup seal recognition...") - print("Initializing PaddleOCRVL for backup seal recognition (this may take a while)...") + print("[2/2] Initializing PaddleOCRVL (for seal recognition backup)...") + print(" - This may take 30-60 seconds") + print(" - Loading model from cache: ~/.paddlex/official_models/PaddleOCR-VL-1.5") + print(" - Model size: ~1.9GB (loading into memory)...") + sys.stdout.flush() # Ensure output is displayed immediately + + start_time = time.time() try: vl_pipeline = PaddleOCRVL( use_seal_recognition=True, @@ -1595,21 +1868,27 @@ def main(): use_layout_detection=True ) + init_time = time.time() - start_time + print(f" - Initialization completed in {init_time:.1f} seconds") + # Verify initialization if vl_pipeline is None: raise RuntimeError("PaddleOCRVL initialization returned None") logger.info("PaddleOCRVL initialized successfully (backup ready)") - print("PaddleOCRVL backup ready - will be used when polar unwarping fails\n") + print(" ✓ PaddleOCRVL backup ready - will be used when polar unwarping fails\n") except Exception as e: - logger.error(f"Failed to initialize PaddleOCRVL: {e}") + init_time = time.time() - start_time + logger.error(f"Failed to initialize PaddleOCRVL after {init_time:.1f}s: {e}") logger.error(f"Exception type: {type(e).__name__}") - print(f"WARNING: Failed to initialize PaddleOCRVL: {e}") - print("Polar unwarping failures will skip OCR (no backup available)\n") + print(f" ✗ Failed to initialize PaddleOCRVL: {e}") + print(f" Exception type: {type(e).__name__}") + print(" → Polar unwarping failures will skip OCR (no backup available)\n") else: logger.info("PaddleOCRVL not available - polar unwarping failures will skip OCR") - print("Note: PaddleOCRVL not installed - polar unwarping failures will skip OCR") - print(" To enable backup: pip install paddleocr[doc-parser]\n") + print("[2/2] PaddleOCRVL not available - skipping") + print(" → Install with: pip install paddleocr[doc-parser]") + print(" → Polar unwarping failures will skip OCR (no backup)\n") # Validate OCR model selection if ocr_model == "paddleocr_vl" and vl_pipeline is None: @@ -1618,6 +1897,11 @@ def main(): print("Please install: pip install paddleocr[doc-parser]") ocr_model = "ppocr_v5" + print("=" * 80) + print("MODEL INITIALIZATION COMPLETE") + print("=" * 80) + print() + # Create output directory OUTPUT_DIR.mkdir(exist_ok=True) @@ -1761,5 +2045,52 @@ def main(): print("=" * 80) +def process_single_pdf_standalone(pdf_path: Path, output_dir: Path, ocr_model: str): + """Bridge function for Java to call for a single PDF""" + total_start = time.time() + + # Initialize engines + logger.info(f"Initializing engines for standalone processing (Model: {ocr_model})...") + + vl_pipeline = None + if ocr_model == "paddleocr_vl" and PADDLEOCRVL_AVAILABLE: + vl_pipeline = PaddleOCRVL(use_seal_recognition=True, use_ocr_for_image_block=True, use_layout_detection=True) + + # Re-use the existing core logic function + result = process_single_pdf( + pdf_name=pdf_path.name, + expected_cma=None, + expected_inst=None, + pdf_dir=pdf_path.parent, + output_dir=output_dir, + ocr_engine=None, # Global instance not needed for this path + ocr_model=ocr_model, + vl_pipeline=vl_pipeline + ) + + # Format for bridge output + bridge_res = { + "success": result["status"] == "success", + "cma": { + "code": result["extracted"]["cma"], + "confidence": result["extracted"]["cma_confidence"], + "box": None # Not captured in current flat result + } if result["extracted"]["cma"] else None, + "seals": [ + { + "index": s["index"], + "text": s["text"], + "confidence": s["confidence"], + "success": s["success"], + "method": "vl" if ocr_model == "paddleocr_vl" else "ppocr" + } for s in result["seal_results"] + ], + "institutions": [s["text"] for s in result["seal_results"] if s["success"] and s["text"]], + "error": result["error"] + } + + return bridge_res + + if __name__ == "__main__": main()