diff --git a/docs/sql/005_create_service_departments.sql b/docs/sql/005_create_service_departments.sql new file mode 100644 index 0000000..75ba034 --- /dev/null +++ b/docs/sql/005_create_service_departments.sql @@ -0,0 +1,44 @@ +CREATE TABLE IF NOT EXISTS service_departments ( + id uuid PRIMARY KEY, + name text NOT NULL, + code text NOT NULL UNIQUE, + phone text, + parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL, + region_id uuid REFERENCES regions(id) ON DELETE SET NULL, + description text, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX IF NOT EXISTS service_departments_name_idx + ON service_departments (name); + +ALTER TABLE IF EXISTS service_departments + ADD COLUMN IF NOT EXISTS parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL; + +ALTER TABLE IF EXISTS service_departments + ADD COLUMN IF NOT EXISTS phone text; + +CREATE INDEX IF NOT EXISTS service_departments_parent_idx + ON service_departments (parent_id); + +CREATE TABLE IF NOT EXISTS service_department_permits ( + department_id uuid NOT NULL REFERENCES service_departments(id) ON DELETE CASCADE, + region_id uuid NOT NULL REFERENCES regions(id) ON DELETE CASCADE, + permit_id uuid NOT NULL REFERENCES permits(id) ON DELETE CASCADE, + created_at timestamptz NOT NULL DEFAULT now(), + created_by text, + PRIMARY KEY (department_id, region_id, permit_id) +); + +CREATE INDEX IF NOT EXISTS service_dept_permits_region_idx + ON service_department_permits (region_id, permit_id); + +ALTER TABLE IF EXISTS auth_users + ADD COLUMN IF NOT EXISTS service_department_id uuid REFERENCES service_departments(id); + +ALTER TABLE IF EXISTS auth_users + ADD COLUMN IF NOT EXISTS department_role text; + +CREATE INDEX IF NOT EXISTS auth_users_service_department_idx + ON auth_users (service_department_id); diff --git a/lawrisk/api/auth.py b/lawrisk/api/auth.py new file mode 100644 index 0000000..980147e --- /dev/null +++ b/lawrisk/api/auth.py @@ -0,0 +1,178 @@ +"""Auth blueprint: login page, authentication endpoints, and helpers.""" +from __future__ import annotations + +from functools import wraps +from typing import Any, Callable, Dict, Iterable, Optional, Tuple +from urllib.parse import urlparse + +from flask import ( + Blueprint, + Response, + jsonify, + redirect, + render_template, + request, + session, + url_for, +) + +from lawrisk.services.auth_service import get_user_by_username, verify_password + + +auth_bp = Blueprint("auth", __name__) + +SESSION_USER_KEY = "auth_user" + + +def _safe_next_url(candidate: Optional[str]) -> str: + if not candidate: + return "/" + raw = str(candidate).strip() + if not raw: + return "/" + parsed = urlparse(raw) + if parsed.netloc: + safe_path = parsed.path or "/" + else: + safe_path = raw if raw.startswith("/") else f"/{raw}" + if parsed.query: + safe_path = f"{safe_path}?{parsed.query}" + return safe_path + + +def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]: + department = None + if user.get("service_department_id"): + department = { + "id": user.get("service_department_id"), + "name": user.get("service_department_name"), + "code": user.get("service_department_code"), + "region_id": user.get("service_department_region_id"), + "parent_id": user.get("service_department_parent_id"), + "phone": user.get("service_department_phone"), + "role": user.get("department_role"), + } + return { + "id": user.get("id"), + "username": user.get("username"), + "display_name": user.get("display_name"), + "role": user.get("role"), + "grade": user.get("grade"), + "department": department, + } + + +def ensure_admin_access( + allowed_roles: Optional[Iterable[str]] = None, + *, + prefer_json: bool = False, +) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]: + """Ensure the current session belongs to an admin role.""" + user = get_current_user() + wants_html = ( + (not prefer_json) + and request.method == "GET" + and request.accept_mimetypes.accept_html + and ( + request.accept_mimetypes["text/html"] + >= request.accept_mimetypes["application/json"] + ) + ) + if not user: + if wants_html: + login_url = url_for("auth.login_page", next=request.url) + return None, redirect(login_url) + return None, (jsonify({"error": "authentication required"}), 401) + + default_roles = ("admin",) if allowed_roles is None else allowed_roles + allowed = {str(role).lower() for role in default_roles if role} + current_role = str(user.get("role") or "").lower() + + if allowed and current_role not in allowed: + if wants_html: + login_url = url_for("auth.login_page", next=request.url, force=1) + return None, redirect(login_url) + return None, (jsonify({"error": "admin privileges required"}), 403) + + return user, None + + +def get_current_user() -> Optional[Dict[str, Any]]: + data = session.get(SESSION_USER_KEY) + if not isinstance(data, dict): + return None + return data + + +def login_required(view: Callable[..., Response]) -> Callable[..., Response]: + @wraps(view) + def wrapper(*args: Any, **kwargs: Any) -> Response: + user = get_current_user() + if not user: + wants_html = request.method == "GET" and request.accept_mimetypes.accept_html and ( + request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"] + ) + if wants_html: + login_url = url_for("auth.login_page", next=request.url) + return redirect(login_url) + return jsonify({"error": "authentication required"}), 401 + return view(*args, **kwargs) + + return wrapper + + +def _truthy_param(value: Optional[str]) -> bool: + if value is None: + return False + return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"} + + +@auth_bp.get("/fs-ai-asistant/lawrisk/login") +def login_page() -> Response: + user = get_current_user() + force_login = _truthy_param(request.args.get("force") or request.args.get("reauth")) + if user and not force_login: + return redirect(request.args.get("next") or "/") + return render_template("login.html") + + +@auth_bp.post("/auth/login") +def login_action() -> Response: + payload = request.get_json(silent=True) or request.form + username = (payload.get("username", "") if payload else "").strip().lower() + password = (payload.get("password", "") if payload else "").strip() + next_url = _safe_next_url((payload.get("next") if payload else None) or request.args.get("next")) + + if not username or not password: + return jsonify({"error": "username and password are required"}), 400 + + user = get_user_by_username(username) + if not user or not verify_password(password, user.get("password_hash", "")): + return jsonify({"error": "invalid credentials"}), 401 + if not user.get("is_active", True): + return jsonify({"error": "account disabled"}), 403 + + sanitized = _scrub_user_payload(user) + session[SESSION_USER_KEY] = sanitized + + # Form submissions expect redirects; API clients expect JSON + if request.content_type and "application/x-www-form-urlencoded" in request.content_type: + return redirect(next_url) + + return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url}) + + +@auth_bp.post("/auth/logout") +def logout_action() -> Response: + session.pop(SESSION_USER_KEY, None) + if request.content_type and "application/x-www-form-urlencoded" in request.content_type: + return redirect(url_for("auth.login_page")) + return jsonify({"message": "logged out"}) + + +@auth_bp.get("/auth/me") +def current_user_endpoint() -> Response: + user = get_current_user() + if not user: + return jsonify({"authenticated": False}), 401 + return jsonify({"authenticated": True, "user": user}) diff --git a/lawrisk/api/v2.py b/lawrisk/api/v2.py index 8902cee..637692c 100644 --- a/lawrisk/api/v2.py +++ b/lawrisk/api/v2.py @@ -8,7 +8,7 @@ from flask import Blueprint, jsonify, request, send_file from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, Optional -from lawrisk.api.auth import login_required, get_current_user +from lawrisk.api.auth import login_required, get_current_user, ensure_admin_access from lawrisk.services.lawrisk_v2_service import search_v2, list_regions from lawrisk.services.licensing_repo import ( list_permits_for_region, @@ -32,12 +32,49 @@ from lawrisk.services.licensing_repo import ( list_stored_permit_files, start_import_session_from_file, delete_stored_permit_file, + list_service_departments, + create_service_department, + update_service_department, + delete_service_department, + list_all_themes, + create_theme, + rename_theme, + delete_theme, +) +from lawrisk.services.auth_service import ( + list_users, + create_user, + update_user_account, +) +from lawrisk.services.template_service import ( + get_permit_template_path, + get_permit_template_metadata, + overwrite_permit_template, ) from lawrisk.services.lawrisk_service import suggest_questions_embed v2_bp = Blueprint('lawrisk_v2', __name__, url_prefix='/fs-ai-asistant/api/workflow/lawrisk') +def _project_root() -> str: + return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) + + +def _parse_truthy(value: Any, default: bool = False) -> bool: + if value is None: + return default + if isinstance(value, bool): + return value + return str(value).strip().lower() in {"1", "true", "yes", "on"} + + +def _admin_guard(*, prefer_json: bool = False, roles: Optional[Iterable[str]] = None): + user, error = ensure_admin_access(allowed_roles=roles, prefer_json=prefer_json) + if error: + return None, error + return user, None + + @v2_bp.route('/v2', methods=['POST', 'GET']) def lawrisk_search_v2(): """V2 search endpoint with structured results.""" @@ -168,8 +205,7 @@ def test_simple(): @login_required def db_admin_page(): """Serve the database administration UI.""" - project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) - html_path = os.path.join(project_root, 'static', 'db_admin.html') + html_path = os.path.join(_project_root(), 'static', 'db_admin.html') if not os.path.exists(html_path): return jsonify({"success": False, "message": "DB admin page not found"}), 404 @@ -177,6 +213,286 @@ def db_admin_page(): return send_file(html_path, mimetype='text/html') +@v2_bp.route('/admin/super', methods=['GET']) +def super_admin_page(): + """Serve the super administrator console.""" + _, error = _admin_guard(roles=("admin",)) + if error: + return error + html_path = os.path.join(_project_root(), 'static', 'super_admin.html') + if not os.path.exists(html_path): + return jsonify({"success": False, "message": "Super admin page not found"}), 404 + return send_file(html_path, mimetype='text/html') + + +@v2_bp.route('/admin/users', methods=['GET']) +def admin_list_users(): + """List users for the super admin console.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + include_inactive = _parse_truthy(request.args.get("include_inactive"), default=True) + try: + users = list_users(include_inactive=include_inactive) + return jsonify({"success": True, "data": {"users": users}}) + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/users', methods=['POST']) +def admin_create_user(): + """Create a new application user.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + payload = request.get_json(silent=True) or {} + username = (payload.get("username") or "").strip().lower() + password = (payload.get("password") or "").strip() + if not username or not password: + return jsonify({"success": False, "message": "用户名和密码均不能为空"}), 400 + try: + user = create_user( + username=username, + password=password, + display_name=payload.get("display_name"), + role=(payload.get("role") or "department_admin").strip(), + grade=int(payload.get("grade") or 50), + service_department_id=(payload.get("service_department_id") or "").strip() or None, + department_role=payload.get("department_role"), + ) + return jsonify({"success": True, "data": {"user": user}}) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/users/', methods=['PATCH']) +def admin_update_user(user_id: str): + """Update password or service department for an existing user.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + payload = request.get_json(silent=True) or {} + update_kwargs: Dict[str, Any] = {} + if "password" in payload: + update_kwargs["password"] = payload.get("password") or "" + if "service_department_id" in payload: + dept_value = payload.get("service_department_id") + if isinstance(dept_value, str) and not dept_value.strip(): + dept_value = None + update_kwargs["service_department_id"] = dept_value + if "display_name" in payload: + update_kwargs["display_name"] = payload.get("display_name") + if "role" in payload: + update_kwargs["role"] = payload.get("role") + if "grade" in payload: + update_kwargs["grade"] = payload.get("grade") + if "department_role" in payload: + update_kwargs["department_role"] = payload.get("department_role") + + if not update_kwargs: + return jsonify({"success": False, "message": "请至少提供一个需要更新的字段"}), 400 + try: + updated = update_user_account(user_id, **update_kwargs) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + if not updated: + return jsonify({"success": False, "message": "用户不存在"}), 404 + return jsonify({"success": True, "data": {"user": updated}}) + + +@v2_bp.route('/admin/service-departments', methods=['GET']) +def admin_list_service_departments(): + """Return all service departments in a flat list.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + try: + departments = list_service_departments() + return jsonify({"success": True, "data": {"departments": departments}}) + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/service-departments', methods=['POST']) +def admin_create_service_department(): + """Create a service department node.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + payload = request.get_json(silent=True) or {} + name = payload.get("name") + if not name or not str(name).strip(): + return jsonify({"success": False, "message": "服务部门名称不能为空"}), 400 + kwargs = { + "code": (payload.get("code") or "").strip() or None, + "phone": (payload.get("phone") or "").strip() or None, + "parent_id": (payload.get("parent_id") or "").strip() or None, + "region_id": (payload.get("region_id") or "").strip() or None, + "description": payload.get("description"), + } + try: + department = create_service_department(name, **kwargs) + return jsonify({"success": True, "data": {"department": department}}) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/service-departments/', methods=['PATCH']) +def admin_update_service_department(dept_id: str): + """Update department metadata such as name or phone.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + payload = request.get_json(silent=True) or {} + updates: Dict[str, Any] = {} + for field in ("name", "phone", "parent_id", "region_id", "description"): + if field in payload: + value = payload.get(field) + if isinstance(value, str) and not value.strip(): + value = None + updates[field] = value + if not updates: + return jsonify({"success": False, "message": "没有需要更新的字段"}), 400 + try: + department = update_service_department(dept_id, **updates) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + if not department: + return jsonify({"success": False, "message": "服务部门不存在"}), 404 + return jsonify({"success": True, "data": {"department": department}}) + + +@v2_bp.route('/admin/service-departments/', methods=['DELETE']) +def admin_delete_service_department(dept_id: str): + """Delete a service department node (only when no users are bound).""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + try: + result = delete_service_department(dept_id) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + if not result.get("deleted"): + return jsonify({"success": False, "message": result.get("message", "删除失败")}), 400 + return jsonify({"success": True, "data": result}) + + +@v2_bp.route('/admin/themes/catalog', methods=['GET']) +def admin_theme_catalog(): + """List all available themes.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + try: + themes = list_all_themes() + return jsonify({"success": True, "data": {"themes": themes}}) + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/themes/catalog', methods=['POST']) +def admin_create_theme(): + """Create a new theme.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + payload = request.get_json(silent=True) or {} + name = payload.get("name") + if not name or not str(name).strip(): + return jsonify({"success": False, "message": "主题名称不能为空"}), 400 + try: + theme = create_theme(name) + return jsonify({"success": True, "data": {"theme": theme}}) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/themes/catalog/', methods=['PATCH']) +def admin_rename_theme(theme_id: str): + """Rename an existing theme.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + payload = request.get_json(silent=True) or {} + name = payload.get("name") or payload.get("new_name") + if not name or not str(name).strip(): + return jsonify({"success": False, "message": "主题名称不能为空"}), 400 + try: + theme = rename_theme(theme_id, name) + return jsonify({"success": True, "data": {"theme": theme}}) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/themes/catalog/', methods=['DELETE']) +def admin_delete_theme(theme_id: str): + """Delete a theme and related region associations.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + try: + result = delete_theme(theme_id) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + if not result.get("deleted"): + return jsonify({"success": False, "message": result.get("message", "主题不存在")}), 404 + return jsonify({"success": True, "data": result}) + + +@v2_bp.route('/admin/templates/permit', methods=['GET']) +def admin_permit_template_meta(): + """Return metadata for the import template.""" + _, error = _admin_guard(prefer_json=True) + if error: + return error + try: + data = get_permit_template_metadata() + return jsonify({"success": True, "data": data}) + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + +@v2_bp.route('/admin/templates/permit', methods=['POST']) +def admin_update_permit_template(): + """Overwrite the existing import template.""" + admin_user, error = _admin_guard(prefer_json=True) + if error: + return error + if 'file' not in request.files: + return jsonify({"success": False, "message": "请上传模板文件"}), 400 + file_storage = request.files['file'] + file_bytes = file_storage.read() + if not file_bytes: + return jsonify({"success": False, "message": "上传的模板为空"}), 400 + try: + meta = overwrite_permit_template( + file_bytes, + file_storage.filename or "import_template.xlsx", + uploaded_by=(admin_user or {}).get("username"), + ) + return jsonify({"success": True, "data": meta}) + except ValueError as exc: + return jsonify({"success": False, "message": str(exc)}), 400 + except Exception as exc: + return jsonify({"success": False, "message": str(exc)}), 500 + + @v2_bp.route('/admin/regions', methods=['GET']) def admin_regions(): """Get all regions for database maintenance.""" @@ -294,8 +610,7 @@ def admin_permit_import_upload(): @v2_bp.route('/admin/permit-import/template', methods=['GET']) def admin_permit_import_template(): """Provide the Excel import template for download.""" - project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) - template_path = os.path.join(project_root, 'data', 'template', '风险提示表 模板.xlsx') + template_path = get_permit_template_path() if not os.path.exists(template_path): return jsonify({"success": False, "message": "模板文件不存在,请联系管理员"}), 404 diff --git a/lawrisk/services/auth_service.py b/lawrisk/services/auth_service.py new file mode 100644 index 0000000..061cc52 --- /dev/null +++ b/lawrisk/services/auth_service.py @@ -0,0 +1,544 @@ +"""Authentication and authorization helpers for the LawRisk backend.""" +from __future__ import annotations + +import logging +import os +import uuid +from datetime import datetime +from typing import Any, Dict, List, Optional + +from passlib.context import CryptContext + +from lawrisk.services import licensing_repo as lic_repo + + +pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") +logger = logging.getLogger(__name__) + + +def _normalize_department_code(raw_code: Optional[str]) -> str: + return (raw_code or "").strip().upper() + + +def _resolve_parent_department_id( + cur, parent_code: Optional[str], code_cache: Optional[Dict[str, str]] = None +) -> Optional[str]: + normalized = _normalize_department_code(parent_code) + if not normalized: + return None + if code_cache: + cached = code_cache.get(normalized) + if cached: + return cached + cur.execute( + "SELECT id FROM service_departments WHERE code = %s LIMIT 1", + (normalized,), + ) + row = cur.fetchone() + return str(row[0]) if row else None + + +def _auth_conn(autocommit: bool = False): + """Reuse the licensing_risks connection for auth tables.""" + return lic_repo._lic_pg_conn(autocommit=autocommit) + + +SERVICE_DEPARTMENT_SEEDS = [ + { + "code": "FSSJSJ", + "name": "市级服务部门", + "region_name": "市级", + "username": "FSSJSJ", + "display_name": "市级管理员", + "role": "department_admin", + "grade": 90, + }, + { + "code": "FSSJSS", + "name": "三水区服务部门", + "region_name": "三水区", + "parent_code": "FSSJSJ", + "username": "FSSJSS", + "display_name": "三水区管理员", + "role": "department_admin", + "grade": 80, + }, + { + "code": "FSSJNH", + "name": "南海区服务部门", + "region_name": "南海区", + "parent_code": "FSSJSJ", + "username": "FSSJNH", + "display_name": "南海区管理员", + "role": "department_admin", + "grade": 80, + }, + { + "code": "FSSJSD", + "name": "顺德区服务部门", + "region_name": "顺德区", + "parent_code": "FSSJSJ", + "username": "FSSJSD", + "display_name": "顺德区管理员", + "role": "department_admin", + "grade": 80, + }, + { + "code": "FSSJCC", + "name": "禅城区服务部门", + "region_name": "禅城区", + "parent_code": "FSSJSJ", + "username": "FSSJCC", + "display_name": "禅城区管理员", + "role": "department_admin", + "grade": 80, + }, + { + "code": "FSSJGM", + "name": "高明区服务部门", + "region_name": "高明区", + "parent_code": "FSSJSJ", + "username": "FSSJGM", + "display_name": "高明区管理员", + "role": "department_admin", + "grade": 80, + }, +] + + +def _resolve_region_id(cur, region_name: Optional[str]) -> Optional[str]: + if not region_name: + return None + cur.execute( + """ + SELECT id + FROM regions + WHERE name = %s + LIMIT 1 + """, + (region_name,), + ) + row = cur.fetchone() + return str(row[0]) if row else None + + +def _upsert_service_department( + cur, seed: Dict[str, Any], code_cache: Optional[Dict[str, str]] = None +) -> str: + code = _normalize_department_code(seed.get("code")) + name = seed.get("name") + if not code or not name: + raise ValueError("Service department seed requires code and name") + cur.execute( + "SELECT id FROM service_departments WHERE code = %s LIMIT 1", + (code,), + ) + row = cur.fetchone() + region_id = _resolve_region_id(cur, seed.get("region_name")) + description = seed.get("description") + parent_id = _resolve_parent_department_id(cur, seed.get("parent_code"), code_cache) + if row: + dept_id = str(row[0]) + cur.execute( + """ + UPDATE service_departments + SET name = %s, + parent_id = COALESCE(%s, parent_id), + region_id = COALESCE(%s, region_id), + description = COALESCE(%s, description), + updated_at = now() + WHERE id = %s + """, + (name, parent_id, region_id, description, dept_id), + ) + return dept_id + dept_id = str(uuid.uuid4()) + cur.execute( + """ + INSERT INTO service_departments (id, name, code, parent_id, region_id, description) + VALUES (%s, %s, %s, %s, %s, %s) + """, + (dept_id, name, code, parent_id, region_id, description), + ) + return dept_id + + +def _ensure_department_account(cur, seed: Dict[str, Any], department_id: str) -> None: + username = (seed.get("username") or seed.get("code") or "").strip().lower() + if not username: + raise ValueError("Department seed requires username") + display_name = seed.get("display_name") or f"{seed.get('name', username)}账号" + role = seed.get("role", "department_admin") + grade = int(seed.get("grade", 50)) + department_role = seed.get("department_role", "manager") + password = seed.get("password") or f"{seed.get('code', username).upper()}123456" + cur.execute( + "SELECT id FROM auth_users WHERE username = %s", + (username,), + ) + row = cur.fetchone() + if row: + cur.execute( + """ + UPDATE auth_users + SET service_department_id = %s, + role = COALESCE(%s, role), + department_role = COALESCE(%s, department_role) + WHERE id = %s + """, + (department_id, role, department_role, row[0]), + ) + return + user_id = uuid.uuid4().hex + password_hash = pwd_context.hash(password) + cur.execute( + """ + INSERT INTO auth_users ( + id, + username, + password_hash, + display_name, + role, + grade, + service_department_id, + department_role + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """, + (user_id, username, password_hash, display_name, role, grade, department_id, department_role), + ) + logger.info("Seeded user %s (department=%s)", username, department_id) + + +def ensure_default_department_accounts() -> None: + """Ensure the default service departments and related accounts exist.""" + dept_ids: Dict[str, str] = {} + with _auth_conn() as conn: + _ensure_auth_schema(conn) + cur = conn.cursor() + for seed in SERVICE_DEPARTMENT_SEEDS: + dept_id = _upsert_service_department(cur, seed, dept_ids) + code_key = _normalize_department_code(seed.get("code")) + if code_key: + dept_ids[code_key] = dept_id + conn.commit() + + cur = conn.cursor() + for seed in SERVICE_DEPARTMENT_SEEDS: + dept_id = dept_ids.get(_normalize_department_code(seed.get("code"))) + if not dept_id: + continue + _ensure_department_account(cur, seed, dept_id) + conn.commit() + + +def _ensure_auth_schema(conn) -> None: + lic_repo._ensure_service_department_schema(conn) + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS auth_users ( + id TEXT PRIMARY KEY, + username TEXT UNIQUE NOT NULL, + password_hash TEXT NOT NULL, + display_name TEXT, + role TEXT NOT NULL DEFAULT 'user', + grade INTEGER NOT NULL DEFAULT 0, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + service_department_id uuid REFERENCES service_departments(id), + department_role text, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS auth_users_department_idx + ON auth_users (service_department_id) + """ + ) + conn.commit() + + +def ensure_auth_schema() -> None: + with _auth_conn() as conn: + _ensure_auth_schema(conn) + + +def _row_to_user(row: tuple[Any, ...], columns: tuple[str, ...]) -> Dict[str, Any]: + return {col: row[idx] for idx, col in enumerate(columns)} + + +def _public_user_payload(user: Dict[str, Any]) -> Dict[str, Any]: + department = None + if user.get("service_department_id"): + department = { + "id": user.get("service_department_id"), + "name": user.get("service_department_name"), + "code": user.get("service_department_code"), + "phone": user.get("service_department_phone"), + "parent_id": user.get("service_department_parent_id"), + "region_id": user.get("service_department_region_id"), + "role": user.get("department_role"), + } + created_at = user.get("created_at") + if isinstance(created_at, datetime): + created_at_value = created_at.isoformat() + else: + created_at_value = created_at + return { + "id": user.get("id"), + "username": user.get("username"), + "display_name": user.get("display_name"), + "role": user.get("role"), + "grade": user.get("grade"), + "is_active": user.get("is_active", True), + "department": department, + "department_id": user.get("service_department_id"), + "created_at": created_at_value, + } + + +def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: + username = (username or "").strip().lower() + if not username: + return None + with _auth_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT + au.id, + au.username, + au.display_name, + au.role, + au.grade, + au.is_active, + au.password_hash, + au.service_department_id, + au.department_role, + au.created_at, + sd.name AS service_department_name, + sd.code AS service_department_code, + sd.phone AS service_department_phone, + sd.parent_id AS service_department_parent_id, + sd.region_id AS service_department_region_id + FROM auth_users au + LEFT JOIN service_departments sd ON sd.id = au.service_department_id + WHERE au.username = %s + LIMIT 1 + """, + (username,), + ) + row = cur.fetchone() + if not row: + return None + columns = tuple(col[0] for col in cur.description) + return _row_to_user(row, columns) + + +def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]: + user_token = (user_id or "").strip() + if not user_token: + return None + with _auth_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT + au.id, + au.username, + au.display_name, + au.role, + au.grade, + au.is_active, + au.service_department_id, + au.department_role, + au.created_at, + sd.name AS service_department_name, + sd.code AS service_department_code, + sd.phone AS service_department_phone, + sd.parent_id AS service_department_parent_id, + sd.region_id AS service_department_region_id + FROM auth_users au + LEFT JOIN service_departments sd ON sd.id = au.service_department_id + WHERE au.id = %s + LIMIT 1 + """, + (user_token,), + ) + row = cur.fetchone() + if not row: + return None + columns = tuple(col[0] for col in cur.description) + return _public_user_payload(_row_to_user(row, columns)) + + +def list_users(include_inactive: bool = False) -> List[Dict[str, Any]]: + where_clause = "" if include_inactive else "WHERE au.is_active = TRUE" + with _auth_conn() as conn: + cur = conn.cursor() + cur.execute( + f""" + SELECT + au.id, + au.username, + au.display_name, + au.role, + au.grade, + au.is_active, + au.service_department_id, + au.department_role, + au.created_at, + sd.name AS service_department_name, + sd.code AS service_department_code, + sd.phone AS service_department_phone, + sd.parent_id AS service_department_parent_id, + sd.region_id AS service_department_region_id + FROM auth_users au + LEFT JOIN service_departments sd ON sd.id = au.service_department_id + {where_clause} + ORDER BY au.created_at DESC + """ + ) + rows = cur.fetchall() + columns = tuple(col[0] for col in cur.description) + users: List[Dict[str, Any]] = [] + for row in rows: + users.append(_public_user_payload(_row_to_user(row, columns))) + return users + + +def create_user( + username: str, + password: str, + *, + display_name: Optional[str] = None, + role: str = "user", + grade: int = 0, + service_department_id: Optional[str] = None, + department_role: Optional[str] = None, +) -> Dict[str, Any]: + username_clean = (username or "").strip().lower() + if not username_clean or not password: + raise ValueError("Username and password are required") + user_id = uuid.uuid4().hex + password_hash = pwd_context.hash(password) + with _auth_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO auth_users ( + id, + username, + password_hash, + display_name, + role, + grade, + service_department_id, + department_role + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + RETURNING + id, + username, + display_name, + role, + grade, + is_active, + created_at, + service_department_id, + department_role + """, + ( + user_id, + username_clean, + password_hash, + display_name, + role, + grade, + service_department_id, + department_role, + ), + ) + row = cur.fetchone() + columns = tuple(col[0] for col in cur.description) + conn.commit() + return get_user_by_id(user_id) + + +def update_user_account( + user_id: str, + *, + password: Optional[str] = None, + service_department_id: Optional[str] = None, + display_name: Optional[str] = None, + role: Optional[str] = None, + grade: Optional[int] = None, + department_role: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + user_token = (user_id or "").strip() + if not user_token: + raise ValueError("user_id 不能为空") + updates: List[str] = [] + values: List[Any] = [] + if password is not None: + if not password.strip(): + raise ValueError("密码不能为空") + password_hash = pwd_context.hash(password) + updates.append("password_hash = %s") + values.append(password_hash) + if service_department_id is not None: + dept_value = (service_department_id or "").strip() or None + updates.append("service_department_id = %s") + values.append(dept_value) + if display_name is not None: + updates.append("display_name = %s") + values.append(display_name.strip() if isinstance(display_name, str) else display_name) + if role is not None: + updates.append("role = %s") + values.append(role.strip() if isinstance(role, str) else role) + if grade is not None: + updates.append("grade = %s") + values.append(int(grade)) + if department_role is not None: + updates.append("department_role = %s") + values.append(department_role.strip() if isinstance(department_role, str) else department_role) + + if not updates: + return get_user_by_id(user_token) + + with _auth_conn() as conn: + cur = conn.cursor() + query = f"UPDATE auth_users SET {', '.join(updates)} WHERE id = %s RETURNING id" + values.append(user_token) + cur.execute(query, tuple(values)) + row = cur.fetchone() + if not row: + conn.rollback() + return None + conn.commit() + return get_user_by_id(user_token) + + +def verify_password(password: str, password_hash: str) -> bool: + if not password or not password_hash: + return False + try: + return pwd_context.verify(password, password_hash) + except ValueError: + return False + + +def ensure_seed_admin() -> None: + username = os.getenv("LAWRISK_ADMIN_USERNAME") + password = os.getenv("LAWRISK_ADMIN_PASSWORD") + if not username or not password: + return + existing = get_user_by_username(username) + if existing: + return + display_name = os.getenv("LAWRISK_ADMIN_DISPLAY_NAME", "Super Admin") + role = os.getenv("LAWRISK_ADMIN_ROLE", "admin") + grade = int(os.getenv("LAWRISK_ADMIN_GRADE", "100")) + create_user(username=username, password=password, display_name=display_name, role=role, grade=grade) diff --git a/lawrisk/services/licensing_repo.py b/lawrisk/services/licensing_repo.py index a9a1de5..e20c762 100644 --- a/lawrisk/services/licensing_repo.py +++ b/lawrisk/services/licensing_repo.py @@ -166,6 +166,9 @@ _PERMIT_SOURCES_TABLE_LOCK = threading.Lock() _PERMIT_FILE_SCHEMA_READY: Optional[bool] = None _PERMIT_FILE_SCHEMA_LOCK = threading.Lock() +_SERVICE_DEPARTMENT_SCHEMA_READY: Optional[bool] = None +_SERVICE_DEPARTMENT_SCHEMA_LOCK = threading.Lock() + _CANONICAL_REGION_KEYWORDS: Dict[str, Tuple[str, ...]] = { "市级": ("市级", "全市", "佛山市本级", "佛山市市级"), "禅城区": ("禅城区", "禅城"), @@ -210,6 +213,24 @@ def _clean_text(value: Any) -> str: return str(value).strip() +def _to_isoformat(value: Any) -> Optional[str]: + if value is None: + return None + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, date): + return datetime.combine(value, datetime.min.time()).isoformat() + return str(value) + + +def _to_optional_str(value: Any) -> Optional[str]: + if value is None: + return None + if isinstance(value, (uuid.UUID,)): + return str(value) + return str(value) + + def _normalize_permit_token(value: Any) -> str: """Normalize permit names for dictionary lookups (case/whitespace insensitive).""" text = _clean_text(value) @@ -1164,6 +1185,7 @@ def commit_permit_import_session( with _lic_pg_conn(autocommit=False) as conn: try: _ensure_permit_sources_table(conn) + _ensure_service_department_schema(conn) _ensure_permit_theme_override_schema(conn) if session_file_bytes: _ensure_permit_file_schema(conn) @@ -1754,6 +1776,402 @@ def _ensure_permit_sources_table(conn: Optional[pg.Connection] = None) -> None: _PERMIT_SOURCES_TABLE_PRESENT = True +def _create_service_department_schema(conn: pg.Connection) -> None: + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS service_departments ( + id uuid PRIMARY KEY, + name text NOT NULL, + code text NOT NULL UNIQUE, + phone text, + parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL, + region_id uuid REFERENCES regions(id) ON DELETE SET NULL, + description text, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() + ) + """ + ) + cur.execute( + """ + CREATE UNIQUE INDEX IF NOT EXISTS service_departments_name_idx + ON service_departments (name) + """ + ) + cur.execute( + """ + ALTER TABLE IF EXISTS service_departments + ADD COLUMN IF NOT EXISTS parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL + """ + ) + cur.execute( + """ + ALTER TABLE IF EXISTS service_departments + ADD COLUMN IF NOT EXISTS phone text + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS service_departments_parent_idx + ON service_departments (parent_id) + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS service_department_permits ( + department_id uuid NOT NULL REFERENCES service_departments(id) ON DELETE CASCADE, + region_id uuid NOT NULL REFERENCES regions(id) ON DELETE CASCADE, + permit_id uuid NOT NULL REFERENCES permits(id) ON DELETE CASCADE, + created_at timestamptz NOT NULL DEFAULT now(), + created_by text, + PRIMARY KEY (department_id, region_id, permit_id) + ) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS service_dept_permits_region_idx + ON service_department_permits (region_id, permit_id) + """ + ) + + +def _ensure_service_department_schema(conn: Optional[pg.Connection] = None) -> None: + global _SERVICE_DEPARTMENT_SCHEMA_READY + if _SERVICE_DEPARTMENT_SCHEMA_READY: + return + with _SERVICE_DEPARTMENT_SCHEMA_LOCK: + if _SERVICE_DEPARTMENT_SCHEMA_READY: + return + if conn is not None: + original_autocommit = conn.autocommit + try: + conn.autocommit = True + _create_service_department_schema(conn) + finally: + conn.autocommit = original_autocommit + else: + with _lic_pg_conn(autocommit=True) as ensure_conn: + _create_service_department_schema(ensure_conn) + _SERVICE_DEPARTMENT_SCHEMA_READY = True + + +_SERVICE_DEPARTMENT_SELECT = """ + SELECT + sd.id, + sd.name, + sd.code, + sd.phone, + sd.parent_id, + parent.name AS parent_name, + sd.region_id, + r.name AS region_name, + sd.description, + sd.created_at, + sd.updated_at + FROM service_departments sd + LEFT JOIN service_departments parent ON parent.id = sd.parent_id + LEFT JOIN regions r ON r.id = sd.region_id +""" + + +def _serialize_service_department_row(record: Dict[str, Any]) -> Dict[str, Any]: + return { + "id": _to_optional_str(record.get("id")), + "name": record.get("name"), + "code": record.get("code"), + "phone": record.get("phone"), + "parent_id": _to_optional_str(record.get("parent_id")), + "parent_name": record.get("parent_name"), + "region_id": _to_optional_str(record.get("region_id")), + "region_name": record.get("region_name"), + "description": record.get("description"), + "created_at": _to_isoformat(record.get("created_at")), + "updated_at": _to_isoformat(record.get("updated_at")), + } + + +def _fetch_service_department(cur: pg.Cursor, department_id: str) -> Optional[Dict[str, Any]]: + cur.execute( + _SERVICE_DEPARTMENT_SELECT + " WHERE sd.id = %s LIMIT 1", + (department_id,), + ) + row = cur.fetchone() + if not row: + return None + columns = tuple(col[0] for col in cur.description) + record = {columns[idx]: row[idx] for idx in range(len(columns))} + return _serialize_service_department_row(record) + + +def list_service_departments() -> List[Dict[str, Any]]: + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + cur = conn.cursor() + cur.execute(_SERVICE_DEPARTMENT_SELECT + " ORDER BY sd.created_at ASC") + rows = cur.fetchall() + columns = tuple(col[0] for col in cur.description) + departments: List[Dict[str, Any]] = [] + for row in rows: + record = {columns[idx]: row[idx] for idx in range(len(columns))} + departments.append(_serialize_service_department_row(record)) + return departments + + +def create_service_department( + name: str, + *, + code: Optional[str] = None, + phone: Optional[str] = None, + parent_id: Optional[str] = None, + region_id: Optional[str] = None, + description: Optional[str] = None, +) -> Dict[str, Any]: + normalized_name = _clean_text(name) + if not normalized_name: + raise ValueError("服务部门名称不能为空") + normalized_code = (_clean_text(code).upper() if code else "") or uuid.uuid4().hex[:8].upper() + parent_token = _clean_text(parent_id) or None + region_token = _clean_text(region_id) or None + description_text = description.strip() if isinstance(description, str) else description + + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + cur = conn.cursor() + dept_id = uuid.uuid4() + cur.execute( + """ + INSERT INTO service_departments (id, name, code, phone, parent_id, region_id, description) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """, + (dept_id, normalized_name, normalized_code, phone, parent_token, region_token, description_text), + ) + conn.commit() + result = _fetch_service_department(cur, str(dept_id)) + if not result: + raise RuntimeError("创建服务部门失败") + return result + + +def update_service_department( + department_id: str, + *, + name: Optional[str] = None, + phone: Optional[str] = None, + parent_id: Optional[str] = None, + region_id: Optional[str] = None, + description: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + dept_token = _clean_text(department_id) + if not dept_token: + raise ValueError("department_id 不能为空") + updates: List[str] = [] + values: List[Any] = [] + if name is not None: + normalized_name = _clean_text(name) + if not normalized_name: + raise ValueError("服务部门名称不能为空") + updates.append("name = %s") + values.append(normalized_name) + if phone is not None: + updates.append("phone = %s") + values.append(phone.strip() if isinstance(phone, str) else phone) + if parent_id is not None: + parent_token = _clean_text(parent_id) or None + if parent_token and parent_token == dept_token: + raise ValueError("服务部门不能设置为自己的上级") + updates.append("parent_id = %s") + values.append(parent_token) + if region_id is not None: + updates.append("region_id = %s") + values.append(_clean_text(region_id) or None) + if description is not None: + updates.append("description = %s") + values.append(description.strip() if isinstance(description, str) else description) + + if not updates: + return _get_service_department_by_id(dept_token) + + updates.append("updated_at = now()") + + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + cur = conn.cursor() + query = f"UPDATE service_departments SET {', '.join(updates)} WHERE id = %s" + values.append(dept_token) + cur.execute(query, tuple(values)) + conn.commit() + return _get_service_department_by_id(dept_token) + + +def _get_service_department_by_id(department_id: str) -> Optional[Dict[str, Any]]: + with _lic_pg_conn() as conn: + cur = conn.cursor() + return _fetch_service_department(cur, department_id) + + +def delete_service_department(department_id: str) -> Dict[str, Any]: + dept_token = _clean_text(department_id) + if not dept_token: + raise ValueError("department_id 不能为空") + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + cur = conn.cursor() + cur.execute( + "SELECT COUNT(*) FROM auth_users WHERE service_department_id = %s", + (dept_token,), + ) + bound_count = int(cur.fetchone()[0] or 0) + if bound_count > 0: + raise ValueError("仍有账号绑定该服务部门,请先解除绑定后再删除") + cur.execute( + "UPDATE service_departments SET parent_id = NULL WHERE parent_id = %s", + (dept_token,), + ) + cur.execute( + "DELETE FROM service_departments WHERE id = %s RETURNING id", + (dept_token,), + ) + row = cur.fetchone() + if not row: + conn.rollback() + return {"deleted": False} + conn.commit() + return {"deleted": True} + + +_THEME_SUMMARY_SELECT = """ + SELECT + t.id, + t.name, + COUNT(DISTINCT rtp.permit_id) AS permit_count, + COUNT(DISTINCT rtp.region_id) AS region_count + FROM themes t + LEFT JOIN region_theme_permits rtp ON rtp.theme_id = t.id +""" + + +def _serialize_theme_row(record: Dict[str, Any]) -> Dict[str, Any]: + return { + "id": _to_optional_str(record.get("id")), + "name": record.get("name"), + "permit_count": int(record.get("permit_count") or 0), + "region_count": int(record.get("region_count") or 0), + } + + +def _fetch_theme_summary(cur: pg.Cursor, theme_id: str) -> Optional[Dict[str, Any]]: + cur.execute( + _THEME_SUMMARY_SELECT + " WHERE t.id = %s GROUP BY t.id, t.name", + (theme_id,), + ) + row = cur.fetchone() + if not row: + return None + columns = tuple(col[0] for col in cur.description) + record = {columns[idx]: row[idx] for idx in range(len(columns))} + return _serialize_theme_row(record) + + +def list_all_themes() -> List[Dict[str, Any]]: + with _lic_pg_conn() as conn: + cur = conn.cursor() + cur.execute(_THEME_SUMMARY_SELECT + " GROUP BY t.id, t.name ORDER BY t.name ASC") + rows = cur.fetchall() + columns = tuple(col[0] for col in cur.description) + items: List[Dict[str, Any]] = [] + for row in rows: + record = {columns[idx]: row[idx] for idx in range(len(columns))} + items.append(_serialize_theme_row(record)) + return items + + +def create_theme(name: str) -> Dict[str, Any]: + normalized = _clean_text(name) + if not normalized: + raise ValueError("主题名称不能为空") + with _lic_pg_conn() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO themes (name) + VALUES (%s) + ON CONFLICT (name) DO NOTHING + RETURNING id + """, + (normalized,), + ) + row = cur.fetchone() + if not row: + cur.execute("SELECT id FROM themes WHERE name = %s LIMIT 1", (normalized,)) + row = cur.fetchone() + conn.commit() + if not row: + raise RuntimeError("创建主题失败") + theme_id = str(row[0]) + summary = _fetch_theme_summary(cur, theme_id) + if not summary: + raise RuntimeError("无法加载主题信息") + return summary + + +def rename_theme(theme_id: str, new_name: str) -> Dict[str, Any]: + theme_token = _clean_text(theme_id) + if not theme_token: + raise ValueError("theme_id 不能为空") + normalized = _clean_text(new_name) + if not normalized: + raise ValueError("主题名称不能为空") + with _lic_pg_conn() as conn: + cur = conn.cursor() + cur.execute( + "UPDATE themes SET name = %s WHERE id = %s RETURNING id", + (normalized, theme_token), + ) + row = cur.fetchone() + if not row: + conn.rollback() + raise ValueError("主题不存在或已被删除") + conn.commit() + summary = _fetch_theme_summary(cur, theme_token) + if not summary: + raise RuntimeError("无法刷新主题信息") + return summary + + +def delete_theme(theme_id: str) -> Dict[str, Any]: + theme_token = _clean_text(theme_id) + if not theme_token: + raise ValueError("theme_id 不能为空") + with _lic_pg_conn() as conn: + cur = conn.cursor() + cur.execute( + "DELETE FROM region_theme_permits WHERE theme_id = %s", + (theme_token,), + ) + rtp_deleted = int(cur.rowcount or 0) + cur.execute( + "DELETE FROM region_themes WHERE theme_id = %s", + (theme_token,), + ) + rt_deleted = int(cur.rowcount or 0) + cur.execute( + "DELETE FROM themes WHERE id = %s RETURNING id", + (theme_token,), + ) + row = cur.fetchone() + if not row: + conn.rollback() + return {"deleted": False, "message": "主题不存在"} + conn.commit() + return { + "deleted": True, + "region_theme_permits_deleted": rtp_deleted, + "region_themes_deleted": rt_deleted, + } + + def _create_permit_file_schema(conn: pg.Connection) -> None: """Create permit file storage tables on demand.""" cur = conn.cursor() diff --git a/lawrisk/services/template_service.py b/lawrisk/services/template_service.py new file mode 100644 index 0000000..8deef8f --- /dev/null +++ b/lawrisk/services/template_service.py @@ -0,0 +1,77 @@ +"""Helpers for managing import template files used by administrators.""" +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from typing import Any, Dict + + +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "data", "template") +PERMIT_TEMPLATE_FILENAME = "风险提示表 模板.xlsx" +TEMPLATE_META_FILENAME = "template_meta.json" + + +def get_permit_template_path() -> str: + """Return the absolute path to the permit import template.""" + return os.path.join(TEMPLATE_DIR, PERMIT_TEMPLATE_FILENAME) + + +def _meta_path() -> str: + return os.path.join(TEMPLATE_DIR, TEMPLATE_META_FILENAME) + + +def _read_meta() -> Dict[str, Any]: + path = _meta_path() + if not os.path.exists(path): + return {} + try: + with open(path, "r", encoding="utf-8") as meta_file: + return json.load(meta_file) + except (json.JSONDecodeError, OSError): + return {} + + +def get_permit_template_metadata() -> Dict[str, Any]: + """Return metadata for the permit template file (size, timestamps, uploader).""" + template_path = get_permit_template_path() + exists = os.path.exists(template_path) + metadata = { + "exists": exists, + "filename": os.path.basename(template_path), + "filesize": os.path.getsize(template_path) if exists else 0, + "updated_at": None, + "uploaded_by": None, + "source_filename": None, + "download_url": "/fs-ai-asistant/api/workflow/lawrisk/admin/permit-import/template", + } + meta_file_data = _read_meta() + metadata.update({k: v for k, v in meta_file_data.items() if k in metadata or k in {"hash"} }) + if metadata.get("updated_at") and isinstance(metadata["updated_at"], (int, float)): + metadata["updated_at"] = datetime.fromtimestamp(metadata["updated_at"], tz=timezone.utc).isoformat() + return metadata + + +def overwrite_permit_template(file_bytes: bytes, filename: str, uploaded_by: str | None = None) -> Dict[str, Any]: + """Overwrite the stored permit template with the uploaded bytes.""" + if not file_bytes: + raise ValueError("模板文件内容不能为空") + os.makedirs(TEMPLATE_DIR, exist_ok=True) + target_path = get_permit_template_path() + with open(target_path, "wb") as template_file: + template_file.write(file_bytes) + + timestamp = datetime.now(tz=timezone.utc).isoformat() + meta_payload = { + "updated_at": timestamp, + "uploaded_by": uploaded_by, + "source_filename": filename, + "filesize": len(file_bytes), + "filename": os.path.basename(target_path), + } + with open(_meta_path(), "w", encoding="utf-8") as meta_file: + json.dump(meta_payload, meta_file, ensure_ascii=False, indent=2) + meta_payload["exists"] = True + meta_payload["download_url"] = "/fs-ai-asistant/api/workflow/lawrisk/admin/permit-import/template" + return meta_payload diff --git a/static/super_admin.html b/static/super_admin.html new file mode 100644 index 0000000..cf864d5 --- /dev/null +++ b/static/super_admin.html @@ -0,0 +1,711 @@ + + + + + + LawRisk 超级管理员控制台 + + + +
+
+
+

LawRisk 法律风险提示系统

+

超级管理员 · 用户、部门、主题与模板一站式管控

+
+
+
+ 未登录 + role +
+
+ +
+
+ +
+ +
+
+

用户管理

+
+

用户添加

+
+ + + + + +
+
+
+

密码 / 部门调整

+
+ + + + +
+
+
+

用户列表

+
+ + + + + + + + + + + +
用户名显示名部门角色创建时间
+
+
+
+ +
+

服务部门管理

+
+ + + + + +
+
+ + + + + + + + + + +
名称上级电话操作
+
+
+ +
+

主题列表管理

+
+ + +
+
+ + + + + + + + + + +
主题关联许可覆盖区划操作
+
+
+ +
+

模板管理

+
+ 模板未加载 +
+
+ + +
+ +
+
+
+ + + +