Skip to content

Commit f84be25

Browse files
committed
fix(auth): update API token last_used for MCP Streamable HTTP requests
The MCP-specific auth handler (_StreamableHttpAuthHandler._auth_jwt) performs its own JWT verification via verify_credentials() but did not call _update_api_token_last_used_sync(), causing API token last_used timestamps to remain stale when accessing virtual servers via /servers/{id}/mcp. Additionally, auth_method was hardcoded to "jwt" regardless of token type, preventing TokenUsageMiddleware from recognising and logging API token requests on MCP transport paths. Changes: - Detect API tokens (auth_provider == "api_token") and legacy tokens (DB JTI lookup) in _auth_jwt() - Call _update_api_token_last_used_sync() for API tokens (rate-limited) - Set auth_method dynamically ("api_token" vs "jwt") - Propagate auth_method, jti, and user_email to ASGI scope state for TokenUsageMiddleware Signed-off-by: kimsehwan96 <sktpghks138@gmail.com>
1 parent 8577b4f commit f84be25

3 files changed

Lines changed: 389 additions & 2 deletions

File tree

mcpgateway/transports/streamablehttp_transport.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3642,12 +3642,51 @@ async def _auth_jwt(self, *, token: str) -> bool:
36423642
else:
36433643
_record_mcp_auth_cache_event("team_membership_cache_hit")
36443644

3645+
# Detect API token auth and update last_used timestamp.
3646+
# API tokens carry user.auth_provider == "api_token" in their JWT payload.
3647+
# Legacy tokens (created before auth_provider was added) are detected via DB lookup.
3648+
_nested_user = user_payload.get("user", {})
3649+
_auth_provider = _nested_user.get("auth_provider") if isinstance(_nested_user, dict) else None
3650+
_is_api_token = _auth_provider == "api_token"
3651+
3652+
if not _is_api_token and jti and _auth_provider is None:
3653+
# Legacy API token fallback: only when auth_provider is absent (pre-auth_provider tokens).
3654+
# Tokens with an explicit auth_provider (email, oauth, saml, etc.) are never legacy API tokens.
3655+
try:
3656+
# First-Party
3657+
from mcpgateway.auth import _is_api_token_jti_sync # pylint: disable=import-outside-toplevel
3658+
3659+
_is_api_token = await asyncio.to_thread(_is_api_token_jti_sync, jti)
3660+
except Exception:
3661+
pass # Best-effort detection; default to "jwt" auth_method
3662+
3663+
resolved_auth_method = "api_token" if _is_api_token else "jwt"
3664+
3665+
# Update last_used timestamp for API tokens (rate-limited internally)
3666+
if _is_api_token and jti:
3667+
try:
3668+
# First-Party
3669+
from mcpgateway.auth import _update_api_token_last_used_sync # pylint: disable=import-outside-toplevel
3670+
3671+
await asyncio.to_thread(_update_api_token_last_used_sync, jti)
3672+
except Exception:
3673+
logger.debug("Failed to update API token last_used in MCP auth for jti=...%s", jti[-8:] if jti else "")
3674+
3675+
# Propagate auth_method and jti into ASGI scope state so that
3676+
# TokenUsageMiddleware can recognise API token requests and log usage.
3677+
state = self.scope.setdefault("state", {})
3678+
state["auth_method"] = resolved_auth_method
3679+
if _is_api_token and jti:
3680+
state["jti"] = jti
3681+
if user_email:
3682+
state["user_email"] = user_email
3683+
36453684
auth_user_ctx: dict[str, Any] = {
36463685
"email": user_email,
36473686
"teams": final_teams,
36483687
"is_authenticated": True,
36493688
"is_admin": is_admin,
3650-
"auth_method": "jwt",
3689+
"auth_method": resolved_auth_method,
36513690
"permission_is_admin": db_user_is_admin or is_admin,
36523691
"token_use": token_use, # propagated for downstream RBAC (check_any_team)
36533692
}
@@ -3667,7 +3706,7 @@ async def _auth_jwt(self, *, token: str) -> bool:
36673706
final_teams,
36683707
user_email=user_email,
36693708
is_admin=bool(db_user_is_admin or is_admin),
3670-
auth_method="jwt",
3709+
auth_method=resolved_auth_method,
36713710
team_name=trace_team_name,
36723711
)
36733712
except HTTPException:

tests/playwright/security/test_mcp_transport_auth_matrix.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,80 @@ def test_websocket_auth_handshake_behavior(self):
170170

171171
assert isinstance(response, str)
172172
assert "Parse error" in response or "jsonrpc" in response
173+
174+
175+
class TestApiTokenLastUsedViaMCP:
176+
"""Verify API token last_used is updated when accessing virtual servers via MCP Streamable HTTP."""
177+
178+
@pytest.fixture(autouse=True)
179+
def _api_token(self, admin_api: APIRequestContext, playwright: Playwright):
180+
"""Create an API token via session JWT and expose its access_token and id."""
181+
# admin_api uses a session JWT, which CAN create tokens
182+
resp = admin_api.post("/tokens", data={"name": f"last-used-test-{uuid.uuid4().hex[:8]}", "expires_in_days": 1})
183+
if resp.status == 404:
184+
pytest.skip("/tokens endpoint unavailable")
185+
assert resp.status in (200, 201), f"Failed to create token: {resp.status} {resp.text()}"
186+
payload = resp.json()
187+
self._access_token = payload["access_token"]
188+
token_obj = payload.get("token", payload)
189+
self._token_id = token_obj.get("id") or token_obj.get("token_id")
190+
yield
191+
# cleanup: revoke the token
192+
with suppress(Exception):
193+
admin_api.delete(f"/tokens/{self._token_id}")
194+
195+
def test_mcp_streamable_http_updates_last_used(self, admin_api: APIRequestContext, playwright: Playwright, public_server_id: str):
196+
"""Accessing /servers/{id}/mcp with an API token should update last_used."""
197+
# 1. Check initial last_used (should be None for new token)
198+
detail = admin_api.get(f"/tokens/{self._token_id}")
199+
if detail.status == 404:
200+
pytest.skip("Token detail endpoint unavailable")
201+
initial_last_used = detail.json().get("last_used")
202+
203+
# 2. Make MCP Streamable HTTP request with the API token
204+
token_ctx = _api_context(playwright, self._access_token)
205+
try:
206+
mcp_resp = token_ctx.post(
207+
f"/servers/{public_server_id}/mcp",
208+
data={"jsonrpc": "2.0", "id": "1", "method": "initialize", "params": {"protocolVersion": "2025-03-26", "capabilities": {}, "clientInfo": {"name": "e2e-test", "version": "1.0.0"}}},
209+
headers={"Content-Type": "application/json", "Accept": "application/json, text/event-stream"},
210+
)
211+
finally:
212+
token_ctx.dispose()
213+
214+
if mcp_resp.status == 404:
215+
pytest.skip("Streamable HTTP endpoint unavailable")
216+
assert mcp_resp.status != 401, f"API token auth rejected: {mcp_resp.text()}"
217+
218+
# 3. Verify last_used was updated
219+
# Standard
220+
import time
221+
222+
time.sleep(2) # Allow async update to complete
223+
detail2 = admin_api.get(f"/tokens/{self._token_id}")
224+
updated_last_used = detail2.json().get("last_used")
225+
226+
assert updated_last_used is not None, f"last_used not updated after MCP access. Initial: {initial_last_used}, After: {updated_last_used}"
227+
228+
def test_mcp_request_records_token_usage_log(self, admin_api: APIRequestContext, playwright: Playwright, public_server_id: str):
229+
"""MCP requests with API tokens should appear in the token usage log."""
230+
token_ctx = _api_context(playwright, self._access_token)
231+
try:
232+
token_ctx.post(
233+
f"/servers/{public_server_id}/mcp",
234+
data={"jsonrpc": "2.0", "id": "2", "method": "ping", "params": {}},
235+
headers={"Content-Type": "application/json", "Accept": "application/json, text/event-stream"},
236+
)
237+
finally:
238+
token_ctx.dispose()
239+
240+
# Standard
241+
import time
242+
243+
time.sleep(2)
244+
usage_resp = admin_api.get(f"/tokens/{self._token_id}/usage")
245+
if usage_resp.status == 404:
246+
pytest.skip("Token usage endpoint unavailable")
247+
assert usage_resp.status == 200, f"Usage stats failed: {usage_resp.status}"
248+
total = usage_resp.json().get("total_requests", 0)
249+
assert total > 0, f"Token usage log should have entries after MCP access, got {total}"

0 commit comments

Comments
 (0)