fs-lawrisk/lawrisk/services/lawrisk_v2_service.py

405 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
from typing import Any, Dict, List
from lawrisk.services.licensing_repo import (
list_region_theme_options,
load_theme_payload,
load_permits_and_risks,
find_permit_contexts_by_name,
)
from lawrisk.services.lawrisk_service import ChatClient
# Module-level logger for this service.
logger = logging.getLogger(__name__)
# Attach a stream handler only when the logger has none yet, so the handler is
# not added a second time if this module-level code runs again.
if not logger.handlers:
    handler = logging.StreamHandler()
    # NOTE(review): the log calls in this file also start their messages with
    # "[lawrisk_v2]", so the prefix appears twice in each emitted line.
    handler.setFormatter(logging.Formatter("[lawrisk_v2] %(levelname)s %(message)s"))
    logger.addHandler(handler)
# NOTE(review): the two statements below are assumed to sit outside the `if`
# above (indentation was not available when reviewing) — confirm.
logger.setLevel(logging.INFO)
# Keep records from being passed on to the handlers of ancestor loggers.
logger.propagate = False
def _compose_prompt(payload: Dict[str, Any]) -> str:
"""Build a natural-language prompt snippet from structured payload."""
region = payload.get("region", {})
theme = payload.get("theme", {})
permits = payload.get("permits", [])
lines: List[str] = []
lines.append(f"地区:{region.get('name', '')}")
lines.append(f"主题事项:{theme.get('name', '')}")
for permit in permits:
pname = permit.get("name", "")
lines.append(f"许可事项:{pname}")
permit_scopes = permit.get("business_scopes", [])
if permit_scopes:
scope_text = "".join(
scope.get("description", "") for scope in permit_scopes if scope.get("description")
)
if scope_text:
lines.append(f" 经营范围:{scope_text}")
risks = permit.get("risks", [])
for idx, risk in enumerate(risks, start=1):
detail_parts = []
if risk.get("risk_content"):
detail_parts.append(f"风险提示:{risk['risk_content']}")
if risk.get("legal_basis"):
detail_parts.append(f"法律依据:{risk['legal_basis']}")
if risk.get("document_no"):
detail_parts.append(f"文号:{risk['document_no']}")
if risk.get("summary"):
detail_parts.append(f"摘要:{risk['summary']}")
if detail_parts:
lines.append(f" 风险{idx}" + "".join(detail_parts))
return "\n".join(lines)
def _normalize_region_filter(region_filter: Any) -> Dict[str, Any] | None:
"""Normalize region filter input into comparable tokens."""
if region_filter is None:
return None
tokens: List[str] = []
def _consume(value: Any) -> None:
if value is None:
return
if isinstance(value, (list, tuple, set, frozenset)):
for item in value:
_consume(item)
return
if isinstance(value, str):
normalized = (
value.replace("", ",")
.replace("", ",")
.replace(";", ",")
.replace("|", ",")
)
parts = [part.strip() for part in normalized.split(",")]
tokens.extend(part for part in parts if part)
return
text = str(value).strip()
if text:
tokens.append(text)
_consume(region_filter)
stripped = [token for token in tokens if token]
if not stripped:
return None
lower_tokens = {token.lower() for token in stripped}
return {
"raw": stripped,
"ids": set(stripped),
"lower": lower_tokens,
}
def _select_theme_options(query: str, catalog: List[Dict[str, str]]) -> List[str]:
"""Use LLM to choose relevant region-theme option ids."""
if not catalog:
return []
display_entries = [item["display_name"] for item in catalog]
options_block = "\n".join(display_entries)
system_msg = (
"你是政务事项检索助手。根据用户提供的问题,"
"从给定的地区-主题列表中选择最相关的主题事项,返回对应的地区·主题名称。"
"输出 JSON 数组,例如: [\"市级 · 开办电影院\"]."
)
user_msg = (
f"用户问题: {query}\n\n"
"候选主题列表:\n"
f"{options_block}\n\n"
"请仅输出 JSON 数组,内容为选择的地区·主题名称。如果没有匹配,请输出 []."
)
logger.info(
"[lawrisk_v2] LLM selection request | query=%s | catalog_size=%d",
query,
len(catalog),
)
logger.info("[lawrisk_v2] LLM system prompt: %s", system_msg)
logger.info("[lawrisk_v2] LLM user prompt: %s", user_msg)
chat = ChatClient()
content = chat.chat(
[
{"role": "system", "content": system_msg},
{"role": "user", "content": user_msg},
]
)
raw = content.strip()
logger.info("[lawrisk_v2] LLM raw response: %s", raw)
start = raw.find("[")
end = raw.rfind("]")
if start != -1 and end != -1 and end > start:
snippet = raw[start : end + 1]
else:
snippet = raw
selected: List[str] = []
try:
data = json.loads(snippet)
if isinstance(data, list):
for item in data:
if isinstance(item, str):
selected.append(item)
elif isinstance(item, dict) and isinstance(item.get("id"), str):
selected.append(item["id"])
except Exception:
selected = []
display_to_option = {item["display_name"]: item["option_id"] for item in catalog}
uniq: List[str] = []
for display_name in selected:
option_id = display_to_option.get(display_name)
if option_id and option_id not in uniq:
uniq.append(option_id)
logger.info("[lawrisk_v2] LLM mapped option_ids: %s", uniq)
return uniq
def list_regions() -> List[Dict[str, str]]:
    """Return each distinct region of the catalog once.

    Regions appear in the order their id is first seen in
    ``list_region_theme_options()``; the name is taken from that first entry.

    Returns:
        A list of ``{"id": ..., "name": ...}`` dicts.
    """
    # Insertion-ordered mapping: region id -> name of its first occurrence.
    first_name_by_id: Dict[str, str] = {}
    for entry in list_region_theme_options():
        rid = entry["region_id"]
        if rid not in first_name_by_id:
            first_name_by_id[rid] = entry["region_name"]
    return [{"id": rid, "name": name} for rid, name in first_name_by_id.items()]
def suggest_related_questions(query: str, search_results: Dict[str, Any], max_q: int = 5) -> List[str]:
    """Generate related question suggestions based on actual search results.

    A short context is built from the first three entries of
    ``search_results["risk_subject"]`` and sent to the chat model, which is
    asked to reply with a JSON array of questions.

    Args:
        query: The user's original query.
        search_results: Search output; only its ``"risk_subject"`` list is read.
        max_q: Maximum number of questions to return.

    Returns:
        Up to ``max_q`` distinct, non-empty questions in the order returned by
        the model. Empty when there are no results, when ``max_q`` is not
        positive, or when the model call or reply parsing fails (failures are
        logged rather than raised).
    """
    risk_subjects = search_results.get("risk_subject") or []
    # Nothing to base suggestions on, or nothing requested: skip the LLM call.
    if not risk_subjects or max_q <= 0:
        return []

    # Build context from the top 3 search results.
    context_lines: List[str] = []
    for subject in risk_subjects[:3]:
        region = subject.get("region") or {}
        theme = subject.get("theme") or {}
        permits = subject.get("permits") or []
        context_lines.append(f"主题: {subject.get('display_name', '')}")
        context_lines.append(f" 地区: {region.get('name', '')}")
        context_lines.append(f" 事项: {theme.get('name', '')}")

        permit_names = [p.get("name", "") for p in permits[:5] if p.get("name")]
        if permit_names:
            context_lines.append(f" 相关许可: {', '.join(permit_names)}")

        # One sample risk (first risk of the first permit) keeps the prompt short.
        if permits:
            risks = permits[0].get("risks") or []
            risk_content = risks[0].get("risk_content", "") if risks else ""
            if risk_content:
                # Only mark the text as truncated when it actually was.
                suffix = "..." if len(risk_content) > 100 else ""
                context_lines.append(f" 风险提示: {risk_content[:100]}{suffix}")

    context_block = "\n".join(context_lines)
    system_msg = (
        "你是政务事项问答助手。根据用户的查询和检索结果,"
        "生成3-5个相关的中文推荐问题。每个问题应该是完整的疑问句,"
        "直接基于检索到的具体内容。问题应该帮助用户进一步了解相关信息。"
        "只返回问题本身,不需要额外说明。"
    )
    # Each requirement sits on its own line; without the line breaks the
    # numbered items fuse into one run-on sentence.
    user_msg = (
        f"用户查询: {query}\n\n"
        f"检索到的相关信息:\n{context_block}\n\n"
        f"请根据以上信息,生成 {max_q} 个相关的中文推荐问题。\n"
        "每个问题应该:\n"
        "1. 基于检索到的具体内容\n"
        "2. 清晰、简洁\n"
        "3. 是完整的疑问句\n"
        "4. 帮助用户进一步探索相关政务事项\n"
        "仅输出 JSON 数组,格式如: [\"问题1\", \"问题2\", \"问题3\"]."
    )

    try:
        chat = ChatClient()
        content = chat.chat([
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ])
        raw = (content or "").strip()

        # The reply may wrap the array in extra text; keep the outermost [...] span.
        start = raw.find("[")
        end = raw.rfind("]")
        snippet = raw[start : end + 1] if start != -1 and end > start else raw

        try:
            data = json.loads(snippet)
        except (ValueError, RecursionError) as exc:
            logger.warning("[lawrisk_v2] Suggestion response is not valid JSON: %s", exc)
            return []
        if not isinstance(data, list):
            return []

        questions = [item.strip() for item in data if isinstance(item, str) and item.strip()]
        # De-duplicate keeping first-seen order, then cap at max_q.
        return list(dict.fromkeys(questions))[:max_q]
    except Exception:
        # Suggestions are best-effort: never fail the caller, but record why.
        logger.exception("[lawrisk_v2] Failed to generate related questions")
        return []
def search_v2(
    query: str, return_debug: bool = False, region_filter: Any = None
) -> Dict[str, Any]:
    """Search risk subjects for ``query``, optionally restricted by region.

    Steps:
      1. Load the region-theme catalog and, when ``region_filter`` yields
         tokens, keep only entries whose region id, region name or display
         name matches one of them (names are compared case-insensitively).
      2. Look the stripped query up as a permit name. If any matching context
         (within the allowed regions when filtering) has permits, return
         those directly without consulting the LLM.
      3. Otherwise let ``_select_theme_options`` choose catalog entries and
         load the payload of each chosen ``region_id:theme_id`` option.

    Args:
        query: The user's question or an exact permit name.
        return_debug: When true, the ``"debug"`` value carries diagnostic
            counters and ids; otherwise it is an empty dict.
        region_filter: Anything accepted by ``_normalize_region_filter``.

    Returns:
        ``{"risk_subject": [...], "debug": {...}}``.
    """
    full_catalog = list_region_theme_options()
    debug_info: Dict[str, Any] = {"catalog_total": len(full_catalog)}

    # --- Step 1: region filtering -------------------------------------
    filters = _normalize_region_filter(region_filter)
    if not filters:
        catalog = full_catalog
    else:
        exact_ids = filters["ids"]
        folded_tokens = filters["lower"]
        catalog = []
        for entry in full_catalog:
            rid = entry["region_id"]
            region_name = entry["region_name"].strip()
            display_name = entry["display_name"].strip()
            folded_forms = (rid.lower(), region_name.lower(), display_name.lower())
            if rid in exact_ids or any(form in folded_tokens for form in folded_forms):
                catalog.append(entry)
        logger.info(
            "[lawrisk_v2] Region filter applied | input=%s | kept=%d/%d",
            filters["raw"],
            len(catalog),
            len(full_catalog),
        )
        debug_info["region_filter"] = filters["raw"]
        debug_info["catalog_after_region_filter"] = len(catalog)

    allowed_region_ids = {entry["region_id"] for entry in catalog}

    # --- Step 2: exact permit-name shortcut ---------------------------
    query_exact = query.strip()
    direct_contexts: List[Dict[str, str]] = []
    if query_exact:
        matched = find_permit_contexts_by_name(query_exact)
        if matched and filters:
            matched = [ctx for ctx in matched if ctx["region_id"] in allowed_region_ids]
        if matched:
            direct_contexts = matched

    if direct_contexts:
        logger.info(
            "[lawrisk_v2] Direct permit match | query=%s | contexts=%d",
            query_exact,
            len(direct_contexts),
        )
        direct_results: List[Dict[str, Any]] = []
        used_contexts: List[Dict[str, str]] = []
        for ctx in direct_contexts:
            permits = load_permits_and_risks(
                ctx["region_id"],
                ctx["theme_id"],
                permit_id=ctx["permit_id"],
            )
            if permits:
                # NOTE(review): id, display_name and the theme fields are
                # emitted as empty strings for direct matches even though
                # ctx carries a theme_id — confirm this is intentional.
                direct_results.append(
                    {
                        "id": "",
                        "display_name": "",
                        "region": {"id": ctx["region_id"], "name": ctx["region_name"]},
                        "theme": {"id": "", "name": ""},
                        "permits": permits,
                    }
                )
                used_contexts.append(ctx)

        # Fall through to the LLM path when no context produced permits.
        if direct_results:
            debug_payload: Dict[str, Any] = {}
            if return_debug:
                debug_payload = {
                    **debug_info,
                    "strategy": "permit_exact",
                    "matched_permit_name": query_exact,
                    "matched_permit_ids": [ctx["permit_id"] for ctx in used_contexts],
                }
            return {"risk_subject": direct_results, "debug": debug_payload}

    # --- Step 3: LLM-based theme selection ----------------------------
    if not catalog:
        logger.info(
            "[lawrisk_v2] No catalog entries remain after region filter; skipping selection."
        )
        return {"risk_subject": [], "debug": debug_info if return_debug else {}}

    selected_ids = _select_theme_options(query, catalog)
    catalog_map = {entry["option_id"]: entry for entry in catalog}
    debug_info["catalog_used_for_llm"] = len(catalog)

    results: List[Dict[str, Any]] = []
    for option_id in selected_ids:
        entry = catalog_map[option_id]
        # Option ids are expected to look like "<region_id>:<theme_id>";
        # ids without the separator are skipped.
        region_id, separator, theme_id = option_id.partition(":")
        if not separator:
            continue
        payload = load_theme_payload(region_id, theme_id)
        results.append(
            {
                "id": option_id,
                "display_name": entry["display_name"],
                "region": payload["region"],
                "theme": payload["theme"],
                "permits": payload["permits"],
            }
        )

    final_debug: Dict[str, Any] = (
        {**debug_info, "selected_option_ids": selected_ids} if return_debug else {}
    )
    return {"risk_subject": results, "debug": final_debug}