fs-lawrisk/lawrisk/api/auth.py

204 lines
7.3 KiB
Python
Raw Normal View History

2025-11-14 15:46:18 +08:00
"""Auth blueprint: login page, authentication endpoints, and helpers."""
from __future__ import annotations
from functools import wraps
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
from urllib.parse import urlparse
from flask import (
Blueprint,
Response,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from lawrisk.services.auth_service import get_user_by_username, verify_password
# Blueprint named "auth"; every route registered on it is served under the
# URL prefix given here.
auth_bp = Blueprint("auth", __name__, url_prefix="/fs-ai-asistant/api/workflow/lawrisk")
2025-11-14 15:46:18 +08:00
# Key in the Flask session under which the logged-in user's dict is stored.
SESSION_USER_KEY = "auth_user"
def _safe_next_url(candidate: Optional[str]) -> str:
"""
Sanitize next URL to prevent security issues.
For security reasons, we don't preserve query parameters that might
contain sensitive data like passwords. We only preserve the path.
"""
2025-11-14 15:46:18 +08:00
if not candidate:
return "/"
raw = str(candidate).strip()
if not raw:
return "/"
parsed = urlparse(raw)
if parsed.netloc:
safe_path = parsed.path or "/"
else:
safe_path = raw if raw.startswith("/") else f"/{raw}"
# For security: strip ALL query parameters to prevent password leakage
# Only preserve path, never query strings
2025-11-14 15:46:18 +08:00
return safe_path
def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]:
department = None
if user.get("service_department_id"):
department = {
"id": user.get("service_department_id"),
"name": user.get("service_department_name"),
"code": user.get("service_department_code"),
"region_id": user.get("service_department_region_id"),
2025-11-27 17:13:49 +08:00
"region_name": user.get("service_department_region_name"),
2025-11-14 15:46:18 +08:00
"parent_id": user.get("service_department_parent_id"),
"phone": user.get("service_department_phone"),
"role": user.get("department_role"),
}
return {
"id": user.get("id"),
"username": user.get("username"),
"display_name": user.get("display_name"),
"role": user.get("role"),
"grade": user.get("grade"),
"department": department,
}
def ensure_admin_access(
    allowed_roles: Optional[Iterable[str]] = None,
    *,
    prefer_json: bool = False,
) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]:
    """Check that the session user holds one of the permitted roles.

    Args:
        allowed_roles: Role names that may pass (compared case-insensitively).
            ``None`` means only ``"admin"``. If the resulting set is empty,
            any logged-in user passes.
        prefer_json: When true, failures are always reported as JSON instead
            of a redirect to the login page.

    Returns:
        ``(user, None)`` on success, or ``(None, response)`` where
        ``response`` is either a redirect to the login page (HTML GET
        requests) or a JSON error with status 401 (not logged in) or 403
        (role not permitted).
    """
    user = get_current_user()

    # A redirect is only used for GET requests whose Accept header ranks HTML
    # at least as high as JSON, and only when the caller did not ask for JSON.
    wants_html = False
    if not prefer_json and request.method == "GET":
        accepts = request.accept_mimetypes
        if accepts.accept_html:
            wants_html = accepts["text/html"] >= accepts["application/json"]

    if not user:
        if not wants_html:
            return None, (jsonify({"error": "authentication required"}), 401)
        # Pass the requested URL through the sanitizer before echoing it back.
        target = url_for("auth.login_page", next=_safe_next_url(request.url))
        return None, redirect(target)

    role_source = ("admin",) if allowed_roles is None else allowed_roles
    permitted = set()
    for role in role_source:
        if role:
            permitted.add(str(role).lower())

    user_role = str(user.get("role") or "").lower()
    if not permitted or user_role in permitted:
        return user, None

    if wants_html:
        # force=1 makes the login page render even though a session exists.
        target = url_for("auth.login_page", next=_safe_next_url(request.url), force=1)
        return None, redirect(target)
    return None, (jsonify({"error": "admin privileges required"}), 403)
def get_current_user() -> Optional[Dict[str, Any]]:
    """Return the user dict stored in the session, or ``None``.

    Anything stored under ``SESSION_USER_KEY`` that is not a ``dict``
    (including a missing entry) is treated as "not logged in".
    """
    stored = session.get(SESSION_USER_KEY)
    return stored if isinstance(stored, dict) else None
def login_required(view: Callable[..., Response]) -> Callable[..., Response]:
    """Decorate a view so that it only runs for a logged-in session.

    Without a session user, GET requests whose Accept header ranks HTML at
    least as high as JSON are redirected to the login page; every other
    request receives a JSON 401 response.
    """

    @wraps(view)
    def guarded(*args: Any, **kwargs: Any) -> Response:
        if get_current_user():
            return view(*args, **kwargs)

        html_preferred = False
        if request.method == "GET":
            accepts = request.accept_mimetypes
            if accepts.accept_html:
                html_preferred = accepts["text/html"] >= accepts["application/json"]

        if not html_preferred:
            return jsonify({"error": "authentication required"}), 401

        # Pass the requested URL through the sanitizer before echoing it back.
        target = url_for("auth.login_page", next=_safe_next_url(request.url))
        return redirect(target)

    return guarded
def _truthy_param(value: Optional[str]) -> bool:
if value is None:
return False
return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"}
@auth_bp.get("/login")
def login_page() -> Response:
    """Render the login form, or send an already logged-in user onward.

    Query parameters:
        next: Where to go after login; also used as the redirect target when
            a session already exists.
        force / reauth: When truthy, show the form even if a session exists.

    Returns:
        A redirect when a session exists and re-authentication was not
        requested; otherwise the rendered ``login.html`` template.
    """
    user = get_current_user()
    force_login = _truthy_param(request.args.get("force") or request.args.get("reauth"))
    if user and not force_login:
        # ``next`` comes straight from the query string and is therefore
        # attacker-controllable. Run it through the same sanitizer that
        # login_action uses instead of redirecting to it verbatim, which
        # would allow an open redirect to an external site.
        return redirect(_safe_next_url(request.args.get("next")))
    return render_template("login.html")
@auth_bp.post("/auth/login")
def login_action() -> Response:
    """Authenticate a username/password pair and start a session.

    Credentials are read from a JSON object body or, failing that, from form
    data. An optional ``next`` value is read from the body or the query string.

    Returns:
        * 400 JSON when username or password is missing or not a string.
        * 401 JSON when the user is unknown or the password does not verify.
        * 403 JSON when the account is marked inactive.
        * On success: a redirect for form-encoded submissions, otherwise a
          JSON document containing the stored user dict and the redirect
          target.
    """
    body = request.get_json(silent=True)
    # Only a non-empty JSON *object* can carry credentials. Any other JSON
    # value (list, string, number) has no ``.get`` and used to crash with a
    # 500, so fall back to the form data instead.
    payload = body if isinstance(body, dict) and body else request.form

    def _text_field(name: str) -> str:
        """Return the trimmed string value of ``name``, or "" if absent or not a string."""
        value = payload.get(name) if payload else None
        return value.strip() if isinstance(value, str) else ""

    username = _text_field("username").lower()
    password = _text_field("password")
    explicit_next = (payload.get("next") if payload else None) or request.args.get("next")
    next_url = _safe_next_url(explicit_next)

    if not username or not password:
        return jsonify({"error": "username and password are required"}), 400

    user = get_user_by_username(username)
    # Unknown user and wrong password give the same answer on purpose.
    if not user or not verify_password(password, user.get("password_hash", "")):
        return jsonify({"error": "invalid credentials"}), 401
    if not user.get("is_active", True):
        return jsonify({"error": "account disabled"}), 403

    sanitized = _scrub_user_payload(user)
    session[SESSION_USER_KEY] = sanitized

    # When the caller did not ask for a specific destination (or asked for
    # the bare root), fall back to a role-based landing page: admins go to
    # the db_admin page, everyone else to the blueprint root.
    if not explicit_next or next_url == "/":
        if sanitized.get("role") == "admin":
            next_url = "/fs-ai-asistant/api/workflow/lawrisk/db_admin"
        else:
            next_url = "/fs-ai-asistant/api/workflow/lawrisk/"

    # Form submissions expect redirects; API clients expect JSON.
    if request.content_type and "application/x-www-form-urlencoded" in request.content_type:
        return redirect(next_url)
    return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url})
@auth_bp.post("/auth/logout")
def logout_action() -> Response:
    """Drop the session user and acknowledge the logout.

    Form-encoded requests are redirected to the login page; all other
    requests receive a JSON confirmation.
    """
    session.pop(SESSION_USER_KEY, None)

    content_type = request.content_type or ""
    if "application/x-www-form-urlencoded" in content_type:
        return redirect(url_for("auth.login_page"))
    return jsonify({"message": "logged out"})
@auth_bp.get("/auth/me")
def current_user_endpoint() -> Response:
    """Report whether the session is authenticated and, if so, as whom.

    Returns JSON with ``authenticated: True`` and the stored user dict, or
    ``authenticated: False`` with status 401 when there is no session user.
    """
    user = get_current_user()
    if user:
        return jsonify({"authenticated": True, "user": user})
    return jsonify({"authenticated": False}), 401