fs-lawrisk/tools/audit_risks.py

113 lines
3.7 KiB
Python

import json
import os
from lawrisk.services import licensing_repo as lic_repo
from lawrisk.utils.env_loader import load_env
def clean_text(text):
if not text:
return ""
return str(text).strip()
def _clean_text(text):
return clean_text(text)
def audit_risks():
load_env()
conn = lic_repo._lic_pg_conn()
cur = conn.cursor()
# Get Region ID for '市级'
cur.execute("SELECT id FROM regions WHERE name = '市级'")
row = cur.fetchone()
if not row:
print("Region '市级' not found in DB.")
return
region_id = row[0]
print(f"Auditing Region: 市级 ({region_id})")
base_dir = r"市级初版-20251219\许可风险提示"
if not os.path.exists(base_dir):
print(f"Directory not found: {base_dir}")
return
mismatches = []
files = [f for f in os.listdir(base_dir) if f.endswith(".json")]
print(f"Scanning {len(files)} JSON files...")
processed_count = 0
for fname in files:
processed_count += 1
if processed_count % 5 == 0:
print(f"Processing file {processed_count}/{len(files)}: {fname}...")
fpath = os.path.join(base_dir, fname)
try:
with open(fpath, 'r', encoding='utf-8') as f:
data = json.load(f)
# Count risks in '市级' sheet only
sheet_rows = []
# Helper to normalize sheet name
target_sheet = None
for sname in sheets.keys():
if _clean_text(sname) == '市级' or '营业执照' in sname: # Special case for 109
target_sheet = sname
break
if not target_sheet:
# If no '市级', maybe report it?
# print(f"File {fname} has no 市级 sheet. Sheets: {list(sheets.keys())}")
continue
sheet_rows = sheets[target_sheet].get("rows", [])
file_counts = {}
for row in sheet_rows:
p_name = clean_text(row.get("permit_name"))
if p_name:
file_counts[p_name] = file_counts.get(p_name, 0) + 1
# Check DB
for p_name, f_count in file_counts.items():
cur.execute("""
SELECT count(*)
FROM region_permit_risks rpr
JOIN permits p ON p.id = rpr.permit_id
WHERE rpr.region_id = %s AND p.name = %s
""", (region_id, p_name))
db_count = cur.fetchone()[0]
if db_count != f_count:
mismatches.append({
"file": fname,
"permit": p_name,
"file_count": f_count,
"db_count": db_count,
"sheet": target_sheet
})
except Exception as e:
# print(f"Error reading {fname}: {e}")
pass
conn.close()
with open("audit_report.txt", "w", encoding="utf-8") as f:
f.write("\n" + "="*60 + "\n")
f.write("RISK COUNT MISMATCH REPORT\n")
f.write("="*60 + "\n")
if not mismatches:
f.write("All file risk counts match the database!\n")
else:
f.write(f"{'Permit Name':<40} | {'File':<6} | {'DB':<6} | {'Filename'}\n")
f.write("-" * 110 + "\n")
for m in mismatches:
f_short = (m['file'][:40] + '..') if len(m['file']) > 40 else m['file']
f.write(f"{m['permit'][:38]:<40} | {m['file_count']:<6} | {m['db_count']:<6} | {f_short}\n")
print("Report written to audit_report.txt")
if __name__ == "__main__":
audit_risks()