Skip to content

Commit 02ed42b

Browse files
committed
fix: stateless HTTP task accumulation causing memory leak
Use request-scoped task groups for stateless requests instead of the manager's global task group. When a client disconnects while a tool call is in progress, the request-scoped task group is cancelled, preventing zombie tasks from accumulating. In production, this caused 449 leaked ServerSession objects, 66,086 orphaned coroutines, and ~150 MB of leaked heap. Upstream PR: modelcontextprotocol#2145 Github-Issue: modelcontextprotocol#1764
1 parent 7ba41dc commit 02ed42b

File tree

4 files changed

+104
-16
lines changed

4 files changed

+104
-16
lines changed

src/mcp/server/streamable_http.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -619,14 +619,14 @@ async def sse_writer(): # pragma: lax no cover
619619
# Then send the message to be processed by the server
620620
session_message = self._create_session_message(message, request, request_id, protocol_version)
621621
await writer.send(session_message)
622-
except Exception: # pragma: no cover
622+
except Exception: # pragma: lax no cover
623623
logger.exception("SSE response error")
624624
await sse_stream_writer.aclose()
625625
await self._clean_up_memory_streams(request_id)
626626
finally:
627627
await sse_stream_reader.aclose()
628628

629-
except Exception as err: # pragma: no cover
629+
except Exception as err: # pragma: lax no cover
630630
logger.exception("Error handling POST request")
631631
response = self._create_error_response(
632632
f"Error handling POST request: {err}",
@@ -809,7 +809,7 @@ async def _validate_request_headers(self, request: Request, send: Send) -> bool:
809809

810810
async def _validate_session(self, request: Request, send: Send) -> bool:
811811
"""Validate the session ID in the request."""
812-
if not self.mcp_session_id: # pragma: no cover
812+
if not self.mcp_session_id: # pragma: lax no cover
813813
# If we're not using session IDs, return True
814814
return True
815815

@@ -1019,7 +1019,7 @@ async def message_router():
10191019
)
10201020
except anyio.ClosedResourceError:
10211021
if self._terminated:
1022-
logger.debug("Read stream closed by client")
1022+
logger.debug("Read stream closed by client") # pragma: lax no cover
10231023
else:
10241024
logger.exception("Unexpected closure of read stream in message router")
10251025
except Exception: # pragma: lax no cover

src/mcp/server/streamable_http_manager.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,12 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
151151
await self._handle_stateful_request(scope, receive, send)
152152

153153
async def _handle_stateless_request(self, scope: Scope, receive: Receive, send: Send) -> None:
154-
"""Process request in stateless mode - creating a new transport for each request."""
154+
"""Process request in stateless mode - creating a new transport for each request.
155+
156+
Uses a request-scoped task group so the server task is automatically
157+
cancelled when the request completes, preventing task accumulation in
158+
the manager's global task group.
159+
"""
155160
logger.debug("Stateless mode: Creating new transport for this request")
156161
# No session ID needed in stateless mode
157162
http_transport = StreamableHTTPServerTransport(
@@ -173,18 +178,24 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA
173178
self.app.create_initialization_options(),
174179
stateless=True,
175180
)
176-
except Exception: # pragma: no cover
181+
except Exception: # pragma: lax no cover
177182
logger.exception("Stateless session crashed")
178183

179-
# Assert task group is not None for type checking
180-
assert self._task_group is not None
181-
# Start the server task
182-
await self._task_group.start(run_stateless_server)
183-
184-
# Handle the HTTP request and return the response
185-
await http_transport.handle_request(scope, receive, send)
186-
187-
# Terminate the transport after the request is handled
184+
# Use a request-scoped task group instead of the global one.
185+
# This ensures the server task is cancelled when the request
186+
# finishes, preventing zombie tasks from accumulating.
187+
# See: https://github.com/modelcontextprotocol/python-sdk/issues/1764
188+
async with anyio.create_task_group() as request_tg:
189+
await request_tg.start(run_stateless_server)
190+
# Handle the HTTP request directly in the caller's context
191+
# (not as a child task) so execution flows back naturally.
192+
await http_transport.handle_request(scope, receive, send)
193+
# Cancel the request-scoped task group to stop the server task.
194+
request_tg.cancel_scope.cancel()
195+
196+
# Terminate after the task group exits — the server task is already
197+
# cancelled at this point, so this is just cleanup (sets _terminated
198+
# flag and closes any remaining streams).
188199
await http_transport.terminate()
189200

190201
async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: Send) -> None:

src/mcp/shared/session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ async def _receive_loop(self) -> None:
423423
try:
424424
await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error))
425425
await stream.aclose()
426-
except Exception: # pragma: no cover
426+
except Exception: # pragma: lax no cover
427427
# Stream might already be closed
428428
pass
429429
self._response_streams.clear()

tests/server/test_streamable_http_manager.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,83 @@ async def mock_receive():
269269
assert len(transport._request_streams) == 0, "Transport should have no active request streams"
270270

271271

272+
@pytest.mark.anyio
273+
async def test_stateless_requests_task_leak_on_client_disconnect():
274+
"""Test that stateless tasks don't leak when clients disconnect mid-request.
275+
276+
Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/1764
277+
278+
Reproduces the production memory leak: a client sends a tool call, the tool
279+
handler takes some time, and the client disconnects before the response is
280+
delivered. The SSE response pipeline detects the disconnect but app.run()
281+
continues in the background. After the tool finishes, the response has
282+
nowhere to go, and app.run() blocks on ``async for message in
283+
session.incoming_messages`` forever — leaking the task in the global
284+
task group.
285+
286+
The test uses real Server.run() with a real tool handler, real SSE streaming
287+
via httpx.ASGITransport, and simulates client disconnect by cancelling the
288+
request task.
289+
"""
290+
from mcp.types import CallToolResult, TextContent
291+
292+
tool_started = anyio.Event()
293+
tool_gate = anyio.Event()
294+
295+
async def handle_call_tool(ctx: ServerRequestContext, params: Any) -> CallToolResult:
296+
tool_started.set()
297+
await tool_gate.wait()
298+
return CallToolResult(content=[TextContent(type="text", text="done")]) # pragma: no cover
299+
300+
app = Server(
301+
"test-stateless-leak",
302+
on_call_tool=handle_call_tool,
303+
)
304+
305+
host = "testserver"
306+
mcp_app = app.streamable_http_app(host=host, stateless_http=True)
307+
308+
async with (
309+
mcp_app.router.lifespan_context(mcp_app),
310+
httpx.ASGITransport(mcp_app) as transport,
311+
):
312+
session_manager = app._session_manager
313+
assert session_manager is not None
314+
315+
async def make_and_abandon_tool_call():
316+
async with httpx.AsyncClient(transport=transport, base_url=f"http://{host}", timeout=30.0) as http_client:
317+
async with Client(streamable_http_client(f"http://{host}/mcp", http_client=http_client)) as client:
318+
# Start tool call — this will block until tool completes
319+
# We'll cancel it from outside to simulate disconnect
320+
await client.call_tool("slow_tool", {})
321+
322+
num_requests = 3
323+
for _ in range(num_requests):
324+
async with anyio.create_task_group() as tg:
325+
tg.start_soon(make_and_abandon_tool_call)
326+
# Wait for the tool handler to actually start
327+
await tool_started.wait()
328+
tool_started = anyio.Event() # Reset for next iteration
329+
# Simulate client disconnect by cancelling the request
330+
tg.cancel_scope.cancel()
331+
332+
# Let the tool finish now (response has nowhere to go)
333+
tool_gate.set()
334+
tool_gate = anyio.Event() # Reset for next iteration
335+
336+
# Give tasks a chance to settle
337+
await anyio.sleep(0.1)
338+
339+
# Check for leaked tasks in the session manager's global task group
340+
await anyio.sleep(0.1)
341+
assert session_manager._task_group is not None
342+
leaked = len(session_manager._task_group._tasks) # type: ignore[attr-defined]
343+
344+
assert leaked == 0, (
345+
f"Expected 0 lingering tasks but found {leaked}. Stateless request tasks are leaking after client disconnect."
346+
)
347+
348+
272349
@pytest.mark.anyio
273350
async def test_unknown_session_id_returns_404(caplog: pytest.LogCaptureFixture):
274351
"""Test that requests with unknown session IDs return HTTP 404 per MCP spec."""

0 commit comments

Comments
 (0)