fs-lawrisk/lawrisk/services/lawrisk_v2_service.py

400 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
import random
from typing import Any, Dict, List
from lawrisk.services.licensing_repo import (
list_region_theme_options,
load_theme_payload,
load_permits_and_risks,
find_permit_contexts_by_name,
)
from lawrisk.services.lawrisk_service import ChatClient
# Module-level logger for the V2 law-risk service.
logger = logging.getLogger(__name__)

# Attach a stream handler only when none is present, so importing the module
# more than once does not stack duplicate handlers on the same logger.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(fmt="[lawrisk_v2] %(levelname)s %(message)s")
    )
    logger.addHandler(handler)

# NOTE(review): the two settings below are assumed to apply unconditionally
# (outside the ``if``); confirm against the original indentation.
# Records are handled here only and are not forwarded to ancestor loggers.
logger.propagate = False
logger.setLevel(logging.INFO)
def _compose_prompt(payload: Dict[str, Any]) -> str:
"""Build a natural-language prompt snippet from structured payload."""
region = payload.get("region", {})
theme = payload.get("theme", {})
permits = payload.get("permits", [])
lines: List[str] = []
lines.append(f"地区:{region.get('name', '')}")
lines.append(f"主题事项:{theme.get('name', '')}")
for permit in permits:
pname = permit.get("name", "")
lines.append(f"许可事项:{pname}")
if permit.get("contact_info"):
lines.append(f" 联系方式:{permit['contact_info']}")
permit_scopes = permit.get("business_scopes", [])
if permit_scopes:
scope_text = "".join(
scope.get("description", "") for scope in permit_scopes if scope.get("description")
)
if scope_text:
lines.append(f" 经营范围:{scope_text}")
risks = permit.get("risks", [])
for idx, risk in enumerate(risks, start=1):
detail_parts = []
if risk.get("risk_content"):
detail_parts.append(f"风险提示:{risk['risk_content']}")
if risk.get("legal_basis"):
detail_parts.append(f"法律依据:{risk['legal_basis']}")
if risk.get("document_no"):
detail_parts.append(f"文号:{risk['document_no']}")
if risk.get("summary"):
detail_parts.append(f"摘要:{risk['summary']}")
if risk.get("remark"):
detail_parts.append(f"备注:{risk['remark']}")
if detail_parts:
label = risk.get("serial_number") or str(idx)
lines.append(f" 风险{label}" + "".join(detail_parts))
return "\n".join(lines)
def _normalize_region_filter(region_filter: Any) -> Dict[str, Any] | None:
"""Normalize region filter input into comparable tokens."""
if region_filter is None:
return None
tokens: List[str] = []
def _consume(value: Any) -> None:
if value is None:
return
if isinstance(value, (list, tuple, set, frozenset)):
for item in value:
_consume(item)
return
if isinstance(value, str):
normalized = (
value.replace("", ",")
.replace("", ",")
.replace(";", ",")
.replace("|", ",")
)
parts = [part.strip() for part in normalized.split(",")]
tokens.extend(part for part in parts if part)
return
text = str(value).strip()
if text:
tokens.append(text)
_consume(region_filter)
stripped = [token for token in tokens if token]
if not stripped:
return None
lower_tokens = {token.lower() for token in stripped}
return {
"raw": stripped,
"ids": set(stripped),
"lower": lower_tokens,
}
def _parse_llm_selection(raw: str) -> List[str]:
    """Extract the selected names from the raw LLM reply.

    The reply is expected to contain a JSON array. Any text before the first
    ``[`` or after the last ``]`` is ignored. String items are returned as-is;
    dict items contribute their ``"id"`` value when it is a string. Anything
    else, including a reply that is not valid JSON, yields an empty list.
    """
    start = raw.find("[")
    end = raw.rfind("]")
    snippet = raw[start : end + 1] if start != -1 and end > start else raw

    try:
        data = json.loads(snippet)
    except (ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError. Selection stays best-effort,
        # but the failure is logged instead of silently swallowed.
        logger.warning("[lawrisk_v2] LLM response is not a JSON array: %s", exc)
        return []

    if not isinstance(data, list):
        return []

    names: List[str] = []
    for item in data:
        if isinstance(item, str):
            names.append(item)
        elif isinstance(item, dict) and isinstance(item.get("id"), str):
            names.append(item["id"])
    return names


def _select_theme_options(query: str, catalog: List[Dict[str, str]]) -> List[str]:
    """Use the LLM to choose relevant region-theme option ids.

    Args:
        query: The user's question, embedded verbatim in the prompt.
        catalog: Candidate entries; each must provide ``display_name`` and
            ``option_id``.

    Returns:
        De-duplicated option ids in the order the LLM listed them. An answer
        is accepted when it equals a catalog ``display_name`` (exactly or
        after trimming whitespace) or is itself a catalog ``option_id``.
        Unknown answers are dropped. Returns ``[]`` for an empty catalog or
        an unparseable reply.
    """
    if not catalog:
        return []

    options_block = "\n".join(item["display_name"] for item in catalog)
    system_msg = (
        "你是政务事项检索助手。根据用户提供的问题,"
        "从给定的地区-主题列表中选择最相关的主题事项,返回对应的地区·主题名称。"
        "输出 JSON 数组,例如: [\"市级 · 开办电影院\"]."
    )
    user_msg = (
        f"用户问题: {query}\n\n"
        "候选主题列表:\n"
        f"{options_block}\n\n"
        "请仅输出 JSON 数组,内容为选择的地区·主题名称。如果没有匹配,请输出 []."
    )
    logger.info(
        "[lawrisk_v2] LLM selection request | query=%s | catalog_size=%d",
        query,
        len(catalog),
    )
    logger.info("[lawrisk_v2] LLM system prompt: %s", system_msg)
    logger.info("[lawrisk_v2] LLM user prompt: %s", user_msg)

    content = ChatClient().chat(
        [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ]
    )
    # NOTE(review): ChatClient.chat is assumed to return text; an empty or
    # None reply is treated as "nothing selected" rather than crashing.
    raw = (content or "").strip()
    logger.info("[lawrisk_v2] LLM raw response: %s", raw)

    display_to_option = {item["display_name"]: item["option_id"] for item in catalog}
    known_option_ids = set(display_to_option.values())

    # A dict doubles as an insertion-ordered set: O(1) dedup, stable order.
    chosen: Dict[str, None] = {}
    for answer in _parse_llm_selection(raw):
        trimmed = answer.strip()
        option_id = display_to_option.get(answer) or display_to_option.get(trimmed)
        if not option_id and trimmed in known_option_ids:
            # The model answered with an option id instead of a display name.
            option_id = trimmed
        if option_id:
            chosen.setdefault(option_id, None)

    uniq = list(chosen)
    logger.info("[lawrisk_v2] LLM mapped option_ids: %s", uniq)
    return uniq
def list_regions() -> List[Dict[str, str]]:
    """Return the distinct regions found in the region-theme catalog.

    Each region appears once as ``{"id": ..., "name": ...}``, in the order its
    id is first seen in the catalog; the name comes from that first entry.
    """
    # Insertion-ordered mapping: region id -> name of its first catalog entry.
    first_name_by_id: Dict[str, str] = {}
    for entry in list_region_theme_options():
        rid = entry["region_id"]
        if rid not in first_name_by_id:
            first_name_by_id[rid] = entry["region_name"]
    return [{"id": rid, "name": name} for rid, name in first_name_by_id.items()]
# Preset question templates used to build the recommended-question pool.
# Each entry is a ``str.format`` template whose ``{theme}`` placeholder is
# filled with a theme name (see ``template.format(theme=...)`` in
# ``_get_preset_questions_pool``), so every theme yields one question per
# template.
_QUESTION_TEMPLATES = [
    "{theme}需要办理哪些许可?",
    "{theme}有什么法律风险需要注意?",
    "关于{theme}有什么政策要求?",
]
def _get_preset_questions_pool() -> List[str]:
    """Build the pool of preset questions from themes that have permits.

    Only themes joined to at least one row of ``region_theme_permits`` are
    used, and the placeholder theme names listed in the query are excluded.
    Every remaining theme name is expanded through each entry of
    ``_QUESTION_TEMPLATES``.

    Returns:
        The generated questions, ordered by theme name and then template
        order. Returns ``[]`` when the database lookup fails; the failure is
        logged as a warning rather than raised.
    """
    from contextlib import closing

    from lawrisk.services.licensing_repo import _lic_pg_conn

    # Query themes that have at least one permit.
    sql = """
    SELECT DISTINCT t.name AS theme_name
    FROM themes t
    JOIN region_theme_permits rtp ON rtp.theme_id = t.id
    WHERE t.name NOT IN ('不涉及', '', '所有主题事项')
    ORDER BY t.name
    """

    try:
        # ``closing`` guarantees the cursor is released even when the query
        # raises; the original left it open.
        with _lic_pg_conn() as conn, closing(conn.cursor()) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    except Exception as e:
        # Recommendations are best-effort: a DB problem must not break the
        # caller, so report it and fall back to an empty pool.
        logger.warning("Failed to load preset questions: %s", e)
        return []

    questions: List[str] = []
    for (theme_name,) in rows:
        if theme_name is None:
            # str(None) would otherwise become the literal theme "None".
            continue
        theme_name = str(theme_name).strip()
        if theme_name:
            questions.extend(
                template.format(theme=theme_name) for template in _QUESTION_TEMPLATES
            )
    return questions
def suggest_related_questions(query: str, search_results: Dict[str, Any], max_q: int = 5) -> List[str]:
    """Return up to *max_q* randomly chosen preset questions.

    The candidates come from ``_get_preset_questions_pool``. ``query`` and
    ``search_results`` are accepted but not consulted. The result holds no
    duplicates and is empty when the pool is empty.
    """
    pool = _get_preset_questions_pool()
    if not pool:
        return []

    # Shuffle a copy so the pool itself is left untouched.
    candidates = list(pool)
    random.shuffle(candidates)

    # dict.fromkeys drops repeats while keeping the first (shuffled) position.
    picked: List[str] = []
    for question in dict.fromkeys(candidates):
        if len(picked) >= max_q:
            break
        picked.append(question)
    return picked
def _filter_catalog_by_region(
    catalog: List[Dict[str, str]], filters: Dict[str, Any]
) -> List[Dict[str, str]]:
    """Keep catalog entries whose region id, region name or display name matches *filters*."""
    exact_ids = filters["ids"]
    lowered = filters["lower"]
    kept: List[Dict[str, str]] = []
    for item in catalog:
        region_id = item["region_id"]
        candidates = (
            region_id.lower(),
            item["region_name"].strip().lower(),
            item["display_name"].strip().lower(),
        )
        if region_id in exact_ids or any(text in lowered for text in candidates):
            kept.append(item)
    return kept


def _without_internal_contact(permits: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return shallow copies of *permits* with ``responsible_contact`` removed."""
    # Copy instead of ``del`` so objects owned by the repository layer are
    # never mutated (they may be shared with other callers).
    return [
        {key: value for key, value in permit.items() if key != "responsible_contact"}
        for permit in permits
    ]


def _load_direct_permit_matches(
    permit_name: str, allowed_region_ids: Any
) -> tuple:
    """Look up permits whose name equals *permit_name*.

    ``allowed_region_ids`` restricts matches to those regions; ``None`` means
    no restriction. Returns ``(results, matched_permit_ids)``; both are empty
    when nothing matches or no matching context yields any permit.
    """
    if not permit_name:
        return [], []

    contexts = find_permit_contexts_by_name(permit_name) or []
    if allowed_region_ids is not None:
        contexts = [ctx for ctx in contexts if ctx["region_id"] in allowed_region_ids]
    if not contexts:
        return [], []

    logger.info(
        "[lawrisk_v2] Direct permit match | query=%s | contexts=%d",
        permit_name,
        len(contexts),
    )

    results: List[Dict[str, Any]] = []
    matched_permit_ids: List[Any] = []
    for ctx in contexts:
        permits = load_permits_and_risks(
            ctx["region_id"],
            ctx["theme_id"],
            permit_id=ctx["permit_id"],
        )
        if not permits:
            continue
        results.append(
            {
                # A direct permit hit is not tied to one catalog option, so
                # the option id and display name are left blank.
                "id": "",
                "display_name": "",
                "region": {"id": ctx["region_id"], "name": ctx["region_name"]},
                "theme": {"id": ctx.get("theme_id", ""), "name": ctx.get("theme_name", "")},
                "permits": _without_internal_contact(permits),
            }
        )
        matched_permit_ids.append(ctx["permit_id"])
    return results, matched_permit_ids


def search_v2(
    query: str, return_debug: bool = False, region_filter: Any = None
) -> Dict[str, Any]:
    """Search permits and risks for *query*, optionally limited to some regions.

    Strategy:
        1. Narrow the region-theme catalog with ``region_filter`` (if given).
        2. If the trimmed query equals a permit name, return those permits
           directly without calling the LLM.
        3. Otherwise let the LLM pick catalog options and load each option's
           theme payload.

    Args:
        query: User question or exact permit name. ``None`` is treated as "".
        return_debug: When true, ``debug`` in the result carries diagnostics;
            otherwise it is an empty dict.
        region_filter: Anything ``_normalize_region_filter`` accepts.

    Returns:
        ``{"risk_subject": [...], "debug": {...}}``. Each subject has ``id``,
        ``display_name``, ``region``, ``theme`` and ``permits``; the permits
        never include ``responsible_contact``.
    """
    catalog_full = list_region_theme_options()
    debug_info: Dict[str, Any] = {"catalog_total": len(catalog_full)}

    filters = _normalize_region_filter(region_filter)
    if filters:
        catalog = _filter_catalog_by_region(catalog_full, filters)
        logger.info(
            "[lawrisk_v2] Region filter applied | input=%s | kept=%d/%d",
            filters["raw"],
            len(catalog),
            len(catalog_full),
        )
        debug_info["region_filter"] = filters["raw"]
        debug_info["catalog_after_region_filter"] = len(catalog)
    else:
        catalog = catalog_full

    # --- Strategy 1: the query is exactly a permit name -------------------
    query = query or ""
    query_exact = query.strip()
    # Region restriction applies to direct matches only when a filter is set.
    allowed_region_ids = {item["region_id"] for item in catalog} if filters else None
    direct_results, matched_permit_ids = _load_direct_permit_matches(
        query_exact, allowed_region_ids
    )
    if direct_results:
        if return_debug:
            debug_payload = {
                **debug_info,
                "strategy": "permit_exact",
                "matched_permit_name": query_exact,
                "matched_permit_ids": matched_permit_ids,
            }
        else:
            debug_payload = {}
        return {
            "risk_subject": direct_results,
            "debug": debug_payload,
        }

    # --- Strategy 2: LLM selection over the (filtered) catalog ------------
    if not catalog:
        logger.info(
            "[lawrisk_v2] No catalog entries remain after region filter; skipping selection."
        )
        return {
            "risk_subject": [],
            "debug": debug_info if return_debug else {},
        }

    selected_ids = _select_theme_options(query, catalog)
    catalog_map = {item["option_id"]: item for item in catalog}
    debug_info["catalog_used_for_llm"] = len(catalog)

    results: List[Dict[str, Any]] = []
    for option_id in selected_ids:
        # Option ids are expected to look like "<region_id>:<theme_id>".
        if ":" not in option_id:
            continue
        item = catalog_map[option_id]
        region_id, theme_id = option_id.split(":", 1)
        payload = load_theme_payload(region_id, theme_id)
        if not payload:
            # NOTE(review): assumes an empty/None payload means "nothing to
            # show" for this option - confirm against load_theme_payload.
            continue
        results.append(
            {
                "id": option_id,
                "display_name": item["display_name"],
                "region": payload["region"],
                "theme": payload["theme"],
                # V2 only exposes external contact info; a missing "permits"
                # key is treated as an empty list rather than a KeyError.
                "permits": _without_internal_contact(payload.get("permits") or []),
            }
        )

    if return_debug:
        debug_info = {
            **debug_info,
            "selected_option_ids": selected_ids,
        }
    else:
        debug_info = {}
    return {
        "risk_subject": results,
        "debug": debug_info,
    }