diff --git a/src/squishmark/dependencies.py b/src/squishmark/dependencies.py index b81a316..97f5c79 100644 --- a/src/squishmark/dependencies.py +++ b/src/squishmark/dependencies.py @@ -1,11 +1,14 @@ """FastAPI dependency injection utilities.""" +import logging from typing import Annotated -from fastapi import Depends, Request +from fastapi import Depends, HTTPException, Request from squishmark.config import Settings, get_settings +logger = logging.getLogger(__name__) + # Type alias for settings dependency SettingsDep = Annotated[Settings, Depends(get_settings)] @@ -19,3 +22,42 @@ def is_admin(request: Request) -> bool: if user is None: return False return user.get("login") in settings.admin_users_list + + +def is_htmx(request: Request) -> bool: + """Return True when the request was made by HTMX.""" + return request.headers.get("HX-Request") == "true" + + +async def get_current_admin(request: Request) -> str: + """ + Get the current admin user from session. + + Raises HTTPException 401 if not authenticated. + Raises HTTPException 403 if not an admin. + + For HTMX requests, attaches an ``HX-Redirect`` header so the browser + is redirected to the login page without any client JavaScript. + """ + settings = get_settings() + + # Dev mode auth bypass (requires both flags) + if settings.debug and settings.dev_skip_auth: + logger.warning("Auth bypassed - returning dev-admin user") + return "dev-admin" + + htmx_headers = {"HX-Redirect": "/auth/login"} if is_htmx(request) else None + + # Check for user in session (set by OAuth callback) + user = request.session.get("user") if hasattr(request, "session") else None + + if user is None: + raise HTTPException(status_code=401, detail="Not authenticated", headers=htmx_headers) + + if user["login"] not in settings.admin_users_list: + raise HTTPException(status_code=403, detail="Not authorized", headers=htmx_headers) + + return user["login"] + + +AdminUser = Annotated[str, Depends(get_current_admin)] diff --git a/src/squishmark/routers/admin.py b/src/squishmark/routers/admin.py index 7e817b6..f802d5a 100644 --- a/src/squishmark/routers/admin.py +++ b/src/squishmark/routers/admin.py @@ -10,10 +10,12 @@ from sqlalchemy.ext.asyncio import AsyncSession from squishmark.config import get_settings +from squishmark.dependencies import AdminUser, is_htmx from squishmark.models.content import Config from squishmark.models.db import Note, get_db_session from squishmark.services.analytics import AnalyticsService from squishmark.services.cache import get_cache +from squishmark.services.csrf import get_or_create_csrf_token, verify_csrf_token from squishmark.services.github import get_github_service from squishmark.services.notes import NotesService from squishmark.services.theme import get_theme_engine, reset_theme_engine @@ -23,11 +25,6 @@ router = APIRouter(prefix="/admin", tags=["admin"]) -def is_htmx(request: Request) -> bool: - """Return True when the request was made by HTMX.""" - return request.headers.get("HX-Request") == "true" - - # Pydantic models for request/response class NoteCreate(BaseModel): """Request body for creating a note.""" @@ -65,39 +62,12 @@ class CacheRefreshResponse(BaseModel): duration_ms: float -# Dependency for getting the current admin user -async def get_current_admin(request: Request) -> str: - """ - Get the current admin user from session. - - Raises HTTPException 401 if not authenticated. - Raises HTTPException 403 if not an admin. - - For HTMX requests, attaches an ``HX-Redirect`` header so the browser - is redirected to the login page without any client JavaScript. - """ - settings = get_settings() - - # Dev mode auth bypass (requires both flags) - if settings.debug and settings.dev_skip_auth: - logger.warning("Auth bypassed - returning dev-admin user") - return "dev-admin" - - htmx_headers = {"HX-Redirect": "/auth/login"} if is_htmx(request) else None - - # Check for user in session (set by OAuth callback) - user = request.session.get("user") if hasattr(request, "session") else None - - if user is None: - raise HTTPException(status_code=401, detail="Not authenticated", headers=htmx_headers) +class CSRFTokenResponse(BaseModel): + """Response body for the CSRF token endpoint.""" - if user["login"] not in settings.admin_users_list: - raise HTTPException(status_code=403, detail="Not authorized", headers=htmx_headers) + csrf_token: str - return user["login"] - -AdminUser = Annotated[str, Depends(get_current_admin)] DbSession = Annotated[AsyncSession, Depends(get_db_session)] @@ -214,6 +184,7 @@ async def admin_dashboard( cache = get_cache() # Render admin template + csrf_token = get_or_create_csrf_token(request) theme_engine = await get_theme_engine(github_service) try: html = await theme_engine.render_admin( @@ -222,6 +193,7 @@ async def admin_dashboard( analytics=analytics, notes=[_to_note_response(n) for n in notes], cache_size=cache.size, + csrf_token=csrf_token, ) except Exception: # Fallback if admin template doesn't exist @@ -248,6 +220,14 @@ async def admin_dashboard( return HTMLResponse(content=html) +# CSRF token endpoint (for JSON API callers that can't scrape the meta tag) +@router.get("/csrf") +async def get_csrf(request: Request, admin: AdminUser) -> CSRFTokenResponse: + """Return the current CSRF token for use in subsequent mutation requests.""" + del admin # auth side-effect only + return CSRFTokenResponse(csrf_token=get_or_create_csrf_token(request)) + + # Analytics endpoints @router.get("/analytics") async def get_analytics( @@ -273,7 +253,12 @@ async def list_notes( return [_to_note_response(n) for n in notes] -@router.post("/notes", status_code=201, response_model=None) +@router.post( + "/notes", + status_code=201, + response_model=None, + dependencies=[Depends(verify_csrf_token)], +) async def create_note( request: Request, admin: AdminUser, @@ -295,7 +280,11 @@ async def create_note( return response -@router.put("/notes/{note_id}", response_model=None) +@router.put( + "/notes/{note_id}", + response_model=None, + dependencies=[Depends(verify_csrf_token)], +) async def update_note( request: Request, admin: AdminUser, @@ -320,7 +309,11 @@ async def update_note( return response -@router.delete("/notes/{note_id}", response_model=None) +@router.delete( + "/notes/{note_id}", + response_model=None, + dependencies=[Depends(verify_csrf_token)], +) async def delete_note( request: Request, admin: AdminUser, @@ -371,7 +364,7 @@ async def view_note( # Cache management -@router.post("/cache/refresh") +@router.post("/cache/refresh", dependencies=[Depends(verify_csrf_token)]) async def refresh_cache( admin: AdminUser, ) -> CacheRefreshResponse: diff --git a/src/squishmark/routers/auth.py b/src/squishmark/routers/auth.py index 4503c31..ce769f9 100644 --- a/src/squishmark/routers/auth.py +++ b/src/squishmark/routers/auth.py @@ -1,5 +1,6 @@ """Authentication routes for GitHub OAuth.""" +import logging from urllib.parse import urlencode import httpx @@ -7,6 +8,9 @@ from fastapi.responses import RedirectResponse from squishmark.config import get_settings +from squishmark.services.csrf import SESSION_KEY as CSRF_SESSION_KEY + +logger = logging.getLogger(__name__) router = APIRouter(prefix="/auth", tags=["auth"]) @@ -116,6 +120,9 @@ async def oauth_callback( "name": user_data.get("name"), "avatar_url": user_data.get("avatar_url"), } + # Rotate CSRF token on login so a stale pre-auth token can't be replayed. + request.session.pop(CSRF_SESSION_KEY, None) + logger.info("CSRF token rotated on OAuth login (user=%s)", user_data["login"]) # Redirect to admin return RedirectResponse(url="/admin", status_code=302) diff --git a/src/squishmark/services/csrf.py b/src/squishmark/services/csrf.py new file mode 100644 index 0000000..9a25232 --- /dev/null +++ b/src/squishmark/services/csrf.py @@ -0,0 +1,97 @@ +"""CSRF token generation and verification for admin mutation endpoints. + +Tokens are stored in the signed session cookie under ``SESSION_KEY`` and +validated on POST/PUT/DELETE requests via the ``verify_csrf_token`` dependency. +Clients send the token in the ``X-CSRF-Token`` header (HTMX, JSON API) or a +``csrf_token`` form field (plain form fallback). JSON callers that cannot +read the meta tag can fetch the current token from ``GET /admin/csrf``. +""" + +import logging +import secrets +from typing import Annotated + +from fastapi import Depends, HTTPException, Request + +from squishmark.config import get_settings +from squishmark.dependencies import get_current_admin, is_htmx + +logger = logging.getLogger(__name__) + +SESSION_KEY = "csrf_token" +HEADER_NAME = "X-CSRF-Token" +FORM_FIELD = "csrf_token" + + +def get_or_create_csrf_token(request: Request) -> str: + """Return the session's CSRF token, minting a new one if absent.""" + token = request.session.get(SESSION_KEY) + if not token: + token = secrets.token_urlsafe(32) + request.session[SESSION_KEY] = token + return token + + +async def _extract_submitted_token(request: Request) -> str | None: + """Read the submitted CSRF token from header or form body. + + For form requests we call ``request.form()``; Starlette caches the parsed + form on the request instance, so the route handler's subsequent + ``request.form()`` call is served from cache and does not re-read the body. + If Starlette ever stops caching, both paths still parse independently — but + we'd duplicate work, not lose data. + """ + header_token = request.headers.get(HEADER_NAME) + if header_token: + return header_token + + content_type = request.headers.get("content-type", "") + if content_type.startswith(("application/x-www-form-urlencoded", "multipart/form-data")): + form = await request.form() + value = form.get(FORM_FIELD) + if isinstance(value, str): + return value + return None + + +async def verify_csrf_token( + request: Request, + admin: Annotated[str, Depends(get_current_admin)], +) -> None: + """FastAPI dependency that rejects requests missing or with an invalid CSRF token. + + Depends on :func:`get_current_admin` so authentication resolves first. This + matters for unauthenticated HTMX requests: they get the 401 + ``HX-Redirect`` + response from auth instead of a misleading 403 for a CSRF token they had no + way to obtain. + + Skipped when ``debug`` and ``dev_skip_auth`` are both set, matching the + auth-bypass behavior in ``get_current_admin``. + """ + settings = get_settings() + if settings.debug and settings.dev_skip_auth: + logger.warning("CSRF bypassed - dev_skip_auth is enabled") + return + + # Single user-facing error regardless of which check fails — distinguishing + # "no session token" from "wrong submitted token" would tell an attacker + # whether they got a session to bind to. Server-side log records the + # specific reason so operators can diagnose without that side-channel. + expected = request.session.get(SESSION_KEY) if hasattr(request, "session") else None + submitted = await _extract_submitted_token(request) + if not expected or not submitted or not secrets.compare_digest(submitted, expected): + if not expected: + reason = "no-session-token" + elif not submitted: + reason = "no-submitted-token" + else: + reason = "token-mismatch" + logger.warning( + "CSRF rejected: reason=%s method=%s path=%s admin=%s htmx=%s", + reason, + request.method, + request.url.path, + admin, + is_htmx(request), + ) + raise HTTPException(status_code=403, detail="CSRF validation failed") diff --git a/tests/test_admin_notes.py b/tests/test_admin_notes.py index c7ad3d2..cb8dbde 100644 --- a/tests/test_admin_notes.py +++ b/tests/test_admin_notes.py @@ -300,16 +300,96 @@ async def test_edit_form_not_found_raises_404(): assert exc_info.value.status_code == 404 +@pytest.mark.asyncio +async def test_get_csrf_returns_session_token(): + """GET /admin/csrf returns the current CSRF token for JSON API callers.""" + from squishmark.routers.admin import get_csrf + + request = MagicMock() + request.session = {} + + result = await get_csrf(request=request, admin="test-admin") + + assert result.csrf_token + # Returned token matches what's now stored on the session. + assert request.session["csrf_token"] == result.csrf_token + + +@pytest.mark.asyncio +async def test_get_csrf_idempotent_within_session(): + """Calling GET /admin/csrf twice on the same session returns the same token.""" + from squishmark.routers.admin import get_csrf + + request = MagicMock() + request.session = {} + + first = await get_csrf(request=request, admin="test-admin") + second = await get_csrf(request=request, admin="test-admin") + + assert first.csrf_token == second.csrf_token + + +@pytest.mark.asyncio +async def test_oauth_callback_rotates_csrf_token(): + """A successful OAuth callback clears any prior CSRF token from the session. + + Exercises ``oauth_callback`` end-to-end with mocked GitHub HTTP calls so a + regression that drops the rotation line will fail this test. + """ + from squishmark.routers.auth import oauth_callback + from squishmark.services.csrf import SESSION_KEY + + request = MagicMock() + request.session = {SESSION_KEY: "stale-pre-auth-token"} + request.url_for = MagicMock(return_value="http://localhost:8000/auth/callback") + + fake_settings = MagicMock( + debug=False, + dev_skip_auth=False, + secret_key="0123456789abcdef-rest", # state validates against secret_key[:16] + github_client_id="cid", + github_client_secret="csecret", + ) + + token_response = MagicMock(status_code=200) + token_response.json = MagicMock(return_value={"access_token": "gho_test"}) + user_response = MagicMock(status_code=200) + user_response.json = MagicMock(return_value={"login": "alice", "name": "Alice", "avatar_url": "u"}) + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=token_response) + mock_client.get = AsyncMock(return_value=user_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + + with ( + patch("squishmark.routers.auth.get_settings", return_value=fake_settings), + patch("squishmark.routers.auth.httpx.AsyncClient", return_value=mock_client), + ): + result = await oauth_callback(request=request, code="abc", state="0123456789abcdef") + + assert result.status_code == 302 + assert SESSION_KEY not in request.session + assert request.session["user"]["login"] == "alice" + + # And a fresh token mints on the next get_or_create call, distinct from the stale one. + from squishmark.services.csrf import get_or_create_csrf_token + + new_token = get_or_create_csrf_token(request) + assert new_token + assert new_token != "stale-pre-auth-token" + + @pytest.mark.asyncio async def test_get_current_admin_htmx_attaches_redirect_header(): """HTMX requests with no session get an HX-Redirect header on 401.""" - from squishmark.routers.admin import get_current_admin + from squishmark.dependencies import get_current_admin request = MagicMock() request.session = {} request.headers = {"HX-Request": "true"} - with patch("squishmark.routers.admin.get_settings") as mock_settings: + with patch("squishmark.dependencies.get_settings") as mock_settings: mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False, admin_users_list=[]) with pytest.raises(HTTPException) as exc_info: await get_current_admin(request) @@ -434,13 +514,13 @@ async def test_render_partial_restores_theme_on_render_exception(): @pytest.mark.asyncio async def test_get_current_admin_non_htmx_omits_redirect_header(): """Non-HTMX requests get a plain 401 with no HX-Redirect header.""" - from squishmark.routers.admin import get_current_admin + from squishmark.dependencies import get_current_admin request = MagicMock() request.session = {} request.headers = {} - with patch("squishmark.routers.admin.get_settings") as mock_settings: + with patch("squishmark.dependencies.get_settings") as mock_settings: mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False, admin_users_list=[]) with pytest.raises(HTTPException) as exc_info: await get_current_admin(request) diff --git a/tests/test_csrf.py b/tests/test_csrf.py new file mode 100644 index 0000000..62f06f5 --- /dev/null +++ b/tests/test_csrf.py @@ -0,0 +1,221 @@ +"""Tests for CSRF token generation and verification.""" + +import inspect +from typing import get_args +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import HTTPException + +from squishmark.dependencies import get_current_admin +from squishmark.services.csrf import ( + FORM_FIELD, + HEADER_NAME, + SESSION_KEY, + get_or_create_csrf_token, + verify_csrf_token, +) + + +def test_verify_csrf_token_depends_on_get_current_admin(): + """``verify_csrf_token`` must declare get_current_admin as a dependency. + + This forces FastAPI to resolve auth BEFORE the CSRF check, so unauthenticated + HTMX requests get the 401 + HX-Redirect from auth instead of a misleading + 403 for a CSRF token they couldn't have obtained. + """ + sig = inspect.signature(verify_csrf_token) + admin_param = sig.parameters.get("admin") + assert admin_param is not None, "verify_csrf_token must accept 'admin' parameter" + # Annotation is Annotated[str, Depends(get_current_admin)] + depends_objs = [arg for arg in get_args(admin_param.annotation) if hasattr(arg, "dependency")] + assert depends_objs, "admin parameter must use Depends(...)" + assert depends_objs[0].dependency is get_current_admin + + +def _request( + *, + session: dict | None = None, + header_token: str | None = None, + form_body: dict | None = None, + content_type: str = "application/json", + method: str = "POST", + path: str = "/admin/notes", +) -> MagicMock: + """Build a mock Request with a session dict and optional token sources.""" + request = MagicMock() + request.session = session if session is not None else {} + headers = {"content-type": content_type} + if header_token is not None: + headers[HEADER_NAME] = header_token + request.headers = headers + request.form = AsyncMock(return_value=form_body or {}) + request.method = method + request.url.path = path + return request + + +def test_get_or_create_csrf_token_mints_when_absent(): + request = _request() + token = get_or_create_csrf_token(request) + + assert token + assert len(token) > 20 + assert request.session[SESSION_KEY] == token + + +def test_get_or_create_csrf_token_returns_existing(): + request = _request(session={SESSION_KEY: "preexisting-token"}) + + token = get_or_create_csrf_token(request) + + assert token == "preexisting-token" + + +def test_get_or_create_csrf_token_is_idempotent(): + """Calling twice on the same request returns the same token.""" + request = _request() + + first = get_or_create_csrf_token(request) + second = get_or_create_csrf_token(request) + + assert first == second + + +@pytest.mark.asyncio +async def test_verify_csrf_token_accepts_matching_header(): + request = _request( + session={SESSION_KEY: "good-token"}, + header_token="good-token", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + await verify_csrf_token(request, admin="test-admin") # should not raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_rejects_missing_session_token(): + request = _request(session={}, header_token="anything") + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request, admin="test-admin") + + assert exc.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_verify_csrf_token_rejects_missing_submitted_token(): + request = _request(session={SESSION_KEY: "good-token"}) # no header, no form + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request, admin="test-admin") + + assert exc.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_verify_csrf_token_error_message_is_generic(): + """The error detail must not leak which check (session vs submitted) failed.""" + request_no_session = _request(session={}, header_token="anything") + request_no_submission = _request(session={SESSION_KEY: "good-token"}) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException) as exc_a: + await verify_csrf_token(request_no_session, admin="x") + with pytest.raises(HTTPException) as exc_b: + await verify_csrf_token(request_no_submission, admin="x") + + assert exc_a.value.detail == exc_b.value.detail + + +@pytest.mark.asyncio +async def test_verify_csrf_token_logs_rejection_reason(caplog): + """Server-side log records the specific failure mode so operators can debug. + + The user response is generic ('CSRF validation failed') to avoid info + disclosure, so the log is the only place this detail lives. + """ + import logging + + caplog.set_level(logging.WARNING, logger="squishmark.services.csrf") + request = _request( + session={SESSION_KEY: "good"}, + header_token="wrong", + method="POST", + path="/admin/notes", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException): + await verify_csrf_token(request, admin="alice") + + msg = caplog.text + assert "CSRF rejected" in msg + assert "reason=token-mismatch" in msg + assert "method=POST" in msg + assert "path=/admin/notes" in msg + assert "admin=alice" in msg + + +@pytest.mark.asyncio +async def test_verify_csrf_token_rejects_wrong_header(): + request = _request( + session={SESSION_KEY: "good-token"}, + header_token="wrong-token", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request, admin="test-admin") + + assert exc.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_verify_csrf_token_accepts_form_field_fallback(): + """For form submissions without the header, the csrf_token form field is honored.""" + request = _request( + session={SESSION_KEY: "good-token"}, + form_body={FORM_FIELD: "good-token"}, + content_type="application/x-www-form-urlencoded", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + await verify_csrf_token(request, admin="test-admin") # should not raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_header_takes_precedence_over_form(): + """A valid header passes even if the form field is wrong.""" + request = _request( + session={SESSION_KEY: "good-token"}, + header_token="good-token", + form_body={FORM_FIELD: "wrong"}, + content_type="application/x-www-form-urlencoded", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + await verify_csrf_token(request, admin="test-admin") # header wins, no raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_bypassed_in_dev_skip_auth(): + """When debug and dev_skip_auth are both set, CSRF check is skipped.""" + request = _request() # no session, no token — would normally fail + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=True, dev_skip_auth=True) + await verify_csrf_token(request, admin="test-admin") # should not raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_not_bypassed_in_prod_mode(): + """dev_skip_auth without debug doesn't bypass.""" + request = _request() + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=True) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request, admin="test-admin") + + assert exc.value.status_code == 403 diff --git a/tests/test_csrf_integration.py b/tests/test_csrf_integration.py new file mode 100644 index 0000000..26b6da7 --- /dev/null +++ b/tests/test_csrf_integration.py @@ -0,0 +1,123 @@ +"""Integration tests for CSRF enforcement on admin mutation routes. + +These exercise the full FastAPI dependency chain via ``TestClient`` — unlike +``test_csrf.py`` (which calls ``verify_csrf_token`` directly) or +``test_admin_notes.py`` (which calls route handlers directly), the tests here +confirm that ``dependencies=[Depends(verify_csrf_token)]`` on the route +decorators actually fires, and in the correct order relative to auth. +""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import FastAPI, Request +from fastapi.testclient import TestClient +from starlette.middleware.sessions import SessionMiddleware + +from squishmark.models.db import get_db_session +from squishmark.routers.admin import router as admin_router +from squishmark.services.csrf import SESSION_KEY + + +def _app() -> FastAPI: + """Minimal FastAPI app wired with admin router + SessionMiddleware.""" + app = FastAPI() + app.add_middleware(SessionMiddleware, secret_key="test-secret-key", session_cookie="s") + app.include_router(admin_router) + # DB session dep — never reached by these tests, but FastAPI resolves all deps + # eagerly so it has to return something. + app.dependency_overrides[get_db_session] = lambda: AsyncMock() + return app + + +@pytest.fixture +def settings_prod(): + """Patch settings to behave like production: debug off, dev_skip_auth off, + 'alice' is the only admin.""" + fake = MagicMock( + debug=False, + dev_skip_auth=False, + admin_users_list=["alice"], + ) + with ( + patch("squishmark.dependencies.get_settings", return_value=fake), + patch("squishmark.services.csrf.get_settings", return_value=fake), + patch("squishmark.routers.admin.get_settings", return_value=fake), + ): + yield fake + + +def _client_with_session(session_data: dict) -> TestClient: + """Build a TestClient whose first request seeds ``request.session`` with + ``session_data``. TestClient's cookie jar persists across requests, so once + the session cookie is set, subsequent requests on the same client carry it. + """ + app = _app() + + @app.get("/_test/seed-session") + async def seed(request: Request) -> dict: + for k, v in session_data.items(): + request.session[k] = v + return {"ok": True} + + client = TestClient(app) + resp = client.get("/_test/seed-session") + assert resp.status_code == 200, resp.text + return client + + +def test_unauthenticated_htmx_mutation_gets_401_redirect_not_403_csrf(settings_prod): + """The crux of the dep-ordering fix: an unauthenticated HTMX POST must hit + the auth check (401 + HX-Redirect), not CSRF (403).""" + del settings_prod + client = TestClient(_app()) + resp = client.post( + "/admin/notes", + json={"path": "/x", "text": "y"}, + headers={"HX-Request": "true"}, + ) + assert resp.status_code == 401 + assert resp.headers.get("HX-Redirect") == "/auth/login" + + +def test_authenticated_mutation_without_csrf_token_gets_403(settings_prod): + """An admin who somehow omits the CSRF token is rejected.""" + del settings_prod + client = _client_with_session({"user": {"login": "alice"}}) + resp = client.post( + "/admin/notes", + json={"path": "/x", "text": "y"}, + ) + assert resp.status_code == 403 + + +def test_authenticated_mutation_with_stale_csrf_token_gets_403(settings_prod): + """An admin sending a token that doesn't match the session value is rejected.""" + del settings_prod + client = _client_with_session({"user": {"login": "alice"}, SESSION_KEY: "real-token"}) + resp = client.post( + "/admin/notes", + json={"path": "/x", "text": "y"}, + headers={"X-CSRF-Token": "stale-or-attacker-token"}, + ) + assert resp.status_code == 403 + + +def test_get_admin_csrf_endpoint_returns_token_for_admin(settings_prod): + """JSON callers can fetch the current token via GET /admin/csrf.""" + del settings_prod + client = _client_with_session({"user": {"login": "alice"}}) + resp = client.get("/admin/csrf") + assert resp.status_code == 200 + body = resp.json() + assert "csrf_token" in body + assert isinstance(body["csrf_token"], str) + assert len(body["csrf_token"]) > 20 + + +def test_get_admin_csrf_endpoint_is_auth_gated(settings_prod): + """Unauthenticated callers must not be able to obtain a token.""" + del settings_prod + client = TestClient(_app()) + resp = client.get("/admin/csrf") + assert resp.status_code == 401 diff --git a/themes/blue-tech/admin/admin.html b/themes/blue-tech/admin/admin.html index 2e43089..c854a1b 100644 --- a/themes/blue-tech/admin/admin.html +++ b/themes/blue-tech/admin/admin.html @@ -3,6 +3,7 @@
+