-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathtest_notification_response.py
More file actions
185 lines (130 loc) · 7.83 KB
/
test_notification_response.py
File metadata and controls
185 lines (130 loc) · 7.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Tests for StreamableHTTP client transport with non-SDK servers.
These tests verify client behavior when interacting with servers
that don't follow SDK conventions.
"""
import json
import httpx
import pytest
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from mcp import ClientSession, MCPError, types
from mcp.client.streamable_http import streamable_http_client
from mcp.shared.session import RequestResponder
from mcp.types import RootsListChangedNotification
# Apply the anyio marker to every test in this module.
pytestmark = pytest.mark.anyio
# Canned "result" payload that _init_json_response() sends back for an
# "initialize" request.
INIT_RESPONSE = {
    "serverInfo": {"name": "test-non-sdk-server", "version": "1.0.0"},
    "protocolVersion": "2024-11-05",
    "capabilities": {},
}
def _init_json_response(data: dict[str, object]) -> JSONResponse:
    """Build the JSON-RPC success reply for an ``initialize`` request.

    The reply echoes the ``id`` of the incoming request (``data["id"]``) and
    carries ``INIT_RESPONSE`` as its ``result``.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": data["id"],
        "result": INIT_RESPONSE,
    }
    return JSONResponse(payload)
def _create_non_sdk_server_app() -> Starlette:
    """Build a stub server whose notification replies ignore SDK conventions.

    The app exposes a single ``POST /mcp`` route:

    * ``initialize`` requests get the canned init result.
    * Messages without an ``id`` (notifications) get ``204 No Content`` with a
      JSON content-type header, which is the non-SDK behaviour under test.
    * Any other request gets a JSON-RPC "Method not found" error.
    """

    async def handle_mcp_request(request: Request) -> Response:
        message = json.loads(await request.body())

        if message.get("method") == "initialize":
            return _init_json_response(message)

        # Notifications carry no id; answer them with 204 rather than 202.
        is_notification = "id" not in message
        if is_notification:
            return Response(status_code=204, headers={"Content-Type": "application/json"})

        return JSONResponse(  # pragma: no cover
            {"jsonrpc": "2.0", "id": message.get("id"), "error": {"code": -32601, "message": "Method not found"}}
        )

    mcp_route = Route("/mcp", handle_mcp_request, methods=["POST"])
    return Starlette(debug=True, routes=[mcp_route])
def _create_unexpected_content_type_app() -> Starlette:
    """Build a stub server that answers requests with a ``text/plain`` body.

    ``initialize`` is answered normally and notifications (messages without an
    ``id``) are acknowledged with 202. Every other request receives a 200
    response whose content type is neither JSON nor SSE.
    """

    async def handle_mcp_request(request: Request) -> Response:
        message = json.loads(await request.body())

        if message.get("method") == "initialize":
            return _init_json_response(message)

        if "id" in message:
            # A regular request: reply with plain text, an unexpected content type.
            return Response(content="this is plain text, not json or sse", status_code=200, media_type="text/plain")

        return Response(status_code=202)

    mcp_route = Route("/mcp", handle_mcp_request, methods=["POST"])
    return Starlette(debug=True, routes=[mcp_route])
async def test_non_compliant_notification_response() -> None:
    """Verify the client ignores unexpected responses to notifications.

    The spec states notifications should get either 202 + no response body, or 4xx + optional error body
    (https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server),
    but some servers wrongly return other 2xx codes (e.g. 204). For now we simply ignore unexpected responses
    (aligning behaviour w/ the TS SDK).

    The test fails if the session's message handler is ever handed an exception.
    """
    # Holds the exception (if any) delivered to the message handler below.
    returned_exception: Exception | None = None

    async def message_handler(  # pragma: no cover
        message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception,
    ) -> None:
        nonlocal returned_exception
        if isinstance(message, Exception):
            returned_exception = message

    async with httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_non_sdk_server_app())) as client:
        async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
            async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session:
                await session.initialize()

                # The test server returns a 204 instead of the expected 202
                await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed"))

    # Compare against None explicitly: relying on truthiness would silently
    # pass if the captured exception's class happened to be falsy.
    if returned_exception is not None:  # pragma: no cover
        pytest.fail(f"Server encountered an exception: {returned_exception}")
async def test_unexpected_content_type_sends_jsonrpc_error() -> None:
    """Check that an unexpected content type fails the pending request promptly.

    If the server replies with something other than application/json or
    text/event-stream, the client is expected to emit a JSONRPCError so the
    waiting request resolves with an MCPError right away rather than hanging
    until it times out.
    """
    transport = httpx.ASGITransport(app=_create_unexpected_content_type_app())
    async with httpx.AsyncClient(transport=transport) as http_client:
        async with streamable_http_client("http://localhost/mcp", http_client=http_client) as (reader, writer):
            async with ClientSession(reader, writer) as session:  # pragma: no branch
                await session.initialize()

                # The stub answers list_tools with text/plain.
                with pytest.raises(MCPError, match="Unexpected content type: text/plain"):  # pragma: no branch
                    await session.list_tools()
def _create_http_error_app(error_status: int) -> Starlette:
    """Build a stub server that fails ordinary requests with an HTTP status.

    ``initialize`` is answered normally and notifications (messages without an
    ``id``) are acknowledged with 202. Every other request receives an empty
    response carrying ``error_status``.
    """

    async def handle_mcp_request(request: Request) -> Response:
        message = json.loads(await request.body())

        if message.get("method") == "initialize":
            return _init_json_response(message)

        # Only messages that carry an id (real requests) get the error status.
        is_notification = "id" not in message
        return Response(status_code=202 if is_notification else error_status)

    mcp_route = Route("/mcp", handle_mcp_request, methods=["POST"])
    return Starlette(debug=True, routes=[mcp_route])
async def test_http_error_status_sends_jsonrpc_error() -> None:
    """Check that an HTTP 5xx reply fails the pending request promptly.

    When the server answers with a non-2xx status (500 here), the client is
    expected to emit a JSONRPCError so the waiting request resolves with an
    MCPError, instead of raising an unhandled httpx.HTTPStatusError that
    leaves the caller hanging.
    """
    transport = httpx.ASGITransport(app=_create_http_error_app(500))
    async with httpx.AsyncClient(transport=transport) as http_client:
        async with streamable_http_client("http://localhost/mcp", http_client=http_client) as (reader, writer):
            async with ClientSession(reader, writer) as session:  # pragma: no branch
                await session.initialize()

                # The stub answers list_tools with a bare HTTP 500.
                with pytest.raises(MCPError, match="Server returned an error response"):  # pragma: no branch
                    await session.list_tools()
def _create_invalid_json_response_app() -> Starlette:
    """Build a stub server that answers requests with malformed JSON.

    ``initialize`` is answered normally and notifications (messages without an
    ``id``) are acknowledged with 202. Every other request receives a 200
    response labelled application/json whose body is not parseable JSON.
    """

    async def handle_mcp_request(request: Request) -> Response:
        message = json.loads(await request.body())

        if message.get("method") == "initialize":
            return _init_json_response(message)

        if "id" in message:
            # A regular request: claim JSON in the content type but send garbage.
            return Response(content="not valid json{{{", status_code=200, media_type="application/json")

        return Response(status_code=202)

    mcp_route = Route("/mcp", handle_mcp_request, methods=["POST"])
    return Starlette(debug=True, routes=[mcp_route])
async def test_invalid_json_response_sends_jsonrpc_error() -> None:
    """Check that an unparseable JSON reply fails the pending request promptly.

    When the server labels its reply application/json but the body cannot be
    parsed, the client is expected to emit a JSONRPCError so the waiting
    request resolves with an MCPError right away rather than hanging until it
    times out.
    """
    transport = httpx.ASGITransport(app=_create_invalid_json_response_app())
    async with httpx.AsyncClient(transport=transport) as http_client:
        async with streamable_http_client("http://localhost/mcp", http_client=http_client) as (reader, writer):
            async with ClientSession(reader, writer) as session:  # pragma: no branch
                await session.initialize()

                # The stub answers list_tools with a malformed JSON body.
                with pytest.raises(MCPError, match="Failed to parse JSON response"):  # pragma: no branch
                    await session.list_tools()