# fs-lawrisk/licensing_repo.py
#
# 258 lines
# 9.4 KiB
# Python

from __future__ import annotations
import os
from typing import Dict, List, Optional, Set
import pg8000.dbapi as pg
# Separate configuration so legacy fs_law_risk integration keeps using PG_*
LIC_DEFAULT_DB = "licensing_risks"
def _lic_pg_conn(autocommit: bool = False) -> pg.Connection:
    """Open a pg8000 DB-API connection to the licensing database.

    Settings come from ``LIC_PG_*`` environment variables. The port and user
    additionally fall back to ``PG_PORT`` / ``PG_USER`` before their built-in
    defaults; host, password and database name have no ``PG_*`` fallback.

    Args:
        autocommit: Value assigned to the connection's ``autocommit`` flag.

    Returns:
        The newly opened connection.

    Raises:
        ValueError: If the configured port is not an integer literal.
    """
    env = os.environ.get
    settings = {
        "host": env("LIC_PG_HOST", "172.24.240.1"),
        "port": int(env("LIC_PG_PORT", env("PG_PORT", "5432"))),
        "user": env("LIC_PG_USER", env("PG_USER", "postgres")),
        "password": env("LIC_PG_PASSWORD", ""),
        "database": env("LIC_PG_DATABASE", LIC_DEFAULT_DB),
    }
    connection = pg.connect(**settings)
    connection.autocommit = autocommit
    return connection
def list_region_theme_options() -> List[Dict[str, str]]:
    """List every region/theme pairing as a selectable option.

    Returns:
        One dict per ``region_themes`` row, ordered by region name and then
        theme name. Each dict holds the stringified ``region_id`` and
        ``theme_id``, both names, an ``option_id`` of the form
        ``"<region_id>:<theme_id>"`` and a ``display_name`` joining the names.
    """
    query = """
        SELECT
        rt.region_id,
        r.name AS region_name,
        rt.theme_id,
        t.name AS theme_name
        FROM region_themes rt
        JOIN regions r ON r.id = rt.region_id
        JOIN themes t ON t.id = rt.theme_id
        ORDER BY r.name, t.name
    """

    def _to_option(record) -> Dict[str, str]:
        # Convert one (region_id, region_name, theme_id, theme_name) row.
        region_id, region_name, theme_id, theme_name = record
        region_key = str(region_id)
        theme_key = str(theme_id)
        return {
            "option_id": f"{region_key}:{theme_key}",
            "region_id": region_key,
            "region_name": str(region_name),
            "theme_id": theme_key,
            "theme_name": str(theme_name),
            "display_name": f"{region_name} · {theme_name}",
        }

    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query)
        return [_to_option(record) for record in cursor.fetchall()]
def load_business_scopes(region_id: str) -> List[Dict[str, str]]:
    """Fetch the business scopes linked to a region.

    Args:
        region_id: Id of the region whose scope bindings are listed.

    Returns:
        One ``{"id", "description"}`` dict per bound scope, both values
        stringified, ordered by description.
    """
    query = """
        SELECT bs.id, bs.description
        FROM region_scopes rs
        JOIN business_scopes bs ON bs.id = rs.scope_id
        WHERE rs.region_id = %s
        ORDER BY bs.description
    """
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query, (region_id,))
        rows = cursor.fetchall()
    return [
        {"id": str(scope_key), "description": str(text)}
        for scope_key, text in rows
    ]
def _load_permit_scopes_for_region(
conn: pg.Connection, region_id: str, permit_ids: List[str]
) -> Dict[str, List[Dict[str, str]]]:
"""Return mapping of permit_id -> business scopes for that permit within region."""
scope_map: Dict[str, List[Dict[str, str]]] = {pid: [] for pid in permit_ids}
if not permit_ids:
return scope_map
sql = """
SELECT rps.permit_id, bs.id, bs.description
FROM region_permit_scopes rps
JOIN business_scopes bs ON bs.id = rps.scope_id
WHERE rps.region_id = %s
ORDER BY rps.permit_id, bs.description
"""
cur = conn.cursor()
try:
cur.execute(sql, (region_id,))
except pg.ProgrammingError as exc:
# 42P01 => undefined_table; allow fallback when migration not yet applied.
sqlstate = getattr(exc, "sqlstate", "")
if sqlstate == "42P01":
return scope_map
raise
for permit_id, scope_id, description in cur.fetchall():
pid = str(permit_id)
if pid not in scope_map:
continue
scope_map[pid].append({"id": str(scope_id), "description": str(description)})
return scope_map
def _clean_text(value: Optional[str]) -> Optional[str]:
if value is None:
return None
txt = str(value).strip()
return txt or None
def load_permits_and_risks(region_id: str, theme_id: str) -> List[Dict[str, object]]:
    """Collect the permits of a region-theme pair together with their risks.

    The query left-joins each permit with its per-region details, subitems
    and risks, so one permit can span many rows. Rows are folded into a single
    dict per permit, de-duplicating risks by id and subitems by cleaned text.

    Args:
        region_id: Region to load permits for.
        theme_id: Theme, within that region, to load permits for.

    Returns:
        One dict per permit, in first-seen row order, with keys ``id``,
        ``name``, ``business_scopes``, ``risks``, ``scopes``, ``risk_count``,
        ``permit_status``, ``subitem_summary``, ``subitems``,
        ``responsible_contact`` and ``jurisdiction_scope``.
    """
    query = """
        SELECT
        p.id AS permit_id,
        p.name AS permit_name,
        rk.id AS risk_id,
        rk.risk_content,
        rk.legal_basis,
        rk.document_no,
        rk.summary,
        rpd.permit_status,
        rpd.subitem_summary,
        rpd.responsible_contact,
        rpd.jurisdiction_scope,
        psi.description AS subitem_desc
        FROM region_theme_permits rtp
        JOIN permits p ON p.id = rtp.permit_id
        LEFT JOIN region_permit_details rpd
        ON rpd.region_id = rtp.region_id
        AND rpd.permit_id = rtp.permit_id
        LEFT JOIN region_permit_subitems rpsi
        ON rpsi.region_id = rtp.region_id
        AND rpsi.permit_id = rtp.permit_id
        LEFT JOIN permit_subitems psi ON psi.id = rpsi.subitem_id
        LEFT JOIN region_permit_risks rpr
        ON rpr.region_id = rtp.region_id
        AND rpr.permit_id = rtp.permit_id
        LEFT JOIN risks rk ON rk.id = rpr.risk_id
        WHERE rtp.region_id = %s AND rtp.theme_id = %s
        ORDER BY p.name, rk.risk_content
    """
    # Optional per-region detail columns, handled uniformly below.
    detail_keys = (
        "permit_status",
        "subitem_summary",
        "responsible_contact",
        "jurisdiction_scope",
    )
    by_permit: Dict[str, Dict[str, object]] = {}
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query, (region_id, theme_id))
        for (
            permit_id,
            permit_name,
            risk_id,
            risk_content,
            legal_basis,
            document_no,
            summary,
            permit_status,
            subitem_summary,
            responsible_contact,
            jurisdiction_scope,
            subitem_desc,
        ) in cursor.fetchall():
            key = str(permit_id)
            record = by_permit.get(key)
            if record is None:
                # First row for this permit: start its accumulator. The
                # underscore-prefixed sets are scratch state removed later.
                record = by_permit[key] = {
                    "id": key,
                    "name": str(permit_name),
                    "business_scopes": [],
                    "risks": [],
                    "scopes": [],
                    "risk_count": 0,
                    "permit_status": None,
                    "subitem_summary": None,
                    "subitems": [],
                    "responsible_contact": None,
                    "jurisdiction_scope": None,
                    "_risk_ids": set(),
                    "_subitems": set(),
                }

            detail_values = (
                permit_status,
                subitem_summary,
                responsible_contact,
                jurisdiction_scope,
            )
            for field, raw in zip(detail_keys, detail_values):
                if raw is not None:
                    record[field] = _clean_text(raw)

            label = _clean_text(subitem_desc) if subitem_desc else None
            if label:
                record["_subitems"].add(label)

            if risk_id is None:
                continue
            risk_key = str(risk_id)
            seen_risks = record["_risk_ids"]
            if risk_key in seen_risks:
                # Same risk repeated by the subitem join; keep the first copy.
                continue
            seen_risks.add(risk_key)
            record["risks"].append(
                {
                    "id": risk_key,
                    "risk_content": _clean_text(risk_content) or "",
                    "legal_basis": _clean_text(legal_basis) or "",
                    "document_no": _clean_text(document_no) or "",
                    "summary": _clean_text(summary) or "",
                }
            )

        scopes_by_permit = _load_permit_scopes_for_region(
            conn, region_id, list(by_permit)
        )

    for key, record in by_permit.items():
        bound_scopes = scopes_by_permit.get(key, [])
        record["business_scopes"] = bound_scopes
        descriptions = (scope.get("description") for scope in bound_scopes)
        record["scopes"] = [text for text in descriptions if text]
        record["subitems"] = sorted(record.pop("_subitems", set()))
        record["risk_count"] = len(record.pop("_risk_ids", set()))
        # Normalise once more so blank optional fields end up as None.
        for field in detail_keys:
            record[field] = _clean_text(record.get(field))
    return list(by_permit.values())
def load_theme_payload(region_id: str, theme_id: str) -> Dict[str, object]:
    """Build the complete payload for one region-theme selection.

    Args:
        region_id: Id of the selected region.
        theme_id: Id of the selected theme.

    Returns:
        A dict with ``region`` and ``theme`` entries (each holding a
        stringified ``id`` and a ``name``) plus ``permits``, the result of
        ``load_permits_and_risks`` for the same pair.

    Raises:
        ValueError: If the region is not linked to the theme.
    """
    lookup_sql = """
        SELECT r.id, r.name, t.id, t.name
        FROM regions r
        JOIN region_themes rt ON rt.region_id = r.id
        JOIN themes t ON t.id = rt.theme_id
        WHERE r.id = %s AND t.id = %s
        LIMIT 1
    """
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(lookup_sql, (region_id, theme_id))
        match = cursor.fetchone()
    if not match:
        raise ValueError("Region/theme combination not found")

    found_region_id, found_region_name, found_theme_id, found_theme_name = match
    permit_list = load_permits_and_risks(region_id, theme_id)
    region_info = {"id": str(found_region_id), "name": str(found_region_name)}
    theme_info = {"id": str(found_theme_id), "name": str(found_theme_name)}
    return {
        "region": region_info,
        "theme": theme_info,
        "permits": permit_list,
    }