Skip to content

Commit 3d4f595

Browse files
committed
fix: stateless HTTP task leak and graceful SSE drain on shutdown
Request-scoped task groups for stateless requests prevent zombie task accumulation when clients disconnect mid-request. Graceful drain on shutdown terminates all active transports before cancelling the task group, allowing SSE responses to close cleanly. Backported to v1.26.0 for compatibility with dbt-mcp and ai-codegen-api. Upstream PR: modelcontextprotocol#2145
1 parent 3d9d345 commit 3d4f595

File tree

1 file changed

+38
-8
lines changed

1 file changed

+38
-8
lines changed

src/mcp/server/streamable_http_manager.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ def __init__(
7777
self._session_creation_lock = anyio.Lock()
7878
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
7979

80+
# Track in-flight stateless transports for graceful shutdown
81+
self._stateless_transports: set[StreamableHTTPServerTransport] = set()
82+
8083
# The task group will be set during lifespan
8184
self._task_group = None
8285
# Thread-safe tracking of run() calls
@@ -118,11 +121,34 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
118121
yield # Let the application run
119122
finally:
120123
logger.info("StreamableHTTP session manager shutting down")
124+
125+
# Terminate all active transports before cancelling the task
126+
# group. This closes their in-memory streams, which lets
127+
# EventSourceResponse send a final ``more_body=False`` chunk
128+
# — a clean HTTP close instead of a connection reset.
129+
for transport in list(self._server_instances.values()):
130+
try:
131+
await transport.terminate()
132+
except Exception: # pragma: no cover
133+
logger.debug(
134+
"Error terminating transport during shutdown",
135+
exc_info=True,
136+
)
137+
for transport in list(self._stateless_transports):
138+
try:
139+
await transport.terminate()
140+
except Exception: # pragma: no cover
141+
logger.debug(
142+
"Error terminating stateless transport during shutdown",
143+
exc_info=True,
144+
)
145+
121146
# Cancel task group to stop all spawned tasks
122147
tg.cancel_scope.cancel()
123148
self._task_group = None
124149
# Clear any remaining server instances
125150
self._server_instances.clear()
151+
self._stateless_transports.clear()
126152

127153
async def handle_request(
128154
self,
@@ -172,7 +198,11 @@ async def _handle_stateless_request(
172198
security_settings=self.security_settings,
173199
)
174200

175-
# Start server in a new task
201+
# Track for graceful shutdown
202+
self._stateless_transports.add(http_transport)
203+
204+
# Start server in a request-scoped task group so that disconnected
205+
# clients don't leak tasks in the manager's long-lived task group.
176206
async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED):
177207
async with http_transport.connect() as streams:
178208
read_stream, write_stream = streams
@@ -187,13 +217,13 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA
187217
except Exception: # pragma: no cover
188218
logger.exception("Stateless session crashed")
189219

190-
# Assert task group is not None for type checking
191-
assert self._task_group is not None
192-
# Start the server task
193-
await self._task_group.start(run_stateless_server)
194-
195-
# Handle the HTTP request and return the response
196-
await http_transport.handle_request(scope, receive, send)
220+
try:
221+
async with anyio.create_task_group() as request_tg:
222+
await request_tg.start(run_stateless_server)
223+
await http_transport.handle_request(scope, receive, send)
224+
request_tg.cancel_scope.cancel()
225+
finally:
226+
self._stateless_transports.discard(http_transport)
197227

198228
# Terminate the transport after the request is handled
199229
await http_transport.terminate()

0 commit comments

Comments
 (0)