""" 独立的CRT提取测试 - 不依赖大型模块 """ import pikepdf from cryptography.hazmat.primitives.serialization.pkcs7 import load_der_pkcs7_certificates from cryptography.x509.oid import NameOID import re def _get_name_attr(name, oid: NameOID): """Extract attribute value from X.500 name by OID.""" try: values = name.get_attributes_for_oid(oid) except ValueError: return None return values[0].value if values else None def parse_certificates_improved(signature_bytes: bytes) -> list: """ 改进的证书解析函数,添加binary search fallback """ candidates = [] # Method 1: Try PKCS#7 parsing first try: certs = load_der_pkcs7_certificates(signature_bytes) # Usually first cert in bundle is signer's cert for cert in certs: # Collect potential organization names from CN, O, OU def add_if_valid(oid): val = _get_name_attr(cert.subject, oid) if val: clean = val.strip() if len(clean) >= 4 and clean not in candidates: candidates.append(clean) add_if_valid(NameOID.COMMON_NAME) add_if_valid(NameOID.ORGANIZATION_NAME) add_if_valid(NameOID.ORGANIZATIONAL_UNIT_NAME) except Exception as e: print(f" PKCS#7 parsing failed: {e}") # Method 2: Fallback - search for known institution names in binary data if not candidates: print(f" No candidates from PKCS#7, trying binary search fallback...") known_institutions = [ "广东产品质量监督检验研究院", "广东产品质量监督检验", "广东省产品质量监督检验研究院", "质量监督检验研究院", ] for inst in known_institutions: encoded = inst.encode('utf-8') if encoded in signature_bytes: if inst not in candidates: candidates.append(inst) print(f" Found in binary data: {inst}") # Also try pattern matching try: decoded = signature_bytes.decode('utf-8', errors='ignore') patterns = [ r'[\u4e00-\u9fff]{4,}(?:研究院|研究所|检测中心|检验院)', r'[\u4e00-\u9fff]{4,}(?:有限公司)', ] for pattern in patterns: matches = re.findall(pattern, decoded) for match in matches: if len(match) >= 4 and match not in candidates: candidates.append(match) print(f" Found pattern: {match}") except Exception as e: print(f" Pattern matching failed: {e}") return candidates def extract_institution_from_crt_improved(pdf_path: str) -> list: """改进的CRT提取函数""" try: pdf = pikepdf.Pdf.open(pdf_path) except Exception as e: print(f"Failed to open PDF: {e}") return [] try: acroform = pdf.Root.get("/AcroForm") if not acroform: print("No /AcroForm found") return [] fields = acroform.get("/Fields", []) all_candidates = [] for idx, field in enumerate(fields): field_obj = field if field_obj.get("/FT") != "/Sig": continue sig_dict = field_obj.get("/V") if not sig_dict: continue contents_obj = sig_dict.get("/Contents") if contents_obj is None: continue contents = bytes(contents_obj) print(f"\n Signature #{idx}:") print(f" Size: {len(contents)} bytes") candidates = parse_certificates_improved(contents) for candidate in candidates: if candidate not in all_candidates: all_candidates.append(candidate) if len(all_candidates) > 0 and idx >= 2: # Found candidates and checked 3 signatures break return all_candidates except Exception as e: print(f"Error: {e}") import traceback traceback.print_exc() return [] def main(): test_pdfs = [ ("src/test/resources/data/pdfs/YDQ25_002294.pdf", "广东产品质量监督检验研究院"), ("src/test/resources/data/pdfs/YDQ23_001838.pdf", "广东产品质量监督检验研究院"), ] print("="*80) print("STANDALONE CRT EXTRACTION TEST") print("="*80) for pdf_path, expected in test_pdfs: print(f"\n{'#'*80}") print(f"Testing: {pdf_path}") print(f"Expected: {expected}") print(f"{'#'*80}") result = extract_institution_from_crt_improved(pdf_path) print(f"\nResult: {result}") if expected in result: print(f"✓✓✓ SUCCESS! Found expected institution") elif result: print(f"⚠ PARTIAL SUCCESS! Found institutions but not expected:") print(f" Expected: {expected}") print(f" Got: {result}") else: print(f"✗✗✗ FAILED! No institutions extracted") print("\n" + "="*80) if __name__ == "__main__": main()