"""Auth blueprint: login page, authentication endpoints, and helpers.""" from __future__ import annotations from functools import wraps from typing import Any, Callable, Dict, Iterable, Optional, Tuple from urllib.parse import urlparse from flask import ( Blueprint, Response, jsonify, redirect, render_template, request, session, url_for, ) from lawrisk.services.auth_service import get_user_by_username, verify_password auth_bp = Blueprint("auth", __name__) SESSION_USER_KEY = "auth_user" def _safe_next_url(candidate: Optional[str]) -> str: """ Sanitize next URL to prevent security issues. For security reasons, we don't preserve query parameters that might contain sensitive data like passwords. We only preserve the path. """ if not candidate: return "/" raw = str(candidate).strip() if not raw: return "/" parsed = urlparse(raw) if parsed.netloc: safe_path = parsed.path or "/" else: safe_path = raw if raw.startswith("/") else f"/{raw}" # For security: strip ALL query parameters to prevent password leakage # Only preserve path, never query strings return safe_path def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]: department = None if user.get("service_department_id"): department = { "id": user.get("service_department_id"), "name": user.get("service_department_name"), "code": user.get("service_department_code"), "region_id": user.get("service_department_region_id"), "parent_id": user.get("service_department_parent_id"), "phone": user.get("service_department_phone"), "role": user.get("department_role"), } return { "id": user.get("id"), "username": user.get("username"), "display_name": user.get("display_name"), "role": user.get("role"), "grade": user.get("grade"), "department": department, } def ensure_admin_access( allowed_roles: Optional[Iterable[str]] = None, *, prefer_json: bool = False, ) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]: """Ensure the current session belongs to an admin role.""" user = get_current_user() wants_html = ( (not prefer_json) and request.method == "GET" and request.accept_mimetypes.accept_html and ( request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"] ) ) if not user: if wants_html: # Sanitize URL to prevent password leakage in redirect safe_next = _safe_next_url(request.url) login_url = url_for("auth.login_page", next=safe_next) return None, redirect(login_url) return None, (jsonify({"error": "authentication required"}), 401) default_roles = ("admin",) if allowed_roles is None else allowed_roles allowed = {str(role).lower() for role in default_roles if role} current_role = str(user.get("role") or "").lower() if allowed and current_role not in allowed: if wants_html: # Sanitize URL to prevent password leakage in redirect safe_next = _safe_next_url(request.url) login_url = url_for("auth.login_page", next=safe_next, force=1) return None, redirect(login_url) return None, (jsonify({"error": "admin privileges required"}), 403) return user, None def get_current_user() -> Optional[Dict[str, Any]]: data = session.get(SESSION_USER_KEY) if not isinstance(data, dict): return None return data def login_required(view: Callable[..., Response]) -> Callable[..., Response]: @wraps(view) def wrapper(*args: Any, **kwargs: Any) -> Response: user = get_current_user() if not user: wants_html = request.method == "GET" and request.accept_mimetypes.accept_html and ( request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"] ) if wants_html: # Sanitize URL to prevent password leakage in redirect safe_next = _safe_next_url(request.url) login_url = url_for("auth.login_page", next=safe_next) return redirect(login_url) return jsonify({"error": "authentication required"}), 401 return view(*args, **kwargs) return wrapper def _truthy_param(value: Optional[str]) -> bool: if value is None: return False return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"} @auth_bp.get("/fs-ai-asistant/lawrisk/login") def login_page() -> Response: user = get_current_user() force_login = _truthy_param(request.args.get("force") or request.args.get("reauth")) if user and not force_login: return redirect(request.args.get("next") or "/") return render_template("login.html") @auth_bp.post("/auth/login") def login_action() -> Response: payload = request.get_json(silent=True) or request.form username = (payload.get("username", "") if payload else "").strip().lower() password = (payload.get("password", "") if payload else "").strip() next_url = _safe_next_url((payload.get("next") if payload else None) or request.args.get("next")) if not username or not password: return jsonify({"error": "username and password are required"}), 400 user = get_user_by_username(username) if not user or not verify_password(password, user.get("password_hash", "")): return jsonify({"error": "invalid credentials"}), 401 if not user.get("is_active", True): return jsonify({"error": "account disabled"}), 403 sanitized = _scrub_user_payload(user) session[SESSION_USER_KEY] = sanitized # Smart redirect: only apply redirect logic if no explicit next parameter was provided # For security, all users use the same default redirect to avoid exposing permission levels explicit_next = (payload.get("next") if payload else None) or request.args.get("next") if not explicit_next or next_url == "/": # Default redirect path for all authenticated users (统一跳转,避免权限暴露) next_url = "/fs-ai-asistant/api/workflow/lawrisk/db_admin" # Form submissions expect redirects; API clients expect JSON if request.content_type and "application/x-www-form-urlencoded" in request.content_type: return redirect(next_url) return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url}) @auth_bp.post("/auth/logout") def logout_action() -> Response: session.pop(SESSION_USER_KEY, None) if request.content_type and "application/x-www-form-urlencoded" in request.content_type: return redirect(url_for("auth.login_page")) return jsonify({"message": "logged out"}) @auth_bp.get("/auth/me") def current_user_endpoint() -> Response: user = get_current_user() if not user: return jsonify({"authenticated": False}), 401 return jsonify({"authenticated": True, "user": user})