feat: add permit risk snapshot workflow

This commit is contained in:
Codex Agent 2025-11-03 09:43:58 +08:00
parent d2faf03082
commit ec8adf98f1
5 changed files with 614 additions and 1 deletions

View File

@ -297,3 +297,65 @@ tail -f checkpoint.log | grep "Progress:"
---
**注意**: 所有日志都会立即输出到控制台,无需等待操作完成即可查看进度。
---
## 🧱 针对许可风险的精细化快照
> 新增:基于 `region_permit_risk_vw` 视图的原子检查点
当用户编辑某条许可风险记录(唯一键为 `region_id + permit_id + risk_id`)时,可调用以下工具函数先写入快照:
```python
from lawrisk.services.licensing_repo import create_permit_risk_snapshot
snapshot_meta = create_permit_risk_snapshot(
region_id="7c2b…",
permit_id="b93d…",
risk_id="a01f…",
edited_by="alice",
change_summary="调整法律依据与责任主体"
)
```
返回示例:
```python
{
"snapshot_id": "5c4b…",
"permit_risk_key": "7c2b…::b93d…::a01f…",
"version": 3,
"created_at": "2024-05-18T02:41:55.210412+00:00",
"edited_by": "alice",
"change_summary": "调整法律依据与责任主体",
"payload": {...完整的视图行...}
}
```
- 快照数据存放在 `permit_risk_snapshots` 表中,`payload` 记录视图的完整列。
- 使用 `list_permit_risk_snapshots(region_id=..., permit_id=..., risk_id=...)` 查看历史版本。
- 通过 `get_permit_risk_snapshot(snapshot_id)` 获取任意版本的详细内容,以便比对或手工回滚。
### 带快照的原子修改
```python
from lawrisk.services.licensing_repo import update_permit_risk_record
result = update_permit_risk_record(
region_id="7c2b…",
permit_id="b93d…",
risk_id="a01f…",
risk_content="新的风险提示要点",
legal_basis="《某条例》第三条",
permit_status="正常受理",
edited_by="alice",
change_summary="补充法规依据并更新状态"
)
print(result["snapshot"]["version"]) # 修改前版本
print(result["current"]["risk_content"]) # 修改后视图内容
```
- 函数会先锁定 `region_permit_risks` 目标行,写入快照,再执行字段更新。
- `risk_*` 参数对应 `risks` 表字段;`permit_*` 参数对应 `region_permit_details`
- 可传入 `None` 将字段清空;若不想修改某列则省略参数。

View File

@ -15,6 +15,7 @@ The `licensing_risks` PostgreSQL database stores municipal licensing risk prompt
| `region_theme_permits` | Region + theme + permit linkage | `region_id`, `theme_id`, `permit_id` |
| `risks` | 风险提示主体信息 | `id` (PK), `risk_content`, `legal_basis`, `document_no`, `summary` |
| `region_permit_risks` | Region + permit + risk linkage | `region_id`, `permit_id`, `risk_id` |
| `permit_risk_snapshots` | Versioned checkpoints per region/permit/risk | `(region_id, permit_id, risk_id, version)` |
All primary keys are integer sequences; unique indexes and `ON CONFLICT DO NOTHING` logic make repeated imports idempotent. Foreign keys should be enforced in the target schema to prevent orphan rows.
@ -71,3 +72,20 @@ For fuzzy lookups, switch to `WHERE p.name ILIKE '%关键词%'`.
- Export query results with `\copy (SELECT …) TO '/tmp/export.csv' WITH CSV HEADER;`.
- Run queries after imports commit; the loaders already wrap operations in transactions.
## Views
### `region_permit_risk_vw`
Provides a flattened row per `(region_id, permit_id, risk_id)` that includes:
- Region/permit names
- Risk narrative fields (`risk_content`, `legal_basis`, `document_no`, `summary`)
- Related detail columns from `region_permit_details`
- Aggregated theme, business-scope, and subitem identifiers/names
- Synthetic key `permit_risk_key = region_id::text || '::' || permit_id::text || '::' || risk_id::text`
Use this view for checkpointing and diff operations instead of re-joining the normalized tables. The underlying tables already have composite primary keys on `(region_id, permit_id, …)`, so no additional indexes are required for the view.
### 快照与更新服务
- `create_permit_risk_snapshot(region_id, permit_id, risk_id, edited_by=None, change_summary=None)`
- 独立保存当前视图行到 `permit_risk_snapshots`,常用于人工备份。
- `update_permit_risk_record(...)`
- 支持在事务内同时:① 生成快照;② 更新 `risks` 表(风险提示内容/法律依据等);③ UPSERT `region_permit_details`(许可状态、责任人等)。
- 未显式传参的列保持原样;传入 `None` 可将对应字段置空。

View File

@ -0,0 +1,65 @@
-- Region-permit risk consolidation view
-- Provides a flattened projection for transactional checkpoints and auditing.
BEGIN;
CREATE OR REPLACE VIEW public.region_permit_risk_vw AS
SELECT
rpr.region_id,
r.name AS region_name,
rpr.permit_id,
p.name AS permit_name,
rk.id AS risk_id,
rk.risk_content,
rk.legal_basis,
rk.document_no,
rk.summary,
COALESCE(theme.theme_ids, ARRAY[]::uuid[]) AS theme_ids,
COALESCE(theme.theme_names, ARRAY[]::text[]) AS theme_names,
COALESCE(scopes.scope_ids, ARRAY[]::uuid[]) AS scope_ids,
COALESCE(scopes.scope_descriptions, ARRAY[]::text[]) AS scope_descriptions,
COALESCE(subitems.subitem_ids, ARRAY[]::uuid[]) AS subitem_ids,
details.permit_status,
details.subitem_summary,
details.responsible_contact,
details.jurisdiction_scope,
details.updated_at AS permit_detail_updated_at,
concat_ws('::', rpr.region_id::text, rpr.permit_id::text, rpr.risk_id::text) AS permit_risk_key
FROM region_permit_risks rpr
JOIN regions r ON r.id = rpr.region_id
JOIN permits p ON p.id = rpr.permit_id
JOIN risks rk ON rk.id = rpr.risk_id
LEFT JOIN region_permit_details details
ON details.region_id = rpr.region_id
AND details.permit_id = rpr.permit_id
LEFT JOIN LATERAL (
SELECT
array_agg(DISTINCT rtp.theme_id) AS theme_ids,
array_agg(DISTINCT t.name) AS theme_names
FROM region_theme_permits rtp
JOIN themes t ON t.id = rtp.theme_id
WHERE rtp.region_id = rpr.region_id
AND rtp.permit_id = rpr.permit_id
) AS theme ON TRUE
LEFT JOIN LATERAL (
SELECT
array_agg(DISTINCT rps.scope_id) AS scope_ids,
array_agg(DISTINCT bs.description) AS scope_descriptions
FROM region_permit_scopes rps
JOIN business_scopes bs ON bs.id = rps.scope_id
WHERE rps.region_id = rpr.region_id
AND rps.permit_id = rpr.permit_id
) AS scopes ON TRUE
LEFT JOIN LATERAL (
SELECT
array_agg(DISTINCT rpsub.subitem_id) AS subitem_ids
FROM region_permit_subitems rpsub
WHERE rpsub.region_id = rpr.region_id
AND rpsub.permit_id = rpr.permit_id
) AS subitems ON TRUE;
COMMENT ON VIEW public.region_permit_risk_vw IS
'Flattens region/permit/risk entities for checkpointing and diff workflows.';
COMMIT;

View File

@ -0,0 +1,25 @@
-- Versioned snapshots per region/permit/risk tuple.
BEGIN;
CREATE TABLE IF NOT EXISTS public.permit_risk_snapshots (
snapshot_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
region_id uuid NOT NULL,
permit_id uuid NOT NULL,
risk_id uuid NOT NULL,
permit_risk_key text NOT NULL,
version integer NOT NULL,
payload jsonb NOT NULL,
edited_by text,
change_summary text,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX IF NOT EXISTS permit_risk_snapshots_region_permit_risk_version_idx
ON public.permit_risk_snapshots (region_id, permit_id, risk_id, version);
CREATE INDEX IF NOT EXISTS permit_risk_snapshots_key_idx
ON public.permit_risk_snapshots (permit_risk_key);
COMMIT;

View File

@ -5,8 +5,10 @@ import logging
import os
import re
from collections import OrderedDict
from datetime import datetime
from datetime import datetime, date
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
import uuid
import pg8000.dbapi as pg
@ -22,6 +24,7 @@ logger.propagate = False
# Separate configuration so legacy fs_law_risk integration keeps using PG_*
LIC_DEFAULT_DB = "licensing_risks"
_UNSET = object()
ARTICLE_HEADING_RE = re.compile(r"(?m)^(第[一二三四五六七八九十百零0-9]+条)")
ARTICLE_TOKEN_RE = re.compile(r"(?<!\*)(第[一二三四五六七八九十百零0-9]+条)(?!\*)")
@ -821,3 +824,443 @@ def delete_checkpoint(checkpoint_id: str) -> bool:
os.remove(checkpoint_file)
return True
return False
# ---------------------------------------------------------------------------
# Permit risk snapshot helpers (region_permit_risk_vw)
# ---------------------------------------------------------------------------
def _convert_snapshot_value(value: Any) -> Any:
"""Convert database values into JSON-serialisable primitives."""
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
if isinstance(value, uuid.UUID):
return str(value)
if isinstance(value, memoryview):
return bytes(value).decode("utf-8", errors="replace")
if isinstance(value, (bytes, bytearray)):
return value.decode("utf-8", errors="replace")
if isinstance(value, (list, tuple)):
return [_convert_snapshot_value(v) for v in value]
return value
def _normalize_snapshot_payload(record: Dict[str, Any]) -> Dict[str, Any]:
"""Return a JSON-safe copy of the permit risk view record."""
return {key: _convert_snapshot_value(val) for key, val in record.items()}
def _fetch_permit_risk_row(
conn: pg.Connection, region_id: str, permit_id: str, risk_id: str
) -> Dict[str, Any]:
"""Fetch a single row from the consolidation view."""
sql = """
SELECT
region_id,
region_name,
permit_id,
permit_name,
risk_id,
risk_content,
legal_basis,
document_no,
summary,
theme_ids,
theme_names,
scope_ids,
scope_descriptions,
subitem_ids,
permit_status,
subitem_summary,
responsible_contact,
jurisdiction_scope,
permit_detail_updated_at,
permit_risk_key
FROM region_permit_risk_vw
WHERE region_id = %s AND permit_id = %s AND risk_id = %s
LIMIT 1
"""
cur = conn.cursor()
cur.execute(sql, (region_id, permit_id, risk_id))
row = cur.fetchone()
if not row:
raise ValueError("Permit risk combination not found in consolidation view")
columns = [desc[0] for desc in cur.description]
return {columns[i]: row[i] for i in range(len(columns))}
def _insert_permit_risk_snapshot(
conn: pg.Connection,
payload: Dict[str, Any],
*,
edited_by: Optional[str],
change_summary: Optional[str],
) -> Dict[str, Any]:
"""Insert a snapshot row and return metadata."""
permit_risk_key = str(payload["permit_risk_key"])
cur = conn.cursor()
cur.execute(
"""
SELECT COALESCE(MAX(version), 0)
FROM permit_risk_snapshots
WHERE permit_risk_key = %s
FOR UPDATE
""",
(permit_risk_key,),
)
row = cur.fetchone()
next_version = (int(row[0]) + 1) if row else 1
cur.execute(
"""
INSERT INTO permit_risk_snapshots (
region_id,
permit_id,
risk_id,
permit_risk_key,
version,
payload,
edited_by,
change_summary
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING snapshot_id, created_at
""",
(
payload["region_id"],
payload["permit_id"],
payload["risk_id"],
permit_risk_key,
next_version,
pg.Json(payload),
edited_by,
change_summary,
),
)
snapshot_id, created_at = cur.fetchone()
return {
"snapshot_id": str(snapshot_id),
"region_id": payload["region_id"],
"permit_id": payload["permit_id"],
"risk_id": payload["risk_id"],
"permit_risk_key": permit_risk_key,
"version": next_version,
"created_at": _convert_snapshot_value(created_at),
"edited_by": edited_by,
"change_summary": change_summary or "",
"payload": payload,
}
def _create_snapshot_with_connection(
conn: pg.Connection,
region_id: str,
permit_id: str,
risk_id: str,
*,
edited_by: Optional[str],
change_summary: Optional[str],
) -> Dict[str, Any]:
"""Create a snapshot using an existing DB connection (no commit)."""
view_record = _fetch_permit_risk_row(conn, region_id, permit_id, risk_id)
payload = _normalize_snapshot_payload(view_record)
metadata = _insert_permit_risk_snapshot(
conn,
payload,
edited_by=edited_by,
change_summary=change_summary,
)
logger.info(
"[CHECKPOINT] Snapshot created: %s version %s",
metadata["permit_risk_key"],
metadata["version"],
)
return metadata
def create_permit_risk_snapshot(
region_id: str,
permit_id: str,
risk_id: str,
*,
edited_by: Optional[str] = None,
change_summary: Optional[str] = None,
) -> Dict[str, Any]:
"""
Capture the current state of a region/permit/risk record as a versioned snapshot.
Returns metadata about the created snapshot including version number.
"""
with _lic_pg_conn(autocommit=False) as conn:
try:
snapshot_meta = _create_snapshot_with_connection(
conn,
region_id,
permit_id,
risk_id,
edited_by=edited_by,
change_summary=change_summary,
)
conn.commit()
return snapshot_meta
except Exception:
conn.rollback()
raise
def list_permit_risk_snapshots(
region_id: Optional[str] = None,
permit_id: Optional[str] = None,
risk_id: Optional[str] = None,
*,
permit_risk_key: Optional[str] = None,
limit: int = 20,
offset: int = 0,
) -> List[Dict[str, Any]]:
"""
List snapshots for a region/permit/risk combination ordered by version descending.
At least one identifier (permit_risk_key or region/permit/risk) must be provided.
"""
filters: List[str] = []
params: List[Any] = []
if permit_risk_key:
filters.append("permit_risk_key = %s")
params.append(permit_risk_key)
else:
if region_id:
filters.append("region_id = %s")
params.append(region_id)
if permit_id:
filters.append("permit_id = %s")
params.append(permit_id)
if risk_id:
filters.append("risk_id = %s")
params.append(risk_id)
if not filters:
raise ValueError("At least one identifier must be provided to list snapshots")
filters_clause = " AND ".join(filters)
sql = f"""
SELECT snapshot_id, version, permit_risk_key, edited_by, change_summary, created_at
FROM permit_risk_snapshots
WHERE {filters_clause}
ORDER BY version DESC
LIMIT %s OFFSET %s
"""
params.extend([limit, offset])
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(sql, tuple(params))
rows = cur.fetchall()
snapshots: List[Dict[str, Any]] = []
for snapshot_id, version, key, editor, summary, created_at in rows:
snapshots.append(
{
"snapshot_id": str(snapshot_id),
"permit_risk_key": key,
"version": int(version),
"created_at": _convert_snapshot_value(created_at),
"edited_by": editor,
"change_summary": summary or "",
}
)
return snapshots
def get_permit_risk_snapshot(snapshot_id: str) -> Optional[Dict[str, Any]]:
"""Fetch a snapshot payload by its identifier."""
sql = """
SELECT
snapshot_id,
region_id,
permit_id,
risk_id,
permit_risk_key,
version,
payload,
edited_by,
change_summary,
created_at
FROM permit_risk_snapshots
WHERE snapshot_id = %s
"""
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (snapshot_id,))
row = cur.fetchone()
if not row:
return None
(
snap_id,
region_id,
permit_id,
risk_id,
permit_risk_key,
version,
payload,
edited_by,
change_summary,
created_at,
) = row
if isinstance(payload, (bytes, bytearray, memoryview)):
payload_obj = json.loads(payload)
else:
payload_obj = payload if isinstance(payload, dict) else json.loads(payload)
return {
"snapshot_id": str(snap_id),
"region_id": str(region_id),
"permit_id": str(permit_id),
"risk_id": str(risk_id),
"permit_risk_key": permit_risk_key,
"version": int(version),
"created_at": _convert_snapshot_value(created_at),
"edited_by": edited_by,
"change_summary": change_summary or "",
"payload": payload_obj,
}
def update_permit_risk_record(
region_id: str,
permit_id: str,
risk_id: str,
*,
risk_content: Any = _UNSET,
legal_basis: Any = _UNSET,
document_no: Any = _UNSET,
summary: Any = _UNSET,
permit_status: Any = _UNSET,
subitem_summary: Any = _UNSET,
responsible_contact: Any = _UNSET,
jurisdiction_scope: Any = _UNSET,
edited_by: Optional[str] = None,
change_summary: Optional[str] = None,
) -> Dict[str, Any]:
"""
Update the permit risk record while capturing a checkpoint snapshot beforehand.
Returns the snapshot metadata (pre-change) and the refreshed view row (post-change).
"""
update_flags = [
risk_content,
legal_basis,
document_no,
summary,
permit_status,
subitem_summary,
responsible_contact,
jurisdiction_scope,
]
if all(flag is _UNSET for flag in update_flags):
raise ValueError("No fields provided to update.")
with _lic_pg_conn(autocommit=False) as conn:
try:
cur = conn.cursor()
cur.execute(
"""
SELECT 1
FROM region_permit_risks
WHERE region_id = %s AND permit_id = %s AND risk_id = %s
FOR UPDATE
""",
(region_id, permit_id, risk_id),
)
if cur.fetchone() is None:
raise ValueError("Permit risk combination not found.")
snapshot_meta = _create_snapshot_with_connection(
conn,
region_id,
permit_id,
risk_id,
edited_by=edited_by,
change_summary=change_summary,
)
risk_updates: List[str] = []
risk_params: List[Any] = []
risk_fields = (
("risk_content", risk_content),
("legal_basis", legal_basis),
("document_no", document_no),
("summary", summary),
)
for column, value in risk_fields:
if value is not _UNSET:
risk_updates.append(f"{column} = %s")
risk_params.append(value)
if risk_updates:
risk_params.append(risk_id)
cur.execute(
f"UPDATE risks SET {', '.join(risk_updates)} WHERE id = %s",
tuple(risk_params),
)
detail_columns: List[str] = []
detail_values: List[Any] = []
detail_fields = (
("permit_status", permit_status),
("subitem_summary", subitem_summary),
("responsible_contact", responsible_contact),
("jurisdiction_scope", jurisdiction_scope),
)
for column, value in detail_fields:
if value is not _UNSET:
detail_columns.append(column)
detail_values.append(value)
details_updated = False
if detail_columns:
insert_cols = ", ".join(["region_id", "permit_id"] + detail_columns)
insert_placeholders = ", ".join(["%s"] * (2 + len(detail_values)))
update_assignments = ", ".join(
[f"{col} = EXCLUDED.{col}" for col in detail_columns]
)
sql = f"""
INSERT INTO region_permit_details ({insert_cols})
VALUES ({insert_placeholders})
ON CONFLICT (region_id, permit_id)
DO UPDATE SET
{update_assignments},
updated_at = now()
"""
cur.execute(
sql,
(region_id, permit_id, *detail_values),
)
details_updated = True
updated_record = _normalize_snapshot_payload(
_fetch_permit_risk_row(conn, region_id, permit_id, risk_id)
)
conn.commit()
logger.info(
"[CHECKPOINT] Permit risk updated: %s version %s -> new snapshot ready",
snapshot_meta["permit_risk_key"],
snapshot_meta["version"],
)
return {
"snapshot": snapshot_meta,
"current": updated_record,
"risk_updated": bool(risk_updates),
"details_updated": details_updated,
}
except Exception:
conn.rollback()
raise