fs-lawrisk/lawrisk/api/auth.py

340 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Auth blueprint: login page, authentication endpoints, and helpers."""
from __future__ import annotations
from functools import wraps
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
from urllib.parse import urlparse
from flask import (
Blueprint,
Response,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from lawrisk.services.auth_service import (
get_user_by_username,
update_user_account,
verify_password,
)
auth_bp = Blueprint("auth", __name__, url_prefix="/fs-ai-asistant/api/workflow/lawrisk")
SESSION_USER_KEY = "auth_user"
def _safe_next_url(candidate: Optional[str]) -> str:
"""
Sanitize next URL to prevent security issues.
For security reasons, we don't preserve query parameters that might
contain sensitive data like passwords. We only preserve the path.
"""
if not candidate:
return "/"
raw = str(candidate).strip()
if not raw:
return "/"
parsed = urlparse(raw)
if parsed.netloc:
safe_path = parsed.path or "/"
else:
safe_path = raw if raw.startswith("/") else f"/{raw}"
# For security: strip ALL query parameters to prevent password leakage
# Only preserve path, never query strings
return safe_path
def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]:
department = None
if user.get("service_department_id"):
department = {
"id": user.get("service_department_id"),
"name": user.get("service_department_name"),
"code": user.get("service_department_code"),
"region_id": user.get("service_department_region_id"),
"region_name": user.get("service_department_region_name"),
"parent_id": user.get("service_department_parent_id"),
"phone": user.get("service_department_phone"),
"role": user.get("department_role"),
}
return {
"id": user.get("id"),
"username": user.get("username"),
"display_name": user.get("display_name"),
"role": user.get("role"),
"grade": user.get("grade"),
"department": department,
}
def ensure_admin_access(
allowed_roles: Optional[Iterable[str]] = None,
*,
prefer_json: bool = False,
) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]:
"""Ensure the current session belongs to an admin role."""
user = get_current_user()
wants_html = (
(not prefer_json)
and request.method == "GET"
and request.accept_mimetypes.accept_html
and (
request.accept_mimetypes["text/html"]
>= request.accept_mimetypes["application/json"]
)
)
if not user:
if wants_html:
# Sanitize URL to prevent password leakage in redirect
safe_next = _safe_next_url(request.url)
login_url = url_for("auth.login_page", next=safe_next)
return None, redirect(login_url)
return None, (jsonify({"error": "authentication required"}), 401)
default_roles = ("admin",) if allowed_roles is None else allowed_roles
allowed = {str(role).lower() for role in default_roles if role}
current_role = str(user.get("role") or "").lower()
if allowed and current_role not in allowed:
if wants_html:
# Sanitize URL to prevent password leakage in redirect
safe_next = _safe_next_url(request.url)
login_url = url_for("auth.login_page", next=safe_next, force=1)
return None, redirect(login_url)
return None, (jsonify({"error": "admin privileges required"}), 403)
return user, None
def is_superuser(user: Optional[Dict[str, Any]]) -> bool:
"""Check if the given user is a superuser with full system access.
Superuser criteria (any match):
1. role equals 'admin'
2. username equals 'fssjsj'
3. grade equals 100 (fallback check)
Args:
user: User dictionary from get_current_user()
Returns:
True if user is a superuser, False otherwise
"""
if not user:
return False
# Primary check: role-based
if user.get("role") == "admin":
return True
# Special exception: specific username
if user.get("username") == "fssjsj":
return True
# Fallback check: grade-based
if user.get("grade") == 100:
return True
return False
def get_current_user() -> Optional[Dict[str, Any]]:
data = session.get(SESSION_USER_KEY)
if not isinstance(data, dict):
return None
return data
def login_required(view: Callable[..., Response]) -> Callable[..., Response]:
@wraps(view)
def wrapper(*args: Any, **kwargs: Any) -> Response:
user = get_current_user()
if not user:
wants_html = request.method == "GET" and request.accept_mimetypes.accept_html and (
request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"]
)
if wants_html:
# Sanitize URL to prevent password leakage in redirect
safe_next = _safe_next_url(request.url)
login_url = url_for("auth.login_page", next=safe_next)
return redirect(login_url)
return jsonify({"error": "authentication required"}), 401
return view(*args, **kwargs)
return wrapper
def _truthy_param(value: Optional[str]) -> bool:
if value is None:
return False
return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"}
@auth_bp.get("/login")
def login_page() -> Response:
user = get_current_user()
force_login = _truthy_param(request.args.get("force") or request.args.get("reauth"))
if user and not force_login:
return redirect(request.args.get("next") or "/")
return render_template("login.html")
@auth_bp.post("/auth/login")
def login_action() -> Response:
payload = request.get_json(silent=True) or request.form
username = (payload.get("username", "") if payload else "").strip().lower()
password = (payload.get("password", "") if payload else "").strip()
next_url = _safe_next_url((payload.get("next") if payload else None) or request.args.get("next"))
if not username or not password:
return jsonify({"error": "username and password are required"}), 400
user = get_user_by_username(username)
if not user or not verify_password(password, user.get("password_hash", "")):
return jsonify({"error": "invalid credentials"}), 401
if not user.get("is_active", True):
return jsonify({"error": "account disabled"}), 403
sanitized = _scrub_user_payload(user)
session[SESSION_USER_KEY] = sanitized
# Smart redirect: only apply redirect logic if no explicit next parameter was provided
# For security, all users use the same default redirect to avoid exposing permission levels
explicit_next = (payload.get("next") if payload else None) or request.args.get("next")
if not explicit_next or next_url == "/":
# Default redirect path: admin to db_admin, others to root
if sanitized.get("role") == "admin":
next_url = "/fs-ai-asistant/api/workflow/lawrisk/db_admin"
else:
# For non-admin, redirect to root or homepage if available
next_url = "/fs-ai-asistant/api/workflow/lawrisk/"
# Form submissions expect redirects; API clients expect JSON
if request.content_type and "application/x-www-form-urlencoded" in request.content_type:
return redirect(next_url)
return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url})
@auth_bp.post("/auth/logout")
def logout_action() -> Response:
session.pop(SESSION_USER_KEY, None)
if request.content_type and "application/x-www-form-urlencoded" in request.content_type:
return redirect(url_for("auth.login_page"))
return jsonify({"message": "logged out"})
@auth_bp.get("/auth/me")
def current_user_endpoint() -> Response:
user = get_current_user()
if not user:
return jsonify({"authenticated": False}), 401
return jsonify({"authenticated": True, "user": user})
def _validate_password_complexity(password: str) -> Optional[str]:
"""验证密码复杂度最小8位必须包含字母和数字"""
if len(password) < 8:
return "密码长度至少为 8 位"
if not (any(c.isalpha() for c in password) and any(c.isdigit() for c in password)):
return "密码必须同时包含字母和数字"
return None
@auth_bp.post("/v2/auth/change-password")
@login_required
def change_password_endpoint() -> Response:
"""Allow authenticated users to change their own password."""
try:
payload = request.get_json(silent=True) or {}
current_password = (payload.get("current_password", "") if payload else "").strip()
new_password = (payload.get("new_password", "") if payload else "").strip()
# Validate required fields
if not current_password:
return jsonify({"error": "请输入当前密码"}), 400
if not new_password:
return jsonify({"error": "请输入新密码"}), 400
# Get current user from session
user = get_current_user()
if not user:
return jsonify({"error": "用户未登录"}), 401
print(f"DEBUG: Session user = {user}")
# Get complete user data including password hash
username = user.get("username")
if not username:
return jsonify({"error": "无效的用户信息"}), 400
# 使用 get_user_by_username 获取完整用户数据(包括密码哈希)
user_data = get_user_by_username(username)
if not user_data:
return jsonify({"error": "用户不存在"}), 404
# Debug logging
print(f"DEBUG: username = {username}")
print(f"DEBUG: password_hash exists = {'password_hash' in user_data}")
print(f"DEBUG: password_hash length = {len(user_data.get('password_hash', ''))}")
print(f"DEBUG: current_password provided = {bool(current_password)}")
# Verify current password
password_hash = user_data.get("password_hash", "")
if not password_hash:
print("ERROR: password_hash is empty!")
return jsonify({"error": "系统错误:无法获取密码哈希"}), 500
if not verify_password(current_password, password_hash):
print("ERROR: Password verification failed!")
print(f"DEBUG: Hash preview: {password_hash[:20]}..." if len(password_hash) > 20 else f"DEBUG: Hash: {password_hash}")
return jsonify({"error": "当前密码错误"}), 401
print("DEBUG: Password verified successfully!")
# Get user_id for update operation
user_id = user_data.get("id")
if not user_id:
return jsonify({"error": "无法获取用户ID"}), 400
# Validate new password complexity
complexity_error = _validate_password_complexity(new_password)
if complexity_error:
return jsonify({"error": complexity_error}), 400
# Check if new password is same as current
if verify_password(new_password, user_data.get("password_hash", "")):
return jsonify({"error": "新密码不能与当前密码相同"}), 400
# Update password
print(f"DEBUG: Attempting to update password for user_id = {user_id}")
update_result = update_user_account(
user_id=user_id,
password=new_password,
)
print(f"DEBUG: Update result = {update_result}")
if not update_result:
print("ERROR: Password update returned None/False")
return jsonify({"error": "密码修改失败,请稍后重试"}), 500
# Note: update_user_account already logs the operation internally
print("DEBUG: Password change completed successfully")
return jsonify({"message": "密码修改成功"}), 200
except Exception as e:
# Log full error details
import traceback
print(f"ERROR: Exception in change_password_endpoint")
print(f"ERROR: Type: {type(e).__name__}")
print(f"ERROR: Message: {str(e)}")
print(f"ERROR: Traceback:\n{traceback.format_exc()}")
return jsonify({"error": "密码修改失败,请稍后重试"}), 500