fs-lawrisk/refer/batch_process_municipal.py

184 lines
7.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Batch extract municipal risk data and load it into PostgreSQL."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Iterator, Sequence
import psycopg
from extract_risks import parse_workbook
from load_json_to_db import DbConfig, import_data, override_config
def discover_workbooks(directory: Path) -> Iterator[Path]:
for path in sorted(directory.glob("*.xlsx")):
if path.name.startswith("~$"):
continue
yield path
def dump_json(payload: dict, destination: Path) -> None:
destination.parent.mkdir(parents=True, exist_ok=True)
with destination.open("w", encoding="utf-8") as handle:
json.dump(payload, handle, ensure_ascii=False, indent=2)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Extract municipal sheets from workbooks and import them into PostgreSQL."
)
parser.add_argument(
"source_dir",
nargs="?",
type=Path,
default=Path("processing"),
help="Directory containing the .xlsx workbooks (default: ./processing).",
)
parser.add_argument(
"--output-dir",
type=Path,
help="Directory for generated JSON files (default: same as source directory).",
)
parser.add_argument(
"--regions",
nargs="+",
default=["市级"],
help="Region sheet names to keep (default: 市级).",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Skip database imports; only emit JSON files.",
)
parser.add_argument(
"--host", default=None, help="PostgreSQL host (default 172.24.240.1)"
)
parser.add_argument(
"--port", type=int, default=None, help="PostgreSQL port (default 5432)"
)
parser.add_argument(
"--dbname", default=None, help="Target database name (default licensing_risks)"
)
parser.add_argument(
"--user", default=None, help="Database user (default postgres)"
)
parser.add_argument(
"--password", default=None, help="Database password (default 123456)"
)
return parser.parse_args()
def build_output_path(workbook: Path, output_dir: Path, regions: Sequence[str]) -> Path:
region_label = "_".join(regions)
return output_dir / f"{workbook.stem}_{region_label}.json"
def main() -> None:
args = parse_args()
source_dir: Path = args.source_dir
if not source_dir.exists():
raise SystemExit(f"Source directory not found: {source_dir}")
output_dir = args.output_dir or source_dir
output_dir.mkdir(parents=True, exist_ok=True)
cfg: DbConfig = override_config(args)
total_processed = 0
total_imported = 0
aggregate_stats: dict[str, int] = {}
skipped_imports: list[str] = []
failed_imports: list[str] = []
skipped_payloads: list[str] = []
for workbook in discover_workbooks(source_dir):
payload = parse_workbook(workbook, target_regions=args.regions)
if not payload:
print(f"Skipped {workbook.name}: no matching regions")
skipped_payloads.append(workbook.name)
continue
json_path = build_output_path(workbook, output_dir, args.regions)
dump_json(payload, json_path)
total_processed += 1
print(f"Wrote {json_path}")
if args.dry_run:
continue
try:
stats = import_data(cfg, payload)
except psycopg.Error as exc:
print(f"Import failed for {workbook.name}: {exc}")
failed_imports.append(workbook.name)
continue
else:
total_imported += 1
for key, value in stats.items():
aggregate_stats[key] = aggregate_stats.get(key, 0) + value
duplicates = stats.get("risks_existing", 0)
inserted = stats.get("risks_inserted", 0)
inserted_total = sum(
stats.get(key, 0)
for key in (
"regions_inserted",
"scopes_inserted",
"themes_inserted",
"permits_inserted",
"risks_inserted",
"region_scopes_inserted",
"region_themes_inserted",
"region_theme_permits_inserted",
"region_permit_risks_inserted",
"region_permit_scopes_inserted",
"permit_subitems_inserted",
"region_permit_subitems_inserted",
"region_permit_details_inserted",
)
)
print(
f"Imported {workbook.name} into {cfg.dbname} "
f"(new risks: {inserted}, skipped existing: {duplicates})"
)
if inserted_total == 0:
skipped_imports.append(workbook.name)
print(
f"Completed. Workbooks processed: {total_processed}, imported: {total_imported}, JSON directory: {output_dir}"
)
if aggregate_stats:
print(
"Database import summary: "
f"regions inserted {aggregate_stats.get('regions_inserted', 0)}, "
f"regions skipped {aggregate_stats.get('regions_existing', 0)}, "
f"permits inserted {aggregate_stats.get('permits_inserted', 0)}, "
f"permits skipped {aggregate_stats.get('permits_existing', 0)}, "
f"scopes inserted {aggregate_stats.get('scopes_inserted', 0)}, "
f"scopes skipped {aggregate_stats.get('scopes_existing', 0)}, "
f"risks inserted {aggregate_stats.get('risks_inserted', 0)}, "
f"risks skipped {aggregate_stats.get('risks_existing', 0)}, "
f"region_permit links inserted {aggregate_stats.get('region_permit_risks_inserted', 0)}, "
f"links skipped {aggregate_stats.get('region_permit_risks_existing', 0)}, "
f"region_permit_scopes inserted {aggregate_stats.get('region_permit_scopes_inserted', 0)}, "
f"region_permit_scopes skipped {aggregate_stats.get('region_permit_scopes_existing', 0)}, "
f"permit_subitems inserted {aggregate_stats.get('permit_subitems_inserted', 0)}, "
f"permit_subitems skipped {aggregate_stats.get('permit_subitems_existing', 0)}, "
f"region_permit_subitems inserted {aggregate_stats.get('region_permit_subitems_inserted', 0)}, "
f"region_permit_subitems skipped {aggregate_stats.get('region_permit_subitems_existing', 0)}, "
f"region_permit_details inserted {aggregate_stats.get('region_permit_details_inserted', 0)}, "
f"region_permit_details skipped {aggregate_stats.get('region_permit_details_existing', 0)}"
)
if skipped_imports:
print("No new rows inserted for:", ", ".join(skipped_imports))
if failed_imports:
print("Imports failed for:", ", ".join(failed_imports))
if skipped_payloads:
print("No matching sheets found in:", ", ".join(skipped_payloads))
if __name__ == "__main__":
main()