fs-lawrisk/lawrisk/services/licensing_repo.py

328 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import os
import re
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple
import pg8000.dbapi as pg
# Separate configuration so the legacy fs_law_risk integration keeps using PG_*.
# Database name used by _lic_pg_conn when LIC_PG_DATABASE is not set.
LIC_DEFAULT_DB = "licensing_risks"

# --- Patterns consumed, in a fixed order, by _format_summary_markdown ---------
# NOTE(review): several character classes below list ':' (and ',') twice. The
# duplicates are harmless to the regex engine but suggest one of each pair was
# originally a full-width punctuation mark -- confirm against the repository copy.

# Article marker ("第…条", Chinese numerals or ASCII digits) at the start of a line.
ARTICLE_HEADING_RE = re.compile(r"(?m)^(第[一二三四五六七八九十百零0-9]+条)")
# Article marker anywhere, provided it is not directly adjacent to a '*'.
ARTICLE_TOKEN_RE = re.compile(r"(?<!\*)(第[一二三四五六七八九十百零0-9]+条)(?!\*)")
# Bolded article marker ("**第…条**") that is neither at the very start of the
# string nor directly after a newline.
ARTICLE_NEWLINE_RE = re.compile(r"(?<!^)(?<!\n)(\*\*第[一二三四五六七八九十百零0-9]+条\*\*)")
# Punctuation, optional spaces/tabs, then a run of Chinese numerals.
CN_ENUM_INLINE_RE = re.compile(r"([;::。.])[ \t]*([一二三四五六七八九十百零]+)")
# Run of Chinese numerals at the start of a line (leading whitespace allowed).
CN_ENUM_LINE_RE = re.compile(r"(?m)^\s*([一二三四五六七八九十百零]+)")
# Punctuation, optional spaces/tabs, then digits followed by a dot ("1.").
ARABIC_ENUM_INLINE_RE = re.compile(r"([;::。.,,])[ \t]*(\d+\.)")
# Digits followed by a dot at the start of a line (leading whitespace allowed).
ARABIC_ENUM_LINE_RE = re.compile(r"(?m)^\s*(\d+)\.")
# Punctuation, optional spaces/tabs, then bare digits (no trailing dot required).
NESTED_ENUM_INLINE_RE = re.compile(r"([;::。.])[ \t]*(\d+)")
# Bare digits at the start of a line (leading whitespace allowed).
NESTED_ENUM_LINE_RE = re.compile(r"(?m)^\s*(\d+)")
# Any whitespace run that ends in a newline.
# NOTE(review): the name implies a colon, but the pattern contains none; as
# written it also swallows preceding blank lines -- confirm this is intended.
COLON_NEWLINE_RE = re.compile(r"\s*\n")
# Spaces/tabs immediately before a newline.
TRAILING_SPACE_RE = re.compile(r"[ \t]+\n")
# Three or more consecutive newlines.
EXTRA_NEWLINES_RE = re.compile(r"\n{3,}")
def _format_summary_markdown(summary: str) -> str:
    """Render a Chinese legal excerpt as Markdown-friendly text.

    CRLF line endings are normalised and the text is stripped; an empty or
    whitespace-only input yields ``""``. The module-level patterns are then
    applied strictly in the order listed below (later steps see the output of
    earlier ones), followed by a final blank-line clean-up and a strip.
    """
    text = summary.replace("\r\n", "\n").strip() if summary else ""
    if not text:
        return ""

    # (pattern, replacement) pairs; the order is significant.
    rewrite_steps = (
        # Bold article markers that open a line.
        (ARTICLE_HEADING_RE, lambda m: f"**{m[1]}**"),
        # Chinese-numeral enumerations become "- " bullet items.
        (CN_ENUM_INLINE_RE, lambda m: f"{m[1]}\n- {m[2]} "),
        (CN_ENUM_LINE_RE, lambda m: f"- {m[1]} "),
        # "N." enumerations are moved onto their own, space-prefixed line.
        (ARABIC_ENUM_INLINE_RE, lambda m: f"{m[1]}\n {m[2]}"),
        (ARABIC_ENUM_LINE_RE, lambda m: f" {m[1]}."),
        # Bare-digit enumerations become space-prefixed "- " items.
        (NESTED_ENUM_INLINE_RE, lambda m: f"{m[1]}\n - {m[2]}"),
        (NESTED_ENUM_LINE_RE, lambda m: f" - {m[1]}"),
        # Bold any remaining article markers not already wrapped in '*'.
        (ARTICLE_TOKEN_RE, lambda m: f"**{m[1]}**"),
        # Push bolded article markers found mid-line onto a new line.
        (ARTICLE_NEWLINE_RE, lambda m: f"\n{m[1]}"),
        # Whitespace / newline normalisation.
        (COLON_NEWLINE_RE, "\n"),
        (EXTRA_NEWLINES_RE, "\n\n"),
        (TRAILING_SPACE_RE, "\n"),
    )
    for pattern, replacement in rewrite_steps:
        text = pattern.sub(replacement, text)

    # Collapse whitespace-only lines into a single blank line.
    text = re.sub(r"\n\s+\n", "\n\n", text)
    return text.strip()
def _lic_pg_conn(autocommit: bool = False) -> pg.Connection:
    """Open a pg8000 connection to the licensing database.

    Settings come from the environment:

    * ``LIC_PG_HOST`` (default ``172.24.240.1``)
    * ``LIC_PG_PORT``, falling back to ``PG_PORT``, then ``5432``
    * ``LIC_PG_USER``, falling back to ``PG_USER``, then ``postgres``
    * ``LIC_PG_PASSWORD`` (default empty string)
    * ``LIC_PG_DATABASE`` (default ``LIC_DEFAULT_DB``)

    Args:
        autocommit: Value assigned to the connection's ``autocommit`` flag.

    Returns:
        The opened connection.

    Raises:
        ValueError: If the configured port is not an integer.
    """
    env = os.environ
    settings = {
        "host": env.get("LIC_PG_HOST", "172.24.240.1"),
        "port": int(env.get("LIC_PG_PORT", env.get("PG_PORT", "5432"))),
        "user": env.get("LIC_PG_USER", env.get("PG_USER", "postgres")),
        "password": env.get("LIC_PG_PASSWORD", ""),
        "database": env.get("LIC_PG_DATABASE", LIC_DEFAULT_DB),
    }
    connection = pg.connect(**settings)
    connection.autocommit = autocommit
    return connection
def list_region_theme_options() -> List[Dict[str, str]]:
    """Return every region/theme pair, for use as LLM selection options.

    Each option carries the stringified region and theme ids, their names, an
    ``option_id`` of the form ``"<region_id>:<theme_id>"`` and a
    ``display_name`` joining both names. Rows are ordered by region name, then
    theme name.
    """
    query = """
SELECT
rt.region_id,
r.name AS region_name,
rt.theme_id,
t.name AS theme_name
FROM region_themes rt
JOIN regions r ON r.id = rt.region_id
JOIN themes t ON t.id = rt.theme_id
ORDER BY r.name, t.name
"""
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query)
        options: List[Dict[str, str]] = [
            {
                "option_id": f"{str(region_id)}:{str(theme_id)}",
                "region_id": str(region_id),
                "region_name": str(region_name),
                "theme_id": str(theme_id),
                "theme_name": str(theme_name),
                "display_name": f"{region_name} · {theme_name}",
            }
            for region_id, region_name, theme_id, theme_name in cursor.fetchall()
        ]
    return options
def load_business_scopes(region_id: str) -> List[Dict[str, str]]:
    """Return the business scopes linked to ``region_id``.

    Each item is ``{"id": ..., "description": ...}`` with both values
    stringified; items are ordered by description.
    """
    query = """
SELECT bs.id, bs.description
FROM region_scopes rs
JOIN business_scopes bs ON bs.id = rs.scope_id
WHERE rs.region_id = %s
ORDER BY bs.description
"""
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query, (region_id,))
        result: List[Dict[str, str]] = [
            {"id": str(scope_id), "description": str(description)}
            for scope_id, description in cursor.fetchall()
        ]
    return result
def list_permits_for_region(region: str) -> List[Dict[str, str]]:
    """Return the distinct permits available in a region.

    ``region`` is matched either against the region id (compared as text) or,
    case-insensitively, against the region name. Each item is
    ``{"id": ..., "name": ...}``; items are ordered by permit name.
    """
    query = """
SELECT DISTINCT p.id, p.name
FROM region_theme_permits rtp
JOIN permits p ON p.id = rtp.permit_id
JOIN regions r ON r.id = rtp.region_id
WHERE rtp.region_id::text = %s OR LOWER(r.name) = LOWER(%s)
ORDER BY p.name
"""
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        # The same value feeds both the id comparison and the name comparison.
        cursor.execute(query, (region, region))
        found: List[Dict[str, str]] = [
            {"id": str(permit_id), "name": str(permit_name)}
            for permit_id, permit_name in cursor.fetchall()
        ]
    return found
def _pg_sqlstate(exc: BaseException) -> str:
    """Best-effort extraction of the PostgreSQL SQLSTATE code from a driver error.

    Checks, in order: a ``sqlstate`` attribute (drivers that expose one), a
    dict in ``args[0]`` holding the code under ``"C"`` (the form pg8000 uses
    for server error responses), and a string in ``args[1]`` (positional form
    used by older pg8000 releases -- TODO confirm against the pinned version).
    Returns ``""`` when no code can be found.
    """
    explicit = getattr(exc, "sqlstate", None)
    if explicit:
        return str(explicit)
    args = getattr(exc, "args", ())
    if args and isinstance(args[0], dict):
        return str(args[0].get("C") or "")
    if len(args) > 1 and isinstance(args[1], str):
        return args[1]
    return ""


def _load_permit_scopes_for_region(
    conn: pg.Connection, region_id: str, permit_ids: List[str]
) -> Dict[str, List[Dict[str, str]]]:
    """Return a mapping of permit_id -> business scopes for that permit within a region.

    Args:
        conn: Open connection on which the lookup is executed.
        region_id: Region whose permit/scope links are read.
        permit_ids: Permit ids (as strings) to include; rows for any other
            permit are ignored.

    Returns:
        A dict with one key per entry of ``permit_ids``; each value is a list
        of ``{"id": ..., "description": ...}`` dicts ordered by description.
        All lists are empty when ``permit_ids`` is empty or when the
        ``region_permit_scopes`` table does not exist yet.

    Raises:
        pg.DatabaseError: Any database error other than SQLSTATE ``42P01``.
    """
    scope_map: Dict[str, List[Dict[str, str]]] = {pid: [] for pid in permit_ids}
    if not permit_ids:
        return scope_map
    sql = """
SELECT rps.permit_id, bs.id, bs.description
FROM region_permit_scopes rps
JOIN business_scopes bs ON bs.id = rps.scope_id
WHERE rps.region_id = %s
ORDER BY rps.permit_id, bs.description
"""
    cur = conn.cursor()
    try:
        cur.execute(sql, (region_id,))
    except pg.DatabaseError as exc:
        # 42P01 => undefined_table; allow fallback when migration not yet applied.
        # DatabaseError (the parent of ProgrammingError) is caught because the
        # driver may raise the base class for server errors, and the code is
        # read via _pg_sqlstate because pg8000 carries it in the exception
        # args rather than in a ``sqlstate`` attribute.
        if _pg_sqlstate(exc) == "42P01":
            return scope_map
        raise
    for permit_id, scope_id, description in cur.fetchall():
        bucket = scope_map.get(str(permit_id))
        if bucket is None:
            # Permit not requested by the caller.
            continue
        bucket.append({"id": str(scope_id), "description": str(description)})
    return scope_map
def load_permits_and_risks(
    region_id: str, theme_id: str, permit_id: Optional[str] = None
) -> List[Dict[str, object]]:
    """Return the permits of a region/theme pair with their risks attached.

    Args:
        region_id: Region to query.
        theme_id: Theme to query.
        permit_id: When given, restrict the result to this single permit.

    Returns:
        One dict per permit, ordered by permit name, with the keys ``id``,
        ``name``, ``business_scopes``, ``risks``, ``permit_status``,
        ``subitem_summary``, ``responsible_contact`` and
        ``jurisdiction_scope``. The four detail fields hold the first
        non-blank (stripped) value seen for the permit, or ``None``. Each risk
        carries ``id``, ``risk_content``, ``legal_basis``, ``document_no`` and
        a Markdown-formatted ``summary``.
    """
    detail_keys = (
        "permit_status",
        "subitem_summary",
        "responsible_contact",
        "jurisdiction_scope",
    )

    query_parts = [
        """
SELECT
p.id AS permit_id,
p.name AS permit_name,
rk.id AS risk_id,
rk.risk_content,
rk.legal_basis,
rk.document_no,
rk.summary,
rpd.permit_status,
rpd.subitem_summary,
rpd.responsible_contact,
rpd.jurisdiction_scope
FROM region_theme_permits rtp
JOIN permits p ON p.id = rtp.permit_id
LEFT JOIN region_permit_risks rpr
ON rpr.region_id = rtp.region_id
AND rpr.permit_id = rtp.permit_id
LEFT JOIN risks rk ON rk.id = rpr.risk_id
LEFT JOIN region_permit_details rpd
ON rpd.region_id = rtp.region_id
AND rpd.permit_id = rtp.permit_id
WHERE rtp.region_id = %s AND rtp.theme_id = %s
"""
    ]
    bind_values: List[Any] = [region_id, theme_id]
    if permit_id is not None:
        query_parts.append(" AND rtp.permit_id = %s")
        bind_values.append(permit_id)
    query_parts.append("\nORDER BY p.name, rk.risk_content\n")
    query = "".join(query_parts)

    by_permit: Dict[str, Dict[str, object]] = {}
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query, tuple(bind_values))
        for record in cursor.fetchall():
            (
                row_permit_id,
                row_permit_name,
                risk_id,
                risk_content,
                legal_basis,
                document_no,
                summary,
                status,
                subitems,
                contact,
                jurisdiction,
            ) = record

            key = str(row_permit_id)
            entry = by_permit.get(key)
            if entry is None:
                # First row for this permit: create its skeleton.
                entry = by_permit[key] = {
                    "id": key,
                    "name": str(row_permit_name),
                    "business_scopes": [],
                    "risks": [],
                    "permit_status": None,
                    "subitem_summary": None,
                    "responsible_contact": None,
                    "jurisdiction_scope": None,
                }

            # Keep the first non-blank value encountered for each detail field.
            for field, raw in zip(detail_keys, (status, subitems, contact, jurisdiction)):
                if entry[field] is None and raw:
                    entry[field] = raw.strip() or None

            # The LEFT JOIN yields a NULL risk id for permits without risks.
            if risk_id is None:
                continue
            entry["risks"].append(
                {
                    "id": str(risk_id),
                    "risk_content": risk_content or "",
                    "legal_basis": legal_basis or "",
                    "document_no": document_no or "",
                    "summary": _format_summary_markdown(summary or ""),
                }
            )

        # Reuse the open connection to attach per-permit business scopes.
        scopes_by_permit = _load_permit_scopes_for_region(
            conn, region_id, list(by_permit.keys())
        )
        for key, entry in by_permit.items():
            entry["business_scopes"] = scopes_by_permit.get(key, [])
    return list(by_permit.values())
def find_permit_contexts_by_name(permit_name: str) -> List[Dict[str, str]]:
    """Return region/theme contexts for permits whose name matches exactly.

    An empty ``permit_name`` returns ``[]`` without querying. Rows are read in
    region-name, theme-name order and de-duplicated on (region id, permit id),
    so only the first theme encountered for each region/permit pair is kept.
    Each item carries ``region_id``, ``region_name``, ``theme_id``,
    ``theme_name``, ``permit_id`` and ``permit_name`` as strings.
    """
    if not permit_name:
        return []
    query = """
SELECT
rtp.region_id,
r.name AS region_name,
rtp.theme_id,
t.name AS theme_name,
p.id AS permit_id,
p.name AS permit_name
FROM region_theme_permits rtp
JOIN permits p ON p.id = rtp.permit_id
JOIN regions r ON r.id = rtp.region_id
JOIN themes t ON t.id = rtp.theme_id
WHERE p.name = %s
ORDER BY r.name, t.name
"""
    contexts: List[Dict[str, str]] = []
    seen = set()
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query, (permit_name,))
        for (
            region_id,
            region_name,
            theme_id,
            theme_name,
            permit_id,
            canonical_name,
        ) in cursor.fetchall():
            marker = (str(region_id), str(permit_id))
            if marker in seen:
                # Region/permit pair already recorded under an earlier theme.
                continue
            seen.add(marker)
            contexts.append(
                {
                    "region_id": marker[0],
                    "region_name": str(region_name),
                    "theme_id": str(theme_id),
                    "theme_name": str(theme_name),
                    "permit_id": marker[1],
                    "permit_name": str(canonical_name),
                }
            )
    return contexts
def load_theme_payload(region_id: str, theme_id: str) -> Dict[str, object]:
    """Assemble the full data bundle for a region/theme selection.

    Returns:
        ``{"region": {"id", "name"}, "theme": {"id", "name"}, "permits": [...]}``
        where ``permits`` is the result of ``load_permits_and_risks``.

    Raises:
        ValueError: If no ``region_themes`` row links the given region and theme.
    """
    lookup = """
SELECT r.id, r.name, t.id, t.name
FROM regions r
JOIN region_themes rt ON rt.region_id = r.id
JOIN themes t ON t.id = rt.theme_id
WHERE r.id = %s AND t.id = %s
LIMIT 1
"""
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(lookup, (region_id, theme_id))
        match = cursor.fetchone()
        if not match:
            raise ValueError("Region/theme combination not found")
        region_uuid, region_name, theme_uuid, theme_name = match

    region_info = {"id": str(region_uuid), "name": str(region_name)}
    theme_info = {"id": str(theme_uuid), "name": str(theme_name)}
    return {
        "region": region_info,
        "theme": theme_info,
        "permits": load_permits_and_risks(region_id, theme_id),
    }