From 63a999768dd85f4b7493aaf84c1ca0bd165468a6 Mon Sep 17 00:00:00 2001 From: Xeek <6032840+x3ek@users.noreply.github.com> Date: Sat, 23 May 2026 06:52:57 -0500 Subject: [PATCH 1/6] feat(admin): add CSRF protection to mutation endpoints Closes #79. Admin POST/PUT/DELETE routes now require a server-validated CSRF token in addition to the session cookie. Token is per-session, minted lazily when the dashboard renders, rotated on OAuth login, and validated via a FastAPI dependency on each mutation route. Clients send it in the X-CSRF-Token header (HTMX listener reads the token from a tag) with a csrf_token form-field fallback. Dev-skip-auth mode bypasses CSRF to match auth bypass. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/squishmark/routers/admin.py | 24 ++++- src/squishmark/routers/auth.py | 2 + src/squishmark/services/csrf.py | 64 ++++++++++++ tests/test_csrf.py | 156 +++++++++++++++++++++++++++++ themes/blue-tech/admin/admin.html | 7 ++ themes/default/admin/admin.html | 7 ++ themes/terminal/admin/admin.html | 1 + themes/terminal/static/js/admin.js | 6 ++ 8 files changed, 263 insertions(+), 4 deletions(-) create mode 100644 src/squishmark/services/csrf.py create mode 100644 tests/test_csrf.py diff --git a/src/squishmark/routers/admin.py b/src/squishmark/routers/admin.py index 7e817b6..99f18a8 100644 --- a/src/squishmark/routers/admin.py +++ b/src/squishmark/routers/admin.py @@ -14,6 +14,7 @@ from squishmark.models.db import Note, get_db_session from squishmark.services.analytics import AnalyticsService from squishmark.services.cache import get_cache +from squishmark.services.csrf import get_or_create_csrf_token, verify_csrf_token from squishmark.services.github import get_github_service from squishmark.services.notes import NotesService from squishmark.services.theme import get_theme_engine, reset_theme_engine @@ -214,6 +215,7 @@ async def admin_dashboard( cache = get_cache() # Render admin template + csrf_token = get_or_create_csrf_token(request) theme_engine = await get_theme_engine(github_service) try: html = await theme_engine.render_admin( @@ -222,6 +224,7 @@ async def admin_dashboard( analytics=analytics, notes=[_to_note_response(n) for n in notes], cache_size=cache.size, + csrf_token=csrf_token, ) except Exception: # Fallback if admin template doesn't exist @@ -273,7 +276,12 @@ async def list_notes( return [_to_note_response(n) for n in notes] -@router.post("/notes", status_code=201, response_model=None) +@router.post( + "/notes", + status_code=201, + response_model=None, + dependencies=[Depends(verify_csrf_token)], +) async def create_note( request: Request, admin: AdminUser, @@ -295,7 +303,11 @@ async def create_note( return response -@router.put("/notes/{note_id}", response_model=None) +@router.put( + "/notes/{note_id}", + response_model=None, + dependencies=[Depends(verify_csrf_token)], +) async def update_note( request: Request, admin: AdminUser, @@ -320,7 +332,11 @@ async def update_note( return response -@router.delete("/notes/{note_id}", response_model=None) +@router.delete( + "/notes/{note_id}", + response_model=None, + dependencies=[Depends(verify_csrf_token)], +) async def delete_note( request: Request, admin: AdminUser, @@ -371,7 +387,7 @@ async def view_note( # Cache management -@router.post("/cache/refresh") +@router.post("/cache/refresh", dependencies=[Depends(verify_csrf_token)]) async def refresh_cache( admin: AdminUser, ) -> CacheRefreshResponse: diff --git a/src/squishmark/routers/auth.py b/src/squishmark/routers/auth.py index 4503c31..8c6aa7d 100644 --- a/src/squishmark/routers/auth.py +++ b/src/squishmark/routers/auth.py @@ -116,6 +116,8 @@ async def oauth_callback( "name": user_data.get("name"), "avatar_url": user_data.get("avatar_url"), } + # Rotate CSRF token on login so a stale pre-auth token can't be replayed. + request.session.pop("csrf_token", None) # Redirect to admin return RedirectResponse(url="/admin", status_code=302) diff --git a/src/squishmark/services/csrf.py b/src/squishmark/services/csrf.py new file mode 100644 index 0000000..b489468 --- /dev/null +++ b/src/squishmark/services/csrf.py @@ -0,0 +1,64 @@ +"""CSRF token generation and verification for admin mutation endpoints. + +Tokens are stored in the signed session cookie under ``SESSION_KEY`` and +validated on POST/PUT/DELETE requests via the ``verify_csrf_token`` dependency. +Clients send the token in the ``X-CSRF-Token`` header (HTMX, JSON API) or a +``csrf_token`` form field (plain form fallback). +""" + +import logging +import secrets + +from fastapi import HTTPException, Request + +from squishmark.config import get_settings + +logger = logging.getLogger(__name__) + +SESSION_KEY = "csrf_token" +HEADER_NAME = "X-CSRF-Token" +FORM_FIELD = "csrf_token" + + +def get_or_create_csrf_token(request: Request) -> str: + """Return the session's CSRF token, minting a new one if absent.""" + token = request.session.get(SESSION_KEY) + if not token: + token = secrets.token_urlsafe(32) + request.session[SESSION_KEY] = token + return token + + +async def _extract_submitted_token(request: Request) -> str | None: + """Read the submitted CSRF token from header or form body.""" + header_token = request.headers.get(HEADER_NAME) + if header_token: + return header_token + + content_type = request.headers.get("content-type", "") + if content_type.startswith(("application/x-www-form-urlencoded", "multipart/form-data")): + form = await request.form() + value = form.get(FORM_FIELD) + if isinstance(value, str): + return value + return None + + +async def verify_csrf_token(request: Request) -> None: + """FastAPI dependency that rejects requests missing or with an invalid CSRF token. + + Skipped when ``debug`` and ``dev_skip_auth`` are both set, matching the + auth-bypass behavior in ``get_current_admin``. + """ + settings = get_settings() + if settings.debug and settings.dev_skip_auth: + logger.warning("CSRF bypassed - dev_skip_auth is enabled") + return + + expected = request.session.get(SESSION_KEY) if hasattr(request, "session") else None + if not expected: + raise HTTPException(status_code=403, detail="CSRF token missing from session") + + submitted = await _extract_submitted_token(request) + if not submitted or not secrets.compare_digest(submitted, expected): + raise HTTPException(status_code=403, detail="CSRF token invalid") diff --git a/tests/test_csrf.py b/tests/test_csrf.py new file mode 100644 index 0000000..67f43bc --- /dev/null +++ b/tests/test_csrf.py @@ -0,0 +1,156 @@ +"""Tests for CSRF token generation and verification.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import HTTPException + +from squishmark.services.csrf import ( + FORM_FIELD, + HEADER_NAME, + SESSION_KEY, + get_or_create_csrf_token, + verify_csrf_token, +) + + +def _request( + *, + session: dict | None = None, + header_token: str | None = None, + form_body: dict | None = None, + content_type: str = "application/json", +) -> MagicMock: + """Build a mock Request with a session dict and optional token sources.""" + request = MagicMock() + request.session = session if session is not None else {} + headers = {"content-type": content_type} + if header_token is not None: + headers[HEADER_NAME] = header_token + request.headers = headers + request.form = AsyncMock(return_value=form_body or {}) + return request + + +def test_get_or_create_csrf_token_mints_when_absent(): + request = _request() + token = get_or_create_csrf_token(request) + + assert token + assert len(token) > 20 + assert request.session[SESSION_KEY] == token + + +def test_get_or_create_csrf_token_returns_existing(): + request = _request(session={SESSION_KEY: "preexisting-token"}) + + token = get_or_create_csrf_token(request) + + assert token == "preexisting-token" + + +def test_get_or_create_csrf_token_is_idempotent(): + """Calling twice on the same request returns the same token.""" + request = _request() + + first = get_or_create_csrf_token(request) + second = get_or_create_csrf_token(request) + + assert first == second + + +@pytest.mark.asyncio +async def test_verify_csrf_token_accepts_matching_header(): + request = _request( + session={SESSION_KEY: "good-token"}, + header_token="good-token", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + await verify_csrf_token(request) # should not raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_rejects_missing_session_token(): + request = _request(session={}, header_token="anything") + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request) + + assert exc.value.status_code == 403 + assert "missing" in str(exc.value.detail).lower() + + +@pytest.mark.asyncio +async def test_verify_csrf_token_rejects_missing_submitted_token(): + request = _request(session={SESSION_KEY: "good-token"}) # no header, no form + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request) + + assert exc.value.status_code == 403 + assert "invalid" in str(exc.value.detail).lower() + + +@pytest.mark.asyncio +async def test_verify_csrf_token_rejects_wrong_header(): + request = _request( + session={SESSION_KEY: "good-token"}, + header_token="wrong-token", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request) + + assert exc.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_verify_csrf_token_accepts_form_field_fallback(): + """For form submissions without the header, the csrf_token form field is honored.""" + request = _request( + session={SESSION_KEY: "good-token"}, + form_body={FORM_FIELD: "good-token"}, + content_type="application/x-www-form-urlencoded", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + await verify_csrf_token(request) # should not raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_header_takes_precedence_over_form(): + """A valid header passes even if the form field is wrong.""" + request = _request( + session={SESSION_KEY: "good-token"}, + header_token="good-token", + form_body={FORM_FIELD: "wrong"}, + content_type="application/x-www-form-urlencoded", + ) + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False) + await verify_csrf_token(request) # header wins, no raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_bypassed_in_dev_skip_auth(): + """When debug and dev_skip_auth are both set, CSRF check is skipped.""" + request = _request() # no session, no token — would normally fail + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=True, dev_skip_auth=True) + await verify_csrf_token(request) # should not raise + + +@pytest.mark.asyncio +async def test_verify_csrf_token_not_bypassed_in_prod_mode(): + """dev_skip_auth without debug doesn't bypass.""" + request = _request() + with patch("squishmark.services.csrf.get_settings") as mock_settings: + mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=True) + with pytest.raises(HTTPException) as exc: + await verify_csrf_token(request) + + assert exc.value.status_code == 403 diff --git a/themes/blue-tech/admin/admin.html b/themes/blue-tech/admin/admin.html index 2e43089..c854a1b 100644 --- a/themes/blue-tech/admin/admin.html +++ b/themes/blue-tech/admin/admin.html @@ -3,6 +3,7 @@ + Admin - {{ site.title }} @@ -124,6 +125,12 @@

Cache