-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtest_query_param_token.py
More file actions
143 lines (117 loc) · 5.35 KB
/
Copy pathtest_query_param_token.py
File metadata and controls
143 lines (117 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Tests for ?token=<JWT> query-param fallback in get_current_user (issue #336).
Browser EventSource (SSE) cannot send an Authorization header, so on the
ALLOWLISTED SSE routes only (``_QUERY_TOKEN_PATHS``), get_current_user falls
back to a ``token`` query parameter validated through the same JWT path.
Everywhere else, query-string credentials are rejected (codex review P2):
URLs land in proxy/access logs and browser history, so the fallback must not
become an API-wide authentication mechanism.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt as pyjwt
import pytest
from fastapi import Depends, FastAPI, Request
from fastapi.testclient import TestClient

from codeframe.auth import manager
from codeframe.auth.dependencies import get_current_user, get_current_user_optional
from codeframe.auth.manager import JWT_ALGORITHM, reset_auth_engine
from codeframe.platform_store.database import Database
# Module-level pytest mark: applies the ``v2`` marker to every test defined
# in this file.
pytestmark = pytest.mark.v2

# An allowlisted SSE path (matches _QUERY_TOKEN_PATHS in auth.dependencies).
# It is a concrete URL for the ``/api/v2/tasks/{task_id}/stream`` route that
# the ``app_with_user`` fixture registers.
SSE_PATH = "/api/v2/tasks/abc/stream"

# A path NOT on the allowlist — query tokens must be rejected here.
PLAIN_PATH = "/whoami"
def _make_token(user_id: int = 1, secret: Optional[str] = None) -> str:
    """Build a signed JWT for ``user_id`` that expires one hour from now.

    Args:
        user_id: Stored, stringified, in the ``sub`` claim.
        secret: Signing key. When ``None``, ``manager.SECRET`` is looked up
            at call time.

    Returns:
        The result of ``pyjwt.encode`` using ``JWT_ALGORITHM``.
    """
    # Read manager.SECRET live, not at import. get_current_user verifies against
    # the live global, and any test that starts the app via TestClient refreshes
    # it from .env (lifespan -> refresh_secret). Binding at import would make
    # these tokens unverifiable once that happens — an order-dependent flake.
    signing_key = manager.SECRET if secret is None else secret
    claims = {
        "sub": str(user_id),
        "aud": ["fastapi-users:auth"],
        "exp": datetime.now(timezone.utc) + timedelta(hours=1),
    }
    return pyjwt.encode(claims, signing_key, algorithm=JWT_ALGORITHM)
@pytest.fixture
def app_with_user(tmp_path, monkeypatch):
    """Yield a FastAPI app backed by a throwaway database holding one user.

    The database file is created under ``tmp_path`` and its location is
    exported through the ``DATABASE_PATH`` environment variable. A single
    user row (id 1, ``test@example.com``) is inserted before the app is built.

    Three GET routes are registered:

    * ``PLAIN_PATH`` -- depends on ``get_current_user``.
    * ``/api/v2/tasks/{task_id}/stream`` -- depends on ``get_current_user``;
      ``SSE_PATH`` is a concrete URL for this route.
    * ``/maybe`` -- calls ``get_current_user_optional`` directly and reports
      ``None`` when it returns no user.

    ``reset_auth_engine()`` runs once before the database is prepared and once
    more after the test has finished with the app.
    """
    state_db = tmp_path / "state.db"
    monkeypatch.setenv("DATABASE_PATH", str(state_db))
    reset_auth_engine()

    seed_user_sql = """
        INSERT OR REPLACE INTO users (
            id, email, name, hashed_password,
            is_active, is_superuser, is_verified, email_verified
        )
        VALUES (1, 'test@example.com', 'Test User', '!DISABLED!', 1, 0, 1, 1)
        """
    store = Database(state_db)
    store.initialize()
    store.conn.execute(seed_user_sql)
    store.conn.commit()
    store.close()

    app = FastAPI()

    @app.get(PLAIN_PATH)
    async def whoami(user=Depends(get_current_user)):
        return {"id": user.id}

    # Mounted at the real SSE path so the allowlist matches.
    @app.get("/api/v2/tasks/{task_id}/stream")
    async def fake_stream(task_id: str, user=Depends(get_current_user)):
        return {"id": user.id}

    @app.get("/maybe")
    async def maybe(request: Request):
        current = await get_current_user_optional(request)
        if not current:
            return {"id": None}
        return {"id": current.id}

    yield app

    reset_auth_engine()
class TestQueryParamTokenOnSSERoutes:
    """Authentication outcomes for requests to the allowlisted ``SSE_PATH``."""

    def test_valid_query_token_authenticates_on_sse_path(self, app_with_user):
        """A valid JWT in ``?token=`` gives 200 and the user's id."""
        http = TestClient(app_with_user)
        jwt_token = _make_token(1)
        url = f"{SSE_PATH}?token={jwt_token}"
        response = http.get(url)
        assert response.status_code == 200
        body = response.json()
        assert body["id"] == 1

    def test_missing_token_unauthorized_on_sse_path(self, app_with_user):
        """With neither header nor query token the response is 401."""
        http = TestClient(app_with_user)
        response = http.get(SSE_PATH)
        assert response.status_code == 401

    def test_invalid_query_token_unauthorized_on_sse_path(self, app_with_user):
        """A ``?token=`` value that is not a JWT gives 401."""
        http = TestClient(app_with_user)
        bad_url = f"{SSE_PATH}?token=not-a-jwt"
        response = http.get(bad_url)
        assert response.status_code == 401

    def test_header_still_works_on_sse_path(self, app_with_user):
        """An ``Authorization: Bearer`` header is still accepted here."""
        http = TestClient(app_with_user)
        jwt_token = _make_token(1)
        bearer = {"Authorization": f"Bearer {jwt_token}"}
        response = http.get(SSE_PATH, headers=bearer)
        assert response.status_code == 200
        body = response.json()
        assert body["id"] == 1
class TestQueryParamTokenRejectedElsewhere:
    """Query-string tokens on routes that are not on the SSE allowlist."""

    def test_query_token_rejected_on_plain_route(self, app_with_user):
        """Query credentials are SSE-only (codex review P2): a valid JWT in
        the query string of a non-SSE route must still produce 401."""
        http = TestClient(app_with_user)
        jwt_token = _make_token(1)
        url = f"{PLAIN_PATH}?token={jwt_token}"
        response = http.get(url)
        assert response.status_code == 401

    def test_header_works_on_plain_route(self, app_with_user):
        """The ``Authorization: Bearer`` header authenticates the plain route."""
        http = TestClient(app_with_user)
        jwt_token = _make_token(1)
        bearer = {"Authorization": f"Bearer {jwt_token}"}
        response = http.get(PLAIN_PATH, headers=bearer)
        assert response.status_code == 200
        body = response.json()
        assert body["id"] == 1

    def test_optional_does_not_raise_on_invalid_query_token(self, app_with_user):
        """get_current_user_optional keeps swallowing failures: a junk query
        token gives 200 with a ``None`` id rather than an error."""
        http = TestClient(app_with_user)
        response = http.get("/maybe?token=not-a-jwt")
        assert response.status_code == 200
        body = response.json()
        assert body["id"] is None

    def test_optional_ignores_query_token_on_plain_route(self, app_with_user):
        """On a non-SSE route even a valid query token resolves to anonymous."""
        http = TestClient(app_with_user)
        jwt_token = _make_token(1)
        url = f"/maybe?token={jwt_token}"
        response = http.get(url)
        assert response.status_code == 200
        body = response.json()
        assert body["id"] is None