-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathserver.py
More file actions
142 lines (125 loc) · 5 KB
/
server.py
File metadata and controls
142 lines (125 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import logging
import anyio
import click
import uvicorn
from mcp import types
from mcp.server import Server, ServerRequestContext
from starlette.middleware.cors import CORSMiddleware
from .event_store import InMemoryEventStore
# Configure logging
logger = logging.getLogger(__name__)
async def handle_list_tools(
ctx: ServerRequestContext, params: types.PaginatedRequestParams | None
) -> types.ListToolsResult:
return types.ListToolsResult(
tools=[
types.Tool(
name="start-notification-stream",
description="Sends a stream of notifications with configurable count and interval",
input_schema={
"type": "object",
"required": ["interval", "count", "caller"],
"properties": {
"interval": {
"type": "number",
"description": "Interval between notifications in seconds",
},
"count": {
"type": "number",
"description": "Number of notifications to send",
},
"caller": {
"type": "string",
"description": "Identifier of the caller to include in notifications",
},
},
},
)
]
)
async def handle_call_tool(ctx: ServerRequestContext, params: types.CallToolRequestParams) -> types.CallToolResult:
arguments = params.arguments or {}
interval = arguments.get("interval", 1.0)
count = arguments.get("count", 5)
caller = arguments.get("caller", "unknown")
# Send the specified number of notifications with the given interval
for i in range(count):
# Include more detailed message for resumability demonstration
notification_msg = f"[{i + 1}/{count}] Event from '{caller}' - Use Last-Event-ID to resume if disconnected"
await ctx.session.send_log_message(
level="info",
data=notification_msg,
logger="notification_stream",
# Associates this notification with the original request
# Ensures notifications are sent to the correct response stream
# Without this, notifications will either go to:
# - a standalone SSE stream (if GET request is supported)
# - nowhere (if GET request isn't supported)
related_request_id=ctx.request_id,
)
logger.debug(f"Sent notification {i + 1}/{count} for caller: {caller}")
if i < count - 1: # Don't wait after the last notification
await anyio.sleep(interval)
# This will send a resource notification through standalone SSE
# established by GET request
await ctx.session.send_resource_updated(uri="http:///test_resource")
return types.CallToolResult(
content=[
types.TextContent(
type="text",
text=(f"Sent {count} notifications with {interval}s interval for caller: {caller}"),
)
]
)
@click.command()
@click.option("--port", default=3000, help="Port to listen on for HTTP")
@click.option(
"--log-level",
default="INFO",
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
)
@click.option(
"--json-response",
is_flag=True,
default=False,
help="Enable JSON responses instead of SSE streams",
)
def main(
port: int,
log_level: str,
json_response: bool,
) -> int:
# Configure logging
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
app = Server(
"mcp-streamable-http-demo",
on_list_tools=handle_list_tools,
on_call_tool=handle_call_tool,
)
# Create event store for resumability
# The InMemoryEventStore enables resumability support for StreamableHTTP transport.
# It stores SSE events with unique IDs, allowing clients to:
# 1. Receive event IDs for each SSE message
# 2. Resume streams by sending Last-Event-ID in GET requests
# 3. Replay missed events after reconnection
# Note: This in-memory implementation is for demonstration ONLY.
# For production, use a persistent storage solution.
event_store = InMemoryEventStore()
starlette_app = app.streamable_http_app(
event_store=event_store,
json_response=json_response,
debug=True,
)
# Wrap ASGI application with CORS middleware to expose Mcp-Session-Id header
# for browser-based clients (ensures 500 errors get proper CORS headers)
starlette_app = CORSMiddleware(
starlette_app,
allow_origins=["*"], # Allow all origins - adjust as needed for production
allow_methods=["GET", "POST", "DELETE"], # MCP streamable HTTP methods
expose_headers=["Mcp-Session-Id"],
)
uvicorn.run(starlette_app, host="127.0.0.1", port=port)
return 0