# report-detect/v_verify_logic.py
#
# 229 lines
# 9.9 KiB
# Python
# Raw Normal View History
#
# 2026-02-05 13:57:22 +08:00
import os
os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True"
import cv2
import numpy as np
import math
import fitz # PyMuPDF
import paddlex as px # Using Paddlex for Layout
from paddleocr import SealTextDetection
# Tool: Standard Polar Unwarp
def polar_unwarp(img, center, radius, start_theta, angular_extent):
    """Unroll an annular sector of ``img`` into a rectangular strip.

    Row ``y`` of the strip samples the circle of radius ``radius - y`` around
    ``center``; column ``x`` samples the angle
    ``start_theta + angular_extent * x / strip_w``.  Samples are bilinearly
    interpolated.  Sample positions outside
    ``0 <= x < width - 1`` / ``0 <= y < height - 1`` are filled with white.

    Args:
        img: Source image, ``(H, W)`` or ``(H, W, C)`` array-like.
        center: ``(x, y)`` of the circle centre in pixel coordinates.
        radius: Outer radius in pixels (first strip row).
        start_theta: Angle of the first strip column, in radians.
        angular_extent: Swept angle in radians.

    Returns:
        A ``uint8`` array of shape ``(int(radius * 0.6),
        int(angular_extent * radius), 3)``, or ``None`` when
        ``angular_extent <= 0`` or either strip dimension would be empty.
    """
    if angular_extent <= 0:
        return None
    strip_w = int(angular_extent * radius)
    strip_h = int(radius * 0.6)
    if strip_w <= 0 or strip_h <= 0:
        return None

    src = np.asarray(img)
    if src.ndim == 2:
        # Single-channel input: add a channel axis so it broadcasts to 3.
        src = src[:, :, np.newaxis]
    img_h, img_w = src.shape[:2]

    # Build the full sampling grid at once instead of looping per pixel.
    radii = radius - np.arange(strip_h, dtype=np.float64)
    thetas = start_theta + angular_extent * (
        np.arange(strip_w, dtype=np.float64) / strip_w
    )
    src_x = center[0] + radii[:, np.newaxis] * np.cos(thetas)[np.newaxis, :]
    src_y = center[1] + radii[:, np.newaxis] * np.sin(thetas)[np.newaxis, :]

    # The "< size - 1" bound guarantees the +1 neighbour used below exists.
    inside = (
        (src_x >= 0) & (src_x < img_w - 1)
        & (src_y >= 0) & (src_y < img_h - 1)
    )

    strip = np.full((strip_h, strip_w, 3), 255, dtype=np.uint8)
    if not inside.any():
        return strip

    xs = src_x[inside]
    ys = src_y[inside]
    x0 = np.floor(xs).astype(np.intp)
    y0 = np.floor(ys).astype(np.intp)
    fx = (xs - x0)[:, np.newaxis]
    fy = (ys - y0)[:, np.newaxis]

    top_left = src[y0, x0].astype(np.float64)
    top_right = src[y0, x0 + 1].astype(np.float64)
    bottom_left = src[y0 + 1, x0].astype(np.float64)
    bottom_right = src[y0 + 1, x0 + 1].astype(np.float64)

    top = top_left * (1.0 - fx) + top_right * fx
    bottom = bottom_left * (1.0 - fx) + bottom_right * fx
    blended = top * (1.0 - fy) + bottom * fy

    # Round half up, then clamp into the uint8 range.
    strip[inside] = np.clip(np.floor(blended + 0.5), 0, 255).astype(np.uint8)
    return strip
def _polygon_angle_runs(poly, center, gap_thresh):
    """Return ``(start, end)`` angle runs for one polygon's points.

    Point angles are unwrapped across the polygon's largest circular gap so
    a polygon straddling the +/-pi seam yields one contiguous range, then
    split wherever consecutive angles differ by more than ``gap_thresh``.
    """
    thetas = sorted(
        math.atan2(p[1] - center[1], p[0] - center[0]) for p in poly
    )
    if not thetas:
        return []

    two_pi = 2 * math.pi
    last = len(thetas) - 1
    max_gap = 0
    gap_idx = -1
    for i, theta in enumerate(thetas):
        # The final gap wraps around from the largest angle to the smallest.
        following = thetas[0] + two_pi if i == last else thetas[i + 1]
        gap = following - theta
        if gap > max_gap:
            max_gap, gap_idx = gap, i

    if gap_idx == last:
        arc = thetas
    else:
        # Start just after the largest gap; lift the leading part by 2*pi
        # so the sequence stays increasing.
        arc = thetas[gap_idx + 1:] + [t + two_pi for t in thetas[:gap_idx + 1]]

    runs = []
    run_start = prev = arc[0]
    for theta in arc[1:]:
        if theta - prev > gap_thresh:
            runs.append((run_start, prev))
            run_start = theta
        prev = theta
    runs.append((run_start, prev))
    return runs


def calculate_precise_arc(polygons, center):
    """Estimate the angular span of the dominant text arc around ``center``.

    Each polygon's points are converted to angles with
    ``atan2(y - cy, x - cx)`` and grouped into runs (split at gaps > 15
    degrees).  Runs closer than 45 degrees are merged, including runs that
    are neighbours across the +/-pi seam.  Each merged arc is scored by
    ``extent * exp(-0.5 * (d / (pi/2))**2)`` where ``d`` is the angular
    distance from the arc's midpoint to ``-pi/2`` (straight up when y grows
    downward, as in image coordinates); the best-scoring arc wins.

    Args:
        polygons: Iterable of polygons, each an iterable of ``(x, y)`` points.
        center: ``(x, y)`` reference point the angles are measured around.

    Returns:
        ``(start_theta, extent)`` in radians, with ``extent`` capped at
        ``2*pi``.  ``(0.0, 0.0)`` when no polygon contributes any point.
    """
    two_pi = 2 * math.pi
    gap_thresh = math.radians(15)
    merge_thresh = math.radians(45)

    clusters = []
    for poly in polygons:
        for start, end in _polygon_angle_runs(poly, center, gap_thresh):
            # Bring every run onto the same branch (start in (-pi, pi]) so
            # runs from different polygons are directly comparable.
            shift = math.ceil((start - math.pi) / two_pi) * two_pi
            clusters.append([start - shift, end - shift])
    if not clusters:
        return 0.0, 0.0

    clusters.sort(key=lambda c: c[0])

    # Linear pass: merge neighbours (or overlaps) in increasing start order.
    merged = [clusters[0]]
    for start, end in clusters[1:]:
        tail = merged[-1]
        if start - tail[1] < merge_thresh:
            tail[1] = max(tail[1], end)
        else:
            merged.append([start, end])

    # Seam pass: the last arc may continue into the first one(s) across
    # +/-pi; fold those into the last arc, lifted by 2*pi.
    while len(merged) > 1 and merged[0][0] + two_pi - merged[-1][1] < merge_thresh:
        head = merged.pop(0)
        merged[-1][1] = max(merged[-1][1], head[1] + two_pi)

    candidates = []
    for start, end in merged:
        extent = min(end - start, two_pi)
        mid = start + extent / 2
        dist_to_top = abs(((mid + math.pi / 2 + math.pi) % two_pi) - math.pi)
        weight = math.exp(-0.5 * (dist_to_top / (math.pi / 2)) ** 2)
        candidates.append((extent * weight, start, extent))

    # max() keeps the first of equally scored candidates.
    _, best_start, best_extent = max(candidates, key=lambda c: c[0])
    return best_start, best_extent
def extract_pdf_page(pdf_path, page_num=0):
    """Render one page of a PDF to a BGR image array at 2x zoom.

    Args:
        pdf_path: Path of the PDF file to open.
        page_num: Zero-based index of the page to render.

    Returns:
        The rendered page as a ``uint8`` array converted with
        ``cv2.COLOR_RGB2BGR``.
    """
    # The context manager closes the document even if rendering raises.
    with fitz.open(pdf_path) as doc:
        page = doc.load_page(page_num)
        pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        # NOTE(review): assumes the pixmap is RGB (n == 3) or RGBA (n == 4).
        if pix.n == 4:
            img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
        # cvtColor returns a new array, so the result does not reference the
        # pixmap buffer after the document is closed.
        return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
def run_layout_detection(image_path):
    """Run the PP-DocLayout-L layout model on an image and list its regions.

    Every detected box is printed and returned as a dict with the keys
    ``'label'``, ``'score'`` and ``'box'`` (taken from the box's
    ``'label_name'``/``'label'``, ``'score'`` and ``'coordinate'`` entries).
    """
    print("Initializing Paddlex PP-DocLayout-L...")
    layout_model = px.create_model("PP-DocLayout-L")

    regions = []
    for page_result in layout_model.predict(image_path, batch_size=1):
        # Each result is read as a mapping carrying a 'boxes' list.
        for detection in page_result.get('boxes', []):
            fallback_label = detection.get('label', 'unknown')
            name = detection.get('label_name', fallback_label)
            confidence = detection.get('score', 0.0)
            coordinate = detection.get('coordinate')
            print(f"Detected: {name} (Score: {confidence:.2f}) at {coordinate}")
            regions.append({'label': name, 'score': confidence, 'box': coordinate})
    return regions
def _draw_arc_ray(canvas, center, radius, theta, color):
    """Draw a ray from ``center`` to the circle point at angle ``theta``."""
    tip_x = center[0] + radius * math.cos(theta)
    tip_y = center[1] + radius * math.sin(theta)
    cv2.line(canvas, (int(center[0]), int(center[1])), (int(tip_x), int(tip_y)), color, 2)


def _build_report_html(pdf_name, region_count, seal_results):
    """Return the HTML report page for one processed document as a string."""
    from html import escape  # local import keeps this block self-contained

    sections = []
    for s in seal_results:
        if s['unwarp']:
            unwarp_html = f'<img src="{s["unwarp"]}" style="max-width: 100%; border: 1px solid #ddd;">'
        else:
            unwarp_html = '<p style="color:red;">No text arc found in this crop.</p>'
        sections.append(f'''
<div style="margin-bottom: 40px; border-bottom: 2px solid #eee; padding-bottom: 20px;">
<h3>Seal Area #{s['index']}</h3>
<div style="display: flex; gap: 20px;">
<div style="background:white; padding:10px; border-radius:4px; box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
<p style="margin-top:0;">Detection Overlay</p>
<img src="{s['marked']}" style="max-height: 350px;">
</div>
<div style="flex-grow:1; background:white; padding:10px; border-radius:4px; box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
<p style="margin-top:0;">Unwarped Organization Name</p>
{unwarp_html}
</div>
</div>
</div>
''')
    seals_html = "".join(sections) if sections else '<p>No seals detected for unwarping.</p>'

    # The charset is declared because the page is written as UTF-8 and the
    # file name shown below may contain non-ASCII characters.
    return f"""
<html><head><meta charset="utf-8"></head><body style="font-family: sans-serif; padding: 20px; background: #fdfdfd;">
<h1>Integrated Workflow: Paddlex Layout Analysis + OCR</h1>
<div style="background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.05); margin-bottom: 40px;">
<h3>1. Document Layout Detection (Paddlex PP-DocLayout-L)</h3>
<p>File: {escape(pdf_name)} | Detected Regions: {region_count}</p>
<img src="doc_layout_viz.png" style="max-width: 100%; border: 1px solid #999;">
</div>
<div>
<h2>2. Refined Seal Extraction & Unwarping</h2>
{seals_html}
</div>
</body></html>
"""


def process_full_workflow(pdf_path, output_dir="report_viz"):
    """Run the PDF -> layout -> seal unwarp pipeline and write an HTML report.

    Steps, all writing into ``output_dir``:
      1. Render the first page of ``pdf_path`` to ``doc_page.png``.
      2. Run layout detection and save an annotated ``doc_layout_viz.png``
         (regions with score > 0.2; seals in red, others in green).
      3. For each seal region: crop with 40 px padding, detect text polygons,
         estimate the text arc, and write ``seal_crop_<i>.png``,
         ``seal_marked_<i>.png`` and, when an arc was found,
         ``seal_unwarp_<i>.png``.
      4. Write ``index.html`` linking the images above.

    Args:
        pdf_path: Path of the PDF to process.
        output_dir: Directory for all generated files; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    print(f"Rendering PDF {pdf_path} Page 1...")
    page_img = extract_pdf_page(pdf_path)
    doc_path = os.path.join(output_dir, "doc_page.png")
    cv2.imwrite(doc_path, page_img)

    print("Running Layout Detection via Paddlex...")
    all_regions = run_layout_detection(doc_path)

    page_viz = page_img.copy()
    seal_boxes = []
    for reg in all_regions:
        box = reg.get('box')
        label = reg.get('label')
        score = reg.get('score', 0.0)
        # Regions without coordinates cannot be drawn or cropped.
        if box is None or not score > 0.2:  # Low threshold for debugging
            continue
        # Seal regions are matched by label name.
        is_seal = (label == 'seal')
        x1, y1, x2, y2 = [int(v) for v in box]
        color = (0, 0, 255) if is_seal else (0, 255, 0)
        cv2.rectangle(page_viz, (x1, y1), (x2, y2), color, 2)
        cv2.putText(page_viz, f"{label} {score:.2f}", (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)
        if is_seal:
            seal_boxes.append(box)
    cv2.imwrite(os.path.join(output_dir, "doc_layout_viz.png"), page_viz)

    seal_results = []
    print(f"Processing {len(seal_boxes)} detected seals...")
    det_model = SealTextDetection(model_name="PP-OCRv4_server_seal_det")
    pad = 40
    for i, box in enumerate(seal_boxes):
        x1, y1, x2, y2 = [int(v) for v in box]
        # Pad the crop, clamped to the page bounds.
        y1_p, y2_p = max(0, y1-pad), min(page_img.shape[0], y2+pad)
        x1_p, x2_p = max(0, x1-pad), min(page_img.shape[1], x2+pad)
        seal_crop = page_img[y1_p:y2_p, x1_p:x2_p]
        crop_name = f"seal_crop_{i}.png"
        crop_path = os.path.join(output_dir, crop_name)
        cv2.imwrite(crop_path, seal_crop)

        print(f"Refining Seal #{i}...")
        all_polygons = []
        for res in det_model.predict(crop_path, batch_size=1):
            polys = res.get('dt_polys') if isinstance(res, dict) else None
            # Explicit length check: truth-testing a multi-element ndarray
            # raises ValueError.
            if polys is not None and len(polys) > 0:
                all_polygons.extend(polys)
                print(f" Found {len(polys)} text polygons in seal #{i}")

        # The seal is assumed to be centred in the padded crop.
        ch, cw = seal_crop.shape[:2]
        center = [cw // 2, ch // 2]
        radius = min(cw, ch) // 2 - 10
        start_theta, extent = calculate_precise_arc(all_polygons, center)

        marked = seal_crop.copy()
        for p in all_polygons:
            cv2.polylines(marked, [np.array(p, dtype=np.int32)], True, (0, 255, 0), 2)

        unwarp_name = f"seal_unwarp_{i}.png"
        unwarp = None
        if extent > 0:
            unwarp = polar_unwarp(seal_crop, center, radius, start_theta, extent)
            if unwarp is not None:
                cv2.imwrite(os.path.join(output_dir, unwarp_name), unwarp)
            # Mark the arc boundaries only when an arc was actually found.
            _draw_arc_ray(marked, center, radius, start_theta, (255, 0, 0))
            _draw_arc_ray(marked, center, radius, start_theta + extent, (0, 0, 255))

        marked_name = f"seal_marked_{i}.png"
        cv2.imwrite(os.path.join(output_dir, marked_name), marked)
        seal_results.append({
            'index': i,
            'crop': crop_name,
            'marked': marked_name,
            'unwarp': unwarp_name if unwarp is not None else None,
        })

    # Integrated HTML report; shows the name of the file actually processed.
    report = _build_report_html(os.path.basename(pdf_path), len(all_regions), seal_results)
    with open(os.path.join(output_dir, "index.html"), "w", encoding="utf-8") as f:
        f.write(report)
    print(f"Workflow Complete: {output_dir}/index.html")
if __name__ == "__main__":
    import sys

    # Sample document used when no path is given on the command line.
    default_pdf_path = r"src/test/resources/data/pdfs/关于中检测试技术广东集团有限公司检验检测资质的调查取证函局长件_pages11-14.pdf"
    # Optional first CLI argument overrides the sample document.
    pdf_path = sys.argv[1] if len(sys.argv) > 1 else default_pdf_path
    process_full_workflow(pdf_path)