Skip to content

Commit f14b16b

Browse files
x3ekclaude
andauthored
feat(admin): add CSRF protection to mutation endpoints (#79)
* feat(admin): add CSRF protection to mutation endpoints Closes #79. Admin POST/PUT/DELETE routes now require a server-validated CSRF token in addition to the session cookie. Token is per-session, minted lazily when the dashboard renders, rotated on OAuth login, and validated via a FastAPI dependency on each mutation route. Clients send it in the X-CSRF-Token header (HTMX listener reads the token from a <meta> tag) with a csrf_token form-field fallback. Dev-skip-auth mode bypasses CSRF to match auth bypass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(admin): add GET /admin/csrf for JSON callers, use CSRF_SESSION_KEY constant Address Copilot review on PR #83: 1. auth.py now uses SESSION_KEY constant from services.csrf instead of hardcoding 'csrf_token' — avoids key drift if the storage key ever changes. 2. New GET /admin/csrf endpoint returns the current token in JSON, matching the issue's acceptance criterion for non-HTML clients. The module docstring already promised JSON API support; this makes good on it. Tests added for the new endpoint (idempotent within session) and the auth rotation behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(auth): make CSRF dep resolve after auth so 401 fires before 403 Address Copilot review pass 2 on PR #83: Route-level dependencies=[Depends(verify_csrf_token)] resolves before parameter deps (including AdminUser), so unauthenticated HTMX requests would get 403 (CSRF token missing) instead of the intended 401 + HX-Redirect. Fix by: 1. Move get_current_admin from routers/admin.py to dependencies.py so services/csrf.py can import it without a circular dep. Re-exported from admin.py so existing test patches keep working. 2. verify_csrf_token now depends on get_current_admin via Annotated[str, Depends(...)]. FastAPI resolves auth first; if it raises 401, CSRF never runs. Also rewrote test_oauth_callback_rotates_csrf_token to actually exercise oauth_callback with mocked httpx calls (per Copilot feedback — the previous version only tested dict.pop). Added a structural test that locks in the new CSRF→auth dep ordering. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(deps): consolidate is_htmx + fix stale test patch targets Address Copilot review pass 3 on PR #83: 1. Centralize is_htmx in dependencies.py and import it from routers/admin.py. Previously the prior commit duplicated the helper (as _is_htmx in dependencies.py alongside the existing is_htmx in admin.py). 2. Update test patches that target squishmark.routers.admin.get_settings to squishmark.dependencies.get_settings. After moving get_current_admin to dependencies.py the old patches were silently no-ops — tests passed only because the real settings happened to produce the same outcome for an empty session. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(csrf): self-review pass — integration tests, generic error, polish Independent code review pass on PR #83. Six changes: 1. tests/test_csrf_integration.py (new): five TestClient-based tests that actually exercise the route-level CSRF dep and the auth-before-CSRF ordering — previously all four 'dependencies=[Depends(verify_csrf_token)]' lines were uncovered (existing tests call handler functions directly and bypass FastAPI's DI). Includes the crucial 'unauth HTMX → 401+HX-Redirect, not 403' assertion. 2. services/csrf.py: collapse the two distinct error messages ('CSRF token missing from session' vs 'CSRF token invalid') into one generic 'CSRF validation failed'. The prior split was an information disclosure — it told an attacker whether they had a session cookie to bind to. 3. services/csrf.py: document the assumption that Starlette caches request.form() so the dep + route handler can both read it without losing the body. 4. routers/admin.py: drop the __all__ re-export of get_current_admin / AdminUser. Test imports updated to point at squishmark.dependencies directly — there were no production callers. 5. routers/admin.py: GET /admin/csrf now returns a typed CSRFTokenResponse Pydantic model instead of a raw dict, matching the convention of NoteResponse and CacheRefreshResponse. 6. tests/test_admin_notes.py: oauth_callback rotation test now also asserts a fresh token can be minted after rotation, not just that the stale one is gone. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(csrf): log rejection reason + rotation event for operability Two structured logs added so production issues are diagnosable: 1. verify_csrf_token now emits WARNING on every rejection with reason (no-session-token / no-submitted-token / token-mismatch), method, path, admin, and htmx flag. The user-facing detail stays generic ('CSRF validation failed') to avoid info disclosure — the log is the only place the specific failure mode lives. Caplog test locks the format in. 2. oauth_callback emits INFO on CSRF token rotation, tying the rotation to the user that just logged in. Useful for audit trails and login-flow debugging. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 821ee65 commit f14b16b

11 files changed

Lines changed: 628 additions & 44 deletions

File tree

src/squishmark/dependencies.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
"""FastAPI dependency injection utilities."""
22

3+
import logging
34
from typing import Annotated
45

5-
from fastapi import Depends, Request
6+
from fastapi import Depends, HTTPException, Request
67

78
from squishmark.config import Settings, get_settings
89

10+
logger = logging.getLogger(__name__)
11+
912
# Type alias for settings dependency
1013
SettingsDep = Annotated[Settings, Depends(get_settings)]
1114

@@ -19,3 +22,42 @@ def is_admin(request: Request) -> bool:
1922
if user is None:
2023
return False
2124
return user.get("login") in settings.admin_users_list
25+
26+
27+
def is_htmx(request: Request) -> bool:
28+
"""Return True when the request was made by HTMX."""
29+
return request.headers.get("HX-Request") == "true"
30+
31+
32+
async def get_current_admin(request: Request) -> str:
33+
"""
34+
Get the current admin user from session.
35+
36+
Raises HTTPException 401 if not authenticated.
37+
Raises HTTPException 403 if not an admin.
38+
39+
For HTMX requests, attaches an ``HX-Redirect`` header so the browser
40+
is redirected to the login page without any client JavaScript.
41+
"""
42+
settings = get_settings()
43+
44+
# Dev mode auth bypass (requires both flags)
45+
if settings.debug and settings.dev_skip_auth:
46+
logger.warning("Auth bypassed - returning dev-admin user")
47+
return "dev-admin"
48+
49+
htmx_headers = {"HX-Redirect": "/auth/login"} if is_htmx(request) else None
50+
51+
# Check for user in session (set by OAuth callback)
52+
user = request.session.get("user") if hasattr(request, "session") else None
53+
54+
if user is None:
55+
raise HTTPException(status_code=401, detail="Not authenticated", headers=htmx_headers)
56+
57+
if user["login"] not in settings.admin_users_list:
58+
raise HTTPException(status_code=403, detail="Not authorized", headers=htmx_headers)
59+
60+
return user["login"]
61+
62+
63+
AdminUser = Annotated[str, Depends(get_current_admin)]

src/squishmark/routers/admin.py

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
from sqlalchemy.ext.asyncio import AsyncSession
1111

1212
from squishmark.config import get_settings
13+
from squishmark.dependencies import AdminUser, is_htmx
1314
from squishmark.models.content import Config
1415
from squishmark.models.db import Note, get_db_session
1516
from squishmark.services.analytics import AnalyticsService
1617
from squishmark.services.cache import get_cache
18+
from squishmark.services.csrf import get_or_create_csrf_token, verify_csrf_token
1719
from squishmark.services.github import get_github_service
1820
from squishmark.services.notes import NotesService
1921
from squishmark.services.theme import get_theme_engine, reset_theme_engine
@@ -23,11 +25,6 @@
2325
router = APIRouter(prefix="/admin", tags=["admin"])
2426

2527

26-
def is_htmx(request: Request) -> bool:
27-
"""Return True when the request was made by HTMX."""
28-
return request.headers.get("HX-Request") == "true"
29-
30-
3128
# Pydantic models for request/response
3229
class NoteCreate(BaseModel):
3330
"""Request body for creating a note."""
@@ -65,39 +62,12 @@ class CacheRefreshResponse(BaseModel):
6562
duration_ms: float
6663

6764

68-
# Dependency for getting the current admin user
69-
async def get_current_admin(request: Request) -> str:
70-
"""
71-
Get the current admin user from session.
72-
73-
Raises HTTPException 401 if not authenticated.
74-
Raises HTTPException 403 if not an admin.
75-
76-
For HTMX requests, attaches an ``HX-Redirect`` header so the browser
77-
is redirected to the login page without any client JavaScript.
78-
"""
79-
settings = get_settings()
80-
81-
# Dev mode auth bypass (requires both flags)
82-
if settings.debug and settings.dev_skip_auth:
83-
logger.warning("Auth bypassed - returning dev-admin user")
84-
return "dev-admin"
85-
86-
htmx_headers = {"HX-Redirect": "/auth/login"} if is_htmx(request) else None
87-
88-
# Check for user in session (set by OAuth callback)
89-
user = request.session.get("user") if hasattr(request, "session") else None
90-
91-
if user is None:
92-
raise HTTPException(status_code=401, detail="Not authenticated", headers=htmx_headers)
65+
class CSRFTokenResponse(BaseModel):
66+
"""Response body for the CSRF token endpoint."""
9367

94-
if user["login"] not in settings.admin_users_list:
95-
raise HTTPException(status_code=403, detail="Not authorized", headers=htmx_headers)
68+
csrf_token: str
9669

97-
return user["login"]
9870

99-
100-
AdminUser = Annotated[str, Depends(get_current_admin)]
10171
DbSession = Annotated[AsyncSession, Depends(get_db_session)]
10272

10373

@@ -214,6 +184,7 @@ async def admin_dashboard(
214184
cache = get_cache()
215185

216186
# Render admin template
187+
csrf_token = get_or_create_csrf_token(request)
217188
theme_engine = await get_theme_engine(github_service)
218189
try:
219190
html = await theme_engine.render_admin(
@@ -222,6 +193,7 @@ async def admin_dashboard(
222193
analytics=analytics,
223194
notes=[_to_note_response(n) for n in notes],
224195
cache_size=cache.size,
196+
csrf_token=csrf_token,
225197
)
226198
except Exception:
227199
# Fallback if admin template doesn't exist
@@ -248,6 +220,14 @@ async def admin_dashboard(
248220
return HTMLResponse(content=html)
249221

250222

223+
# CSRF token endpoint (for JSON API callers that can't scrape the meta tag)
224+
@router.get("/csrf")
225+
async def get_csrf(request: Request, admin: AdminUser) -> CSRFTokenResponse:
226+
"""Return the current CSRF token for use in subsequent mutation requests."""
227+
del admin # auth side-effect only
228+
return CSRFTokenResponse(csrf_token=get_or_create_csrf_token(request))
229+
230+
251231
# Analytics endpoints
252232
@router.get("/analytics")
253233
async def get_analytics(
@@ -273,7 +253,12 @@ async def list_notes(
273253
return [_to_note_response(n) for n in notes]
274254

275255

276-
@router.post("/notes", status_code=201, response_model=None)
256+
@router.post(
257+
"/notes",
258+
status_code=201,
259+
response_model=None,
260+
dependencies=[Depends(verify_csrf_token)],
261+
)
277262
async def create_note(
278263
request: Request,
279264
admin: AdminUser,
@@ -295,7 +280,11 @@ async def create_note(
295280
return response
296281

297282

298-
@router.put("/notes/{note_id}", response_model=None)
283+
@router.put(
284+
"/notes/{note_id}",
285+
response_model=None,
286+
dependencies=[Depends(verify_csrf_token)],
287+
)
299288
async def update_note(
300289
request: Request,
301290
admin: AdminUser,
@@ -320,7 +309,11 @@ async def update_note(
320309
return response
321310

322311

323-
@router.delete("/notes/{note_id}", response_model=None)
312+
@router.delete(
313+
"/notes/{note_id}",
314+
response_model=None,
315+
dependencies=[Depends(verify_csrf_token)],
316+
)
324317
async def delete_note(
325318
request: Request,
326319
admin: AdminUser,
@@ -371,7 +364,7 @@ async def view_note(
371364

372365

373366
# Cache management
374-
@router.post("/cache/refresh")
367+
@router.post("/cache/refresh", dependencies=[Depends(verify_csrf_token)])
375368
async def refresh_cache(
376369
admin: AdminUser,
377370
) -> CacheRefreshResponse:

src/squishmark/routers/auth.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
"""Authentication routes for GitHub OAuth."""
22

3+
import logging
34
from urllib.parse import urlencode
45

56
import httpx
67
from fastapi import APIRouter, HTTPException, Request
78
from fastapi.responses import RedirectResponse
89

910
from squishmark.config import get_settings
11+
from squishmark.services.csrf import SESSION_KEY as CSRF_SESSION_KEY
12+
13+
logger = logging.getLogger(__name__)
1014

1115
router = APIRouter(prefix="/auth", tags=["auth"])
1216

@@ -116,6 +120,9 @@ async def oauth_callback(
116120
"name": user_data.get("name"),
117121
"avatar_url": user_data.get("avatar_url"),
118122
}
123+
# Rotate CSRF token on login so a stale pre-auth token can't be replayed.
124+
request.session.pop(CSRF_SESSION_KEY, None)
125+
logger.info("CSRF token rotated on OAuth login (user=%s)", user_data["login"])
119126

120127
# Redirect to admin
121128
return RedirectResponse(url="/admin", status_code=302)

src/squishmark/services/csrf.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""CSRF token generation and verification for admin mutation endpoints.
2+
3+
Tokens are stored in the signed session cookie under ``SESSION_KEY`` and
4+
validated on POST/PUT/DELETE requests via the ``verify_csrf_token`` dependency.
5+
Clients send the token in the ``X-CSRF-Token`` header (HTMX, JSON API) or a
6+
``csrf_token`` form field (plain form fallback). JSON callers that cannot
7+
read the meta tag can fetch the current token from ``GET /admin/csrf``.
8+
"""
9+
10+
import logging
11+
import secrets
12+
from typing import Annotated
13+
14+
from fastapi import Depends, HTTPException, Request
15+
16+
from squishmark.config import get_settings
17+
from squishmark.dependencies import get_current_admin, is_htmx
18+
19+
logger = logging.getLogger(__name__)
20+
21+
SESSION_KEY = "csrf_token"
22+
HEADER_NAME = "X-CSRF-Token"
23+
FORM_FIELD = "csrf_token"
24+
25+
26+
def get_or_create_csrf_token(request: Request) -> str:
27+
"""Return the session's CSRF token, minting a new one if absent."""
28+
token = request.session.get(SESSION_KEY)
29+
if not token:
30+
token = secrets.token_urlsafe(32)
31+
request.session[SESSION_KEY] = token
32+
return token
33+
34+
35+
async def _extract_submitted_token(request: Request) -> str | None:
36+
"""Read the submitted CSRF token from header or form body.
37+
38+
For form requests we call ``request.form()``; Starlette caches the parsed
39+
form on the request instance, so the route handler's subsequent
40+
``request.form()`` call is served from cache and does not re-read the body.
41+
If Starlette ever stops caching, both paths still parse independently — but
42+
we'd duplicate work, not lose data.
43+
"""
44+
header_token = request.headers.get(HEADER_NAME)
45+
if header_token:
46+
return header_token
47+
48+
content_type = request.headers.get("content-type", "")
49+
if content_type.startswith(("application/x-www-form-urlencoded", "multipart/form-data")):
50+
form = await request.form()
51+
value = form.get(FORM_FIELD)
52+
if isinstance(value, str):
53+
return value
54+
return None
55+
56+
57+
async def verify_csrf_token(
58+
request: Request,
59+
admin: Annotated[str, Depends(get_current_admin)],
60+
) -> None:
61+
"""FastAPI dependency that rejects requests missing or with an invalid CSRF token.
62+
63+
Depends on :func:`get_current_admin` so authentication resolves first. This
64+
matters for unauthenticated HTMX requests: they get the 401 + ``HX-Redirect``
65+
response from auth instead of a misleading 403 for a CSRF token they had no
66+
way to obtain.
67+
68+
Skipped when ``debug`` and ``dev_skip_auth`` are both set, matching the
69+
auth-bypass behavior in ``get_current_admin``.
70+
"""
71+
settings = get_settings()
72+
if settings.debug and settings.dev_skip_auth:
73+
logger.warning("CSRF bypassed - dev_skip_auth is enabled")
74+
return
75+
76+
# Single user-facing error regardless of which check fails — distinguishing
77+
# "no session token" from "wrong submitted token" would tell an attacker
78+
# whether they got a session to bind to. Server-side log records the
79+
# specific reason so operators can diagnose without that side-channel.
80+
expected = request.session.get(SESSION_KEY) if hasattr(request, "session") else None
81+
submitted = await _extract_submitted_token(request)
82+
if not expected or not submitted or not secrets.compare_digest(submitted, expected):
83+
if not expected:
84+
reason = "no-session-token"
85+
elif not submitted:
86+
reason = "no-submitted-token"
87+
else:
88+
reason = "token-mismatch"
89+
logger.warning(
90+
"CSRF rejected: reason=%s method=%s path=%s admin=%s htmx=%s",
91+
reason,
92+
request.method,
93+
request.url.path,
94+
admin,
95+
is_htmx(request),
96+
)
97+
raise HTTPException(status_code=403, detail="CSRF validation failed")

0 commit comments

Comments
 (0)