From cc1bda89a467a0f2c7cab0e37cc3c40f7e7e1fc6 Mon Sep 17 00:00:00 2001
From: Codex Agent
Date: Thu, 23 Oct 2025 14:15:37 +0800
Subject: [PATCH] feat: add database-backed lawrisk v2 endpoint

Add GET/POST /fs-ai-asistant/api/workflow/lawrisk/v2.

* app.py: move request parsing into _extract_params(), shared by the
  existing lawrisk route and the new v2 route; log the registered
  routes when the app is created.
* lawrisk_v2_service.py: search_v2() asks the chat model to pick
  region-theme option ids from the catalog, then loads business
  scopes, permits and risks for every selected pair.
* licensing_repo.py: pg8000 query helpers for the licensing database,
  configured through LIC_PG_* environment variables (default database
  name "licensing_risks").

---
Reviewer notes (text between the three-dash line and the diff is not part
of the commit message):

* The line structure of this patch was restored from a copy in which all
  newlines had been collapsed. Hunk line counts were re-checked against
  every hunk header and the diffstat. Indentation (including inside the
  SQL string literals) and the position of blank context lines in the
  app.py hunks are inferred; if a hunk is rejected, compare against
  app.py or apply with whitespace tolerance.
* licensing_repo: every helper opens its own connection, so
  load_theme_payload() opens three connections per selected option (its
  own query, load_business_scopes, load_permits_and_risks), on top of the
  catalog query issued by search_v2().
* licensing_repo._lic_pg_conn: LIC_PG_HOST defaults to the literal address
  172.24.240.1, and host/password have no PG_* fallback while port/user
  do. Confirm this is intended for every deployment target.
* licensing_repo: `Tuple` is imported but never used in the new file.
* lawrisk_v2_service._select_theme_options: the whole catalog is placed in
  the prompt on every request, and the reply is used as `content.strip()`
  without checking its type. Confirm ChatClient.chat() always returns str.
* lawrisk_v2_service.search_v2: a ValueError raised by
  load_theme_payload() is not caught there; it reaches the generic
  `except Exception` in lawrisk_search_v2 and becomes a 500 whose message
  is str(e).
* app.py: create_app() now sets app.logger to INFO and logs one line per
  registered route on every call.

 app.py                |  65 ++++++++++++++++--
 lawrisk_v2_service.py | 137 ++++++++++++++++++++++++++++++++++++++
 licensing_repo.py     | 149 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 345 insertions(+), 6 deletions(-)
 create mode 100644 lawrisk_v2_service.py
 create mode 100644 licensing_repo.py

diff --git a/app.py b/app.py
index d57c378..b6e5b1d 100644
--- a/app.py
+++ b/app.py
@@ -4,9 +4,10 @@ import os
 
 from flask import Flask, jsonify, request
 from env_loader import load_env
-from smart_cors_middleware import init_smart_cors
 import time
 from concurrent.futures import ThreadPoolExecutor
+from smart_cors_middleware import init_smart_cors
+
 from lawrisk_service import (
     ensure_database,
     ensure_schema,
@@ -16,6 +17,7 @@ from lawrisk_service import (
     suggest_questions_from_subjects,
     suggest_questions_embed,
 )
+from lawrisk_v2_service import search_v2
 
 
 def create_app() -> Flask:
@@ -32,8 +34,7 @@ def create_app() -> Flask:
     # Enable CORS using existing middleware
     init_smart_cors(app)
 
-    @app.route("/fs-ai-asistant/api/workflow/lawrisk", methods=["POST", "GET"])
-    def lawrisk_search():
+    def _extract_params():
         if request.method == "GET":
             query = request.args.get("query") or request.args.get("q") or request.args.get("text")
             debug_flag = request.args.get("debug") in {"1", "true", "yes", "on"}
@@ -42,9 +43,8 @@ def create_app() -> Flask:
                 top_k_int = int(top_k) if top_k else 5
             except Exception:
                 top_k_int = 5
-            mode = (request.args.get("mode") or "llm").lower()
+            mode_value = (request.args.get("mode") or "llm").lower()
         else:
-            # Prefer x-www-form-urlencoded; fallback to JSON if provided
             if request.is_json:
                 payload = request.get_json(silent=True) or {}
             else:
@@ -56,7 +56,12 @@ def create_app() -> Flask:
                 top_k_int = int(payload.get("top", 5))
             except Exception:
                 top_k_int = 5
-            mode = str(payload.get("mode", "llm")).lower()
+            mode_value = str(payload.get("mode", "llm")).lower()
+        return query, debug_flag, top_k_int, mode_value
+
+    @app.route("/fs-ai-asistant/api/workflow/lawrisk", methods=["POST", "GET"])
+    def lawrisk_search():
+        query, debug_flag, top_k_int, mode = _extract_params()
 
         if not query or not isinstance(query, str):
             return jsonify({"error": "query is required"}), 400
@@ -120,11 +125,59 @@ def create_app() -> Flask:
             app.logger.exception("lawrisk_search error")
             return jsonify({"success": False, "message": str(e), "data": {}}), 500
 
+    @app.route("/fs-ai-asistant/api/workflow/lawrisk/v2", methods=["POST", "GET"])
+    def lawrisk_search_v2():
+        query, debug_flag, top_k_int, _mode = _extract_params()
+        if not query or not isinstance(query, str):
+            return jsonify({"error": "query is required"}), 400
+        try:
+            t0 = time.time()
+            with ThreadPoolExecutor(max_workers=2) as ex:
+                fut_subject = ex.submit(search_v2, query, debug_flag)
+                fut_questions = ex.submit(suggest_questions_embed, query, max(1, top_k_int))
+
+                result_v2 = fut_subject.result()
+                rec_questions = fut_questions.result() or []
+
+            risk_subject = result_v2.get("risk_subject", []) if isinstance(result_v2, dict) else []
+            found = bool(risk_subject)
+            exec_time = int((time.time() - t0) * 1000)
+
+            data = {
+                "llmRespond": "" if found else "抱歉,无法检索到相关答案",
+                "lawRisk": "",
+                "questionExtend": rec_questions,
+                "conversationId": "",
+                "messageId": "",
+                "roundNumber": 0,
+                "conversationInfo": {},
+                "knowledgeSources": [],
+                "totalKnowledgeSources": 0,
+                "executionTime": exec_time,
+                "workflowStatus": "ok" if found else "no_match",
+                "executionSteps": [],
+                "costStatistics": {},
+                "workflowTrackingId": "",
+                "risk_subject": risk_subject,
+                "debug": result_v2.get("debug", {}) if (debug_flag and isinstance(result_v2, dict)) else {},
+            }
+            resp = {"success": True, "message": "OK", "data": data}
+            return jsonify(resp)
+        except Exception as e:
+            app.logger.exception("lawrisk_search_v2 error")
+            return jsonify({"success": False, "message": str(e), "data": {}}), 500
+
     # Basic health check
     @app.get("/healthz")
     def healthz():
         return jsonify({"status": "ok"})
 
+    app.logger.setLevel("INFO")
+    app.logger.info("Registered routes:")
+    for rule in sorted(app.url_map.iter_rules(), key=lambda r: r.rule):
+        methods = ",".join(sorted(rule.methods - {"HEAD", "OPTIONS"}))
+        app.logger.info(" %s -> %s", rule.rule, methods)
+
     return app
 
 
diff --git a/lawrisk_v2_service.py b/lawrisk_v2_service.py
new file mode 100644
index 0000000..2153db1
--- /dev/null
+++ b/lawrisk_v2_service.py
@@ -0,0 +1,137 @@
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, List
+
+from licensing_repo import (
+    list_region_theme_options,
+    load_theme_payload,
+)
+from lawrisk_service import ChatClient
+
+
+def _compose_prompt(payload: Dict[str, Any]) -> str:
+    """Build a natural-language prompt snippet from structured payload."""
+    region = payload.get("region", {})
+    theme = payload.get("theme", {})
+    scopes = payload.get("business_scopes", [])
+    permits = payload.get("permits", [])
+
+    lines: List[str] = []
+    lines.append(f"地区:{region.get('name', '')}")
+    lines.append(f"主题事项:{theme.get('name', '')}")
+    if scopes:
+        scope_text = ";".join(scope.get("description", "") for scope in scopes if scope.get("description"))
+        if scope_text:
+            lines.append(f"涉及经营范围:{scope_text}")
+
+    for permit in permits:
+        pname = permit.get("name", "")
+        lines.append(f"许可事项:{pname}")
+        risks = permit.get("risks", [])
+        for idx, risk in enumerate(risks, start=1):
+            detail_parts = []
+            if risk.get("risk_content"):
+                detail_parts.append(f"风险提示:{risk['risk_content']}")
+            if risk.get("legal_basis"):
+                detail_parts.append(f"法律依据:{risk['legal_basis']}")
+            if risk.get("document_no"):
+                detail_parts.append(f"文号:{risk['document_no']}")
+            if risk.get("summary"):
+                detail_parts.append(f"摘要:{risk['summary']}")
+            if detail_parts:
+                lines.append(f" 风险{idx}:" + ";".join(detail_parts))
+    return "\n".join(lines)
+
+
+def _select_theme_options(query: str, catalog: List[Dict[str, str]]) -> List[str]:
+    """Use LLM to choose relevant region-theme option ids."""
+    if not catalog:
+        return []
+    lines = [f"{item['option_id']}\t{item['display_name']}" for item in catalog]
+    options_block = "\n".join(lines)
+
+    system_msg = (
+        "你是政务事项检索助手。根据用户提供的问题,"
+        "从给定的地区-主题列表中选择最相关的主题事项,返回其 option_id。"
+        "输出 JSON 数组,例如: [\"region_uuid:theme_uuid\"]."
+    )
+    user_msg = (
+        f"用户问题: {query}\n\n"
+        "候选主题列表 (option_id地区·主题):\n"
+        f"{options_block}\n\n"
+        "请仅输出 JSON 数组,内容为选择的 option_id。如果没有匹配,请输出 []."
+    )
+    chat = ChatClient()
+    content = chat.chat(
+        [
+            {"role": "system", "content": system_msg},
+            {"role": "user", "content": user_msg},
+        ]
+    )
+
+    raw = content.strip()
+    start = raw.find("[")
+    end = raw.rfind("]")
+    if start != -1 and end != -1 and end > start:
+        snippet = raw[start : end + 1]
+    else:
+        snippet = raw
+
+    selected: List[str] = []
+    try:
+        data = json.loads(snippet)
+        if isinstance(data, list):
+            for item in data:
+                if isinstance(item, str):
+                    selected.append(item)
+                elif isinstance(item, dict) and isinstance(item.get("id"), str):
+                    selected.append(item["id"])
+    except Exception:
+        selected = []
+
+    known_ids = {item["option_id"] for item in catalog}
+    uniq: List[str] = []
+    for option_id in selected:
+        if option_id in known_ids and option_id not in uniq:
+            uniq.append(option_id)
+    return uniq
+
+
+def search_v2(query: str, return_debug: bool = False) -> Dict[str, Any]:
+    catalog = list_region_theme_options()
+    selected_ids = _select_theme_options(query, catalog)
+
+    catalog_map = {item["option_id"]: item for item in catalog}
+    results: List[Dict[str, Any]] = []
+    debug_info: Dict[str, Any] = {}
+
+    for option_id in selected_ids:
+        item = catalog_map[option_id]
+        if ":" not in option_id:
+            continue
+        region_id, theme_id = option_id.split(":", 1)
+        payload = load_theme_payload(region_id, theme_id)
+        prompt_text = _compose_prompt(payload)
+        results.append(
+            {
+                "id": option_id,
+                "display_name": item["display_name"],
+                "region": payload["region"],
+                "theme": payload["theme"],
+                "business_scopes": payload["business_scopes"],
+                "permits": payload["permits"],
+                "prompt_snippet": prompt_text,
+            }
+        )
+
+    if return_debug:
+        debug_info = {
+            "catalog_size": len(catalog),
+            "selected_option_ids": selected_ids,
+        }
+
+    return {
+        "risk_subject": results,
+        "debug": debug_info if return_debug else {},
+    }
diff --git a/licensing_repo.py b/licensing_repo.py
new file mode 100644
index 0000000..7572c8b
--- /dev/null
+++ b/licensing_repo.py
@@ -0,0 +1,149 @@
+from __future__ import annotations
+
+import os
+from typing import Dict, List, Tuple
+
+import pg8000.dbapi as pg
+
+# Separate configuration so legacy fs_law_risk integration keeps using PG_*
+LIC_DEFAULT_DB = "licensing_risks"
+
+
+def _lic_pg_conn(autocommit: bool = False) -> pg.Connection:
+    host = os.getenv("LIC_PG_HOST", "172.24.240.1")
+    port = int(os.getenv("LIC_PG_PORT", os.getenv("PG_PORT", "5432")))
+    user = os.getenv("LIC_PG_USER", os.getenv("PG_USER", "postgres"))
+    password = os.getenv("LIC_PG_PASSWORD", "")
+    database = os.getenv("LIC_PG_DATABASE", LIC_DEFAULT_DB)
+    conn = pg.connect(host=host, port=port, user=user, password=password, database=database)
+    conn.autocommit = autocommit
+    return conn
+
+
+def list_region_theme_options() -> List[Dict[str, str]]:
+    """Return all region-theme pairs usable for LLM selection."""
+    sql = """
+        SELECT
+            rt.region_id,
+            r.name AS region_name,
+            rt.theme_id,
+            t.name AS theme_name
+        FROM region_themes rt
+        JOIN regions r ON r.id = rt.region_id
+        JOIN themes t ON t.id = rt.theme_id
+        ORDER BY r.name, t.name
+    """
+    out: List[Dict[str, str]] = []
+    with _lic_pg_conn() as conn:
+        cur = conn.cursor()
+        cur.execute(sql)
+        for region_id, region_name, theme_id, theme_name in cur.fetchall():
+            rid = str(region_id)
+            tid = str(theme_id)
+            out.append(
+                {
+                    "option_id": f"{rid}:{tid}",
+                    "region_id": rid,
+                    "region_name": str(region_name),
+                    "theme_id": tid,
+                    "theme_name": str(theme_name),
+                    "display_name": f"{region_name} · {theme_name}",
+                }
+            )
+    return out
+
+
+def load_business_scopes(region_id: str) -> List[Dict[str, str]]:
+    """List business scopes bound to a region."""
+    sql = """
+        SELECT bs.id, bs.description
+        FROM region_scopes rs
+        JOIN business_scopes bs ON bs.id = rs.scope_id
+        WHERE rs.region_id = %s
+        ORDER BY bs.description
+    """
+    scopes: List[Dict[str, str]] = []
+    with _lic_pg_conn() as conn:
+        cur = conn.cursor()
+        cur.execute(sql, (region_id,))
+        for scope_id, description in cur.fetchall():
+            scopes.append({"id": str(scope_id), "description": str(description)})
+    return scopes
+
+
+def load_permits_and_risks(region_id: str, theme_id: str) -> List[Dict[str, object]]:
+    """Return permits with attached risk entries for a region-theme pair."""
+    sql = """
+        SELECT
+            p.id AS permit_id,
+            p.name AS permit_name,
+            rk.id AS risk_id,
+            rk.risk_content,
+            rk.legal_basis,
+            rk.document_no,
+            rk.summary
+        FROM region_theme_permits rtp
+        JOIN permits p ON p.id = rtp.permit_id
+        LEFT JOIN region_permit_risks rpr
+            ON rpr.region_id = rtp.region_id
+            AND rpr.permit_id = rtp.permit_id
+        LEFT JOIN risks rk ON rk.id = rpr.risk_id
+        WHERE rtp.region_id = %s AND rtp.theme_id = %s
+        ORDER BY p.name, rk.risk_content
+    """
+    permits: Dict[str, Dict[str, object]] = {}
+    with _lic_pg_conn() as conn:
+        cur = conn.cursor()
+        cur.execute(sql, (region_id, theme_id))
+        for row in cur.fetchall():
+            permit_id, permit_name, risk_id, risk_content, legal_basis, document_no, summary = row
+            pid = str(permit_id)
+            entry = permits.setdefault(
+                pid,
+                {
+                    "id": pid,
+                    "name": str(permit_name),
+                    "risks": [],
+                },
+            )
+            if risk_id is None:
+                continue
+            entry["risks"].append(
+                {
+                    "id": str(risk_id),
+                    "risk_content": risk_content or "",
+                    "legal_basis": legal_basis or "",
+                    "document_no": document_no or "",
+                    "summary": summary or "",
+                }
+            )
+    return list(permits.values())
+
+
+def load_theme_payload(region_id: str, theme_id: str) -> Dict[str, object]:
+    """Assemble full data bundle for a region-theme selection."""
+    info_sql = """
+        SELECT r.id, r.name, t.id, t.name
+        FROM regions r
+        JOIN region_themes rt ON rt.region_id = r.id
+        JOIN themes t ON t.id = rt.theme_id
+        WHERE r.id = %s AND t.id = %s
+        LIMIT 1
+    """
+    with _lic_pg_conn() as conn:
+        cur = conn.cursor()
+        cur.execute(info_sql, (region_id, theme_id))
+        row = cur.fetchone()
+        if not row:
+            raise ValueError("Region/theme combination not found")
+        region_uuid, region_name, theme_uuid, theme_name = row
+
+    scopes = load_business_scopes(region_id)
+    permits = load_permits_and_risks(region_id, theme_id)
+    return {
+        "region": {"id": str(region_uuid), "name": str(region_name)},
+        "theme": {"id": str(theme_uuid), "name": str(theme_name)},
+        "business_scopes": scopes,
+        "permits": permits,
+    }
+