-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy path_memory.py
More file actions
97 lines (80 loc) · 3.15 KB
/
_memory.py
File metadata and controls
97 lines (80 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""In-memory transport for testing MCP servers without network overhead."""
from __future__ import annotations
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any
import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp.server import Server
from mcp.server.fastmcp import FastMCP
from mcp.shared.memory import create_client_server_memory_streams
from mcp.shared.message import SessionMessage
class InMemoryTransport:
"""
In-memory transport for testing MCP servers without network overhead.
This transport starts the server in a background task and provides
streams for client-side communication. The server is automatically
stopped when the context manager exits.
Example:
server = FastMCP("test")
transport = InMemoryTransport(server)
async with transport.connect() as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
# Use the session...
Or more commonly, use with Client:
async with Client(server) as client:
result = await client.call_tool("my_tool", {...})
"""
def __init__(
self,
server: Server[Any] | FastMCP,
*,
raise_exceptions: bool = False,
) -> None:
"""
Initialize the in-memory transport.
Args:
server: The MCP server to connect to (Server or FastMCP instance)
raise_exceptions: Whether to raise exceptions from the server
"""
self._server = server
self._raise_exceptions = raise_exceptions
@asynccontextmanager
async def connect(
self,
) -> AsyncGenerator[
tuple[
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
],
None,
]:
"""
Connect to the server and return streams for communication.
Yields:
A tuple of (read_stream, write_stream) for bidirectional communication
"""
# Unwrap FastMCP to get underlying Server
actual_server: Server[Any]
if isinstance(self._server, FastMCP):
actual_server = self._server._mcp_server # type: ignore[reportPrivateUsage]
else:
actual_server = self._server
async with create_client_server_memory_streams() as (client_streams, server_streams):
client_read, client_write = client_streams
server_read, server_write = server_streams
async with anyio.create_task_group() as tg:
# Start server in background
tg.start_soon(
lambda: actual_server.run(
server_read,
server_write,
actual_server.create_initialization_options(),
raise_exceptions=self._raise_exceptions,
)
)
try:
yield client_read, client_write
finally:
tg.cancel_scope.cancel()