fs-lawrisk/lawrisk/services/auth_service.py

674 lines
21 KiB
Python

"""Authentication and authorization helpers for the LawRisk backend."""
from __future__ import annotations
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from passlib.context import CryptContext
from lawrisk.services import licensing_repo as lic_repo
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
logger = logging.getLogger(__name__)
def _normalize_department_code(raw_code: Optional[str]) -> str:
return (raw_code or "").strip().upper()
def _resolve_parent_department_id(
cur, parent_code: Optional[str], code_cache: Optional[Dict[str, str]] = None
) -> Optional[str]:
normalized = _normalize_department_code(parent_code)
if not normalized:
return None
if code_cache:
cached = code_cache.get(normalized)
if cached:
return cached
cur.execute(
"SELECT id FROM service_departments WHERE code = %s LIMIT 1",
(normalized,),
)
row = cur.fetchone()
return str(row[0]) if row else None
def _auth_conn(autocommit: bool = False):
"""Reuse the licensing_risks connection for auth tables."""
return lic_repo._lic_pg_conn(autocommit=autocommit)
SERVICE_DEPARTMENT_SEEDS = [
{
"code": "FSSJSJ",
"name": "市级服务部门",
"region_name": "市级",
"username": "FSSJSJ",
"display_name": "市级管理员",
"role": "department_admin",
"grade": 90,
},
{
"code": "FSSJSS",
"name": "三水区服务部门",
"region_name": "三水区",
"parent_code": "FSSJSJ",
"username": "FSSJSS",
"display_name": "三水区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJNH",
"name": "南海区服务部门",
"region_name": "南海区",
"parent_code": "FSSJSJ",
"username": "FSSJNH",
"display_name": "南海区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJSD",
"name": "顺德区服务部门",
"region_name": "顺德区",
"parent_code": "FSSJSJ",
"username": "FSSJSD",
"display_name": "顺德区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJCC",
"name": "禅城区服务部门",
"region_name": "禅城区",
"parent_code": "FSSJSJ",
"username": "FSSJCC",
"display_name": "禅城区管理员",
"role": "department_admin",
"grade": 80,
},
{
"code": "FSSJGM",
"name": "高明区服务部门",
"region_name": "高明区",
"parent_code": "FSSJSJ",
"username": "FSSJGM",
"display_name": "高明区管理员",
"role": "department_admin",
"grade": 80,
},
]
def _resolve_region_id(cur, region_name: Optional[str]) -> Optional[str]:
if not region_name:
return None
cur.execute(
"""
SELECT id
FROM regions
WHERE name = %s
LIMIT 1
""",
(region_name,),
)
row = cur.fetchone()
return str(row[0]) if row else None
def _upsert_service_department(
cur, seed: Dict[str, Any], code_cache: Optional[Dict[str, str]] = None
) -> str:
code = _normalize_department_code(seed.get("code"))
name = seed.get("name")
if not code or not name:
raise ValueError("Service department seed requires code and name")
cur.execute(
"SELECT id FROM service_departments WHERE code = %s LIMIT 1",
(code,),
)
row = cur.fetchone()
region_id = _resolve_region_id(cur, seed.get("region_name"))
description = seed.get("description")
parent_id = _resolve_parent_department_id(cur, seed.get("parent_code"), code_cache)
if row:
dept_id = str(row[0])
cur.execute(
"""
UPDATE service_departments
SET name = %s,
parent_id = COALESCE(%s, parent_id),
region_id = COALESCE(%s, region_id),
description = COALESCE(%s, description),
updated_at = now()
WHERE id = %s
""",
(name, parent_id, region_id, description, dept_id),
)
return dept_id
dept_id = str(uuid.uuid4())
cur.execute(
"""
INSERT INTO service_departments (id, name, code, parent_id, region_id, description)
VALUES (%s, %s, %s, %s, %s, %s)
""",
(dept_id, name, code, parent_id, region_id, description),
)
return dept_id
def _ensure_department_account(cur, seed: Dict[str, Any], department_id: str) -> None:
username = (seed.get("username") or seed.get("code") or "").strip().lower()
if not username:
raise ValueError("Department seed requires username")
display_name = seed.get("display_name") or f"{seed.get('name', username)}账号"
role = seed.get("role", "department_admin")
grade = int(seed.get("grade", 50))
department_role = seed.get("department_role", "manager")
password = seed.get("password") or f"{seed.get('code', username).upper()}123456"
cur.execute(
"SELECT id FROM auth_users WHERE username = %s",
(username,),
)
row = cur.fetchone()
if row:
cur.execute(
"""
UPDATE auth_users
SET service_department_id = %s,
role = COALESCE(%s, role),
department_role = COALESCE(%s, department_role)
WHERE id = %s
""",
(department_id, role, department_role, row[0]),
)
return
user_id = uuid.uuid4().hex
password_hash = pwd_context.hash(password)
cur.execute(
"""
INSERT INTO auth_users (
id,
username,
password_hash,
display_name,
role,
grade,
service_department_id,
department_role
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""",
(user_id, username, password_hash, display_name, role, grade, department_id, department_role),
)
logger.info("Seeded user %s (department=%s)", username, department_id)
def ensure_default_department_accounts() -> None:
"""Ensure the default service departments and related accounts exist."""
dept_ids: Dict[str, str] = {}
with _auth_conn() as conn:
_ensure_auth_schema(conn)
cur = conn.cursor()
for seed in SERVICE_DEPARTMENT_SEEDS:
dept_id = _upsert_service_department(cur, seed, dept_ids)
code_key = _normalize_department_code(seed.get("code"))
if code_key:
dept_ids[code_key] = dept_id
conn.commit()
cur = conn.cursor()
for seed in SERVICE_DEPARTMENT_SEEDS:
dept_id = dept_ids.get(_normalize_department_code(seed.get("code")))
if not dept_id:
continue
_ensure_department_account(cur, seed, dept_id)
conn.commit()
def _ensure_auth_schema(conn) -> None:
lic_repo._ensure_service_department_schema(conn)
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS auth_users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
display_name TEXT,
role TEXT NOT NULL DEFAULT 'user',
grade INTEGER NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
service_department_id uuid REFERENCES service_departments(id),
department_role text,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS auth_users_department_idx
ON auth_users (service_department_id)
"""
)
conn.commit()
def ensure_auth_schema() -> None:
with _auth_conn() as conn:
_ensure_auth_schema(conn)
def _row_to_user(row: tuple[Any, ...], columns: tuple[str, ...]) -> Dict[str, Any]:
return {col: row[idx] for idx, col in enumerate(columns)}
def _safe_str(val: Any) -> Optional[str]:
if val is None:
return None
return str(val)
def _public_user_payload(user: Dict[str, Any]) -> Dict[str, Any]:
department = None
# Ensure IDs are strings as psycopg2 might return UUID objects
dept_id = _safe_str(user.get("service_department_id"))
if dept_id:
department = {
"id": dept_id,
"name": user.get("service_department_name"),
"code": user.get("service_department_code"),
"phone": user.get("service_department_phone"),
"parent_id": _safe_str(user.get("service_department_parent_id")),
"region_id": _safe_str(user.get("service_department_region_id")),
"region_name": user.get("service_department_region_name"),
"role": user.get("department_role"),
}
created_at = user.get("created_at")
if isinstance(created_at, datetime):
created_at_value = created_at.isoformat()
else:
created_at_value = created_at
return {
"id": user.get("id"),
"username": user.get("username"),
"display_name": user.get("display_name"),
"role": user.get("role"),
"grade": user.get("grade"),
"is_active": user.get("is_active", True),
"department": department,
"department_id": dept_id,
"created_at": created_at_value,
}
def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
username = (username or "").strip().lower()
if not username:
return None
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
"""
SELECT
au.id,
au.username,
au.display_name,
au.role,
au.grade,
au.is_active,
au.password_hash,
au.service_department_id,
au.department_role,
au.created_at,
sd.name AS service_department_name,
sd.code AS service_department_code,
sd.phone AS service_department_phone,
sd.parent_id AS service_department_parent_id,
sd.region_id AS service_department_region_id,
r.name AS service_department_region_name
FROM auth_users au
LEFT JOIN service_departments sd ON sd.id = au.service_department_id
LEFT JOIN regions r ON r.id = sd.region_id
WHERE au.username = %s
LIMIT 1
""",
(username,),
)
row = cur.fetchone()
if not row:
return None
columns = tuple(col[0] for col in cur.description)
return _row_to_user(row, columns)
def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
user_token = (user_id or "").strip()
if not user_token:
return None
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
"""
SELECT
au.id,
au.username,
au.display_name,
au.role,
au.grade,
au.is_active,
au.password_hash,
au.service_department_id,
au.department_role,
au.created_at,
sd.name AS service_department_name,
sd.code AS service_department_code,
sd.phone AS service_department_phone,
sd.parent_id AS service_department_parent_id,
sd.region_id AS service_department_region_id,
r.name AS service_department_region_name
FROM auth_users au
LEFT JOIN service_departments sd ON sd.id = au.service_department_id
LEFT JOIN regions r ON r.id = sd.region_id
WHERE au.id = %s
LIMIT 1
""",
(user_token,),
)
row = cur.fetchone()
if not row:
return None
columns = tuple(col[0] for col in cur.description)
return _public_user_payload(_row_to_user(row, columns))
def list_users(include_inactive: bool = False) -> List[Dict[str, Any]]:
where_clause = "" if include_inactive else "WHERE au.is_active = TRUE"
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
f"""
SELECT
au.id,
au.username,
au.display_name,
au.role,
au.grade,
au.is_active,
au.service_department_id,
au.department_role,
au.created_at,
sd.name AS service_department_name,
sd.code AS service_department_code,
sd.phone AS service_department_phone,
sd.parent_id AS service_department_parent_id,
sd.region_id AS service_department_region_id,
r.name AS service_department_region_name
FROM auth_users au
LEFT JOIN service_departments sd ON sd.id = au.service_department_id
LEFT JOIN regions r ON r.id = sd.region_id
{where_clause}
ORDER BY au.created_at DESC
"""
)
rows = cur.fetchall()
columns = tuple(col[0] for col in cur.description)
users: List[Dict[str, Any]] = []
for row in rows:
users.append(_public_user_payload(_row_to_user(row, columns)))
return users
def create_user(
username: str,
password: str,
*,
display_name: Optional[str] = None,
role: str = "user",
grade: int = 0,
service_department_id: Optional[str] = None,
department_role: Optional[str] = None,
parent_department_id: Optional[str] = None,
service_department_region_id: Optional[str] = None,
service_department_phone: Optional[str] = None,
operator: str = "admin",
) -> Dict[str, Any]:
username_clean = (username or "").strip().lower()
if not username_clean or not password:
raise ValueError("Username and password are required")
# 如果未传入部门,则自动创建同名单位并绑定
dept_token = (service_department_id or "").strip() or None
if not dept_token:
dept_name = (display_name or username_clean).strip() or username_clean
dept_code = username_clean.upper()
# 检查是否已存在同代码的单位
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT id FROM service_departments WHERE code = %s", (dept_code,))
row = cur.fetchone()
if row:
dept_token = str(row[0])
if not dept_token:
try:
created = lic_repo.create_service_department(
name=dept_name,
code=dept_code,
phone=(service_department_phone or "").strip() or None,
parent_id=(parent_department_id or "").strip() or None,
region_id=(service_department_region_id or "").strip() or None,
operator=operator,
)
dept_token = created.get("id")
except Exception as exc:
raise ValueError(f"自动创建单位失败: {exc}")
user_id = uuid.uuid4().hex
password_hash = pwd_context.hash(password)
with _auth_conn() as conn:
cur = conn.cursor()
cur.execute(
"""
INSERT INTO auth_users (
id,
username,
password_hash,
display_name,
role,
grade,
service_department_id,
department_role
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING
id,
username,
display_name,
role,
grade,
is_active,
created_at,
service_department_id,
department_role
""",
(
user_id,
username_clean,
password_hash,
display_name,
role,
grade,
dept_token,
department_role,
),
)
row = cur.fetchone()
columns = tuple(col[0] for col in cur.description)
conn.commit()
user = get_user_by_id(user_id)
if user:
# Log the operation
lic_repo.log_operation(
operator=operator,
operation_type="CREATE",
target_type="USER",
target_id=user["id"],
target_name=user["username"],
change_summary=f"Created user account: {user['username']}",
details=user
)
return user
def update_user_account(
user_id: str,
*,
password: Optional[str] = None,
service_department_id: Optional[str] = None,
display_name: Optional[str] = None,
role: Optional[str] = None,
grade: Optional[int] = None,
department_role: Optional[str] = None,
operator: str = "admin",
) -> Optional[Dict[str, Any]]:
user_token = (user_id or "").strip()
if not user_token:
raise ValueError("user_id 不能为空")
updates: List[str] = []
values: List[Any] = []
if password is not None:
if not password.strip():
raise ValueError("密码不能为空")
password_hash = pwd_context.hash(password)
updates.append("password_hash = %s")
values.append(password_hash)
if service_department_id is not None:
dept_value = (service_department_id or "").strip() or None
updates.append("service_department_id = %s")
values.append(dept_value)
if display_name is not None:
updates.append("display_name = %s")
values.append(display_name.strip() if isinstance(display_name, str) else display_name)
if role is not None:
updates.append("role = %s")
values.append(role.strip() if isinstance(role, str) else role)
if grade is not None:
updates.append("grade = %s")
values.append(int(grade))
if department_role is not None:
updates.append("department_role = %s")
values.append(department_role.strip() if isinstance(department_role, str) else department_role)
if not updates:
return get_user_by_id(user_token)
with _auth_conn() as conn:
cur = conn.cursor()
query = f"UPDATE auth_users SET {', '.join(updates)} WHERE id = %s RETURNING id"
values.append(user_token)
cur.execute(query, tuple(values))
row = cur.fetchone()
if not row:
conn.rollback()
return None
conn.commit()
user = get_user_by_id(user_token)
if user:
# Log the operation
lic_repo.log_operation(
operator=operator,
operation_type="UPDATE",
target_type="USER",
target_id=user["id"],
target_name=user["username"],
change_summary=f"Updated user account: {user['username']}",
details=user
)
return user
def delete_user_account(user_id: str, operator: str = "admin") -> bool:
"""
Delete a user account by ID.
Args:
user_id: The unique identifier of the user to delete
Returns:
True if the user was successfully deleted, False if user not found
Raises:
ValueError: If user_id is empty or if attempting to delete the last admin user
"""
user_token = (user_id or "").strip()
if not user_token:
raise ValueError("user_id 不能为空")
with _auth_conn() as conn:
cur = conn.cursor()
# Check if user exists
cur.execute("SELECT id, username, role FROM auth_users WHERE id = %s", (user_token,))
user = cur.fetchone()
if not user:
return False
# Prevent deleting the last admin user
if user[2] == "admin":
cur.execute("SELECT COUNT(*) FROM auth_users WHERE role = 'admin' AND id != %s", (user_token,))
other_admins = cur.fetchone()[0]
if other_admins == 0:
raise ValueError("不能删除最后一个管理员账号")
# Delete the user
username = user[1]
cur.execute("DELETE FROM auth_users WHERE id = %s", (user_token,))
if cur.rowcount == 0:
conn.rollback()
return False
conn.commit()
# Log the operation
lic_repo.log_operation(
operator=operator,
operation_type="DELETE",
target_type="USER",
target_id=user_token,
target_name=username,
change_summary=f"Deleted user account: {username}"
)
return True
def verify_password(password: str, password_hash: str) -> bool:
if not password or not password_hash:
return False
try:
return pwd_context.verify(password, password_hash)
except ValueError:
return False
def ensure_seed_admin() -> None:
username = os.getenv("LAWRISK_ADMIN_USERNAME")
password = os.getenv("LAWRISK_ADMIN_PASSWORD")
if not username or not password:
return
existing = get_user_by_username(username)
if existing:
return
display_name = os.getenv("LAWRISK_ADMIN_DISPLAY_NAME", "Super Admin")
role = os.getenv("LAWRISK_ADMIN_ROLE", "admin")
grade = int(os.getenv("LAWRISK_ADMIN_GRADE", "100"))
create_user(username=username, password=password, display_name=display_name, role=role, grade=grade)