"""
CMA Code Extraction Module using Template Matching (PRIMARY METHOD)

This module provides the most robust method for extracting CMA certification codes
by first locating the CMA logo via template matching, then OCR-ing the region
to the right of and below it.

Key improvements over cma_extraction_final.py:
1. Multi-scale template matching for different logo sizes
2. HSV-based preprocessing to highlight red CMA logo
3. More flexible ROI extraction
4. Better OCR result parsing

Author: Based on reference implementation from refer/认监-扫描件识别
Date: 2026-02-26
"""

import logging
import os
import re
from pathlib import Path
from typing import Optional

import cv2
import numpy as np


# Module-level logger; handlers and levels are left to the importing application.
logger = logging.getLogger(__name__)

# CMA code pattern.
# NOTE: despite the name, this matches runs of 11 *or* 12 digits. There are no
# digit-boundary anchors, so a longer digit run is consumed greedily from the
# left in chunks of up to 12 digits.
PATTERN_11_DIGITS = re.compile(r'\d{11,12}')  # Support 11-12 digit CMA codes

# Template configuration
# Relative path: it is resolved against the current working directory at the
# time the template is loaded, not against this file's location.
DEFAULT_TEMPLATE_PATH = Path("template/CMA_Logo.png")
# Scale factors applied to the template during multi-scale matching (0.5-1.2).
TEMPLATE_SCALES = [0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2]
# Minimum normalised correlation score for a match to be accepted.
MIN_MATCH_CONFIDENCE = 0.30  # Lowered from 0.35 to capture more matches in 0.32-0.39 range


def imread_unicode(path, flags=cv2.IMREAD_COLOR) -> Optional[np.ndarray]:
    """
    cv2.imread replacement that supports paths with non-ASCII characters.

    The file is read into a byte buffer with NumPy and decoded in memory with
    ``cv2.imdecode`` instead of handing the path to OpenCV.

    Args:
        path: Image file path (may contain Chinese characters); converted
            with ``str()`` before use, so path-like objects are accepted
        flags: cv2.IMREAD_* flags forwarded to ``cv2.imdecode``

    Returns:
        Image as numpy array, or None if the file could not be read or decoded.
        Every failure is logged; no exception is propagated.
    """
    try:
        data = np.fromfile(str(path), dtype=np.uint8)
        if data.size == 0:
            # Decoding an empty buffer only produces an opaque OpenCV error;
            # report the real cause instead.
            logger.error(f"Failed to read image {path}: file is empty")
            return None
        img = cv2.imdecode(data, flags)
    except Exception as e:
        # Deliberately broad: this helper's contract is "return None on any
        # failure" (missing file, permission error, OpenCV decode error, ...).
        logger.error(f"Failed to read image {path}: {e}")
        return None

    if img is None:
        # imdecode signals undecodable data by returning None, not by raising.
        logger.error(f"Failed to decode image {path}: unsupported or corrupt image data")
    return img


def preprocess_for_matching(image: np.ndarray) -> np.ndarray:
    """
    Build a foreground mask that emphasises the CMA logo while suppressing the page.

    Colour input (3 or 4 channels):
    1. Extracts red regions (CMA logo is typically red)
    2. Adds dark regions (Otsu) and Canny edges for grey / faint prints
    3. Uses morphological close, open and dilate to clean up

    Single-channel input (2-D, or 3-D with one channel) only gets a Gaussian
    blur followed by an inverted Otsu threshold; no morphology is applied.

    Args:
        image: Input image (BGR, BGRA or single-channel).
            Assumed to be 8-bit, as required by the Otsu/Canny calls - TODO confirm
            for all callers.

    Returns:
        2-D binary mask (0/255) with the same height and width as the input.
        An empty input array is returned unchanged.
    """
    if image.size == 0:
        return image

    if image.ndim == 2 or image.shape[2] == 1:
        gray = image if image.ndim == 2 else image[:, :, 0]
        blurred = cv2.GaussianBlur(gray, (3, 3), 0)
        _, mask = cv2.threshold(
            blurred, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        return mask

    if image.shape[2] == 4:
        # COLOR_BGR2HSV / COLOR_BGR2GRAY reject 4-channel input, so drop alpha.
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)

    blurred = cv2.GaussianBlur(image, (3, 3), 0)
    hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)

    # Primary: strong reds (CMA logo). Red wraps around the hue axis, so two
    # hue bands are needed; saturation >= 30 and value >= 40 in both.
    lower_red1 = np.array([0, 30, 40], dtype=np.uint8)
    upper_red1 = np.array([15, 255, 255], dtype=np.uint8)
    lower_red2 = np.array([165, 30, 40], dtype=np.uint8)
    upper_red2 = np.array([180, 255, 255], dtype=np.uint8)
    red_mask = cv2.bitwise_or(
        cv2.inRange(hsv, lower_red1, upper_red1),
        cv2.inRange(hsv, lower_red2, upper_red2),
    )

    # Complementary: dark or low-value areas (handles grey/low-sat scans)
    gray = cv2.cvtColor(blurred, cv2.COLOR_BGR2GRAY)
    _, dark_mask = cv2.threshold(
        gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
    )

    # Edge emphasis to cope with faint prints
    edges = cv2.Canny(gray, 60, 150)

    combined = cv2.bitwise_or(red_mask, dark_mask)
    combined = cv2.bitwise_or(combined, edges)

    # Close gaps first, remove specks, then thicken what is left.
    kernel3 = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    kernel5 = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    cleaned = cv2.morphologyEx(combined, cv2.MORPH_CLOSE, kernel5, iterations=2)
    cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_OPEN, kernel3, iterations=1)
    cleaned = cv2.dilate(cleaned, kernel5, iterations=2)

    return cleaned


def locate_template_multi_scale(
    page_img: np.ndarray,
    template: np.ndarray,
    scales: list = TEMPLATE_SCALES,
    min_confidence: float = MIN_MATCH_CONFIDENCE
) -> dict:
    """
    Locate CMA logo using multi-scale template matching.

    Both images are converted to foreground masks with
    ``preprocess_for_matching`` and compared with ``cv2.TM_CCORR_NORMED`` at
    every requested scale. Only positions whose match centre lies in the upper
    60% of the page are considered, so a strong match further down (e.g. a
    footer logo) can no longer hide a valid match near the top.

    Args:
        page_img: Page image (grayscale or BGR)
        template: CMA logo template (grayscale or BGR)
        scales: List of scale factors applied to the template
        min_confidence: Minimum match confidence (0-1)

    Returns:
        On success, a dict with keys 'success' (True), 'max_val', 'match_loc'
        (top-left x, y), 'match_center' (x, y), 'scale', 'template_h' and
        'template_w' (size of the scaled template).
        On failure, a dict with keys 'success' (False), 'max_val' (best score
        seen, 0.0 if none) and 'reason'.
    """
    # Preprocess page and template for better matching
    page_mask = preprocess_for_matching(page_img)
    template_mask = preprocess_for_matching(template)

    if page_mask.size == 0 or template_mask.size == 0:
        return {
            'success': False,
            'max_val': 0.0,
            'reason': 'No match found above threshold'
        }

    page_h, page_w = page_mask.shape[:2]
    base_h, base_w = template_mask.shape[:2]

    # CMA logos are typically in the upper portion of the page (0-60% of height)
    # This prevents matching footer logos or other elements at the bottom
    max_y_position = int(page_h * 0.6)

    best_match = None
    best_confidence = 0.0

    for scale in scales:
        # Resize template mask
        if scale != 1.0:
            new_width = int(base_w * scale)
            new_height = int(base_h * scale)
            if new_width < 10 or new_height < 10:
                continue
            resized_template_mask = cv2.resize(
                template_mask, (new_width, new_height),
                interpolation=cv2.INTER_AREA if scale < 1.0 else cv2.INTER_CUBIC
            )
        else:
            new_width, new_height = base_w, base_h
            resized_template_mask = template_mask

        # matchTemplate requires the template to fit inside the page.
        if new_height > page_h or new_width > page_w:
            logger.debug(f"Skipping scale {scale}: template {new_width}x{new_height} larger than page {page_w}x{page_h}")
            continue

        # A match whose top-left corner is at row r has its centre at
        # r + new_height // 2; keep only rows with centre <= max_y_position.
        allowed_rows = max_y_position - new_height // 2 + 1
        if allowed_rows <= 0:
            continue

        try:
            result = cv2.matchTemplate(page_mask, resized_template_mask, cv2.TM_CCORR_NORMED)
            if result is None:
                continue

            # Position filtering: search only the upper portion of the page.
            _, max_val, _, max_loc = cv2.minMaxLoc(result[:allowed_rows])
            match_center_y = max_loc[1] + new_height // 2

            if max_val > best_confidence:
                best_confidence = max_val
                best_match = {
                    'max_val': float(max_val),
                    'match_loc': max_loc,
                    'scale': scale,
                    'template_h': new_height,
                    'template_w': new_width
                }
                logger.debug(f"New best match: confidence={max_val:.3f}, scale={scale}, Y={match_center_y}")

                # Early exit if we have a very good match in the correct position
                if max_val >= 0.6:
                    break

        except Exception as e:
            # Best-effort per scale: one failing scale must not abort the search.
            logger.warning(f"Template matching failed at scale {scale}: {e}")
            continue

    if best_match is None or best_match['max_val'] < min_confidence:
        return {
            'success': False,
            'max_val': float(best_confidence),
            'reason': 'No match found above threshold'
        }

    # Calculate match center
    match_loc = best_match['match_loc']
    best_match['match_center'] = (
        match_loc[0] + best_match['template_w'] // 2,
        match_loc[1] + best_match['template_h'] // 2
    )
    best_match['success'] = True

    return best_match


def extract_cma_from_roi(roi_img, ocr_engine, output_dir=None):
    """
    Run OCR specifically on CMA ROI and extract CMA code.

    Candidates are digit runs matched by ``PATTERN_11_DIGITS`` after spaces,
    hyphens and colons have been removed from each OCR line. Candidates
    starting with '2' are preferred; within the chosen group the one with the
    highest OCR score wins (the earliest line wins ties).

    Args:
        roi_img: ROI image (numpy array)
        ocr_engine: Initialized PaddleOCR instance; ``.ocr()`` is tried first,
            then ``.predict()``
        output_dir: Optional directory to save debug images (currently unused
            by this function)

    Returns:
        Dict with keys 'code' (str or None), 'confidence' (float),
        'raw_text' (the OCR line the code was found in, '' if none) and
        'success' (bool). Errors are logged, never raised.
    """
    result = {
        'code': None,
        'confidence': 0.0,
        'raw_text': '',
        'success': False
    }

    if roi_img is None or roi_img.size == 0:
        logger.warning("ROI image is empty")
        return result

    h, w = roi_img.shape[:2]
    logger.info(f"ROI size: {w}x{h}")

    try:
        raw_result = _run_ocr_engine(ocr_engine, roi_img)
        if raw_result is None:
            logger.warning("OCR returned None")
            return result

        rec_texts, rec_scores = _collect_ocr_lines(raw_result)
        logger.info(f"OCR found {len(rec_texts)} text lines")

        # Print all detected text for debugging
        for i, (text, score) in enumerate(zip(rec_texts, rec_scores)):
            logger.debug(f" Line {i}: '{text}' (score: {score:.2f})")

        cma_candidates = _find_cma_candidates(rec_texts, rec_scores)
        if not cma_candidates:
            logger.warning("No CMA code candidates found in ROI text")
            return result

        best = _pick_best_cma_candidate(cma_candidates)
        result['code'] = best['code']
        result['confidence'] = best['confidence']
        result['raw_text'] = best['text']
        result['success'] = True

    except Exception as e:
        # Top-level boundary for this helper: report and return the failure dict.
        logger.error(f"ROI OCR failed: {e}")

    return result


def _run_ocr_engine(ocr_engine, image):
    """Call ``.ocr()`` and fall back to ``.predict()``; return the raw output or None."""
    raw_result = None

    # Try .ocr() first (without cls parameter to avoid API incompatibility)
    if hasattr(ocr_engine, 'ocr'):
        try:
            raw_result = ocr_engine.ocr(image)
        except Exception as ocr_err:
            logger.debug(f".ocr() method failed: {ocr_err}, trying .predict()")
            raw_result = None

    # Fallback to .predict() if .ocr() failed or is not available
    if raw_result is None and hasattr(ocr_engine, 'predict'):
        try:
            raw_result = ocr_engine.predict(image)
        except Exception as pred_err:
            logger.debug(f".predict() method also failed: {pred_err}")
            raw_result = None

    return raw_result


def _collect_ocr_lines(raw_result):
    """Normalise the supported OCR output shapes into (texts, scores) lists."""
    texts = []
    scores = []

    if isinstance(raw_result, list) and len(raw_result) > 0:
        ocr_data = raw_result[0]  # only the first page/result is used

        if isinstance(ocr_data, list):
            # Legacy format: [[box, (text, score)], ...]
            for line in ocr_data:
                parsed = _parse_legacy_ocr_line(line)
                if parsed is not None:
                    texts.append(parsed[0])
                    scores.append(parsed[1])
        elif isinstance(ocr_data, dict):
            # New PaddleOCR format: dict with 'rec_texts', 'rec_scores' keys
            texts = _list_or_empty(ocr_data.get('rec_texts'))
            scores = _list_or_empty(ocr_data.get('rec_scores'))
            logger.info(f"Using new PaddleOCR dict format, found {len(texts)} lines")
    elif isinstance(raw_result, dict):
        # Direct dict format (single page result)
        texts = _list_or_empty(raw_result.get('rec_texts'))
        scores = _list_or_empty(raw_result.get('rec_scores'))
        logger.info(f"Using direct dict format, found {len(texts)} lines")

    # Coerce so later string cleaning, sorting and ':.2f' formatting cannot fail
    # on a stray None or non-numeric entry.
    return [str(t) for t in texts], [_score_as_float(s) for s in scores]


def _list_or_empty(value):
    """Return ``list(value)``, or an empty list when the value is None."""
    return [] if value is None else list(value)


def _parse_legacy_ocr_line(line):
    """Return (text, score) for one legacy ``[box, (text, score)]`` entry, or None to skip it."""
    try:
        if not isinstance(line, (list, tuple)) or len(line) < 2:
            return None

        payload = line[1]
        if not isinstance(payload, (list, tuple)):
            return str(payload), 0.9  # bare text without a score
        if len(payload) >= 2:
            return str(payload[0]), float(payload[1])
        if len(payload) == 1:
            return str(payload[0]), 0.9  # text without a score
        return None
    except (IndexError, TypeError, ValueError) as e:
        logger.debug(f"Skipped OCR line: {e}")
        return None


def _score_as_float(score, default=0.5):
    """Convert an OCR score to float, using ``default`` when it is not numeric."""
    try:
        return float(score)
    except (TypeError, ValueError):
        return default


def _find_cma_candidates(rec_texts, rec_scores):
    """Collect every ``PATTERN_11_DIGITS`` match together with its line score and source text."""
    candidates = []
    for i, text in enumerate(rec_texts):
        # Clean text: remove spaces and common OCR artifacts
        cleaned = text.replace(" ", "").replace("-", "").replace(":", "")
        # Lines without a matching score entry get a neutral 0.5
        confidence = rec_scores[i] if i < len(rec_scores) else 0.5
        for num in PATTERN_11_DIGITS.findall(cleaned):
            candidates.append({
                'code': num,
                'confidence': confidence,
                'text': text
            })
    return candidates


def _pick_best_cma_candidate(cma_candidates):
    """Pick the highest-confidence candidate, preferring codes that start with '2'."""
    # CMA codes typically start with '2'
    starting_with_2 = [c for c in cma_candidates if c['code'].startswith('2')]
    if starting_with_2:
        # max() keeps the first of equally-scored candidates
        best = max(starting_with_2, key=lambda c: c['confidence'])
        logger.info(f"Best CMA candidate (starts with 2): {best['code']} (conf: {best['confidence']:.2f})")
    else:
        # No candidates start with '2', use all candidates ranked by confidence
        best = max(cma_candidates, key=lambda c: c['confidence'])
        logger.info(f"Best CMA candidate (no '2' prefix): {best['code']} (conf: {best['confidence']:.2f})")
    return best


def extract_cma_code_fullpage(page_img, ocr_engine, output_dir=None):
    """
    Extract CMA code from a PDF page image using template matching + OCR.

    This is the main entry point that replicates the reference implementation.
    The logo is located with ``locate_template_multi_scale``; the region to
    the right of and below it is OCR-ed, and if that yields no code the whole
    page is OCR-ed as a fallback.

    Args:
        page_img: Page image (numpy array, or str / path-like path to an image)
        ocr_engine: Initialized PaddleOCR instance
        output_dir: Optional directory to save debug visualizations
            (the ROI crop is written there as ``cma_roi.png``)

    Returns:
        Dict with keys:
        - 'code': Extracted CMA code (str or None)
        - 'confidence': OCR confidence (float)
        - 'raw_text': On failure, the failure reason. On success, copied from
          the OCR helper's result when it carries a 'raw_text' entry,
          otherwise left as ''
        - 'position': (x, y) tuple of logo position; only set when the code
          came from the ROI, otherwise (0, 0)
        - 'box': ROI bounding box [x1, y1, x2, y2]; only set when the code
          came from the ROI, otherwise None
        - 'success': Boolean indicating successful extraction
        - 'extraction_method': 'template_matching', or
          'template_matching_fullpage_fallback' when the fallback found the code
    """
    result = {
        'code': None,
        'confidence': 0.0,
        'raw_text': '',
        'position': (0, 0),
        'box': None,
        'success': False,
        'extraction_method': 'template_matching'
    }

    # Load image if path provided
    if isinstance(page_img, (str, os.PathLike)):
        image = imread_unicode(os.fspath(page_img), cv2.IMREAD_COLOR)
    elif isinstance(page_img, np.ndarray):
        image = page_img
    else:
        logger.error(f"Invalid image type: {type(page_img)}")
        return result

    if image is None or image.size == 0:
        logger.error("Failed to load image or empty image")
        return result

    h, w = image.shape[:2]
    logger.info(f"Processing image {w}x{h}")

    # Load template
    # NOTE(review): DEFAULT_TEMPLATE_PATH looks relative, so this presumably
    # depends on the current working directory - confirm for your deployment.
    if not DEFAULT_TEMPLATE_PATH.exists():
        logger.error(f"CMA template not found: {DEFAULT_TEMPLATE_PATH}")
        return result

    template = imread_unicode(str(DEFAULT_TEMPLATE_PATH), cv2.IMREAD_COLOR)
    if template is None:
        logger.error(f"Failed to load template: {DEFAULT_TEMPLATE_PATH}")
        return result

    # Locate logo using multi-scale template matching
    logger.info("Locating CMA logo using multi-scale template matching...")
    match_res = locate_template_multi_scale(image, template)

    if not match_res['success']:
        logger.warning(f"Template matching failed: {match_res.get('reason', 'Unknown')}")
        result['raw_text'] = match_res.get('reason', 'Template matching failed')
        return result

    logger.info(f"Logo found at {match_res['match_center']} (confidence: {match_res['max_val']:.3f}, scale: {match_res['scale']:.2f})")

    # Extract ROI around the logo
    x, y = match_res['match_center']
    template_h = match_res['template_h']

    # ROI: region to the RIGHT and BELOW the logo
    # CMA code typically appears below and to the right of the CMA logo
    roi_x1 = int(max(0, x))  # Start from logo center, going right
    roi_y1 = int(max(0, y - template_h // 2))  # Start at the logo's top edge
    roi_x2 = int(min(w, x + 600))  # Extend right up to 600px
    roi_y2 = int(min(h, y + template_h * 4))  # Extend down 4 logo heights to capture CMA code

    logger.info(f"ROI: ({roi_x1}, {roi_y1}) -> ({roi_x2}, {roi_y2})")
    roi_img = image[roi_y1:roi_y2, roi_x1:roi_x2]

    # Save ROI for debugging
    if output_dir:
        _save_debug_roi(roi_img, output_dir)

    # Extract CMA code from ROI
    logger.info("Extracting CMA code from ROI...")
    cma_result = extract_cma_from_roi(roi_img, ocr_engine, output_dir)

    if cma_result['success']:
        result.update(cma_result)
        result['position'] = (x, y)
        result['box'] = [roi_x1, roi_y1, roi_x2, roi_y2]
        return result

    # Fallback: Try full-page OCR if ROI extraction failed
    logger.warning("ROI OCR failed, trying full-page OCR as fallback...")
    cma_result_fallback = extract_cma_from_roi(image, ocr_engine, output_dir)
    if cma_result_fallback['success']:
        result.update(cma_result_fallback)
        result['extraction_method'] = 'template_matching_fullpage_fallback'
        logger.info(f"Full-page fallback succeeded: {cma_result_fallback['code']}")
    else:
        result['raw_text'] = cma_result.get('reason', 'ROI and full-page OCR both failed')

    return result


def _save_debug_roi(roi_img, output_dir):
    """Best-effort write of the ROI crop to ``<output_dir>/cma_roi.png``."""
    if roi_img is None or roi_img.size == 0:
        return
    try:
        os.makedirs(output_dir, exist_ok=True)
        roi_path = os.path.join(output_dir, "cma_roi.png")
        if not cv2.imwrite(roi_path, roi_img):
            # Encode in memory and write the bytes ourselves (for Chinese paths)
            is_success, buffer = cv2.imencode(".png", roi_img)
            if is_success:
                buffer.tofile(roi_path)
    except (OSError, cv2.error) as e:
        # Debug output must never abort the actual extraction.
        logger.warning(f"Could not save debug ROI image to {output_dir}: {e}")


# Command-line entry point: run the extraction on one image and print the result.
if __name__ == "__main__":
    import sys

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    if len(sys.argv) < 2:
        print("Usage: python cma_extraction_template_primary.py <image_path> [output_dir]")
        sys.exit(1)

    img_path = sys.argv[1]
    out_dir = sys.argv[2] if len(sys.argv) > 2 else "cma_test_output"

    # Set before the import below; presumably read by paddleocr at import or
    # model-load time - TODO confirm.
    os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True"
    from paddleocr import PaddleOCR

    print("Initializing PaddleOCR...")
    try:
        ocr = PaddleOCR(use_angle_cls=True, lang='ch', show_log=False)
    except (TypeError, ValueError):
        # Newer PaddleOCR releases no longer accept the legacy `show_log`
        # keyword; retry without it.
        ocr = PaddleOCR(use_angle_cls=True, lang='ch')

    result = extract_cma_code_fullpage(img_path, ocr, out_dir)

    print("\n" + "=" * 60)
    print("CMA EXTRACTION RESULT")
    print("=" * 60)
    print(f"Success: {result['success']}")
    if result['success']:
        print(f"CMA Code: {result['code']}")
        print(f"Confidence: {result['confidence']:.4f}")
        print(f"Position: {result['position']}")
    print("=" * 60)