Skip to content

Commit 6a0d404

Browse files
Egor EgerevEgor Egerev
authored andcommitted
fix: shield cleanup operations from cancel scope conflicts
Apply CancelScope(shield=True) to prevent RuntimeError when cleaning up HTTP MCP clients. This fixes the 'Attempted to exit cancel scope in a different task' error that occurs with asyncio. Changes: - session.py: Add _closing flag, shield cleanup in _receive_loop - streamable_http.py: Shield terminate_session and stream cleanup Related to: modelcontextprotocol#577 Based on: modelcontextprotocol#1817
1 parent 4a2d83a commit 6a0d404

File tree

2 files changed

+27
-15
lines changed

2 files changed

+27
-15
lines changed

src/mcp/client/streamable_http.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -563,9 +563,13 @@ def start_get_stream() -> None:
563563
try:
564564
yield (read_stream, write_stream, transport.get_session_id)
565565
finally:
566-
if transport.session_id and terminate_on_close:
567-
await transport.terminate_session(client)
566+
# Shield cleanup from cancellation to prevent cancel scope conflicts
567+
with anyio.CancelScope(shield=True):
568+
if transport.session_id and terminate_on_close:
569+
await transport.terminate_session(client)
568570
tg.cancel_scope.cancel()
569571
finally:
570-
await read_stream_writer.aclose()
571-
await write_stream.aclose()
572+
# Shield stream cleanup from cancellation
573+
with anyio.CancelScope(shield=True):
574+
await read_stream_writer.aclose()
575+
await write_stream.aclose()

src/mcp/shared/session.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class BaseSession(
175175
_in_flight: dict[RequestId, RequestResponder[ReceiveRequestT, SendResultT]]
176176
_progress_callbacks: dict[RequestId, ProgressFnT]
177177
_response_routers: list[ResponseRouter]
178+
_closing: bool = False
178179

179180
def __init__(
180181
self,
@@ -242,6 +243,9 @@ async def send_request(
242243
243244
Do not use this method to emit notifications! Use send_notification() instead.
244245
"""
246+
if self._closing:
247+
raise McpError(ErrorData(code=CONNECTION_CLOSED, message="Connection closed"))
248+
245249
request_id = self._request_id
246250
self._request_id = request_id + 1
247251

@@ -287,7 +291,8 @@ async def send_request(
287291
return result_type.model_validate(response_or_error.result, by_name=False)
288292

289293
finally:
290-
self._response_streams.pop(request_id, None)
294+
if not self._closing:
295+
self._response_streams.pop(request_id, None)
291296
self._progress_callbacks.pop(request_id, None)
292297
await response_stream.aclose()
293298
await response_stream_reader.aclose()
@@ -421,15 +426,18 @@ async def _receive_loop(self) -> None:
421426
finally:
422427
# after the read stream is closed, we need to send errors
423428
# to any pending requests
424-
for id, stream in self._response_streams.items():
425-
error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed")
426-
try:
427-
await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error))
428-
await stream.aclose()
429-
except Exception: # pragma: no cover
430-
# Stream might already be closed
431-
pass
432-
self._response_streams.clear()
429+
# Shield cleanup from cancellation to prevent cancel scope conflicts
430+
self._closing = True
431+
with anyio.CancelScope(shield=True):
432+
for id, stream in self._response_streams.items():
433+
error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed")
434+
try:
435+
await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error))
436+
await stream.aclose()
437+
except Exception: # pragma: no cover
438+
# Stream might already be closed
439+
pass
440+
self._response_streams.clear()
433441

434442
def _normalize_request_id(self, response_id: RequestId) -> RequestId:
435443
"""Normalize a response ID to match how request IDs are stored.
@@ -481,7 +489,7 @@ async def _handle_response(self, message: SessionMessage) -> None:
481489
return # Handled
482490

483491
# Fall back to normal response streams
484-
stream = self._response_streams.pop(response_id, None)
492+
stream = self._response_streams.pop(response_id, None) if not self._closing else None
485493
if stream:
486494
await stream.send(message.message)
487495
else:

0 commit comments

Comments
 (0)