Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions fastapi_mcp/transport/http.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import logging
import asyncio
from typing import Any, List, MutableMapping, Tuple

from fastapi import Request, Response, HTTPException
from mcp.server.lowlevel.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager, EventStore
from mcp.server.transport_security import TransportSecuritySettings

ACCEPT_JSON = b"application/json"
ACCEPT_SSE = b"text/event-stream"

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -77,6 +81,39 @@ async def run_session_manager():
# Give the session manager a moment to initialize
await asyncio.sleep(0.1)

def _ensure_accept_headers(self, scope: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""
Ensure the ASGI scope's Accept header includes text/event-stream.

The upstream MCP StreamableHTTPSessionManager requires both application/json
and text/event-stream in the Accept header, even when json_response=True.
Clients that only send Accept: application/json (e.g., Google Gemini CLI)
would be rejected. This method adds text/event-stream to the Accept header
when it is missing, so standard HTTP JSON-RPC clients work out of the box.
"""
raw_headers: List[Tuple[bytes, bytes]] = scope.get("headers", [])
accept_idx = None

for i, (name, value) in enumerate(raw_headers):
if name.lower() == b"accept":
accept_types = [t.strip() for t in value.split(b",")]
if any(t.startswith(ACCEPT_SSE) for t in accept_types):
return scope
accept_idx = i
break

# Build a new scope with the updated Accept header (immutable approach)
new_headers = list(raw_headers)
if accept_idx is not None:
existing_value = new_headers[accept_idx][1]
new_headers[accept_idx] = (b"accept", existing_value + b", " + ACCEPT_SSE)
else:
new_headers.append((b"accept", ACCEPT_JSON + b", " + ACCEPT_SSE))

new_scope = dict(scope)
new_scope["headers"] = new_headers
return new_scope

async def handle_fastapi_request(self, request: Request) -> Response:
"""
Handle a FastAPI request by delegating to the session manager.
Expand All @@ -92,13 +129,18 @@ async def handle_fastapi_request(self, request: Request) -> Response:

logger.debug(f"Handling FastAPI request: {request.method} {request.url.path}")

# Ensure Accept header includes text/event-stream for upstream compatibility.
# The upstream MCP library requires both application/json and text/event-stream,
# but standard HTTP JSON-RPC clients only send application/json.
scope = self._ensure_accept_headers(request.scope)

# Capture the response from the session manager
response_started = False
response_status = 200
response_headers = []
response_headers: List[Tuple[bytes, bytes]] = []
response_body = b""

async def send_callback(message):
async def send_callback(message: MutableMapping[str, Any]) -> None:
nonlocal response_started, response_status, response_headers, response_body

if message["type"] == "http.response.start":
Expand All @@ -110,7 +152,7 @@ async def send_callback(message):

try:
# Delegate to the session manager's handle_request method
await self._session_manager.handle_request(request.scope, request.receive, send_callback)
await self._session_manager.handle_request(scope, request.receive, send_callback)

# Convert the captured ASGI response to a FastAPI Response
headers_dict = {name.decode(): value.decode() for name, value in response_headers}
Expand Down
206 changes: 206 additions & 0 deletions tests/test_http_real_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,209 @@ async def test_http_notification_handling(http_client: httpx.AsyncClient, server
assert response.status_code == 202
# Notifications should return empty body
assert response.content == b"" or response.text == "null"


@pytest.mark.anyio
async def test_http_accept_json_only(http_client: httpx.AsyncClient, server: str) -> None:
"""Test that HTTP transport accepts requests with only Accept: application/json.

Standard HTTP JSON-RPC clients (e.g., Google Gemini CLI) only send
Accept: application/json. The HTTP transport should handle this
without returning a 406 Not Acceptable error.

See: https://github.com/tadata-org/fastapi_mcp/issues/233
"""
mcp_path = "/mcp"

# Send request with only application/json Accept header (no text/event-stream)
response = await http_client.post(
mcp_path,
json={
"jsonrpc": "2.0",
"method": "initialize",
"id": 1,
"params": {
"protocolVersion": types.LATEST_PROTOCOL_VERSION,
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.0.0"},
},
},
headers={"Accept": "application/json", "Content-Type": "application/json"},
)

assert response.status_code == 200, (
f"Expected 200 but got {response.status_code}. "
f"HTTP transport should accept requests with only Accept: application/json. "
f"Response: {response.text}"
)

result = response.json()
assert result["jsonrpc"] == "2.0"
assert result["id"] == 1
assert "result" in result
assert result["result"]["serverInfo"]["name"] == SERVER_NAME


@pytest.mark.anyio
async def test_http_accept_json_only_then_list_tools(http_client: httpx.AsyncClient, server: str) -> None:
"""Test a full session (initialize + list tools) using only Accept: application/json.

Verifies that clients sending only application/json can complete a full
MCP session lifecycle without hitting 406 errors.
"""
mcp_path = "/mcp"

# Initialize with only application/json
init_response = await http_client.post(
mcp_path,
json={
"jsonrpc": "2.0",
"method": "initialize",
"id": 1,
"params": {
"protocolVersion": types.LATEST_PROTOCOL_VERSION,
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.0.0"},
},
},
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
assert init_response.status_code == 200

session_id = init_response.headers.get("mcp-session-id")
assert session_id is not None

# Send initialized notification with only application/json
notif_response = await http_client.post(
mcp_path,
json={
"jsonrpc": "2.0",
"method": "notifications/initialized",
},
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"mcp-session-id": session_id,
},
)
assert notif_response.status_code == 202

# List tools with only application/json
tools_response = await http_client.post(
mcp_path,
json={
"jsonrpc": "2.0",
"method": "tools/list",
"id": 2,
},
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"mcp-session-id": session_id,
},
)
assert tools_response.status_code == 200
result = tools_response.json()
assert "result" in result
assert "tools" in result["result"]
assert len(result["result"]["tools"]) > 0


class TestEnsureAcceptHeaders:
"""Unit tests for FastApiHttpSessionManager._ensure_accept_headers."""

def _make_manager(self): # type: ignore[no-untyped-def]
from fastapi_mcp.transport.http import FastApiHttpSessionManager
from unittest.mock import MagicMock

mock_server = MagicMock()
return FastApiHttpSessionManager(mcp_server=mock_server)

def test_already_has_sse_header(self) -> None:
"""When text/event-stream is already present, scope is returned unchanged."""
manager = self._make_manager()
scope = {
"headers": [
(b"accept", b"application/json, text/event-stream"),
(b"content-type", b"application/json"),
]
}
result = manager._ensure_accept_headers(scope)
assert result is scope # Same object, no copy needed

def test_json_only_gets_sse_appended(self) -> None:
"""When only application/json is present, text/event-stream is appended."""
manager = self._make_manager()
scope = {
"headers": [
(b"accept", b"application/json"),
(b"content-type", b"application/json"),
]
}
result = manager._ensure_accept_headers(scope)
assert result is not scope # New scope created

accept_header = None
for name, value in result["headers"]:
if name == b"accept":
accept_header = value
break

assert accept_header is not None
assert b"application/json" in accept_header
assert b"text/event-stream" in accept_header

def test_no_accept_header_adds_both(self) -> None:
"""When no Accept header exists, both types are added."""
manager = self._make_manager()
scope = {
"headers": [
(b"content-type", b"application/json"),
]
}
result = manager._ensure_accept_headers(scope)
assert result is not scope

accept_header = None
for name, value in result["headers"]:
if name == b"accept":
accept_header = value
break

assert accept_header is not None
assert b"application/json" in accept_header
assert b"text/event-stream" in accept_header

def test_wildcard_accept_gets_sse_appended(self) -> None:
"""When Accept is */* (no explicit text/event-stream), SSE is appended."""
manager = self._make_manager()
scope = {
"headers": [
(b"accept", b"*/*"),
]
}
result = manager._ensure_accept_headers(scope)
assert result is not scope

accept_header = None
for name, value in result["headers"]:
if name == b"accept":
accept_header = value
break

assert accept_header is not None
assert b"text/event-stream" in accept_header

def test_original_scope_not_mutated(self) -> None:
"""Ensure the original scope's headers list is not mutated."""
manager = self._make_manager()
original_headers = [
(b"accept", b"application/json"),
(b"content-type", b"application/json"),
]
scope = {"headers": original_headers}
manager._ensure_accept_headers(scope)

# Original headers should be untouched
assert len(original_headers) == 2
assert original_headers[0] == (b"accept", b"application/json")