fs-lawrisk/lawrisk/api/v2.py

1244 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""V2 API routes - Enhanced implementation with structured results."""
from __future__ import annotations
import os
import time
from io import BytesIO
from flask import Blueprint, jsonify, request, send_file
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Optional
from lawrisk.api.auth import login_required, get_current_user, ensure_admin_access
from lawrisk.services.lawrisk_v2_service import search_v2, list_regions, suggest_related_questions
from lawrisk.services.licensing_repo import (
list_permits_for_region,
load_permits_and_risks,
list_region_theme_options,
list_region_permit_catalog,
load_theme_payload,
create_checkpoint,
list_checkpoints,
restore_checkpoint,
delete_checkpoint,
list_permit_risk_snapshot_summaries,
count_permit_risk_snapshots,
delete_region_permit,
restore_permit_risk_snapshot_batch,
start_permit_import_session,
commit_permit_import_session,
fetch_permit_file,
describe_permit_import_session,
resolve_region_permit_theme,
list_stored_permit_files,
start_import_session_from_file,
delete_stored_permit_file,
list_service_departments,
build_service_department_tree,
create_service_department,
update_service_department,
delete_service_department,
list_all_themes,
create_theme,
rename_theme,
delete_theme,
)
from lawrisk.services.auth_service import (
list_users,
create_user,
update_user_account,
)
from lawrisk.services.template_service import (
get_permit_template_path,
get_permit_template_metadata,
overwrite_permit_template,
)
v2_bp = Blueprint('lawrisk_v2', __name__, url_prefix='/fs-ai-asistant/api/workflow/lawrisk')
def _project_root() -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
def _parse_truthy(value: Any, default: bool = False) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _admin_guard(*, prefer_json: bool = False, roles: Optional[Iterable[str]] = None):
user, error = ensure_admin_access(allowed_roles=roles, prefer_json=prefer_json)
if error:
return None, error
return user, None
@v2_bp.route('/v2', methods=['POST', 'GET'])
def lawrisk_search_v2():
"""V2 search endpoint with structured results."""
query, debug_flag, top_k_int, _mode, region_filter = _extract_params()
if not query or not isinstance(query, str):
return jsonify({"error": "query is required"}), 400
try:
t0 = time.time()
# Execute search first
result_v2 = search_v2(query, debug_flag, region_filter)
# Generate recommendations based on search results
rec_questions = suggest_related_questions(query, result_v2, max(1, top_k_int))
risk_subject = result_v2.get("risk_subject", []) if isinstance(result_v2, dict) else []
found = bool(risk_subject)
exec_time = int((time.time() - t0) * 1000)
data = {
"llmRespond": "" if found else "抱歉,无法检索到相关答案",
"lawRisk": "",
"questionExtend": rec_questions,
"conversationId": "",
"messageId": "",
"roundNumber": 0,
"conversationInfo": {},
"knowledgeSources": [],
"totalKnowledgeSources": 0,
"executionTime": exec_time,
"workflowStatus": "ok" if found else "no_match",
"executionSteps": [],
"costStatistics": {},
"workflowTrackingId": "",
"risk_subject": risk_subject,
"debug": result_v2.get("debug", {}) if (debug_flag and isinstance(result_v2, dict)) else {},
}
resp = {"success": True, "message": "OK", "data": data}
return jsonify(resp)
except Exception as e:
print(f"lawrisk_search_v2 error: {e}")
return jsonify({"success": False, "message": str(e), "data": {}}), 500
@v2_bp.route('/v2/regions', methods=['GET'])
def lawrisk_regions():
"""Get list of available regions."""
try:
regions = list_regions()
return jsonify({"success": True, "data": {"regions": regions}})
except Exception as exc:
print(f"lawrisk_regions error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/getPermits', methods=['GET', 'POST'])
def lawrisk_get_permits():
"""Get permits for a specific region."""
if request.method == "GET":
region_value = request.args.get("region") or request.args.get("region_id")
else:
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
region_value = payload.get("region") or payload.get("region_id")
if not region_value or (isinstance(region_value, str) and not region_value.strip()):
return jsonify({"success": False, "message": "region is required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
try:
permits = list_permits_for_region(region_token)
return jsonify({"success": True, "data": {"region": region_token, "permits": permits}})
except Exception as exc:
print(f"lawrisk_get_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
def _extract_params():
"""Extract parameters from request."""
if request.method == "GET":
query = request.args.get("query") or request.args.get("q") or request.args.get("text")
debug_flag = request.args.get("debug") in {"1", "true", "yes", "on"}
top_k = request.args.get("top")
try:
top_k_int = int(top_k) if top_k else 5
except Exception:
top_k_int = 5
mode_value = (request.args.get("mode") or "llm").lower()
region_value = request.args.get("region") or request.args.get("region_id")
region_list = request.args.getlist("region")
if region_list and (not region_value or len(region_list) > 1):
region_value = region_list
else:
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
query = payload.get("query") or payload.get("q") or payload.get("text")
debug_flag = str(payload.get("debug", "")).strip().lower() in {"1", "true", "yes", "on"}
try:
top_k_int = int(payload.get("top", 5))
except Exception:
top_k_int = 5
mode_value = str(payload.get("mode", "llm")).lower()
region_value = payload.get("region") or payload.get("region_id")
return query, debug_flag, top_k_int, mode_value, region_value
@v2_bp.route('/admin/test', methods=['GET'])
def admin_test():
"""Simple test route."""
return jsonify({"success": True, "message": "Test route works!"})
@v2_bp.route('/test-simple', methods=['GET'])
def test_simple():
"""Very simple test."""
return jsonify({"status": "ok"})
@v2_bp.route('/db_admin', methods=['GET'])
@login_required
def db_admin_page():
"""Serve the database administration UI."""
html_path = os.path.join(_project_root(), 'static', 'db_admin.html')
if not os.path.exists(html_path):
return jsonify({"success": False, "message": "DB admin page not found"}), 404
return send_file(html_path, mimetype='text/html')
@v2_bp.route('/admin/super', methods=['GET'])
def super_admin_page():
"""Serve the super administrator console."""
_, error = _admin_guard(roles=("admin",))
if error:
return error
html_path = os.path.join(_project_root(), 'static', 'super_admin.html')
if not os.path.exists(html_path):
return jsonify({"success": False, "message": "Super admin page not found"}), 404
return send_file(html_path, mimetype='text/html')
@v2_bp.route('/admin/users', methods=['GET'])
def admin_list_users():
"""List users for the super admin console."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
include_inactive = _parse_truthy(request.args.get("include_inactive"), default=True)
try:
users = list_users(include_inactive=include_inactive)
return jsonify({"success": True, "data": {"users": users}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/users', methods=['POST'])
def admin_create_user():
"""Create a new application user."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
username = (payload.get("username") or "").strip().lower()
password = (payload.get("password") or "").strip()
if not username or not password:
return jsonify({"success": False, "message": "用户名和密码均不能为空"}), 400
try:
user = create_user(
username=username,
password=password,
display_name=payload.get("display_name"),
role=(payload.get("role") or "department_admin").strip(),
grade=int(payload.get("grade") or 50),
service_department_id=(payload.get("service_department_id") or "").strip() or None,
department_role=payload.get("department_role"),
)
return jsonify({"success": True, "data": {"user": user}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/users/<user_id>', methods=['PATCH'])
def admin_update_user(user_id: str):
"""Update password or service department for an existing user."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
update_kwargs: Dict[str, Any] = {}
if "password" in payload:
update_kwargs["password"] = payload.get("password") or ""
if "service_department_id" in payload:
dept_value = payload.get("service_department_id")
if isinstance(dept_value, str) and not dept_value.strip():
dept_value = None
update_kwargs["service_department_id"] = dept_value
if "display_name" in payload:
update_kwargs["display_name"] = payload.get("display_name")
if "role" in payload:
update_kwargs["role"] = payload.get("role")
if "grade" in payload:
update_kwargs["grade"] = payload.get("grade")
if "department_role" in payload:
update_kwargs["department_role"] = payload.get("department_role")
if not update_kwargs:
return jsonify({"success": False, "message": "请至少提供一个需要更新的字段"}), 400
try:
updated = update_user_account(user_id, **update_kwargs)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not updated:
return jsonify({"success": False, "message": "用户不存在"}), 404
return jsonify({"success": True, "data": {"user": updated}})
@v2_bp.route('/admin/service-departments', methods=['GET'])
def admin_list_service_departments():
"""Return all service departments in a flat list."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
departments = list_service_departments()
return jsonify({"success": True, "data": {"departments": departments}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments/tree', methods=['GET'])
def admin_service_departments_tree():
"""Return service departments in a tree structure for organizational chart display."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
tree = build_service_department_tree()
return jsonify({"success": True, "data": {"tree": tree}})
except Exception as exc:
print(f"admin_service_departments_tree error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments', methods=['POST'])
def admin_create_service_department():
"""Create a service department node with auto-generated account."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name")
code = payload.get("code")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "服务部门名称不能为空"}), 400
if not code or not str(code).strip():
return jsonify({"success": False, "message": "部门账号不能为空"}), 400
code = code.strip().upper()
grade = payload.get("grade") or 0
try:
grade = int(grade)
except (ValueError, TypeError):
grade = 0
kwargs = {
"code": code,
"phone": (payload.get("phone") or "").strip() or None,
"parent_id": (payload.get("parent_id") or "").strip() or None,
"region_id": (payload.get("region_id") or "").strip() or None,
"description": payload.get("description"),
"grade": grade,
}
try:
department = create_service_department(name, **kwargs)
department_id = department.get("id")
default_password = f"{code}123456"
try:
user = create_user(
username=code,
password=default_password,
display_name=f"{name}管理员",
role="department_admin",
grade=grade,
service_department_id=department_id,
department_role="admin"
)
return jsonify({
"success": True,
"data": {
"department": department,
"user": user,
"default_password": default_password
}
})
except ValueError as user_exc:
return jsonify({
"success": False,
"message": f"部门创建成功,但账号创建失败: {str(user_exc)}"
}), 400
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments/<dept_id>', methods=['PATCH'])
def admin_update_service_department(dept_id: str):
"""Update department metadata such as name, phone, grade, or hierarchy via drag-and-drop."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
updates: Dict[str, Any] = {}
for field in ("name", "phone", "parent_id", "region_id", "description"):
if field in payload:
value = payload.get(field)
if isinstance(value, str) and not value.strip():
value = None
if field == "parent_id" and value == dept_id:
return jsonify({"success": False, "message": "部门不能设置为自己的下级"}), 400
updates[field] = value
if "grade" in payload:
try:
grade = int(payload.get("grade"))
updates["grade"] = grade
except (ValueError, TypeError):
return jsonify({"success": False, "message": "权限等级必须是数字"}), 400
if not updates:
return jsonify({"success": False, "message": "没有需要更新的字段"}), 400
try:
department = update_service_department(dept_id, **updates)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not department:
return jsonify({"success": False, "message": "服务部门不存在"}), 404
message = "部门信息已更新"
if "parent_id" in updates:
message = "层级关系已更新"
elif "grade" in updates:
message = f"权限等级已更新为 {updates['grade']}"
return jsonify({
"success": True,
"data": {
"department": department,
"message": message
}
})
@v2_bp.route('/admin/service-departments/<dept_id>', methods=['DELETE'])
def admin_delete_service_department(dept_id: str):
"""Delete a service department node (only when no users are bound).
If force=true query parameter is provided, will first unbind all users from the department.
"""
_, error = _admin_guard(prefer_json=True)
if error:
return error
force = request.args.get("force", "").lower() in ("true", "1", "yes")
try:
if force:
from lawrisk.services.auth_service import update_user_account
users = list_users()
for user in users:
if user.get("service_department_id") == dept_id:
update_user_account(
user_id=user["id"],
service_department_id=None
)
result = delete_service_department(dept_id)
except ValueError as exc:
error_msg = str(exc)
if "仍有账号绑定" in error_msg and not force:
return jsonify({
"success": False,
"message": f"{error_msg}。如需强制删除请在URL中添加?force=true参数先解除绑定。",
"code": "HAS_BOUND_USERS"
}), 400
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not result.get("deleted"):
return jsonify({"success": False, "message": result.get("message", "删除失败")}), 400
return jsonify({
"success": True,
"data": result,
"message": "部门删除成功"
})
@v2_bp.route('/admin/themes/catalog', methods=['GET'])
def admin_theme_catalog():
"""List all available themes."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
themes = list_all_themes()
return jsonify({"success": True, "data": {"themes": themes}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog', methods=['POST'])
def admin_create_theme():
"""Create a new theme."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "主题名称不能为空"}), 400
try:
theme = create_theme(name)
return jsonify({"success": True, "data": {"theme": theme}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog/<theme_id>', methods=['PATCH'])
def admin_rename_theme(theme_id: str):
"""Rename an existing theme."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name") or payload.get("new_name")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "主题名称不能为空"}), 400
try:
theme = rename_theme(theme_id, name)
return jsonify({"success": True, "data": {"theme": theme}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog/<theme_id>', methods=['DELETE'])
def admin_delete_theme(theme_id: str):
"""Delete a theme and related region associations."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
result = delete_theme(theme_id)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not result.get("deleted"):
return jsonify({"success": False, "message": result.get("message", "主题不存在")}), 404
return jsonify({"success": True, "data": result})
@v2_bp.route('/admin/templates/permit', methods=['GET'])
def admin_permit_template_meta():
"""Return metadata for the import template."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
data = get_permit_template_metadata()
return jsonify({"success": True, "data": data})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/templates/permit', methods=['POST'])
def admin_update_permit_template():
"""Overwrite the existing import template."""
admin_user, error = _admin_guard(prefer_json=True)
if error:
return error
if 'file' not in request.files:
return jsonify({"success": False, "message": "请上传模板文件"}), 400
file_storage = request.files['file']
file_bytes = file_storage.read()
if not file_bytes:
return jsonify({"success": False, "message": "上传的模板为空"}), 400
try:
meta = overwrite_permit_template(
file_bytes,
file_storage.filename or "import_template.xlsx",
uploaded_by=(admin_user or {}).get("username"),
)
return jsonify({"success": True, "data": meta})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/regions', methods=['GET'])
def admin_regions():
"""Get all regions for database maintenance."""
try:
regions = list_regions()
return jsonify({"success": True, "data": {"regions": regions}})
except Exception as exc:
print(f"admin_regions error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes', methods=['GET'])
def admin_themes():
"""Get themes for a specific region."""
region_value = request.args.get("region") or request.args.get("region_id")
if not region_value or (isinstance(region_value, str) and not region_value.strip()):
return jsonify({"success": False, "message": "region is required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
try:
catalog = list_region_theme_options()
region_id_lower = region_token.lower()
themes = []
seen_theme_ids = set()
for item in catalog:
if (item["region_id"] == region_token or
item["region_id"].lower() == region_id_lower or
item["region_name"].lower() == region_id_lower):
theme_id = item["theme_id"]
if theme_id not in seen_theme_ids:
seen_theme_ids.add(theme_id)
themes.append({
"id": theme_id,
"name": item["theme_name"],
"option_id": item["option_id"]
})
return jsonify({"success": True, "data": {"region": region_token, "themes": themes}})
except Exception as exc:
print(f"admin_themes error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits', methods=['GET'])
def admin_permits():
"""Get permits for a region. Optional theme filter keeps backward compatibility."""
region_value = request.args.get("region") or request.args.get("region_id")
theme_value = request.args.get("theme") or request.args.get("theme_id")
if not region_value or (isinstance(region_value, str) and not region_value.strip()):
return jsonify({"success": False, "message": "region is required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
theme_token = (
theme_value.strip()
if theme_value and isinstance(theme_value, str)
else (str(theme_value) if theme_value is not None else None)
)
try:
if theme_token:
permits = load_permits_and_risks(region_token, theme_token)
data = {
"region": region_token,
"theme": theme_token,
"permits": permits,
}
else:
catalog = list_region_permit_catalog(region_token)
data = {
"region": region_token,
"permits": catalog,
}
return jsonify({"success": True, "data": data})
except Exception as exc:
print(f"admin_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-import/upload', methods=['POST'])
def admin_permit_import_upload():
"""Upload Excel workbook and start an import session."""
if 'file' not in request.files:
return jsonify({"success": False, "message": "请选择要上传的Excel文件"}), 400
file_storage = request.files['file']
filename = file_storage.filename or 'import.xlsx'
file_bytes = file_storage.read()
user = get_current_user() or {}
uploaded_by = user.get("display_name") or user.get("username") or user.get("id")
content_type = file_storage.mimetype or "application/octet-stream"
if not file_bytes:
return jsonify({"success": False, "message": "上传的文件为空"}), 400
try:
data = start_permit_import_session(
file_bytes,
filename,
content_type=content_type,
uploaded_by=str(uploaded_by) if uploaded_by else None,
)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_permit_import_upload error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-import/template', methods=['GET'])
def admin_permit_import_template():
"""Provide the Excel import template for download."""
template_path = get_permit_template_path()
if not os.path.exists(template_path):
return jsonify({"success": False, "message": "模板文件不存在,请联系管理员"}), 404
try:
return send_file(
template_path,
as_attachment=True,
download_name='风险提示表 模板.xlsx',
)
except Exception as exc:
print(f"admin_permit_import_template error: {exc}")
return jsonify({"success": False, "message": "模板文件暂时无法下载"}), 500
def _build_import_preview_response(session_token: str):
"""Internal helper to build preview response JSON."""
try:
data = describe_permit_import_session(session_token)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_permit_import_preview error: {exc}")
return jsonify({"success": False, "message": "无法加载预览数据"}), 500
@v2_bp.route('/admin/permit-import/session/<session_id>/preview', methods=['GET'])
def admin_permit_import_preview(session_id: str):
"""Return parsed workbook preview along with theme options (path param)."""
if not session_id:
return jsonify({"success": False, "message": "session_id 不能为空"}), 400
return _build_import_preview_response(session_id)
@v2_bp.route('/admin/permit-import/preview', methods=['GET'])
def admin_permit_import_preview_query():
"""Return preview via query string for compatibility."""
session_id = request.args.get("session_id") or request.args.get("sessionId")
if not session_id:
return jsonify({"success": False, "message": "session_id 不能为空"}), 400
return _build_import_preview_response(session_id)
@v2_bp.route('/admin/permit-import/commit', methods=['POST'])
def admin_permit_import_commit():
"""Commit an import session with selected sheets."""
payload = request.get_json(silent=True) or {}
session_id = payload.get('session_id') or payload.get('sessionId')
sheet_names = payload.get('sheet_names') or payload.get('sheets') or payload.get('selectedSheets')
overrides = payload.get('overrides') or {}
edited_by = payload.get('edited_by') or payload.get('editedBy')
change_summary = payload.get('change_summary') or payload.get('changeSummary')
theme_bindings = payload.get('theme_bindings') or payload.get('themeBindings') or {}
if isinstance(sheet_names, str):
sheet_names = [sheet_names]
try:
data = commit_permit_import_session(
session_id,
sheet_names or [],
overrides=overrides,
edited_by=edited_by,
change_summary=change_summary,
theme_bindings=theme_bindings,
)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_permit_import_commit error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-files', methods=['GET'])
def admin_list_permit_files():
"""List stored permit files with pagination."""
try:
limit = int(request.args.get("limit", 20))
except (TypeError, ValueError):
limit = 20
limit = max(1, min(limit, 100))
try:
offset = int(request.args.get("offset", 0))
except (TypeError, ValueError):
offset = 0
offset = max(0, offset)
keyword = request.args.get("keyword") or request.args.get("q")
try:
data = list_stored_permit_files(limit=limit, offset=offset, keyword=keyword)
return jsonify({"success": True, "data": data})
except Exception as exc:
print(f"admin_list_permit_files error: {exc}")
return jsonify({"success": False, "message": "文件列表加载失败"}), 500
@v2_bp.route('/admin/permit-files/<file_id>/reimport', methods=['POST'])
def admin_reimport_permit_file(file_id: str):
"""Re-create an import session from an archived Excel file."""
if not file_id:
return jsonify({"success": False, "message": "file_id 不能为空"}), 400
user = get_current_user() or {}
requested_by = user.get("display_name") or user.get("username") or user.get("id")
try:
data = start_import_session_from_file(file_id, requested_by=requested_by)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 404
except Exception as exc:
print(f"admin_reimport_permit_file error: {exc}")
return jsonify({"success": False, "message": "暂无法重新载入该文件"}), 500
@v2_bp.route('/admin/permit-files/<file_id>', methods=['DELETE'])
def admin_delete_permit_file(file_id: str):
"""Delete an archived permit file."""
if not file_id:
return jsonify({"success": False, "message": "file_id 不能为空"}), 400
try:
deleted = delete_stored_permit_file(file_id)
if deleted:
return jsonify({"success": True, "message": "文件已删除"})
return jsonify({"success": False, "message": "文件不存在"}), 404
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_delete_permit_file error: {exc}")
return jsonify({"success": False, "message": "文件暂无法删除"}), 500
@v2_bp.route('/admin/permit-file/download', methods=['GET'])
@login_required
def admin_permit_file_download():
"""Download the original Excel file associated with a permit."""
region_value = request.args.get("region") or request.args.get("region_id")
permit_value = request.args.get("permit") or request.args.get("permit_id")
if not region_value or not permit_value:
return jsonify({"success": False, "message": "region 和 permit 参数必填"}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value)
try:
file_payload = fetch_permit_file(region_token, permit_token)
if not file_payload:
return jsonify({"success": False, "message": "当前许可没有关联的原始文件"}), 404
buffer = BytesIO(file_payload["file_data"])
buffer.seek(0)
download_name = file_payload.get("filename") or "许可导入.xlsx"
mimetype = file_payload.get("content_type") or "application/octet-stream"
return send_file(
buffer,
as_attachment=True,
download_name=download_name,
mimetype=mimetype,
)
except Exception as exc:
print(f"admin_permit_file_download error: {exc}")
return jsonify({"success": False, "message": "暂无法下载原始文件"}), 500
@v2_bp.route('/admin/permit-details', methods=['GET'])
def admin_permit_details():
"""Get detailed information for a specific permit."""
region_value = request.args.get("region") or request.args.get("region_id")
theme_value = request.args.get("theme") or request.args.get("theme_id")
permit_value = request.args.get("permit") or request.args.get("permit_id")
if not region_value or not permit_value:
return jsonify({"success": False, "message": "region and permit are required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value)
theme_token: Optional[str]
theme_display = None
if theme_value is None or (isinstance(theme_value, str) and not theme_value.strip()):
resolved = resolve_region_permit_theme(region_token, permit_token)
if not resolved or not resolved.get("id"):
return jsonify({"success": False, "message": "未找到许可所属主题", "data": {}}), 404
theme_token = resolved["id"]
theme_display = resolved
else:
theme_token = theme_value.strip() if isinstance(theme_value, str) else str(theme_value)
try:
# 始终加载全部主题,详情页需要展示主题列表并高亮当前主题
permits = load_permits_and_risks(region_token, None, permit_token)
if not permits:
return jsonify({"success": False, "message": "Permit not found", "data": {}}), 404
permit_payload = permits[0]
payload = {
"region": region_token,
"theme": theme_token,
"permit": permit_payload,
}
selected_theme_meta = None
theme_list = permit_payload.get("themes") or []
if theme_token:
for candidate in theme_list:
if candidate.get("id") == theme_token:
selected_theme_meta = candidate
break
if not selected_theme_meta and theme_display:
selected_theme_meta = theme_display
if not selected_theme_meta:
selected_theme_meta = permit_payload.get("theme")
if not selected_theme_meta and theme_list:
selected_theme_meta = theme_list[0]
if selected_theme_meta:
permit_payload["theme"] = selected_theme_meta
payload["theme_display"] = selected_theme_meta
payload["selected_theme_id"] = selected_theme_meta.get("id") or ""
elif theme_display:
payload["theme_display"] = theme_display
return jsonify({"success": True, "data": payload})
except Exception as exc:
print(f"admin_permit_details error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits', methods=['DELETE'])
def admin_delete_permit():
"""Delete a permit for a specific region-theme combination after snapshotting risks."""
payload: Dict[str, Any] = {}
if request.is_json:
payload = request.get_json(silent=True) or {}
elif request.form:
payload = request.form.to_dict(flat=True)
region_value = (
payload.get("region_id")
or payload.get("region")
or request.args.get("region_id")
or request.args.get("region")
)
theme_value = (
payload.get("theme_id")
or payload.get("theme")
or request.args.get("theme_id")
or request.args.get("theme")
)
permit_value = (
payload.get("permit_id")
or payload.get("permit")
or request.args.get("permit_id")
or request.args.get("permit")
)
edited_by = (payload.get("edited_by") or request.args.get("edited_by") or "").strip() or None
change_summary = payload.get("change_summary") or request.args.get("change_summary")
if change_summary is not None:
change_summary = str(change_summary).strip()
if not change_summary:
change_summary = None
if not region_value or not permit_value:
return jsonify({"success": False, "message": "region_id 和 permit_id 均为必填"}), 400
region_id = region_value.strip() if isinstance(region_value, str) else str(region_value)
permit_id = permit_value.strip() if isinstance(permit_value, str) else str(permit_value)
if theme_value:
theme_id = theme_value.strip() if isinstance(theme_value, str) else str(theme_value)
else:
resolved_theme = resolve_region_permit_theme(region_id, permit_id)
if not resolved_theme or not resolved_theme.get("id"):
return jsonify({"success": False, "message": "未找到许可所属主题,无法删除"}), 400
theme_id = resolved_theme["id"]
try:
result = delete_region_permit(
region_id,
theme_id,
permit_id,
edited_by=edited_by,
change_summary=change_summary,
)
return jsonify({"success": True, "data": result})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 404
except Exception as exc:
print(f"admin_delete_permit error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/checkpoints', methods=['GET'])
def admin_list_checkpoints():
"""List all available checkpoints."""
try:
checkpoints = list_checkpoints()
return jsonify({"success": True, "data": {"checkpoints": checkpoints}})
except Exception as exc:
print(f"admin_list_checkpoints error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-risk-snapshots', methods=['GET'])
def admin_permit_risk_snapshots():
"""List permit risk checkpoint history entries for the management UI."""
try:
args = request.args
region_id = args.get("region_id") or args.get("region")
permit_id = args.get("permit_id") or args.get("permit")
edited_by = args.get("edited_by")
try:
limit = int(args.get("limit", 20))
except (TypeError, ValueError):
limit = 20
limit = max(1, min(limit, 200))
try:
offset = int(args.get("offset", 0))
except (TypeError, ValueError):
offset = 0
offset = max(0, offset)
snapshots = list_permit_risk_snapshot_summaries(
region_id=region_id,
permit_id=permit_id,
edited_by=edited_by,
limit=limit,
offset=offset,
)
total = count_permit_risk_snapshots(
region_id=region_id,
permit_id=permit_id,
edited_by=edited_by,
)
return jsonify(
{
"success": True,
"data": {
"snapshots": snapshots,
"pagination": {
"limit": limit,
"offset": offset,
"total": total,
},
},
}
)
except Exception as exc:
print(f"admin_permit_risk_snapshots error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-risk-snapshots/<batch_id>/restore', methods=['POST'])
def admin_restore_permit_risk_snapshot(batch_id):
"""Restore permit risk relations from a snapshot batch."""
payload: Dict[str, Any] = {}
if request.is_json:
payload = request.get_json(silent=True) or {}
elif request.form:
payload = request.form.to_dict(flat=True)
edited_by = (payload.get("edited_by") or "").strip() or None
change_summary = payload.get("change_summary")
if change_summary is not None:
change_summary = str(change_summary).strip()
if not change_summary:
change_summary = None
try:
result = restore_permit_risk_snapshot_batch(
batch_id,
edited_by=edited_by,
change_summary=change_summary,
)
return jsonify({"success": True, "data": result})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 404
except Exception as exc:
print(f"admin_restore_permit_risk_snapshot error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/checkpoints', methods=['POST'])
def admin_create_checkpoint():
"""Create a new checkpoint."""
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
description = payload.get("description", "")
try:
checkpoint = create_checkpoint(description)
return jsonify({"success": True, "data": checkpoint})
except Exception as exc:
print(f"admin_create_checkpoint error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/checkpoints/<checkpoint_id>/restore', methods=['POST'])
def admin_restore_checkpoint(checkpoint_id):
"""
⚠️ DANGEROUS OPERATION ⚠️
恢复数据库从checkpoint。
此操作将会:
1. 永久删除当前数据库中的所有数据
2. 从指定的checkpoint恢复数据
3. 如果失败,可能导致数据丢失
建议在生产环境中:
- 确保已创建最新备份
- 在低峰期操作
- 启用自动备份功能 (create_auto_backup=true)
POST参数:
- create_auto_backup: 是否在恢复前自动备份当前状态 (默认: true)
- batch_size: 批量插入的批次大小,每批插入行数 (默认: 1000范围: 100-10000)
"""
import logging
logger = logging.getLogger(__name__)
logger.info("=" * 80)
logger.info(f"[API] Received checkpoint restore request: {checkpoint_id}")
logger.info("=" * 80)
try:
# 获取请求参数
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
logger.info(f"[API] Request parameters: {payload}")
# 默认启用自动备份
create_auto_backup = str(payload.get("create_auto_backup", "true")).lower() in {"1", "true", "yes", "on"}
logger.info(f"[API] Auto-backup enabled: {create_auto_backup}")
# 解析批量大小参数
try:
batch_size = int(payload.get("batch_size", "1000"))
if batch_size < 100 or batch_size > 10000:
raise ValueError(f"batch_size must be between 100 and 10000, got {batch_size}")
except (ValueError, TypeError) as e:
logger.warning(f"[API] Invalid batch_size: {e}, using default 1000")
batch_size = 1000
logger.info(f"[API] Batch size: {batch_size} rows/batch")
# 执行恢复
logger.info(f"[API] Starting restore operation...")
restore_result = restore_checkpoint(checkpoint_id, create_auto_backup=create_auto_backup, batch_size=batch_size)
# 检查恢复状态
if restore_result.get("status") == "success":
logger.info(f"[API] Restore completed successfully")
logger.info(f"[API] Tables restored: {restore_result['summary']['tables_restored']}")
logger.info(f"[API] Total rows restored: {restore_result['summary']['total_rows_restored']}")
return jsonify({
"success": True,
"message": restore_result["message"],
"data": {
"checkpoint_id": restore_result["summary"]["checkpoint_id"],
"tables_restored": restore_result["summary"]["tables_restored"],
"total_rows": restore_result["summary"]["total_rows_restored"],
"auto_backup": restore_result["summary"].get("auto_backup"),
"table_details": restore_result["summary"]["table_details"]
}
})
else:
# 恢复失败
logger.error(f"[API] Restore failed: {restore_result.get('message')}")
errors = restore_result["summary"].get("errors", [])
if errors:
logger.error(f"[API] Errors encountered: {errors}")
response_data = {
"success": False,
"message": restore_result["message"],
"data": {
"errors": errors,
"auto_backup_available": restore_result.get("auto_backup_available", False)
}
}
# 如果有自动备份,提供恢复建议
if restore_result.get("recovery_suggestion"):
logger.warning(f"[API] Recovery suggestion: {restore_result['recovery_suggestion']}")
response_data["data"]["recovery_suggestion"] = restore_result["recovery_suggestion"]
return jsonify(response_data), 500
except Exception as exc:
logger.error(f"[API] Restore operation failed with exception: {str(exc)}", exc_info=True)
return jsonify({
"success": False,
"message": f"Restore failed: {str(exc)}",
"data": {
"error_type": type(exc).__name__,
"auto_backup_available": None # 未知,因为异常发生在备份之前
}
}), 500
@v2_bp.route('/admin/checkpoints/<checkpoint_id>', methods=['DELETE'])
def admin_delete_checkpoint(checkpoint_id):
"""Delete a checkpoint."""
try:
success = delete_checkpoint(checkpoint_id)
if success:
return jsonify({"success": True, "message": "Checkpoint deleted"})
else:
return jsonify({"success": False, "message": "Checkpoint not found"}), 404
except Exception as exc:
print(f"admin_delete_checkpoint error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500