#!/usr/bin/env python3 """Batch extract municipal risk data and load it into PostgreSQL.""" from __future__ import annotations import argparse import json from pathlib import Path from typing import Iterator, Sequence import psycopg from extract_risks import parse_workbook from load_json_to_db import DbConfig, import_data, override_config def discover_workbooks(directory: Path) -> Iterator[Path]: for path in sorted(directory.glob("*.xlsx")): if path.name.startswith("~$"): continue yield path def dump_json(payload: dict, destination: Path) -> None: destination.parent.mkdir(parents=True, exist_ok=True) with destination.open("w", encoding="utf-8") as handle: json.dump(payload, handle, ensure_ascii=False, indent=2) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Extract municipal sheets from workbooks and import them into PostgreSQL." ) parser.add_argument( "source_dir", nargs="?", type=Path, default=Path("processing"), help="Directory containing the .xlsx workbooks (default: ./processing).", ) parser.add_argument( "--output-dir", type=Path, help="Directory for generated JSON files (default: same as source directory).", ) parser.add_argument( "--regions", nargs="+", default=["市级"], help="Region sheet names to keep (default: 市级).", ) parser.add_argument( "--dry-run", action="store_true", help="Skip database imports; only emit JSON files.", ) parser.add_argument( "--host", default=None, help="PostgreSQL host (default 172.24.240.1)" ) parser.add_argument( "--port", type=int, default=None, help="PostgreSQL port (default 5432)" ) parser.add_argument( "--dbname", default=None, help="Target database name (default licensing_risks)" ) parser.add_argument( "--user", default=None, help="Database user (default postgres)" ) parser.add_argument( "--password", default=None, help="Database password (default 123456)" ) return parser.parse_args() def build_output_path(workbook: Path, output_dir: Path, regions: Sequence[str]) -> Path: region_label = "_".join(regions) return output_dir / f"{workbook.stem}_{region_label}.json" def main() -> None: args = parse_args() source_dir: Path = args.source_dir if not source_dir.exists(): raise SystemExit(f"Source directory not found: {source_dir}") output_dir = args.output_dir or source_dir output_dir.mkdir(parents=True, exist_ok=True) cfg: DbConfig = override_config(args) total_processed = 0 total_imported = 0 aggregate_stats: dict[str, int] = {} skipped_imports: list[str] = [] failed_imports: list[str] = [] skipped_payloads: list[str] = [] for workbook in discover_workbooks(source_dir): payload = parse_workbook(workbook, target_regions=args.regions) if not payload: print(f"Skipped {workbook.name}: no matching regions") skipped_payloads.append(workbook.name) continue json_path = build_output_path(workbook, output_dir, args.regions) dump_json(payload, json_path) total_processed += 1 print(f"Wrote {json_path}") if args.dry_run: continue try: stats = import_data(cfg, payload) except psycopg.Error as exc: print(f"Import failed for {workbook.name}: {exc}") failed_imports.append(workbook.name) continue else: total_imported += 1 for key, value in stats.items(): aggregate_stats[key] = aggregate_stats.get(key, 0) + value duplicates = stats.get("risks_existing", 0) inserted = stats.get("risks_inserted", 0) inserted_total = sum( stats.get(key, 0) for key in ( "regions_inserted", "scopes_inserted", "themes_inserted", "permits_inserted", "risks_inserted", "region_scopes_inserted", "region_themes_inserted", "region_theme_permits_inserted", "region_permit_risks_inserted", "region_permit_scopes_inserted", "permit_subitems_inserted", "region_permit_subitems_inserted", "region_permit_details_inserted", ) ) print( f"Imported {workbook.name} into {cfg.dbname} " f"(new risks: {inserted}, skipped existing: {duplicates})" ) if inserted_total == 0: skipped_imports.append(workbook.name) print( f"Completed. Workbooks processed: {total_processed}, imported: {total_imported}, JSON directory: {output_dir}" ) if aggregate_stats: print( "Database import summary: " f"regions inserted {aggregate_stats.get('regions_inserted', 0)}, " f"regions skipped {aggregate_stats.get('regions_existing', 0)}, " f"permits inserted {aggregate_stats.get('permits_inserted', 0)}, " f"permits skipped {aggregate_stats.get('permits_existing', 0)}, " f"scopes inserted {aggregate_stats.get('scopes_inserted', 0)}, " f"scopes skipped {aggregate_stats.get('scopes_existing', 0)}, " f"risks inserted {aggregate_stats.get('risks_inserted', 0)}, " f"risks skipped {aggregate_stats.get('risks_existing', 0)}, " f"region_permit links inserted {aggregate_stats.get('region_permit_risks_inserted', 0)}, " f"links skipped {aggregate_stats.get('region_permit_risks_existing', 0)}, " f"region_permit_scopes inserted {aggregate_stats.get('region_permit_scopes_inserted', 0)}, " f"region_permit_scopes skipped {aggregate_stats.get('region_permit_scopes_existing', 0)}, " f"permit_subitems inserted {aggregate_stats.get('permit_subitems_inserted', 0)}, " f"permit_subitems skipped {aggregate_stats.get('permit_subitems_existing', 0)}, " f"region_permit_subitems inserted {aggregate_stats.get('region_permit_subitems_inserted', 0)}, " f"region_permit_subitems skipped {aggregate_stats.get('region_permit_subitems_existing', 0)}, " f"region_permit_details inserted {aggregate_stats.get('region_permit_details_inserted', 0)}, " f"region_permit_details skipped {aggregate_stats.get('region_permit_details_existing', 0)}" ) if skipped_imports: print("No new rows inserted for:", ", ".join(skipped_imports)) if failed_imports: print("Imports failed for:", ", ".join(failed_imports)) if skipped_payloads: print("No matching sheets found in:", ", ".join(skipped_payloads)) if __name__ == "__main__": main()