fs-lawrisk/lawrisk/api/auth.py

236 lines
8.0 KiB
Python

"""Auth blueprint: login page, authentication endpoints, and helpers."""
from __future__ import annotations
from functools import wraps
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
from urllib.parse import urlparse
from flask import (
Blueprint,
Response,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from lawrisk.services.auth_service import get_user_by_username, verify_password
auth_bp = Blueprint("auth", __name__, url_prefix="/fs-ai-asistant/api/workflow/lawrisk")
SESSION_USER_KEY = "auth_user"
def _safe_next_url(candidate: Optional[str]) -> str:
"""
Sanitize next URL to prevent security issues.
For security reasons, we don't preserve query parameters that might
contain sensitive data like passwords. We only preserve the path.
"""
if not candidate:
return "/"
raw = str(candidate).strip()
if not raw:
return "/"
parsed = urlparse(raw)
if parsed.netloc:
safe_path = parsed.path or "/"
else:
safe_path = raw if raw.startswith("/") else f"/{raw}"
# For security: strip ALL query parameters to prevent password leakage
# Only preserve path, never query strings
return safe_path
def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]:
department = None
if user.get("service_department_id"):
department = {
"id": user.get("service_department_id"),
"name": user.get("service_department_name"),
"code": user.get("service_department_code"),
"region_id": user.get("service_department_region_id"),
"region_name": user.get("service_department_region_name"),
"parent_id": user.get("service_department_parent_id"),
"phone": user.get("service_department_phone"),
"role": user.get("department_role"),
}
return {
"id": user.get("id"),
"username": user.get("username"),
"display_name": user.get("display_name"),
"role": user.get("role"),
"grade": user.get("grade"),
"department": department,
}
def ensure_admin_access(
allowed_roles: Optional[Iterable[str]] = None,
*,
prefer_json: bool = False,
) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]:
"""Ensure the current session belongs to an admin role."""
user = get_current_user()
wants_html = (
(not prefer_json)
and request.method == "GET"
and request.accept_mimetypes.accept_html
and (
request.accept_mimetypes["text/html"]
>= request.accept_mimetypes["application/json"]
)
)
if not user:
if wants_html:
# Sanitize URL to prevent password leakage in redirect
safe_next = _safe_next_url(request.url)
login_url = url_for("auth.login_page", next=safe_next)
return None, redirect(login_url)
return None, (jsonify({"error": "authentication required"}), 401)
default_roles = ("admin",) if allowed_roles is None else allowed_roles
allowed = {str(role).lower() for role in default_roles if role}
current_role = str(user.get("role") or "").lower()
if allowed and current_role not in allowed:
if wants_html:
# Sanitize URL to prevent password leakage in redirect
safe_next = _safe_next_url(request.url)
login_url = url_for("auth.login_page", next=safe_next, force=1)
return None, redirect(login_url)
return None, (jsonify({"error": "admin privileges required"}), 403)
return user, None
def is_superuser(user: Optional[Dict[str, Any]]) -> bool:
"""Check if the given user is a superuser with full system access.
Superuser criteria (any match):
1. role equals 'admin'
2. username equals 'fssjsj'
3. grade equals 100 (fallback check)
Args:
user: User dictionary from get_current_user()
Returns:
True if user is a superuser, False otherwise
"""
if not user:
return False
# Primary check: role-based
if user.get("role") == "admin":
return True
# Special exception: specific username
if user.get("username") == "fssjsj":
return True
# Fallback check: grade-based
if user.get("grade") == 100:
return True
return False
def get_current_user() -> Optional[Dict[str, Any]]:
data = session.get(SESSION_USER_KEY)
if not isinstance(data, dict):
return None
return data
def login_required(view: Callable[..., Response]) -> Callable[..., Response]:
@wraps(view)
def wrapper(*args: Any, **kwargs: Any) -> Response:
user = get_current_user()
if not user:
wants_html = request.method == "GET" and request.accept_mimetypes.accept_html and (
request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"]
)
if wants_html:
# Sanitize URL to prevent password leakage in redirect
safe_next = _safe_next_url(request.url)
login_url = url_for("auth.login_page", next=safe_next)
return redirect(login_url)
return jsonify({"error": "authentication required"}), 401
return view(*args, **kwargs)
return wrapper
def _truthy_param(value: Optional[str]) -> bool:
if value is None:
return False
return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"}
@auth_bp.get("/login")
def login_page() -> Response:
user = get_current_user()
force_login = _truthy_param(request.args.get("force") or request.args.get("reauth"))
if user and not force_login:
return redirect(request.args.get("next") or "/")
return render_template("login.html")
@auth_bp.post("/auth/login")
def login_action() -> Response:
payload = request.get_json(silent=True) or request.form
username = (payload.get("username", "") if payload else "").strip().lower()
password = (payload.get("password", "") if payload else "").strip()
next_url = _safe_next_url((payload.get("next") if payload else None) or request.args.get("next"))
if not username or not password:
return jsonify({"error": "username and password are required"}), 400
user = get_user_by_username(username)
if not user or not verify_password(password, user.get("password_hash", "")):
return jsonify({"error": "invalid credentials"}), 401
if not user.get("is_active", True):
return jsonify({"error": "account disabled"}), 403
sanitized = _scrub_user_payload(user)
session[SESSION_USER_KEY] = sanitized
# Smart redirect: only apply redirect logic if no explicit next parameter was provided
# For security, all users use the same default redirect to avoid exposing permission levels
explicit_next = (payload.get("next") if payload else None) or request.args.get("next")
if not explicit_next or next_url == "/":
# Default redirect path: admin to db_admin, others to root
if sanitized.get("role") == "admin":
next_url = "/fs-ai-asistant/api/workflow/lawrisk/db_admin"
else:
# For non-admin, redirect to root or homepage if available
next_url = "/fs-ai-asistant/api/workflow/lawrisk/"
# Form submissions expect redirects; API clients expect JSON
if request.content_type and "application/x-www-form-urlencoded" in request.content_type:
return redirect(next_url)
return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url})
@auth_bp.post("/auth/logout")
def logout_action() -> Response:
session.pop(SESSION_USER_KEY, None)
if request.content_type and "application/x-www-form-urlencoded" in request.content_type:
return redirect(url_for("auth.login_page"))
return jsonify({"message": "logged out"})
@auth_bp.get("/auth/me")
def current_user_endpoint() -> Response:
user = get_current_user()
if not user:
return jsonify({"authenticated": False}), 401
return jsonify({"authenticated": True, "user": user})