"""V2 API routes - Enhanced implementation with structured results.""" from __future__ import annotations import os import time from io import BytesIO from flask import Blueprint, jsonify, request, send_file from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, Optional from lawrisk.api.auth import login_required, get_current_user from lawrisk.services.lawrisk_v2_service import search_v2, list_regions from lawrisk.services.licensing_repo import ( list_permits_for_region, load_permits_and_risks, list_region_theme_options, list_region_permit_catalog, load_theme_payload, create_checkpoint, list_checkpoints, restore_checkpoint, delete_checkpoint, list_permit_risk_snapshot_summaries, count_permit_risk_snapshots, delete_region_permit, restore_permit_risk_snapshot_batch, start_permit_import_session, commit_permit_import_session, fetch_permit_file, describe_permit_import_session, resolve_region_permit_theme, list_stored_permit_files, start_import_session_from_file, delete_stored_permit_file, ) from lawrisk.services.lawrisk_service import suggest_questions_embed v2_bp = Blueprint('lawrisk_v2', __name__, url_prefix='/fs-ai-asistant/api/workflow/lawrisk') @v2_bp.route('/v2', methods=['POST', 'GET']) def lawrisk_search_v2(): """V2 search endpoint with structured results.""" query, debug_flag, top_k_int, _mode, region_filter = _extract_params() if not query or not isinstance(query, str): return jsonify({"error": "query is required"}), 400 try: t0 = time.time() with ThreadPoolExecutor(max_workers=2) as ex: fut_subject = ex.submit(search_v2, query, debug_flag, region_filter) fut_questions = ex.submit(suggest_questions_embed, query, max(1, top_k_int)) result_v2 = fut_subject.result() rec_questions = fut_questions.result() or [] risk_subject = result_v2.get("risk_subject", []) if isinstance(result_v2, dict) else [] found = bool(risk_subject) exec_time = int((time.time() - t0) * 1000) data = { "llmRespond": "" if found else "抱歉,无法检索到相关答案", "lawRisk": "", "questionExtend": rec_questions, "conversationId": "", "messageId": "", "roundNumber": 0, "conversationInfo": {}, "knowledgeSources": [], "totalKnowledgeSources": 0, "executionTime": exec_time, "workflowStatus": "ok" if found else "no_match", "executionSteps": [], "costStatistics": {}, "workflowTrackingId": "", "risk_subject": risk_subject, "debug": result_v2.get("debug", {}) if (debug_flag and isinstance(result_v2, dict)) else {}, } resp = {"success": True, "message": "OK", "data": data} return jsonify(resp) except Exception as e: print(f"lawrisk_search_v2 error: {e}") return jsonify({"success": False, "message": str(e), "data": {}}), 500 @v2_bp.route('/v2/regions', methods=['GET']) def lawrisk_regions(): """Get list of available regions.""" try: regions = list_regions() return jsonify({"success": True, "data": {"regions": regions}}) except Exception as exc: print(f"lawrisk_regions error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/getPermits', methods=['GET', 'POST']) def lawrisk_get_permits(): """Get permits for a specific region.""" if request.method == "GET": region_value = request.args.get("region") or request.args.get("region_id") else: if request.is_json: payload = request.get_json(silent=True) or {} else: payload = request.form.to_dict(flat=True) if request.form else {} region_value = payload.get("region") or payload.get("region_id") if not region_value or (isinstance(region_value, str) and not region_value.strip()): return jsonify({"success": False, "message": "region is required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) try: permits = list_permits_for_region(region_token) return jsonify({"success": True, "data": {"region": region_token, "permits": permits}}) except Exception as exc: print(f"lawrisk_get_permits error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 def _extract_params(): """Extract parameters from request.""" if request.method == "GET": query = request.args.get("query") or request.args.get("q") or request.args.get("text") debug_flag = request.args.get("debug") in {"1", "true", "yes", "on"} top_k = request.args.get("top") try: top_k_int = int(top_k) if top_k else 5 except Exception: top_k_int = 5 mode_value = (request.args.get("mode") or "llm").lower() region_value = request.args.get("region") or request.args.get("region_id") region_list = request.args.getlist("region") if region_list and (not region_value or len(region_list) > 1): region_value = region_list else: if request.is_json: payload = request.get_json(silent=True) or {} else: payload = request.form.to_dict(flat=True) if request.form else {} query = payload.get("query") or payload.get("q") or payload.get("text") debug_flag = str(payload.get("debug", "")).strip().lower() in {"1", "true", "yes", "on"} try: top_k_int = int(payload.get("top", 5)) except Exception: top_k_int = 5 mode_value = str(payload.get("mode", "llm")).lower() region_value = payload.get("region") or payload.get("region_id") return query, debug_flag, top_k_int, mode_value, region_value @v2_bp.route('/admin/test', methods=['GET']) def admin_test(): """Simple test route.""" return jsonify({"success": True, "message": "Test route works!"}) @v2_bp.route('/test-simple', methods=['GET']) def test_simple(): """Very simple test.""" return jsonify({"status": "ok"}) @v2_bp.route('/db_admin', methods=['GET']) @login_required def db_admin_page(): """Serve the database administration UI.""" project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) html_path = os.path.join(project_root, 'static', 'db_admin.html') if not os.path.exists(html_path): return jsonify({"success": False, "message": "DB admin page not found"}), 404 return send_file(html_path, mimetype='text/html') @v2_bp.route('/admin/regions', methods=['GET']) def admin_regions(): """Get all regions for database maintenance.""" try: regions = list_regions() return jsonify({"success": True, "data": {"regions": regions}}) except Exception as exc: print(f"admin_regions error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/themes', methods=['GET']) def admin_themes(): """Get themes for a specific region.""" region_value = request.args.get("region") or request.args.get("region_id") if not region_value or (isinstance(region_value, str) and not region_value.strip()): return jsonify({"success": False, "message": "region is required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) try: catalog = list_region_theme_options() region_id_lower = region_token.lower() themes = [] seen_theme_ids = set() for item in catalog: if (item["region_id"] == region_token or item["region_id"].lower() == region_id_lower or item["region_name"].lower() == region_id_lower): theme_id = item["theme_id"] if theme_id not in seen_theme_ids: seen_theme_ids.add(theme_id) themes.append({ "id": theme_id, "name": item["theme_name"], "option_id": item["option_id"] }) return jsonify({"success": True, "data": {"region": region_token, "themes": themes}}) except Exception as exc: print(f"admin_themes error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permits', methods=['GET']) def admin_permits(): """Get permits for a region. Optional theme filter keeps backward compatibility.""" region_value = request.args.get("region") or request.args.get("region_id") theme_value = request.args.get("theme") or request.args.get("theme_id") if not region_value or (isinstance(region_value, str) and not region_value.strip()): return jsonify({"success": False, "message": "region is required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) theme_token = ( theme_value.strip() if theme_value and isinstance(theme_value, str) else (str(theme_value) if theme_value is not None else None) ) try: if theme_token: permits = load_permits_and_risks(region_token, theme_token) data = { "region": region_token, "theme": theme_token, "permits": permits, } else: catalog = list_region_permit_catalog(region_token) data = { "region": region_token, "permits": catalog, } return jsonify({"success": True, "data": data}) except Exception as exc: print(f"admin_permits error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-import/upload', methods=['POST']) def admin_permit_import_upload(): """Upload Excel workbook and start an import session.""" if 'file' not in request.files: return jsonify({"success": False, "message": "请选择要上传的Excel文件"}), 400 file_storage = request.files['file'] filename = file_storage.filename or 'import.xlsx' file_bytes = file_storage.read() user = get_current_user() or {} uploaded_by = user.get("display_name") or user.get("username") or user.get("id") content_type = file_storage.mimetype or "application/octet-stream" if not file_bytes: return jsonify({"success": False, "message": "上传的文件为空"}), 400 try: data = start_permit_import_session( file_bytes, filename, content_type=content_type, uploaded_by=str(uploaded_by) if uploaded_by else None, ) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_permit_import_upload error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-import/template', methods=['GET']) def admin_permit_import_template(): """Provide the Excel import template for download.""" project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) template_path = os.path.join(project_root, 'data', 'template', '风险提示表 模板.xlsx') if not os.path.exists(template_path): return jsonify({"success": False, "message": "模板文件不存在,请联系管理员"}), 404 try: return send_file( template_path, as_attachment=True, download_name='风险提示表 模板.xlsx', ) except Exception as exc: print(f"admin_permit_import_template error: {exc}") return jsonify({"success": False, "message": "模板文件暂时无法下载"}), 500 def _build_import_preview_response(session_token: str): """Internal helper to build preview response JSON.""" try: data = describe_permit_import_session(session_token) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_permit_import_preview error: {exc}") return jsonify({"success": False, "message": "无法加载预览数据"}), 500 @v2_bp.route('/admin/permit-import/session//preview', methods=['GET']) def admin_permit_import_preview(session_id: str): """Return parsed workbook preview along with theme options (path param).""" if not session_id: return jsonify({"success": False, "message": "session_id 不能为空"}), 400 return _build_import_preview_response(session_id) @v2_bp.route('/admin/permit-import/preview', methods=['GET']) def admin_permit_import_preview_query(): """Return preview via query string for compatibility.""" session_id = request.args.get("session_id") or request.args.get("sessionId") if not session_id: return jsonify({"success": False, "message": "session_id 不能为空"}), 400 return _build_import_preview_response(session_id) @v2_bp.route('/admin/permit-import/commit', methods=['POST']) def admin_permit_import_commit(): """Commit an import session with selected sheets.""" payload = request.get_json(silent=True) or {} session_id = payload.get('session_id') or payload.get('sessionId') sheet_names = payload.get('sheet_names') or payload.get('sheets') or payload.get('selectedSheets') overrides = payload.get('overrides') or {} edited_by = payload.get('edited_by') or payload.get('editedBy') change_summary = payload.get('change_summary') or payload.get('changeSummary') theme_bindings = payload.get('theme_bindings') or payload.get('themeBindings') or {} if isinstance(sheet_names, str): sheet_names = [sheet_names] try: data = commit_permit_import_session( session_id, sheet_names or [], overrides=overrides, edited_by=edited_by, change_summary=change_summary, theme_bindings=theme_bindings, ) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_permit_import_commit error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-files', methods=['GET']) def admin_list_permit_files(): """List stored permit files with pagination.""" try: limit = int(request.args.get("limit", 20)) except (TypeError, ValueError): limit = 20 limit = max(1, min(limit, 100)) try: offset = int(request.args.get("offset", 0)) except (TypeError, ValueError): offset = 0 offset = max(0, offset) keyword = request.args.get("keyword") or request.args.get("q") try: data = list_stored_permit_files(limit=limit, offset=offset, keyword=keyword) return jsonify({"success": True, "data": data}) except Exception as exc: print(f"admin_list_permit_files error: {exc}") return jsonify({"success": False, "message": "文件列表加载失败"}), 500 @v2_bp.route('/admin/permit-files//reimport', methods=['POST']) def admin_reimport_permit_file(file_id: str): """Re-create an import session from an archived Excel file.""" if not file_id: return jsonify({"success": False, "message": "file_id 不能为空"}), 400 user = get_current_user() or {} requested_by = user.get("display_name") or user.get("username") or user.get("id") try: data = start_import_session_from_file(file_id, requested_by=requested_by) return jsonify({"success": True, "data": data}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 404 except Exception as exc: print(f"admin_reimport_permit_file error: {exc}") return jsonify({"success": False, "message": "暂无法重新载入该文件"}), 500 @v2_bp.route('/admin/permit-files/', methods=['DELETE']) def admin_delete_permit_file(file_id: str): """Delete an archived permit file.""" if not file_id: return jsonify({"success": False, "message": "file_id 不能为空"}), 400 try: deleted = delete_stored_permit_file(file_id) if deleted: return jsonify({"success": True, "message": "文件已删除"}) return jsonify({"success": False, "message": "文件不存在"}), 404 except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 400 except Exception as exc: print(f"admin_delete_permit_file error: {exc}") return jsonify({"success": False, "message": "文件暂无法删除"}), 500 @v2_bp.route('/admin/permit-file/download', methods=['GET']) @login_required def admin_permit_file_download(): """Download the original Excel file associated with a permit.""" region_value = request.args.get("region") or request.args.get("region_id") permit_value = request.args.get("permit") or request.args.get("permit_id") if not region_value or not permit_value: return jsonify({"success": False, "message": "region 和 permit 参数必填"}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value) try: file_payload = fetch_permit_file(region_token, permit_token) if not file_payload: return jsonify({"success": False, "message": "当前许可没有关联的原始文件"}), 404 buffer = BytesIO(file_payload["file_data"]) buffer.seek(0) download_name = file_payload.get("filename") or "许可导入.xlsx" mimetype = file_payload.get("content_type") or "application/octet-stream" return send_file( buffer, as_attachment=True, download_name=download_name, mimetype=mimetype, ) except Exception as exc: print(f"admin_permit_file_download error: {exc}") return jsonify({"success": False, "message": "暂无法下载原始文件"}), 500 @v2_bp.route('/admin/permit-details', methods=['GET']) def admin_permit_details(): """Get detailed information for a specific permit.""" region_value = request.args.get("region") or request.args.get("region_id") theme_value = request.args.get("theme") or request.args.get("theme_id") permit_value = request.args.get("permit") or request.args.get("permit_id") if not region_value or not permit_value: return jsonify({"success": False, "message": "region and permit are required", "data": {}}), 400 region_token = region_value.strip() if isinstance(region_value, str) else str(region_value) permit_token = permit_value.strip() if isinstance(permit_value, str) else str(permit_value) theme_token: Optional[str] theme_display = None if theme_value is None or (isinstance(theme_value, str) and not theme_value.strip()): resolved = resolve_region_permit_theme(region_token, permit_token) if not resolved or not resolved.get("id"): return jsonify({"success": False, "message": "未找到许可所属主题", "data": {}}), 404 theme_token = resolved["id"] theme_display = resolved else: theme_token = theme_value.strip() if isinstance(theme_value, str) else str(theme_value) try: # 始终加载全部主题,详情页需要展示主题列表并高亮当前主题 permits = load_permits_and_risks(region_token, None, permit_token) if not permits: return jsonify({"success": False, "message": "Permit not found", "data": {}}), 404 permit_payload = permits[0] payload = { "region": region_token, "theme": theme_token, "permit": permit_payload, } selected_theme_meta = None theme_list = permit_payload.get("themes") or [] if theme_token: for candidate in theme_list: if candidate.get("id") == theme_token: selected_theme_meta = candidate break if not selected_theme_meta and theme_display: selected_theme_meta = theme_display if not selected_theme_meta: selected_theme_meta = permit_payload.get("theme") if not selected_theme_meta and theme_list: selected_theme_meta = theme_list[0] if selected_theme_meta: permit_payload["theme"] = selected_theme_meta payload["theme_display"] = selected_theme_meta payload["selected_theme_id"] = selected_theme_meta.get("id") or "" elif theme_display: payload["theme_display"] = theme_display return jsonify({"success": True, "data": payload}) except Exception as exc: print(f"admin_permit_details error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permits', methods=['DELETE']) def admin_delete_permit(): """Delete a permit for a specific region-theme combination after snapshotting risks.""" payload: Dict[str, Any] = {} if request.is_json: payload = request.get_json(silent=True) or {} elif request.form: payload = request.form.to_dict(flat=True) region_value = ( payload.get("region_id") or payload.get("region") or request.args.get("region_id") or request.args.get("region") ) theme_value = ( payload.get("theme_id") or payload.get("theme") or request.args.get("theme_id") or request.args.get("theme") ) permit_value = ( payload.get("permit_id") or payload.get("permit") or request.args.get("permit_id") or request.args.get("permit") ) edited_by = (payload.get("edited_by") or request.args.get("edited_by") or "").strip() or None change_summary = payload.get("change_summary") or request.args.get("change_summary") if change_summary is not None: change_summary = str(change_summary).strip() if not change_summary: change_summary = None if not region_value or not permit_value: return jsonify({"success": False, "message": "region_id 和 permit_id 均为必填"}), 400 region_id = region_value.strip() if isinstance(region_value, str) else str(region_value) permit_id = permit_value.strip() if isinstance(permit_value, str) else str(permit_value) if theme_value: theme_id = theme_value.strip() if isinstance(theme_value, str) else str(theme_value) else: resolved_theme = resolve_region_permit_theme(region_id, permit_id) if not resolved_theme or not resolved_theme.get("id"): return jsonify({"success": False, "message": "未找到许可所属主题,无法删除"}), 400 theme_id = resolved_theme["id"] try: result = delete_region_permit( region_id, theme_id, permit_id, edited_by=edited_by, change_summary=change_summary, ) return jsonify({"success": True, "data": result}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 404 except Exception as exc: print(f"admin_delete_permit error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/checkpoints', methods=['GET']) def admin_list_checkpoints(): """List all available checkpoints.""" try: checkpoints = list_checkpoints() return jsonify({"success": True, "data": {"checkpoints": checkpoints}}) except Exception as exc: print(f"admin_list_checkpoints error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-risk-snapshots', methods=['GET']) def admin_permit_risk_snapshots(): """List permit risk checkpoint history entries for the management UI.""" try: args = request.args region_id = args.get("region_id") or args.get("region") permit_id = args.get("permit_id") or args.get("permit") edited_by = args.get("edited_by") try: limit = int(args.get("limit", 20)) except (TypeError, ValueError): limit = 20 limit = max(1, min(limit, 200)) try: offset = int(args.get("offset", 0)) except (TypeError, ValueError): offset = 0 offset = max(0, offset) snapshots = list_permit_risk_snapshot_summaries( region_id=region_id, permit_id=permit_id, edited_by=edited_by, limit=limit, offset=offset, ) total = count_permit_risk_snapshots( region_id=region_id, permit_id=permit_id, edited_by=edited_by, ) return jsonify( { "success": True, "data": { "snapshots": snapshots, "pagination": { "limit": limit, "offset": offset, "total": total, }, }, } ) except Exception as exc: print(f"admin_permit_risk_snapshots error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/permit-risk-snapshots//restore', methods=['POST']) def admin_restore_permit_risk_snapshot(batch_id): """Restore permit risk relations from a snapshot batch.""" payload: Dict[str, Any] = {} if request.is_json: payload = request.get_json(silent=True) or {} elif request.form: payload = request.form.to_dict(flat=True) edited_by = (payload.get("edited_by") or "").strip() or None change_summary = payload.get("change_summary") if change_summary is not None: change_summary = str(change_summary).strip() if not change_summary: change_summary = None try: result = restore_permit_risk_snapshot_batch( batch_id, edited_by=edited_by, change_summary=change_summary, ) return jsonify({"success": True, "data": result}) except ValueError as exc: return jsonify({"success": False, "message": str(exc)}), 404 except Exception as exc: print(f"admin_restore_permit_risk_snapshot error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/checkpoints', methods=['POST']) def admin_create_checkpoint(): """Create a new checkpoint.""" if request.is_json: payload = request.get_json(silent=True) or {} else: payload = request.form.to_dict(flat=True) if request.form else {} description = payload.get("description", "") try: checkpoint = create_checkpoint(description) return jsonify({"success": True, "data": checkpoint}) except Exception as exc: print(f"admin_create_checkpoint error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500 @v2_bp.route('/admin/checkpoints//restore', methods=['POST']) def admin_restore_checkpoint(checkpoint_id): """ ⚠️ DANGEROUS OPERATION ⚠️ 恢复数据库从checkpoint。 此操作将会: 1. 永久删除当前数据库中的所有数据 2. 从指定的checkpoint恢复数据 3. 如果失败,可能导致数据丢失 建议在生产环境中: - 确保已创建最新备份 - 在低峰期操作 - 启用自动备份功能 (create_auto_backup=true) POST参数: - create_auto_backup: 是否在恢复前自动备份当前状态 (默认: true) - batch_size: 批量插入的批次大小,每批插入行数 (默认: 1000,范围: 100-10000) """ import logging logger = logging.getLogger(__name__) logger.info("=" * 80) logger.info(f"[API] Received checkpoint restore request: {checkpoint_id}") logger.info("=" * 80) try: # 获取请求参数 if request.is_json: payload = request.get_json(silent=True) or {} else: payload = request.form.to_dict(flat=True) if request.form else {} logger.info(f"[API] Request parameters: {payload}") # 默认启用自动备份 create_auto_backup = str(payload.get("create_auto_backup", "true")).lower() in {"1", "true", "yes", "on"} logger.info(f"[API] Auto-backup enabled: {create_auto_backup}") # 解析批量大小参数 try: batch_size = int(payload.get("batch_size", "1000")) if batch_size < 100 or batch_size > 10000: raise ValueError(f"batch_size must be between 100 and 10000, got {batch_size}") except (ValueError, TypeError) as e: logger.warning(f"[API] Invalid batch_size: {e}, using default 1000") batch_size = 1000 logger.info(f"[API] Batch size: {batch_size} rows/batch") # 执行恢复 logger.info(f"[API] Starting restore operation...") restore_result = restore_checkpoint(checkpoint_id, create_auto_backup=create_auto_backup, batch_size=batch_size) # 检查恢复状态 if restore_result.get("status") == "success": logger.info(f"[API] Restore completed successfully") logger.info(f"[API] Tables restored: {restore_result['summary']['tables_restored']}") logger.info(f"[API] Total rows restored: {restore_result['summary']['total_rows_restored']}") return jsonify({ "success": True, "message": restore_result["message"], "data": { "checkpoint_id": restore_result["summary"]["checkpoint_id"], "tables_restored": restore_result["summary"]["tables_restored"], "total_rows": restore_result["summary"]["total_rows_restored"], "auto_backup": restore_result["summary"].get("auto_backup"), "table_details": restore_result["summary"]["table_details"] } }) else: # 恢复失败 logger.error(f"[API] Restore failed: {restore_result.get('message')}") errors = restore_result["summary"].get("errors", []) if errors: logger.error(f"[API] Errors encountered: {errors}") response_data = { "success": False, "message": restore_result["message"], "data": { "errors": errors, "auto_backup_available": restore_result.get("auto_backup_available", False) } } # 如果有自动备份,提供恢复建议 if restore_result.get("recovery_suggestion"): logger.warning(f"[API] Recovery suggestion: {restore_result['recovery_suggestion']}") response_data["data"]["recovery_suggestion"] = restore_result["recovery_suggestion"] return jsonify(response_data), 500 except Exception as exc: logger.error(f"[API] Restore operation failed with exception: {str(exc)}", exc_info=True) return jsonify({ "success": False, "message": f"Restore failed: {str(exc)}", "data": { "error_type": type(exc).__name__, "auto_backup_available": None # 未知,因为异常发生在备份之前 } }), 500 @v2_bp.route('/admin/checkpoints/', methods=['DELETE']) def admin_delete_checkpoint(checkpoint_id): """Delete a checkpoint.""" try: success = delete_checkpoint(checkpoint_id) if success: return jsonify({"success": True, "message": "Checkpoint deleted"}) else: return jsonify({"success": False, "message": "Checkpoint not found"}), 404 except Exception as exc: print(f"admin_delete_checkpoint error: {exc}") return jsonify({"success": False, "message": str(exc)}), 500