Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion fastapi_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,29 @@ def mount_http(
"""
),
] = "/mcp",
stateless: Annotated[
bool,
Doc(
"""
Run the streamable HTTP transport in stateless mode.

In stateless mode a fresh ``StreamableHTTPServerTransport`` is created for each
request — no session tracking, no ``Mcp-Session-Id`` header, no resumability.
This is required for multi-replica deployments behind a load balancer that
doesn't do sticky-session routing on ``Mcp-Session-Id``, and is a good fit
for request/response-only MCP servers (e.g. a FastAPI app exposing REST
handlers as tools).

Stateful mode (the default) preserves backwards-compatible behavior: sessions
are tracked in the process's memory and features that rely on per-client
state — progress notifications, resumability via an event store, and
server-initiated notifications — continue to work.

See the MCP spec and the python-sdk
``StreamableHTTPSessionManager`` docs for the full trade-off.
"""
),
] = False,
) -> None:
"""
Mount the MCP server with HTTP transport to **any** FastAPI app or APIRouter.
Expand All @@ -348,7 +371,7 @@ def mount_http(

assert isinstance(router, (FastAPI, APIRouter)), f"Invalid router type: {type(router)}"

http_transport = FastApiHttpSessionManager(mcp_server=self.server)
http_transport = FastApiHttpSessionManager(mcp_server=self.server, stateless=stateless)
dependencies = self._auth_config.dependencies if self._auth_config else None

self._register_mcp_endpoints_http(router, http_transport, mount_path, dependencies)
Expand Down
14 changes: 9 additions & 5 deletions fastapi_mcp/transport/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ def __init__(
mcp_server: Server,
event_store: EventStore | None = None,
json_response: bool = True, # Default to JSON for HTTP transport
stateless: bool = False,
security_settings: TransportSecuritySettings | None = None,
):
self.mcp_server = mcp_server
self.event_store = event_store
self.json_response = json_response
self.stateless = stateless
self.security_settings = security_settings
self._session_manager: StreamableHTTPSessionManager | None = None
self._manager_task: asyncio.Task | None = None
Expand All @@ -46,14 +48,16 @@ async def _ensure_session_manager_started(self) -> None:

logger.debug("Starting StreamableHTTP session manager")

# Create the session manager
# Note: We don't use stateless=True because we want to support sessions
# but sessions are optional as per the MCP spec
# Create the session manager.
# Stateful mode (default) tracks sessions keyed on the Mcp-Session-Id
# header; stateless mode spins up a fresh transport per request, which
# is required for multi-replica deployments without sticky sessions
# and for simple request/response-only MCP servers.
self._session_manager = StreamableHTTPSessionManager(
app=self.mcp_server,
event_store=self.event_store,
event_store=self.event_store if not self.stateless else None,
json_response=self.json_response,
stateless=False, # Always support sessions, but they're optional
stateless=self.stateless,
security_settings=self.security_settings,
)

Expand Down
224 changes: 224 additions & 0 deletions tests/test_http_real_transport_stateless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""Integration tests for streamable HTTP transport in stateless mode.

Stateless mode (``mount_http(stateless=True)``) is required for multi-replica
deployments behind a load balancer: without sticky-session routing on the
``Mcp-Session-Id`` header, stateful sessions break the moment a follow-up
request lands on a different replica.

These tests mirror the stateful tests in ``test_http_real_transport.py`` but
mount the server with ``stateless=True`` and assert the stateless invariants:

1. ``initialize`` responses must NOT carry an ``mcp-session-id`` header.
2. ``tools/list`` and ``tools/call`` must succeed without prior ``initialize``
and without any session header plumbing.
"""

import atexit
import multiprocessing
import os
import signal
import socket
import sys
import threading
import time
from typing import AsyncGenerator, Generator

import coverage
import httpx
import mcp.types as types
import pytest
import uvicorn
from fastapi import FastAPI

from fastapi_mcp import FastApiMCP


HOST = "127.0.0.1"
SERVER_NAME = "Test MCP Server (stateless)"


def run_stateless_server(server_port: int, fastapi_app: FastAPI) -> None:
# Initialize coverage for subprocesses
cov = None
if "COVERAGE_PROCESS_START" in os.environ:
cov = coverage.Coverage(source=["fastapi_mcp"])
cov.start()

def cleanup():
if cov:
cov.stop()
cov.save()

atexit.register(cleanup)

def handle_signal(signum, frame):
cleanup()
sys.exit(0)

signal.signal(signal.SIGTERM, handle_signal)

def periodic_save():
while True:
time.sleep(1.0)
if cov:
cov.save()

save_thread = threading.Thread(target=periodic_save)
save_thread.daemon = True
save_thread.start()

# Configure the server in stateless mode
mcp = FastApiMCP(
fastapi_app,
name=SERVER_NAME,
description="Test description",
)
mcp.mount_http(stateless=True)

server = uvicorn.Server(config=uvicorn.Config(app=fastapi_app, host=HOST, port=server_port, log_level="error"))
server.run()

while not server.started:
time.sleep(0.5)

if cov:
cov.stop()
cov.save()


@pytest.fixture(params=["simple_fastapi_app", "simple_fastapi_app_with_root_path"])
def stateless_server(request: pytest.FixtureRequest) -> Generator[str, None, None]:
coverage_rc = os.path.abspath(".coveragerc")
os.environ["COVERAGE_PROCESS_START"] = coverage_rc

with socket.socket() as s:
s.bind((HOST, 0))
server_port = s.getsockname()[1]

ctx = multiprocessing.get_context("fork")

fastapi_app = request.getfixturevalue(request.param)
proc = ctx.Process(
target=run_stateless_server,
kwargs={"server_port": server_port, "fastapi_app": fastapi_app},
daemon=True,
)
proc.start()

max_attempts = 20
attempt = 0
while attempt < max_attempts:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, server_port))
break
except ConnectionRefusedError:
time.sleep(0.1)
attempt += 1
else:
raise RuntimeError(f"Server failed to start after {max_attempts} attempts")

yield f"http://{HOST}:{server_port}{fastapi_app.root_path}"

try:
proc.terminate()
proc.join(timeout=2)
except (OSError, AttributeError):
pass

if proc.is_alive():
proc.kill()
proc.join(timeout=2)
if proc.is_alive():
raise RuntimeError("server process failed to terminate")


@pytest.fixture()
async def stateless_http_client(stateless_server: str) -> AsyncGenerator[httpx.AsyncClient, None]:
async with httpx.AsyncClient(base_url=stateless_server) as client:
yield client


@pytest.mark.anyio
async def test_stateless_initialize_has_no_session_header(
stateless_http_client: httpx.AsyncClient, stateless_server: str
) -> None:
"""Stateless initialize must not return an mcp-session-id header."""
response = await stateless_http_client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"method": "initialize",
"id": 1,
"params": {
"protocolVersion": types.LATEST_PROTOCOL_VERSION,
"capabilities": {},
"clientInfo": {"name": "test-client", "version": "1.0.0"},
},
},
headers={"Accept": "application/json, text/event-stream", "Content-Type": "application/json"},
)

assert response.status_code == 200
result = response.json()
assert result["result"]["serverInfo"]["name"] == SERVER_NAME

# The invariant: stateless transport must not emit a session id.
assert "mcp-session-id" not in {k.lower() for k in response.headers.keys()}, (
f"Stateless transport leaked mcp-session-id header: {dict(response.headers)}"
)


@pytest.mark.anyio
async def test_stateless_tools_list_without_session(
stateless_http_client: httpx.AsyncClient, stateless_server: str
) -> None:
"""tools/list must succeed in stateless mode without prior initialize or session header."""
response = await stateless_http_client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"method": "tools/list",
"id": 2,
},
headers={"Accept": "application/json, text/event-stream", "Content-Type": "application/json"},
)

assert response.status_code == 200, response.text
result = response.json()
assert result["jsonrpc"] == "2.0"
assert result["id"] == 2
assert "error" not in result, f"tools/list returned an error: {result}"
assert "tools" in result["result"]
tool_names = [tool["name"] for tool in result["result"]["tools"]]
assert "get_item" in tool_names
assert "list_items" in tool_names


@pytest.mark.anyio
async def test_stateless_call_tool_without_session(
stateless_http_client: httpx.AsyncClient, stateless_server: str
) -> None:
"""tools/call must succeed in stateless mode without prior initialize or session header."""
response = await stateless_http_client.post(
"/mcp",
json={
"jsonrpc": "2.0",
"method": "tools/call",
"id": 3,
"params": {
"name": "get_item",
"arguments": {"item_id": 1},
},
},
headers={"Accept": "application/json, text/event-stream", "Content-Type": "application/json"},
)

assert response.status_code == 200, response.text
result = response.json()
assert result["jsonrpc"] == "2.0"
assert result["id"] == 3
assert result["result"]["isError"] is False
content = result["result"]["content"][0]
assert content["type"] == "text"
assert "Item 1" in content["text"]