forked from modelcontextprotocol/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstdio.py
More file actions
96 lines (78 loc) · 3.78 KB
/
Copy pathstdio.py
File metadata and controls
96 lines (78 loc) · 3.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""Stdio Server Transport Module
This module provides functionality for creating an stdio-based transport layer
that can be used to communicate with an MCP client through standard input/output
streams.
Example:
```python
async def run_server():
async with stdio_server() as (read_stream, write_stream):
# read_stream contains incoming JSONRPCMessages from stdin
# write_stream allows sending JSONRPCMessages to stdout
server = await create_my_server()
await server.run(read_stream, write_stream, init_options)
anyio.run(run_server)
```
"""
import sys
from contextlib import asynccontextmanager
from io import TextIOWrapper
import anyio
import anyio.lowlevel
from mcp import types
from mcp.shared._context_streams import create_context_streams
from mcp.shared.message import SessionMessage
class _NoCloseTextIOWrapper(TextIOWrapper):
"""A TextIOWrapper that does not close the underlying buffer on garbage collection.
Standard TextIOWrapper calls close() in __del__, which closes the underlying
buffer. When wrapping sys.stdin.buffer or sys.stdout.buffer, this causes the
real process stdio to be closed after the server exits, breaking subsequent
print() or input() calls in the parent process.
"""
def close(self) -> None: # pragma: lax no cover
# Intentionally not closing the underlying buffer.
# The standard process handles should outlive the server.
pass
def __del__(self) -> None: # pragma: lax no cover
# Prevent TextIOWrapper.__del__ from calling close().
pass
@asynccontextmanager
async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None):
"""Server transport for stdio: this communicates with an MCP client by reading
from the current process' stdin and writing to stdout.
"""
# Purposely not using context managers for these, as we don't want to close
# standard process handles. Encoding of stdin/stdout as text streams on
# python is platform-dependent (Windows is particularly problematic), so we
# re-wrap the underlying binary stream to ensure UTF-8.
if not stdin:
stdin = anyio.wrap_file(_NoCloseTextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace"))
if not stdout:
stdout = anyio.wrap_file(_NoCloseTextIOWrapper(sys.stdout.buffer, encoding="utf-8"))
read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0)
write_stream, write_stream_reader = create_context_streams[SessionMessage](0)
async def stdin_reader():
try:
async with read_stream_writer:
async for line in stdin:
try:
message = types.jsonrpc_message_adapter.validate_json(line, by_name=False)
except Exception as exc:
await read_stream_writer.send(exc)
continue
session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()
async def stdout_writer():
try:
async with write_stream_reader:
async for session_message in write_stream_reader:
json = session_message.message.model_dump_json(by_alias=True, exclude_unset=True)
await stdout.write(json + "\n")
await stdout.flush()
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()
async with anyio.create_task_group() as tg:
tg.start_soon(stdin_reader)
tg.start_soon(stdout_writer)
yield read_stream, write_stream