"""Authentication and authorization helpers for the LawRisk backend.

Covers three areas:

* seeding the default service departments and their admin accounts,
* creating the ``auth_users`` table, and
* CRUD helpers plus password verification for ``auth_users`` rows.
"""

from __future__ import annotations

import logging
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from passlib.context import CryptContext

from lawrisk.services import licensing_repo as lic_repo

pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
logger = logging.getLogger(__name__)


def _normalize_department_code(raw_code: Optional[str]) -> str:
    """Return *raw_code* stripped and upper-cased (``""`` for ``None``)."""
    return (raw_code or "").strip().upper()


def _resolve_parent_department_id(
    cur, parent_code: Optional[str], code_cache: Optional[Dict[str, str]] = None
) -> Optional[str]:
    """Look up the id of the department whose code is *parent_code*.

    Args:
        cur: Open DB cursor used for the fallback query.
        parent_code: Department code; normalized before use.
        code_cache: Optional mapping of normalized code -> department id that
            is consulted before hitting the database.

    Returns:
        The department id as a string, or ``None`` when the code is empty or
        no matching row exists.
    """
    normalized = _normalize_department_code(parent_code)
    if not normalized:
        return None
    if code_cache:
        cached = code_cache.get(normalized)
        if cached:
            return cached
    cur.execute(
        "SELECT id FROM service_departments WHERE code = %s LIMIT 1",
        (normalized,),
    )
    row = cur.fetchone()
    return str(row[0]) if row else None


def _auth_conn(autocommit: bool = False):
    """Reuse the licensing_risks connection for auth tables.

    Delegates to ``lic_repo._lic_pg_conn``.
    """
    return lic_repo._lic_pg_conn(autocommit=autocommit)


# Default departments and the account seeded for each of them. Entries with a
# "parent_code" are attached to the department carrying that code.
SERVICE_DEPARTMENT_SEEDS = [
    {
        "code": "FSSJSJ",
        "name": "市级服务部门",
        "region_name": "市级",
        "username": "FSSJSJ",
        "display_name": "市级管理员",
        "role": "department_admin",
        "grade": 90,
    },
    {
        "code": "FSSJSS",
        "name": "三水区服务部门",
        "region_name": "三水区",
        "parent_code": "FSSJSJ",
        "username": "FSSJSS",
        "display_name": "三水区管理员",
        "role": "department_admin",
        "grade": 80,
    },
    {
        "code": "FSSJNH",
        "name": "南海区服务部门",
        "region_name": "南海区",
        "parent_code": "FSSJSJ",
        "username": "FSSJNH",
        "display_name": "南海区管理员",
        "role": "department_admin",
        "grade": 80,
    },
    {
        "code": "FSSJSD",
        "name": "顺德区服务部门",
        "region_name": "顺德区",
        "parent_code": "FSSJSJ",
        "username": "FSSJSD",
        "display_name": "顺德区管理员",
        "role": "department_admin",
        "grade": 80,
    },
    {
        "code": "FSSJCC",
        "name": "禅城区服务部门",
        "region_name": "禅城区",
        "parent_code": "FSSJSJ",
        "username": "FSSJCC",
        "display_name": "禅城区管理员",
        "role": "department_admin",
        "grade": 80,
    },
    {
        "code": "FSSJGM",
        "name": "高明区服务部门",
        "region_name": "高明区",
        "parent_code": "FSSJSJ",
        "username": "FSSJGM",
        "display_name": "高明区管理员",
        "role": "department_admin",
        "grade": 80,
    },
]


def _resolve_region_id(cur, region_name: Optional[str]) -> Optional[str]:
    """Return the id of the region named *region_name*, or ``None``."""
    if not region_name:
        return None
    cur.execute(
        " SELECT id FROM regions"
        " WHERE name = %s"
        " LIMIT 1 ",
        (region_name,),
    )
    row = cur.fetchone()
    return str(row[0]) if row else None


def _upsert_service_department(
    cur, seed: Dict[str, Any], code_cache: Optional[Dict[str, str]] = None
) -> str:
    """Insert or update the service department described by *seed*.

    Args:
        cur: Open DB cursor; the caller is responsible for committing.
        seed: Mapping with at least ``code`` and ``name``; ``region_name``,
            ``parent_code`` and ``description`` are optional.
        code_cache: Optional normalized-code -> id mapping used to resolve
            ``parent_code`` without a query.

    Returns:
        The id of the existing or newly inserted department.

    Raises:
        ValueError: If the seed lacks a code or a name.
    """
    code = _normalize_department_code(seed.get("code"))
    name = seed.get("name")
    if not code or not name:
        raise ValueError("Service department seed requires code and name")

    cur.execute(
        "SELECT id FROM service_departments WHERE code = %s LIMIT 1",
        (code,),
    )
    row = cur.fetchone()
    region_id = _resolve_region_id(cur, seed.get("region_name"))
    description = seed.get("description")
    parent_id = _resolve_parent_department_id(cur, seed.get("parent_code"), code_cache)

    if row:
        dept_id = str(row[0])
        # COALESCE keeps the stored value whenever the seed resolves to NULL.
        cur.execute(
            " UPDATE service_departments"
            " SET name = %s,"
            " parent_id = COALESCE(%s, parent_id),"
            " region_id = COALESCE(%s, region_id),"
            " description = COALESCE(%s, description),"
            " updated_at = now()"
            " WHERE id = %s ",
            (name, parent_id, region_id, description, dept_id),
        )
        return dept_id

    dept_id = str(uuid.uuid4())
    cur.execute(
        " INSERT INTO service_departments"
        " (id, name, code, parent_id, region_id, description)"
        " VALUES (%s, %s, %s, %s, %s, %s) ",
        (dept_id, name, code, parent_id, region_id, description),
    )
    return dept_id


def _ensure_department_account(cur, seed: Dict[str, Any], department_id: str) -> None:
    """Create the seed account for a department, or re-bind an existing one.

    An existing user (matched by lower-cased username) only has its
    department, role and department role updated; its password is left
    untouched. A missing user is inserted with a freshly hashed password.

    Args:
        cur: Open DB cursor; the caller is responsible for committing.
        seed: Seed mapping; ``username`` falls back to ``code``.
        department_id: Id of the department the account belongs to.

    Raises:
        ValueError: If neither ``username`` nor ``code`` yields a username.
    """
    username = (seed.get("username") or seed.get("code") or "").strip().lower()
    if not username:
        raise ValueError("Department seed requires username")

    # Use `or` fallbacks so a key that is present but None does not leak
    # "None" into the display name or crash on `.upper()`.
    display_name = seed.get("display_name") or f"{seed.get('name') or username}账号"
    role = seed.get("role", "department_admin")
    grade = int(seed.get("grade", 50))
    department_role = seed.get("department_role", "manager")
    # NOTE(review): the fallback password is derived from the department code
    # and is therefore predictable; seeded accounts should have their password
    # changed after the first login.
    password = seed.get("password") or f"{(seed.get('code') or username).upper()}123456"

    cur.execute(
        "SELECT id FROM auth_users WHERE username = %s",
        (username,),
    )
    row = cur.fetchone()
    if row:
        cur.execute(
            " UPDATE auth_users"
            " SET service_department_id = %s,"
            " role = COALESCE(%s, role),"
            " department_role = COALESCE(%s, department_role)"
            " WHERE id = %s ",
            (department_id, role, department_role, row[0]),
        )
        return

    user_id = uuid.uuid4().hex
    password_hash = pwd_context.hash(password)
    cur.execute(
        " INSERT INTO auth_users ("
        " id, username, password_hash, display_name, role, grade,"
        " service_department_id, department_role"
        " ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ",
        (
            user_id,
            username,
            password_hash,
            display_name,
            role,
            grade,
            department_id,
            department_role,
        ),
    )
    logger.info("Seeded user %s (department=%s)", username, department_id)


def ensure_default_department_accounts() -> None:
    """Ensure the default service departments and related accounts exist.

    Runs in two committed passes over ``SERVICE_DEPARTMENT_SEEDS``: first all
    departments are upserted (collecting their ids), then one account per
    department is ensured.
    """
    dept_ids: Dict[str, str] = {}
    with _auth_conn() as conn:
        _ensure_auth_schema(conn)

        cur = conn.cursor()
        for seed in SERVICE_DEPARTMENT_SEEDS:
            dept_id = _upsert_service_department(cur, seed, dept_ids)
            code_key = _normalize_department_code(seed.get("code"))
            if code_key:
                dept_ids[code_key] = dept_id
        conn.commit()

        cur = conn.cursor()
        for seed in SERVICE_DEPARTMENT_SEEDS:
            dept_id = dept_ids.get(_normalize_department_code(seed.get("code")))
            if not dept_id:
                continue
            _ensure_department_account(cur, seed, dept_id)
        conn.commit()


def _ensure_auth_schema(conn) -> None:
    """Create the ``auth_users`` table and its department index if missing.

    The service-department schema is ensured first because ``auth_users``
    references ``service_departments(id)``. Commits on *conn*.
    """
    lic_repo._ensure_service_department_schema(conn)
    cur = conn.cursor()
    cur.execute(
        " CREATE TABLE IF NOT EXISTS auth_users ("
        " id TEXT PRIMARY KEY,"
        " username TEXT UNIQUE NOT NULL,"
        " password_hash TEXT NOT NULL,"
        " display_name TEXT,"
        " role TEXT NOT NULL DEFAULT 'user',"
        " grade INTEGER NOT NULL DEFAULT 0,"
        " is_active BOOLEAN NOT NULL DEFAULT TRUE,"
        " service_department_id uuid REFERENCES service_departments(id),"
        " department_role text,"
        " created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()"
        " ) "
    )
    cur.execute(
        " CREATE INDEX IF NOT EXISTS auth_users_department_idx"
        " ON auth_users (service_department_id) "
    )
    conn.commit()


def ensure_auth_schema() -> None:
    """Open an auth connection and ensure the auth schema exists."""
    with _auth_conn() as conn:
        _ensure_auth_schema(conn)


def _row_to_user(row: tuple[Any, ...], columns: tuple[str, ...]) -> Dict[str, Any]:
    """Pair each column name with the value at the same position in *row*."""
    return {col: row[idx] for idx, col in enumerate(columns)}


def _public_user_payload(user: Dict[str, Any]) -> Dict[str, Any]:
    """Build the outward-facing representation of a user row.

    The result has no ``password_hash`` key. Department fields are nested
    under ``department`` (``None`` when the user has no department) and a
    ``datetime`` ``created_at`` is rendered as an ISO-8601 string.
    """
    department = None
    if user.get("service_department_id"):
        department = {
            "id": user.get("service_department_id"),
            "name": user.get("service_department_name"),
            "code": user.get("service_department_code"),
            "phone": user.get("service_department_phone"),
            "parent_id": user.get("service_department_parent_id"),
            "region_id": user.get("service_department_region_id"),
            "region_name": user.get("service_department_region_name"),
            "role": user.get("department_role"),
        }

    created_at = user.get("created_at")
    if isinstance(created_at, datetime):
        created_at_value = created_at.isoformat()
    else:
        created_at_value = created_at

    return {
        "id": user.get("id"),
        "username": user.get("username"),
        "display_name": user.get("display_name"),
        "role": user.get("role"),
        "grade": user.get("grade"),
        "is_active": user.get("is_active", True),
        "department": department,
        "department_id": user.get("service_department_id"),
        "created_at": created_at_value,
    }


def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Fetch a user row by username, including ``password_hash``.

    The username is stripped and lower-cased before the lookup. Unlike
    ``get_user_by_id`` the raw row mapping is returned, so callers can verify
    the password.

    Returns:
        The row as a column -> value mapping, or ``None`` when the username
        is empty or unknown.
    """
    username = (username or "").strip().lower()
    if not username:
        return None
    with _auth_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            " SELECT au.id, au.username, au.display_name, au.role, au.grade,"
            " au.is_active, au.password_hash, au.service_department_id,"
            " au.department_role, au.created_at,"
            " sd.name AS service_department_name,"
            " sd.code AS service_department_code,"
            " sd.phone AS service_department_phone,"
            " sd.parent_id AS service_department_parent_id,"
            " sd.region_id AS service_department_region_id,"
            " r.name AS service_department_region_name"
            " FROM auth_users au"
            " LEFT JOIN service_departments sd ON sd.id = au.service_department_id"
            " LEFT JOIN regions r ON r.id = sd.region_id"
            " WHERE au.username = %s"
            " LIMIT 1 ",
            (username,),
        )
        row = cur.fetchone()
        if not row:
            return None
        columns = tuple(col[0] for col in cur.description)
        return _row_to_user(row, columns)


def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the public payload of the user with id *user_id*.

    Returns:
        The ``_public_user_payload`` mapping, or ``None`` when the id is
        empty or unknown.
    """
    user_token = (user_id or "").strip()
    if not user_token:
        return None
    with _auth_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            " SELECT au.id, au.username, au.display_name, au.role, au.grade,"
            " au.is_active, au.service_department_id, au.department_role,"
            " au.created_at,"
            " sd.name AS service_department_name,"
            " sd.code AS service_department_code,"
            " sd.phone AS service_department_phone,"
            " sd.parent_id AS service_department_parent_id,"
            " sd.region_id AS service_department_region_id,"
            " r.name AS service_department_region_name"
            " FROM auth_users au"
            " LEFT JOIN service_departments sd ON sd.id = au.service_department_id"
            " LEFT JOIN regions r ON r.id = sd.region_id"
            " WHERE au.id = %s"
            " LIMIT 1 ",
            (user_token,),
        )
        row = cur.fetchone()
        if not row:
            return None
        columns = tuple(col[0] for col in cur.description)
        return _public_user_payload(_row_to_user(row, columns))


def list_users(include_inactive: bool = False) -> List[Dict[str, Any]]:
    """List users as public payloads, newest first.

    Args:
        include_inactive: When false (default) only rows with
            ``is_active = TRUE`` are returned.
    """
    # Only one of two fixed literals is interpolated, never caller input.
    where_clause = "" if include_inactive else "WHERE au.is_active = TRUE"
    with _auth_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            " SELECT au.id, au.username, au.display_name, au.role, au.grade,"
            " au.is_active, au.service_department_id, au.department_role,"
            " au.created_at,"
            " sd.name AS service_department_name,"
            " sd.code AS service_department_code,"
            " sd.phone AS service_department_phone,"
            " sd.parent_id AS service_department_parent_id,"
            " sd.region_id AS service_department_region_id,"
            " r.name AS service_department_region_name"
            " FROM auth_users au"
            " LEFT JOIN service_departments sd ON sd.id = au.service_department_id"
            " LEFT JOIN regions r ON r.id = sd.region_id"
            f" {where_clause}"
            " ORDER BY au.created_at DESC "
        )
        rows = cur.fetchall()
        columns = tuple(col[0] for col in cur.description)
        return [_public_user_payload(_row_to_user(row, columns)) for row in rows]


def create_user(
    username: str,
    password: str,
    *,
    display_name: Optional[str] = None,
    role: str = "user",
    grade: int = 0,
    service_department_id: Optional[str] = None,
    department_role: Optional[str] = None,
    parent_department_id: Optional[str] = None,
    service_department_region_id: Optional[str] = None,
    service_department_phone: Optional[str] = None,
) -> Dict[str, Any]:
    """Create a user and return its public payload.

    When no ``service_department_id`` is given, a department named after the
    user (display name, else username) is created through
    ``lic_repo.create_service_department`` and bound to the new account.

    Args:
        username: Login name; stored stripped and lower-cased.
        password: Plain-text password; only its hash is stored.
        display_name: Optional human-readable name.
        role: Value for the ``role`` column.
        grade: Value for the ``grade`` column.
        service_department_id: Existing department to bind the user to.
        department_role: Value for the ``department_role`` column.
        parent_department_id: Parent of the auto-created department.
        service_department_region_id: Region of the auto-created department.
        service_department_phone: Phone of the auto-created department.

    Raises:
        ValueError: If username or password is empty, or if the automatic
            department creation fails (the original error is chained).
    """
    username_clean = (username or "").strip().lower()
    if not username_clean or not password:
        raise ValueError("Username and password are required")

    # If no department was passed in, create a same-named unit and bind it.
    dept_token = (service_department_id or "").strip() or None
    if not dept_token:
        try:
            dept_name = (display_name or username_clean).strip() or username_clean
            dept_code = username_clean.upper()
            created = lic_repo.create_service_department(
                name=dept_name,
                code=dept_code,
                phone=(service_department_phone or "").strip() or None,
                parent_id=(parent_department_id or "").strip() or None,
                region_id=(service_department_region_id or "").strip() or None,
            )
            dept_token = created.get("id")
        except Exception as exc:
            # Callers expect ValueError; chain the cause so the real failure
            # stays visible in tracebacks.
            raise ValueError(f"自动创建单位失败: {exc}") from exc

    user_id = uuid.uuid4().hex
    password_hash = pwd_context.hash(password)
    with _auth_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            " INSERT INTO auth_users ("
            " id, username, password_hash, display_name, role, grade,"
            " service_department_id, department_role"
            " ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
            " RETURNING id, username, display_name, role, grade, is_active,"
            " created_at, service_department_id, department_role ",
            (
                user_id,
                username_clean,
                password_hash,
                display_name,
                role,
                grade,
                dept_token,
                department_role,
            ),
        )
        conn.commit()
    # Re-read through the joined query so department fields are populated.
    return get_user_by_id(user_id)


def update_user_account(
    user_id: str,
    *,
    password: Optional[str] = None,
    service_department_id: Optional[str] = None,
    display_name: Optional[str] = None,
    role: Optional[str] = None,
    grade: Optional[int] = None,
    department_role: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Update the given fields of a user; ``None`` means "leave unchanged".

    Passing an empty/blank ``service_department_id`` clears the department.

    Returns:
        The refreshed public payload, or ``None`` if no row has that id.

    Raises:
        ValueError: If ``user_id`` is empty or a blank password is supplied.
    """
    user_token = (user_id or "").strip()
    if not user_token:
        raise ValueError("user_id 不能为空")

    updates: List[str] = []
    values: List[Any] = []

    if password is not None:
        if not password.strip():
            raise ValueError("密码不能为空")
        password_hash = pwd_context.hash(password)
        updates.append("password_hash = %s")
        values.append(password_hash)

    if service_department_id is not None:
        dept_value = (service_department_id or "").strip() or None
        updates.append("service_department_id = %s")
        values.append(dept_value)

    if display_name is not None:
        updates.append("display_name = %s")
        values.append(display_name.strip() if isinstance(display_name, str) else display_name)

    if role is not None:
        updates.append("role = %s")
        values.append(role.strip() if isinstance(role, str) else role)

    if grade is not None:
        updates.append("grade = %s")
        values.append(int(grade))

    if department_role is not None:
        updates.append("department_role = %s")
        values.append(
            department_role.strip() if isinstance(department_role, str) else department_role
        )

    if not updates:
        return get_user_by_id(user_token)

    with _auth_conn() as conn:
        cur = conn.cursor()
        # Column fragments are fixed literals from this function; every value
        # is bound as a parameter.
        query = f"UPDATE auth_users SET {', '.join(updates)} WHERE id = %s RETURNING id"
        values.append(user_token)
        cur.execute(query, tuple(values))
        row = cur.fetchone()
        if not row:
            conn.rollback()
            return None
        conn.commit()
    return get_user_by_id(user_token)


def delete_user_account(user_id: str) -> bool:
    """Delete a user account by ID.

    Args:
        user_id: The unique identifier of the user to delete.

    Returns:
        True if the user was successfully deleted, False if user not found.

    Raises:
        ValueError: If user_id is empty or if attempting to delete the last
            admin user.
    """
    user_token = (user_id or "").strip()
    if not user_token:
        raise ValueError("user_id 不能为空")

    with _auth_conn() as conn:
        cur = conn.cursor()

        # Check if user exists
        cur.execute("SELECT id, username, role FROM auth_users WHERE id = %s", (user_token,))
        user = cur.fetchone()
        if not user:
            return False

        # Prevent deleting the last admin user
        if user[2] == "admin":
            cur.execute(
                "SELECT COUNT(*) FROM auth_users WHERE role = 'admin' AND id != %s",
                (user_token,),
            )
            other_admins = cur.fetchone()[0]
            if other_admins == 0:
                raise ValueError("不能删除最后一个管理员账号")

        # Delete the user
        cur.execute("DELETE FROM auth_users WHERE id = %s", (user_token,))
        if cur.rowcount == 0:
            conn.rollback()
            return False
        conn.commit()
        return True


def verify_password(password: str, password_hash: str) -> bool:
    """Return whether *password* matches *password_hash*.

    Empty inputs, malformed/unknown hashes and non-string inputs all yield
    ``False`` rather than raising.
    """
    if not password or not password_hash:
        return False
    try:
        return pwd_context.verify(password, password_hash)
    except (ValueError, TypeError):
        return False


def ensure_seed_admin() -> None:
    """Create the admin account described by ``LAWRISK_ADMIN_*`` env vars.

    Does nothing when ``LAWRISK_ADMIN_USERNAME`` or ``LAWRISK_ADMIN_PASSWORD``
    is unset/empty, or when a user with that username already exists.
    ``LAWRISK_ADMIN_DISPLAY_NAME``, ``LAWRISK_ADMIN_ROLE`` and
    ``LAWRISK_ADMIN_GRADE`` are optional overrides.
    """
    username = os.getenv("LAWRISK_ADMIN_USERNAME")
    password = os.getenv("LAWRISK_ADMIN_PASSWORD")
    if not username or not password:
        return
    existing = get_user_by_username(username)
    if existing:
        return

    display_name = os.getenv("LAWRISK_ADMIN_DISPLAY_NAME", "Super Admin")
    role = os.getenv("LAWRISK_ADMIN_ROLE", "admin")
    raw_grade = os.getenv("LAWRISK_ADMIN_GRADE", "100")
    try:
        grade = int(raw_grade)
    except ValueError:
        # A malformed grade should not prevent the admin from being seeded.
        logger.warning("Invalid LAWRISK_ADMIN_GRADE %r; falling back to 100", raw_grade)
        grade = 100
    create_user(
        username=username,
        password=password,
        display_name=display_name,
        role=role,
        grade=grade,
    )