184 lines
7.0 KiB
Python
184 lines
7.0 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Batch extract municipal risk data and load it into PostgreSQL."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Iterator, Sequence
|
||
|
|
|
||
|
|
import psycopg
|
||
|
|
from extract_risks import parse_workbook
|
||
|
|
from load_json_to_db import DbConfig, import_data, override_config
|
||
|
|
|
||
|
|
|
||
|
|
def discover_workbooks(directory: Path) -> Iterator[Path]:
    """Yield the ``*.xlsx`` entries directly inside *directory*, in sorted order.

    Entries whose name begins with ``~$`` are skipped (presumably Office
    owner/lock files left next to an open workbook — not real data).
    """
    candidates = sorted(directory.glob("*.xlsx"))
    yield from (
        candidate
        for candidate in candidates
        if not candidate.name.startswith("~$")
    )
|
||
|
|
|
||
|
|
|
||
|
|
def dump_json(payload: dict, destination: Path) -> None:
    """Write *payload* to *destination* as UTF-8 JSON.

    Any missing parent directories are created first. The JSON is indented by
    two spaces and non-ASCII characters are written literally instead of being
    escaped (``ensure_ascii=False``).
    """
    parent_dir = destination.parent
    parent_dir.mkdir(exist_ok=True, parents=True)
    with open(destination, mode="w", encoding="utf-8") as stream:
        json.dump(payload, stream, indent=2, ensure_ascii=False)
|
||
|
|
|
||
|
|
|
||
|
|
def parse_args() -> argparse.Namespace:
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description="Extract municipal sheets from workbooks and import them into PostgreSQL."
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"source_dir",
|
||
|
|
nargs="?",
|
||
|
|
type=Path,
|
||
|
|
default=Path("processing"),
|
||
|
|
help="Directory containing the .xlsx workbooks (default: ./processing).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--output-dir",
|
||
|
|
type=Path,
|
||
|
|
help="Directory for generated JSON files (default: same as source directory).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--regions",
|
||
|
|
nargs="+",
|
||
|
|
default=["市级"],
|
||
|
|
help="Region sheet names to keep (default: 市级).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--dry-run",
|
||
|
|
action="store_true",
|
||
|
|
help="Skip database imports; only emit JSON files.",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--host", default=None, help="PostgreSQL host (default 172.24.240.1)"
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--port", type=int, default=None, help="PostgreSQL port (default 5432)"
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--dbname", default=None, help="Target database name (default licensing_risks)"
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--user", default=None, help="Database user (default postgres)"
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--password", default=None, help="Database password (default 123456)"
|
||
|
|
)
|
||
|
|
return parser.parse_args()
|
||
|
|
|
||
|
|
|
||
|
|
def build_output_path(workbook: Path, output_dir: Path, regions: Sequence[str]) -> Path:
    """Return where the JSON extracted from *workbook* should be written.

    The file lives in *output_dir* and is named
    ``<workbook stem>_<region names joined by "_">.json``; for example
    ``report.xlsx`` with ``["A", "B"]`` becomes ``report_A_B.json``.
    """
    joined_regions = "_".join(regions)
    file_name = workbook.stem + "_" + joined_regions + ".json"
    return output_dir.joinpath(file_name)
|
||
|
|
|
||
|
|
|
||
|
|
# Counters from an ``import_data`` stats dict that record newly written rows.
# A workbook whose counters in this list are all zero is reported as having
# inserted nothing.
_INSERTED_KEYS: tuple[str, ...] = (
    "regions_inserted",
    "scopes_inserted",
    "themes_inserted",
    "permits_inserted",
    "risks_inserted",
    "region_scopes_inserted",
    "region_themes_inserted",
    "region_theme_permits_inserted",
    "region_permit_risks_inserted",
    "region_permit_scopes_inserted",
    "permit_subitems_inserted",
    "region_permit_subitems_inserted",
    "region_permit_details_inserted",
)

# (label, stats key) pairs rendered, in order, by the end-of-run summary.
# Every key in _INSERTED_KEYS appears here so the summary accounts for all rows
# that count towards "new rows inserted". The matching "*_existing" counters
# for themes / region_scopes / region_themes / region_theme_permits are not
# referenced anywhere in this script, so they are omitted until their key
# names are confirmed against import_data.
_SUMMARY_FIELDS: tuple[tuple[str, str], ...] = (
    ("regions inserted", "regions_inserted"),
    ("regions skipped", "regions_existing"),
    ("permits inserted", "permits_inserted"),
    ("permits skipped", "permits_existing"),
    ("scopes inserted", "scopes_inserted"),
    ("scopes skipped", "scopes_existing"),
    ("themes inserted", "themes_inserted"),
    ("risks inserted", "risks_inserted"),
    ("risks skipped", "risks_existing"),
    ("region_permit links inserted", "region_permit_risks_inserted"),
    ("links skipped", "region_permit_risks_existing"),
    ("region_permit_scopes inserted", "region_permit_scopes_inserted"),
    ("region_permit_scopes skipped", "region_permit_scopes_existing"),
    ("permit_subitems inserted", "permit_subitems_inserted"),
    ("permit_subitems skipped", "permit_subitems_existing"),
    ("region_permit_subitems inserted", "region_permit_subitems_inserted"),
    ("region_permit_subitems skipped", "region_permit_subitems_existing"),
    ("region_permit_details inserted", "region_permit_details_inserted"),
    ("region_permit_details skipped", "region_permit_details_existing"),
    ("region_scopes inserted", "region_scopes_inserted"),
    ("region_themes inserted", "region_themes_inserted"),
    ("region_theme_permits inserted", "region_theme_permits_inserted"),
)


def _count_inserted(stats: dict[str, int]) -> int:
    """Return the number of new rows recorded across all tables in *stats*."""
    return sum(stats.get(key, 0) for key in _INSERTED_KEYS)


def _format_import_summary(aggregate_stats: dict[str, int]) -> str:
    """Render the one-line database summary printed at the end of a run."""
    parts = (f"{label} {aggregate_stats.get(key, 0)}" for label, key in _SUMMARY_FIELDS)
    return "Database import summary: " + ", ".join(parts)


def main() -> None:
    """Extract every workbook in the source directory and import it.

    For each workbook yielded by ``discover_workbooks``:

    1. ``parse_workbook`` extracts the requested region sheets.
    2. If that yields a payload, it is written to JSON in the output directory.
    3. Unless ``--dry-run`` is given, the payload is imported with ``import_data``.

    A ``psycopg.Error`` during one import is reported and the batch moves on to
    the next workbook. A per-workbook report and an aggregate summary are
    printed at the end.

    Raises:
        SystemExit: if the source path is missing or not a directory, or with
            status 1 after the report if any workbook failed to import, so
            schedulers and shell pipelines can detect partial failures.
    """
    args = parse_args()
    source_dir: Path = args.source_dir
    # is_dir() rather than exists(): a regular file would pass exists() yet
    # contain no workbooks, which would look like a successful empty run.
    if not source_dir.is_dir():
        raise SystemExit(f"Source directory not found: {source_dir}")

    output_dir = args.output_dir or source_dir
    output_dir.mkdir(parents=True, exist_ok=True)

    cfg: DbConfig = override_config(args)

    total_processed = 0
    total_imported = 0
    aggregate_stats: dict[str, int] = {}
    skipped_imports: list[str] = []  # imported, but no new rows written
    failed_imports: list[str] = []  # import raised psycopg.Error
    skipped_payloads: list[str] = []  # no matching region sheets

    for workbook in discover_workbooks(source_dir):
        # NOTE(review): an exception from parse_workbook (e.g. a corrupt file)
        # aborts the whole batch; consider catching its specific error types.
        payload = parse_workbook(workbook, target_regions=args.regions)
        if not payload:
            print(f"Skipped {workbook.name}: no matching regions")
            skipped_payloads.append(workbook.name)
            continue

        json_path = build_output_path(workbook, output_dir, args.regions)
        dump_json(payload, json_path)
        total_processed += 1
        print(f"Wrote {json_path}")

        if args.dry_run:
            continue

        try:
            stats = import_data(cfg, payload)
        except psycopg.Error as exc:
            print(f"Import failed for {workbook.name}: {exc}")
            failed_imports.append(workbook.name)
            continue

        total_imported += 1
        for key, value in stats.items():
            aggregate_stats[key] = aggregate_stats.get(key, 0) + value

        inserted = stats.get("risks_inserted", 0)
        duplicates = stats.get("risks_existing", 0)
        print(
            f"Imported {workbook.name} into {cfg.dbname} "
            f"(new risks: {inserted}, skipped existing: {duplicates})"
        )
        if _count_inserted(stats) == 0:
            skipped_imports.append(workbook.name)

    print(
        f"Completed. Workbooks processed: {total_processed}, imported: {total_imported}, JSON directory: {output_dir}"
    )
    if aggregate_stats:
        print(_format_import_summary(aggregate_stats))
    if skipped_imports:
        print("No new rows inserted for:", ", ".join(skipped_imports))
    if failed_imports:
        print("Imports failed for:", ", ".join(failed_imports))
    if skipped_payloads:
        print("No matching sheets found in:", ", ".join(skipped_payloads))

    if failed_imports:
        # Non-zero exit status so cron jobs / CI can tell the batch only
        # partially succeeded; the report above names the failed workbooks.
        raise SystemExit(1)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Script entry point: run the batch only when executed directly, not on
    # import. main() returns None, so SystemExit(None) exits with status 0;
    # a SystemExit raised inside main() propagates unchanged.
    raise SystemExit(main())
|