diff --git a/docs/sql/003_create_permit_sources_table.sql b/docs/sql/003_create_permit_sources_table.sql new file mode 100644 index 0000000..357a9e7 --- /dev/null +++ b/docs/sql/003_create_permit_sources_table.sql @@ -0,0 +1,19 @@ +-- Permit source tracking table + +BEGIN; + +CREATE TABLE IF NOT EXISTS public.permit_sources ( + region_id uuid NOT NULL REFERENCES public.regions(id) ON DELETE CASCADE, + permit_id uuid NOT NULL REFERENCES public.permits(id) ON DELETE CASCADE, + source_type text NOT NULL, + source_name text NOT NULL, + source_detail text, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (region_id, permit_id) +); + +CREATE INDEX IF NOT EXISTS permit_sources_source_name_idx + ON public.permit_sources (source_name); + +COMMIT; diff --git a/docs/sql/004_create_region_permit_theme_overrides.sql b/docs/sql/004_create_region_permit_theme_overrides.sql new file mode 100644 index 0000000..c0a09fc --- /dev/null +++ b/docs/sql/004_create_region_permit_theme_overrides.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS region_permit_theme_overrides ( + region_id uuid NOT NULL REFERENCES regions(id) ON DELETE CASCADE, + permit_id uuid NOT NULL REFERENCES permits(id) ON DELETE CASCADE, + binds_all_themes boolean NOT NULL DEFAULT false, + created_at timestamptz NOT NULL DEFAULT now(), + created_by text, + PRIMARY KEY (region_id, permit_id) +); + diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..dcf5597 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict + +from flask import Flask, jsonify + +from lawrisk.api import auth as auth_module +from lawrisk.api.auth import auth_bp + + +def build_test_app() -> Flask: + base_path = Path(__file__).resolve().parents[1] + app = Flask( + __name__, + template_folder=str(base_path / "templates"), + static_folder=str(base_path / "static"), + ) + app.secret_key = "test-secret" + app.register_blueprint(auth_bp) + app.config.update(TESTING=True) + + @app.route("/protected-json") + @auth_module.login_required + def protected_json() -> Any: + return jsonify({"ok": True}) + + @app.route("/protected-page") + @auth_module.login_required + def protected_page() -> Any: + return "secret" + + return app + + +def test_login_success(monkeypatch) -> None: + app = build_test_app() + client = app.test_client() + + fake_user: Dict[str, Any] = { + "id": "user-1", + "username": "admin", + "display_name": "Admin", + "role": "admin", + "grade": 100, + "is_active": True, + "password_hash": "hash", + } + + monkeypatch.setattr(auth_module, "get_user_by_username", lambda _: fake_user) + monkeypatch.setattr(auth_module, "verify_password", lambda *_: True) + + resp = client.post("/auth/login", json={"username": "admin", "password": "secret"}) + assert resp.status_code == 200 + data = resp.get_json() + assert data["user"]["role"] == "admin" + assert data["redirect"] == "/" + with client.session_transaction() as sess: + assert sess[auth_module.SESSION_USER_KEY]["username"] == "admin" + + +def test_login_invalid_credentials(monkeypatch) -> None: + app = build_test_app() + client = app.test_client() + + monkeypatch.setattr(auth_module, "get_user_by_username", lambda *_: None) + + resp = client.post("/auth/login", json={"username": "ghost", "password": "bad"}) + assert resp.status_code == 401 + assert resp.get_json()["error"] == "invalid credentials" + + +def test_login_success_honors_next(monkeypatch) -> None: + app = build_test_app() + client = app.test_client() + + fake_user: Dict[str, Any] = { + "id": "user-1", + "username": "admin", + "display_name": "Admin", + "role": "admin", + "grade": 100, + "is_active": True, + "password_hash": "hash", + } + + monkeypatch.setattr(auth_module, "get_user_by_username", lambda _: fake_user) + monkeypatch.setattr(auth_module, "verify_password", lambda *_: True) + + resp = client.post( + "/auth/login?next=/fs-ai-asistant/api/workflow/lawrisk/db_admin", + json={"username": "admin", "password": "secret"}, + ) + assert resp.status_code == 200 + assert ( + resp.get_json()["redirect"] == "/fs-ai-asistant/api/workflow/lawrisk/db_admin" + ) + + +def test_me_endpoint_requires_session() -> None: + app = build_test_app() + client = app.test_client() + + resp = client.get("/auth/me") + assert resp.status_code == 401 + + with client.session_transaction() as sess: + sess[auth_module.SESSION_USER_KEY] = { + "id": "user-2", + "username": "analyst", + "display_name": "Data Analyst", + "role": "reviewer", + "grade": 20, + } + + resp2 = client.get("/auth/me") + assert resp2.status_code == 200 + assert resp2.get_json()["user"]["role"] == "reviewer" + + +def test_login_required_returns_json_error_for_api() -> None: + app = build_test_app() + client = app.test_client() + + resp = client.get("/protected-json", headers={"Accept": "application/json"}) + assert resp.status_code == 401 + assert resp.get_json()["error"] == "authentication required" + + +def test_login_required_redirects_for_html() -> None: + app = build_test_app() + client = app.test_client() + + resp = client.get("/protected-page", headers={"Accept": "text/html"}) + assert resp.status_code == 302 + assert "/fs-ai-asistant/lawrisk/login" in resp.headers["Location"]