Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion src/squishmark/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""FastAPI dependency injection utilities."""

import logging
from typing import Annotated

from fastapi import Depends, Request
from fastapi import Depends, HTTPException, Request

from squishmark.config import Settings, get_settings

logger = logging.getLogger(__name__)

# Type alias for settings dependency
SettingsDep = Annotated[Settings, Depends(get_settings)]

Expand All @@ -19,3 +22,42 @@ def is_admin(request: Request) -> bool:
if user is None:
return False
return user.get("login") in settings.admin_users_list


def is_htmx(request: Request) -> bool:
"""Return True when the request was made by HTMX."""
return request.headers.get("HX-Request") == "true"


async def get_current_admin(request: Request) -> str:
"""
Get the current admin user from session.

Raises HTTPException 401 if not authenticated.
Raises HTTPException 403 if not an admin.

For HTMX requests, attaches an ``HX-Redirect`` header so the browser
is redirected to the login page without any client JavaScript.
"""
settings = get_settings()

# Dev mode auth bypass (requires both flags)
if settings.debug and settings.dev_skip_auth:
logger.warning("Auth bypassed - returning dev-admin user")
return "dev-admin"

htmx_headers = {"HX-Redirect": "/auth/login"} if is_htmx(request) else None

# Check for user in session (set by OAuth callback)
user = request.session.get("user") if hasattr(request, "session") else None

if user is None:
raise HTTPException(status_code=401, detail="Not authenticated", headers=htmx_headers)

if user["login"] not in settings.admin_users_list:
raise HTTPException(status_code=403, detail="Not authorized", headers=htmx_headers)

return user["login"]


AdminUser = Annotated[str, Depends(get_current_admin)]
71 changes: 32 additions & 39 deletions src/squishmark/routers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from sqlalchemy.ext.asyncio import AsyncSession

from squishmark.config import get_settings
from squishmark.dependencies import AdminUser, is_htmx
from squishmark.models.content import Config
from squishmark.models.db import Note, get_db_session
from squishmark.services.analytics import AnalyticsService
from squishmark.services.cache import get_cache
from squishmark.services.csrf import get_or_create_csrf_token, verify_csrf_token
from squishmark.services.github import get_github_service
from squishmark.services.notes import NotesService
from squishmark.services.theme import get_theme_engine, reset_theme_engine
Expand All @@ -23,11 +25,6 @@
router = APIRouter(prefix="/admin", tags=["admin"])


def is_htmx(request: Request) -> bool:
"""Return True when the request was made by HTMX."""
return request.headers.get("HX-Request") == "true"


# Pydantic models for request/response
class NoteCreate(BaseModel):
"""Request body for creating a note."""
Expand Down Expand Up @@ -65,39 +62,12 @@ class CacheRefreshResponse(BaseModel):
duration_ms: float


# Dependency for getting the current admin user
async def get_current_admin(request: Request) -> str:
"""
Get the current admin user from session.

Raises HTTPException 401 if not authenticated.
Raises HTTPException 403 if not an admin.

For HTMX requests, attaches an ``HX-Redirect`` header so the browser
is redirected to the login page without any client JavaScript.
"""
settings = get_settings()

# Dev mode auth bypass (requires both flags)
if settings.debug and settings.dev_skip_auth:
logger.warning("Auth bypassed - returning dev-admin user")
return "dev-admin"

htmx_headers = {"HX-Redirect": "/auth/login"} if is_htmx(request) else None

# Check for user in session (set by OAuth callback)
user = request.session.get("user") if hasattr(request, "session") else None

if user is None:
raise HTTPException(status_code=401, detail="Not authenticated", headers=htmx_headers)
class CSRFTokenResponse(BaseModel):
"""Response body for the CSRF token endpoint."""

if user["login"] not in settings.admin_users_list:
raise HTTPException(status_code=403, detail="Not authorized", headers=htmx_headers)
csrf_token: str

return user["login"]


AdminUser = Annotated[str, Depends(get_current_admin)]
DbSession = Annotated[AsyncSession, Depends(get_db_session)]


Expand Down Expand Up @@ -214,6 +184,7 @@ async def admin_dashboard(
cache = get_cache()

# Render admin template
csrf_token = get_or_create_csrf_token(request)
theme_engine = await get_theme_engine(github_service)
try:
html = await theme_engine.render_admin(
Expand All @@ -222,6 +193,7 @@ async def admin_dashboard(
analytics=analytics,
notes=[_to_note_response(n) for n in notes],
cache_size=cache.size,
csrf_token=csrf_token,
)
except Exception:
# Fallback if admin template doesn't exist
Expand All @@ -248,6 +220,14 @@ async def admin_dashboard(
return HTMLResponse(content=html)


# CSRF token endpoint (for JSON API callers that can't scrape the meta tag)
@router.get("/csrf")
async def get_csrf(request: Request, admin: AdminUser) -> CSRFTokenResponse:
"""Return the current CSRF token for use in subsequent mutation requests."""
del admin # auth side-effect only
return CSRFTokenResponse(csrf_token=get_or_create_csrf_token(request))


# Analytics endpoints
@router.get("/analytics")
async def get_analytics(
Expand All @@ -273,7 +253,12 @@ async def list_notes(
return [_to_note_response(n) for n in notes]


@router.post("/notes", status_code=201, response_model=None)
@router.post(
"/notes",
status_code=201,
response_model=None,
dependencies=[Depends(verify_csrf_token)],
)
Comment thread
x3ek marked this conversation as resolved.
async def create_note(
request: Request,
admin: AdminUser,
Expand All @@ -295,7 +280,11 @@ async def create_note(
return response


@router.put("/notes/{note_id}", response_model=None)
@router.put(
"/notes/{note_id}",
response_model=None,
dependencies=[Depends(verify_csrf_token)],
)
Comment thread
x3ek marked this conversation as resolved.
async def update_note(
request: Request,
admin: AdminUser,
Expand All @@ -320,7 +309,11 @@ async def update_note(
return response


@router.delete("/notes/{note_id}", response_model=None)
@router.delete(
"/notes/{note_id}",
response_model=None,
dependencies=[Depends(verify_csrf_token)],
)
Comment thread
x3ek marked this conversation as resolved.
async def delete_note(
request: Request,
admin: AdminUser,
Expand Down Expand Up @@ -371,7 +364,7 @@ async def view_note(


# Cache management
@router.post("/cache/refresh")
@router.post("/cache/refresh", dependencies=[Depends(verify_csrf_token)])
Comment thread
x3ek marked this conversation as resolved.
async def refresh_cache(
admin: AdminUser,
) -> CacheRefreshResponse:
Expand Down
7 changes: 7 additions & 0 deletions src/squishmark/routers/auth.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""Authentication routes for GitHub OAuth."""

import logging
from urllib.parse import urlencode

import httpx
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import RedirectResponse

from squishmark.config import get_settings
from squishmark.services.csrf import SESSION_KEY as CSRF_SESSION_KEY

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["auth"])

Expand Down Expand Up @@ -116,6 +120,9 @@ async def oauth_callback(
"name": user_data.get("name"),
"avatar_url": user_data.get("avatar_url"),
}
# Rotate CSRF token on login so a stale pre-auth token can't be replayed.
request.session.pop(CSRF_SESSION_KEY, None)
logger.info("CSRF token rotated on OAuth login (user=%s)", user_data["login"])

# Redirect to admin
return RedirectResponse(url="/admin", status_code=302)
Expand Down
97 changes: 97 additions & 0 deletions src/squishmark/services/csrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""CSRF token generation and verification for admin mutation endpoints.

Tokens are stored in the signed session cookie under ``SESSION_KEY`` and
validated on POST/PUT/DELETE requests via the ``verify_csrf_token`` dependency.
Clients send the token in the ``X-CSRF-Token`` header (HTMX, JSON API) or a
``csrf_token`` form field (plain form fallback). JSON callers that cannot
read the meta tag can fetch the current token from ``GET /admin/csrf``.
"""

import logging
import secrets
from typing import Annotated

from fastapi import Depends, HTTPException, Request

from squishmark.config import get_settings
from squishmark.dependencies import get_current_admin, is_htmx

logger = logging.getLogger(__name__)

SESSION_KEY = "csrf_token"
HEADER_NAME = "X-CSRF-Token"
FORM_FIELD = "csrf_token"


def get_or_create_csrf_token(request: Request) -> str:
"""Return the session's CSRF token, minting a new one if absent."""
token = request.session.get(SESSION_KEY)
if not token:
token = secrets.token_urlsafe(32)
request.session[SESSION_KEY] = token
return token


async def _extract_submitted_token(request: Request) -> str | None:
"""Read the submitted CSRF token from header or form body.

For form requests we call ``request.form()``; Starlette caches the parsed
form on the request instance, so the route handler's subsequent
``request.form()`` call is served from cache and does not re-read the body.
If Starlette ever stops caching, both paths still parse independently — but
we'd duplicate work, not lose data.
"""
header_token = request.headers.get(HEADER_NAME)
if header_token:
return header_token

content_type = request.headers.get("content-type", "")
if content_type.startswith(("application/x-www-form-urlencoded", "multipart/form-data")):
form = await request.form()
value = form.get(FORM_FIELD)
if isinstance(value, str):
return value
return None


async def verify_csrf_token(
request: Request,
admin: Annotated[str, Depends(get_current_admin)],
) -> None:
"""FastAPI dependency that rejects requests missing or with an invalid CSRF token.

Depends on :func:`get_current_admin` so authentication resolves first. This
matters for unauthenticated HTMX requests: they get the 401 + ``HX-Redirect``
response from auth instead of a misleading 403 for a CSRF token they had no
way to obtain.

Skipped when ``debug`` and ``dev_skip_auth`` are both set, matching the
auth-bypass behavior in ``get_current_admin``.
"""
settings = get_settings()
if settings.debug and settings.dev_skip_auth:
logger.warning("CSRF bypassed - dev_skip_auth is enabled")
return

# Single user-facing error regardless of which check fails — distinguishing
# "no session token" from "wrong submitted token" would tell an attacker
# whether they got a session to bind to. Server-side log records the
# specific reason so operators can diagnose without that side-channel.
expected = request.session.get(SESSION_KEY) if hasattr(request, "session") else None
submitted = await _extract_submitted_token(request)
if not expected or not submitted or not secrets.compare_digest(submitted, expected):
if not expected:
reason = "no-session-token"
elif not submitted:
reason = "no-submitted-token"
else:
reason = "token-mismatch"
logger.warning(
"CSRF rejected: reason=%s method=%s path=%s admin=%s htmx=%s",
reason,
request.method,
request.url.path,
admin,
is_htmx(request),
)
raise HTTPException(status_code=403, detail="CSRF validation failed")
Loading
Loading