-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Expand file tree
/
Copy pathtest_stdio.py
More file actions
152 lines (130 loc) · 6.84 KB
/
Copy pathtest_stdio.py
File metadata and controls
152 lines (130 loc) · 6.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""The stdio transport: one subprocess end-to-end test and one in-process framing test.
The subprocess test proves the client-server round trip over the transport's real process
boundary; its server lives in `_stdio_server.py` and is launched via `python -m` so subprocess
coverage measurement applies. The framing test drives `stdio_server` over injected in-process
streams instead.
stdio is deliberately not a leg of the `connect`-fixture matrix: a subprocess per test would be
slow, and the matrix already proves transport-agnosticism in-process. Process-lifecycle edge
cases (terminate/kill escalation, parse errors) stay in `tests/client/test_stdio.py`.
"""
import io
import json
import os
import sys
import tempfile
from pathlib import Path
from typing import TextIO, cast
import anyio
import pytest
from inline_snapshot import snapshot
from mcp.client import stdio
from mcp.client.client import Client
from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.server.stdio import stdio_server
from mcp.shared.message import SessionMessage
from mcp.types import (
CallToolResult,
JSONRPCNotification,
JSONRPCRequest,
JSONRPCResponse,
LoggingMessageNotificationParams,
TextContent,
)
from mcp.types.jsonrpc import jsonrpc_message_adapter
from tests.interaction._connect import initialize_body
from tests.interaction._requirements import requirement
from tests.interaction.transports import _stdio_server
pytestmark = pytest.mark.anyio
_REPO_ROOT = Path(__file__).parents[3]
@requirement("transport:stdio")
@requirement("transport:stdio:clean-shutdown")
@requirement("transport:stdio:stderr-passthrough")
async def test_tool_call_and_notification_round_trip_over_a_stdio_subprocess(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""A stdio-subprocess Client round-trips a tool call, a notification, and a clean exit.
The Client initializes, calls a tool with arguments, and receives the server's log
notification before the call returns; the server exits when the transport closes its
stdin.
"""
# After stdin closes, the child must unwind, write the clean-exit line, and let coverage's
# atexit hook persist its subprocess data file before escalation. The production 2s default
# was too tight on slow Windows runners: the child was killed mid-atexit (test stayed green)
# and the silently missing data file tripped the 100% coverage gate. Not under test.
monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 10.0)
received: list[LoggingMessageNotificationParams] = []
async def collect(params: LoggingMessageNotificationParams) -> None:
received.append(params)
with tempfile.TemporaryFile(mode="w+") as errlog:
transport = stdio_client(
StdioServerParameters(
command=sys.executable,
args=["-m", _stdio_server.__name__],
cwd=str(_REPO_ROOT),
# stdio_client filters the inherited environment, dropping the variables
# coverage.py's subprocess support uses; pass them through so the server module is
# measured. PYTHONWARNINGS: the child recompiles anyio (pytest's pyc tag differs),
# and on 3.14 anyio's return-in-finally SyntaxWarning would land on the snapshot stderr.
env={key: value for key, value in os.environ.items() if key.startswith("COVERAGE_")}
| {"PYTHONWARNINGS": "ignore::SyntaxWarning"},
),
errlog=cast(TextIO, errlog),
)
# Must exceed session time plus the patched PROCESS_TERMINATION_TIMEOUT (10s).
with anyio.fail_after(20):
async with Client(transport, logging_callback=collect) as client:
assert client.initialize_result.server_info.name == "stdio-echo"
result = await client.call_tool("echo", {"text": "across\nprocesses"})
errlog.seek(0)
captured_stderr = errlog.read()
assert result == snapshot(CallToolResult(content=[TextContent(text="across\nprocesses")]))
# stdio carries one ordered server-to-client stream, so the same notification-before-response
# guarantee holds here as for the in-memory transport.
assert received == snapshot(
[LoggingMessageNotificationParams(level="info", logger="echo", data="echoing across\nprocesses")]
)
# The server writes this line only after its run loop returns, which happens when stdin closes:
# seeing it proves the process exited on its own rather than via the transport's terminate
# escalation, without a timing-based assertion. The capture itself proves stderr passthrough:
# the transport routes the child's stderr to the caller's `errlog` without consuming it.
# Prerelease Python/lowest-direct dependency runs may print warnings before the server marker.
assert captured_stderr.endswith("stdio-echo: clean exit\n")
@requirement("transport:stdio:stream-purity")
@requirement("transport:stdio:no-embedded-newlines")
async def test_stdio_server_writes_one_jsonrpc_message_per_line() -> None:
"""Every `stdio_server` write is one valid JSON-RPC message on its own line.
Each line is newline-terminated with payload newlines JSON-escaped. This proves the
transport's own framing; it does not guard `sys.stdout` against handler code (see the
divergence on `transport:stdio:stream-purity`).
"""
captured = io.StringIO()
sent_line = json.dumps(initialize_body(request_id=1)) + "\n"
with anyio.fail_after(5):
async with (
stdio_server(stdin=anyio.wrap_file(io.StringIO(sent_line)), stdout=anyio.wrap_file(captured)) as (
read_stream,
write_stream,
),
read_stream,
write_stream,
):
received = await read_stream.receive()
assert isinstance(received, SessionMessage)
assert isinstance(received.message, JSONRPCRequest)
assert received.message.method == "initialize"
response = JSONRPCResponse(jsonrpc="2.0", id=1, result={"text": "line\nbreak"})
notification = JSONRPCNotification(
jsonrpc="2.0", method="notifications/message", params={"level": "info", "data": "two\nlines"}
)
await write_stream.send(SessionMessage(response))
await write_stream.send(SessionMessage(notification))
output = captured.getvalue()
assert output.endswith("\n")
lines = output.removesuffix("\n").split("\n")
assert len(lines) == 2
messages = [jsonrpc_message_adapter.validate_json(line) for line in lines]
assert [type(message).__name__ for message in messages] == snapshot(["JSONRPCResponse", "JSONRPCNotification"])
# The newline inside the payload is JSON-escaped on the wire, not a literal newline that would
# break the one-message-per-line framing.
assert r"line\nbreak" in lines[0]
assert r"two\nlines" in lines[1]