-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Expand file tree
/
Copy pathtest_1363_race_condition_streamable_http.py
More file actions
276 lines (222 loc) · 9.69 KB
/
test_1363_race_condition_streamable_http.py
File metadata and controls
276 lines (222 loc) · 9.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
"""Test for issue #1363 - Race condition in StreamableHTTP transport causes ClosedResourceError.
This test reproduces the race condition described in issue #1363 where MCP servers
in HTTP Streamable mode experience ClosedResourceError exceptions when requests
fail validation early (e.g., due to incorrect Accept headers).
The race condition occurs because:
1. Transport setup creates a message_router task
2. Message router enters async for write_stream_reader loop
3. write_stream_reader calls checkpoint() in receive(), yielding control
4. Request handling processes HTTP request
5. If validation fails early, request returns immediately
6. Transport termination closes all streams including write_stream_reader
7. Message router may still be in checkpoint() yield and hasn't returned to check stream state
8. When message router resumes, it encounters a closed stream, raising ClosedResourceError
"""
import socket
import subprocess
import sys
import time
import httpx
import pytest
SERVER_NAME = "test_race_condition_server"
def check_server_logs_for_errors(process: subprocess.Popen[str], test_name: str) -> None:
"""
Check server logs for ClosedResourceError and other race condition errors.
Args:
process: The server process
test_name: Name of the test for better error messages
"""
# Get logs from the process
try:
stdout, stderr = process.communicate(timeout=10)
server_logs = stderr + stdout
except Exception:
server_logs = ""
# Check for specific race condition errors
errors_found: list[str] = []
if "ClosedResourceError" in server_logs:
errors_found.append("ClosedResourceError")
if "Error in message router" in server_logs:
errors_found.append("Error in message router")
if "anyio.ClosedResourceError" in server_logs:
errors_found.append("anyio.ClosedResourceError")
# Assert no race condition errors occurred
if errors_found:
error_msg = f"Test '{test_name}' found race condition errors: {', '.join(errors_found)}\n"
error_msg += f"Server logs:\n{server_logs}"
pytest.fail(error_msg)
# If we get here, no race condition errors were found
print(f"✓ Test '{test_name}' passed: No race condition errors detected")
@pytest.fixture
def server_port() -> int:
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
@pytest.fixture
def server_url(server_port: int) -> str:
return f"http://127.0.0.1:{server_port}"
def start_server_process(port: int, json_response: bool | None = None) -> subprocess.Popen[str]:
"""Start server in a separate process."""
# Create a temporary script to run the server
import os
server_code = f"""
import sys
import os
sys.path.insert(0, {repr(os.getcwd())})
import socket
import time
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Scope, Send
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import Tool
SERVER_NAME = "test_race_condition_server"
class RaceConditionTestServer(Server):
def __init__(self):
super().__init__(SERVER_NAME)
async def on_list_tools(self) -> list[Tool]:
return []
def run_server_with_logging(port: int) -> None:
app = RaceConditionTestServer()
# Create session manager
session_manager = StreamableHTTPSessionManager(
app=app,
json_response={json_response},
stateless=True, # Use stateless mode to trigger the race condition
)
# Create the ASGI handler
async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
await session_manager.handle_request(scope, receive, send)
# Create Starlette app with lifespan
@asynccontextmanager
async def lifespan(app: Starlette) -> AsyncGenerator[None, None]:
async with session_manager.run():
yield
routes = [
Mount("/", app=handle_streamable_http),
]
starlette_app = Starlette(routes=routes, lifespan=lifespan)
uvicorn.run(starlette_app, host="127.0.0.1", port=port, log_level="debug")
run_server_with_logging({port})
"""
process = subprocess.Popen(
[sys.executable, "-c", server_code], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# Wait for server to be running with connection testing (like other tests)
max_attempts = 20
attempt = 0
while attempt < max_attempts:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("127.0.0.1", port))
break
except ConnectionRefusedError:
time.sleep(0.1)
attempt += 1
else:
# If server failed to start, terminate the process and raise an error
process.terminate()
process.wait()
raise RuntimeError(f"Server failed to start after {max_attempts} attempts")
return process
@pytest.mark.anyio
async def test_race_condition_invalid_accept_headers(server_port: int):
"""
Test the race condition with invalid Accept headers.
This test reproduces the exact scenario described in issue #1363:
- Send POST request with incorrect Accept headers (missing either application/json or text/event-stream)
- Request fails validation early and returns quickly
- This should trigger the race condition where message_router encounters ClosedResourceError
"""
process = start_server_process(server_port)
try:
# Test with missing text/event-stream in Accept header
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
f"http://127.0.0.1:{server_port}/",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
headers={
"Accept": "application/json", # Missing text/event-stream
"Content-Type": "application/json",
},
)
# Should get 406 Not Acceptable due to missing text/event-stream
assert response.status_code == 406
# Test with missing application/json in Accept header
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
f"http://127.0.0.1:{server_port}/",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
headers={
"Accept": "text/event-stream", # Missing application/json
"Content-Type": "application/json",
},
)
# Should get 406 Not Acceptable due to missing application/json
assert response.status_code == 406
# Test with completely invalid Accept header
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
f"http://127.0.0.1:{server_port}/",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
headers={
"Accept": "text/plain", # Invalid Accept header
"Content-Type": "application/json",
},
)
# Should get 406 Not Acceptable
assert response.status_code == 406
finally:
process.terminate()
process.wait()
# Check server logs for race condition errors
check_server_logs_for_errors(process, "test_race_condition_invalid_accept_headers")
@pytest.mark.anyio
async def test_race_condition_invalid_content_type(server_port: int):
"""
Test the race condition with invalid Content-Type headers.
This test reproduces the race condition scenario with Content-Type validation failure.
"""
process = start_server_process(server_port)
try:
# Test with invalid Content-Type
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
f"http://127.0.0.1:{server_port}/",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
headers={
"Accept": "application/json, text/event-stream",
"Content-Type": "text/plain", # Invalid Content-Type
},
)
assert response.status_code == 400
finally:
process.terminate()
process.wait()
# Check server logs for race condition errors
check_server_logs_for_errors(process, "test_race_condition_invalid_content_type")
@pytest.mark.anyio
async def test_race_condition_message_router_async_for(server_port: int):
"""
Uses json_response=True to trigger the `if self.is_json_response_enabled` branch,
which reproduces the ClosedResourceError when message_router is suspended
in async for loop while transport cleanup closes streams concurrently.
"""
process = start_server_process(server_port, json_response=True)
try:
# use standard mcp client to send requests
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
for _ in range(1):
async with streamablehttp_client(f"http://127.0.0.1:{server_port}") as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
finally:
process.terminate()
process.wait()
# Check server logs for race condition errors in message router
check_server_logs_for_errors(process, "test_race_condition_message_router_async_for")