fs-lawrisk/lawrisk/api/v2.py

1850 lines
71 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""V2 API routes - Enhanced implementation with structured results."""
from __future__ import annotations
import os
import uuid
import time
from io import BytesIO
from flask import Blueprint, jsonify, request, send_file
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Iterable, Optional
from lawrisk.api.auth import login_required, get_current_user, ensure_admin_access
from lawrisk.services.lawrisk_v2_service import search_v2, list_regions
from lawrisk.services.licensing_repo import (
list_permits_for_region,
load_permits_and_risks,
list_region_theme_options,
list_region_permit_catalog,
load_theme_payload,
create_checkpoint,
list_service_departments,
list_checkpoints,
restore_checkpoint,
delete_checkpoint,
list_permit_risk_snapshot_summaries,
count_permit_risk_snapshots,
delete_region_permit,
restore_permit_risk_snapshot_batch,
start_permit_import_session,
commit_permit_import_session,
fetch_permit_file,
describe_permit_import_session,
resolve_region_permit_theme,
list_stored_permit_files,
start_import_session_from_file,
delete_stored_permit_file,
list_service_departments,
build_service_department_tree,
create_service_department,
update_service_department,
delete_service_department,
list_all_themes,
create_theme,
rename_theme,
delete_theme,
filter_permits_advanced,
list_unbound_permits,
list_operation_logs,
_lic_pg_conn,
)
from lawrisk.services.auth_service import (
list_users,
create_user,
update_user_account,
delete_user_account,
)
from lawrisk.services.template_service import (
get_permit_template_path,
get_permit_template_metadata,
overwrite_permit_template,
)
v2_bp = Blueprint('lawrisk_v2', __name__, url_prefix='/fs-ai-asistant/api/workflow/lawrisk')
def _project_root() -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
def _parse_truthy(value: Any, default: bool = False) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _admin_guard(*, prefer_json: bool = False, roles: Optional[Iterable[str]] = None):
user, error = ensure_admin_access(allowed_roles=roles, prefer_json=prefer_json)
if error:
return None, error
return user, None
@v2_bp.route('/v2', methods=['POST', 'GET'])
def lawrisk_search_v2():
"""V2 search endpoint with structured results."""
query, debug_flag, top_k_int, _mode, region_filter = _extract_params()
if not query or not isinstance(query, str):
return jsonify({"error": "query is required"}), 400
try:
t0 = time.time()
# Execute search first
result_v2 = search_v2(query, debug_flag, region_filter)
# Generate recommendations based on search results
rec_questions = []
risk_subject = result_v2.get("risk_subject", []) if isinstance(result_v2, dict) else []
found = bool(risk_subject)
exec_time = int((time.time() - t0) * 1000)
data = {
"llmRespond": "" if found else "抱歉,无法检索到相关答案",
"lawRisk": "",
"questionExtend": rec_questions,
"conversationId": "",
"messageId": "",
"roundNumber": 0,
"conversationInfo": {},
"knowledgeSources": [],
"totalKnowledgeSources": 0,
"executionTime": exec_time,
"workflowStatus": "ok" if found else "no_match",
"executionSteps": [],
"costStatistics": {},
"workflowTrackingId": "",
"risk_subject": risk_subject,
"debug": result_v2.get("debug", {}) if (debug_flag and isinstance(result_v2, dict)) else {},
}
resp = {"success": True, "message": "OK", "data": data}
return jsonify(resp)
except Exception as e:
print(f"lawrisk_search_v2 error: {e}")
return jsonify({"success": False, "message": str(e), "data": {}}), 500
@v2_bp.route('/v2/unbound-permits', methods=['GET'])
def lawrisk_unbound_permits():
"""Get list of permits that are not bound to any theme."""
try:
visibility = request.args.get("visibility") or "visible"
search_text = request.args.get("search_text")
department_ids = request.args.getlist("department_ids[]")
region_id = request.args.get("region_id")
permits = list_unbound_permits(
visibility=visibility,
search_text=search_text,
department_ids=department_ids,
region_id=region_id
)
return jsonify({"success": True, "data": permits})
except Exception as exc:
print(f"lawrisk_unbound_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
# ... (omitting irrelevant existing functions) ...
@v2_bp.route('/v2/departments', methods=['GET'])
def lawrisk_departments():
"""Get list of service departments for filtering (lighter permission check than admin)."""
try:
region_id = request.args.get("region_id")
departments = list_service_departments(region_id=region_id)
return jsonify({"success": True, "data": departments})
except Exception as exc:
print(f"lawrisk_departments error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/v2/regions', methods=['GET'])
def lawrisk_regions():
"""Get list of regions."""
try:
regions = list_regions()
return jsonify({"success": True, "data": regions})
except Exception as exc:
print(f"lawrisk_regions error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/v2/themes', methods=['GET'])
def lawrisk_all_themes():
"""Get list of all themes."""
try:
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
department_ids = request.args.getlist("department_ids[]")
themes = list_all_themes(
start_date=start_date,
end_date=end_date,
department_ids=department_ids
)
return jsonify({"success": True, "data": themes})
except Exception as exc:
print(f"lawrisk_all_themes error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/getPermits', methods=['GET', 'POST'])
def lawrisk_get_permits():
"""Get permits for a specific region, filtered by user permissions."""
import logging
logger = logging.getLogger(__name__)
logger.info("=" * 80)
logger.info("API CALLED: lawrisk_get_permits")
logger.info("=" * 80)
if request.method == "GET":
region_value = request.args.get("region") or request.args.get("region_id")
else:
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
region_value = payload.get("region") or payload.get("region_id")
if not region_value or (isinstance(region_value, str) and not region_value.strip()):
return jsonify({"success": False, "message": "region is required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
# Get current user for permission filtering
current_user = get_current_user()
print(f"DEBUG lawrisk_get_permits: current_user = {current_user}")
try:
permits = list_permits_for_region(region_token, current_user=current_user)
return jsonify({"success": True, "data": {"region": region_token, "permits": permits}})
except Exception as exc:
print(f"lawrisk_get_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
def _extract_params():
"""Extract parameters from request."""
if request.method == "GET":
query = request.args.get("query") or request.args.get("q") or request.args.get("text")
debug_flag = request.args.get("debug") in {"1", "true", "yes", "on"}
top_k = request.args.get("top")
try:
top_k_int = int(top_k) if top_k else 5
except Exception:
top_k_int = 5
mode_value = (request.args.get("mode") or "llm").lower()
region_value = request.args.get("region") or request.args.get("region_id")
region_list = request.args.getlist("region")
if region_list and (not region_value or len(region_list) > 1):
region_value = region_list
else:
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
query = payload.get("query") or payload.get("q") or payload.get("text")
debug_flag = str(payload.get("debug", "")).strip().lower() in {"1", "true", "yes", "on"}
try:
top_k_int = int(payload.get("top", 5))
except Exception:
top_k_int = 5
mode_value = str(payload.get("mode", "llm")).lower()
region_value = payload.get("region") or payload.get("region_id")
return query, debug_flag, top_k_int, mode_value, region_value
@v2_bp.route('/admin/test', methods=['GET'])
def admin_test():
"""Simple test route."""
return jsonify({"success": True, "message": "Test route works!"})
@v2_bp.route('/test-simple', methods=['GET'])
def test_simple():
"""Very simple test."""
return jsonify({"status": "ok"})
@v2_bp.route('/db_admin', methods=['GET'])
@login_required
def db_admin_page():
"""Serve the database administration UI."""
html_path = os.path.join(_project_root(), 'static', 'db_admin.html')
if not os.path.exists(html_path):
return jsonify({"success": False, "message": "DB admin page not found"}), 404
return send_file(html_path, mimetype='text/html')
@v2_bp.route('/admin/super', methods=['GET'])
def super_admin_page():
"""Serve the super administrator console."""
_, error = _admin_guard(roles=("admin",))
if error:
return error
html_path = os.path.join(_project_root(), 'static', 'super_admin.html')
if not os.path.exists(html_path):
return jsonify({"success": False, "message": "Super admin page not found"}), 404
return send_file(html_path, mimetype='text/html')
@v2_bp.route('/admin/v2-debug', methods=['GET'])
def super_admin_v2_debug_page():
"""Serve the V2 debugging console for super administrators."""
_, error = _admin_guard(roles=("admin",))
if error:
return error
html_path = os.path.join(_project_root(), 'static', 'v2_admin_debug.html')
if not os.path.exists(html_path):
return jsonify({"success": False, "message": "调试页面不存在"}), 404
return send_file(html_path, mimetype='text/html')
@v2_bp.route('/admin/users', methods=['GET'])
def admin_list_users():
"""List users for the super admin console."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
include_inactive = _parse_truthy(request.args.get("include_inactive"), default=True)
try:
users = list_users(include_inactive=include_inactive)
return jsonify({"success": True, "data": {"users": users}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/users', methods=['POST'])
def admin_create_user():
"""Create a new application user. If未提供service_department_id则自动创建同名部门父级可选并绑定。"""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
username = (payload.get("username") or "").strip().lower()
password = (payload.get("password") or "").strip()
if not username or not password:
return jsonify({"success": False, "message": "用户名和密码均不能为空"}), 400
parent_department_id = (payload.get("parent_department_id") or "").strip() or None
service_department_id = (payload.get("service_department_id") or "").strip() or None
region_id = (payload.get("region_id") or "").strip() or None
department_phone = (payload.get("department_phone") or "").strip() or None
# 如果未显式绑定部门,则为该用户创建一个同名单位,并按父级决定层级
created_department: Optional[Dict[str, Any]] = None
if not service_department_id:
# 未显式指定区域时,继承父级部门的区域
if not region_id and parent_department_id:
try:
parent_dept = next(
(dept for dept in list_service_departments() if str(dept.get("id")) == parent_department_id),
None
)
if parent_dept:
region_id = parent_dept.get("region_id")
except Exception:
region_id = None
try:
dept_name = (payload.get("display_name") or username).strip() or username
dept_code = username.upper()
# Check if department code already exists
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT id, name, code, region_id FROM service_departments WHERE code = %s", (dept_code,))
row = cur.fetchone()
if row:
service_department_id = str(row[0])
created_department = {
"id": str(row[0]),
"name": str(row[1]),
"code": str(row[2]),
"region_id": str(row[3]) if row[3] else None
}
if not service_department_id:
created_department = create_service_department(
name=dept_name,
code=dept_code,
phone=department_phone,
parent_id=parent_department_id,
region_id=region_id,
)
service_department_id = created_department.get("id")
except Exception as exc:
err_msg = str(exc)
if "duplicate key value" in err_msg and "service_departments_code_key" in err_msg:
return jsonify({"success": False, "message": f"创建单位失败: 单位代码 {dept_code} 已存在"}), 400
if "violates unique constraint" in err_msg:
return jsonify({"success": False, "message": "创建单位失败: 数据重复"}), 400
return jsonify({"success": False, "message": f"创建单位失败: {exc}"}), 400
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
user = create_user(
username=username,
password=password,
display_name=payload.get("display_name"),
role=(payload.get("role") or "department_admin").strip(),
grade=int(payload.get("grade") or 50),
service_department_id=service_department_id,
department_role=payload.get("department_role"),
parent_department_id=parent_department_id,
service_department_region_id=region_id,
service_department_phone=department_phone,
operator=operator,
)
return jsonify({"success": True, "data": {"user": user, "department": created_department}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
err_msg = str(exc)
if "duplicate key value" in err_msg and "auth_users_username_key" in err_msg:
return jsonify({"success": False, "message": f"创建账号失败: 用户名 {username} 已存在"}), 400
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/users/<user_id>', methods=['PATCH'])
def admin_update_user(user_id: str):
"""Update password or service department for an existing user."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
update_kwargs: Dict[str, Any] = {}
if "password" in payload:
update_kwargs["password"] = payload.get("password") or ""
if "service_department_id" in payload:
dept_value = payload.get("service_department_id")
if isinstance(dept_value, str) and not dept_value.strip():
dept_value = None
update_kwargs["service_department_id"] = dept_value
if "display_name" in payload:
update_kwargs["display_name"] = payload.get("display_name")
if "role" in payload:
update_kwargs["role"] = payload.get("role")
if "grade" in payload:
update_kwargs["grade"] = payload.get("grade")
if "department_role" in payload:
update_kwargs["department_role"] = payload.get("department_role")
if not update_kwargs:
return jsonify({"success": False, "message": "请至少提供一个需要更新的字段"}), 400
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
updated = update_user_account(user_id, operator=operator, **update_kwargs)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not updated:
return jsonify({"success": False, "message": "用户不存在"}), 404
return jsonify({"success": True, "data": {"user": updated}})
@v2_bp.route('/admin/users/<user_id>', methods=['DELETE'])
def admin_delete_user(user_id: str):
"""Delete a user account."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
deleted = delete_user_account(user_id, operator=operator)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not deleted:
return jsonify({"success": False, "message": "用户不存在"}), 404
return jsonify({"success": True, "message": "用户已删除"})
@v2_bp.route('/admin/service-departments', methods=['GET'])
def admin_list_service_departments():
"""Return all service departments in a flat list."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
departments = list_service_departments()
return jsonify({"success": True, "data": {"departments": departments}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments/tree', methods=['GET'])
def admin_service_departments_tree():
"""Return service departments in a tree structure for organizational chart display."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
tree = build_service_department_tree()
return jsonify({"success": True, "data": {"tree": tree}})
except Exception as exc:
print(f"admin_service_departments_tree error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/departments/children', methods=['GET'])
def admin_get_child_departments():
"""Get child departments of a parent department for dropdown filters."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
parent_id = request.args.get('parent_id', '').strip()
if not parent_id:
return jsonify({"success": False, "message": "parent_id is required"}), 400
try:
# Get child departments
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute("""
SELECT id, name, code, unit_level, region_id
FROM service_departments
WHERE parent_id = %s
ORDER BY name
""", (parent_id,))
rows = cur.fetchall()
children = []
for row in rows:
children.append({
"id": str(row[0]),
"name": str(row[1]),
"code": str(row[2]),
"unit_level": str(row[3]) if row[3] else "unit",
"region_id": str(row[4]) if row[4] else None
})
return jsonify({"success": True, "data": {"units": children}})
except Exception as exc:
print(f"admin_get_child_departments error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits/filter', methods=['POST'])
def admin_filter_permits():
"""Filter and query permits based on user permissions and filter criteria."""
admin_user, error = _admin_guard(prefer_json=True)
if error:
return error
try:
# Get current user info for permission checking
current_user = {
"username": admin_user.get("username"),
"role": admin_user.get("role"),
"grade": admin_user.get("grade"),
"department": admin_user.get("department")
}
# Parse filter parameters from request
filters = {}
if request.is_json:
filters = request.get_json(silent=True) or {}
else:
filters = {
"municipal_dept_id": request.form.get("municipal_dept_id"),
"district_dept_id": request.form.get("district_dept_id"),
"region": request.form.get("region"),
"search_text": request.form.get("search_text")
}
# Get visible permits based on user permissions and filters
permits = get_visible_permits(current_user, filters)
return jsonify({"success": True, "data": {
"permits": permits,
"pagination": {
"total": len(permits),
"page": 1,
"page_size": len(permits)
}
}})
except Exception as exc:
print(f"admin_filter_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments', methods=['POST'])
def admin_create_service_department():
"""Create a service department node with auto-generated account."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name")
code = payload.get("code")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "服务部门名称不能为空"}), 400
code_token = (code or "").strip()
if code_token:
code_token = code_token.upper()
else:
code_token = uuid.uuid4().hex[:8].upper()
grade = payload.get("grade") or 0
try:
grade = int(grade)
except (ValueError, TypeError):
grade = 0
kwargs = {
"code": code,
"phone": (payload.get("phone") or "").strip() or None,
"parent_id": (payload.get("parent_id") or "").strip() or None,
"region_id": (payload.get("region_id") or "").strip() or None,
"description": payload.get("description"),
"grade": grade,
}
if not kwargs["region_id"] and kwargs["parent_id"]:
try:
parent_dept = next(
(dept for dept in list_service_departments() if str(dept.get("id")) == kwargs["parent_id"]),
None
)
if parent_dept:
kwargs["region_id"] = parent_dept.get("region_id")
except Exception:
kwargs["region_id"] = None
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
department = create_service_department(name, code=code_token, operator=operator, **kwargs)
department_id = department.get("id")
default_password = f"{code_token}123456"
try:
user = create_user(
username=code,
password=default_password,
display_name=f"{name}管理员",
role="department_admin",
grade=grade,
service_department_id=department_id,
department_role="admin"
)
return jsonify({
"success": True,
"data": {
"department": department,
"user": user,
"default_password": default_password
}
})
except ValueError as user_exc:
return jsonify({
"success": False,
"message": f"部门创建成功,但账号创建失败: {str(user_exc)}"
}), 400
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
err_msg = str(exc)
if "duplicate key value" in err_msg:
return jsonify({"success": False, "message": "服务部门代码或名称已存在"}), 400
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments/<dept_id>', methods=['PATCH'])
def admin_update_service_department(dept_id: str):
"""Update department metadata such as name, phone, grade, or hierarchy via drag-and-drop."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
updates: Dict[str, Any] = {}
for field in ("name", "phone", "parent_id", "region_id", "description"):
if field in payload:
value = payload.get(field)
if isinstance(value, str) and not value.strip():
value = None
if field == "parent_id" and value == dept_id:
return jsonify({"success": False, "message": "部门不能设置为自己的下级"}), 400
updates[field] = value
if "grade" in payload:
try:
grade = int(payload.get("grade"))
updates["grade"] = grade
except (ValueError, TypeError):
return jsonify({"success": False, "message": "权限等级必须是数字"}), 400
if not updates:
return jsonify({"success": False, "message": "没有需要更新的字段"}), 400
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
department = update_service_department(dept_id, operator=operator, **updates)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not department:
return jsonify({"success": False, "message": "服务部门不存在"}), 404
message = "部门信息已更新"
if "parent_id" in updates:
message = "层级关系已更新"
elif "grade" in updates:
message = f"权限等级已更新为 {updates['grade']}"
return jsonify({
"success": True,
"data": {
"department": department,
"message": message
}
})
@v2_bp.route('/admin/service-departments/<dept_id>', methods=['DELETE'])
def admin_delete_service_department(dept_id: str):
"""Delete a service department node (only when no users are bound).
If force=true query parameter is provided, will first unbind all users from the department.
"""
_, error = _admin_guard(prefer_json=True)
if error:
return error
force = request.args.get("force", "").lower() in ("true", "1", "yes")
try:
if force:
from lawrisk.services.auth_service import update_user_account
users = list_users()
for user in users:
if user.get("service_department_id") == dept_id:
update_user_account(
user_id=user["id"],
service_department_id=None
)
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
result = delete_service_department(dept_id, operator=operator)
except ValueError as exc:
error_msg = str(exc)
if "仍有账号绑定" in error_msg and not force:
return jsonify({
"success": False,
"message": f"{error_msg}。如需强制删除请在URL中添加?force=true参数先解除绑定。",
"code": "HAS_BOUND_USERS"
}), 400
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not result.get("deleted"):
return jsonify({"success": False, "message": result.get("message", "删除失败")}), 400
return jsonify({
"success": True,
"data": result,
"message": "部门删除成功"
})
@v2_bp.route('/admin/themes/catalog', methods=['GET'])
def admin_theme_catalog():
"""List all available themes."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
themes = list_all_themes()
return jsonify({"success": True, "data": {"themes": themes}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog', methods=['POST'])
def admin_create_theme():
"""Create a new theme."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "主题名称不能为空"}), 400
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
theme = create_theme(name, operator=operator)
return jsonify({"success": True, "data": {"theme": theme}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog/<theme_id>', methods=['PATCH'])
def admin_rename_theme(theme_id: str):
"""Rename an existing theme."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name") or payload.get("new_name")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "主题名称不能为空"}), 400
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
theme = rename_theme(theme_id, name, operator=operator)
return jsonify({"success": True, "data": {"theme": theme}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog/<theme_id>', methods=['DELETE'])
def admin_delete_theme(theme_id: str):
"""Delete a theme and related region associations."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
admin_user, _ = _admin_guard()
operator = admin_user.get("username") if admin_user else "admin"
result = delete_theme(theme_id, operator=operator)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not result.get("deleted"):
return jsonify({"success": False, "message": result.get("message", "主题不存在")}), 404
return jsonify({"success": True, "data": result})
@v2_bp.route('/admin/templates/permit', methods=['GET'])
def admin_permit_template_meta():
"""Return metadata for the import template."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
data = get_permit_template_metadata()
return jsonify({"success": True, "data": data})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/templates/permit', methods=['POST'])
def admin_update_permit_template():
"""Overwrite the existing import template."""
admin_user, error = _admin_guard(prefer_json=True)
if error:
return error
if 'file' not in request.files:
return jsonify({"success": False, "message": "请上传模板文件"}), 400
file_storage = request.files['file']
file_bytes = file_storage.read()
if not file_bytes:
return jsonify({"success": False, "message": "上传的模板为空"}), 400
try:
meta = overwrite_permit_template(
file_bytes,
file_storage.filename or "import_template.xlsx",
uploaded_by=(admin_user or {}).get("username"),
)
return jsonify({"success": True, "data": meta})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/regions', methods=['GET'])
def admin_regions():
"""Get all regions for database maintenance."""
try:
regions = list_regions()
return jsonify({"success": True, "data": {"regions": regions}})
except Exception as exc:
print(f"admin_regions error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes', methods=['GET'])
def admin_themes():
"""Get themes for a specific region."""
region_value = request.args.get("region") or request.args.get("region_id")
if not region_value or (isinstance(region_value, str) and not region_value.strip()):
return jsonify({"success": False, "message": "region is required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
try:
catalog = list_region_theme_options()
region_id_lower = region_token.lower()
themes = []
seen_theme_ids = set()
for item in catalog:
if (item["region_id"] == region_token or
item["region_id"].lower() == region_id_lower or
item["region_name"].lower() == region_id_lower):
theme_id = item["theme_id"]
if theme_id not in seen_theme_ids:
seen_theme_ids.add(theme_id)
themes.append({
"id": theme_id,
"name": item["theme_name"],
"option_id": item["option_id"]
})
return jsonify({"success": True, "data": {"region": region_token, "themes": themes}})
except Exception as exc:
print(f"admin_themes error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits', methods=['GET'])
def admin_permits():
"""Get permits for a region. Optional theme filter keeps backward compatibility."""
region_value = request.args.get("region") or request.args.get("region_id")
theme_value = request.args.get("theme") or request.args.get("theme_id")
if not region_value or (isinstance(region_value, str) and not region_value.strip()):
return jsonify({"success": False, "message": "region is required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
theme_token = (
theme_value.strip()
if theme_value and isinstance(theme_value, str)
else (str(theme_value) if theme_value is not None else None)
)
try:
if theme_token:
permits = load_permits_and_risks(region_token, theme_token)
data = {
"region": region_token,
"theme": theme_token,
"permits": permits,
}
else:
catalog = list_region_permit_catalog(region_token)
data = {
"region": region_token,
"permits": catalog,
}
return jsonify({"success": True, "data": data})
except Exception as exc:
print(f"admin_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits/visibility', methods=['POST'])
def admin_toggle_permit_visibility():
"""Toggle the visibility of a permit in V2 API retrieval."""
admin_user, error = _admin_guard(prefer_json=True, roles=("admin", "department_admin"))
if error:
return error
data = request.get_json() or {}
region_id = data.get("region_id")
permit_id = data.get("permit_id")
is_visible = data.get("is_v2_visible")
if not region_id or not permit_id or is_visible is None:
return jsonify({"success": False, "message": "region_id, permit_id and is_v2_visible are required"}), 400
try:
from lawrisk.services.licensing_repo import update_permit_v2_visibility
operator = (admin_user or {}).get("username") or "admin"
success = update_permit_v2_visibility(
region_id=region_id,
permit_id=permit_id,
is_visible=bool(is_visible),
operator=operator
)
if success:
return jsonify({"success": True, "message": "Visibility updated"})
else:
return jsonify({"success": False, "message": "Permit details not found or update failed"}), 404
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-import/upload', methods=['POST'])
def admin_permit_import_upload():
"""Upload Excel workbook and start an import session."""
if 'file' not in request.files:
return jsonify({"success": False, "message": "请选择要上传的Excel文件"}), 400
file_storage = request.files['file']
filename = file_storage.filename or 'import.xlsx'
file_bytes = file_storage.read()
user = get_current_user() or {}
user_role = user.get("role")
is_super_admin = (user_role == "admin")
uploaded_by = user.get("display_name") or user.get("username") or user.get("id")
content_type = file_storage.mimetype or "application/octet-stream"
binding_mode = (request.form.get("binding_mode") or "auto").strip().lower()
if binding_mode not in {"none", "department", "auto", "municipal", "district"}:
binding_mode = "auto"
bound_department_id = (request.form.get("bound_department_id") or "").strip() or None
uploader_department_id = (user.get("department") or {}).get("id")
if not is_super_admin:
# Non-super admins are forced to use their own department's region
if not uploader_department_id:
return jsonify({"success": False, "message": "该账号未绑定服务部门,无法执行导入"}), 403
bound_department_id = uploader_department_id
binding_mode = "department"
else:
# Super admins default to their own department if none specified, but can use 'auto'
if not bound_department_id:
bound_department_id = uploader_department_id
# If a super admin manually chose a department OR has one and left it as auto,
# but they didn't explicitly ask for 'auto', we might want to default to department.
# However, the user said "super admins can manually specify", so 'auto' vs 'department'
# should probably be respected if they chose it.
# Existing logic forced 'department' if ANY bound_dept was present.
# We relax this for super admins to allow 'auto'.
if bound_department_id and binding_mode == "auto":
# If the super admin didn't explicitly request 'auto' in the form,
# maybe we still want to default to department?
# Actually, usually 'auto' is the frontend default.
# To allow super admin to use 'auto', we only force 'department' if THEY didn't send 'auto'.
# But the form usually sends 'auto'.
pass
if not file_bytes:
return jsonify({"success": False, "message": "上传的文件为空"}), 400
try:
data = start_permit_import_session(
file_bytes,
filename,
content_type=content_type,
uploaded_by=str(uploaded_by) if uploaded_by else None,
uploader_department_id=uploader_department_id,
bound_department_id=bound_department_id,
binding_mode=binding_mode,
)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_permit_import_upload error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-import/template', methods=['GET'])
def admin_permit_import_template():
"""Provide the Excel import template for download."""
template_path = get_permit_template_path()
if not os.path.exists(template_path):
return jsonify({"success": False, "message": "模板文件不存在,请联系管理员"}), 404
try:
return send_file(
template_path,
as_attachment=True,
download_name='风险提示表 模板.xlsx',
)
except Exception as exc:
print(f"admin_permit_import_template error: {exc}")
return jsonify({"success": False, "message": "模板文件暂时无法下载"}), 500
def _build_import_preview_response(session_token: str):
"""Internal helper to build preview response JSON."""
try:
data = describe_permit_import_session(session_token)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_permit_import_preview error: {exc}")
return jsonify({"success": False, "message": "无法加载预览数据"}), 500
@v2_bp.route('/admin/permit-import/session/<session_id>/preview', methods=['GET'])
def admin_permit_import_preview(session_id: str):
"""Return parsed workbook preview along with theme options (path param)."""
if not session_id:
return jsonify({"success": False, "message": "session_id 不能为空"}), 400
return _build_import_preview_response(session_id)
@v2_bp.route('/admin/permit-import/preview', methods=['GET'])
def admin_permit_import_preview_query():
"""Return preview via query string for compatibility."""
session_id = request.args.get("session_id") or request.args.get("sessionId")
if not session_id:
return jsonify({"success": False, "message": "session_id 不能为空"}), 400
return _build_import_preview_response(session_id)
@v2_bp.route('/admin/permit-import/commit', methods=['POST'])
def admin_permit_import_commit():
"""Commit an import session with selected sheets."""
payload = request.get_json(silent=True) or {}
session_id = payload.get('session_id') or payload.get('sessionId')
sheet_names = payload.get('sheet_names') or payload.get('sheets') or payload.get('selectedSheets')
overrides = payload.get('overrides') or {}
edited_by = payload.get('edited_by') or payload.get('editedBy')
change_summary = payload.get('change_summary') or payload.get('changeSummary')
theme_bindings = payload.get('theme_bindings') or payload.get('themeBindings') or {}
if not edited_by:
user = get_current_user() or {}
edited_by = user.get("username") or user.get("display_name") or "admin"
if isinstance(sheet_names, str):
sheet_names = [sheet_names]
try:
data = commit_permit_import_session(
session_id,
sheet_names or [],
overrides=overrides,
edited_by=edited_by,
change_summary=change_summary,
theme_bindings=theme_bindings,
)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_permit_import_commit error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-files', methods=['GET'])
def admin_list_permit_files():
"""List stored permit files with pagination."""
try:
limit = int(request.args.get("limit", 20))
except (TypeError, ValueError):
limit = 20
limit = max(1, min(limit, 100))
try:
offset = int(request.args.get("offset", 0))
except (TypeError, ValueError):
offset = 0
offset = max(0, offset)
keyword = request.args.get("keyword") or request.args.get("q")
try:
data = list_stored_permit_files(limit=limit, offset=offset, keyword=keyword)
return jsonify({"success": True, "data": data})
except Exception as exc:
print(f"admin_list_permit_files error: {exc}")
return jsonify({"success": False, "message": "文件列表加载失败"}), 500
@v2_bp.route('/admin/operation-logs', methods=['GET'])
def admin_list_operation_logs():
"""List operation logs with pagination and optional filtering."""
try:
limit = int(request.args.get("limit", 50))
except (TypeError, ValueError):
limit = 50
limit = max(1, min(limit, 500))
try:
offset = int(request.args.get("offset", 0))
except (TypeError, ValueError):
offset = 0
offset = max(0, offset)
operator = request.args.get("operator")
operation_type = request.args.get("operation_type") or request.args.get("type")
try:
data = list_operation_logs(
operator=operator,
operation_type=operation_type,
limit=limit,
offset=offset
)
return jsonify({"success": True, "data": data})
except Exception as exc:
print(f"admin_list_operation_logs error: {exc}")
return jsonify({"success": False, "message": "操作日志加载失败"}), 500
@v2_bp.route('/admin/permit-files/<file_id>/reimport', methods=['POST'])
def admin_reimport_permit_file(file_id: str):
"""Re-create an import session from an archived Excel file."""
if not file_id:
return jsonify({"success": False, "message": "file_id 不能为空"}), 400
user = get_current_user() or {}
user_role = user.get("role")
is_super_admin = (user_role == "admin")
requested_by = user.get("display_name") or user.get("username") or user.get("id")
uploader_department_id = (user.get("department") or {}).get("id")
binding_mode = "auto"
bound_department_id = None
if not is_super_admin:
if not uploader_department_id:
return jsonify({"success": False, "message": "该账号未绑定服务部门,无法执行重新导入"}), 403
bound_department_id = uploader_department_id
binding_mode = "department"
try:
data = start_import_session_from_file(
file_id,
requested_by=requested_by,
uploader_department_id=uploader_department_id,
bound_department_id=bound_department_id,
binding_mode=binding_mode
)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 404
except Exception as exc:
print(f"admin_reimport_permit_file error: {exc}")
return jsonify({"success": False, "message": "暂无法重新载入该文件"}), 500
@v2_bp.route('/admin/permit-files/<file_id>', methods=['DELETE'])
def admin_delete_permit_file(file_id: str):
"""Delete an archived permit file."""
if not file_id:
return jsonify({"success": False, "message": "file_id 不能为空"}), 400
try:
deleted = delete_stored_permit_file(file_id)
if deleted:
return jsonify({"success": True, "message": "文件已删除"})
return jsonify({"success": False, "message": "文件不存在"}), 404
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
print(f"admin_delete_permit_file error: {exc}")
return jsonify({"success": False, "message": "文件暂无法删除"}), 500
@v2_bp.route('/admin/permit-file/download', methods=['GET'])
@login_required
def admin_permit_file_download():
"""Download the original Excel file associated with a permit."""
region_value = request.args.get("region") or request.args.get("region_id")
permit_value = request.args.get("permit") or request.args.get("permit_id")
if not region_value or not permit_value:
return jsonify({"success": False, "message": "region 和 permit 参数必填"}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value)
try:
file_payload = fetch_permit_file(region_token, permit_token)
if not file_payload:
return jsonify({"success": False, "message": "当前许可没有关联的原始文件"}), 404
buffer = BytesIO(file_payload["file_data"])
buffer.seek(0)
download_name = file_payload.get("filename") or "许可导入.xlsx"
mimetype = file_payload.get("content_type") or "application/octet-stream"
return send_file(
buffer,
as_attachment=True,
download_name=download_name,
mimetype=mimetype,
)
except Exception as exc:
print(f"admin_permit_file_download error: {exc}")
return jsonify({"success": False, "message": "暂无法下载原始文件"}), 500
@v2_bp.route('/admin/permit-details', methods=['GET'])
def admin_permit_details():
"""Get detailed information for a specific permit."""
region_value = request.args.get("region") or request.args.get("region_id")
theme_value = request.args.get("theme") or request.args.get("theme_id")
permit_value = request.args.get("permit") or request.args.get("permit_id")
if not region_value or not permit_value:
return jsonify({"success": False, "message": "region and permit are required", "data": {}}), 400
region_token = region_value.strip() if isinstance(region_value, str) else str(region_value)
permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value)
theme_token: Optional[str]
theme_display = None
if theme_value is None or (isinstance(theme_value, str) and not theme_value.strip()):
resolved = resolve_region_permit_theme(region_token, permit_token)
theme_token = resolved["id"] if resolved and resolved.get("id") else None
theme_display = resolved
else:
theme_token = theme_value.strip() if isinstance(theme_value, str) else str(theme_value)
try:
# 始终加载全部主题,详情页需要展示主题列表并高亮当前主题
permits = load_permits_and_risks(region_token, None, permit_token)
if not permits:
return jsonify({"success": False, "message": "Permit not found", "data": {}}), 404
permit_payload = permits[0]
payload = {
"region": region_token,
"theme": theme_token,
"permit": permit_payload,
}
selected_theme_meta = None
theme_list = permit_payload.get("themes") or []
if theme_token:
for candidate in theme_list:
if candidate.get("id") == theme_token:
selected_theme_meta = candidate
break
if not selected_theme_meta and theme_display:
selected_theme_meta = theme_display
if not selected_theme_meta:
selected_theme_meta = permit_payload.get("theme")
if not selected_theme_meta and theme_list:
selected_theme_meta = theme_list[0]
if selected_theme_meta:
permit_payload["theme"] = selected_theme_meta
payload["theme_display"] = selected_theme_meta
payload["selected_theme_id"] = selected_theme_meta.get("id") or ""
elif theme_display:
payload["theme_display"] = theme_display
return jsonify({"success": True, "data": payload})
except Exception as exc:
print(f"admin_permit_details error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits', methods=['DELETE'])
def admin_delete_permit():
"""Delete a permit for a specific region-theme combination after snapshotting risks."""
payload: Dict[str, Any] = {}
if request.is_json:
payload = request.get_json(silent=True) or {}
elif request.form:
payload = request.form.to_dict(flat=True)
region_value = (
payload.get("region_id")
or payload.get("region")
or request.args.get("region_id")
or request.args.get("region")
)
theme_value = (
payload.get("theme_id")
or payload.get("theme")
or request.args.get("theme_id")
or request.args.get("theme")
)
permit_value = (
payload.get("permit_id")
or payload.get("permit")
or request.args.get("permit_id")
or request.args.get("permit")
)
edited_by = (payload.get("edited_by") or request.args.get("edited_by") or "").strip() or None
change_summary = payload.get("change_summary") or request.args.get("change_summary")
if change_summary is not None:
change_summary = str(change_summary).strip()
if not change_summary:
change_summary = None
user = get_current_user() or {}
# Permission Check: Enforce for EVERYONE
role = user.get("role")
username = user.get("username")
if role != 'admin' and username != 'fssjsj':
return jsonify({"success": False, "message": "Permission denied: Only admin or fssjsj can delete permits"}), 403
if not edited_by:
edited_by = username or user.get("display_name") or "admin"
if not region_value or not permit_value:
return jsonify({"success": False, "message": "region_id 和 permit_id 均为必填"}), 400
region_id = region_value.strip() if isinstance(region_value, str) else str(region_value)
permit_id = permit_value.strip() if isinstance(permit_value, str) else str(permit_value)
if theme_value:
theme_id = theme_value.strip() if isinstance(theme_value, str) else str(theme_value)
else:
resolved_theme = resolve_region_permit_theme(region_id, permit_id)
if not resolved_theme or not resolved_theme.get("id"):
return jsonify({"success": False, "message": "未找到许可所属主题,无法删除"}), 400
theme_id = resolved_theme["id"]
try:
result = delete_region_permit(
region_id,
theme_id,
permit_id,
edited_by=edited_by,
change_summary=change_summary,
)
return jsonify({"success": True, "data": result})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 404
except Exception as exc:
print(f"admin_delete_permit error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/checkpoints', methods=['GET'])
def admin_list_checkpoints():
"""List all available checkpoints."""
try:
checkpoints = list_checkpoints()
return jsonify({"success": True, "data": {"checkpoints": checkpoints}})
except Exception as exc:
print(f"admin_list_checkpoints error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-risk-snapshots', methods=['GET'])
def admin_permit_risk_snapshots():
"""List permit risk checkpoint history entries for the management UI."""
try:
args = request.args
region_id = args.get("region_id") or args.get("region")
permit_id = args.get("permit_id") or args.get("permit")
edited_by = args.get("edited_by")
try:
limit = int(args.get("limit", 20))
except (TypeError, ValueError):
limit = 20
limit = max(1, min(limit, 200))
try:
offset = int(args.get("offset", 0))
except (TypeError, ValueError):
offset = 0
offset = max(0, offset)
snapshots = list_permit_risk_snapshot_summaries(
region_id=region_id,
permit_id=permit_id,
edited_by=edited_by,
limit=limit,
offset=offset,
)
total = count_permit_risk_snapshots(
region_id=region_id,
permit_id=permit_id,
edited_by=edited_by,
)
return jsonify(
{
"success": True,
"data": {
"snapshots": snapshots,
"pagination": {
"limit": limit,
"offset": offset,
"total": total,
},
},
}
)
except Exception as exc:
print(f"admin_permit_risk_snapshots error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-risk-snapshots/<batch_id>/restore', methods=['POST'])
def admin_restore_permit_risk_snapshot(batch_id):
"""Restore permit risk relations from a snapshot batch."""
payload: Dict[str, Any] = {}
if request.is_json:
payload = request.get_json(silent=True) or {}
elif request.form:
payload = request.form.to_dict(flat=True)
edited_by = (payload.get("edited_by") or "").strip() or None
change_summary = payload.get("change_summary")
if change_summary is not None:
change_summary = str(change_summary).strip()
if not change_summary:
change_summary = None
try:
result = restore_permit_risk_snapshot_batch(
batch_id,
edited_by=edited_by,
change_summary=change_summary,
)
return jsonify({"success": True, "data": result})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 404
except Exception as exc:
print(f"admin_restore_permit_risk_snapshot error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/checkpoints', methods=['POST'])
def admin_create_checkpoint():
"""Create a new checkpoint."""
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
description = payload.get("description", "")
try:
admin_user = get_current_user() or {}
operator = admin_user.get("username") if admin_user else "admin"
checkpoint = create_checkpoint(description, operator=operator)
return jsonify({"success": True, "data": checkpoint})
except Exception as exc:
print(f"admin_create_checkpoint error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/checkpoints/<checkpoint_id>/restore', methods=['POST'])
def admin_restore_checkpoint(checkpoint_id):
"""
⚠️ DANGEROUS OPERATION ⚠️
恢复数据库从checkpoint。
此操作将会:
1. 永久删除当前数据库中的所有数据
2. 从指定的checkpoint恢复数据
3. 如果失败,可能导致数据丢失
建议在生产环境中:
- 确保已创建最新备份
- 在低峰期操作
- 启用自动备份功能 (create_auto_backup=true)
POST参数:
- create_auto_backup: 是否在恢复前自动备份当前状态 (默认: true)
- batch_size: 批量插入的批次大小,每批插入行数 (默认: 1000范围: 100-10000)
"""
import logging
logger = logging.getLogger(__name__)
logger.info("=" * 80)
logger.info(f"[API] Received checkpoint restore request: {checkpoint_id}")
logger.info("=" * 80)
try:
# 获取请求参数
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
logger.info(f"[API] Request parameters: {payload}")
# 默认启用自动备份
create_auto_backup = str(payload.get("create_auto_backup", "true")).lower() in {"1", "true", "yes", "on"}
logger.info(f"[API] Auto-backup enabled: {create_auto_backup}")
# 解析批量大小参数
try:
batch_size = int(payload.get("batch_size", "1000"))
if batch_size < 100 or batch_size > 10000:
raise ValueError(f"batch_size must be between 100 and 10000, got {batch_size}")
except (ValueError, TypeError) as e:
logger.warning(f"[API] Invalid batch_size: {e}, using default 1000")
batch_size = 1000
logger.info(f"[API] Batch size: {batch_size} rows/batch")
# 执行恢复
logger.info(f"[API] Starting restore operation...")
admin_user = get_current_user() or {}
operator = admin_user.get("username") if admin_user else "admin"
restore_result = restore_checkpoint(checkpoint_id, create_auto_backup=create_auto_backup, batch_size=batch_size, operator=operator)
# 检查恢复状态
if restore_result.get("status") == "success":
logger.info(f"[API] Restore completed successfully")
logger.info(f"[API] Tables restored: {restore_result['summary']['tables_restored']}")
logger.info(f"[API] Total rows restored: {restore_result['summary']['total_rows_restored']}")
return jsonify({
"success": True,
"message": restore_result["message"],
"data": {
"checkpoint_id": restore_result["summary"]["checkpoint_id"],
"tables_restored": restore_result["summary"]["tables_restored"],
"total_rows": restore_result["summary"]["total_rows_restored"],
"auto_backup": restore_result["summary"].get("auto_backup"),
"table_details": restore_result["summary"]["table_details"]
}
})
else:
# 恢复失败
logger.error(f"[API] Restore failed: {restore_result.get('message')}")
errors = restore_result["summary"].get("errors", [])
if errors:
logger.error(f"[API] Errors encountered: {errors}")
response_data = {
"success": False,
"message": restore_result["message"],
"data": {
"errors": errors,
"auto_backup_available": restore_result.get("auto_backup_available", False)
}
}
# 如果有自动备份,提供恢复建议
if restore_result.get("recovery_suggestion"):
logger.warning(f"[API] Recovery suggestion: {restore_result['recovery_suggestion']}")
response_data["data"]["recovery_suggestion"] = restore_result["recovery_suggestion"]
return jsonify(response_data), 500
except Exception as exc:
logger.error(f"[API] Restore operation failed with exception: {str(exc)}", exc_info=True)
return jsonify({
"success": False,
"message": f"Restore failed: {str(exc)}",
"data": {
"error_type": type(exc).__name__,
"auto_backup_available": None # 未知,因为异常发生在备份之前
}
}), 500
@v2_bp.route('/admin/checkpoints/<checkpoint_id>', methods=['DELETE'])
def admin_delete_checkpoint(checkpoint_id):
"""Delete a checkpoint."""
try:
success = delete_checkpoint(checkpoint_id)
if success:
return jsonify({"success": True, "message": "Checkpoint deleted"})
else:
return jsonify({"success": False, "message": "Checkpoint not found"}), 404
except Exception as exc:
print(f"admin_delete_checkpoint error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits/advanced-filter', methods=['GET', 'POST'])
def admin_permits_advanced_filter():
"""Advanced filtering for permits with multiple dimensions.
Supports filtering by:
- regions: List of administrative regions (supports multi-select)
- themes: List of legal themes (supports multi-select)
- departments: List of departments (supports multi-select)
- search_text: Permit name search
"""
try:
# Parse parameters from query string or request body
# Parse parameters from query/body in a more unified way
if request.method == 'GET':
regions = request.args.getlist('regions[]') or request.args.getlist('region')
themes = request.args.getlist('themes[]') or request.args.getlist('theme')
departments = request.args.getlist('departments[]') or request.args.getlist('department')
search_text = request.args.get('search_text') or request.args.get('q')
visibility = request.args.get('visibility')
limit = request.args.get('limit', '100')
offset = request.args.get('offset', '0')
else:
payload = request.get_json(silent=True) or request.form
regions = payload.getlist('regions[]') if hasattr(payload, 'getlist') else payload.get('regions', [])
themes = payload.getlist('themes[]') if hasattr(payload, 'getlist') else payload.get('themes', [])
departments = payload.getlist('departments[]') if hasattr(payload, 'getlist') else payload.get('departments', [])
search_text = payload.get('search_text') or payload.get('q')
visibility = payload.get('visibility')
limit = payload.get('limit', '100')
offset = payload.get('offset', '0')
# Normalize parameters
if isinstance(regions, str): regions = [regions]
if isinstance(themes, str): themes = [themes]
if isinstance(departments, str): departments = [departments]
regions = [r.strip() for r in regions if r and r.strip()] if regions else None
themes = [t.strip() for t in themes if t and t.strip()] if themes else None
departments = [d.strip() for d in departments if d and d.strip()] if departments else None
search_text = (search_text or "").strip() or None
visibility = (visibility or "").strip().lower() or None
try:
limit = int(limit)
except (ValueError, TypeError):
limit = 100
try:
offset = int(offset)
except (ValueError, TypeError):
offset = 0
print(f"[DEBUG] admin_permits_advanced_filter params: search={search_text}, visibility={visibility}, regions={regions}")
# Execute filtering
result = filter_permits_advanced(
regions=regions,
themes=themes,
departments=departments,
search_text=search_text,
visibility=visibility,
limit=limit,
offset=offset,
)
return jsonify({"success": True, "data": result})
except Exception as exc:
print(f"admin_permits_advanced_filter error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits/filter-options', methods=['GET'])
def admin_permits_filter_options():
"""Get available filter options for permit filtering.
Args:
region_id: Optional. If provided, only return departments associated with this region
"""
try:
# Get region_id from query parameters
region_id = request.args.get('region_id')
# Get all regions
regions = list_regions()
# Get all themes
themes = list_all_themes()
# Get service departments (filtered by region if region_id is provided)
departments = list_service_departments(region_id=region_id) if region_id else list_service_departments()
return jsonify({
"success": True,
"data": {
"regions": regions,
"themes": themes,
"departments": departments,
}
})
except Exception as exc:
print(f"admin_permits_filter_options error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/approval-departments/setup', methods=['POST'])
def admin_setup_approval_departments():
"""Setup approval department mappings (admin only)."""
from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_permit_approval_departments_schema
user, error = _admin_guard(prefer_json=True)
if error:
return error
try:
# Common permit -> department mappings
mappings = [
('营业执照', '市场监管部门'),
('食品经营许可证', '市场监管部门'),
('药品经营许可证', '市场监管部门'),
('医疗器械经营许可证', '市场监管部门'),
('特种设备使用登记', '市场监管部门'),
]
# Ensure schema exists
_ensure_permit_approval_departments_schema()
with _lic_pg_conn() as conn:
cur = conn.cursor()
# Check current count
cur.execute("SELECT COUNT(*) FROM permit_approval_departments")
before_count = cur.fetchone()[0]
inserted = 0
updated = 0
for permit_name, dept_name in mappings:
# Check if exists
cur.execute("""
SELECT id FROM permit_approval_departments
WHERE permit_name = %s
""", (permit_name,))
existing = cur.fetchone()
if existing:
# Update
cur.execute("""
UPDATE permit_approval_departments
SET department_name = %s, updated_at = now()
WHERE permit_name = %s
""", (dept_name, permit_name))
updated += 1
else:
# Insert
record_id = str(uuid.uuid4())
cur.execute("""
INSERT INTO permit_approval_departments (id, permit_name, department_name)
VALUES (%s, %s, %s)
""", (record_id, permit_name, dept_name))
inserted += 1
conn.commit()
# Get final count
cur.execute("SELECT COUNT(*) FROM permit_approval_departments")
after_count = cur.fetchone()[0]
# Get all mappings
cur.execute("SELECT permit_name, department_name FROM permit_approval_departments ORDER BY permit_name")
all_mappings = [{"permit_name": row[0], "department_name": row[1]} for row in cur.fetchall()]
return jsonify({
"success": True,
"data": {
"inserted": inserted,
"updated": updated,
"before_count": before_count,
"after_count": after_count,
"mappings": all_mappings
}
})
except Exception as exc:
print(f"admin_setup_approval_departments error: {exc}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "message": str(exc)}), 500