"""V2 API routes - Enhanced implementation with structured results.""" from __future__ import annotations import os import uuid import time from io import BytesIO from flask import Blueprint, jsonify, request, send_file from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, Iterable, Optional from lawrisk.api.auth import login_required, get_current_user, ensure_admin_access from lawrisk.services.lawrisk_v2_service import search_v2, list_regions from lawrisk.services.licensing_repo import ( list_permits_for_region, load_permits_and_risks, list_region_theme_options, list_region_permit_catalog, load_theme_payload, create_checkpoint, list_checkpoints, restore_checkpoint, delete_checkpoint, list_permit_risk_snapshot_summaries, count_permit_risk_snapshots, delete_region_permit, restore_permit_risk_snapshot_batch, start_permit_import_session, commit_permit_import_session, fetch_permit_file, describe_permit_import_session, resolve_region_permit_theme, list_stored_permit_files, start_import_session_from_file, delete_stored_permit_file, list_service_departments, build_service_department_tree, create_service_department, update_service_department, delete_service_department, list_all_themes, create_theme, rename_theme, delete_theme, filter_permits_advanced, list_unbound_permits, list_operation_logs, _lic_pg_conn, ) from lawrisk.services.auth_service import ( list_users, create_user, update_user_account, delete_user_account, ) from lawrisk.services.template_service import ( get_permit_template_path, get_permit_template_metadata, overwrite_permit_template, ) v2_bp = Blueprint('lawrisk_v2', __name__, url_prefix='/fs-ai-asistant/api/workflow/lawrisk') def _project_root() -> str: return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) def _parse_truthy(value: Any, default: bool = False) -> bool: if value is None: return default if isinstance(value, bool): return value return str(value).strip().lower() in {"1", "true", "yes", "on"} def 
_admin_guard(*, prefer_json: bool = False, roles: Optional[Iterable[str]] = None): user, error = ensure_admin_access(allowed_roles=roles, prefer_json=prefer_json) if error: return None, error return user, None @v2_bp.route('/v2', methods=['POST', 'GET']) def lawrisk_search_v2(): """V2 search endpoint with structured results.""" query, debug_flag, top_k_int, _mode, region_filter = _extract_params() if not query or not isinstance(query, str): return jsonify({"error": "query is required"}), 400 try: t0 = time.time() # Execute search first result_v2 = search_v2(query, debug_flag, region_filter) # Generate recommendations based on search results rec_questions = [] risk_subject = result_v2.get("risk_subject", []) if isinstance(result_v2, dict) else [] found = bool(risk_subject) exec_time = int((time.time() - t0) * 1000) data = { "llmRespond": "" if found else "抱歉,无法检索到相关答案", "lawRisk": "", "questionExtend": rec_questions, "conversationId": "", "messageId": "", "roundNumber": 0, "conversationInfo": {}, "knowledgeSources": [], "totalKnowledgeSources": 0, "executionTime": exec_time, "workflowStatus": "ok" if found else "no_match", "executionSteps": [], "costStatistics": {}, "workflowTrackingId": "", "risk_subject": risk_subject, "debug": result_v2.get("debug", {}) if (debug_flag and isinstance(result_v2, dict)) else {}, } resp = {"success": True, "message": "OK", "data": data} return jsonify(resp) except Exception as e: print(f"lawrisk_search_v2 error: {e}") return jsonify({"success": False, "message": str(e), "data": {}}), 500 @v2_bp.route('/v2/regions', methods=['GET']) def lawrisk_regions(): """Get list of available regions.""" try: regions = list_regions() return jsonify({"success": True, "data": {"regions": regions}}) except Exception as exc: print(f"lawrisk_regions error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/v2/unbound-permits', methods=['GET']) def lawrisk_unbound_permits(): """Get list of permits that are not bound to 
any theme.""" try: visibility = request.args.get("visibility") or "visible" permits = list_unbound_permits(visibility=visibility) return jsonify({"success": True, "data": permits}) except Exception as exc: print(f"lawrisk_unbound_permits error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/v2/themes', methods=['GET']) def lawrisk_all_themes(): """Get list of all themes.""" try: themes = list_all_themes() return jsonify({"success": True, "data": themes}) except Exception as exc: print(f"lawrisk_all_themes error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/getPermits', methods=['GET', 'POST']) def lawrisk_get_permits(): """Get permits for a specific region, filtered by user permissions.""" import logging logger = logging.getLogger(__name__) logger.info("=" * 80) logger.info("API CALLED: lawrisk_get_permits") logger.info("=" * 80) if request.method == "GET": region_value = request.args.get("region") or request.args.get("region_id") else: if request.is_json: payload = request.get_json(silent=True) or {} else: payload = request.form.to_dict(flat=True) if request.form else {} region_value = payload.get("region") or payload.get("region_id") if not region_value or (isinstance(region_value, str) and not region_value.strip()): return jsonify({"success": False, "message": "region is required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) # Get current user for permission filtering current_user = get_current_user() print(f"DEBUG lawrisk_get_permits: current_user = {current_user}") try: permits = list_permits_for_region(region_token, current_user=current_user) return jsonify({"success": True, "data": {"region": region_token, "permits": permits}}) except Exception as exc: print(f"lawrisk_get_permits error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 def _extract_params(): """Extract parameters from request.""" if 
request.method == "GET": query = request.args.get("query") or request.args.get("q") or request.args.get("text") debug_flag = request.args.get("debug") in {"1", "true", "yes", "on"} top_k = request.args.get("top") try: top_k_int = int(top_k) if top_k else 5 except Exception: top_k_int = 5 mode_value = (request.args.get("mode") or "llm").lower() region_value = request.args.get("region") or request.args.get("region_id") region_list = request.args.getlist("region") if region_list and (not region_value or len(region_list) > 1): region_value = region_list else: if request.is_json: payload = request.get_json(silent=True) or {} else: payload = request.form.to_dict(flat=True) if request.form else {} query = payload.get("query") or payload.get("q") or payload.get("text") debug_flag = str(payload.get("debug", "")).strip().lower() in {"1", "true", "yes", "on"} try: top_k_int = int(payload.get("top", 5)) except Exception: top_k_int = 5 mode_value = str(payload.get("mode", "llm")).lower() region_value = payload.get("region") or payload.get("region_id") return query, debug_flag, top_k_int, mode_value, region_value @v2_bp.route('/admin/test', methods=['GET']) def admin_test(): """Simple test route.""" return jsonify({"success": True, "message": "Test route works!"}) @v2_bp.route('/test-simple', methods=['GET']) def test_simple(): """Very simple test.""" return jsonify({"status": "ok"}) @v2_bp.route('/db_admin', methods=['GET']) @login_required def db_admin_page(): """Serve the database administration UI.""" html_path = os.path.join(_project_root(), 'static', 'db_admin.html') if not os.path.exists(html_path): return jsonify({"success": False, "message": "DB admin page not found"}), 404 return send_file(html_path, mimetype='text/html') @v2_bp.route('/admin/super', methods=['GET']) def super_admin_page(): """Serve the super administrator console.""" _, error = _admin_guard(roles=("admin",)) if error: return error html_path = os.path.join(_project_root(), 'static', 'super_admin.html') 
if not os.path.exists(html_path): return jsonify({"success": False, "message": "Super admin page not found"}), 404 return send_file(html_path, mimetype='text/html') @v2_bp.route('/admin/v2-debug', methods=['GET']) def super_admin_v2_debug_page(): """Serve the V2 debugging console for super administrators.""" _, error = _admin_guard(roles=("admin",)) if error: return error html_path = os.path.join(_project_root(), 'static', 'v2_admin_debug.html') if not os.path.exists(html_path): return jsonify({"success": False, "message": "调试页面不存在"}), 404 return send_file(html_path, mimetype='text/html') @v2_bp.route('/admin/users', methods=['GET']) def admin_list_users(): """List users for the super admin console.""" _, error = _admin_guard(prefer_json=True) if error: return error include_inactive = _parse_truthy(request.args.get("include_inactive"), default=True) try: users = list_users(include_inactive=include_inactive) return jsonify({"success": True, "data": {"users": users}}) except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/users', methods=['POST']) def admin_create_user(): """Create a new application user. 
If未提供service_department_id,则自动创建同名部门(父级可选)并绑定。""" _, error = _admin_guard(prefer_json=True) if error: return error payload = request.get_json(silent=True) or {} username = (payload.get("username") or "").strip().lower() password = (payload.get("password") or "").strip() if not username or not password: return jsonify({"success": False, "message": "用户名和密码均不能为空"}), 400 parent_department_id = (payload.get("parent_department_id") or "").strip() or None service_department_id = (payload.get("service_department_id") or "").strip() or None region_id = (payload.get("region_id") or "").strip() or None department_phone = (payload.get("department_phone") or "").strip() or None # 如果未显式绑定部门,则为该用户创建一个同名单位,并按父级决定层级 created_department: Optional[Dict[str, Any]] = None if not service_department_id: # 未显式指定区域时,继承父级部门的区域 if not region_id and parent_department_id: try: parent_dept = next( (dept for dept in list_service_departments() if str(dept.get("id")) == parent_department_id), None ) if parent_dept: region_id = parent_dept.get("region_id") except Exception: region_id = None try: dept_name = (payload.get("display_name") or username).strip() or username dept_code = username.upper() # Check if department code already exists with _lic_pg_conn() as conn: cur = conn.cursor() cur.execute("SELECT id, name, code, region_id FROM service_departments WHERE code = %s", (dept_code,)) row = cur.fetchone() if row: service_department_id = str(row[0]) created_department = { "id": str(row[0]), "name": str(row[1]), "code": str(row[2]), "region_id": str(row[3]) if row[3] else None } if not service_department_id: created_department = create_service_department( name=dept_name, code=dept_code, phone=department_phone, parent_id=parent_department_id, region_id=region_id, ) service_department_id = created_department.get("id") except Exception as exc: err_msg = str(exc) if "duplicate key value" in err_msg and "service_departments_code_key" in err_msg: return jsonify({"success": False, "message": f"创建单位失败: 单位代码 
{dept_code} 已存在"}), 400 if "violates unique constraint" in err_msg: return jsonify({"success": False, "message": "创建单位失败: 数据重复"}), 400 return jsonify({"success": False, "message": f"创建单位失败: {exc}"}), 400 try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" user = create_user( username=username, password=password, display_name=payload.get("display_name"), role=(payload.get("role") or "department_admin").strip(), grade=int(payload.get("grade") or 50), service_department_id=service_department_id, department_role=payload.get("department_role"), parent_department_id=parent_department_id, service_department_region_id=region_id, service_department_phone=department_phone, operator=operator, ) return jsonify({"success": True, "data": {"user": user, "department": created_department}}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: err_msg = str(exc) if "duplicate key value" in err_msg and "auth_users_username_key" in err_msg: return jsonify({"success": False, "message": f"创建账号失败: 用户名 {username} 已存在"}), 400 return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/users/', methods=['PATCH']) def admin_update_user(user_id: str): """Update password or service department for an existing user.""" _, error = _admin_guard(prefer_json=True) if error: return error payload = request.get_json(silent=True) or {} update_kwargs: Dict[str, Any] = {} if "password" in payload: update_kwargs["password"] = payload.get("password") or "" if "service_department_id" in payload: dept_value = payload.get("service_department_id") if isinstance(dept_value, str) and not dept_value.strip(): dept_value = None update_kwargs["service_department_id"] = dept_value if "display_name" in payload: update_kwargs["display_name"] = payload.get("display_name") if "role" in payload: update_kwargs["role"] = payload.get("role") if "grade" in payload: update_kwargs["grade"] = 
payload.get("grade") if "department_role" in payload: update_kwargs["department_role"] = payload.get("department_role") if not update_kwargs: return jsonify({"success": False, "message": "请至少提供一个需要更新的字段"}), 400 try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" updated = update_user_account(user_id, operator=operator, **update_kwargs) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 if not updated: return jsonify({"success": False, "message": "用户不存在"}), 404 return jsonify({"success": True, "data": {"user": updated}}) @v2_bp.route('/admin/users/', methods=['DELETE']) def admin_delete_user(user_id: str): """Delete a user account.""" _, error = _admin_guard(prefer_json=True) if error: return error try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" deleted = delete_user_account(user_id, operator=operator) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 if not deleted: return jsonify({"success": False, "message": "用户不存在"}), 404 return jsonify({"success": True, "message": "用户已删除"}) @v2_bp.route('/admin/service-departments', methods=['GET']) def admin_list_service_departments(): """Return all service departments in a flat list.""" _, error = _admin_guard(prefer_json=True) if error: return error try: departments = list_service_departments() return jsonify({"success": True, "data": {"departments": departments}}) except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/service-departments/tree', methods=['GET']) def admin_service_departments_tree(): """Return service departments in a tree structure for organizational chart display.""" _, error = _admin_guard(prefer_json=True) if 
error: return error try: tree = build_service_department_tree() return jsonify({"success": True, "data": {"tree": tree}}) except Exception as exc: print(f"admin_service_departments_tree error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/departments/children', methods=['GET']) def admin_get_child_departments(): """Get child departments of a parent department for dropdown filters.""" _, error = _admin_guard(prefer_json=True) if error: return error parent_id = request.args.get('parent_id', '').strip() if not parent_id: return jsonify({"success": False, "message": "parent_id is required"}), 400 try: # Get child departments with _lic_pg_conn() as conn: cur = conn.cursor() cur.execute(""" SELECT id, name, code, unit_level, region_id FROM service_departments WHERE parent_id = %s ORDER BY name """, (parent_id,)) rows = cur.fetchall() children = [] for row in rows: children.append({ "id": str(row[0]), "name": str(row[1]), "code": str(row[2]), "unit_level": str(row[3]) if row[3] else "unit", "region_id": str(row[4]) if row[4] else None }) return jsonify({"success": True, "data": {"units": children}}) except Exception as exc: print(f"admin_get_child_departments error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permits/filter', methods=['POST']) def admin_filter_permits(): """Filter and query permits based on user permissions and filter criteria.""" admin_user, error = _admin_guard(prefer_json=True) if error: return error try: # Get current user info for permission checking current_user = { "username": admin_user.get("username"), "role": admin_user.get("role"), "grade": admin_user.get("grade"), "department": admin_user.get("department") } # Parse filter parameters from request filters = {} if request.is_json: filters = request.get_json(silent=True) or {} else: filters = { "municipal_dept_id": request.form.get("municipal_dept_id"), "district_dept_id": 
request.form.get("district_dept_id"), "region": request.form.get("region"), "search_text": request.form.get("search_text") } # Get visible permits based on user permissions and filters permits = get_visible_permits(current_user, filters) return jsonify({"success": True, "data": { "permits": permits, "pagination": { "total": len(permits), "page": 1, "page_size": len(permits) } }}) except Exception as exc: print(f"admin_filter_permits error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/service-departments', methods=['POST']) def admin_create_service_department(): """Create a service department node with auto-generated account.""" _, error = _admin_guard(prefer_json=True) if error: return error payload = request.get_json(silent=True) or {} name = payload.get("name") code = payload.get("code") if not name or not str(name).strip(): return jsonify({"success": False, "message": "服务部门名称不能为空"}), 400 code_token = (code or "").strip() if code_token: code_token = code_token.upper() else: code_token = uuid.uuid4().hex[:8].upper() grade = payload.get("grade") or 0 try: grade = int(grade) except (ValueError, TypeError): grade = 0 kwargs = { "code": code, "phone": (payload.get("phone") or "").strip() or None, "parent_id": (payload.get("parent_id") or "").strip() or None, "region_id": (payload.get("region_id") or "").strip() or None, "description": payload.get("description"), "grade": grade, } if not kwargs["region_id"] and kwargs["parent_id"]: try: parent_dept = next( (dept for dept in list_service_departments() if str(dept.get("id")) == kwargs["parent_id"]), None ) if parent_dept: kwargs["region_id"] = parent_dept.get("region_id") except Exception: kwargs["region_id"] = None try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" department = create_service_department(name, code=code_token, operator=operator, **kwargs) department_id = department.get("id") default_password = 
f"{code_token}123456" try: user = create_user( username=code, password=default_password, display_name=f"{name}管理员", role="department_admin", grade=grade, service_department_id=department_id, department_role="admin" ) return jsonify({ "success": True, "data": { "department": department, "user": user, "default_password": default_password } }) except ValueError as user_exc: return jsonify({ "success": False, "message": f"部门创建成功,但账号创建失败: {str(user_exc)}" }), 400 except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: err_msg = str(exc) if "duplicate key value" in err_msg: return jsonify({"success": False, "message": "服务部门代码或名称已存在"}), 400 return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/service-departments/', methods=['PATCH']) def admin_update_service_department(dept_id: str): """Update department metadata such as name, phone, grade, or hierarchy via drag-and-drop.""" _, error = _admin_guard(prefer_json=True) if error: return error payload = request.get_json(silent=True) or {} updates: Dict[str, Any] = {} for field in ("name", "phone", "parent_id", "region_id", "description"): if field in payload: value = payload.get(field) if isinstance(value, str) and not value.strip(): value = None if field == "parent_id" and value == dept_id: return jsonify({"success": False, "message": "部门不能设置为自己的下级"}), 400 updates[field] = value if "grade" in payload: try: grade = int(payload.get("grade")) updates["grade"] = grade except (ValueError, TypeError): return jsonify({"success": False, "message": "权限等级必须是数字"}), 400 if not updates: return jsonify({"success": False, "message": "没有需要更新的字段"}), 400 try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" department = update_service_department(dept_id, operator=operator, **updates) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return 
jsonify({"success": False, "message": str(exc)}), 500 if not department: return jsonify({"success": False, "message": "服务部门不存在"}), 404 message = "部门信息已更新" if "parent_id" in updates: message = "层级关系已更新" elif "grade" in updates: message = f"权限等级已更新为 {updates['grade']}" return jsonify({ "success": True, "data": { "department": department, "message": message } }) @v2_bp.route('/admin/service-departments/', methods=['DELETE']) def admin_delete_service_department(dept_id: str): """Delete a service department node (only when no users are bound). If force=true query parameter is provided, will first unbind all users from the department. """ _, error = _admin_guard(prefer_json=True) if error: return error force = request.args.get("force", "").lower() in ("true", "1", "yes") try: if force: from lawrisk.services.auth_service import update_user_account users = list_users() for user in users: if user.get("service_department_id") == dept_id: update_user_account( user_id=user["id"], service_department_id=None ) admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" result = delete_service_department(dept_id, operator=operator) except ValueError as exc: error_msg = str(exc) if "仍有账号绑定" in error_msg and not force: return jsonify({ "success": False, "message": f"{error_msg}。如需强制删除,请在URL中添加?force=true参数先解除绑定。", "code": "HAS_BOUND_USERS" }), 400 return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 if not result.get("deleted"): return jsonify({"success": False, "message": result.get("message", "删除失败")}), 400 return jsonify({ "success": True, "data": result, "message": "部门删除成功" }) @v2_bp.route('/admin/themes/catalog', methods=['GET']) def admin_theme_catalog(): """List all available themes.""" _, error = _admin_guard(prefer_json=True) if error: return error try: themes = list_all_themes() return jsonify({"success": True, "data": {"themes": themes}}) 
except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/themes/catalog', methods=['POST']) def admin_create_theme(): """Create a new theme.""" _, error = _admin_guard(prefer_json=True) if error: return error payload = request.get_json(silent=True) or {} name = payload.get("name") if not name or not str(name).strip(): return jsonify({"success": False, "message": "主题名称不能为空"}), 400 try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" theme = create_theme(name, operator=operator) return jsonify({"success": True, "data": {"theme": theme}}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/themes/catalog/', methods=['PATCH']) def admin_rename_theme(theme_id: str): """Rename an existing theme.""" _, error = _admin_guard(prefer_json=True) if error: return error payload = request.get_json(silent=True) or {} name = payload.get("name") or payload.get("new_name") if not name or not str(name).strip(): return jsonify({"success": False, "message": "主题名称不能为空"}), 400 try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" theme = rename_theme(theme_id, name, operator=operator) return jsonify({"success": True, "data": {"theme": theme}}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/themes/catalog/', methods=['DELETE']) def admin_delete_theme(theme_id: str): """Delete a theme and related region associations.""" _, error = _admin_guard(prefer_json=True) if error: return error try: admin_user, _ = _admin_guard() operator = admin_user.get("username") if admin_user else "admin" result = delete_theme(theme_id, operator=operator) except ValueError as exc: return 
jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 if not result.get("deleted"): return jsonify({"success": False, "message": result.get("message", "主题不存在")}), 404 return jsonify({"success": True, "data": result}) @v2_bp.route('/admin/templates/permit', methods=['GET']) def admin_permit_template_meta(): """Return metadata for the import template.""" _, error = _admin_guard(prefer_json=True) if error: return error try: data = get_permit_template_metadata() return jsonify({"success": True, "data": data}) except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/templates/permit', methods=['POST']) def admin_update_permit_template(): """Overwrite the existing import template.""" admin_user, error = _admin_guard(prefer_json=True) if error: return error if 'file' not in request.files: return jsonify({"success": False, "message": "请上传模板文件"}), 400 file_storage = request.files['file'] file_bytes = file_storage.read() if not file_bytes: return jsonify({"success": False, "message": "上传的模板为空"}), 400 try: meta = overwrite_permit_template( file_bytes, file_storage.filename or "import_template.xlsx", uploaded_by=(admin_user or {}).get("username"), ) return jsonify({"success": True, "data": meta}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/regions', methods=['GET']) def admin_regions(): """Get all regions for database maintenance.""" try: regions = list_regions() return jsonify({"success": True, "data": {"regions": regions}}) except Exception as exc: print(f"admin_regions error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/themes', methods=['GET']) def admin_themes(): """Get themes for a specific region.""" region_value = request.args.get("region") or 
request.args.get("region_id") if not region_value or (isinstance(region_value, str) and not region_value.strip()): return jsonify({"success": False, "message": "region is required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) try: catalog = list_region_theme_options() region_id_lower = region_token.lower() themes = [] seen_theme_ids = set() for item in catalog: if (item["region_id"] == region_token or item["region_id"].lower() == region_id_lower or item["region_name"].lower() == region_id_lower): theme_id = item["theme_id"] if theme_id not in seen_theme_ids: seen_theme_ids.add(theme_id) themes.append({ "id": theme_id, "name": item["theme_name"], "option_id": item["option_id"] }) return jsonify({"success": True, "data": {"region": region_token, "themes": themes}}) except Exception as exc: print(f"admin_themes error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permits', methods=['GET']) def admin_permits(): """Get permits for a region. 
Optional theme filter keeps backward compatibility.""" region_value = request.args.get("region") or request.args.get("region_id") theme_value = request.args.get("theme") or request.args.get("theme_id") if not region_value or (isinstance(region_value, str) and not region_value.strip()): return jsonify({"success": False, "message": "region is required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) theme_token = ( theme_value.strip() if theme_value and isinstance(theme_value, str) else (str(theme_value) if theme_value is not None else None) ) try: if theme_token: permits = load_permits_and_risks(region_token, theme_token) data = { "region": region_token, "theme": theme_token, "permits": permits, } else: catalog = list_region_permit_catalog(region_token) data = { "region": region_token, "permits": catalog, } return jsonify({"success": True, "data": data}) except Exception as exc: print(f"admin_permits error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permits/visibility', methods=['POST']) def admin_toggle_permit_visibility(): """Toggle the visibility of a permit in V2 API retrieval.""" admin_user, error = _admin_guard(prefer_json=True, roles=("admin", "department_admin")) if error: return error data = request.get_json() or {} region_id = data.get("region_id") permit_id = data.get("permit_id") is_visible = data.get("is_v2_visible") if not region_id or not permit_id or is_visible is None: return jsonify({"success": False, "message": "region_id, permit_id and is_v2_visible are required"}), 400 try: from lawrisk.services.licensing_repo import update_permit_v2_visibility operator = (admin_user or {}).get("username") or "admin" success = update_permit_v2_visibility( region_id=region_id, permit_id=permit_id, is_visible=bool(is_visible), operator=operator ) if success: return jsonify({"success": True, "message": "Visibility updated"}) else: return jsonify({"success": 
False, "message": "Permit details not found or update failed"}), 404 except Exception as exc: return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-import/upload', methods=['POST']) def admin_permit_import_upload(): """Upload Excel workbook and start an import session.""" if 'file' not in request.files: return jsonify({"success": False, "message": "请选择要上传的Excel文件"}), 400 file_storage = request.files['file'] filename = file_storage.filename or 'import.xlsx' file_bytes = file_storage.read() user = get_current_user() or {} user_role = user.get("role") is_super_admin = (user_role == "admin") uploaded_by = user.get("display_name") or user.get("username") or user.get("id") content_type = file_storage.mimetype or "application/octet-stream" binding_mode = (request.form.get("binding_mode") or "auto").strip().lower() if binding_mode not in {"none", "department", "auto", "municipal", "district"}: binding_mode = "auto" bound_department_id = (request.form.get("bound_department_id") or "").strip() or None uploader_department_id = (user.get("department") or {}).get("id") if not is_super_admin: # Non-super admins are forced to use their own department's region if not uploader_department_id: return jsonify({"success": False, "message": "该账号未绑定服务部门,无法执行导入"}), 403 bound_department_id = uploader_department_id binding_mode = "department" else: # Super admins default to their own department if none specified, but can use 'auto' if not bound_department_id: bound_department_id = uploader_department_id # If a super admin manually chose a department OR has one and left it as auto, # but they didn't explicitly ask for 'auto', we might want to default to department. # However, the user said "super admins can manually specify", so 'auto' vs 'department' # should probably be respected if they chose it. # Existing logic forced 'department' if ANY bound_dept was present. # We relax this for super admins to allow 'auto'. 
if bound_department_id and binding_mode == "auto": # If the super admin didn't explicitly request 'auto' in the form, # maybe we still want to default to department? # Actually, usually 'auto' is the frontend default. # To allow super admin to use 'auto', we only force 'department' if THEY didn't send 'auto'. # But the form usually sends 'auto'. pass if not file_bytes: return jsonify({"success": False, "message": "上传的文件为空"}), 400 try: data = start_permit_import_session( file_bytes, filename, content_type=content_type, uploaded_by=str(uploaded_by) if uploaded_by else None, uploader_department_id=uploader_department_id, bound_department_id=bound_department_id, binding_mode=binding_mode, ) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_permit_import_upload error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-import/template', methods=['GET']) def admin_permit_import_template(): """Provide the Excel import template for download.""" template_path = get_permit_template_path() if not os.path.exists(template_path): return jsonify({"success": False, "message": "模板文件不存在,请联系管理员"}), 404 try: return send_file( template_path, as_attachment=True, download_name='风险提示表 模板.xlsx', ) except Exception as exc: print(f"admin_permit_import_template error: {exc}") return jsonify({"success": False, "message": "模板文件暂时无法下载"}), 500 def _build_import_preview_response(session_token: str): """Internal helper to build preview response JSON.""" try: data = describe_permit_import_session(session_token) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_permit_import_preview error: {exc}") return jsonify({"success": False, "message": "无法加载预览数据"}), 500 @v2_bp.route('/admin/permit-import/session//preview', 
methods=['GET']) def admin_permit_import_preview(session_id: str): """Return parsed workbook preview along with theme options (path param).""" if not session_id: return jsonify({"success": False, "message": "session_id 不能为空"}), 400 return _build_import_preview_response(session_id) @v2_bp.route('/admin/permit-import/preview', methods=['GET']) def admin_permit_import_preview_query(): """Return preview via query string for compatibility.""" session_id = request.args.get("session_id") or request.args.get("sessionId") if not session_id: return jsonify({"success": False, "message": "session_id 不能为空"}), 400 return _build_import_preview_response(session_id) @v2_bp.route('/admin/permit-import/commit', methods=['POST']) def admin_permit_import_commit(): """Commit an import session with selected sheets.""" payload = request.get_json(silent=True) or {} session_id = payload.get('session_id') or payload.get('sessionId') sheet_names = payload.get('sheet_names') or payload.get('sheets') or payload.get('selectedSheets') overrides = payload.get('overrides') or {} edited_by = payload.get('edited_by') or payload.get('editedBy') change_summary = payload.get('change_summary') or payload.get('changeSummary') theme_bindings = payload.get('theme_bindings') or payload.get('themeBindings') or {} if not edited_by: user = get_current_user() or {} edited_by = user.get("username") or user.get("display_name") or "admin" if isinstance(sheet_names, str): sheet_names = [sheet_names] try: data = commit_permit_import_session( session_id, sheet_names or [], overrides=overrides, edited_by=edited_by, change_summary=change_summary, theme_bindings=theme_bindings, ) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_permit_import_commit error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-files', methods=['GET']) def 
admin_list_permit_files(): """List stored permit files with pagination.""" try: limit = int(request.args.get("limit", 20)) except (TypeError, ValueError): limit = 20 limit = max(1, min(limit, 100)) try: offset = int(request.args.get("offset", 0)) except (TypeError, ValueError): offset = 0 offset = max(0, offset) keyword = request.args.get("keyword") or request.args.get("q") try: data = list_stored_permit_files(limit=limit, offset=offset, keyword=keyword) return jsonify({"success": True, "data": data}) except Exception as exc: print(f"admin_list_permit_files error: {exc}") return jsonify({"success": False, "message": "文件列表加载失败"}), 500 @v2_bp.route('/admin/operation-logs', methods=['GET']) def admin_list_operation_logs(): """List operation logs with pagination and optional filtering.""" try: limit = int(request.args.get("limit", 50)) except (TypeError, ValueError): limit = 50 limit = max(1, min(limit, 500)) try: offset = int(request.args.get("offset", 0)) except (TypeError, ValueError): offset = 0 offset = max(0, offset) operator = request.args.get("operator") operation_type = request.args.get("operation_type") or request.args.get("type") try: data = list_operation_logs( operator=operator, operation_type=operation_type, limit=limit, offset=offset ) return jsonify({"success": True, "data": data}) except Exception as exc: print(f"admin_list_operation_logs error: {exc}") return jsonify({"success": False, "message": "操作日志加载失败"}), 500 @v2_bp.route('/admin/permit-files//reimport', methods=['POST']) def admin_reimport_permit_file(file_id: str): """Re-create an import session from an archived Excel file.""" if not file_id: return jsonify({"success": False, "message": "file_id 不能为空"}), 400 user = get_current_user() or {} user_role = user.get("role") is_super_admin = (user_role == "admin") requested_by = user.get("display_name") or user.get("username") or user.get("id") uploader_department_id = (user.get("department") or {}).get("id") binding_mode = "auto" bound_department_id 
= None if not is_super_admin: if not uploader_department_id: return jsonify({"success": False, "message": "该账号未绑定服务部门,无法执行重新导入"}), 403 bound_department_id = uploader_department_id binding_mode = "department" try: data = start_import_session_from_file( file_id, requested_by=requested_by, uploader_department_id=uploader_department_id, bound_department_id=bound_department_id, binding_mode=binding_mode ) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 404 except Exception as exc: print(f"admin_reimport_permit_file error: {exc}") return jsonify({"success": False, "message": "暂无法重新载入该文件"}), 500 @v2_bp.route('/admin/permit-files/', methods=['DELETE']) def admin_delete_permit_file(file_id: str): """Delete an archived permit file.""" if not file_id: return jsonify({"success": False, "message": "file_id 不能为空"}), 400 try: deleted = delete_stored_permit_file(file_id) if deleted: return jsonify({"success": True, "message": "文件已删除"}) return jsonify({"success": False, "message": "文件不存在"}), 404 except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_delete_permit_file error: {exc}") return jsonify({"success": False, "message": "文件暂无法删除"}), 500 @v2_bp.route('/admin/permit-file/download', methods=['GET']) @login_required def admin_permit_file_download(): """Download the original Excel file associated with a permit.""" region_value = request.args.get("region") or request.args.get("region_id") permit_value = request.args.get("permit") or request.args.get("permit_id") if not region_value or not permit_value: return jsonify({"success": False, "message": "region 和 permit 参数必填"}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value) try: file_payload = fetch_permit_file(region_token, permit_token) if not 
file_payload: return jsonify({"success": False, "message": "当前许可没有关联的原始文件"}), 404 buffer = BytesIO(file_payload["file_data"]) buffer.seek(0) download_name = file_payload.get("filename") or "许可导入.xlsx" mimetype = file_payload.get("content_type") or "application/octet-stream" return send_file( buffer, as_attachment=True, download_name=download_name, mimetype=mimetype, ) except Exception as exc: print(f"admin_permit_file_download error: {exc}") return jsonify({"success": False, "message": "暂无法下载原始文件"}), 500 @v2_bp.route('/admin/permit-details', methods=['GET']) def admin_permit_details(): """Get detailed information for a specific permit.""" region_value = request.args.get("region") or request.args.get("region_id") theme_value = request.args.get("theme") or request.args.get("theme_id") permit_value = request.args.get("permit") or request.args.get("permit_id") if not region_value or not permit_value: return jsonify({"success": False, "message": "region and permit are required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value) theme_token: Optional[str] theme_display = None if theme_value is None or (isinstance(theme_value, str) and not theme_value.strip()): resolved = resolve_region_permit_theme(region_token, permit_token) theme_token = resolved["id"] if resolved and resolved.get("id") else None theme_display = resolved else: theme_token = theme_value.strip() if isinstance(theme_value, str) else str(theme_value) try: # 始终加载全部主题,详情页需要展示主题列表并高亮当前主题 permits = load_permits_and_risks(region_token, None, permit_token) if not permits: return jsonify({"success": False, "message": "Permit not found", "data": {}}), 404 permit_payload = permits[0] payload = { "region": region_token, "theme": theme_token, "permit": permit_payload, } selected_theme_meta = None theme_list = permit_payload.get("themes") or [] if theme_token: for 
candidate in theme_list: if candidate.get("id") == theme_token: selected_theme_meta = candidate break if not selected_theme_meta and theme_display: selected_theme_meta = theme_display if not selected_theme_meta: selected_theme_meta = permit_payload.get("theme") if not selected_theme_meta and theme_list: selected_theme_meta = theme_list[0] if selected_theme_meta: permit_payload["theme"] = selected_theme_meta payload["theme_display"] = selected_theme_meta payload["selected_theme_id"] = selected_theme_meta.get("id") or "" elif theme_display: payload["theme_display"] = theme_display return jsonify({"success": True, "data": payload}) except Exception as exc: print(f"admin_permit_details error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permits', methods=['DELETE']) def admin_delete_permit(): """Delete a permit for a specific region-theme combination after snapshotting risks.""" payload: Dict[str, Any] = {} if request.is_json: payload = request.get_json(silent=True) or {} elif request.form: payload = request.form.to_dict(flat=True) region_value = ( payload.get("region_id") or payload.get("region") or request.args.get("region_id") or request.args.get("region") ) theme_value = ( payload.get("theme_id") or payload.get("theme") or request.args.get("theme_id") or request.args.get("theme") ) permit_value = ( payload.get("permit_id") or payload.get("permit") or request.args.get("permit_id") or request.args.get("permit") ) edited_by = (payload.get("edited_by") or request.args.get("edited_by") or "").strip() or None change_summary = payload.get("change_summary") or request.args.get("change_summary") if change_summary is not None: change_summary = str(change_summary).strip() if not change_summary: change_summary = None if not edited_by: user = get_current_user() or {} edited_by = user.get("username") or user.get("display_name") or "admin" if not region_value or not permit_value: return jsonify({"success": False, "message": "region_id 
和 permit_id 均为必填"}), 400 region_id = region_value.strip() if isinstance(region_value, str) else str(region_value) permit_id = permit_value.strip() if isinstance(permit_value, str) else str(permit_value) if theme_value: theme_id = theme_value.strip() if isinstance(theme_value, str) else str(theme_value) else: resolved_theme = resolve_region_permit_theme(region_id, permit_id) if not resolved_theme or not resolved_theme.get("id"): return jsonify({"success": False, "message": "未找到许可所属主题,无法删除"}), 400 theme_id = resolved_theme["id"] try: result = delete_region_permit( region_id, theme_id, permit_id, edited_by=edited_by, change_summary=change_summary, ) return jsonify({"success": True, "data": result}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 404 except Exception as exc: print(f"admin_delete_permit error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/checkpoints', methods=['GET']) def admin_list_checkpoints(): """List all available checkpoints.""" try: checkpoints = list_checkpoints() return jsonify({"success": True, "data": {"checkpoints": checkpoints}}) except Exception as exc: print(f"admin_list_checkpoints error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-risk-snapshots', methods=['GET']) def admin_permit_risk_snapshots(): """List permit risk checkpoint history entries for the management UI.""" try: args = request.args region_id = args.get("region_id") or args.get("region") permit_id = args.get("permit_id") or args.get("permit") edited_by = args.get("edited_by") try: limit = int(args.get("limit", 20)) except (TypeError, ValueError): limit = 20 limit = max(1, min(limit, 200)) try: offset = int(args.get("offset", 0)) except (TypeError, ValueError): offset = 0 offset = max(0, offset) snapshots = list_permit_risk_snapshot_summaries( region_id=region_id, permit_id=permit_id, edited_by=edited_by, limit=limit, offset=offset, ) total = 
count_permit_risk_snapshots( region_id=region_id, permit_id=permit_id, edited_by=edited_by, ) return jsonify( { "success": True, "data": { "snapshots": snapshots, "pagination": { "limit": limit, "offset": offset, "total": total, }, }, } ) except Exception as exc: print(f"admin_permit_risk_snapshots error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-risk-snapshots//restore', methods=['POST']) def admin_restore_permit_risk_snapshot(batch_id): """Restore permit risk relations from a snapshot batch.""" payload: Dict[str, Any] = {} if request.is_json: payload = request.get_json(silent=True) or {} elif request.form: payload = request.form.to_dict(flat=True) edited_by = (payload.get("edited_by") or "").strip() or None change_summary = payload.get("change_summary") if change_summary is not None: change_summary = str(change_summary).strip() if not change_summary: change_summary = None try: result = restore_permit_risk_snapshot_batch( batch_id, edited_by=edited_by, change_summary=change_summary, ) return jsonify({"success": True, "data": result}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 404 except Exception as exc: print(f"admin_restore_permit_risk_snapshot error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/checkpoints', methods=['POST']) def admin_create_checkpoint(): """Create a new checkpoint.""" if request.is_json: payload = request.get_json(silent=True) or {} else: payload = request.form.to_dict(flat=True) if request.form else {} description = payload.get("description", "") try: admin_user = get_current_user() or {} operator = admin_user.get("username") if admin_user else "admin" checkpoint = create_checkpoint(description, operator=operator) return jsonify({"success": True, "data": checkpoint}) except Exception as exc: print(f"admin_create_checkpoint error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 
# ``<checkpoint_id>`` must appear in the rule so Flask passes it to the view.
# Default (string) converter assumed - TODO confirm.
@v2_bp.route('/admin/checkpoints/<checkpoint_id>/restore', methods=['POST'])
def admin_restore_checkpoint(checkpoint_id):
    """DANGEROUS OPERATION: restore the database from a checkpoint.

    This operation will:
    1. Permanently delete all data currently in the database.
    2. Restore the data from the given checkpoint.
    3. Possibly lose data if the restore fails.

    Recommended in production:
    - make sure an up-to-date backup exists;
    - run during off-peak hours;
    - keep automatic backup enabled (create_auto_backup=true).

    POST parameters (JSON or form):
    - create_auto_backup: back up the current state before restoring
      (default: true)
    - batch_size: rows per insert batch (default: 1000, range: 100-10000;
      invalid values fall back to 1000)

    NOTE(review): this handler performs no access check of its own; confirm
    that authorization is enforced elsewhere before exposing it.
    """
    import logging

    logger = logging.getLogger(__name__)

    logger.info("=" * 80)
    logger.info("[API] Received checkpoint restore request: %s", checkpoint_id)
    logger.info("=" * 80)

    try:
        # Read request parameters from JSON or form data.
        if request.is_json:
            payload = request.get_json(silent=True)
            if not isinstance(payload, dict):
                payload = {}
        else:
            payload = request.form.to_dict(flat=True) if request.form else {}

        logger.info("[API] Request parameters: %s", payload)

        # Auto-backup stays on unless the caller explicitly disables it.
        create_auto_backup = _parse_truthy(payload.get("create_auto_backup"), default=True)
        logger.info("[API] Auto-backup enabled: %s", create_auto_backup)

        # Parse the batch size; anything unparseable or out of range uses 1000.
        try:
            batch_size = int(payload.get("batch_size", "1000"))
            if batch_size < 100 or batch_size > 10000:
                raise ValueError(f"batch_size must be between 100 and 10000, got {batch_size}")
        except (ValueError, TypeError) as e:
            logger.warning("[API] Invalid batch_size: %s, using default 1000", e)
            batch_size = 1000

        logger.info("[API] Batch size: %s rows/batch", batch_size)

        # Run the restore.
        logger.info("[API] Starting restore operation...")
        admin_user = get_current_user() or {}
        # "or" also covers a user record without a username.
        operator = admin_user.get("username") or "admin"
        restore_result = restore_checkpoint(
            checkpoint_id,
            create_auto_backup=create_auto_backup,
            batch_size=batch_size,
            operator=operator,
        )

        # Read defensively: a result without "summary"/"message" must not
        # turn into a KeyError that hides the real outcome.
        summary = restore_result.get("summary") or {}
        message = restore_result.get("message")

        if restore_result.get("status") == "success":
            logger.info("[API] Restore completed successfully")
            logger.info("[API] Tables restored: %s", summary.get("tables_restored"))
            logger.info("[API] Total rows restored: %s", summary.get("total_rows_restored"))

            return jsonify({
                "success": True,
                "message": message,
                "data": {
                    "checkpoint_id": summary.get("checkpoint_id"),
                    "tables_restored": summary.get("tables_restored"),
                    "total_rows": summary.get("total_rows_restored"),
                    "auto_backup": summary.get("auto_backup"),
                    "table_details": summary.get("table_details"),
                },
            })

        # Restore failed.
        logger.error("[API] Restore failed: %s", message)
        errors = summary.get("errors", [])
        if errors:
            logger.error("[API] Errors encountered: %s", errors)

        response_data = {
            "success": False,
            "message": message,
            "data": {
                "errors": errors,
                "auto_backup_available": restore_result.get("auto_backup_available", False),
            },
        }

        # Pass the recovery hint on when the restore routine supplied one.
        recovery_suggestion = restore_result.get("recovery_suggestion")
        if recovery_suggestion:
            logger.warning("[API] Recovery suggestion: %s", recovery_suggestion)
            response_data["data"]["recovery_suggestion"] = recovery_suggestion

        return jsonify(response_data), 500

    except Exception as exc:
        logger.error("[API] Restore operation failed with exception: %s", exc, exc_info=True)
        return jsonify({
            "success": False,
            "message": f"Restore failed: {str(exc)}",
            "data": {
                "error_type": type(exc).__name__,
                # Unknown here: the exception gives no information about
                # whether the auto-backup step completed.
                "auto_backup_available": None,
            },
        }), 500


# ``<checkpoint_id>`` must appear in the rule so Flask passes it to the view.
# Default (string) converter assumed - TODO confirm.
@v2_bp.route('/admin/checkpoints/<checkpoint_id>', methods=['DELETE'])
def admin_delete_checkpoint(checkpoint_id):
    """Delete a checkpoint.

    Returns 404 when the repository reports that nothing was deleted.
    """
    try:
        success = delete_checkpoint(checkpoint_id)
    except Exception as exc:
        print(f"admin_delete_checkpoint error: {exc}")
        return jsonify({"success": False, "message": str(exc)}), 500

    if success:
        return jsonify({"success": True, "message": "Checkpoint deleted"})
    return jsonify({"success": False, "message": "Checkpoint not found"}), 404


def _clean_filter_values(values: Any) -> Optional[list]:
    """Normalize a multi-select filter to a list of non-blank strings.

    A single string is wrapped in a list; falsy entries and blank strings are
    dropped.  Returns ``None`` when *values* itself is empty or missing.
    """
    if not values:
        return None
    if isinstance(values, str) or not isinstance(values, (list, tuple, set)):
        values = [values]
    # str() first: JSON bodies may carry numeric ids, which have no .strip().
    stripped = (str(item).strip() for item in values if item)
    return [text for text in stripped if text]


@v2_bp.route('/admin/permits/advanced-filter', methods=['GET', 'POST'])
def admin_permits_advanced_filter():
    """Advanced filtering for permits with multiple dimensions.

    Supports filtering by:
    - regions: List of administrative regions (supports multi-select)
    - themes: List of legal themes (supports multi-select)
    - departments: List of departments (supports multi-select)
    - search_text: Permit name search (alias ``q``)
    - visibility: passed through lower-cased to the repository

    GET reads the query string; POST reads a JSON object, falling back to
    form data.  Query/form lists use ``regions[]``/``region`` style keys,
    JSON uses ``regions``/``themes``/``departments``.  ``limit`` defaults to
    100 and ``offset`` to 0; unparseable or negative values use the default.
    """
    try:
        if request.method == 'GET':
            source = request.args
        else:
            body = request.get_json(silent=True)
            source = body if isinstance(body, dict) and body else request.form

        if hasattr(source, 'getlist'):
            # Query string / form: repeated keys, plural-with-brackets or singular.
            regions = source.getlist('regions[]') or source.getlist('region')
            themes = source.getlist('themes[]') or source.getlist('theme')
            departments = source.getlist('departments[]') or source.getlist('department')
        else:
            # JSON object.
            regions = source.get('regions', [])
            themes = source.get('themes', [])
            departments = source.get('departments', [])

        search_text = source.get('search_text') or source.get('q')
        visibility = source.get('visibility')
        limit = source.get('limit', '100')
        offset = source.get('offset', '0')

        # Normalize parameters.
        regions = _clean_filter_values(regions)
        themes = _clean_filter_values(themes)
        departments = _clean_filter_values(departments)
        search_text = str(search_text or "").strip() or None
        visibility = str(visibility or "").strip().lower() or None

        try:
            limit = int(limit)
        except (ValueError, TypeError):
            limit = 100
        if limit < 0:
            limit = 100

        try:
            offset = int(offset)
        except (ValueError, TypeError):
            offset = 0
        offset = max(0, offset)

        print(f"[DEBUG] admin_permits_advanced_filter params: search={search_text}, visibility={visibility}, regions={regions}")

        # Execute filtering.
        result = filter_permits_advanced(
            regions=regions,
            themes=themes,
            departments=departments,
            search_text=search_text,
            visibility=visibility,
            limit=limit,
            offset=offset,
        )

        return jsonify({"success": True, "data": result})
    except Exception as exc:
        print(f"admin_permits_advanced_filter error: {exc}")
        return jsonify({"success": False, "message": str(exc)}), 500


@v2_bp.route('/admin/permits/filter-options', methods=['GET'])
def admin_permits_filter_options():
    """Get available filter options for permit filtering.

    Query parameters:
        region_id: Optional. When given, it is forwarded to
            ``list_service_departments`` so only that region's departments
            are requested.

    The response contains ``regions``, ``themes`` and ``departments``.
    """
    try:
        region_id = request.args.get('region_id')

        if region_id:
            departments = list_service_departments(region_id=region_id)
        else:
            departments = list_service_departments()

        options = {
            "regions": list_regions(),
            "themes": list_all_themes(),
            "departments": departments,
        }
        return jsonify({"success": True, "data": options})
    except Exception as exc:
        print(f"admin_permits_filter_options error: {exc}")
        return jsonify({"success": False, "message": str(exc)}), 500


@v2_bp.route('/admin/approval-departments/setup', methods=['POST'])
def admin_setup_approval_departments():
    """Setup approval department mappings (admin only).

    Writes a fixed list of permit-name -> department-name pairs into
    ``permit_approval_departments``: existing permit names are updated,
    missing ones inserted.  The response reports the inserted/updated counts,
    the row count before and after, and the full mapping table.
    """
    from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_permit_approval_departments_schema

    _user, error = _admin_guard(prefer_json=True)
    if error:
        return error

    try:
        # Common permit -> department mappings.
        mappings = (
            ('营业执照', '市场监管部门'),
            ('食品经营许可证', '市场监管部门'),
            ('药品经营许可证', '市场监管部门'),
            ('医疗器械经营许可证', '市场监管部门'),
            ('特种设备使用登记', '市场监管部门'),
        )

        # Ensure schema exists.
        _ensure_permit_approval_departments_schema()

        with _lic_pg_conn() as conn:
            cur = conn.cursor()
            try:
                cur.execute("SELECT COUNT(*) FROM permit_approval_departments")
                before_count = cur.fetchone()[0]

                inserted = 0
                updated = 0

                for permit_name, dept_name in mappings:
                    cur.execute(
                        "SELECT id FROM permit_approval_departments WHERE permit_name = %s",
                        (permit_name,),
                    )
                    if cur.fetchone():
                        cur.execute(
                            """
                            UPDATE permit_approval_departments
                            SET department_name = %s, updated_at = now()
                            WHERE permit_name = %s
                            """,
                            (dept_name, permit_name),
                        )
                        updated += 1
                    else:
                        cur.execute(
                            """
                            INSERT INTO permit_approval_departments (id, permit_name, department_name)
                            VALUES (%s, %s, %s)
                            """,
                            (str(uuid.uuid4()), permit_name, dept_name),
                        )
                        inserted += 1

                conn.commit()

                cur.execute("SELECT COUNT(*) FROM permit_approval_departments")
                after_count = cur.fetchone()[0]

                cur.execute(
                    "SELECT permit_name, department_name FROM permit_approval_departments ORDER BY permit_name"
                )
                all_mappings = [
                    {"permit_name": row[0], "department_name": row[1]}
                    for row in cur.fetchall()
                ]
            finally:
                # Release the cursor even when a statement fails.
                cur.close()

        return jsonify({
            "success": True,
            "data": {
                "inserted": inserted,
                "updated": updated,
                "before_count": before_count,
                "after_count": after_count,
                "mappings": all_mappings,
            },
        })
    except Exception as exc:
        print(f"admin_setup_approval_departments error: {exc}")
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(exc)}), 500