feat: add super admin console

This commit is contained in:
Codex Agent 2025-11-14 15:46:18 +08:00
parent 90fa969046
commit 168cdf6470
7 changed files with 2292 additions and 5 deletions

View File

@ -0,0 +1,44 @@
CREATE TABLE IF NOT EXISTS service_departments (
id uuid PRIMARY KEY,
name text NOT NULL,
code text NOT NULL UNIQUE,
phone text,
parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL,
region_id uuid REFERENCES regions(id) ON DELETE SET NULL,
description text,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX IF NOT EXISTS service_departments_name_idx
ON service_departments (name);
ALTER TABLE IF EXISTS service_departments
ADD COLUMN IF NOT EXISTS parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL;
ALTER TABLE IF EXISTS service_departments
ADD COLUMN IF NOT EXISTS phone text;
CREATE INDEX IF NOT EXISTS service_departments_parent_idx
ON service_departments (parent_id);
CREATE TABLE IF NOT EXISTS service_department_permits (
department_id uuid NOT NULL REFERENCES service_departments(id) ON DELETE CASCADE,
region_id uuid NOT NULL REFERENCES regions(id) ON DELETE CASCADE,
permit_id uuid NOT NULL REFERENCES permits(id) ON DELETE CASCADE,
created_at timestamptz NOT NULL DEFAULT now(),
created_by text,
PRIMARY KEY (department_id, region_id, permit_id)
);
CREATE INDEX IF NOT EXISTS service_dept_permits_region_idx
ON service_department_permits (region_id, permit_id);
ALTER TABLE IF EXISTS auth_users
ADD COLUMN IF NOT EXISTS service_department_id uuid REFERENCES service_departments(id);
ALTER TABLE IF EXISTS auth_users
ADD COLUMN IF NOT EXISTS department_role text;
CREATE INDEX IF NOT EXISTS auth_users_service_department_idx
ON auth_users (service_department_id);

178
lawrisk/api/auth.py Normal file
View File

@ -0,0 +1,178 @@
"""Auth blueprint: login page, authentication endpoints, and helpers."""
from __future__ import annotations
from functools import wraps
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
from urllib.parse import urlparse
from flask import (
Blueprint,
Response,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from lawrisk.services.auth_service import get_user_by_username, verify_password
auth_bp = Blueprint("auth", __name__)
SESSION_USER_KEY = "auth_user"
def _safe_next_url(candidate: Optional[str]) -> str:
if not candidate:
return "/"
raw = str(candidate).strip()
if not raw:
return "/"
parsed = urlparse(raw)
if parsed.netloc:
safe_path = parsed.path or "/"
else:
safe_path = raw if raw.startswith("/") else f"/{raw}"
if parsed.query:
safe_path = f"{safe_path}?{parsed.query}"
return safe_path
def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]:
department = None
if user.get("service_department_id"):
department = {
"id": user.get("service_department_id"),
"name": user.get("service_department_name"),
"code": user.get("service_department_code"),
"region_id": user.get("service_department_region_id"),
"parent_id": user.get("service_department_parent_id"),
"phone": user.get("service_department_phone"),
"role": user.get("department_role"),
}
return {
"id": user.get("id"),
"username": user.get("username"),
"display_name": user.get("display_name"),
"role": user.get("role"),
"grade": user.get("grade"),
"department": department,
}
def ensure_admin_access(
allowed_roles: Optional[Iterable[str]] = None,
*,
prefer_json: bool = False,
) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]:
"""Ensure the current session belongs to an admin role."""
user = get_current_user()
wants_html = (
(not prefer_json)
and request.method == "GET"
and request.accept_mimetypes.accept_html
and (
request.accept_mimetypes["text/html"]
>= request.accept_mimetypes["application/json"]
)
)
if not user:
if wants_html:
login_url = url_for("auth.login_page", next=request.url)
return None, redirect(login_url)
return None, (jsonify({"error": "authentication required"}), 401)
default_roles = ("admin",) if allowed_roles is None else allowed_roles
allowed = {str(role).lower() for role in default_roles if role}
current_role = str(user.get("role") or "").lower()
if allowed and current_role not in allowed:
if wants_html:
login_url = url_for("auth.login_page", next=request.url, force=1)
return None, redirect(login_url)
return None, (jsonify({"error": "admin privileges required"}), 403)
return user, None
def get_current_user() -> Optional[Dict[str, Any]]:
data = session.get(SESSION_USER_KEY)
if not isinstance(data, dict):
return None
return data
def login_required(view: Callable[..., Response]) -> Callable[..., Response]:
@wraps(view)
def wrapper(*args: Any, **kwargs: Any) -> Response:
user = get_current_user()
if not user:
wants_html = request.method == "GET" and request.accept_mimetypes.accept_html and (
request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"]
)
if wants_html:
login_url = url_for("auth.login_page", next=request.url)
return redirect(login_url)
return jsonify({"error": "authentication required"}), 401
return view(*args, **kwargs)
return wrapper
def _truthy_param(value: Optional[str]) -> bool:
if value is None:
return False
return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"}
@auth_bp.get("/fs-ai-asistant/lawrisk/login")
def login_page() -> Response:
user = get_current_user()
force_login = _truthy_param(request.args.get("force") or request.args.get("reauth"))
if user and not force_login:
return redirect(request.args.get("next") or "/")
return render_template("login.html")
@auth_bp.post("/auth/login")
def login_action() -> Response:
payload = request.get_json(silent=True) or request.form
username = (payload.get("username", "") if payload else "").strip().lower()
password = (payload.get("password", "") if payload else "").strip()
next_url = _safe_next_url((payload.get("next") if payload else None) or request.args.get("next"))
if not username or not password:
return jsonify({"error": "username and password are required"}), 400
user = get_user_by_username(username)
if not user or not verify_password(password, user.get("password_hash", "")):
return jsonify({"error": "invalid credentials"}), 401
if not user.get("is_active", True):
return jsonify({"error": "account disabled"}), 403
sanitized = _scrub_user_payload(user)
session[SESSION_USER_KEY] = sanitized
# Form submissions expect redirects; API clients expect JSON
if request.content_type and "application/x-www-form-urlencoded" in request.content_type:
return redirect(next_url)
return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url})
@auth_bp.post("/auth/logout")
def logout_action() -> Response:
session.pop(SESSION_USER_KEY, None)
if request.content_type and "application/x-www-form-urlencoded" in request.content_type:
return redirect(url_for("auth.login_page"))
return jsonify({"message": "logged out"})
@auth_bp.get("/auth/me")
def current_user_endpoint() -> Response:
user = get_current_user()
if not user:
return jsonify({"authenticated": False}), 401
return jsonify({"authenticated": True, "user": user})

View File

@ -8,7 +8,7 @@ from flask import Blueprint, jsonify, request, send_file
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Optional
from lawrisk.api.auth import login_required, get_current_user
from lawrisk.api.auth import login_required, get_current_user, ensure_admin_access
from lawrisk.services.lawrisk_v2_service import search_v2, list_regions
from lawrisk.services.licensing_repo import (
list_permits_for_region,
@ -32,12 +32,49 @@ from lawrisk.services.licensing_repo import (
list_stored_permit_files,
start_import_session_from_file,
delete_stored_permit_file,
list_service_departments,
create_service_department,
update_service_department,
delete_service_department,
list_all_themes,
create_theme,
rename_theme,
delete_theme,
)
from lawrisk.services.auth_service import (
list_users,
create_user,
update_user_account,
)
from lawrisk.services.template_service import (
get_permit_template_path,
get_permit_template_metadata,
overwrite_permit_template,
)
from lawrisk.services.lawrisk_service import suggest_questions_embed
v2_bp = Blueprint('lawrisk_v2', __name__, url_prefix='/fs-ai-asistant/api/workflow/lawrisk')
def _project_root() -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
def _parse_truthy(value: Any, default: bool = False) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _admin_guard(*, prefer_json: bool = False, roles: Optional[Iterable[str]] = None):
user, error = ensure_admin_access(allowed_roles=roles, prefer_json=prefer_json)
if error:
return None, error
return user, None
@v2_bp.route('/v2', methods=['POST', 'GET'])
def lawrisk_search_v2():
"""V2 search endpoint with structured results."""
@ -168,8 +205,7 @@ def test_simple():
@login_required
def db_admin_page():
"""Serve the database administration UI."""
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
html_path = os.path.join(project_root, 'static', 'db_admin.html')
html_path = os.path.join(_project_root(), 'static', 'db_admin.html')
if not os.path.exists(html_path):
return jsonify({"success": False, "message": "DB admin page not found"}), 404
@ -177,6 +213,286 @@ def db_admin_page():
return send_file(html_path, mimetype='text/html')
@v2_bp.route('/admin/super', methods=['GET'])
def super_admin_page():
"""Serve the super administrator console."""
_, error = _admin_guard(roles=("admin",))
if error:
return error
html_path = os.path.join(_project_root(), 'static', 'super_admin.html')
if not os.path.exists(html_path):
return jsonify({"success": False, "message": "Super admin page not found"}), 404
return send_file(html_path, mimetype='text/html')
@v2_bp.route('/admin/users', methods=['GET'])
def admin_list_users():
"""List users for the super admin console."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
include_inactive = _parse_truthy(request.args.get("include_inactive"), default=True)
try:
users = list_users(include_inactive=include_inactive)
return jsonify({"success": True, "data": {"users": users}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/users', methods=['POST'])
def admin_create_user():
"""Create a new application user."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
username = (payload.get("username") or "").strip().lower()
password = (payload.get("password") or "").strip()
if not username or not password:
return jsonify({"success": False, "message": "用户名和密码均不能为空"}), 400
try:
user = create_user(
username=username,
password=password,
display_name=payload.get("display_name"),
role=(payload.get("role") or "department_admin").strip(),
grade=int(payload.get("grade") or 50),
service_department_id=(payload.get("service_department_id") or "").strip() or None,
department_role=payload.get("department_role"),
)
return jsonify({"success": True, "data": {"user": user}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/users/<user_id>', methods=['PATCH'])
def admin_update_user(user_id: str):
"""Update password or service department for an existing user."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
update_kwargs: Dict[str, Any] = {}
if "password" in payload:
update_kwargs["password"] = payload.get("password") or ""
if "service_department_id" in payload:
dept_value = payload.get("service_department_id")
if isinstance(dept_value, str) and not dept_value.strip():
dept_value = None
update_kwargs["service_department_id"] = dept_value
if "display_name" in payload:
update_kwargs["display_name"] = payload.get("display_name")
if "role" in payload:
update_kwargs["role"] = payload.get("role")
if "grade" in payload:
update_kwargs["grade"] = payload.get("grade")
if "department_role" in payload:
update_kwargs["department_role"] = payload.get("department_role")
if not update_kwargs:
return jsonify({"success": False, "message": "请至少提供一个需要更新的字段"}), 400
try:
updated = update_user_account(user_id, **update_kwargs)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not updated:
return jsonify({"success": False, "message": "用户不存在"}), 404
return jsonify({"success": True, "data": {"user": updated}})
@v2_bp.route('/admin/service-departments', methods=['GET'])
def admin_list_service_departments():
"""Return all service departments in a flat list."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
departments = list_service_departments()
return jsonify({"success": True, "data": {"departments": departments}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments', methods=['POST'])
def admin_create_service_department():
"""Create a service department node."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "服务部门名称不能为空"}), 400
kwargs = {
"code": (payload.get("code") or "").strip() or None,
"phone": (payload.get("phone") or "").strip() or None,
"parent_id": (payload.get("parent_id") or "").strip() or None,
"region_id": (payload.get("region_id") or "").strip() or None,
"description": payload.get("description"),
}
try:
department = create_service_department(name, **kwargs)
return jsonify({"success": True, "data": {"department": department}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/service-departments/<dept_id>', methods=['PATCH'])
def admin_update_service_department(dept_id: str):
"""Update department metadata such as name or phone."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
updates: Dict[str, Any] = {}
for field in ("name", "phone", "parent_id", "region_id", "description"):
if field in payload:
value = payload.get(field)
if isinstance(value, str) and not value.strip():
value = None
updates[field] = value
if not updates:
return jsonify({"success": False, "message": "没有需要更新的字段"}), 400
try:
department = update_service_department(dept_id, **updates)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not department:
return jsonify({"success": False, "message": "服务部门不存在"}), 404
return jsonify({"success": True, "data": {"department": department}})
@v2_bp.route('/admin/service-departments/<dept_id>', methods=['DELETE'])
def admin_delete_service_department(dept_id: str):
"""Delete a service department node (only when no users are bound)."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
result = delete_service_department(dept_id)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not result.get("deleted"):
return jsonify({"success": False, "message": result.get("message", "删除失败")}), 400
return jsonify({"success": True, "data": result})
@v2_bp.route('/admin/themes/catalog', methods=['GET'])
def admin_theme_catalog():
"""List all available themes."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
themes = list_all_themes()
return jsonify({"success": True, "data": {"themes": themes}})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog', methods=['POST'])
def admin_create_theme():
"""Create a new theme."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "主题名称不能为空"}), 400
try:
theme = create_theme(name)
return jsonify({"success": True, "data": {"theme": theme}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog/<theme_id>', methods=['PATCH'])
def admin_rename_theme(theme_id: str):
"""Rename an existing theme."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
payload = request.get_json(silent=True) or {}
name = payload.get("name") or payload.get("new_name")
if not name or not str(name).strip():
return jsonify({"success": False, "message": "主题名称不能为空"}), 400
try:
theme = rename_theme(theme_id, name)
return jsonify({"success": True, "data": {"theme": theme}})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/themes/catalog/<theme_id>', methods=['DELETE'])
def admin_delete_theme(theme_id: str):
"""Delete a theme and related region associations."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
result = delete_theme(theme_id)
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
if not result.get("deleted"):
return jsonify({"success": False, "message": result.get("message", "主题不存在")}), 404
return jsonify({"success": True, "data": result})
@v2_bp.route('/admin/templates/permit', methods=['GET'])
def admin_permit_template_meta():
"""Return metadata for the import template."""
_, error = _admin_guard(prefer_json=True)
if error:
return error
try:
data = get_permit_template_metadata()
return jsonify({"success": True, "data": data})
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/templates/permit', methods=['POST'])
def admin_update_permit_template():
"""Overwrite the existing import template."""
admin_user, error = _admin_guard(prefer_json=True)
if error:
return error
if 'file' not in request.files:
return jsonify({"success": False, "message": "请上传模板文件"}), 400
file_storage = request.files['file']
file_bytes = file_storage.read()
if not file_bytes:
return jsonify({"success": False, "message": "上传的模板为空"}), 400
try:
meta = overwrite_permit_template(
file_bytes,
file_storage.filename or "import_template.xlsx",
uploaded_by=(admin_user or {}).get("username"),
)
return jsonify({"success": True, "data": meta})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 400
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/regions', methods=['GET'])
def admin_regions():
"""Get all regions for database maintenance."""
@ -294,8 +610,7 @@ def admin_permit_import_upload():
@v2_bp.route('/admin/permit-import/template', methods=['GET'])
def admin_permit_import_template():
"""Provide the Excel import template for download."""
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
template_path = os.path.join(project_root, 'data', 'template', '风险提示表 模板.xlsx')
template_path = get_permit_template_path()
if not os.path.exists(template_path):
return jsonify({"success": False, "message": "模板文件不存在,请联系管理员"}), 404

View File

@ -0,0 +1,544 @@
"""Authentication and authorization helpers for the LawRisk backend."""
from __future__ import annotations
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from passlib.context import CryptContext
from lawrisk.services import licensing_repo as lic_repo
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
logger = logging.getLogger(__name__)
def _normalize_department_code(raw_code: Optional[str]) -> str:
return (raw_code or "").strip().upper()
def _resolve_parent_department_id(
cur, parent_code: Optional[str], code_cache: Optional[Dict[str, str]] = None
) -> Optional[str]:
normalized = _normalize_department_code(parent_code)
if not normalized:
return None
if code_cache:
cached = code_cache.get(normalized)
if cached:
return cached
cur.execute(
"SELECT id FROM service_departments WHERE code = %s LIMIT 1",
(normalized,),
)
row = cur.fetchone()
return str(row[0]) if row else None
def _auth_conn(autocommit: bool = False):
"""Reuse the licensing_risks connection for auth tables."""
return lic_repo._lic_pg_conn(autocommit=autocommit)
SERVICE_DEPARTMENT_SEEDS = [
{
"code": "FSSJSJ",
"name": "市级服务部门",
"region_name": "市级",
"username": "FSSJSJ",
"display_name": "市级管理员",
"role": "department_admin",
"grade": 90,
},
{
"code": "FSSJSS",
"name": "三水区服务部门",
"region_name": "三水区",
"parent_code": "FSSJSJ",
"username": "FSSJSS",
"display_name": "三水区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJNH",
"name": "南海区服务部门",
"region_name": "南海区",
"parent_code": "FSSJSJ",
"username": "FSSJNH",
"display_name": "南海区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJSD",
"name": "顺德区服务部门",
"region_name": "顺德区",
"parent_code": "FSSJSJ",
"username": "FSSJSD",
"display_name": "顺德区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJCC",
"name": "禅城区服务部门",
"region_name": "禅城区",
"parent_code": "FSSJSJ",
"username": "FSSJCC",
"display_name": "禅城区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJGM",
"name": "高明区服务部门",
"region_name": "高明区",
"parent_code": "FSSJSJ",
"username": "FSSJGM",
"display_name": "高明区管理员",
"role": "department_admin",
"grade": 80,
},
]
def _resolve_region_id(cur, region_name: Optional[str]) -> Optional[str]:
if not region_name:
return None
cur.execute(
"""
SELECT id
FROM regions
WHERE name = %s
LIMIT 1
""",
(region_name,),
)
row = cur.fetchone()
return str(row[0]) if row else None
def _upsert_service_department(
cur, seed: Dict[str, Any], code_cache: Optional[Dict[str, str]] = None
) -> str:
code = _normalize_department_code(seed.get("code"))
name = seed.get("name")
if not code or not name:
raise ValueError("Service department seed requires code and name")
cur.execute(
"SELECT id FROM service_departments WHERE code = %s LIMIT 1",
(code,),
)
row = cur.fetchone()
region_id = _resolve_region_id(cur, seed.get("region_name"))
description = seed.get("description")
parent_id = _resolve_parent_department_id(cur, seed.get("parent_code"), code_cache)
if row:
dept_id = str(row[0])
cur.execute(
"""
UPDATE service_departments
SET name = %s,
parent_id = COALESCE(%s, parent_id),
region_id = COALESCE(%s, region_id),
description = COALESCE(%s, description),
updated_at = now()
WHERE id = %s
""",
(name, parent_id, region_id, description, dept_id),
)
return dept_id
dept_id = str(uuid.uuid4())
cur.execute(
"""
INSERT INTO service_departments (id, name, code, parent_id, region_id, description)
VALUES (%s, %s, %s, %s, %s, %s)
""",
(dept_id, name, code, parent_id, region_id, description),
)
return dept_id
def _ensure_department_account(cur, seed: Dict[str, Any], department_id: str) -> None:
username = (seed.get("username") or seed.get("code") or "").strip().lower()
if not username:
raise ValueError("Department seed requires username")
display_name = seed.get("display_name") or f"{seed.get('name', username)}账号"
role = seed.get("role", "department_admin")
grade = int(seed.get("grade", 50))
department_role = seed.get("department_role", "manager")
password = seed.get("password") or f"{seed.get('code', username).upper()}123456"
cur.execute(
"SELECT id FROM auth_users WHERE username = %s",
(username,),
)
row = cur.fetchone()
if row:
cur.execute(
"""
UPDATE auth_users
SET service_department_id = %s,
role = COALESCE(%s, role),
department_role = COALESCE(%s, department_role)
WHERE id = %s
""",
(department_id, role, department_role, row[0]),
)
return
user_id = uuid.uuid4().hex
password_hash = pwd_context.hash(password)
cur.execute(
"""
INSERT INTO auth_users (
id,
username,
password_hash,
display_name,
role,
grade,
service_department_id,
department_role
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""",
(user_id, username, password_hash, display_name, role, grade, department_id, department_role),
)
logger.info("Seeded user %s (department=%s)", username, department_id)
def ensure_default_department_accounts() -> None:
"""Ensure the default service departments and related accounts exist."""
dept_ids: Dict[str, str] = {}
with _auth_conn() as conn:
_ensure_auth_schema(conn)
cur = conn.cursor()
for seed in SERVICE_DEPARTMENT_SEEDS:
dept_id = _upsert_service_department(cur, seed, dept_ids)
code_key = _normalize_department_code(seed.get("code"))
if code_key:
dept_ids[code_key] = dept_id
conn.commit()
cur = conn.cursor()
for seed in SERVICE_DEPARTMENT_SEEDS:
dept_id = dept_ids.get(_normalize_department_code(seed.get("code")))
if not dept_id:
continue
_ensure_department_account(cur, seed, dept_id)
conn.commit()
def _ensure_auth_schema(conn) -> None:
lic_repo._ensure_service_department_schema(conn)
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS auth_users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
display_name TEXT,
role TEXT NOT NULL DEFAULT 'user',
grade INTEGER NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
service_department_id uuid REFERENCES service_departments(id),
department_role text,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS auth_users_department_idx
ON auth_users (service_department_id)
"""
)
conn.commit()
def ensure_auth_schema() -> None:
with _auth_conn() as conn:
_ensure_auth_schema(conn)
def _row_to_user(row: tuple[Any, ...], columns: tuple[str, ...]) -> Dict[str, Any]:
return {col: row[idx] for idx, col in enumerate(columns)}
def _public_user_payload(user: Dict[str, Any]) -> Dict[str, Any]:
department = None
if user.get("service_department_id"):
department = {
"id": user.get("service_department_id"),
"name": user.get("service_department_name"),
"code": user.get("service_department_code"),
"phone": user.get("service_department_phone"),
"parent_id": user.get("service_department_parent_id"),
"region_id": user.get("service_department_region_id"),
"role": user.get("department_role"),
}
created_at = user.get("created_at")
if isinstance(created_at, datetime):
created_at_value = created_at.isoformat()
else:
created_at_value = created_at
return {
"id": user.get("id"),
"username": user.get("username"),
"display_name": user.get("display_name"),
"role": user.get("role"),
"grade": user.get("grade"),
"is_active": user.get("is_active", True),
"department": department,
"department_id": user.get("service_department_id"),
"created_at": created_at_value,
}
def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
username = (username or "").strip().lower()
if not username:
return None
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
"""
SELECT
au.id,
au.username,
au.display_name,
au.role,
au.grade,
au.is_active,
au.password_hash,
au.service_department_id,
au.department_role,
au.created_at,
sd.name AS service_department_name,
sd.code AS service_department_code,
sd.phone AS service_department_phone,
sd.parent_id AS service_department_parent_id,
sd.region_id AS service_department_region_id
FROM auth_users au
LEFT JOIN service_departments sd ON sd.id = au.service_department_id
WHERE au.username = %s
LIMIT 1
""",
(username,),
)
row = cur.fetchone()
if not row:
return None
columns = tuple(col[0] for col in cur.description)
return _row_to_user(row, columns)
def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
user_token = (user_id or "").strip()
if not user_token:
return None
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
"""
SELECT
au.id,
au.username,
au.display_name,
au.role,
au.grade,
au.is_active,
au.service_department_id,
au.department_role,
au.created_at,
sd.name AS service_department_name,
sd.code AS service_department_code,
sd.phone AS service_department_phone,
sd.parent_id AS service_department_parent_id,
sd.region_id AS service_department_region_id
FROM auth_users au
LEFT JOIN service_departments sd ON sd.id = au.service_department_id
WHERE au.id = %s
LIMIT 1
""",
(user_token,),
)
row = cur.fetchone()
if not row:
return None
columns = tuple(col[0] for col in cur.description)
return _public_user_payload(_row_to_user(row, columns))
def list_users(include_inactive: bool = False) -> List[Dict[str, Any]]:
where_clause = "" if include_inactive else "WHERE au.is_active = TRUE"
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
f"""
SELECT
au.id,
au.username,
au.display_name,
au.role,
au.grade,
au.is_active,
au.service_department_id,
au.department_role,
au.created_at,
sd.name AS service_department_name,
sd.code AS service_department_code,
sd.phone AS service_department_phone,
sd.parent_id AS service_department_parent_id,
sd.region_id AS service_department_region_id
FROM auth_users au
LEFT JOIN service_departments sd ON sd.id = au.service_department_id
{where_clause}
ORDER BY au.created_at DESC
"""
)
rows = cur.fetchall()
columns = tuple(col[0] for col in cur.description)
users: List[Dict[str, Any]] = []
for row in rows:
users.append(_public_user_payload(_row_to_user(row, columns)))
return users
def create_user(
username: str,
password: str,
*,
display_name: Optional[str] = None,
role: str = "user",
grade: int = 0,
service_department_id: Optional[str] = None,
department_role: Optional[str] = None,
) -> Dict[str, Any]:
username_clean = (username or "").strip().lower()
if not username_clean or not password:
raise ValueError("Username and password are required")
user_id = uuid.uuid4().hex
password_hash = pwd_context.hash(password)
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
"""
INSERT INTO auth_users (
id,
username,
password_hash,
display_name,
role,
grade,
service_department_id,
department_role
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING
id,
username,
display_name,
role,
grade,
is_active,
created_at,
service_department_id,
department_role
""",
(
user_id,
username_clean,
password_hash,
display_name,
role,
grade,
service_department_id,
department_role,
),
)
row = cur.fetchone()
columns = tuple(col[0] for col in cur.description)
conn.commit()
return get_user_by_id(user_id)
def update_user_account(
user_id: str,
*,
password: Optional[str] = None,
service_department_id: Optional[str] = None,
display_name: Optional[str] = None,
role: Optional[str] = None,
grade: Optional[int] = None,
department_role: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
user_token = (user_id or "").strip()
if not user_token:
raise ValueError("user_id 不能为空")
updates: List[str] = []
values: List[Any] = []
if password is not None:
if not password.strip():
raise ValueError("密码不能为空")
password_hash = pwd_context.hash(password)
updates.append("password_hash = %s")
values.append(password_hash)
if service_department_id is not None:
dept_value = (service_department_id or "").strip() or None
updates.append("service_department_id = %s")
values.append(dept_value)
if display_name is not None:
updates.append("display_name = %s")
values.append(display_name.strip() if isinstance(display_name, str) else display_name)
if role is not None:
updates.append("role = %s")
values.append(role.strip() if isinstance(role, str) else role)
if grade is not None:
updates.append("grade = %s")
values.append(int(grade))
if department_role is not None:
updates.append("department_role = %s")
values.append(department_role.strip() if isinstance(department_role, str) else department_role)
if not updates:
return get_user_by_id(user_token)
with _auth_conn() as conn:
cur = conn.cursor()
query = f"UPDATE auth_users SET {', '.join(updates)} WHERE id = %s RETURNING id"
values.append(user_token)
cur.execute(query, tuple(values))
row = cur.fetchone()
if not row:
conn.rollback()
return None
conn.commit()
return get_user_by_id(user_token)
def verify_password(password: str, password_hash: str) -> bool:
if not password or not password_hash:
return False
try:
return pwd_context.verify(password, password_hash)
except ValueError:
return False
def ensure_seed_admin() -> None:
username = os.getenv("LAWRISK_ADMIN_USERNAME")
password = os.getenv("LAWRISK_ADMIN_PASSWORD")
if not username or not password:
return
existing = get_user_by_username(username)
if existing:
return
display_name = os.getenv("LAWRISK_ADMIN_DISPLAY_NAME", "Super Admin")
role = os.getenv("LAWRISK_ADMIN_ROLE", "admin")
grade = int(os.getenv("LAWRISK_ADMIN_GRADE", "100"))
create_user(username=username, password=password, display_name=display_name, role=role, grade=grade)

View File

@ -166,6 +166,9 @@ _PERMIT_SOURCES_TABLE_LOCK = threading.Lock()
_PERMIT_FILE_SCHEMA_READY: Optional[bool] = None
_PERMIT_FILE_SCHEMA_LOCK = threading.Lock()
_SERVICE_DEPARTMENT_SCHEMA_READY: Optional[bool] = None
_SERVICE_DEPARTMENT_SCHEMA_LOCK = threading.Lock()
_CANONICAL_REGION_KEYWORDS: Dict[str, Tuple[str, ...]] = {
"市级": ("市级", "全市", "佛山市本级", "佛山市市级"),
"禅城区": ("禅城区", "禅城"),
@ -210,6 +213,24 @@ def _clean_text(value: Any) -> str:
return str(value).strip()
def _to_isoformat(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, date):
return datetime.combine(value, datetime.min.time()).isoformat()
return str(value)
def _to_optional_str(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, (uuid.UUID,)):
return str(value)
return str(value)
def _normalize_permit_token(value: Any) -> str:
"""Normalize permit names for dictionary lookups (case/whitespace insensitive)."""
text = _clean_text(value)
@ -1164,6 +1185,7 @@ def commit_permit_import_session(
with _lic_pg_conn(autocommit=False) as conn:
try:
_ensure_permit_sources_table(conn)
_ensure_service_department_schema(conn)
_ensure_permit_theme_override_schema(conn)
if session_file_bytes:
_ensure_permit_file_schema(conn)
@ -1754,6 +1776,402 @@ def _ensure_permit_sources_table(conn: Optional[pg.Connection] = None) -> None:
_PERMIT_SOURCES_TABLE_PRESENT = True
def _create_service_department_schema(conn: pg.Connection) -> None:
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS service_departments (
id uuid PRIMARY KEY,
name text NOT NULL,
code text NOT NULL UNIQUE,
phone text,
parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL,
region_id uuid REFERENCES regions(id) ON DELETE SET NULL,
description text,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
)
"""
)
cur.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS service_departments_name_idx
ON service_departments (name)
"""
)
cur.execute(
"""
ALTER TABLE IF EXISTS service_departments
ADD COLUMN IF NOT EXISTS parent_id uuid REFERENCES service_departments(id) ON DELETE SET NULL
"""
)
cur.execute(
"""
ALTER TABLE IF EXISTS service_departments
ADD COLUMN IF NOT EXISTS phone text
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS service_departments_parent_idx
ON service_departments (parent_id)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS service_department_permits (
department_id uuid NOT NULL REFERENCES service_departments(id) ON DELETE CASCADE,
region_id uuid NOT NULL REFERENCES regions(id) ON DELETE CASCADE,
permit_id uuid NOT NULL REFERENCES permits(id) ON DELETE CASCADE,
created_at timestamptz NOT NULL DEFAULT now(),
created_by text,
PRIMARY KEY (department_id, region_id, permit_id)
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS service_dept_permits_region_idx
ON service_department_permits (region_id, permit_id)
"""
)
def _ensure_service_department_schema(conn: Optional[pg.Connection] = None) -> None:
global _SERVICE_DEPARTMENT_SCHEMA_READY
if _SERVICE_DEPARTMENT_SCHEMA_READY:
return
with _SERVICE_DEPARTMENT_SCHEMA_LOCK:
if _SERVICE_DEPARTMENT_SCHEMA_READY:
return
if conn is not None:
original_autocommit = conn.autocommit
try:
conn.autocommit = True
_create_service_department_schema(conn)
finally:
conn.autocommit = original_autocommit
else:
with _lic_pg_conn(autocommit=True) as ensure_conn:
_create_service_department_schema(ensure_conn)
_SERVICE_DEPARTMENT_SCHEMA_READY = True
_SERVICE_DEPARTMENT_SELECT = """
SELECT
sd.id,
sd.name,
sd.code,
sd.phone,
sd.parent_id,
parent.name AS parent_name,
sd.region_id,
r.name AS region_name,
sd.description,
sd.created_at,
sd.updated_at
FROM service_departments sd
LEFT JOIN service_departments parent ON parent.id = sd.parent_id
LEFT JOIN regions r ON r.id = sd.region_id
"""
def _serialize_service_department_row(record: Dict[str, Any]) -> Dict[str, Any]:
return {
"id": _to_optional_str(record.get("id")),
"name": record.get("name"),
"code": record.get("code"),
"phone": record.get("phone"),
"parent_id": _to_optional_str(record.get("parent_id")),
"parent_name": record.get("parent_name"),
"region_id": _to_optional_str(record.get("region_id")),
"region_name": record.get("region_name"),
"description": record.get("description"),
"created_at": _to_isoformat(record.get("created_at")),
"updated_at": _to_isoformat(record.get("updated_at")),
}
def _fetch_service_department(cur: pg.Cursor, department_id: str) -> Optional[Dict[str, Any]]:
cur.execute(
_SERVICE_DEPARTMENT_SELECT + " WHERE sd.id = %s LIMIT 1",
(department_id,),
)
row = cur.fetchone()
if not row:
return None
columns = tuple(col[0] for col in cur.description)
record = {columns[idx]: row[idx] for idx in range(len(columns))}
return _serialize_service_department_row(record)
def list_service_departments() -> List[Dict[str, Any]]:
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
cur = conn.cursor()
cur.execute(_SERVICE_DEPARTMENT_SELECT + " ORDER BY sd.created_at ASC")
rows = cur.fetchall()
columns = tuple(col[0] for col in cur.description)
departments: List[Dict[str, Any]] = []
for row in rows:
record = {columns[idx]: row[idx] for idx in range(len(columns))}
departments.append(_serialize_service_department_row(record))
return departments
def create_service_department(
name: str,
*,
code: Optional[str] = None,
phone: Optional[str] = None,
parent_id: Optional[str] = None,
region_id: Optional[str] = None,
description: Optional[str] = None,
) -> Dict[str, Any]:
normalized_name = _clean_text(name)
if not normalized_name:
raise ValueError("服务部门名称不能为空")
normalized_code = (_clean_text(code).upper() if code else "") or uuid.uuid4().hex[:8].upper()
parent_token = _clean_text(parent_id) or None
region_token = _clean_text(region_id) or None
description_text = description.strip() if isinstance(description, str) else description
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
cur = conn.cursor()
dept_id = uuid.uuid4()
cur.execute(
"""
INSERT INTO service_departments (id, name, code, phone, parent_id, region_id, description)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(dept_id, normalized_name, normalized_code, phone, parent_token, region_token, description_text),
)
conn.commit()
result = _fetch_service_department(cur, str(dept_id))
if not result:
raise RuntimeError("创建服务部门失败")
return result
def update_service_department(
department_id: str,
*,
name: Optional[str] = None,
phone: Optional[str] = None,
parent_id: Optional[str] = None,
region_id: Optional[str] = None,
description: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
dept_token = _clean_text(department_id)
if not dept_token:
raise ValueError("department_id 不能为空")
updates: List[str] = []
values: List[Any] = []
if name is not None:
normalized_name = _clean_text(name)
if not normalized_name:
raise ValueError("服务部门名称不能为空")
updates.append("name = %s")
values.append(normalized_name)
if phone is not None:
updates.append("phone = %s")
values.append(phone.strip() if isinstance(phone, str) else phone)
if parent_id is not None:
parent_token = _clean_text(parent_id) or None
if parent_token and parent_token == dept_token:
raise ValueError("服务部门不能设置为自己的上级")
updates.append("parent_id = %s")
values.append(parent_token)
if region_id is not None:
updates.append("region_id = %s")
values.append(_clean_text(region_id) or None)
if description is not None:
updates.append("description = %s")
values.append(description.strip() if isinstance(description, str) else description)
if not updates:
return _get_service_department_by_id(dept_token)
updates.append("updated_at = now()")
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
cur = conn.cursor()
query = f"UPDATE service_departments SET {', '.join(updates)} WHERE id = %s"
values.append(dept_token)
cur.execute(query, tuple(values))
conn.commit()
return _get_service_department_by_id(dept_token)
def _get_service_department_by_id(department_id: str) -> Optional[Dict[str, Any]]:
with _lic_pg_conn() as conn:
cur = conn.cursor()
return _fetch_service_department(cur, department_id)
def delete_service_department(department_id: str) -> Dict[str, Any]:
dept_token = _clean_text(department_id)
if not dept_token:
raise ValueError("department_id 不能为空")
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
cur = conn.cursor()
cur.execute(
"SELECT COUNT(*) FROM auth_users WHERE service_department_id = %s",
(dept_token,),
)
bound_count = int(cur.fetchone()[0] or 0)
if bound_count > 0:
raise ValueError("仍有账号绑定该服务部门,请先解除绑定后再删除")
cur.execute(
"UPDATE service_departments SET parent_id = NULL WHERE parent_id = %s",
(dept_token,),
)
cur.execute(
"DELETE FROM service_departments WHERE id = %s RETURNING id",
(dept_token,),
)
row = cur.fetchone()
if not row:
conn.rollback()
return {"deleted": False}
conn.commit()
return {"deleted": True}
_THEME_SUMMARY_SELECT = """
SELECT
t.id,
t.name,
COUNT(DISTINCT rtp.permit_id) AS permit_count,
COUNT(DISTINCT rtp.region_id) AS region_count
FROM themes t
LEFT JOIN region_theme_permits rtp ON rtp.theme_id = t.id
"""
def _serialize_theme_row(record: Dict[str, Any]) -> Dict[str, Any]:
return {
"id": _to_optional_str(record.get("id")),
"name": record.get("name"),
"permit_count": int(record.get("permit_count") or 0),
"region_count": int(record.get("region_count") or 0),
}
def _fetch_theme_summary(cur: pg.Cursor, theme_id: str) -> Optional[Dict[str, Any]]:
cur.execute(
_THEME_SUMMARY_SELECT + " WHERE t.id = %s GROUP BY t.id, t.name",
(theme_id,),
)
row = cur.fetchone()
if not row:
return None
columns = tuple(col[0] for col in cur.description)
record = {columns[idx]: row[idx] for idx in range(len(columns))}
return _serialize_theme_row(record)
def list_all_themes() -> List[Dict[str, Any]]:
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(_THEME_SUMMARY_SELECT + " GROUP BY t.id, t.name ORDER BY t.name ASC")
rows = cur.fetchall()
columns = tuple(col[0] for col in cur.description)
items: List[Dict[str, Any]] = []
for row in rows:
record = {columns[idx]: row[idx] for idx in range(len(columns))}
items.append(_serialize_theme_row(record))
return items
def create_theme(name: str) -> Dict[str, Any]:
normalized = _clean_text(name)
if not normalized:
raise ValueError("主题名称不能为空")
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(
"""
INSERT INTO themes (name)
VALUES (%s)
ON CONFLICT (name) DO NOTHING
RETURNING id
""",
(normalized,),
)
row = cur.fetchone()
if not row:
cur.execute("SELECT id FROM themes WHERE name = %s LIMIT 1", (normalized,))
row = cur.fetchone()
conn.commit()
if not row:
raise RuntimeError("创建主题失败")
theme_id = str(row[0])
summary = _fetch_theme_summary(cur, theme_id)
if not summary:
raise RuntimeError("无法加载主题信息")
return summary
def rename_theme(theme_id: str, new_name: str) -> Dict[str, Any]:
theme_token = _clean_text(theme_id)
if not theme_token:
raise ValueError("theme_id 不能为空")
normalized = _clean_text(new_name)
if not normalized:
raise ValueError("主题名称不能为空")
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(
"UPDATE themes SET name = %s WHERE id = %s RETURNING id",
(normalized, theme_token),
)
row = cur.fetchone()
if not row:
conn.rollback()
raise ValueError("主题不存在或已被删除")
conn.commit()
summary = _fetch_theme_summary(cur, theme_token)
if not summary:
raise RuntimeError("无法刷新主题信息")
return summary
def delete_theme(theme_id: str) -> Dict[str, Any]:
theme_token = _clean_text(theme_id)
if not theme_token:
raise ValueError("theme_id 不能为空")
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(
"DELETE FROM region_theme_permits WHERE theme_id = %s",
(theme_token,),
)
rtp_deleted = int(cur.rowcount or 0)
cur.execute(
"DELETE FROM region_themes WHERE theme_id = %s",
(theme_token,),
)
rt_deleted = int(cur.rowcount or 0)
cur.execute(
"DELETE FROM themes WHERE id = %s RETURNING id",
(theme_token,),
)
row = cur.fetchone()
if not row:
conn.rollback()
return {"deleted": False, "message": "主题不存在"}
conn.commit()
return {
"deleted": True,
"region_theme_permits_deleted": rtp_deleted,
"region_themes_deleted": rt_deleted,
}
def _create_permit_file_schema(conn: pg.Connection) -> None:
"""Create permit file storage tables on demand."""
cur = conn.cursor()

View File

@ -0,0 +1,77 @@
"""Helpers for managing import template files used by administrators."""
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "data", "template")
PERMIT_TEMPLATE_FILENAME = "风险提示表 模板.xlsx"
TEMPLATE_META_FILENAME = "template_meta.json"
def get_permit_template_path() -> str:
"""Return the absolute path to the permit import template."""
return os.path.join(TEMPLATE_DIR, PERMIT_TEMPLATE_FILENAME)
def _meta_path() -> str:
return os.path.join(TEMPLATE_DIR, TEMPLATE_META_FILENAME)
def _read_meta() -> Dict[str, Any]:
path = _meta_path()
if not os.path.exists(path):
return {}
try:
with open(path, "r", encoding="utf-8") as meta_file:
return json.load(meta_file)
except (json.JSONDecodeError, OSError):
return {}
def get_permit_template_metadata() -> Dict[str, Any]:
"""Return metadata for the permit template file (size, timestamps, uploader)."""
template_path = get_permit_template_path()
exists = os.path.exists(template_path)
metadata = {
"exists": exists,
"filename": os.path.basename(template_path),
"filesize": os.path.getsize(template_path) if exists else 0,
"updated_at": None,
"uploaded_by": None,
"source_filename": None,
"download_url": "/fs-ai-asistant/api/workflow/lawrisk/admin/permit-import/template",
}
meta_file_data = _read_meta()
metadata.update({k: v for k, v in meta_file_data.items() if k in metadata or k in {"hash"} })
if metadata.get("updated_at") and isinstance(metadata["updated_at"], (int, float)):
metadata["updated_at"] = datetime.fromtimestamp(metadata["updated_at"], tz=timezone.utc).isoformat()
return metadata
def overwrite_permit_template(file_bytes: bytes, filename: str, uploaded_by: str | None = None) -> Dict[str, Any]:
"""Overwrite the stored permit template with the uploaded bytes."""
if not file_bytes:
raise ValueError("模板文件内容不能为空")
os.makedirs(TEMPLATE_DIR, exist_ok=True)
target_path = get_permit_template_path()
with open(target_path, "wb") as template_file:
template_file.write(file_bytes)
timestamp = datetime.now(tz=timezone.utc).isoformat()
meta_payload = {
"updated_at": timestamp,
"uploaded_by": uploaded_by,
"source_filename": filename,
"filesize": len(file_bytes),
"filename": os.path.basename(target_path),
}
with open(_meta_path(), "w", encoding="utf-8") as meta_file:
json.dump(meta_payload, meta_file, ensure_ascii=False, indent=2)
meta_payload["exists"] = True
meta_payload["download_url"] = "/fs-ai-asistant/api/workflow/lawrisk/admin/permit-import/template"
return meta_payload

711
static/super_admin.html Normal file
View File

@ -0,0 +1,711 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>LawRisk 超级管理员控制台</title>
<style>
* {
box-sizing: border-box;
}
body {
margin: 0;
padding: 0;
font-family: "Microsoft YaHei", "PingFang SC", -apple-system, BlinkMacSystemFont, sans-serif;
background: linear-gradient(135deg, #eef2ff 0%, #e0e7ff 100%);
min-height: 100vh;
color: #1f2937;
}
.page {
max-width: 1400px;
margin: 0 auto;
padding: 32px 20px 60px;
}
.header {
background: #fff;
border-radius: 18px;
padding: 28px;
box-shadow: 0 20px 60px rgba(79, 70, 229, 0.12);
margin-bottom: 24px;
position: relative;
}
.title {
text-align: center;
}
.title h1 {
font-size: 30px;
margin: 0;
color: #111827;
}
.title p {
margin: 6px 0 0;
color: #4b5563;
font-size: 15px;
}
.user-chip {
position: absolute;
top: 28px;
right: 28px;
padding: 16px 20px;
border-radius: 16px;
border: 1px solid #e5e7eb;
background: rgba(255,255,255,0.9);
box-shadow: 0 10px 30px rgba(17, 24, 39, 0.08);
display: flex;
flex-direction: column;
min-width: 220px;
gap: 6px;
}
.user-name {
font-weight: 600;
font-size: 16px;
display: flex;
justify-content: space-between;
align-items: center;
}
.user-tag {
font-size: 12px;
padding: 2px 10px;
border-radius: 999px;
background: #eef2ff;
color: #4f46e5;
}
.user-meta {
font-size: 12px;
color: #6b7280;
}
.logout-btn {
align-self: flex-start;
background: #ef4444;
color: #fff;
border: none;
border-radius: 999px;
padding: 6px 14px;
font-size: 13px;
cursor: pointer;
}
.grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(320px, 1fr));
gap: 20px;
}
.card {
background: #fff;
border-radius: 16px;
padding: 20px;
box-shadow: 0 12px 32px rgba(15, 23, 42, 0.08);
display: flex;
flex-direction: column;
gap: 16px;
}
h2 {
margin: 0;
font-size: 20px;
color: #111827;
}
h3 {
margin: 0;
font-size: 16px;
color: #374151;
}
label {
font-size: 13px;
color: #6b7280;
}
input, select, textarea {
width: 100%;
border: 1px solid #d1d5db;
border-radius: 10px;
padding: 10px 12px;
font-size: 14px;
margin-top: 4px;
}
textarea {
resize: vertical;
}
button.primary {
background: linear-gradient(135deg, #4f46e5, #6366f1);
border: none;
color: #fff;
padding: 10px 18px;
border-radius: 999px;
cursor: pointer;
font-weight: 600;
margin-top: 8px;
}
table {
width: 100%;
border-collapse: collapse;
font-size: 13px;
}
th, td {
padding: 8px 6px;
border-bottom: 1px solid #f3f4f6;
text-align: left;
}
th {
font-weight: 600;
color: #4b5563;
}
.table-wrap {
max-height: 260px;
overflow: auto;
}
.pill {
padding: 2px 8px;
border-radius: 999px;
background: #eef2ff;
color: #4f46e5;
font-size: 12px;
}
.action {
border: none;
background: none;
color: #2563eb;
cursor: pointer;
padding: 0;
font-size: 13px;
}
.danger {
color: #dc2626;
}
.message {
margin-bottom: 16px;
padding: 12px 16px;
border-radius: 12px;
background: #eef2ff;
color: #312e81;
display: none;
}
.message.show {
display: block;
}
.template-meta {
font-size: 13px;
color: #4b5563;
line-height: 1.6;
}
.section {
display: flex;
flex-direction: column;
gap: 12px;
}
@media (max-width: 768px) {
.user-chip {
position: static;
margin-top: 16px;
}
.header {
padding-top: 48px;
}
}
</style>
</head>
<body>
<div class="page">
<header class="header">
<div class="title">
<h1>LawRisk 法律风险提示系统</h1>
<p>超级管理员 · 用户、部门、主题与模板一站式管控</p>
</div>
<div class="user-chip" id="userChip">
<div class="user-name">
<span id="currentUserName">未登录</span>
<span class="user-tag" id="currentUserRole">role</span>
</div>
<div class="user-meta" id="currentUserDept"></div>
<button class="logout-btn" id="logoutBtn">退出登录</button>
</div>
</header>
<div class="message" id="messageBar"></div>
<div class="grid">
<section class="card">
<h2>用户管理</h2>
<div class="section">
<h3>用户添加</h3>
<form id="userCreateForm">
<label>用户名
<input type="text" name="username" required placeholder="请输入登录账号">
</label>
<label>初始密码
<input type="password" name="password" required placeholder="设置登录密码">
</label>
<label>显示名称
<input type="text" name="display_name" placeholder="可选">
</label>
<label>绑定服务部门
<select name="service_department_id" id="userCreateDept">
<option value="">暂不绑定</option>
</select>
</label>
<button class="primary" type="submit">创建用户</button>
</form>
</div>
<div class="section">
<h3>密码 / 部门调整</h3>
<form id="userUpdateForm">
<label>选择用户
<select name="user_id" id="userUpdateSelect" required></select>
</label>
<label>新密码(留空表示不修改)
<input type="password" name="password" placeholder="可选">
</label>
<label>新的服务部门
<select name="service_department_id" id="userUpdateDept">
<option value="">保持不变 / 清除绑定</option>
</select>
</label>
<button class="primary" type="submit">保存调整</button>
</form>
</div>
<div class="section">
<h3>用户列表</h3>
<div class="table-wrap">
<table>
<thead>
<tr>
<th>用户名</th>
<th>显示名</th>
<th>部门</th>
<th>角色</th>
<th>创建时间</th>
</tr>
</thead>
<tbody id="userTableBody"></tbody>
</table>
</div>
</div>
</section>
<section class="card">
<h2>服务部门管理</h2>
<form id="deptCreateForm">
<label>部门名称
<input type="text" name="name" required>
</label>
<label>联系电话
<input type="text" name="phone" placeholder="座机或手机号">
</label>
<label>上级部门
<select name="parent_id" id="deptParentSelect">
<option value="">设为顶级</option>
</select>
</label>
<label>备注
<textarea name="description" rows="2" placeholder="可填写简要职责"></textarea>
</label>
<button class="primary" type="submit">新增服务部门</button>
</form>
<div class="table-wrap">
<table>
<thead>
<tr>
<th>名称</th>
<th>上级</th>
<th>电话</th>
<th>操作</th>
</tr>
</thead>
<tbody id="deptTableBody"></tbody>
</table>
</div>
</section>
<section class="card">
<h2>主题列表管理</h2>
<form id="themeCreateForm">
<label>主题名称
<input type="text" name="name" required>
</label>
<button class="primary" type="submit">添加主题</button>
</form>
<div class="table-wrap">
<table>
<thead>
<tr>
<th>主题</th>
<th>关联许可</th>
<th>覆盖区划</th>
<th>操作</th>
</tr>
</thead>
<tbody id="themeTableBody"></tbody>
</table>
</div>
</section>
<section class="card">
<h2>模板管理</h2>
<div class="template-meta" id="templateMeta">
模板未加载
</div>
<form id="templateForm">
<label>上传新的 Excel 模板
<input type="file" name="file" accept=".xlsx,.xls" required>
</label>
<button class="primary" type="submit">覆盖模板</button>
</form>
<button class="primary" type="button" id="downloadTemplateBtn">下载当前模板</button>
</section>
</div>
</div>
<script>
const API_BASE = '/fs-ai-asistant/api/workflow/lawrisk';
const messageBar = document.getElementById('messageBar');
const userTableBody = document.getElementById('userTableBody');
const userCreateDept = document.getElementById('userCreateDept');
const userUpdateDept = document.getElementById('userUpdateDept');
const userUpdateSelect = document.getElementById('userUpdateSelect');
const deptParentSelect = document.getElementById('deptParentSelect');
const deptTableBody = document.getElementById('deptTableBody');
const themeTableBody = document.getElementById('themeTableBody');
const templateMetaBox = document.getElementById('templateMeta');
let state = {
users: [],
departments: [],
themes: [],
templateMeta: {}
};
function showMessage(text, type = 'info') {
if (!text) {
messageBar.classList.remove('show');
messageBar.textContent = '';
return;
}
messageBar.textContent = text;
messageBar.style.background = type === 'error' ? '#fee2e2' : '#eef2ff';
messageBar.style.color = type === 'error' ? '#b91c1c' : '#312e81';
messageBar.classList.add('show');
setTimeout(() => {
messageBar.classList.remove('show');
}, 4000);
}
async function fetchJSON(url, options = {}) {
const resp = await fetch(url, {
credentials: 'include',
headers: options.method === 'GET' || options.body instanceof FormData
? options.headers
: {'Content-Type': 'application/json', ...(options.headers || {})},
...options
});
let data = {};
try {
data = await resp.json();
} catch (_) {
data = {};
}
if (!resp.ok || data.success === false) {
throw new Error(data.message || '操作失败');
}
return data;
}
async function loadCurrentUser() {
try {
const resp = await fetchJSON('/auth/me', {method: 'GET'});
const user = resp.user || {};
document.getElementById('currentUserName').textContent = user.display_name || user.username || '未知管理员';
document.getElementById('currentUserRole').textContent = user.role || 'admin';
const dept = user.department;
document.getElementById('currentUserDept').textContent = dept ? `${dept.name || ''}${dept.code ? ' · ' + dept.code : ''}` : '未绑定部门';
} catch (err) {
showMessage(err.message, 'error');
}
}
function renderDepartmentOptions() {
const options = ['<option value="">暂不绑定</option>'];
const updateOptions = ['<option value="">保持不变 / 清除绑定</option>'];
const parentOptions = ['<option value="">设为顶级</option>'];
state.departments.forEach(dept => {
const label = `${dept.name}${dept.region_name ? ' · ' + dept.region_name : ''}`;
options.push(`<option value="${dept.id}">${label}</option>`);
updateOptions.push(`<option value="${dept.id}">${label}</option>`);
parentOptions.push(`<option value="${dept.id}">${label}</option>`);
});
userCreateDept.innerHTML = options.join('');
userUpdateDept.innerHTML = updateOptions.join('');
deptParentSelect.innerHTML = parentOptions.join('');
}
function renderUserSelect() {
const opts = state.users.map(user => `<option value="${user.id}">${user.username} (${user.display_name || '未命名'})</option>`);
userUpdateSelect.innerHTML = opts.join('');
}
function renderUserTable() {
userTableBody.innerHTML = state.users.map(user => {
const dept = user.department ? user.department.name : '—';
return `<tr>
<td>${user.username}</td>
<td>${user.display_name || '—'}</td>
<td>${dept}</td>
<td><span class="pill">${user.role}</span></td>
<td>${user.created_at || '—'}</td>
</tr>`;
}).join('');
}
function renderDeptTable() {
deptTableBody.innerHTML = state.departments.map(dept => `
<tr>
<td>${dept.name}</td>
<td>${dept.parent_name || '—'}</td>
<td>${dept.phone || '—'}</td>
<td>
<button class="action danger" data-id="${dept.id}" data-action="delete-dept">删除</button>
</td>
</tr>
`).join('');
}
function renderThemeTable() {
themeTableBody.innerHTML = state.themes.map(theme => `
<tr>
<td>${theme.name}</td>
<td>${theme.permit_count}</td>
<td>${theme.region_count}</td>
<td>
<button class="action" data-id="${theme.id}" data-name="${theme.name}" data-action="rename-theme">重命名</button>
&nbsp;|&nbsp;
<button class="action danger" data-id="${theme.id}" data-name="${theme.name}" data-action="delete-theme">删除</button>
</td>
</tr>
`).join('');
}
function renderTemplateMeta() {
const meta = state.templateMeta;
if (!meta || !meta.exists) {
templateMetaBox.textContent = '暂未上传模板,请尽快上传标准模板文件。';
return;
}
templateMetaBox.innerHTML = `
<div>文件名:${meta.filename}</div>
<div>大小:${(meta.filesize / 1024).toFixed(1)} KB</div>
<div>更新时间:${meta.updated_at || '—'}</div>
<div>上传人:${meta.uploaded_by || '—'}</div>
`;
}
async function refreshUsers() {
const data = await fetchJSON(`${API_BASE}/admin/users`, {method: 'GET'});
state.users = data.data.users || [];
renderUserSelect();
renderUserTable();
}
async function refreshDepartments() {
const data = await fetchJSON(`${API_BASE}/admin/service-departments`, {method: 'GET'});
state.departments = data.data.departments || [];
renderDepartmentOptions();
renderDeptTable();
}
async function refreshThemes() {
const data = await fetchJSON(`${API_BASE}/admin/themes/catalog`, {method: 'GET'});
state.themes = data.data.themes || [];
renderThemeTable();
}
async function refreshTemplateMeta() {
const data = await fetchJSON(`${API_BASE}/admin/templates/permit`, {method: 'GET'});
state.templateMeta = data.data || {};
renderTemplateMeta();
}
document.getElementById('logoutBtn').addEventListener('click', async () => {
try {
await fetchJSON('/auth/logout', {method: 'POST', headers: {'Content-Type': 'application/json'}});
window.location.href = '/fs-ai-asistant/lawrisk/login';
} catch (err) {
showMessage(err.message, 'error');
}
});
document.getElementById('userCreateForm').addEventListener('submit', async (evt) => {
evt.preventDefault();
const form = evt.target;
const payload = {
username: form.username.value.trim(),
password: form.password.value.trim(),
display_name: form.display_name.value.trim(),
service_department_id: form.service_department_id.value || null
};
try {
await fetchJSON(`${API_BASE}/admin/users`, {
method: 'POST',
body: JSON.stringify(payload)
});
showMessage('用户创建成功');
form.reset();
await refreshUsers();
} catch (err) {
showMessage(err.message, 'error');
}
});
document.getElementById('userUpdateForm').addEventListener('submit', async (evt) => {
evt.preventDefault();
const form = evt.target;
const userId = form.user_id.value;
if (!userId) {
showMessage('请选择要调整的用户', 'error');
return;
}
const payload = {};
if (form.password.value) {
payload.password = form.password.value;
}
payload.service_department_id = form.service_department_id.value || null;
try {
await fetchJSON(`${API_BASE}/admin/users/${userId}`, {
method: 'PATCH',
body: JSON.stringify(payload)
});
showMessage('用户信息已更新');
form.password.value = '';
await refreshUsers();
} catch (err) {
showMessage(err.message, 'error');
}
});
document.getElementById('deptCreateForm').addEventListener('submit', async (evt) => {
evt.preventDefault();
const form = evt.target;
const payload = {
name: form.name.value.trim(),
phone: form.phone.value.trim(),
parent_id: form.parent_id.value || null,
description: form.description.value.trim()
};
try {
await fetchJSON(`${API_BASE}/admin/service-departments`, {
method: 'POST',
body: JSON.stringify(payload)
});
showMessage('服务部门已创建');
form.reset();
await refreshDepartments();
} catch (err) {
showMessage(err.message, 'error');
}
});
deptTableBody.addEventListener('click', async (evt) => {
const target = evt.target;
if (!target.dataset || target.dataset.action !== 'delete-dept') {
return;
}
const deptId = target.dataset.id;
if (!deptId) return;
if (!confirm('删除前请确认没有账号绑定该部门,确定要删除吗?')) {
return;
}
try {
await fetchJSON(`${API_BASE}/admin/service-departments/${deptId}`, {method: 'DELETE'});
showMessage('服务部门已删除');
await refreshDepartments();
await refreshUsers();
} catch (err) {
showMessage(err.message, 'error');
}
});
document.getElementById('themeCreateForm').addEventListener('submit', async (evt) => {
evt.preventDefault();
const form = evt.target;
const payload = {name: form.name.value.trim()};
try {
await fetchJSON(`${API_BASE}/admin/themes/catalog`, {
method: 'POST',
body: JSON.stringify(payload)
});
showMessage('主题添加成功');
form.reset();
await refreshThemes();
} catch (err) {
showMessage(err.message, 'error');
}
});
themeTableBody.addEventListener('click', async (evt) => {
const target = evt.target;
if (!target.dataset) return;
const action = target.dataset.action;
const themeId = target.dataset.id;
const themeName = target.dataset.name;
if (!themeId || !action) return;
if (action === 'rename-theme') {
const value = prompt('请输入新的主题名称', themeName || '');
if (!value) return;
try {
await fetchJSON(`${API_BASE}/admin/themes/catalog/${themeId}`, {
method: 'PATCH',
body: JSON.stringify({name: value.trim()})
});
showMessage('主题名称已更新');
await refreshThemes();
} catch (err) {
showMessage(err.message, 'error');
}
} else if (action === 'delete-theme') {
if (!confirm(`确定删除主题「${themeName}」及其关联?`)) return;
try {
await fetchJSON(`${API_BASE}/admin/themes/catalog/${themeId}`, {method: 'DELETE'});
showMessage('主题已删除');
await refreshThemes();
} catch (err) {
showMessage(err.message, 'error');
}
}
});
document.getElementById('templateForm').addEventListener('submit', async (evt) => {
evt.preventDefault();
const form = evt.target;
const fileInput = form.file;
if (!fileInput.files.length) {
showMessage('请选择要上传的模板文件', 'error');
return;
}
const formData = new FormData();
formData.append('file', fileInput.files[0]);
try {
await fetchJSON(`${API_BASE}/admin/templates/permit`, {
method: 'POST',
body: formData,
headers: {}
});
showMessage('模板已更新');
form.reset();
await refreshTemplateMeta();
} catch (err) {
showMessage(err.message, 'error');
}
});
document.getElementById('downloadTemplateBtn').addEventListener('click', () => {
window.open(`${API_BASE}/admin/permit-import/template`, '_blank');
});
async function bootstrap() {
await loadCurrentUser();
await Promise.all([
refreshUsers(),
refreshDepartments(),
refreshThemes(),
refreshTemplateMeta()
]);
}
bootstrap().catch(err => showMessage(err.message, 'error'));
</script>
</body>
</html>