"""Auth blueprint: login page, authentication endpoints, and helpers.""" from __future__ import annotations from functools import wraps from typing import Any, Callable, Dict, Iterable, Optional, Tuple from urllib.parse import urlparse from flask import ( Blueprint, Response, jsonify, redirect, render_template, request, session, url_for, ) from lawrisk.services.auth_service import ( get_user_by_username, update_user_account, verify_password, ) auth_bp = Blueprint("auth", __name__, url_prefix="/fs-ai-asistant/api/workflow/lawrisk") SESSION_USER_KEY = "auth_user" def _safe_next_url(candidate: Optional[str]) -> str: """ Sanitize next URL to prevent security issues. For security reasons, we don't preserve query parameters that might contain sensitive data like passwords. We only preserve the path. """ if not candidate: return "/" raw = str(candidate).strip() if not raw: return "/" parsed = urlparse(raw) if parsed.netloc: safe_path = parsed.path or "/" else: safe_path = raw if raw.startswith("/") else f"/{raw}" # For security: strip ALL query parameters to prevent password leakage # Only preserve path, never query strings return safe_path def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]: department = None if user.get("service_department_id"): department = { "id": user.get("service_department_id"), "name": user.get("service_department_name"), "code": user.get("service_department_code"), "region_id": user.get("service_department_region_id"), "region_name": user.get("service_department_region_name"), "parent_id": user.get("service_department_parent_id"), "phone": user.get("service_department_phone"), "role": user.get("department_role"), } return { "id": user.get("id"), "username": user.get("username"), "display_name": user.get("display_name"), "role": user.get("role"), "grade": user.get("grade"), "department": department, } def ensure_admin_access( allowed_roles: Optional[Iterable[str]] = None, *, prefer_json: bool = False, ) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]: """Ensure the current session belongs to an admin role.""" user = get_current_user() wants_html = ( (not prefer_json) and request.method == "GET" and request.accept_mimetypes.accept_html and ( request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"] ) ) if not user: if wants_html: # Sanitize URL to prevent password leakage in redirect safe_next = _safe_next_url(request.url) login_url = url_for("auth.login_page", next=safe_next) return None, redirect(login_url) return None, (jsonify({"error": "authentication required"}), 401) default_roles = ("admin",) if allowed_roles is None else allowed_roles allowed = {str(role).lower() for role in default_roles if role} current_role = str(user.get("role") or "").lower() if allowed and current_role not in allowed: if wants_html: # Sanitize URL to prevent password leakage in redirect safe_next = _safe_next_url(request.url) login_url = url_for("auth.login_page", next=safe_next, force=1) return None, redirect(login_url) return None, (jsonify({"error": "admin privileges required"}), 403) return user, None def is_superuser(user: Optional[Dict[str, Any]]) -> bool: """Check if the given user is a superuser with full system access. Superuser criteria (any match): 1. role equals 'admin' 2. username equals 'fssjsj' 3. grade equals 100 (fallback check) Args: user: User dictionary from get_current_user() Returns: True if user is a superuser, False otherwise """ if not user: return False # Primary check: role-based if user.get("role") == "admin": return True # Special exception: specific username if user.get("username") == "fssjsj": return True # Fallback check: grade-based if user.get("grade") == 100: return True return False def get_current_user() -> Optional[Dict[str, Any]]: data = session.get(SESSION_USER_KEY) if not isinstance(data, dict): return None return data def login_required(view: Callable[..., Response]) -> Callable[..., Response]: @wraps(view) def wrapper(*args: Any, **kwargs: Any) -> Response: user = get_current_user() if not user: wants_html = request.method == "GET" and request.accept_mimetypes.accept_html and ( request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"] ) if wants_html: # Sanitize URL to prevent password leakage in redirect safe_next = _safe_next_url(request.url) login_url = url_for("auth.login_page", next=safe_next) return redirect(login_url) return jsonify({"error": "authentication required"}), 401 return view(*args, **kwargs) return wrapper def _truthy_param(value: Optional[str]) -> bool: if value is None: return False return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"} @auth_bp.get("/login") def login_page() -> Response: user = get_current_user() force_login = _truthy_param(request.args.get("force") or request.args.get("reauth")) if user and not force_login: return redirect(request.args.get("next") or "/") return render_template("login.html") @auth_bp.post("/auth/login") def login_action() -> Response: payload = request.get_json(silent=True) or request.form username = (payload.get("username", "") if payload else "").strip().lower() password = (payload.get("password", "") if payload else "").strip() next_url = _safe_next_url((payload.get("next") if payload else None) or request.args.get("next")) if not username or not password: return jsonify({"error": "username and password are required"}), 400 user = get_user_by_username(username) if not user or not verify_password(password, user.get("password_hash", "")): return jsonify({"error": "invalid credentials"}), 401 if not user.get("is_active", True): return jsonify({"error": "account disabled"}), 403 sanitized = _scrub_user_payload(user) session[SESSION_USER_KEY] = sanitized # Smart redirect: only apply redirect logic if no explicit next parameter was provided # For security, all users use the same default redirect to avoid exposing permission levels explicit_next = (payload.get("next") if payload else None) or request.args.get("next") if not explicit_next or next_url == "/": # Default redirect path: admin to db_admin, others to root if sanitized.get("role") == "admin": next_url = "/fs-ai-asistant/api/workflow/lawrisk/db_admin" else: # For non-admin, redirect to root or homepage if available next_url = "/fs-ai-asistant/api/workflow/lawrisk/" # Form submissions expect redirects; API clients expect JSON if request.content_type and "application/x-www-form-urlencoded" in request.content_type: return redirect(next_url) return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url}) @auth_bp.post("/auth/logout") def logout_action() -> Response: session.pop(SESSION_USER_KEY, None) if request.content_type and "application/x-www-form-urlencoded" in request.content_type: return redirect(url_for("auth.login_page")) return jsonify({"message": "logged out"}) @auth_bp.get("/auth/me") def current_user_endpoint() -> Response: user = get_current_user() if not user: return jsonify({"authenticated": False}), 401 return jsonify({"authenticated": True, "user": user}) def _validate_password_complexity(password: str) -> Optional[str]: """验证密码复杂度:最小8位,必须包含字母和数字""" if len(password) < 8: return "密码长度至少为 8 位" if not (any(c.isalpha() for c in password) and any(c.isdigit() for c in password)): return "密码必须同时包含字母和数字" return None @auth_bp.post("/v2/auth/change-password") @login_required def change_password_endpoint() -> Response: """Allow authenticated users to change their own password.""" try: payload = request.get_json(silent=True) or {} current_password = (payload.get("current_password", "") if payload else "").strip() new_password = (payload.get("new_password", "") if payload else "").strip() # Validate required fields if not current_password: return jsonify({"error": "请输入当前密码"}), 400 if not new_password: return jsonify({"error": "请输入新密码"}), 400 # Get current user from session user = get_current_user() if not user: return jsonify({"error": "用户未登录"}), 401 print(f"DEBUG: Session user = {user}") # Get complete user data including password hash username = user.get("username") if not username: return jsonify({"error": "无效的用户信息"}), 400 # 使用 get_user_by_username 获取完整用户数据(包括密码哈希) user_data = get_user_by_username(username) if not user_data: return jsonify({"error": "用户不存在"}), 404 # Debug logging print(f"DEBUG: username = {username}") print(f"DEBUG: password_hash exists = {'password_hash' in user_data}") print(f"DEBUG: password_hash length = {len(user_data.get('password_hash', ''))}") print(f"DEBUG: current_password provided = {bool(current_password)}") # Verify current password password_hash = user_data.get("password_hash", "") if not password_hash: print("ERROR: password_hash is empty!") return jsonify({"error": "系统错误:无法获取密码哈希"}), 500 if not verify_password(current_password, password_hash): print("ERROR: Password verification failed!") print(f"DEBUG: Hash preview: {password_hash[:20]}..." if len(password_hash) > 20 else f"DEBUG: Hash: {password_hash}") return jsonify({"error": "当前密码错误"}), 401 print("DEBUG: Password verified successfully!") # Get user_id for update operation user_id = user_data.get("id") if not user_id: return jsonify({"error": "无法获取用户ID"}), 400 # Validate new password complexity complexity_error = _validate_password_complexity(new_password) if complexity_error: return jsonify({"error": complexity_error}), 400 # Check if new password is same as current if verify_password(new_password, user_data.get("password_hash", "")): return jsonify({"error": "新密码不能与当前密码相同"}), 400 # Update password print(f"DEBUG: Attempting to update password for user_id = {user_id}") update_result = update_user_account( user_id=user_id, password=new_password, ) print(f"DEBUG: Update result = {update_result}") if not update_result: print("ERROR: Password update returned None/False") return jsonify({"error": "密码修改失败,请稍后重试"}), 500 # Note: update_user_account already logs the operation internally print("DEBUG: Password change completed successfully") return jsonify({"message": "密码修改成功"}), 200 except Exception as e: # Log full error details import traceback print(f"ERROR: Exception in change_password_endpoint") print(f"ERROR: Type: {type(e).__name__}") print(f"ERROR: Message: {str(e)}") print(f"ERROR: Traceback:\n{traceback.format_exc()}") return jsonify({"error": "密码修改失败,请稍后重试"}), 500