From ec8adf98f13d9bc97f089f633592700f1bd72dc0 Mon Sep 17 00:00:00 2001 From: Codex Agent Date: Mon, 3 Nov 2025 09:43:58 +0800 Subject: [PATCH] feat: add permit risk snapshot workflow --- CHECKPOINT_LOGGING_GUIDE.md | 62 +++ docs/DB_GUIDE.md | 18 + .../001_create_region_permit_risk_view.sql | 65 +++ ...002_create_permit_risk_snapshots_table.sql | 25 + lawrisk/services/licensing_repo.py | 445 +++++++++++++++++- 5 files changed, 614 insertions(+), 1 deletion(-) create mode 100644 docs/sql/001_create_region_permit_risk_view.sql create mode 100644 docs/sql/002_create_permit_risk_snapshots_table.sql diff --git a/CHECKPOINT_LOGGING_GUIDE.md b/CHECKPOINT_LOGGING_GUIDE.md index 8d570a3..3bdd255 100644 --- a/CHECKPOINT_LOGGING_GUIDE.md +++ b/CHECKPOINT_LOGGING_GUIDE.md @@ -297,3 +297,65 @@ tail -f checkpoint.log | grep "Progress:" --- **注意**: 所有日志都会立即输出到控制台,无需等待操作完成即可查看进度。 + +--- + +## 🧱 针对许可风险的精细化快照 + +> 新增:基于 `region_permit_risk_vw` 视图的原子检查点 + +当用户编辑某条许可风险记录(唯一键为 `region_id + permit_id + risk_id`)时,可调用以下工具函数先写入快照: + +```python +from lawrisk.services.licensing_repo import create_permit_risk_snapshot + +snapshot_meta = create_permit_risk_snapshot( + region_id="7c2b…", + permit_id="b93d…", + risk_id="a01f…", + edited_by="alice", + change_summary="调整法律依据与责任主体" +) +``` + +返回示例: + +```python +{ + "snapshot_id": "5c4b…", + "permit_risk_key": "7c2b…::b93d…::a01f…", + "version": 3, + "created_at": "2024-05-18T02:41:55.210412+00:00", + "edited_by": "alice", + "change_summary": "调整法律依据与责任主体", + "payload": {...完整的视图行...} +} +``` + +- 快照数据存放在 `permit_risk_snapshots` 表中,`payload` 记录视图的完整列。 +- 使用 `list_permit_risk_snapshots(region_id=..., permit_id=..., risk_id=...)` 查看历史版本。 +- 通过 `get_permit_risk_snapshot(snapshot_id)` 获取任意版本的详细内容,以便比对或手工回滚。 + +### 带快照的原子修改 + +```python +from lawrisk.services.licensing_repo import update_permit_risk_record + +result = update_permit_risk_record( + region_id="7c2b…", + permit_id="b93d…", + risk_id="a01f…", + risk_content="新的风险提示要点", + 
legal_basis="《某条例》第三条", + permit_status="正常受理", + edited_by="alice", + change_summary="补充法规依据并更新状态" +) + +print(result["snapshot"]["version"]) # 修改前版本 +print(result["current"]["risk_content"]) # 修改后视图内容 +``` + +- 函数会先锁定 `region_permit_risks` 目标行,写入快照,再执行字段更新。 +- `risk_*` 参数对应 `risks` 表字段;`permit_*` 参数对应 `region_permit_details`。 +- 可传入 `None` 将字段清空;若不想修改某列则省略参数。 diff --git a/docs/DB_GUIDE.md b/docs/DB_GUIDE.md index 309558d..8ba9d08 100644 --- a/docs/DB_GUIDE.md +++ b/docs/DB_GUIDE.md @@ -15,6 +15,7 @@ The `licensing_risks` PostgreSQL database stores municipal licensing risk prompt | `region_theme_permits` | Region + theme + permit linkage | `region_id`, `theme_id`, `permit_id` | | `risks` | 风险提示主体信息 | `id` (PK), `risk_content`, `legal_basis`, `document_no`, `summary` | | `region_permit_risks` | Region + permit + risk linkage | `region_id`, `permit_id`, `risk_id` | +| `permit_risk_snapshots` | Versioned checkpoints per region/permit/risk | `(region_id, permit_id, risk_id, version)` | All primary keys are integer sequences; unique indexes and `ON CONFLICT DO NOTHING` logic make repeated imports idempotent. Foreign keys should be enforced in the target schema to prevent orphan rows. @@ -71,3 +72,20 @@ For fuzzy lookups, switch to `WHERE p.name ILIKE '%关键词%'`. - Export query results with `\copy (SELECT …) TO '/tmp/export.csv' WITH CSV HEADER;`. - Run queries after imports commit; the loaders already wrap operations in transactions. 
+## Views +### `region_permit_risk_vw` +Provides a flattened row per `(region_id, permit_id, risk_id)` that includes: +- Region/permit names +- Risk narrative fields (`risk_content`, `legal_basis`, `document_no`, `summary`) +- Related detail columns from `region_permit_details` +- Aggregated theme, business-scope, and subitem identifiers/names +- Synthetic key `permit_risk_key = region_id::text || '::' || permit_id::text || '::' || risk_id::text` + +Use this view for checkpointing and diff operations instead of re-joining the normalized tables. The underlying tables already have composite primary keys on `(region_id, permit_id, …)`, so no additional indexes are required for the view. + +### 快照与更新服务 +- `create_permit_risk_snapshot(region_id, permit_id, risk_id, edited_by=None, change_summary=None)` + - 独立保存当前视图行到 `permit_risk_snapshots`,常用于人工备份。 +- `update_permit_risk_record(...)` + - 支持在事务内同时:① 生成快照;② 更新 `risks` 表(风险提示内容/法律依据等);③ UPSERT `region_permit_details`(许可状态、责任人等)。 + - 未显式传参的列保持原样;传入 `None` 可将对应字段置空。 diff --git a/docs/sql/001_create_region_permit_risk_view.sql b/docs/sql/001_create_region_permit_risk_view.sql new file mode 100644 index 0000000..ba72be9 --- /dev/null +++ b/docs/sql/001_create_region_permit_risk_view.sql @@ -0,0 +1,65 @@ +-- Region-permit risk consolidation view +-- Provides a flattened projection for transactional checkpoints and auditing. 
+ +BEGIN; + +CREATE OR REPLACE VIEW public.region_permit_risk_vw AS +SELECT + rpr.region_id, + r.name AS region_name, + rpr.permit_id, + p.name AS permit_name, + rk.id AS risk_id, + rk.risk_content, + rk.legal_basis, + rk.document_no, + rk.summary, + COALESCE(theme.theme_ids, ARRAY[]::uuid[]) AS theme_ids, + COALESCE(theme.theme_names, ARRAY[]::text[]) AS theme_names, + COALESCE(scopes.scope_ids, ARRAY[]::uuid[]) AS scope_ids, + COALESCE(scopes.scope_descriptions, ARRAY[]::text[]) AS scope_descriptions, + COALESCE(subitems.subitem_ids, ARRAY[]::uuid[]) AS subitem_ids, + details.permit_status, + details.subitem_summary, + details.responsible_contact, + details.jurisdiction_scope, + details.updated_at AS permit_detail_updated_at, + concat_ws('::', rpr.region_id::text, rpr.permit_id::text, rpr.risk_id::text) AS permit_risk_key +FROM region_permit_risks rpr +JOIN regions r ON r.id = rpr.region_id +JOIN permits p ON p.id = rpr.permit_id +JOIN risks rk ON rk.id = rpr.risk_id +LEFT JOIN region_permit_details details + ON details.region_id = rpr.region_id + AND details.permit_id = rpr.permit_id +LEFT JOIN LATERAL ( + SELECT + array_agg(DISTINCT rtp.theme_id) AS theme_ids, + array_agg(DISTINCT t.name) AS theme_names + FROM region_theme_permits rtp + JOIN themes t ON t.id = rtp.theme_id + WHERE rtp.region_id = rpr.region_id + AND rtp.permit_id = rpr.permit_id +) AS theme ON TRUE +LEFT JOIN LATERAL ( + SELECT + array_agg(DISTINCT rps.scope_id) AS scope_ids, + array_agg(DISTINCT bs.description) AS scope_descriptions + FROM region_permit_scopes rps + JOIN business_scopes bs ON bs.id = rps.scope_id + WHERE rps.region_id = rpr.region_id + AND rps.permit_id = rpr.permit_id +) AS scopes ON TRUE +LEFT JOIN LATERAL ( + SELECT + array_agg(DISTINCT rpsub.subitem_id) AS subitem_ids + FROM region_permit_subitems rpsub + WHERE rpsub.region_id = rpr.region_id + AND rpsub.permit_id = rpr.permit_id +) AS subitems ON TRUE; + +COMMENT ON VIEW public.region_permit_risk_vw IS + 'Flattens 
region/permit/risk entities for checkpointing and diff workflows.'; + +COMMIT; + diff --git a/docs/sql/002_create_permit_risk_snapshots_table.sql b/docs/sql/002_create_permit_risk_snapshots_table.sql new file mode 100644 index 0000000..ab70ad5 --- /dev/null +++ b/docs/sql/002_create_permit_risk_snapshots_table.sql @@ -0,0 +1,25 @@ +-- Versioned snapshots per region/permit/risk tuple. + +BEGIN; + +CREATE TABLE IF NOT EXISTS public.permit_risk_snapshots ( + snapshot_id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + region_id uuid NOT NULL, + permit_id uuid NOT NULL, + risk_id uuid NOT NULL, + permit_risk_key text NOT NULL, + version integer NOT NULL, + payload jsonb NOT NULL, + edited_by text, + change_summary text, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX IF NOT EXISTS permit_risk_snapshots_region_permit_risk_version_idx + ON public.permit_risk_snapshots (region_id, permit_id, risk_id, version); + +CREATE INDEX IF NOT EXISTS permit_risk_snapshots_key_idx + ON public.permit_risk_snapshots (permit_risk_key); + +COMMIT; + diff --git a/lawrisk/services/licensing_repo.py b/lawrisk/services/licensing_repo.py index 3664672..29e2097 100644 --- a/lawrisk/services/licensing_repo.py +++ b/lawrisk/services/licensing_repo.py @@ -5,8 +5,10 @@ import logging import os import re from collections import OrderedDict -from datetime import datetime +from datetime import datetime, date +from decimal import Decimal from typing import Any, Dict, List, Optional, Tuple +import uuid import pg8000.dbapi as pg @@ -22,6 +24,7 @@ logger.propagate = False # Separate configuration so legacy fs_law_risk integration keeps using PG_* LIC_DEFAULT_DB = "licensing_risks" +_UNSET = object() ARTICLE_HEADING_RE = re.compile(r"(?m)^(第[一二三四五六七八九十百零0-9]+条)") ARTICLE_TOKEN_RE = re.compile(r"(? 
# NOTE(review): the patch text immediately preceding this section was truncated
# mid-definition (only "... -> bool: os.remove(checkpoint_file) / return True /
# return False" survived). That definition is incomplete here and is therefore
# not reproduced.


# ---------------------------------------------------------------------------
# Permit risk snapshot helpers (region_permit_risk_vw)
# ---------------------------------------------------------------------------

def _convert_snapshot_value(value: Any) -> Any:
    """Convert a database value into a JSON-serialisable primitive.

    Dates/datetimes become ISO-8601 strings, ``Decimal`` becomes ``float``,
    UUIDs become strings and binary values are decoded as UTF-8 (undecodable
    bytes are replaced). Lists, tuples and dicts are converted recursively;
    tuples come back as lists. Any other value is returned unchanged.
    """
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, memoryview):
        return bytes(value).decode("utf-8", errors="replace")
    if isinstance(value, (bytes, bytearray)):
        return value.decode("utf-8", errors="replace")
    if isinstance(value, (list, tuple)):
        return [_convert_snapshot_value(item) for item in value]
    if isinstance(value, dict):
        # Nested mappings would otherwise keep non-serialisable leaves.
        return {key: _convert_snapshot_value(item) for key, item in value.items()}
    return value


def _normalize_snapshot_payload(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a JSON-safe copy of the permit risk view record."""
    return {key: _convert_snapshot_value(val) for key, val in record.items()}


def _fetch_permit_risk_row(
    conn: pg.Connection, region_id: str, permit_id: str, risk_id: str
) -> Dict[str, Any]:
    """Fetch a single row from the consolidation view as a column->value dict.

    Raises:
        ValueError: if the view has no row for the given identifiers.
    """
    sql = """
        SELECT
            region_id,
            region_name,
            permit_id,
            permit_name,
            risk_id,
            risk_content,
            legal_basis,
            document_no,
            summary,
            theme_ids,
            theme_names,
            scope_ids,
            scope_descriptions,
            subitem_ids,
            permit_status,
            subitem_summary,
            responsible_contact,
            jurisdiction_scope,
            permit_detail_updated_at,
            permit_risk_key
        FROM region_permit_risk_vw
        WHERE region_id = %s AND permit_id = %s AND risk_id = %s
        LIMIT 1
    """
    cur = conn.cursor()
    cur.execute(sql, (region_id, permit_id, risk_id))
    row = cur.fetchone()
    if not row:
        raise ValueError("Permit risk combination not found in consolidation view")
    columns = [desc[0] for desc in cur.description]
    return dict(zip(columns, row))


def _insert_permit_risk_snapshot(
    conn: pg.Connection,
    payload: Dict[str, Any],
    *,
    edited_by: Optional[str],
    change_summary: Optional[str],
) -> Dict[str, Any]:
    """Insert a snapshot row and return its metadata (no commit).

    ``payload`` must already be JSON-safe and must contain ``region_id``,
    ``permit_id``, ``risk_id`` and ``permit_risk_key``. The version number is
    the current maximum for the key plus one.
    """
    permit_risk_key = str(payload["permit_risk_key"])
    cur = conn.cursor()

    # Serialise concurrent writers for this tuple by locking the parent link
    # row. PostgreSQL rejects "SELECT MAX(...) ... FOR UPDATE" (row locks are
    # not allowed with aggregate functions), so the lock is taken separately
    # and the version is then read with a plain aggregate.
    cur.execute(
        """
        SELECT 1
        FROM region_permit_risks
        WHERE region_id = %s AND permit_id = %s AND risk_id = %s
        FOR UPDATE
        """,
        (payload["region_id"], payload["permit_id"], payload["risk_id"]),
    )
    cur.fetchone()

    cur.execute(
        """
        SELECT COALESCE(MAX(version), 0)
        FROM permit_risk_snapshots
        WHERE permit_risk_key = %s
        """,
        (permit_risk_key,),
    )
    row = cur.fetchone()
    next_version = (int(row[0]) + 1) if row else 1

    # Send the payload as serialised JSON text with an explicit cast; this
    # does not depend on any driver-specific JSON adapter.
    cur.execute(
        """
        INSERT INTO permit_risk_snapshots (
            region_id,
            permit_id,
            risk_id,
            permit_risk_key,
            version,
            payload,
            edited_by,
            change_summary
        )
        VALUES (%s, %s, %s, %s, %s, CAST(%s AS jsonb), %s, %s)
        RETURNING snapshot_id, created_at
        """,
        (
            payload["region_id"],
            payload["permit_id"],
            payload["risk_id"],
            permit_risk_key,
            next_version,
            json.dumps(payload, ensure_ascii=False),
            edited_by,
            change_summary,
        ),
    )
    snapshot_id, created_at = cur.fetchone()

    return {
        "snapshot_id": str(snapshot_id),
        "region_id": payload["region_id"],
        "permit_id": payload["permit_id"],
        "risk_id": payload["risk_id"],
        "permit_risk_key": permit_risk_key,
        "version": next_version,
        "created_at": _convert_snapshot_value(created_at),
        "edited_by": edited_by,
        "change_summary": change_summary or "",
        "payload": payload,
    }


def _create_snapshot_with_connection(
    conn: pg.Connection,
    region_id: str,
    permit_id: str,
    risk_id: str,
    *,
    edited_by: Optional[str],
    change_summary: Optional[str],
) -> Dict[str, Any]:
    """Create a snapshot using an existing DB connection (no commit).

    Reads the current view row, normalises it to JSON-safe values and stores
    it as the next version. The caller owns the transaction.
    """
    view_record = _fetch_permit_risk_row(conn, region_id, permit_id, risk_id)
    payload = _normalize_snapshot_payload(view_record)
    metadata = _insert_permit_risk_snapshot(
        conn,
        payload,
        edited_by=edited_by,
        change_summary=change_summary,
    )
    logger.info(
        "[CHECKPOINT] Snapshot created: %s version %s",
        metadata["permit_risk_key"],
        metadata["version"],
    )
    return metadata


def create_permit_risk_snapshot(
    region_id: str,
    permit_id: str,
    risk_id: str,
    *,
    edited_by: Optional[str] = None,
    change_summary: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Capture the current state of a region/permit/risk record as a versioned snapshot.

    Returns metadata about the created snapshot including version number.
    The snapshot is committed on success; any failure rolls the transaction
    back and re-raises.
    """
    with _lic_pg_conn(autocommit=False) as conn:
        try:
            snapshot_meta = _create_snapshot_with_connection(
                conn,
                region_id,
                permit_id,
                risk_id,
                edited_by=edited_by,
                change_summary=change_summary,
            )
            conn.commit()
            return snapshot_meta
        except Exception:
            conn.rollback()
            raise


def list_permit_risk_snapshots(
    region_id: Optional[str] = None,
    permit_id: Optional[str] = None,
    risk_id: Optional[str] = None,
    *,
    permit_risk_key: Optional[str] = None,
    limit: int = 20,
    offset: int = 0,
) -> List[Dict[str, Any]]:
    """
    List snapshots for a region/permit/risk combination ordered by version descending.

    At least one identifier (permit_risk_key or region/permit/risk) must be provided.
    When ``permit_risk_key`` is given it takes precedence and the individual
    identifiers are ignored. Payloads are not included in the result.

    Raises:
        ValueError: if no identifier is supplied.
    """
    filters: List[str] = []
    params: List[Any] = []

    if permit_risk_key:
        filters.append("permit_risk_key = %s")
        params.append(permit_risk_key)
    else:
        candidates = (
            ("region_id", region_id),
            ("permit_id", permit_id),
            ("risk_id", risk_id),
        )
        for column, value in candidates:
            if value:
                filters.append(f"{column} = %s")
                params.append(value)

    if not filters:
        raise ValueError("At least one identifier must be provided to list snapshots")

    # Only fixed column names are interpolated; all values are bound parameters.
    filters_clause = " AND ".join(filters)
    sql = f"""
        SELECT snapshot_id, version, permit_risk_key, edited_by, change_summary, created_at
        FROM permit_risk_snapshots
        WHERE {filters_clause}
        ORDER BY version DESC
        LIMIT %s OFFSET %s
    """
    params.extend([limit, offset])

    with _lic_pg_conn() as conn:
        cur = conn.cursor()
        cur.execute(sql, tuple(params))
        rows = cur.fetchall()

    return [
        {
            "snapshot_id": str(snapshot_id),
            "permit_risk_key": key,
            "version": int(version),
            "created_at": _convert_snapshot_value(created_at),
            "edited_by": editor,
            "change_summary": summary or "",
        }
        for snapshot_id, version, key, editor, summary, created_at in rows
    ]


def get_permit_risk_snapshot(snapshot_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a snapshot (metadata plus payload) by its identifier.

    Returns ``None`` when no snapshot with that id exists.
    """
    sql = """
        SELECT
            snapshot_id,
            region_id,
            permit_id,
            risk_id,
            permit_risk_key,
            version,
            payload,
            edited_by,
            change_summary,
            created_at
        FROM permit_risk_snapshots
        WHERE snapshot_id = %s
    """
    with _lic_pg_conn() as conn:
        cur = conn.cursor()
        cur.execute(sql, (snapshot_id,))
        row = cur.fetchone()

    if not row:
        return None

    (
        snap_id,
        region_id,
        permit_id,
        risk_id,
        permit_risk_key,
        version,
        payload,
        edited_by,
        change_summary,
        created_at,
    ) = row

    # The driver may hand jsonb back already decoded or as raw text/bytes.
    # json.loads does not accept memoryview, so materialise it first.
    if isinstance(payload, memoryview):
        payload = bytes(payload)
    if isinstance(payload, (str, bytes, bytearray)):
        payload_obj = json.loads(payload)
    else:
        payload_obj = payload

    return {
        "snapshot_id": str(snap_id),
        "region_id": str(region_id),
        "permit_id": str(permit_id),
        "risk_id": str(risk_id),
        "permit_risk_key": permit_risk_key,
        "version": int(version),
        "created_at": _convert_snapshot_value(created_at),
        "edited_by": edited_by,
        "change_summary": change_summary or "",
        "payload": payload_obj,
    }


def update_permit_risk_record(
    region_id: str,
    permit_id: str,
    risk_id: str,
    *,
    risk_content: Any = _UNSET,
    legal_basis: Any = _UNSET,
    document_no: Any = _UNSET,
    summary: Any = _UNSET,
    permit_status: Any = _UNSET,
    subitem_summary: Any = _UNSET,
    responsible_contact: Any = _UNSET,
    jurisdiction_scope: Any = _UNSET,
    edited_by: Optional[str] = None,
    change_summary: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Update the permit risk record while capturing a checkpoint snapshot beforehand.

    Field arguments left at the ``_UNSET`` sentinel are not touched; passing
    ``None`` writes NULL. ``risk_content``, ``legal_basis``, ``document_no``
    and ``summary`` update the ``risks`` row; the remaining fields are
    upserted into ``region_permit_details``. Everything runs in a single
    transaction that is rolled back on any error.

    Returns the snapshot metadata (pre-change) and the refreshed view row (post-change),
    plus ``risk_updated`` / ``details_updated`` flags.

    Raises:
        ValueError: if no field is provided or the combination does not exist.
    """
    risk_fields = (
        ("risk_content", risk_content),
        ("legal_basis", legal_basis),
        ("document_no", document_no),
        ("summary", summary),
    )
    detail_fields = (
        ("permit_status", permit_status),
        ("subitem_summary", subitem_summary),
        ("responsible_contact", responsible_contact),
        ("jurisdiction_scope", jurisdiction_scope),
    )
    if all(value is _UNSET for _, value in risk_fields + detail_fields):
        raise ValueError("No fields provided to update.")

    with _lic_pg_conn(autocommit=False) as conn:
        try:
            cur = conn.cursor()
            # Lock the link row first so the snapshot and the update are
            # taken against the same state.
            cur.execute(
                """
                SELECT 1
                FROM region_permit_risks
                WHERE region_id = %s AND permit_id = %s AND risk_id = %s
                FOR UPDATE
                """,
                (region_id, permit_id, risk_id),
            )
            if cur.fetchone() is None:
                raise ValueError("Permit risk combination not found.")

            snapshot_meta = _create_snapshot_with_connection(
                conn,
                region_id,
                permit_id,
                risk_id,
                edited_by=edited_by,
                change_summary=change_summary,
            )

            # Column names below come from the fixed tuples above, never from
            # caller input; values are always bound parameters.
            risk_updates: List[str] = []
            risk_params: List[Any] = []
            for column, value in risk_fields:
                if value is not _UNSET:
                    risk_updates.append(f"{column} = %s")
                    risk_params.append(value)

            if risk_updates:
                risk_params.append(risk_id)
                cur.execute(
                    f"UPDATE risks SET {', '.join(risk_updates)} WHERE id = %s",
                    tuple(risk_params),
                )

            detail_columns: List[str] = []
            detail_values: List[Any] = []
            for column, value in detail_fields:
                if value is not _UNSET:
                    detail_columns.append(column)
                    detail_values.append(value)

            if detail_columns:
                insert_cols = ", ".join(["region_id", "permit_id"] + detail_columns)
                insert_placeholders = ", ".join(["%s"] * (2 + len(detail_values)))
                update_assignments = ", ".join(
                    f"{col} = EXCLUDED.{col}" for col in detail_columns
                )
                sql = f"""
                    INSERT INTO region_permit_details ({insert_cols})
                    VALUES ({insert_placeholders})
                    ON CONFLICT (region_id, permit_id)
                    DO UPDATE SET
                        {update_assignments},
                        updated_at = now()
                """
                cur.execute(
                    sql,
                    (region_id, permit_id, *detail_values),
                )

            # Re-read through the view inside the transaction so the caller
            # sees exactly what is about to be committed.
            updated_record = _normalize_snapshot_payload(
                _fetch_permit_risk_row(conn, region_id, permit_id, risk_id)
            )
            conn.commit()

            logger.info(
                "[CHECKPOINT] Permit risk updated: %s version %s -> new snapshot ready",
                snapshot_meta["permit_risk_key"],
                snapshot_meta["version"],
            )

            return {
                "snapshot": snapshot_meta,
                "current": updated_record,
                "risk_updated": bool(risk_updates),
                "details_updated": bool(detail_columns),
            }
        except Exception:
            conn.rollback()
            raise