Skip to content

Commit 3f5fd7e

Browse files
committed
support for resumability - server
1 parent b957fad commit 3f5fd7e

File tree

4 files changed

+306
-10
lines changed

4 files changed

+306
-10
lines changed

examples/servers/simple-streamablehttp/README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ A simple MCP server example demonstrating the StreamableHttp transport, which en
99
- Task management with anyio task groups
1010
- Ability to send multiple notifications over time to the client
1111
- Proper resource cleanup and lifespan management
12+
- Resumability support via InMemoryEventStore
1213

1314
## Usage
1415

@@ -32,6 +33,23 @@ The server exposes a tool named "start-notification-stream" that accepts three a
3233
- `count`: Number of notifications to send (e.g., 5)
3334
- `caller`: Identifier string for the caller
3435

36+
## Resumability Support
37+
38+
This server includes resumability support through the InMemoryEventStore. This enables clients to:
39+
40+
- Reconnect to the server after a disconnection
41+
- Resume event streaming from where they left off using the Last-Event-ID header
42+
43+
44+
The server will:
45+
- Generate unique event IDs for each SSE message
46+
- Store events in memory for later replay
47+
- Replay missed events when a client reconnects with a Last-Event-ID header
48+
49+
Note: The InMemoryEventStore is designed for demonstration purposes only. For production use, consider implementing a persistent storage solution.
50+
51+
52+
3553
## Client
3654

37-
You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use (Inspector)[https://github.com/modelcontextprotocol/inspector]
55+
You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use [Inspector](https://github.com/modelcontextprotocol/inspector)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""
2+
In-memory event store for demonstrating resumability functionality.
3+
4+
This is a simple implementation intended for examples and testing,
5+
not for production use where a persistent storage solution would be more appropriate.
6+
"""
7+
8+
import logging
9+
import time
10+
from collections.abc import Awaitable, Callable
11+
from uuid import uuid4
12+
13+
from mcp.server.streamableHttp import EventId, EventStore, StreamId
14+
from mcp.types import JSONRPCMessage
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
class InMemoryEventStore(EventStore):
20+
"""
21+
Simple in-memory implementation of the EventStore interface for resumability.
22+
This is primarily intended for examples and testing, not for production use
23+
where a persistent storage solution would be more appropriate.
24+
"""
25+
26+
def __init__(self):
27+
self.events: dict[
28+
str, tuple[str, JSONRPCMessage, float]
29+
] = {} # event_id -> (stream_id, message, timestamp)
30+
31+
async def store_event(
32+
self, stream_id: StreamId, message: JSONRPCMessage
33+
) -> EventId:
34+
"""Stores an event with a generated event ID."""
35+
event_id = str(uuid4())
36+
self.events[event_id] = (stream_id, message, time.time())
37+
return event_id
38+
39+
async def replay_events_after(
40+
self,
41+
last_event_id: EventId,
42+
send_callback: Callable[[EventId, JSONRPCMessage], Awaitable[None]],
43+
) -> StreamId:
44+
"""Replays events that occurred after the specified event ID."""
45+
logger.debug(f"Attempting to replay events after {last_event_id}")
46+
logger.debug(f"Total events in store: {len(self.events)}")
47+
logger.debug(f"Event IDs in store: {list(self.events.keys())}")
48+
49+
if not last_event_id or last_event_id not in self.events:
50+
logger.warning(f"Event ID {last_event_id} not found in store")
51+
return ""
52+
53+
# Get the stream ID and timestamp from the last event
54+
stream_id, _, last_timestamp = self.events[last_event_id]
55+
56+
# Find all events for this stream after the last event
57+
events_to_replay = [
58+
(event_id, message)
59+
for event_id, (sid, message, timestamp) in self.events.items()
60+
if sid == stream_id and timestamp > last_timestamp
61+
]
62+
63+
# Sort by timestamp to ensure chronological order
64+
events_to_replay.sort(key=lambda x: self.events[x[0]][2])
65+
66+
logger.debug(f"Found {len(events_to_replay)} events to replay")
67+
logger.debug(
68+
f"Events to replay: {[event_id for event_id, _ in events_to_replay]}"
69+
)
70+
71+
# Send all events in order
72+
for event_id, message in events_to_replay:
73+
await send_callback(event_id, message)
74+
75+
return stream_id

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,24 @@
1717
from starlette.responses import Response
1818
from starlette.routing import Mount
1919

20+
from .event_store import InMemoryEventStore
21+
2022
# Configure logging
2123
logger = logging.getLogger(__name__)
2224

2325
# Global task group that will be initialized in the lifespan
2426
task_group = None
2527

28+
# Event store for resumability
29+
# The InMemoryEventStore enables resumability support for StreamableHTTP transport.
30+
# It stores SSE events with unique IDs, allowing clients to:
31+
# 1. Receive event IDs for each SSE message
32+
# 2. Resume streams by sending Last-Event-ID in GET requests
33+
# 3. Replay missed events after reconnection
34+
# Note: This in-memory implementation is for demonstration ONLY.
35+
# For production, use a persistent storage solution.
36+
event_store = InMemoryEventStore()
37+
2638

2739
@contextlib.asynccontextmanager
2840
async def lifespan(app):
@@ -79,9 +91,14 @@ async def call_tool(
7991

8092
# Send the specified number of notifications with the given interval
8193
for i in range(count):
94+
# Include more detailed message for resumability demonstration
95+
notification_msg = (
96+
f"[{i+1}/{count}] Event from '{caller}' - "
97+
f"Use Last-Event-ID to resume if disconnected"
98+
)
8299
await ctx.session.send_log_message(
83100
level="info",
84-
data=f"Notification {i+1}/{count} from caller: {caller}",
101+
data=notification_msg,
85102
logger="notification_stream",
86103
# Associates this notification with the original request
87104
# Ensures notifications are sent to the correct response stream
@@ -90,6 +107,7 @@ async def call_tool(
90107
# - nowhere (if GET request isn't supported)
91108
related_request_id=ctx.request_id,
92109
)
110+
logger.debug(f"Sent notification {i+1}/{count} for caller: {caller}")
93111
if i < count - 1: # Don't wait after the last notification
94112
await anyio.sleep(interval)
95113

@@ -163,8 +181,10 @@ async def handle_streamable_http(scope, receive, send):
163181
http_transport = StreamableHTTPServerTransport(
164182
mcp_session_id=new_session_id,
165183
is_json_response_enabled=json_response,
184+
event_store=event_store, # Enable resumability
166185
)
167186
server_instances[http_transport.mcp_session_id] = http_transport
187+
logger.info(f"Created new transport with session ID: {new_session_id}")
168188
async with http_transport.connect() as streams:
169189
read_stream, write_stream = streams
170190

0 commit comments

Comments
 (0)