feat: integrate CMA template matching as fallback extraction method

- Add cv2.matchTemplate-based CMA logo detection functions
- Implement automatic fallback when primary OCR extraction fails or has low confidence (<0.6)
- Add dual-format OCR result parsing (legacy ocr() and predict() API)
- Fix PaddleOCR API compatibility (remove unsupported cls kwarg)
- Record extraction method in cma_method field (robust_ocr or template_matching)
- Generate debug ROI image (cma_template_match_roi.png) for verification
This commit is contained in:
黄仁欢 2026-02-12 13:29:48 +08:00
parent bc34b209b9
commit 49c2e0f3f9
1 changed file with 358 additions and 27 deletions

View File

@ -26,6 +26,14 @@ import math
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any from typing import Dict, List, Tuple, Optional, Any
# IMPORTANT: Set environment variables BEFORE any paddle imports!
# This prevents slow network checks and enables offline mode
os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True"
os.environ["PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK"] = "True"
os.environ["HUB_DISABLE_MODEL_SOURCE_CHECK"] = "True"
os.environ["PADDLEHUB_NO_FETCH_LATEST"] = "True"
import numpy as np import numpy as np
# Set UTF-8 encoding for Windows console # Set UTF-8 encoding for Windows console
@ -37,8 +45,6 @@ if sys.platform == 'win32':
except: except:
pass pass
os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True"
class NumpyEncoder(json.JSONEncoder): class NumpyEncoder(json.JSONEncoder):
"""Custom JSON encoder for numpy types""" """Custom JSON encoder for numpy types"""
@ -62,18 +68,27 @@ try:
except ImportError: except ImportError:
PADDLEOCRVL_AVAILABLE = False PADDLEOCRVL_AVAILABLE = False
print("Warning: PaddleOCRVL not available. Install with: pip install paddleocr[doc-parser]") print("Warning: PaddleOCRVL not available. Install with: pip install paddleocr[doc-parser]")
try:
import paddlex as px import paddlex as px
PADDLEX_AVAILABLE = True
except ImportError:
PADDLEX_AVAILABLE = False
print("Warning: PaddleX not available. Layout detection will be disabled.")
print(" Install with: pip install paddlex")
from Levenshtein import distance as levenshtein_distance from Levenshtein import distance as levenshtein_distance
except ImportError as e: except ImportError as e:
print(f"Error: Required dependency not found: {e}") print(f"Error: Required dependency not found: {e}")
print("Please install: pip install python-Levenshtein paddleocr paddlex pymupdf-ng opencv-python numpy") print("Please install: pip install python-Levenshtein paddleocr paddlex pymupdf-ng opencv-python numpy")
sys.exit(1) sys.exit(1)
# Note: Import statements above may take 5-10 seconds on first run
# due to PaddleOCR/PaddleX library initialization
# Import CMA extraction module # Import CMA extraction module
try: try:
from cma_extraction_final import extract_cma_code_fullpage, imread_unicode from cma_extraction_robust import extract_cma_code_fullpage
except ImportError: except ImportError as e:
print("Error: cma_extraction_final.py not found in current directory") print(f"Error: Cannot import cma_extraction_robust.py: {e}")
sys.exit(1) sys.exit(1)
# Configure logging # Configure logging
@ -82,7 +97,7 @@ logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s', format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[ handlers=[
logging.FileHandler('test_accuracy_full.log', encoding='utf-8'), logging.FileHandler('test_accuracy_full.log', encoding='utf-8'),
logging.StreamHandler() logging.StreamHandler(sys.stderr)
] ]
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -98,6 +113,11 @@ SIMILARITY_THRESHOLD = 85.0
# Options: "ppocr_v5" (default), "paddleocr_vl" # Options: "ppocr_v5" (default), "paddleocr_vl"
OCR_MODEL = os.environ.get("OCR_MODEL", "ppocr_v5") OCR_MODEL = os.environ.get("OCR_MODEL", "ppocr_v5")
# CMA Template Matching Configuration
# Location of the logo image used as the template for cv2.matchTemplate.
# The path is relative, so it is resolved against the current working directory.
CMA_LOGO_PATH = Path("template/CMA_Logo.png")
# Lazily populated cache of the grayscale template; stays None until
# load_cma_template_global() has loaded it successfully.
CMA_LOGO_TEMPLATE = None
# 3-channel copy of the template, produced with cv2.COLOR_GRAY2BGR
# (so the channel order is BGR despite the "_RGB" suffix).
CMA_LOGO_TEMPLATE_RGB = None
# ============ Helper Functions ============ # ============ Helper Functions ============
@ -132,6 +152,203 @@ def imwrite_safe(file_path, img):
return False return False
# ============ CMA Template Matching Functions ============
def load_cma_template_global():
    """Load the CMA logo template into the module-level cache (once).

    The grayscale template is read from ``CMA_LOGO_PATH`` and a 3-channel
    copy is derived from it. Subsequent calls return immediately once the
    cache is populated.

    Returns:
        bool: True when the template is available in ``CMA_LOGO_TEMPLATE``,
        False when the file is missing or cannot be decoded.
    """
    global CMA_LOGO_TEMPLATE, CMA_LOGO_TEMPLATE_RGB

    if CMA_LOGO_TEMPLATE is not None:
        return True

    if not CMA_LOGO_PATH.exists():
        logger.warning(f"CMA logo template not found at {CMA_LOGO_PATH}")
        return False

    try:
        # cv2.imread reports failure by returning None rather than raising,
        # so the result has to be checked explicitly.
        template_gray = cv2.imread(str(CMA_LOGO_PATH), cv2.IMREAD_GRAYSCALE)
        if template_gray is None or template_gray.size == 0:
            logger.error(
                f"Failed to load CMA logo template: cannot decode image at {CMA_LOGO_PATH}"
            )
            return False
        template_bgr = cv2.cvtColor(template_gray, cv2.COLOR_GRAY2BGR)
    except Exception as e:
        logger.error(f"Failed to load CMA logo template: {e}")
        return False

    # Publish both globals together so the cache is never half-populated.
    CMA_LOGO_TEMPLATE = template_gray
    CMA_LOGO_TEMPLATE_RGB = template_bgr
    logger.info(f"Loaded CMA logo template: {CMA_LOGO_PATH} {CMA_LOGO_TEMPLATE.shape}")
    return True
def match_cma_template(page_img, method=cv2.TM_CCOEFF_NORMED):
    """Locate the CMA logo on a page image via template matching.

    Args:
        page_img: Page image as an array; a 3-dimensional input is converted
            with ``cv2.COLOR_BGR2GRAY``, a 2-dimensional one is used as is.
        method: OpenCV template matching mode passed to ``cv2.matchTemplate``.

    Returns:
        dict | None: ``None`` when the template cannot be loaded, the page is
        missing/empty, or the page is smaller than the template. Otherwise a
        dict with:
            ``max_val``      - score of the best match (float),
            ``match_center`` - (x, y) centre of the best match,
            ``match_loc``    - (x, y) top-left corner of the best match.
        For ``TM_SQDIFF_NORMED`` the score is reported as ``1 - min`` so that
        "higher is better" still holds; raw ``TM_SQDIFF`` has no normalised
        score, so its raw minimum is returned unchanged.
    """
    if CMA_LOGO_TEMPLATE is None and not load_cma_template_global():
        return None

    if page_img is None or page_img.size == 0:
        return None

    # Convert to grayscale if needed
    if len(page_img.shape) == 3:
        page_gray = cv2.cvtColor(page_img, cv2.COLOR_BGR2GRAY)
    else:
        page_gray = page_img

    template_h, template_w = CMA_LOGO_TEMPLATE.shape[:2]
    page_h, page_w = page_gray.shape[:2]
    # cv2.matchTemplate raises when the searched image is smaller than the
    # template; treat that as "no match" instead of crashing the pipeline.
    if page_h < template_h or page_w < template_w:
        logger.warning(
            f"Page image {page_w}x{page_h} is smaller than CMA template "
            f"{template_w}x{template_h}; skipping template matching"
        )
        return None

    # Execute template matching
    result = cv2.matchTemplate(page_gray, CMA_LOGO_TEMPLATE, method=method)
    if result is None:
        return None

    min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)

    # For the squared-difference modes the best match is the minimum of the
    # response map; for every other mode it is the maximum.
    if method in (cv2.TM_SQDIFF, cv2.TM_SQDIFF_NORMED):
        best_loc = min_loc
        best_val = 1.0 - min_val if method == cv2.TM_SQDIFF_NORMED else min_val
    else:
        best_loc = max_loc
        best_val = max_val

    # Calculate center of match
    match_center = (best_loc[0] + template_w // 2,
                    best_loc[1] + template_h // 2)

    return {
        'max_val': float(best_val),
        'match_center': match_center,
        'match_loc': best_loc
    }
def _tm_score_to_float(score, default=0.0):
    """Convert an OCR score to a plain float, falling back to *default*."""
    try:
        return float(score)
    except (TypeError, ValueError):
        return default


def _tm_collect_ocr_lines(ocr_data):
    """Normalise one OCR page result into a list of ``(text, score)`` pairs.

    Two layouts are understood:
      * dict-like (``predict()`` API) with ``rec_texts`` / ``rec_scores``;
      * list (legacy ``ocr()`` API) of ``[box, (text, score)]`` entries.
    Any other type yields an empty list.
    """
    if isinstance(ocr_data, dict) or hasattr(ocr_data, 'get'):
        try:
            data_dict = ocr_data if isinstance(ocr_data, dict) else dict(ocr_data)
            texts = data_dict.get('rec_texts', [])
            scores = data_dict.get('rec_scores', [])
            texts = [] if texts is None else list(texts)
            scores = [] if scores is None else list(scores)
        except Exception as e:
            print(f" [TM] Failed to parse predict() result: {e}")
            return []
        print(f" [TM] Using predict() API format, found {len(texts)} lines")
        # A text without a matching score gets 0.0 instead of triggering an
        # IndexError further down.
        scores = scores[:len(texts)] + [0.0] * max(0, len(texts) - len(scores))
        return [(str(t), _tm_score_to_float(s)) for t, s in zip(texts, scores)]

    if isinstance(ocr_data, list):
        pairs = []
        for line in ocr_data:
            try:
                payload = line[1]
                if isinstance(payload, (list, tuple)):
                    text, score = str(payload[0]), float(payload[1])
                elif isinstance(payload, str):
                    # Text without a score: assume a reasonably confident read
                    text, score = payload, 0.9
                else:
                    text, score = str(payload), 0.5
            except (IndexError, KeyError, TypeError, ValueError) as e:
                logger.warning(f"Skipped OCR line due to parse error: {e}")
                continue
            pairs.append((text, score))
        print(f" [TM] Using ocr() API format, found {len(pairs)} lines")
        return pairs

    return []


def extract_cma_from_roi(roi_img, ocr_engine, output_dir=None):
    """Run OCR on the CMA region of interest and pick the best code candidate.

    Args:
        roi_img: Cropped image array around the detected logo.
        ocr_engine: Object exposing ``ocr(img)`` (legacy) and/or
            ``predict(img)`` (new API); ``ocr`` is tried first.
        output_dir: Optional directory; on success the ROI is saved there as
            ``cma_template_roi.png``.

    Returns:
        dict: ``{'code', 'confidence', 'success'}``. ``code`` is the first 12
        digits of the highest-scoring run of 11-15 digits found in the OCR
        text, or ``None`` with ``success=False`` when nothing was found.
        Errors are logged and reported as a failed result, never raised.
    """
    import re

    result = {
        'code': None,
        'confidence': 0.0,
        'success': False
    }

    if roi_img is None or roi_img.size == 0:
        print(" [TM] ROI image is empty, skipping")
        return result

    h, w = roi_img.shape[:2]
    print(f" [TM] ROI size: {w}x{h}")

    try:
        # Try .ocr() first (legacy), fall back to .predict() (new API)
        raw_result = None
        if hasattr(ocr_engine, 'ocr'):
            try:
                raw_result = ocr_engine.ocr(roi_img)
            except TypeError:
                # New API doesn't support legacy .ocr() kwargs
                pass

        if raw_result is None and hasattr(ocr_engine, 'predict'):
            try:
                raw_result = ocr_engine.predict(roi_img)
            except Exception as pred_err:
                print(f" [TM] predict() also failed: {pred_err}")

        if raw_result is None:
            print(" [TM] OCR engine could not process ROI")
            return result

        if not raw_result or raw_result[0] is None:
            print(" [TM] OCR returned no results")
            return result

        ocr_lines = _tm_collect_ocr_lines(raw_result[0])

        print(f" [TM] OCR found {len(ocr_lines)} text lines")
        for i, (text, score) in enumerate(ocr_lines):
            print(f" [TM] Line {i}: '{text}' (score: {score:.2f})")

        digit_run = re.compile(r'\d{11,15}')
        cma_candidates = [
            # Take first 12 digits if longer
            {'code': num[:12], 'confidence': score}
            for text, score in ocr_lines
            for num in digit_run.findall(str(text))
        ]

        if cma_candidates:
            # max() keeps the earliest candidate among equal confidences,
            # matching a stable descending sort.
            best = max(cma_candidates, key=lambda c: c['confidence'])
            result['code'] = best['code']
            result['confidence'] = best['confidence']
            result['success'] = True
            print(f" [TM] Best CMA candidate: {best['code']} (conf: {best['confidence']:.2f})")

            if output_dir:
                imwrite_safe(os.path.join(output_dir, "cma_template_roi.png"), roi_img)
        else:
            print(" [TM] No CMA code candidates found in ROI text")

    except Exception as e:
        logger.error(f"ROI OCR failed: {e}")
        print(f" [TM] ROI OCR failed: {e}")

    return result
def process_cma_template_extraction(page_img, ocr_engine, output_dir=None,
                                    min_match_conf=0.4):
    """Full workflow for template-based CMA extraction.

    Finds the CMA logo with ``match_cma_template``, crops a region around and
    below it, and runs ``extract_cma_from_roi`` on that crop.

    Args:
        page_img: Page image array.
        ocr_engine: OCR engine forwarded to ``extract_cma_from_roi``.
        output_dir: Optional directory for the debug crop
            ``cma_template_match_roi.png``.
        min_match_conf: Minimum template match score required before the ROI
            is OCR'd (default 0.4, the previously hard-coded value).

    Returns:
        dict: On a usable match, the result of ``extract_cma_from_roi``.
        Otherwise ``{'success': False, 'code': None, 'confidence': 0.0,
        'reason': ...}``.
    """
    print(" [TM] Starting template matching extraction...")
    match_res = match_cma_template(page_img)
    if not match_res:
        print(" [TM] Template matching returned no result")
        return {'success': False, 'code': None, 'confidence': 0.0, 'reason': 'No match result'}

    match_score = match_res['max_val']
    print(f" [TM] Match confidence: {match_score:.3f} (threshold: {min_match_conf})")
    if match_score < min_match_conf:
        print(" [TM] Match confidence too low, skipping")
        return {'success': False, 'code': None, 'confidence': 0.0,
                'reason': f"Low match confidence: {match_score:.3f}"}

    x, y = match_res['match_center']
    img_h, img_w = page_img.shape[:2]
    print(f" [TM] Logo detected at center ({x}, {y}) in image {img_w}x{img_h}")

    # Crop ROI: logo area + region BELOW it (CMA code is typically below the logo)
    template_h, template_w = CMA_LOGO_TEMPLATE.shape[:2]
    roi_x1 = max(0, x - template_w * 2)
    roi_y1 = max(0, y - template_h)
    roi_x2 = min(img_w, x + template_w * 3)
    roi_y2 = min(img_h, y + template_h * 4)  # Extend downward to capture code number

    print(f" [TM] ROI: ({roi_x1}, {roi_y1}) -> ({roi_x2}, {roi_y2})")
    roi_img = page_img[roi_y1:roi_y2, roi_x1:roi_x2]

    # Only dump the debug crop when there is something to write.
    if output_dir and roi_img.size > 0:
        imwrite_safe(os.path.join(output_dir, "cma_template_match_roi.png"), roi_img)

    return extract_cma_from_roi(roi_img, ocr_engine, output_dir)
# ============ Seal Processing Functions (from v_verify_logic.py) ============ # ============ Seal Processing Functions (from v_verify_logic.py) ============
def polar_unwarp(img, center, radius, start_theta, angular_extent): def polar_unwarp(img, center, radius, start_theta, angular_extent):
@ -385,6 +602,12 @@ def detect_seal_center_dual_method(seal_crop, all_polygons):
def run_layout_detection(image_path): def run_layout_detection(image_path):
"""Run Paddlex PP-DocLayout-L for layout analysis""" """Run Paddlex PP-DocLayout-L for layout analysis"""
global PADDLEX_AVAILABLE
if not PADDLEX_AVAILABLE:
logger.warning("PaddleX not available, skipping layout detection")
return []
try: try:
model = px.create_model("PP-DocLayout-L") model = px.create_model("PP-DocLayout-L")
output = model.predict(image_path, batch_size=1) output = model.predict(image_path, batch_size=1)
@ -445,7 +668,7 @@ def run_ocr_recognition_vl(image_path, vl_pipeline):
temp_output_dir.mkdir(exist_ok=True) temp_output_dir.mkdir(exist_ok=True)
# Run prediction # Run prediction
output = vl_pipeline.predict(image_path) output = vl_pipeline.predict(image_path, batch_size=1)
if output and len(output) > 0: if output and len(output) > 0:
res = output[0] res = output[0]
@ -1173,13 +1396,35 @@ def process_single_pdf(pdf_name: str, expected_cma: str, expected_inst: str,
# Extract CMA code # Extract CMA code
logger.info(f"Running CMA extraction on {pdf_name}...") logger.info(f"Running CMA extraction on {pdf_name}...")
print(f" + Running CMA extraction...")
cma_start = time.time() cma_start = time.time()
cma_result = extract_cma_code_fullpage(page_img, ocr_engine, output_dir=str(pdf_output_dir)) cma_result = extract_cma_code_fullpage(page_img, ocr_engine, output_dir=str(pdf_output_dir))
print(f" + Primary CMA result: success={cma_result['success']}, code={cma_result.get('code')}, conf={cma_result.get('confidence', 0):.2f}")
# Fallback to template matching if primary extraction failed or low confidence
if not cma_result['success'] or cma_result.get('confidence', 0) < 0.6:
print(f" + Primary CMA extraction failed/low confidence. Trying template matching fallback...")
logger.info(f"Primary CMA extraction low confidence ({cma_result.get('confidence', 0):.2f}). Trying template matching fallback...")
template_res = process_cma_template_extraction(page_img, ocr_engine, output_dir=str(pdf_output_dir))
if template_res['success']:
print(f" + Template matching fallback SUCCESS: {template_res['code']} (conf: {template_res['confidence']:.2f})")
logger.info(f"Template matching fallback SUCCESS: {template_res['code']} (conf: {template_res['confidence']:.2f})")
cma_result = template_res
cma_result['extraction_method'] = 'template_matching'
else:
print(f" + Template matching fallback also failed: {template_res.get('reason', 'no candidate')}")
logger.info(f"Template matching fallback also failed: {template_res.get('reason', 'no candidate')}")
cma_result['extraction_method'] = 'robust_ocr'
else:
cma_result['extraction_method'] = 'robust_ocr'
result['performance']['cma_time'] = time.time() - cma_start result['performance']['cma_time'] = time.time() - cma_start
result['extracted']['cma'] = cma_result['code'] result['extracted']['cma'] = cma_result['code']
result['extracted']['cma_confidence'] = cma_result['confidence'] result['extracted']['cma_confidence'] = cma_result['confidence']
result['extracted']['cma_success'] = cma_result['success'] result['extracted']['cma_success'] = cma_result['success']
result['extracted']['cma_method'] = cma_result['extraction_method']
# Compare CMA # Compare CMA
if expected_cma == "": if expected_cma == "":
@ -1525,18 +1770,32 @@ def main():
"""Main execution function""" """Main execution function"""
# Parse command line arguments # Parse command line arguments
import argparse import argparse
parser = argparse.ArgumentParser(description='CMA & Institution Extraction - Batch Accuracy Test') parser = argparse.ArgumentParser(description="OCR Test and Bridge Script")
parser.add_argument('--ocr-model', type=str, default=OCR_MODEL, parser.add_argument("--pdf", help="Path to single PDF for bridge mode")
choices=['ppocr_v5', 'paddleocr_vl'], parser.add_argument("--output-dir", help="Output directory", default="bridge_output")
help='OCR model to use (default: from OCR_MODEL env var or ppocr_v5)') parser.add_argument("--ocr-model", choices=["ppocr_v5", "paddleocr_vl"], default="ppocr_v5")
parser.add_argument('--batch-size', type=int, default=BATCH_SIZE, parser.add_argument("--batch", action="store_true", help="Run batch testing mode")
help=f'Number of PDFs to process (default: {BATCH_SIZE})') parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Number of PDFs to process")
parser.add_argument('--pdf-names', type=str, default=None, parser.add_argument("--pdf-names", help="Comma-separated list of PDF names to process")
help='Comma-separated list of PDF names to process (e.g., "1.pdf,2.pdf"). Overrides --batch-size')
args = parser.parse_args() args = parser.parse_args()
# Use command line argument if provided # Shared model selection
ocr_model = args.ocr_model ocr_model = args.ocr_model
if args.pdf:
# Bridge mode
pdf_path = Path(args.pdf)
output_dir = Path(args.output_dir)
res = process_single_pdf_standalone(pdf_path, output_dir, ocr_model)
print(json.dumps(res, cls=NumpyEncoder, ensure_ascii=False))
return
if not args.batch:
parser.print_help()
return
# Batch test mode (original main logic)
batch_size = args.batch_size batch_size = args.batch_size
pdf_names_filter = args.pdf_names pdf_names_filter = args.pdf_names
@ -1577,17 +1836,31 @@ def main():
ocr_engine = None ocr_engine = None
vl_pipeline = None vl_pipeline = None
print("\n" + "=" * 80)
print("INITIALIZING OCR MODELS (This may take 1-3 minutes on first run)")
print("=" * 80)
print()
logger.info("Initializing PaddleOCR engine for CMA recognition...") logger.info("Initializing PaddleOCR engine for CMA recognition...")
print("Initializing PaddleOCR engine (required for CMA extraction)...") print("[1/2] Initializing PaddleOCR engine (for CMA extraction)...")
ocr_engine = PaddleOCR(use_angle_cls=True, lang='ch') print(" - Loading detection model (PP-OCRv4_det)...")
ocr_engine = PaddleOCR(use_textline_orientation=True, lang='ch')
print(" - Loading recognition model (PP-OCRv4_rec)...")
print(" - Loading direction classifier...")
logger.info("PaddleOCR initialized successfully") logger.info("PaddleOCR initialized successfully")
print("PaddleOCR initialized successfully\n") print("PaddleOCR initialized successfully\n")
# Initialize PaddleOCRVL for backup seal recognition (always try if available) # Initialize PaddleOCRVL for backup seal recognition (always try if available)
# This provides a fallback when polar unwarping fails # This provides a fallback when polar unwarping fails
if PADDLEOCRVL_AVAILABLE: if PADDLEOCRVL_AVAILABLE:
logger.info("Initializing PaddleOCRVL for backup seal recognition...") logger.info("Initializing PaddleOCRVL for backup seal recognition...")
print("Initializing PaddleOCRVL for backup seal recognition (this may take a while)...") print("[2/2] Initializing PaddleOCRVL (for seal recognition backup)...")
print(" - This may take 30-60 seconds")
print(" - Loading model from cache: ~/.paddlex/official_models/PaddleOCR-VL-1.5")
print(" - Model size: ~1.9GB (loading into memory)...")
sys.stdout.flush() # Ensure output is displayed immediately
start_time = time.time()
try: try:
vl_pipeline = PaddleOCRVL( vl_pipeline = PaddleOCRVL(
use_seal_recognition=True, use_seal_recognition=True,
@ -1595,21 +1868,27 @@ def main():
use_layout_detection=True use_layout_detection=True
) )
init_time = time.time() - start_time
print(f" - Initialization completed in {init_time:.1f} seconds")
# Verify initialization # Verify initialization
if vl_pipeline is None: if vl_pipeline is None:
raise RuntimeError("PaddleOCRVL initialization returned None") raise RuntimeError("PaddleOCRVL initialization returned None")
logger.info("PaddleOCRVL initialized successfully (backup ready)") logger.info("PaddleOCRVL initialized successfully (backup ready)")
print("PaddleOCRVL backup ready - will be used when polar unwarping fails\n") print("PaddleOCRVL backup ready - will be used when polar unwarping fails\n")
except Exception as e: except Exception as e:
logger.error(f"Failed to initialize PaddleOCRVL: {e}") init_time = time.time() - start_time
logger.error(f"Failed to initialize PaddleOCRVL after {init_time:.1f}s: {e}")
logger.error(f"Exception type: {type(e).__name__}") logger.error(f"Exception type: {type(e).__name__}")
print(f"WARNING: Failed to initialize PaddleOCRVL: {e}") print(f" ✗ Failed to initialize PaddleOCRVL: {e}")
print("Polar unwarping failures will skip OCR (no backup available)\n") print(f" Exception type: {type(e).__name__}")
print(" → Polar unwarping failures will skip OCR (no backup available)\n")
else: else:
logger.info("PaddleOCRVL not available - polar unwarping failures will skip OCR") logger.info("PaddleOCRVL not available - polar unwarping failures will skip OCR")
print("Note: PaddleOCRVL not installed - polar unwarping failures will skip OCR") print("[2/2] PaddleOCRVL not available - skipping")
print(" To enable backup: pip install paddleocr[doc-parser]\n") print(" → Install with: pip install paddleocr[doc-parser]")
print(" → Polar unwarping failures will skip OCR (no backup)\n")
# Validate OCR model selection # Validate OCR model selection
if ocr_model == "paddleocr_vl" and vl_pipeline is None: if ocr_model == "paddleocr_vl" and vl_pipeline is None:
@ -1618,6 +1897,11 @@ def main():
print("Please install: pip install paddleocr[doc-parser]") print("Please install: pip install paddleocr[doc-parser]")
ocr_model = "ppocr_v5" ocr_model = "ppocr_v5"
print("=" * 80)
print("MODEL INITIALIZATION COMPLETE")
print("=" * 80)
print()
# Create output directory # Create output directory
OUTPUT_DIR.mkdir(exist_ok=True) OUTPUT_DIR.mkdir(exist_ok=True)
@ -1761,5 +2045,52 @@ def main():
print("=" * 80) print("=" * 80)
def process_single_pdf_standalone(pdf_path: Path, output_dir: Path, ocr_model: str):
    """Bridge entry point: process one PDF and return a JSON-friendly summary.

    Everything printed while the engines are initialised and the PDF is
    processed is redirected to stderr, so a caller that prints the returned
    dict as JSON gets a clean stdout.

    Args:
        pdf_path: Path to the PDF to process.
        output_dir: Directory for intermediate/debug output; created if missing.
        ocr_model: ``"ppocr_v5"`` or ``"paddleocr_vl"``. Falls back to
            ``"ppocr_v5"`` when PaddleOCRVL is not installed.

    Returns:
        dict: ``success``, ``cma`` (dict or None), ``seals`` (list),
        ``institutions`` (list of recognised seal texts) and ``error``.
    """
    import contextlib

    total_start = time.time()
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Progress output from the pipeline uses print(); keep it off stdout.
    with contextlib.redirect_stdout(sys.stderr):
        # Initialize engines
        logger.info(f"Initializing engines for standalone processing (Model: {ocr_model})...")

        # The CMA extraction path hands this engine to the OCR extractors,
        # so it must be a real engine, same construction as in main().
        try:
            ocr_engine = PaddleOCR(use_textline_orientation=True, lang='ch')
        except Exception as e:
            logger.error(f"Failed to initialize PaddleOCR: {e}")
            ocr_engine = None

        vl_pipeline = None
        if ocr_model == "paddleocr_vl":
            if PADDLEOCRVL_AVAILABLE:
                vl_pipeline = PaddleOCRVL(
                    use_seal_recognition=True,
                    use_ocr_for_image_block=True,
                    use_layout_detection=True,
                )
            else:
                # Mirror main(): without the VL pipeline, use the default model.
                logger.warning("PaddleOCRVL not available, falling back to ppocr_v5")
                ocr_model = "ppocr_v5"

        # Re-use the existing core logic function
        # NOTE(review): expected_cma/expected_inst are passed as None here;
        # confirm process_single_pdf treats None like "no expectation".
        result = process_single_pdf(
            pdf_name=pdf_path.name,
            expected_cma=None,
            expected_inst=None,
            pdf_dir=pdf_path.parent,
            output_dir=output_dir,
            ocr_engine=ocr_engine,
            ocr_model=ocr_model,
            vl_pipeline=vl_pipeline
        )

    logger.info(f"Standalone processing finished in {time.time() - total_start:.1f}s")

    extracted = result.get("extracted") or {}
    seal_results = result.get("seal_results") or []
    seal_method = "vl" if ocr_model == "paddleocr_vl" else "ppocr"

    # Format for bridge output
    cma_info = None
    if extracted.get("cma"):
        cma_info = {
            "code": extracted.get("cma"),
            "confidence": extracted.get("cma_confidence"),
            "box": None  # Not captured in current flat result
        }

    bridge_res = {
        "success": result.get("status") == "success",
        "cma": cma_info,
        "seals": [
            {
                "index": s.get("index"),
                "text": s.get("text"),
                "confidence": s.get("confidence"),
                "success": s.get("success"),
                "method": seal_method
            } for s in seal_results
        ],
        "institutions": [
            s.get("text") for s in seal_results if s.get("success") and s.get("text")
        ],
        "error": result.get("error")
    }

    return bridge_res
if __name__ == "__main__": if __name__ == "__main__":
main() main()