From 63a999768dd85f4b7493aaf84c1ca0bd165468a6 Mon Sep 17 00:00:00 2001
From: Xeek <6032840+x3ek@users.noreply.github.com>
Date: Sat, 23 May 2026 06:52:57 -0500
Subject: [PATCH 1/6] feat(admin): add CSRF protection to mutation endpoints
Closes #79.
Admin POST/PUT/DELETE routes now require a server-validated CSRF token in addition to the session cookie. Token is per-session, minted lazily when the dashboard renders, rotated on OAuth login, and validated via a FastAPI dependency on each mutation route. Clients send it in the X-CSRF-Token header (HTMX listener reads the token from a tag) with a csrf_token form-field fallback. Dev-skip-auth mode bypasses CSRF to match auth bypass.
Co-Authored-By: Claude Opus 4.7 (1M context)
---
src/squishmark/routers/admin.py | 24 ++++-
src/squishmark/routers/auth.py | 2 +
src/squishmark/services/csrf.py | 64 ++++++++++++
tests/test_csrf.py | 156 +++++++++++++++++++++++++++++
themes/blue-tech/admin/admin.html | 7 ++
themes/default/admin/admin.html | 7 ++
themes/terminal/admin/admin.html | 1 +
themes/terminal/static/js/admin.js | 6 ++
8 files changed, 263 insertions(+), 4 deletions(-)
create mode 100644 src/squishmark/services/csrf.py
create mode 100644 tests/test_csrf.py
diff --git a/src/squishmark/routers/admin.py b/src/squishmark/routers/admin.py
index 7e817b6..99f18a8 100644
--- a/src/squishmark/routers/admin.py
+++ b/src/squishmark/routers/admin.py
@@ -14,6 +14,7 @@
from squishmark.models.db import Note, get_db_session
from squishmark.services.analytics import AnalyticsService
from squishmark.services.cache import get_cache
+from squishmark.services.csrf import get_or_create_csrf_token, verify_csrf_token
from squishmark.services.github import get_github_service
from squishmark.services.notes import NotesService
from squishmark.services.theme import get_theme_engine, reset_theme_engine
@@ -214,6 +215,7 @@ async def admin_dashboard(
cache = get_cache()
# Render admin template
+ csrf_token = get_or_create_csrf_token(request)
theme_engine = await get_theme_engine(github_service)
try:
html = await theme_engine.render_admin(
@@ -222,6 +224,7 @@ async def admin_dashboard(
analytics=analytics,
notes=[_to_note_response(n) for n in notes],
cache_size=cache.size,
+ csrf_token=csrf_token,
)
except Exception:
# Fallback if admin template doesn't exist
@@ -273,7 +276,12 @@ async def list_notes(
return [_to_note_response(n) for n in notes]
-@router.post("/notes", status_code=201, response_model=None)
+@router.post(
+ "/notes",
+ status_code=201,
+ response_model=None,
+ dependencies=[Depends(verify_csrf_token)],
+)
async def create_note(
request: Request,
admin: AdminUser,
@@ -295,7 +303,11 @@ async def create_note(
return response
-@router.put("/notes/{note_id}", response_model=None)
+@router.put(
+ "/notes/{note_id}",
+ response_model=None,
+ dependencies=[Depends(verify_csrf_token)],
+)
async def update_note(
request: Request,
admin: AdminUser,
@@ -320,7 +332,11 @@ async def update_note(
return response
-@router.delete("/notes/{note_id}", response_model=None)
+@router.delete(
+ "/notes/{note_id}",
+ response_model=None,
+ dependencies=[Depends(verify_csrf_token)],
+)
async def delete_note(
request: Request,
admin: AdminUser,
@@ -371,7 +387,7 @@ async def view_note(
# Cache management
-@router.post("/cache/refresh")
+@router.post("/cache/refresh", dependencies=[Depends(verify_csrf_token)])
async def refresh_cache(
admin: AdminUser,
) -> CacheRefreshResponse:
diff --git a/src/squishmark/routers/auth.py b/src/squishmark/routers/auth.py
index 4503c31..8c6aa7d 100644
--- a/src/squishmark/routers/auth.py
+++ b/src/squishmark/routers/auth.py
@@ -116,6 +116,8 @@ async def oauth_callback(
"name": user_data.get("name"),
"avatar_url": user_data.get("avatar_url"),
}
+ # Rotate CSRF token on login so a stale pre-auth token can't be replayed.
+ request.session.pop("csrf_token", None)
# Redirect to admin
return RedirectResponse(url="/admin", status_code=302)
diff --git a/src/squishmark/services/csrf.py b/src/squishmark/services/csrf.py
new file mode 100644
index 0000000..b489468
--- /dev/null
+++ b/src/squishmark/services/csrf.py
@@ -0,0 +1,64 @@
+"""CSRF token generation and verification for admin mutation endpoints.
+
+Tokens are stored in the signed session cookie under ``SESSION_KEY`` and
+validated on POST/PUT/DELETE requests via the ``verify_csrf_token`` dependency.
+Clients send the token in the ``X-CSRF-Token`` header (HTMX, JSON API) or a
+``csrf_token`` form field (plain form fallback).
+"""
+
+import logging
+import secrets
+
+from fastapi import HTTPException, Request
+
+from squishmark.config import get_settings
+
+logger = logging.getLogger(__name__)
+
+SESSION_KEY = "csrf_token"
+HEADER_NAME = "X-CSRF-Token"
+FORM_FIELD = "csrf_token"
+
+
+def get_or_create_csrf_token(request: Request) -> str:
+ """Return the session's CSRF token, minting a new one if absent."""
+ token = request.session.get(SESSION_KEY)
+ if not token:
+ token = secrets.token_urlsafe(32)
+ request.session[SESSION_KEY] = token
+ return token
+
+
+async def _extract_submitted_token(request: Request) -> str | None:
+ """Read the submitted CSRF token from header or form body."""
+ header_token = request.headers.get(HEADER_NAME)
+ if header_token:
+ return header_token
+
+ content_type = request.headers.get("content-type", "")
+ if content_type.startswith(("application/x-www-form-urlencoded", "multipart/form-data")):
+ form = await request.form()
+ value = form.get(FORM_FIELD)
+ if isinstance(value, str):
+ return value
+ return None
+
+
+async def verify_csrf_token(request: Request) -> None:
+ """FastAPI dependency that rejects requests missing or with an invalid CSRF token.
+
+ Skipped when ``debug`` and ``dev_skip_auth`` are both set, matching the
+ auth-bypass behavior in ``get_current_admin``.
+ """
+ settings = get_settings()
+ if settings.debug and settings.dev_skip_auth:
+ logger.warning("CSRF bypassed - dev_skip_auth is enabled")
+ return
+
+ expected = request.session.get(SESSION_KEY) if hasattr(request, "session") else None
+ if not expected:
+ raise HTTPException(status_code=403, detail="CSRF token missing from session")
+
+ submitted = await _extract_submitted_token(request)
+ if not submitted or not secrets.compare_digest(submitted, expected):
+ raise HTTPException(status_code=403, detail="CSRF token invalid")
diff --git a/tests/test_csrf.py b/tests/test_csrf.py
new file mode 100644
index 0000000..67f43bc
--- /dev/null
+++ b/tests/test_csrf.py
@@ -0,0 +1,156 @@
+"""Tests for CSRF token generation and verification."""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from fastapi import HTTPException
+
+from squishmark.services.csrf import (
+ FORM_FIELD,
+ HEADER_NAME,
+ SESSION_KEY,
+ get_or_create_csrf_token,
+ verify_csrf_token,
+)
+
+
+def _request(
+ *,
+ session: dict | None = None,
+ header_token: str | None = None,
+ form_body: dict | None = None,
+ content_type: str = "application/json",
+) -> MagicMock:
+ """Build a mock Request with a session dict and optional token sources."""
+ request = MagicMock()
+ request.session = session if session is not None else {}
+ headers = {"content-type": content_type}
+ if header_token is not None:
+ headers[HEADER_NAME] = header_token
+ request.headers = headers
+ request.form = AsyncMock(return_value=form_body or {})
+ return request
+
+
+def test_get_or_create_csrf_token_mints_when_absent():
+ request = _request()
+ token = get_or_create_csrf_token(request)
+
+ assert token
+ assert len(token) > 20
+ assert request.session[SESSION_KEY] == token
+
+
+def test_get_or_create_csrf_token_returns_existing():
+ request = _request(session={SESSION_KEY: "preexisting-token"})
+
+ token = get_or_create_csrf_token(request)
+
+ assert token == "preexisting-token"
+
+
+def test_get_or_create_csrf_token_is_idempotent():
+ """Calling twice on the same request returns the same token."""
+ request = _request()
+
+ first = get_or_create_csrf_token(request)
+ second = get_or_create_csrf_token(request)
+
+ assert first == second
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_accepts_matching_header():
+ request = _request(
+ session={SESSION_KEY: "good-token"},
+ header_token="good-token",
+ )
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False)
+ await verify_csrf_token(request) # should not raise
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_rejects_missing_session_token():
+ request = _request(session={}, header_token="anything")
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False)
+ with pytest.raises(HTTPException) as exc:
+ await verify_csrf_token(request)
+
+ assert exc.value.status_code == 403
+ assert "missing" in str(exc.value.detail).lower()
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_rejects_missing_submitted_token():
+ request = _request(session={SESSION_KEY: "good-token"}) # no header, no form
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False)
+ with pytest.raises(HTTPException) as exc:
+ await verify_csrf_token(request)
+
+ assert exc.value.status_code == 403
+ assert "invalid" in str(exc.value.detail).lower()
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_rejects_wrong_header():
+ request = _request(
+ session={SESSION_KEY: "good-token"},
+ header_token="wrong-token",
+ )
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False)
+ with pytest.raises(HTTPException) as exc:
+ await verify_csrf_token(request)
+
+ assert exc.value.status_code == 403
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_accepts_form_field_fallback():
+ """For form submissions without the header, the csrf_token form field is honored."""
+ request = _request(
+ session={SESSION_KEY: "good-token"},
+ form_body={FORM_FIELD: "good-token"},
+ content_type="application/x-www-form-urlencoded",
+ )
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False)
+ await verify_csrf_token(request) # should not raise
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_header_takes_precedence_over_form():
+ """A valid header passes even if the form field is wrong."""
+ request = _request(
+ session={SESSION_KEY: "good-token"},
+ header_token="good-token",
+ form_body={FORM_FIELD: "wrong"},
+ content_type="application/x-www-form-urlencoded",
+ )
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=False)
+ await verify_csrf_token(request) # header wins, no raise
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_bypassed_in_dev_skip_auth():
+ """When debug and dev_skip_auth are both set, CSRF check is skipped."""
+ request = _request() # no session, no token — would normally fail
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=True, dev_skip_auth=True)
+ await verify_csrf_token(request) # should not raise
+
+
+@pytest.mark.asyncio
+async def test_verify_csrf_token_not_bypassed_in_prod_mode():
+ """dev_skip_auth without debug doesn't bypass."""
+ request = _request()
+ with patch("squishmark.services.csrf.get_settings") as mock_settings:
+ mock_settings.return_value = MagicMock(debug=False, dev_skip_auth=True)
+ with pytest.raises(HTTPException) as exc:
+ await verify_csrf_token(request)
+
+ assert exc.value.status_code == 403
diff --git a/themes/blue-tech/admin/admin.html b/themes/blue-tech/admin/admin.html
index 2e43089..c854a1b 100644
--- a/themes/blue-tech/admin/admin.html
+++ b/themes/blue-tech/admin/admin.html
@@ -3,6 +3,7 @@
+
Admin - {{ site.title }}
@@ -124,6 +125,12 @@ Cache