229 lines
9.9 KiB
Python
229 lines
9.9 KiB
Python
|
|
import os
|
|||
|
|
os.environ["DISABLE_MODEL_SOURCE_CHECK"] = "True"
|
|||
|
|
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
import math
|
|||
|
|
import fitz # PyMuPDF
|
|||
|
|
import paddlex as px # Using Paddlex for Layout
|
|||
|
|
from paddleocr import SealTextDetection
|
|||
|
|
|
|||
|
|
# Tool: Standard Polar Unwarp
|
|||
|
|
def polar_unwarp(img, center, radius, start_theta, angular_extent):
    """Flatten an arc-shaped band of ``img`` into a rectangular BGR strip.

    The strip is ``int(angular_extent * radius)`` pixels wide and
    ``int(radius * 0.6)`` pixels tall. Column ``x`` corresponds to the angle
    ``start_theta + angular_extent * x / width`` (radians); row ``y`` samples
    at distance ``radius - y`` from ``center``, so row 0 is the outer edge.

    Args:
        img: Source image indexed as ``img[row, col]``.
        center: ``(x, y)`` centre of the polar transform.
        radius: Outer sampling radius in pixels.
        start_theta: Angle of the first column, in radians.
        angular_extent: Angular width of the strip, in radians.

    Returns:
        A ``uint8`` array of shape ``(height, width, 3)``, or ``None`` when
        the extent is not positive or either strip dimension collapses to 0.
    """
    if angular_extent <= 0:
        return None

    width = int(angular_extent * radius)
    height = int(radius * 0.6)
    if width <= 0 or height <= 0:
        return None

    cx, cy = center[0], center[1]
    x_limit = img.shape[1] - 1
    y_limit = img.shape[0] - 1

    # The sampling direction depends only on the column, so compute the
    # unit vectors once instead of once per pixel.
    directions = []
    for col in range(width):
        angle = start_theta + angular_extent * (col / width)
        directions.append((math.cos(angle), math.sin(angle)))

    # Start from an all-white strip; only in-bounds samples overwrite it.
    strip = np.full((height, width, 3), 255, dtype=np.uint8)
    for row in range(height):
        ring = radius - row
        for col, (dir_x, dir_y) in enumerate(directions):
            sample_x = cx + ring * dir_x
            sample_y = cy + ring * dir_y
            if not (0 <= sample_x < x_limit and 0 <= sample_y < y_limit):
                continue
            # 1x1 sub-pixel patch == interpolated colour at the sample point.
            pixel = cv2.getRectSubPix(img, (1, 1), (float(sample_x), float(sample_y)))
            strip[row, col] = pixel[0, 0]
    return strip
|
|||
|
|
|
|||
|
|
def calculate_precise_arc(polygons, center):
    """Estimate the dominant text arc around ``center`` from text polygons.

    Each polygon's vertices are converted to angles with
    ``atan2(y - cy, x - cx)``. Per polygon, the angles are unrolled starting
    just after the largest circular gap and split into clusters wherever
    consecutive angles are more than 15 degrees apart. Clusters from all
    polygons are then merged when they lie within 45 degrees of each other,
    including across the +/-pi branch cut. Each merged arc is scored by its
    extent multiplied by a Gaussian weight that peaks when the arc's
    midpoint is at ``-pi/2``; the highest-scoring arc wins.

    Args:
        polygons: Iterable of polygons, each an iterable of ``(x, y)`` points.
        center: ``(x, y)`` point the angles are measured around.

    Returns:
        ``(start_theta, angular_extent)`` in radians. ``(0.0, 0.0)`` when no
        polygon contributes any point. ``start_theta`` is only meaningful
        modulo ``2*pi``.
    """
    two_pi = 2 * math.pi
    gap_thresh = math.radians(15)
    merge_thresh = math.radians(45)

    # Step 1: per-polygon clusters, each stored as a mutable [start, end].
    clusters = []
    for poly in polygons:
        thetas = sorted(
            math.atan2(p[1] - center[1], p[0] - center[0]) for p in poly
        )
        if not thetas:
            continue

        # Locate the largest circular gap; the arc begins right after it.
        last = len(thetas) - 1
        max_gap = 0
        gap_idx = -1
        for i, theta in enumerate(thetas):
            following = thetas[0] + two_pi if i == last else thetas[i + 1]
            gap = following - theta
            if gap > max_gap:
                max_gap, gap_idx = gap, i

        if gap_idx in (-1, last):
            # Largest gap is the wrap-around itself: already in arc order.
            t_arc = thetas
        else:
            # Rotate so the arc is contiguous; wrapped angles move up by 2*pi.
            t_arc = thetas[gap_idx + 1:] + [t + two_pi for t in thetas[:gap_idx + 1]]

        run_start = run_end = t_arc[0]
        for theta in t_arc[1:]:
            if theta - run_end > gap_thresh:
                clusters.append([run_start, run_end])
                run_start = theta
            run_end = theta
        clusters.append([run_start, run_end])

    if not clusters:
        return 0.0, 0.0

    # Step 2: put every cluster start on the same branch (-pi, pi]. Without
    # this, a cluster rotated by +2*pi in one polygon can never meet an
    # adjacent, unrotated cluster from another polygon.
    for cluster in clusters:
        while cluster[0] > math.pi:
            cluster[0] -= two_pi
            cluster[1] -= two_pi

    # Step 3: linear merge in start order.
    clusters.sort(key=lambda c: c[0])
    merged = [clusters[0]]
    for start, end in clusters[1:]:
        tail = merged[-1]
        if start - tail[1] < merge_thresh:
            tail[1] = max(tail[1], end)
        else:
            merged.append([start, end])

    # Step 4: circular merge. The last arc may reach (or nearly reach) the
    # first one across the branch cut; fold the first into the last and
    # repeat, since the grown arc may now touch the next one as well.
    while len(merged) > 1:
        head, tail = merged[0], merged[-1]
        if head[0] + two_pi - tail[1] >= merge_thresh:
            break
        tail[1] = max(tail[1], head[1] + two_pi)
        merged.pop(0)

    # Step 5: score each arc and keep the best (first one wins on ties).
    best_start, best_extent, best_score = None, None, None
    for start, end in merged:
        # An arc cannot cover more than the full circle.
        extent = min(end - start, two_pi)
        mid = (start + end) / 2
        # Angular distance (0..pi) between the arc midpoint and -pi/2.
        dist_to_top = abs(((mid + math.pi / 2 + math.pi) % two_pi) - math.pi)
        weight = math.exp(-0.5 * (dist_to_top / (math.pi / 2)) ** 2)
        score = extent * weight
        if best_score is None or score > best_score:
            best_start, best_extent, best_score = start, extent, score

    return best_start, best_extent
|
|||
|
|
|
|||
|
|
def extract_pdf_page(pdf_path, page_num=0):
    """Render one page of a PDF to a BGR image.

    The page is rasterised with a 2x scaling matrix and converted from the
    pixmap's RGB(A) layout to OpenCV's BGR channel order.

    Args:
        pdf_path: Path of the PDF file to open.
        page_num: Zero-based page index to render.

    Returns:
        The rendered page as a ``uint8`` BGR array.
    """
    # The context manager closes the document even if rendering fails;
    # previously the handle was never released.
    with fitz.open(pdf_path) as doc:
        page = doc.load_page(page_num)
        pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        if pix.n == 4:
            # Drop the alpha channel before the RGB -> BGR swap.
            img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
        # cvtColor allocates a new array, so the result does not depend on
        # the pixmap buffer once the document is closed.
        return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
|
|||
|
|
|
|||
|
|
def run_layout_detection(image_path):
    """Run the PP-DocLayout-L layout model on an image and list its regions.

    A new model instance is created on every call. Each detected box is
    printed and collected.

    Args:
        image_path: Path of the image passed to ``model.predict``.

    Returns:
        A list of dicts with keys ``'label'``, ``'score'`` and ``'box'``
        (the box's ``'coordinate'`` value, or ``None`` if absent).
    """
    print("Initializing Paddlex PP-DocLayout-L...")
    layout_model = px.create_model("PP-DocLayout-L")
    predictions = layout_model.predict(image_path, batch_size=1)

    regions = []
    for page_result in predictions:
        # Paddlex 3.0 result structure: each result is a dict with a 'boxes' key.
        for detection in page_result.get('boxes', []):
            # Box fields: label_name, label, score, coordinate.
            name = detection.get('label_name', detection.get('label', 'unknown'))
            confidence = detection.get('score', 0.0)
            location = detection.get('coordinate')
            print(f"Detected: {name} (Score: {confidence:.2f}) at {location}")
            regions.append({'label': name, 'score': confidence, 'box': location})
    return regions
|
|||
|
|
|
|||
|
|
def _save_image(path, image):
    """Write ``image`` to ``path``; raise ``OSError`` if OpenCV reports failure."""
    # cv2.imwrite signals most failures only through its return value.
    if not cv2.imwrite(path, image):
        raise OSError(f"Failed to write image: {path}")


def _draw_ray(canvas, center, radius, theta, color):
    """Draw a 2px line on ``canvas`` from ``center`` to the point at ``radius``/``theta``."""
    tip_x = center[0] + radius * math.cos(theta)
    tip_y = center[1] + radius * math.sin(theta)
    cv2.line(canvas, (int(center[0]), int(center[1])), (int(tip_x), int(tip_y)), color, 2)


def _collect_seal_polygons(det_model, crop_path, seal_index):
    """Run seal text detection on one crop and return all detected polygons."""
    polygons = []
    for res in det_model.predict(crop_path, batch_size=1):
        # SealTextDetection returns dt_polys directly in the result.
        polys = res.get('dt_polys') if isinstance(res, dict) else None
        # Explicit length check: plain truthiness raises on a multi-element
        # ndarray.
        if polys is not None and len(polys) > 0:
            polygons.extend(polys)
            print(f"  Found {len(polys)} text polygons in seal #{seal_index}")
    return polygons


def _build_report_html(source_name, region_count, seal_results):
    """Render the HTML report for the layout overview and per-seal results."""
    from html import escape

    sections = []
    for seal in seal_results:
        if seal['unwarp']:
            unwarp_html = (
                f'<img src="{seal["unwarp"]}" '
                'style="max-width: 100%; border: 1px solid #ddd;">'
            )
        else:
            unwarp_html = '<p style="color:red;">No text arc found in this crop.</p>'
        sections.append(f"""
        <div style="margin-bottom: 40px; border-bottom: 2px solid #eee; padding-bottom: 20px;">
            <h3>Seal Area #{seal['index']}</h3>
            <div style="display: flex; gap: 20px;">
                <div style="background:white; padding:10px; border-radius:4px; box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
                    <p style="margin-top:0;">Detection Overlay</p>
                    <img src="{seal['marked']}" style="max-height: 350px;">
                </div>
                <div style="flex-grow:1; background:white; padding:10px; border-radius:4px; box-shadow: 0 1px 3px rgba(0,0,0,0.1);">
                    <p style="margin-top:0;">Unwarped Organization Name</p>
                    {unwarp_html}
                </div>
            </div>
        </div>
        """)
    seal_html = "".join(sections) if sections else '<p>No seals detected for unwarping.</p>'

    # The file name comes from the caller-supplied path, so escape it.
    return f"""
    <html><body style="font-family: sans-serif; padding: 20px; background: #fdfdfd;">
        <h1>Integrated Workflow: Paddlex Layout Analysis + OCR</h1>
        <div style="background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.05); margin-bottom: 40px;">
            <h3>1. Document Layout Detection (Paddlex PP-DocLayout-L)</h3>
            <p>File: {escape(source_name)} | Detected Regions: {region_count}</p>
            <img src="doc_layout_viz.png" style="max-width: 100%; border: 1px solid #999;">
        </div>
        <div>
            <h2>2. Refined Seal Extraction & Unwarping</h2>
            {seal_html}
        </div>
    </body></html>
    """


def process_full_workflow(pdf_path, output_dir="report_viz"):
    """Render a PDF page, detect layout and seals, unwarp seal text, write a report.

    Steps:
      1. Render the first page of ``pdf_path`` and save it as ``doc_page.png``.
      2. Run layout detection and save an annotated ``doc_layout_viz.png``
         (seal boxes in red, other regions in green, score > 0.2 only).
      3. For every seal box: crop with 40px padding, detect text polygons,
         estimate the text arc, save ``seal_crop_{i}.png``,
         ``seal_marked_{i}.png`` and, when an arc is found,
         ``seal_unwarp_{i}.png``.
      4. Write ``index.html`` summarising the results.

    Args:
        pdf_path: Path of the PDF to process.
        output_dir: Directory receiving all images and the report; created
            if missing.

    Raises:
        OSError: If an image cannot be written to ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)

    print(f"Rendering PDF {pdf_path} Page 1...")
    page_img = extract_pdf_page(pdf_path)
    doc_path = os.path.join(output_dir, "doc_page.png")
    _save_image(doc_path, page_img)

    print("Running Layout Detection via Paddlex...")
    all_regions = run_layout_detection(doc_path)

    page_viz = page_img.copy()
    seal_boxes = []
    for reg in all_regions:
        box = reg.get('box')
        label = reg.get('label')
        score = reg.get('score', 0.0)
        # Low threshold for debugging; regions without coordinates cannot
        # be drawn or cropped, so skip them too.
        if box is None or not score > 0.2:
            continue

        # In Paddlex 3.0 DocLayout the seal class is matched by name.
        is_seal = (label == 'seal')
        x1, y1, x2, y2 = [int(v) for v in box]
        color = (0, 0, 255) if is_seal else (0, 255, 0)
        cv2.rectangle(page_viz, (x1, y1), (x2, y2), color, 2)
        cv2.putText(page_viz, f"{label} {score:.2f}", (x1, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)
        if is_seal:
            seal_boxes.append(box)

    _save_image(os.path.join(output_dir, "doc_layout_viz.png"), page_viz)

    seal_results = []
    print(f"Processing {len(seal_boxes)} detected seals...")
    # Only load the seal detector when there is something to run it on.
    det_model = (
        SealTextDetection(model_name="PP-OCRv4_server_seal_det")
        if seal_boxes else None
    )

    pad = 40
    for i, box in enumerate(seal_boxes):
        x1, y1, x2, y2 = [int(v) for v in box]
        # Padded crop, clamped to the page bounds.
        y1_p, y2_p = max(0, y1 - pad), min(page_img.shape[0], y2 + pad)
        x1_p, x2_p = max(0, x1 - pad), min(page_img.shape[1], x2 + pad)
        seal_crop = page_img[y1_p:y2_p, x1_p:x2_p]
        crop_name = f"seal_crop_{i}.png"
        crop_path = os.path.join(output_dir, crop_name)
        _save_image(crop_path, seal_crop)

        print(f"Refining Seal #{i}...")
        all_polygons = _collect_seal_polygons(det_model, crop_path, i)

        # The seal is assumed to be centred in the crop; the sampling
        # radius is pulled in 10px from the shorter half-side.
        ch, cw = seal_crop.shape[:2]
        center = [cw // 2, ch // 2]
        radius = min(cw, ch) // 2 - 10

        start_theta, extent = calculate_precise_arc(all_polygons, center)

        marked = seal_crop.copy()
        for poly in all_polygons:
            cv2.polylines(marked, [np.array(poly, dtype=np.int32)], True, (0, 255, 0), 2)

        unwarp_name = f"seal_unwarp_{i}.png"
        unwarp = None
        if extent > 0:
            unwarp = polar_unwarp(seal_crop, center, radius, start_theta, extent)
        if unwarp is not None:
            _save_image(os.path.join(output_dir, unwarp_name), unwarp)
            # Blue ray marks the arc start, red ray the arc end.
            _draw_ray(marked, center, radius, start_theta, (255, 0, 0))
            _draw_ray(marked, center, radius, start_theta + extent, (0, 0, 255))

        marked_name = f"seal_marked_{i}.png"
        _save_image(os.path.join(output_dir, marked_name), marked)
        seal_results.append({
            'index': i,
            'crop': crop_name,
            'marked': marked_name,
            'unwarp': unwarp_name if unwarp is not None else None,
        })

    # Integrated HTML report; shows the actual input file name.
    report = _build_report_html(os.path.basename(pdf_path), len(all_regions), seal_results)
    with open(os.path.join(output_dir, "index.html"), "w", encoding="utf-8") as f:
        f.write(report)
    print(f"Workflow Complete: {output_dir}/index.html")
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    import sys

    # Sample document used when no path is given on the command line.
    pdf_path = r"src/test/resources/data/pdfs/关于中检测试技术(广东)集团有限公司检验检测资质的调查取证函(局长件)_pages11-14.pdf"
    # Optional first CLI argument overrides the sample document.
    if len(sys.argv) > 1:
        pdf_path = sys.argv[1]
    process_full_workflow(pdf_path)
|