405 lines
14 KiB
Python
405 lines
14 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import random
|
||
from typing import Any, Dict, List
|
||
|
||
from lawrisk.services.licensing_repo import (
|
||
list_region_theme_options,
|
||
load_theme_payload,
|
||
load_permits_and_risks,
|
||
find_permit_contexts_by_name,
|
||
_ensure_v2_visibility_column,
|
||
)
|
||
from lawrisk.services.lawrisk_service import ChatClient
|
||
|
||
|
||
# Module-level logger; records are prefixed with "[lawrisk_v2]" by the
# formatter installed below.
logger = logging.getLogger(__name__)
if not logger.handlers:
    # Install the stream handler only when none is attached yet, so that
    # executing this module again does not add a second handler.
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("[lawrisk_v2] %(levelname)s %(message)s"))
    logger.addHandler(handler)
# NOTE(review): the original indentation was lost; the two statements below
# are assumed to sit outside the ``if`` guard -- confirm against the repo.
logger.setLevel(logging.INFO)
# Records are not passed on to the handlers of ancestor loggers.
logger.propagate = False
|
||
|
||
|
||
def _compose_prompt(payload: Dict[str, Any]) -> str:
|
||
"""Build a natural-language prompt snippet from structured payload."""
|
||
region = payload.get("region", {})
|
||
theme = payload.get("theme", {})
|
||
permits = payload.get("permits", [])
|
||
|
||
lines: List[str] = []
|
||
lines.append(f"地区:{region.get('name', '')}")
|
||
lines.append(f"主题事项:{theme.get('name', '')}")
|
||
|
||
for permit in permits:
|
||
pname = permit.get("name", "")
|
||
lines.append(f"许可事项:{pname}")
|
||
if permit.get("contact_info"):
|
||
lines.append(f" 联系方式:{permit['contact_info']}")
|
||
permit_scopes = permit.get("business_scopes", [])
|
||
if permit_scopes:
|
||
scope_text = ";".join(
|
||
scope.get("description", "") for scope in permit_scopes if scope.get("description")
|
||
)
|
||
if scope_text:
|
||
lines.append(f" 经营范围:{scope_text}")
|
||
risks = permit.get("risks", [])
|
||
for idx, risk in enumerate(risks, start=1):
|
||
detail_parts = []
|
||
if risk.get("risk_content"):
|
||
detail_parts.append(f"风险提示:{risk['risk_content']}")
|
||
if risk.get("legal_basis"):
|
||
detail_parts.append(f"法律依据:{risk['legal_basis']}")
|
||
if risk.get("document_no"):
|
||
detail_parts.append(f"文号:{risk['document_no']}")
|
||
if risk.get("summary"):
|
||
detail_parts.append(f"摘要:{risk['summary']}")
|
||
if risk.get("remark"):
|
||
detail_parts.append(f"备注:{risk['remark']}")
|
||
if detail_parts:
|
||
label = risk.get("serial_number") or str(idx)
|
||
lines.append(f" 风险{label}:" + ";".join(detail_parts))
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _normalize_region_filter(region_filter: Any) -> Dict[str, Any] | None:
|
||
"""Normalize region filter input into comparable tokens."""
|
||
if region_filter is None:
|
||
return None
|
||
|
||
tokens: List[str] = []
|
||
|
||
def _consume(value: Any) -> None:
|
||
if value is None:
|
||
return
|
||
if isinstance(value, (list, tuple, set, frozenset)):
|
||
for item in value:
|
||
_consume(item)
|
||
return
|
||
if isinstance(value, str):
|
||
normalized = (
|
||
value.replace(",", ",")
|
||
.replace(";", ",")
|
||
.replace(";", ",")
|
||
.replace("|", ",")
|
||
)
|
||
parts = [part.strip() for part in normalized.split(",")]
|
||
tokens.extend(part for part in parts if part)
|
||
return
|
||
text = str(value).strip()
|
||
if text:
|
||
tokens.append(text)
|
||
|
||
_consume(region_filter)
|
||
stripped = [token for token in tokens if token]
|
||
if not stripped:
|
||
return None
|
||
|
||
lower_tokens = {token.lower() for token in stripped}
|
||
return {
|
||
"raw": stripped,
|
||
"ids": set(stripped),
|
||
"lower": lower_tokens,
|
||
}
|
||
|
||
|
||
def _select_theme_options(query: str, catalog: List[Dict[str, str]]) -> List[str]:
    """Ask the LLM which region-theme catalog entries are relevant to ``query``.

    The model is shown the ``display_name`` of every catalog entry and asked
    to answer with a JSON array of the chosen display names. The names are
    then mapped back to the entries' ``option_id`` values.

    Args:
        query: The user's question, passed to the model verbatim.
        catalog: Candidate entries; each must provide ``display_name`` and
            ``option_id``.

    Returns:
        The selected ``option_id`` values, de-duplicated, in the order the
        model returned them. Empty when the catalog is empty, the reply is
        not parseable JSON, or no returned name matches a catalog entry.
    """
    if not catalog:
        return []
    options_block = "\n".join(item["display_name"] for item in catalog)

    system_msg = (
        "你是政务事项检索助手。根据用户提供的问题,"
        "从给定的地区-主题列表中选择最相关的主题事项,返回对应的地区·主题名称。"
        "输出 JSON 数组,例如: [\"市级 · 开办电影院\"]."
    )
    user_msg = (
        f"用户问题: {query}\n\n"
        "候选主题列表:\n"
        f"{options_block}\n\n"
        "请仅输出 JSON 数组,内容为选择的地区·主题名称。如果没有匹配,请输出 []."
    )
    logger.info(
        "[lawrisk_v2] LLM selection request | query=%s | catalog_size=%d",
        query,
        len(catalog),
    )
    logger.info("[lawrisk_v2] LLM system prompt: %s", system_msg)
    logger.info("[lawrisk_v2] LLM user prompt: %s", user_msg)
    chat = ChatClient()
    content = chat.chat(
        [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ]
    )

    # Treat a missing reply like an empty one instead of failing on ``None``.
    raw = (content or "").strip()
    logger.info("[lawrisk_v2] LLM raw response: %s", raw)

    # The reply may wrap the array in extra text; keep only the span from the
    # first "[" to the last "]" when such a span exists.
    start = raw.find("[")
    end = raw.rfind("]")
    snippet = raw[start : end + 1] if start != -1 and end > start else raw

    try:
        data = json.loads(snippet)
    except (ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError. Selection stays best-effort,
        # but the failure is logged instead of being silently discarded.
        logger.warning(
            "[lawrisk_v2] LLM response is not valid JSON (%s); no themes selected",
            exc,
        )
        data = None

    selected: List[str] = []
    if isinstance(data, list):
        for entry in data:
            if isinstance(entry, str):
                selected.append(entry)
            elif isinstance(entry, dict) and isinstance(entry.get("id"), str):
                selected.append(entry["id"])

    display_to_option = {item["display_name"]: item["option_id"] for item in catalog}
    # Fallback lookup that ignores leading/trailing whitespace, used only
    # when the exact display name is not found.
    stripped_to_option: Dict[str, str] = {}
    for item in catalog:
        stripped_to_option.setdefault(item["display_name"].strip(), item["option_id"])

    uniq: List[str] = []
    seen = set()
    for display_name in selected:
        option_id = display_to_option.get(display_name) or stripped_to_option.get(
            display_name.strip()
        )
        if option_id and option_id not in seen:
            seen.add(option_id)
            uniq.append(option_id)
    logger.info("[lawrisk_v2] LLM mapped option_ids: %s", uniq)
    return uniq
|
||
|
||
|
||
def list_regions() -> List[Dict[str, str]]:
    """List each region of the region-theme catalog exactly once.

    Returns:
        ``{"id": ..., "name": ...}`` dicts in order of first appearance in
        the catalog; the name comes from the first entry seen for that id.
    """
    # Insertion-ordered mapping region_id -> region_name of its first entry.
    first_name_by_id: Dict[str, str] = {}
    for entry in list_region_theme_options():
        rid = entry["region_id"]
        if rid not in first_name_by_id:
            first_name_by_id[rid] = entry["region_name"]
    return [{"id": rid, "name": name} for rid, name in first_name_by_id.items()]
|
||
|
||
|
||
# Preset question templates. Each entry is a ``str.format`` template whose
# ``{theme}`` placeholder is replaced with a theme name loaded from the
# database when the preset question pool is built.
_QUESTION_TEMPLATES = [
    "{theme}需要办理哪些许可?",
    "{theme}有什么法律风险需要注意?",
    "关于{theme}有什么政策要求?",

]
|
||
|
||
|
||
def _get_preset_questions_pool() -> List[str]:
    """Build the pool of preset questions from themes stored in the database.

    A theme qualifies when it is linked through ``region_theme_permits`` to
    at least one ``region_permit_details`` row whose ``is_v2_visible`` flag
    is true or NULL; the placeholder themes named in the query are excluded.
    Every qualifying theme name is expanded through each entry of
    ``_QUESTION_TEMPLATES``.

    Returns:
        The generated questions, grouped by theme in name order. An empty
        list when the lookup fails for any reason (the failure is logged).
    """
    from lawrisk.services.licensing_repo import _lic_pg_conn

    # Themes that have at least one visible permit.
    sql = """
        SELECT DISTINCT t.name AS theme_name
        FROM themes t
        JOIN region_theme_permits rtp ON rtp.theme_id = t.id
        JOIN region_permit_details rpd ON rpd.region_id = rtp.region_id AND rpd.permit_id = rtp.permit_id
        WHERE t.name NOT IN ('不涉及', '无', '所有主题事项')
        AND COALESCE(rpd.is_v2_visible, true) = true
        ORDER BY t.name
    """

    questions: List[str] = []
    try:
        with _lic_pg_conn() as conn:
            _ensure_v2_visibility_column(conn)
            cur = conn.cursor()
            cur.execute(sql)
            for (theme_name,) in cur.fetchall():
                if theme_name is None:
                    # str(None) would produce questions about a theme
                    # literally called "None".
                    continue
                theme_name = str(theme_name).strip()
                if not theme_name:
                    continue
                questions.extend(
                    template.format(theme=theme_name)
                    for template in _QUESTION_TEMPLATES
                )
    except Exception as e:
        # Recommendations are best-effort: any failure yields an empty pool
        # rather than an error for the caller.
        logger.warning("Failed to load preset questions: %s", e)
        return []

    return questions
|
||
|
||
|
||
def suggest_related_questions(query: str, search_results: Dict[str, Any], max_q: int = 5) -> List[str]:
    """Pick up to ``max_q`` random, distinct questions from the preset pool.

    The pool is generated from theme names stored in the database, so every
    suggestion refers to a theme that exists there.

    Args:
        query: Not used by this implementation.
        search_results: Not used by this implementation.
        max_q: Maximum number of questions to return.

    Returns:
        At most ``max_q`` unique questions in random order; an empty list
        when the pool is empty.
    """
    pool = _get_preset_questions_pool()
    if not pool:
        return []

    # Shuffle a copy so the list returned by the pool builder is untouched.
    candidates = pool.copy()
    random.shuffle(candidates)

    picked: List[str] = []
    already_taken = set()
    for question in candidates:
        if len(picked) >= max_q:
            # Quota reached; nothing later in the order can be added.
            break
        if question in already_taken:
            continue
        already_taken.add(question)
        picked.append(question)

    return picked
|
||
|
||
|
||
def _strip_internal_contacts(permits: List[Dict[str, Any]]) -> None:
    """Remove the ``responsible_contact`` key from each permit, in place."""
    for permit in permits:
        permit.pop("responsible_contact", None)


def _filter_catalog_by_region(
    catalog: List[Dict[str, str]], filters: Dict[str, Any]
) -> List[Dict[str, str]]:
    """Return the catalog entries whose region id, region name or display name matches ``filters``."""
    kept: List[Dict[str, str]] = []
    for item in catalog:
        region_id = item["region_id"]
        lowered_candidates = (
            region_id.lower(),
            item["region_name"].strip().lower(),
            item["display_name"].strip().lower(),
        )
        if region_id in filters["ids"] or any(
            candidate in filters["lower"] for candidate in lowered_candidates
        ):
            kept.append(item)
    return kept


def _load_direct_permit_results(contexts: List[Dict[str, str]]):
    """Load visible permit data for each context; return ``(results, contexts that produced data)``."""
    results: List[Dict[str, Any]] = []
    used: List[Dict[str, str]] = []
    for ctx in contexts:
        permits = load_permits_and_risks(
            ctx["region_id"],
            ctx["theme_id"],
            permit_id=ctx["permit_id"],
            only_visible=True,
        )
        if not permits:
            continue
        _strip_internal_contacts(permits)
        results.append(
            {
                # A direct permit hit has no catalog option attached.
                "id": "",
                "display_name": "",
                "region": {"id": ctx["region_id"], "name": ctx["region_name"]},
                "theme": {"id": ctx.get("theme_id", ""), "name": ctx.get("theme_name", "")},
                "permits": permits,
            }
        )
        used.append(ctx)
    return results, used


def search_v2(
    query: str, return_debug: bool = False, region_filter: Any = None
) -> Dict[str, Any]:
    """Search permits and risks for ``query``, optionally limited to regions.

    Two strategies are tried in order:

    1. The stripped query is looked up as a permit name. When that yields
       permit data (inside the allowed regions if a filter is given), those
       results are returned and the LLM is not called.
    2. Otherwise the LLM selects region-theme options from the (filtered)
       catalog and the payload of each selected option is loaded.

    In both cases the ``responsible_contact`` field is removed from every
    permit before it is returned.

    Args:
        query: User question or permit name.
        return_debug: When true, the ``debug`` entry of the result carries
            diagnostic counters and ids; otherwise it is an empty dict.
        region_filter: Region ids/names in any form accepted by
            ``_normalize_region_filter``; ``None`` or blank disables filtering.

    Returns:
        ``{"risk_subject": [...], "debug": {...}}`` where each risk subject
        holds ``id``, ``display_name``, ``region``, ``theme`` and ``permits``.
    """
    catalog_full = list_region_theme_options()
    debug_info: Dict[str, Any] = {"catalog_total": len(catalog_full)}

    filters = _normalize_region_filter(region_filter)
    if filters:
        catalog = _filter_catalog_by_region(catalog_full, filters)
        logger.info(
            "[lawrisk_v2] Region filter applied | input=%s | kept=%d/%d",
            filters["raw"],
            len(catalog),
            len(catalog_full),
        )
        debug_info["region_filter"] = filters["raw"]
        debug_info["catalog_after_region_filter"] = len(catalog)
    else:
        catalog = catalog_full

    # Strategy 1: the query is the name of a permit.
    query_exact = query.strip()
    direct_contexts: List[Dict[str, str]] = []
    if query_exact:
        contexts = find_permit_contexts_by_name(query_exact)
        if contexts:
            if filters:
                # Only needed here, so the set is built on demand.
                allowed_region_ids = {item["region_id"] for item in catalog}
                contexts = [ctx for ctx in contexts if ctx["region_id"] in allowed_region_ids]
            direct_contexts = contexts

    if direct_contexts:
        logger.info(
            "[lawrisk_v2] Direct permit match | query=%s | contexts=%d",
            query_exact,
            len(direct_contexts),
        )
        direct_results, used_contexts = _load_direct_permit_results(direct_contexts)
        if direct_results:
            debug_payload: Dict[str, Any] = {}
            if return_debug:
                debug_payload = {
                    **debug_info,
                    "strategy": "permit_exact",
                    "matched_permit_name": query_exact,
                    "matched_permit_ids": [ctx["permit_id"] for ctx in used_contexts],
                }
            return {
                "risk_subject": direct_results,
                "debug": debug_payload,
            }
        # No context produced permit data: fall through to the LLM strategy.

    if not catalog:
        logger.info(
            "[lawrisk_v2] No catalog entries remain after region filter; skipping selection."
        )
        return {
            "risk_subject": [],
            "debug": debug_info if return_debug else {},
        }

    # Strategy 2: let the LLM choose region-theme options.
    selected_ids = _select_theme_options(query, catalog)

    catalog_map = {item["option_id"]: item for item in catalog}
    results: List[Dict[str, Any]] = []
    debug_info["catalog_used_for_llm"] = len(catalog)

    for option_id in selected_ids:
        item = catalog_map[option_id]
        # Option ids are split below as "<region_id>:<theme_id>".
        if ":" not in option_id:
            continue
        region_id, theme_id = option_id.split(":", 1)
        payload = load_theme_payload(region_id, theme_id, only_visible=True)
        if not payload:
            # NOTE(review): assumes an empty/None payload means "nothing to
            # show" -- mirrors the skip of empty permits on the direct path.
            continue

        # V2 must not expose the internal contact field.
        _strip_internal_contacts(payload.get("permits", []))

        results.append(
            {
                "id": option_id,
                "display_name": item["display_name"],
                "region": payload["region"],
                "theme": payload["theme"],
                "permits": payload["permits"],
            }
        )

    if return_debug:
        debug_info = {
            **debug_info,
            "selected_option_ids": selected_ids,
        }
    else:
        debug_info = {}

    return {
        "risk_subject": results,
        "debug": debug_info,
    }
|