Skip to content

Commit 8be635a

Browse files
authored
Merge pull request #43 from ayhammouda/fix/flaky-stdio-cache-smoke
test: fix flaky stdio cache smoke test (EOF race)
2 parents 0ec7ef7 + 8843f2d commit 8be635a

2 files changed

Lines changed: 87 additions & 76 deletions

File tree

tests/test_mcp_get_docs_cache_smoke.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
import sqlite3
55
import subprocess
6-
import sys
76
from pathlib import Path
87

98
from mcp_server_python_docs.services.persistent_cache import _NO_ANCHOR_KEY
@@ -14,6 +13,7 @@
1413
_isolated_cache_env,
1514
_make_notification,
1615
_make_request,
16+
_run_server_until_responses,
1717
)
1818

1919

@@ -80,13 +80,10 @@ def _create_contentful_json_index(cache_dir: Path) -> Path:
8080

8181

8282
def _run_server(stdin_data: bytes, env: dict[str, str]) -> subprocess.CompletedProcess:
83-
return subprocess.run(
84-
[sys.executable, "-m", "mcp_server_python_docs", "serve"],
85-
input=stdin_data,
86-
capture_output=True,
87-
timeout=15,
88-
env=env,
89-
)
83+
# Use the polling runner (waits for the tools/call reply before closing
84+
# stdin) instead of subprocess.run(input=...), which races the server's
85+
# EOF-driven shutdown and flakes on cold CI runners.
86+
return _run_server_until_responses(stdin_data, env)
9087

9188

9289
def _initialized_tool_call(name: str, arguments: dict, req_id: int = 2) -> bytes:

tests/test_stdio_smoke.py

Lines changed: 82 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,87 @@ def _request_ids(stdin_data: bytes) -> list[int]:
184184
return request_ids
185185

186186

187+
def _run_server_until_responses(
188+
stdin_data: bytes, env: dict[str, str], timeout: int = 15,
189+
) -> subprocess.CompletedProcess:
190+
"""Run the MCP server, stream stdin, and wait for every expected response.
191+
192+
Unlike a fire-and-forget ``subprocess.run(input=...)``, this keeps stdin
193+
open until each request id in ``stdin_data`` has a matching response on
194+
stdout (or the timeout/exit fires). That avoids a race where the server's
195+
read loop shuts down on stdin EOF before the final ``tools/call`` handler
196+
flushes its reply — the source of CI flakiness on cold runners.
197+
"""
198+
proc = subprocess.Popen(
199+
[sys.executable, "-m", "mcp_server_python_docs", "serve"],
200+
stdin=subprocess.PIPE,
201+
stdout=subprocess.PIPE,
202+
stderr=subprocess.PIPE,
203+
env=env,
204+
)
205+
assert proc.stdin is not None
206+
assert proc.stdout is not None
207+
assert proc.stderr is not None
208+
209+
stdout_lines: list[bytes] = []
210+
stderr_lines: list[bytes] = []
211+
output_lock = threading.Lock()
212+
213+
def read_stream(stream, sink: list[bytes]) -> None:
214+
for line in iter(stream.readline, b""):
215+
with output_lock:
216+
sink.append(line)
217+
218+
stdout_thread = threading.Thread(
219+
target=read_stream,
220+
args=(proc.stdout, stdout_lines),
221+
daemon=True,
222+
)
223+
stderr_thread = threading.Thread(
224+
target=read_stream,
225+
args=(proc.stderr, stderr_lines),
226+
daemon=True,
227+
)
228+
stdout_thread.start()
229+
stderr_thread.start()
230+
231+
expected_ids = _request_ids(stdin_data)
232+
deadline = time.monotonic() + timeout
233+
try:
234+
for line in stdin_data.splitlines(keepends=True):
235+
proc.stdin.write(line)
236+
proc.stdin.flush()
237+
238+
while time.monotonic() < deadline:
239+
with output_lock:
240+
responses = _read_responses(b"".join(stdout_lines))
241+
if all(_find_response(responses, req_id) is not None for req_id in expected_ids):
242+
break
243+
if proc.poll() is not None:
244+
break
245+
time.sleep(0.02)
246+
finally:
247+
try:
248+
proc.stdin.close()
249+
except BrokenPipeError:
250+
# The subprocess may have already closed stdin after handling the requests.
251+
pass
252+
proc.stdin = None
253+
254+
remaining = max(0.1, deadline - time.monotonic())
255+
try:
256+
proc.wait(timeout=remaining)
257+
except subprocess.TimeoutExpired:
258+
proc.kill()
259+
proc.wait(timeout=5)
260+
261+
stdout_thread.join(timeout=1)
262+
stderr_thread.join(timeout=1)
263+
stdout = b"".join(stdout_lines)
264+
stderr = b"".join(stderr_lines)
265+
return subprocess.CompletedProcess(proc.args, proc.returncode, stdout, stderr)
266+
267+
187268
class TestStdioSmoke:
188269
"""Spawn the MCP server as a subprocess and verify protocol compliance."""
189270

@@ -198,74 +279,7 @@ def _run_server_with_input(
198279
self, stdin_data: bytes, timeout: int = 15,
199280
) -> subprocess.CompletedProcess:
200281
"""Run the server subprocess with line-paced stdin and return the result."""
201-
proc = subprocess.Popen(
202-
[sys.executable, "-m", "mcp_server_python_docs", "serve"],
203-
stdin=subprocess.PIPE,
204-
stdout=subprocess.PIPE,
205-
stderr=subprocess.PIPE,
206-
env=self.env,
207-
)
208-
assert proc.stdin is not None
209-
assert proc.stdout is not None
210-
assert proc.stderr is not None
211-
212-
stdout_lines: list[bytes] = []
213-
stderr_lines: list[bytes] = []
214-
output_lock = threading.Lock()
215-
216-
def read_stream(stream, sink: list[bytes]) -> None:
217-
for line in iter(stream.readline, b""):
218-
with output_lock:
219-
sink.append(line)
220-
221-
stdout_thread = threading.Thread(
222-
target=read_stream,
223-
args=(proc.stdout, stdout_lines),
224-
daemon=True,
225-
)
226-
stderr_thread = threading.Thread(
227-
target=read_stream,
228-
args=(proc.stderr, stderr_lines),
229-
daemon=True,
230-
)
231-
stdout_thread.start()
232-
stderr_thread.start()
233-
234-
expected_ids = _request_ids(stdin_data)
235-
deadline = time.monotonic() + timeout
236-
try:
237-
for line in stdin_data.splitlines(keepends=True):
238-
proc.stdin.write(line)
239-
proc.stdin.flush()
240-
241-
while time.monotonic() < deadline:
242-
with output_lock:
243-
responses = _read_responses(b"".join(stdout_lines))
244-
if all(_find_response(responses, req_id) is not None for req_id in expected_ids):
245-
break
246-
if proc.poll() is not None:
247-
break
248-
time.sleep(0.02)
249-
finally:
250-
try:
251-
proc.stdin.close()
252-
except BrokenPipeError:
253-
# The subprocess may have already closed stdin after handling the requests.
254-
pass
255-
proc.stdin = None
256-
257-
remaining = max(0.1, deadline - time.monotonic())
258-
try:
259-
proc.wait(timeout=remaining)
260-
except subprocess.TimeoutExpired:
261-
proc.kill()
262-
proc.wait(timeout=5)
263-
264-
stdout_thread.join(timeout=1)
265-
stderr_thread.join(timeout=1)
266-
stdout = b"".join(stdout_lines)
267-
stderr = b"".join(stderr_lines)
268-
return subprocess.CompletedProcess(proc.args, proc.returncode, stdout, stderr)
282+
return _run_server_until_responses(stdin_data, self.env, timeout)
269283

270284
def test_server_lists_tools_no_stdout_pollution(self):
271285
"""Server returns tool list and stdout has no non-JSON-RPC bytes."""

0 commit comments

Comments
 (0)