Skip to content

Commit c1fe08c

Browse files
committed
fix: don't send log notification on transport error
When a transport yields an Exception into the server's read stream (e.g., POST handler catches an error, or JSON parsing fails), the server was attempting to send a notifications/message back to the client. This raced with the write stream being closed during session teardown, raising ClosedResourceError inside the handler's TaskGroup and crashing the session. The transport that just reported an error is typically broken or about to close. In the streamable HTTP case, the client has already received an HTTP 500 before the Exception is even yielded. The TypeScript, Go, and C# SDKs all handle transport errors by logging locally only, never writing back through the transport. This removes the send_log_message call added in #786, keeping the local logger.error and the raise_exceptions re-raise. A new integration test with real memory streams (no mocks, no sleeps) deterministically reproduces the exact #1967 stack trace on the old code and passes on the new code. Fixes #1967 Fixes #2064 Reported-by: jonfinerty Reported-by: einarfd Github-Issue: #1967 Github-Issue: #2064
1 parent 7ba41dc commit c1fe08c

File tree

2 files changed

+52
-30
lines changed

2 files changed

+52
-30
lines changed

src/mcp/server/lowlevel/server.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -414,11 +414,6 @@ async def _handle_message(
414414
)
415415
case Exception():
416416
logger.error(f"Received exception from stream: {message}")
417-
await session.send_log_message(
418-
level="error",
419-
data="Internal Server Error",
420-
logger="mcp.server.exception_handler",
421-
)
422417
if raise_exceptions:
423418
raise message
424419
case _:
Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,42 @@
11
from unittest.mock import AsyncMock, Mock
22

3+
import anyio
34
import pytest
45

56
from mcp import types
67
from mcp.server.lowlevel.server import Server
78
from mcp.server.session import ServerSession
9+
from mcp.shared.message import SessionMessage
810
from mcp.shared.session import RequestResponder
911

1012

1113
@pytest.mark.anyio
1214
async def test_exception_handling_with_raise_exceptions_true():
13-
"""Test that exceptions are re-raised when raise_exceptions=True"""
15+
"""Transport exceptions are re-raised when raise_exceptions=True."""
1416
server = Server("test-server")
1517
session = Mock(spec=ServerSession)
16-
session.send_log_message = AsyncMock()
1718

1819
test_exception = RuntimeError("Test error")
1920

2021
with pytest.raises(RuntimeError, match="Test error"):
2122
await server._handle_message(test_exception, session, {}, raise_exceptions=True)
2223

23-
session.send_log_message.assert_called_once()
24-
2524

2625
@pytest.mark.anyio
27-
@pytest.mark.parametrize(
28-
"exception_class,message",
29-
[
30-
(ValueError, "Test validation error"),
31-
(RuntimeError, "Test runtime error"),
32-
(KeyError, "Test key error"),
33-
(Exception, "Basic error"),
34-
],
35-
)
36-
async def test_exception_handling_with_raise_exceptions_false(exception_class: type[Exception], message: str):
37-
"""Test that exceptions are logged when raise_exceptions=False"""
26+
async def test_exception_handling_with_raise_exceptions_false():
27+
"""Transport exceptions are logged locally but not sent to the client.
28+
29+
The transport that reported the error is likely broken; writing back
30+
through it races with stream closure (#1967, #2064). The TypeScript,
31+
Go, and C# SDKs all log locally only.
32+
"""
3833
server = Server("test-server")
3934
session = Mock(spec=ServerSession)
4035
session.send_log_message = AsyncMock()
4136

42-
test_exception = exception_class(message)
43-
44-
await server._handle_message(test_exception, session, {}, raise_exceptions=False)
45-
46-
# Should send log message
47-
session.send_log_message.assert_called_once()
48-
call_args = session.send_log_message.call_args
37+
await server._handle_message(RuntimeError("Test error"), session, {}, raise_exceptions=False)
4938

50-
assert call_args.kwargs["level"] == "error"
51-
assert call_args.kwargs["data"] == "Internal Server Error"
52-
assert call_args.kwargs["logger"] == "mcp.server.exception_handler"
39+
session.send_log_message.assert_not_called()
5340

5441

5542
@pytest.mark.anyio
@@ -72,3 +59,43 @@ async def test_normal_message_handling_not_affected():
7259

7360
# Verify _handle_request was called
7461
server._handle_request.assert_called_once()
62+
63+
64+
@pytest.mark.anyio
65+
async def test_server_run_exits_cleanly_when_transport_yields_exception_then_closes():
66+
"""Regression test for #1967 / #2064.
67+
68+
Exercises the real Server.run() path with real memory streams, reproducing
69+
what happens in stateless streamable HTTP when a POST handler throws:
70+
71+
1. Transport yields an Exception into the read stream
72+
(streamable_http.py does this in its broad POST-handler except).
73+
2. Transport closes the read stream (terminate() in stateless mode).
74+
3. _receive_loop exits its `async with read_stream, write_stream:` block,
75+
closing the write stream.
76+
4. Meanwhile _handle_message(exc) was spawned via tg.start_soon and runs
77+
after the write stream is closed.
78+
79+
Before the fix, _handle_message tried to send_log_message through the
80+
closed write stream, raising ClosedResourceError inside the TaskGroup
81+
and crashing server.run(). After the fix, it only logs locally.
82+
"""
83+
server = Server("test-server")
84+
85+
read_send, read_recv = anyio.create_memory_object_stream[SessionMessage | Exception](10)
86+
write_send, write_recv = anyio.create_memory_object_stream[SessionMessage](10)
87+
88+
async def transport_side() -> None:
89+
# What the streamable HTTP transport does: push the exception, then close.
90+
await read_send.send(RuntimeError("simulated transport error"))
91+
await read_send.aclose()
92+
# Drain the write side so it doesn't back-pressure; it must see no messages.
93+
async with write_recv:
94+
async for _ in write_recv: # pragma: no cover
95+
pytest.fail("server must not write back when the transport reported an error")
96+
97+
with anyio.fail_after(5):
98+
async with anyio.create_task_group() as tg:
99+
tg.start_soon(transport_side)
100+
# stateless=True so server.run doesn't wait for initialize handshake
101+
await server.run(read_recv, write_send, server.create_initialization_options(), stateless=True)

0 commit comments

Comments
 (0)