"""Auth blueprint: login page, authentication endpoints, and helpers.""" from __future__ import annotations from functools import wraps from typing import Any, Callable, Dict, Iterable, Optional, Tuple from urllib.parse import urlparse from flask import ( Blueprint, Response, jsonify, redirect, render_template, request, session, url_for, ) from lawrisk.services.auth_service import get_user_by_username, verify_password auth_bp = Blueprint("auth", __name__) SESSION_USER_KEY = "auth_user" def _safe_next_url(candidate: Optional[str]) -> str: if not candidate: return "/" raw = str(candidate).strip() if not raw: return "/" parsed = urlparse(raw) if parsed.netloc: safe_path = parsed.path or "/" else: safe_path = raw if raw.startswith("/") else f"/{raw}" if parsed.query: safe_path = f"{safe_path}?{parsed.query}" return safe_path def _scrub_user_payload(user: Dict[str, Any]) -> Dict[str, Any]: department = None if user.get("service_department_id"): department = { "id": user.get("service_department_id"), "name": user.get("service_department_name"), "code": user.get("service_department_code"), "region_id": user.get("service_department_region_id"), "parent_id": user.get("service_department_parent_id"), "phone": user.get("service_department_phone"), "role": user.get("department_role"), } return { "id": user.get("id"), "username": user.get("username"), "display_name": user.get("display_name"), "role": user.get("role"), "grade": user.get("grade"), "department": department, } def ensure_admin_access( allowed_roles: Optional[Iterable[str]] = None, *, prefer_json: bool = False, ) -> Tuple[Optional[Dict[str, Any]], Optional[Response]]: """Ensure the current session belongs to an admin role.""" user = get_current_user() wants_html = ( (not prefer_json) and request.method == "GET" and request.accept_mimetypes.accept_html and ( request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"] ) ) if not user: if wants_html: login_url = url_for("auth.login_page", next=request.url) return None, redirect(login_url) return None, (jsonify({"error": "authentication required"}), 401) default_roles = ("admin",) if allowed_roles is None else allowed_roles allowed = {str(role).lower() for role in default_roles if role} current_role = str(user.get("role") or "").lower() if allowed and current_role not in allowed: if wants_html: login_url = url_for("auth.login_page", next=request.url, force=1) return None, redirect(login_url) return None, (jsonify({"error": "admin privileges required"}), 403) return user, None def get_current_user() -> Optional[Dict[str, Any]]: data = session.get(SESSION_USER_KEY) if not isinstance(data, dict): return None return data def login_required(view: Callable[..., Response]) -> Callable[..., Response]: @wraps(view) def wrapper(*args: Any, **kwargs: Any) -> Response: user = get_current_user() if not user: wants_html = request.method == "GET" and request.accept_mimetypes.accept_html and ( request.accept_mimetypes["text/html"] >= request.accept_mimetypes["application/json"] ) if wants_html: login_url = url_for("auth.login_page", next=request.url) return redirect(login_url) return jsonify({"error": "authentication required"}), 401 return view(*args, **kwargs) return wrapper def _truthy_param(value: Optional[str]) -> bool: if value is None: return False return str(value).strip().lower() in {"1", "true", "yes", "on", "force", "reauth"} @auth_bp.get("/fs-ai-asistant/lawrisk/login") def login_page() -> Response: user = get_current_user() force_login = _truthy_param(request.args.get("force") or request.args.get("reauth")) if user and not force_login: return redirect(request.args.get("next") or "/") return render_template("login.html") @auth_bp.post("/auth/login") def login_action() -> Response: payload = request.get_json(silent=True) or request.form username = (payload.get("username", "") if payload else "").strip().lower() password = (payload.get("password", "") if payload else "").strip() next_url = _safe_next_url((payload.get("next") if payload else None) or request.args.get("next")) if not username or not password: return jsonify({"error": "username and password are required"}), 400 user = get_user_by_username(username) if not user or not verify_password(password, user.get("password_hash", "")): return jsonify({"error": "invalid credentials"}), 401 if not user.get("is_active", True): return jsonify({"error": "account disabled"}), 403 sanitized = _scrub_user_payload(user) session[SESSION_USER_KEY] = sanitized # Form submissions expect redirects; API clients expect JSON if request.content_type and "application/x-www-form-urlencoded" in request.content_type: return redirect(next_url) return jsonify({"message": "login successful", "user": sanitized, "redirect": next_url}) @auth_bp.post("/auth/logout") def logout_action() -> Response: session.pop(SESSION_USER_KEY, None) if request.content_type and "application/x-www-form-urlencoded" in request.content_type: return redirect(url_for("auth.login_page")) return jsonify({"message": "logged out"}) @auth_bp.get("/auth/me") def current_user_endpoint() -> Response: user = get_current_user() if not user: return jsonify({"authenticated": False}), 401 return jsonify({"authenticated": True, "user": user})