feat: add database-backed lawrisk v2 endpoint

This commit is contained in:
Codex Agent 2025-10-23 14:15:37 +08:00
parent d6d92fd966
commit cc1bda89a4
3 changed files with 345 additions and 6 deletions

65
app.py
View File

@@ -4,9 +4,10 @@ import os
from flask import Flask, jsonify, request
from env_loader import load_env
from smart_cors_middleware import init_smart_cors
import time
from concurrent.futures import ThreadPoolExecutor
from smart_cors_middleware import init_smart_cors
from lawrisk_service import (
ensure_database,
ensure_schema,
@@ -16,6 +17,7 @@ from lawrisk_service import (
suggest_questions_from_subjects,
suggest_questions_embed,
)
from lawrisk_v2_service import search_v2
def create_app() -> Flask:
@@ -32,8 +34,7 @@ def create_app() -> Flask:
# Enable CORS using existing middleware
init_smart_cors(app)
@app.route("/fs-ai-asistant/api/workflow/lawrisk", methods=["POST", "GET"])
def lawrisk_search():
def _extract_params():
if request.method == "GET":
query = request.args.get("query") or request.args.get("q") or request.args.get("text")
debug_flag = request.args.get("debug") in {"1", "true", "yes", "on"}
@@ -42,9 +43,8 @@ def create_app() -> Flask:
top_k_int = int(top_k) if top_k else 5
except Exception:
top_k_int = 5
mode = (request.args.get("mode") or "llm").lower()
mode_value = (request.args.get("mode") or "llm").lower()
else:
# Prefer x-www-form-urlencoded; fallback to JSON if provided
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
@@ -56,7 +56,12 @@ def create_app() -> Flask:
top_k_int = int(payload.get("top", 5))
except Exception:
top_k_int = 5
mode = str(payload.get("mode", "llm")).lower()
mode_value = str(payload.get("mode", "llm")).lower()
return query, debug_flag, top_k_int, mode_value
@app.route("/fs-ai-asistant/api/workflow/lawrisk", methods=["POST", "GET"])
def lawrisk_search():
query, debug_flag, top_k_int, mode = _extract_params()
if not query or not isinstance(query, str):
return jsonify({"error": "query is required"}), 400
@@ -120,11 +125,59 @@ def create_app() -> Flask:
app.logger.exception("lawrisk_search error")
return jsonify({"success": False, "message": str(e), "data": {}}), 500
@app.route("/fs-ai-asistant/api/workflow/lawrisk/v2", methods=["POST", "GET"])
def lawrisk_search_v2():
    """Serve the v2 law-risk lookup for GET and POST requests.

    Request parameters are read through ``_extract_params``; the ``mode``
    value it returns is ignored by this endpoint.  ``search_v2`` and
    ``suggest_questions_embed`` are run concurrently on a two-worker thread
    pool and their results are merged into the response envelope.

    Returns:
        A JSON response.  HTTP 400 with ``{"error": ...}`` when the query is
        missing or not a string; HTTP 500 with ``success=False`` and the
        exception text when either lookup raises; otherwise a
        ``success=True`` envelope whose ``data`` carries ``risk_subject``,
        ``questionExtend`` and ``executionTime`` (milliseconds).
    """
    query, debug_flag, top_k_int, _mode = _extract_params()

    if not query or not isinstance(query, str):
        return jsonify({"error": "query is required"}), 400

    try:
        # perf_counter() is monotonic, so the reported duration cannot go
        # negative or jump when the system clock is adjusted mid-request.
        started = time.perf_counter()
        with ThreadPoolExecutor(max_workers=2) as ex:
            fut_subject = ex.submit(search_v2, query, debug_flag)
            # Always ask for at least one suggested question.
            fut_questions = ex.submit(suggest_questions_embed, query, max(1, top_k_int))
            result_v2 = fut_subject.result()
            rec_questions = fut_questions.result() or []

        is_dict_result = isinstance(result_v2, dict)
        risk_subject = result_v2.get("risk_subject", []) if is_dict_result else []
        found = bool(risk_subject)
        exec_time = int((time.perf_counter() - started) * 1000)

        data = {
            "llmRespond": "" if found else "抱歉,无法检索到相关答案",
            "lawRisk": "",
            "questionExtend": rec_questions,
            "conversationId": "",
            "messageId": "",
            "roundNumber": 0,
            "conversationInfo": {},
            "knowledgeSources": [],
            "totalKnowledgeSources": 0,
            "executionTime": exec_time,
            "workflowStatus": "ok" if found else "no_match",
            "executionSteps": [],
            "costStatistics": {},
            "workflowTrackingId": "",
            "risk_subject": risk_subject,
            # Debug details are only echoed back when the caller asked for them.
            "debug": result_v2.get("debug", {}) if (debug_flag and is_dict_result) else {},
        }
        resp = {"success": True, "message": "OK", "data": data}
        return jsonify(resp)
    except Exception as e:
        # Top-level request boundary: log the traceback and report the failure.
        app.logger.exception("lawrisk_search_v2 error")
        return jsonify({"success": False, "message": str(e), "data": {}}), 500
# Liveness probe: answers with a fixed payload and touches no other service.
@app.get("/healthz")
def healthz():
    """Return a static JSON body signalling that the app is serving requests."""
    status_payload = {"status": "ok"}
    return jsonify(status_payload)
app.logger.setLevel("INFO")
app.logger.info("Registered routes:")
for rule in sorted(app.url_map.iter_rules(), key=lambda r: r.rule):
methods = ",".join(sorted(rule.methods - {"HEAD", "OPTIONS"}))
app.logger.info(" %s -> %s", rule.rule, methods)
return app

137
lawrisk_v2_service.py Normal file
View File

@@ -0,0 +1,137 @@
from __future__ import annotations
import json
from typing import Any, Dict, List
from licensing_repo import (
list_region_theme_options,
load_theme_payload,
)
from lawrisk_service import ChatClient
def _compose_prompt(payload: Dict[str, Any]) -> str:
"""Build a natural-language prompt snippet from structured payload."""
region = payload.get("region", {})
theme = payload.get("theme", {})
scopes = payload.get("business_scopes", [])
permits = payload.get("permits", [])
lines: List[str] = []
lines.append(f"地区:{region.get('name', '')}")
lines.append(f"主题事项:{theme.get('name', '')}")
if scopes:
scope_text = "".join(scope.get("description", "") for scope in scopes if scope.get("description"))
if scope_text:
lines.append(f"涉及经营范围:{scope_text}")
for permit in permits:
pname = permit.get("name", "")
lines.append(f"许可事项:{pname}")
risks = permit.get("risks", [])
for idx, risk in enumerate(risks, start=1):
detail_parts = []
if risk.get("risk_content"):
detail_parts.append(f"风险提示:{risk['risk_content']}")
if risk.get("legal_basis"):
detail_parts.append(f"法律依据:{risk['legal_basis']}")
if risk.get("document_no"):
detail_parts.append(f"文号:{risk['document_no']}")
if risk.get("summary"):
detail_parts.append(f"摘要:{risk['summary']}")
if detail_parts:
lines.append(f" 风险{idx}" + "".join(detail_parts))
return "\n".join(lines)
def _select_theme_options(query: str, catalog: List[Dict[str, str]]) -> List[str]:
"""Use LLM to choose relevant region-theme option ids."""
if not catalog:
return []
lines = [f"{item['option_id']}\t{item['display_name']}" for item in catalog]
options_block = "\n".join(lines)
system_msg = (
"你是政务事项检索助手。根据用户提供的问题,"
"从给定的地区-主题列表中选择最相关的主题事项,返回其 option_id。"
"输出 JSON 数组,例如: [\"region_uuid:theme_uuid\"]."
)
user_msg = (
f"用户问题: {query}\n\n"
"候选主题列表 (option_id<tab>地区·主题):\n"
f"{options_block}\n\n"
"请仅输出 JSON 数组,内容为选择的 option_id。如果没有匹配请输出 []."
)
chat = ChatClient()
content = chat.chat(
[
{"role": "system", "content": system_msg},
{"role": "user", "content": user_msg},
]
)
raw = content.strip()
start = raw.find("[")
end = raw.rfind("]")
if start != -1 and end != -1 and end > start:
snippet = raw[start : end + 1]
else:
snippet = raw
selected: List[str] = []
try:
data = json.loads(snippet)
if isinstance(data, list):
for item in data:
if isinstance(item, str):
selected.append(item)
elif isinstance(item, dict) and isinstance(item.get("id"), str):
selected.append(item["id"])
except Exception:
selected = []
known_ids = {item["option_id"] for item in catalog}
uniq: List[str] = []
for option_id in selected:
if option_id in known_ids and option_id not in uniq:
uniq.append(option_id)
return uniq
def search_v2(query: str, return_debug: bool = False) -> Dict[str, Any]:
    """Resolve ``query`` to region-theme bundles via LLM selection.

    Loads the full option catalog, lets ``_select_theme_options`` pick the
    relevant option ids, and loads the data bundle for each pick.

    Args:
        query: The user's question.
        return_debug: When true, include catalog size and the selected ids
            under ``debug``.

    Returns:
        ``{"risk_subject": [...], "debug": {...}}``; ``debug`` is empty
        unless ``return_debug`` is set.
    """
    catalog = list_region_theme_options()
    chosen_ids = _select_theme_options(query, catalog)
    by_option_id = {entry["option_id"]: entry for entry in catalog}

    subjects: List[Dict[str, Any]] = []
    for chosen in chosen_ids:
        entry = by_option_id[chosen]
        # Option ids have the form "<region_id>:<theme_id>"; skip anything else.
        region_id, separator, theme_id = chosen.partition(":")
        if not separator:
            continue

        bundle = load_theme_payload(region_id, theme_id)
        snippet = _compose_prompt(bundle)
        subjects.append(
            {
                "id": chosen,
                "display_name": entry["display_name"],
                "region": bundle["region"],
                "theme": bundle["theme"],
                "business_scopes": bundle["business_scopes"],
                "permits": bundle["permits"],
                "prompt_snippet": snippet,
            }
        )

    debug_info: Dict[str, Any] = {}
    if return_debug:
        debug_info = {
            "catalog_size": len(catalog),
            "selected_option_ids": chosen_ids,
        }

    return {"risk_subject": subjects, "debug": debug_info}

149
licensing_repo.py Normal file
View File

@@ -0,0 +1,149 @@
from __future__ import annotations
import os
from typing import Dict, List, Tuple
import pg8000.dbapi as pg
# The licensing database has its own LIC_PG_* settings so that the legacy
# fs_law_risk integration can keep using the PG_* variables.
LIC_DEFAULT_DB = "licensing_risks"


def _lic_pg_conn(autocommit: bool = False) -> pg.Connection:
    """Open a connection to the licensing database.

    Settings come from the ``LIC_PG_*`` environment variables.  Port and
    user fall back to ``PG_PORT`` / ``PG_USER``; host, password and database
    fall back to built-in defaults.

    Args:
        autocommit: Value assigned to the connection's ``autocommit`` flag.

    Returns:
        The opened connection.
    """
    env = os.getenv
    settings = {
        "host": env("LIC_PG_HOST", "172.24.240.1"),
        "port": int(env("LIC_PG_PORT", env("PG_PORT", "5432"))),
        "user": env("LIC_PG_USER", env("PG_USER", "postgres")),
        "password": env("LIC_PG_PASSWORD", ""),
        "database": env("LIC_PG_DATABASE", LIC_DEFAULT_DB),
    }
    connection = pg.connect(**settings)
    connection.autocommit = autocommit
    return connection
def list_region_theme_options() -> List[Dict[str, str]]:
    """List every region-theme pair as an option for LLM selection.

    Returns:
        One dict per row, ordered by region name then theme name, with the
        keys ``option_id`` (``"<region_id>:<theme_id>"``), ``region_id``,
        ``region_name``, ``theme_id``, ``theme_name`` and ``display_name``.
    """
    query = """
        SELECT
        rt.region_id,
        r.name AS region_name,
        rt.theme_id,
        t.name AS theme_name
        FROM region_themes rt
        JOIN regions r ON r.id = rt.region_id
        JOIN themes t ON t.id = rt.theme_id
        ORDER BY r.name, t.name
    """

    def _as_option(row) -> Dict[str, str]:
        # Ids are stringified once and reused for the composite option id.
        region_id, region_name, theme_id, theme_name = row
        rid, tid = str(region_id), str(theme_id)
        return {
            "option_id": f"{rid}:{tid}",
            "region_id": rid,
            "region_name": str(region_name),
            "theme_id": tid,
            "theme_name": str(theme_name),
            "display_name": f"{region_name} · {theme_name}",
        }

    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()

    return [_as_option(row) for row in rows]
def load_business_scopes(region_id: str) -> List[Dict[str, str]]:
    """Fetch the business scopes linked to ``region_id``.

    Args:
        region_id: Identifier matched against ``region_scopes.region_id``.

    Returns:
        ``{"id", "description"}`` dicts with both values stringified,
        ordered by description.
    """
    query = """
        SELECT bs.id, bs.description
        FROM region_scopes rs
        JOIN business_scopes bs ON bs.id = rs.scope_id
        WHERE rs.region_id = %s
        ORDER BY bs.description
    """
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query, (region_id,))
        rows = cursor.fetchall()

    return [
        {"id": str(scope_id), "description": str(description)}
        for scope_id, description in rows
    ]
def load_permits_and_risks(region_id: str, theme_id: str) -> List[Dict[str, object]]:
    """Load the permits of a region-theme pair together with their risks.

    Args:
        region_id: Identifier matched against ``region_theme_permits.region_id``.
        theme_id: Identifier matched against ``region_theme_permits.theme_id``.

    Returns:
        One dict per permit (``id``, ``name``, ``risks``) in query order.
        ``risks`` holds dicts with ``id``, ``risk_content``, ``legal_basis``,
        ``document_no`` and ``summary``; NULL text columns become ``""``.
        A permit without any risk row gets an empty ``risks`` list.
    """
    query = """
        SELECT
        p.id AS permit_id,
        p.name AS permit_name,
        rk.id AS risk_id,
        rk.risk_content,
        rk.legal_basis,
        rk.document_no,
        rk.summary
        FROM region_theme_permits rtp
        JOIN permits p ON p.id = rtp.permit_id
        LEFT JOIN region_permit_risks rpr
        ON rpr.region_id = rtp.region_id
        AND rpr.permit_id = rtp.permit_id
        LEFT JOIN risks rk ON rk.id = rpr.risk_id
        WHERE rtp.region_id = %s AND rtp.theme_id = %s
        ORDER BY p.name, rk.risk_content
    """
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(query, (region_id, theme_id))
        rows = cursor.fetchall()

    # Keyed by permit id; dict insertion order keeps the query's ordering.
    by_permit: Dict[str, Dict[str, object]] = {}
    for permit_id, permit_name, risk_id, risk_content, legal_basis, document_no, summary in rows:
        key = str(permit_id)
        if key not in by_permit:
            by_permit[key] = {"id": key, "name": str(permit_name), "risks": []}

        # The LEFT JOINs yield a NULL risk id for permits without risks.
        if risk_id is not None:
            by_permit[key]["risks"].append(
                {
                    "id": str(risk_id),
                    "risk_content": risk_content or "",
                    "legal_basis": legal_basis or "",
                    "document_no": document_no or "",
                    "summary": summary or "",
                }
            )

    return list(by_permit.values())
def load_theme_payload(region_id: str, theme_id: str) -> Dict[str, object]:
    """Assemble the full data bundle for one region-theme selection.

    Args:
        region_id: Identifier of the region.
        theme_id: Identifier of the theme.

    Returns:
        A dict with ``region`` and ``theme`` (each ``{"id", "name"}``),
        ``business_scopes`` from ``load_business_scopes`` and ``permits``
        from ``load_permits_and_risks``.

    Raises:
        ValueError: If the region is not linked to the theme.
    """
    info_sql = """
        SELECT r.id, r.name, t.id, t.name
        FROM regions r
        JOIN region_themes rt ON rt.region_id = r.id
        JOIN themes t ON t.id = rt.theme_id
        WHERE r.id = %s AND t.id = %s
        LIMIT 1
    """
    with _lic_pg_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(info_sql, (region_id, theme_id))
        match = cursor.fetchone()

    if not match:
        raise ValueError("Region/theme combination not found")

    region_uuid, region_name, theme_uuid, theme_name = match
    scopes = load_business_scopes(region_id)
    permits = load_permits_and_risks(region_id, theme_id)

    region_info = {"id": str(region_uuid), "name": str(region_name)}
    theme_info = {"id": str(theme_uuid), "name": str(theme_name)}
    return {
        "region": region_info,
        "theme": theme_info,
        "business_scopes": scopes,
        "permits": permits,
    }