165 lines
5.2 KiB
Python
165 lines
5.2 KiB
Python
"""
Standalone CRT extraction test - does not depend on the large project modules.
"""
|
||
import pikepdf
|
||
from cryptography.hazmat.primitives.serialization.pkcs7 import load_der_pkcs7_certificates
|
||
from cryptography.x509.oid import NameOID
|
||
import re
|
||
|
||
def _get_name_attr(name, oid: NameOID):
    """Return the first value stored under ``oid`` in ``name``.

    ``None`` is returned when the lookup raises ``ValueError`` or when the
    name carries no attribute for that OID.
    """
    try:
        matches = name.get_attributes_for_oid(oid)
    except ValueError:
        return None

    if not matches:
        return None
    return matches[0].value
|
||
|
||
# Institution names searched for verbatim (as UTF-8) in the raw signature.
_KNOWN_INSTITUTIONS = (
    "广东产品质量监督检验研究院",
    "广东产品质量监督检验",
    "广东省产品质量监督检验研究院",
    "质量监督检验研究院",
)

# Runs of CJK ideographs that end in a typical institution/company suffix.
# Compiled once instead of on every call.
_INSTITUTION_NAME_PATTERNS = (
    re.compile(r'[\u4e00-\u9fff]{4,}(?:研究院|研究所|检测中心|检验院)'),
    re.compile(r'[\u4e00-\u9fff]{4,}(?:有限公司)'),
)


def _append_unique(candidates: list, value: str) -> bool:
    """Append ``value`` unless already present; return whether it was added."""
    if value in candidates:
        return False
    candidates.append(value)
    return True


def _collect_subject_names(certs, candidates: list) -> None:
    """Add the CN, O and OU subject values of each certificate to ``candidates``."""
    subject_oids = (
        NameOID.COMMON_NAME,
        NameOID.ORGANIZATION_NAME,
        NameOID.ORGANIZATIONAL_UNIT_NAME,
    )
    # Every certificate in the bundle is scanned, not only the first one,
    # so names taken from issuer/CA certificates can end up in the list too.
    for cert in certs:
        for oid in subject_oids:
            value = _get_name_attr(cert.subject, oid)
            if not value:
                continue
            cleaned = value.strip()
            # Very short values are discarded.
            if len(cleaned) >= 4:
                _append_unique(candidates, cleaned)


def _search_known_institutions(signature_bytes: bytes, candidates: list) -> None:
    """Add each known institution whose UTF-8 bytes occur in ``signature_bytes``."""
    for inst in _KNOWN_INSTITUTIONS:
        if inst.encode('utf-8') in signature_bytes and _append_unique(candidates, inst):
            print(f" Found in binary data: {inst}")


def _search_name_patterns(signature_bytes: bytes, candidates: list) -> None:
    """Add regex matches found in the lossy UTF-8 decoding of ``signature_bytes``."""
    # Undecodable bytes are dropped, so this is a heuristic scan only.
    decoded = signature_bytes.decode('utf-8', errors='ignore')
    for pattern in _INSTITUTION_NAME_PATTERNS:
        for match in pattern.findall(decoded):
            if _append_unique(candidates, match):
                print(f" Found pattern: {match}")


def parse_certificates_improved(signature_bytes: bytes) -> list:
    """Extract candidate institution names from a raw signature blob.

    The blob is first parsed as a DER PKCS#7 certificate bundle and the
    CN / O / OU subject values (stripped, at least 4 characters long) are
    collected. If that yields nothing, the raw bytes are searched for a
    fixed list of known institution names and then for name-like patterns.

    Args:
        signature_bytes: Raw bytes of the signature container.

    Returns:
        Unique candidate names in the order they were discovered; an empty
        list when nothing was found. Progress and failures are printed
        rather than raised.
    """
    candidates = []

    # Method 1: structured PKCS#7 parsing. Best effort: any failure is
    # reported and the heuristic fallback below takes over.
    try:
        certs = load_der_pkcs7_certificates(signature_bytes)
        _collect_subject_names(certs, candidates)
    except Exception as e:
        print(f" PKCS#7 parsing failed: {e}")

    if candidates:
        return candidates

    # Method 2: fallback - look for names directly in the binary data.
    print(" No candidates from PKCS#7, trying binary search fallback...")
    _search_known_institutions(signature_bytes, candidates)

    try:
        _search_name_patterns(signature_bytes, candidates)
    except Exception as e:
        print(f" Pattern matching failed: {e}")

    return candidates
|
||
|
||
def extract_institution_from_crt_improved(pdf_path: str) -> list:
    """Collect candidate institution names from a PDF's signature fields.

    Each entry of ``/AcroForm`` ``/Fields`` whose ``/FT`` is ``/Sig`` and that
    has a ``/V`` dictionary with ``/Contents`` is handed to
    ``parse_certificates_improved``. Scanning stops once at least one
    candidate has been found and three signatures have been examined.

    Args:
        pdf_path: Path of the PDF file to inspect.

    Returns:
        Unique candidate names in discovery order. An empty list is returned
        when the PDF cannot be opened, has no ``/AcroForm``, or an error
        occurs while reading it; errors are printed, not raised.
    """
    try:
        pdf = pikepdf.Pdf.open(pdf_path)
    except Exception as e:
        print(f"Failed to open PDF: {e}")
        return []

    # The Pdf object is a context manager: leaving this block closes the
    # underlying file on every return path instead of leaking the handle.
    with pdf:
        try:
            acroform = pdf.Root.get("/AcroForm")
            if not acroform:
                print("No /AcroForm found")
                return []

            all_candidates = []
            # Counts signatures actually examined. The field index also
            # counts non-signature fields, so it cannot be used for the
            # "checked 3 signatures" cut-off below.
            signatures_checked = 0

            for idx, field in enumerate(acroform.get("/Fields", [])):
                if field.get("/FT") != "/Sig":
                    continue

                sig_dict = field.get("/V")
                if not sig_dict:
                    continue

                contents_obj = sig_dict.get("/Contents")
                if contents_obj is None:
                    continue

                contents = bytes(contents_obj)
                signatures_checked += 1
                print(f"\n Signature #{idx}:")
                print(f" Size: {len(contents)} bytes")

                for candidate in parse_certificates_improved(contents):
                    if candidate not in all_candidates:
                        all_candidates.append(candidate)

                # Found candidates and checked 3 signatures
                if all_candidates and signatures_checked >= 3:
                    break

            return all_candidates

        except Exception as e:
            print(f"Error: {e}")
            import traceback
            traceback.print_exc()
            return []
|
||
|
||
def main():
    """Run the extraction on the sample PDFs and print a verdict for each one."""
    rule = "=" * 80
    hashes = "#" * 80

    # (path to sample PDF, institution name expected among the results)
    cases = [
        ("src/test/resources/data/pdfs/YDQ25_002294.pdf", "广东产品质量监督检验研究院"),
        ("src/test/resources/data/pdfs/YDQ23_001838.pdf", "广东产品质量监督检验研究院"),
    ]

    print(rule)
    print("STANDALONE CRT EXTRACTION TEST")
    print(rule)

    for path, wanted in cases:
        print(f"\n{hashes}")
        print(f"Testing: {path}")
        print(f"Expected: {wanted}")
        print(f"{hashes}")

        found = extract_institution_from_crt_improved(path)

        print(f"\nResult: {found}")

        if wanted in found:
            print("✓✓✓ SUCCESS! Found expected institution")
            continue

        if not found:
            print("✗✗✗ FAILED! No institutions extracted")
            continue

        # Something was extracted, but not the expected name.
        print("⚠ PARTIAL SUCCESS! Found institutions but not expected:")
        print(f" Expected: {wanted}")
        print(f" Got: {found}")

    print("\n" + rule)
|
||
|
||
# Run the sample checks only when this file is executed as a script.
if __name__ == "__main__":
    main()
|