165 lines
5.2 KiB
Python
165 lines
5.2 KiB
Python
|
|
"""
|
|||
|
|
Standalone CRT extraction test - does not depend on large modules
|
|||
|
|
"""
|
|||
|
|
import pikepdf
|
|||
|
|
from cryptography.hazmat.primitives.serialization.pkcs7 import load_der_pkcs7_certificates
|
|||
|
|
from cryptography.x509.oid import NameOID
|
|||
|
|
import re
|
|||
|
|
|
|||
|
|
def _get_name_attr(name, oid: NameOID):
    """Return the first value stored under ``oid`` in an X.500 name.

    ``None`` is returned when the lookup raises ``ValueError`` or when the
    name holds no attribute for ``oid``.
    """
    try:
        matches = name.get_attributes_for_oid(oid)
    except ValueError:
        return None

    if not matches:
        return None

    first = matches[0]
    return first.value
|
|||
|
|
|
|||
|
|
def parse_certificates_improved(signature_bytes: bytes) -> list:
    """Collect candidate institution names from a signature blob.

    Improved certificate parsing with a binary-search fallback. Two
    strategies are tried in order:

    1. Parse ``signature_bytes`` as a DER PKCS#7 bundle and gather the CN, O
       and OU subject attributes of every certificate in it.
    2. Only when step 1 produced nothing: scan the raw bytes for a fixed list
       of known institution names (UTF-8 or UTF-16BE encoded), then run two
       regular expressions for Chinese organisation-like names over the
       UTF-8 decoding of the bytes.

    Args:
        signature_bytes: Raw signature contents.

    Returns:
        Unique candidate names in discovery order; empty if nothing matched.
        Names taken from certificates are stripped and at least 4 characters
        long.
    """
    candidates = []

    def remember(text):
        """Append ``text`` if not yet collected; return True when it was new."""
        if text in candidates:
            return False
        candidates.append(text)
        return True

    # Method 1: Try PKCS#7 parsing first
    try:
        certs = load_der_pkcs7_certificates(signature_bytes)
        subject_oids = (
            NameOID.COMMON_NAME,
            NameOID.ORGANIZATION_NAME,
            NameOID.ORGANIZATIONAL_UNIT_NAME,
        )

        # Usually the first cert in the bundle is the signer's, but every
        # certificate is inspected.
        for cert in certs:
            for oid in subject_oids:
                value = _get_name_attr(cert.subject, oid)
                if not value:
                    continue
                cleaned = value.strip()
                if len(cleaned) >= 4:
                    remember(cleaned)
    except Exception as e:
        # Best effort: any parsing problem just sends us to the fallback.
        print(f" PKCS#7 parsing failed: {e}")

    if candidates:
        return candidates

    # Method 2: Fallback - search for known institution names in binary data
    print(" No candidates from PKCS#7, trying binary search fallback...")

    known_institutions = [
        "广东产品质量监督检验研究院",
        "广东产品质量监督检验",
        "广东省产品质量监督检验研究院",
        "质量监督检验研究院",
    ]

    # X.509 directory strings may be stored as UTF8String or as BMPString
    # (UTF-16BE), so both byte representations are searched.
    encodings = ("utf-8", "utf-16-be")
    for inst in known_institutions:
        if any(inst.encode(enc) in signature_bytes for enc in encodings):
            if remember(inst):
                print(f" Found in binary data: {inst}")

    # Also try pattern matching (UTF-8 view of the data only)
    try:
        decoded = signature_bytes.decode('utf-8', errors='ignore')
        patterns = [
            r'[\u4e00-\u9fff]{4,}(?:研究院|研究所|检测中心|检验院)',
            r'[\u4e00-\u9fff]{4,}(?:有限公司)',
        ]

        for pattern in patterns:
            # Each pattern already requires at least 4 characters, so no
            # extra length check is needed on the matches.
            for match in re.findall(pattern, decoded):
                if remember(match):
                    print(f" Found pattern: {match}")
    except Exception as e:
        print(f" Pattern matching failed: {e}")

    return candidates
|
|||
|
|
|
|||
|
|
def extract_institution_from_crt_improved(pdf_path: str) -> list:
    """Improved CRT extraction: collect institution names from PDF signatures.

    Opens the PDF, walks the top-level ``/AcroForm`` ``/Fields`` entries,
    and for every signature field (``/FT`` equal to ``/Sig``) that has a
    value with ``/Contents``, passes the raw bytes to
    ``parse_certificates_improved``.

    Args:
        pdf_path: Path of the PDF file to inspect.

    Returns:
        Unique candidate names in discovery order. An empty list is returned
        when the file cannot be opened, has no ``/AcroForm``, or any error
        occurs while reading it (the error is printed, not raised).
    """
    try:
        pdf = pikepdf.Pdf.open(pdf_path)
    except Exception as e:
        print(f"Failed to open PDF: {e}")
        return []

    try:
        # The context manager closes the PDF on every exit path, including
        # early returns and exceptions.
        with pdf:
            acroform = pdf.Root.get("/AcroForm")
            if not acroform:
                print("No /AcroForm found")
                return []

            fields = acroform.get("/Fields", [])
            all_candidates = []
            signatures_checked = 0

            for idx, field in enumerate(fields):
                if field.get("/FT") != "/Sig":
                    continue

                sig_dict = field.get("/V")
                if not sig_dict:
                    continue

                contents_obj = sig_dict.get("/Contents")
                if contents_obj is None:
                    continue

                contents = bytes(contents_obj)
                print(f"\n Signature #{idx}:")
                print(f" Size: {len(contents)} bytes")

                for candidate in parse_certificates_improved(contents):
                    if candidate not in all_candidates:
                        all_candidates.append(candidate)

                # Count parsed signatures rather than field positions so that
                # non-signature fields do not trigger the early exit.
                signatures_checked += 1
                if all_candidates and signatures_checked >= 3:
                    # Found candidates and checked 3 signatures
                    break

            return all_candidates

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        return []
|
|||
|
|
|
|||
|
|
def main():
    """Run the extraction on the sample PDFs and print a verdict for each.

    For every (path, expected name) pair the extracted candidates are
    printed, followed by a success / partial / failure line depending on
    whether the expected name is among them.
    """
    banner = "=" * 80
    hash_rule = "#" * 80
    samples = (
        ("src/test/resources/data/pdfs/YDQ25_002294.pdf", "广东产品质量监督检验研究院"),
        ("src/test/resources/data/pdfs/YDQ23_001838.pdf", "广东产品质量监督检验研究院"),
    )

    print(banner)
    print("STANDALONE CRT EXTRACTION TEST")
    print(banner)

    for sample_path, wanted in samples:
        print(f"\n{hash_rule}")
        print(f"Testing: {sample_path}")
        print(f"Expected: {wanted}")
        print(hash_rule)

        found = extract_institution_from_crt_improved(sample_path)

        print(f"\nResult: {found}")

        if wanted in found:
            print("✓✓✓ SUCCESS! Found expected institution")
            continue
        if not found:
            print("✗✗✗ FAILED! No institutions extracted")
            continue

        # Something was extracted, just not the expected name.
        print("⚠ PARTIAL SUCCESS! Found institutions but not expected:")
        print(f" Expected: {wanted}")
        print(f" Got: {found}")

    print("\n" + banner)
|
|||
|
|
|
|||
|
|
# Run the sample extraction checks only when this file is executed as a script.
if __name__ == "__main__":
    main()
|