Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,23 @@ async def run(
await stack.enter_async_context(task_support.run())

async with anyio.create_task_group() as tg:
async for message in session.incoming_messages:
logger.debug("Received message: %s", message)

tg.start_soon(
self._handle_message,
message,
session,
lifespan_context,
raise_exceptions,
)
try:
async for message in session.incoming_messages:
logger.debug("Received message: %s", message)

tg.start_soon(
self._handle_message,
message,
session,
lifespan_context,
raise_exceptions,
)
finally:
# Transport closed: cancel in-flight handlers. Without this the
# TG join waits for them, and when they eventually try to
# respond they hit a closed write stream (the session's
# _receive_loop closed it when the read stream ended).
tg.cancel_scope.cancel()

async def _handle_message(
self,
Expand Down Expand Up @@ -470,8 +477,14 @@ async def _handle_request(
except MCPError as err:
response = err.error
except anyio.get_cancelled_exc_class():
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
return
if message.cancelled:
# Client sent CancelledNotification; responder.cancel() already
# sent an error response, so skip the duplicate.
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
return
# Transport-close cancellation from the TG in run(); re-raise so the
# TG swallows its own cancellation.
raise
except Exception as err:
if raise_exceptions: # pragma: no cover
raise err
Expand Down
2 changes: 1 addition & 1 deletion src/mcp/shared/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __exit__(
) -> None:
"""Exit the context manager, performing cleanup and notifying completion."""
try:
if self._completed: # pragma: no branch
if self._completed:
self._on_complete(self)
finally:
self._entered = False
Expand Down
80 changes: 80 additions & 0 deletions tests/server/test_cancel_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@
from mcp import Client
from mcp.server import Server, ServerRequestContext
from mcp.shared.exceptions import MCPError
from mcp.shared.message import SessionMessage
from mcp.types import (
LATEST_PROTOCOL_VERSION,
CallToolRequest,
CallToolRequestParams,
CallToolResult,
CancelledNotification,
CancelledNotificationParams,
ClientCapabilities,
Implementation,
InitializeRequestParams,
JSONRPCNotification,
JSONRPCRequest,
ListToolsResult,
PaginatedRequestParams,
TextContent,
Expand Down Expand Up @@ -90,3 +97,76 @@ async def first_request():
assert isinstance(content, TextContent)
assert content.text == "Call number: 2"
assert call_count == 2


@pytest.mark.anyio
async def test_server_cancels_in_flight_handlers_on_transport_close():
    """When the transport closes mid-request, server.run() must cancel in-flight
    handlers rather than join on them.

    Without the cancel, the task group waits for the handler, which then tries
    to respond through a write stream that _receive_loop already closed,
    raising ClosedResourceError and crashing server.run() with exit code 1.

    This drives server.run() with raw memory streams because InMemoryTransport
    wraps it in its own finally-cancel (_memory.py) which masks the bug.
    """
    # Sequencing events: handler is running / handler saw cancellation /
    # server.run() returned cleanly after the transport closed.
    handler_started = anyio.Event()
    handler_cancelled = anyio.Event()
    server_run_returned = anyio.Event()

    async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
        # Tool handler that blocks forever; it can only exit via cancellation,
        # which the finally block records.
        handler_started.set()
        try:
            await anyio.sleep_forever()
        finally:
            handler_cancelled.set()
        # unreachable: sleep_forever only exits via cancellation
        raise AssertionError  # pragma: no cover

    server = Server("test", on_call_tool=handle_call_tool)

    # Raw memory streams stand in for the transport (see docstring for why
    # InMemoryTransport is deliberately not used here).
    to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
    server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)

    async def run_server():
        # server_run_returned is only set if run() exits without raising.
        await server.run(server_read, server_write, server.create_initialization_options())
        server_run_returned.set()

    # Hand-built JSON-RPC messages for the initialize handshake and one
    # long-running tool call (handled by handle_call_tool above).
    init_req = JSONRPCRequest(
        jsonrpc="2.0",
        id=1,
        method="initialize",
        params=InitializeRequestParams(
            protocol_version=LATEST_PROTOCOL_VERSION,
            capabilities=ClientCapabilities(),
            client_info=Implementation(name="test", version="1.0"),
        ).model_dump(by_alias=True, mode="json", exclude_none=True),
    )
    initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")
    call_req = JSONRPCRequest(
        jsonrpc="2.0",
        id=2,
        method="tools/call",
        params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"),
    )

    # fail_after guards against the regression: without the fix, server.run()
    # joins on the blocked handler and this test would hang.
    with anyio.fail_after(5):
        async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
            tg.start_soon(run_server)

            await to_server.send(SessionMessage(init_req))
            await from_server.receive()  # init response
            await to_server.send(SessionMessage(initialized))
            await to_server.send(SessionMessage(call_req))

            await handler_started.wait()

            # Close the server's input stream — this is what stdin EOF does.
            # server.run()'s incoming_messages loop ends, finally-cancel fires,
            # handler gets CancelledError, server.run() returns.
            await to_server.aclose()

            await server_run_returned.wait()

    assert handler_cancelled.is_set()
Loading