-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathserver.py
More file actions
460 lines (362 loc) · 15.2 KB
/
server.py
File metadata and controls
460 lines (362 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
#!/usr/bin/env python3
"""
MCP Everything Server - Conformance Test Server
Server implementing all MCP features for conformance testing based on Conformance Server Specification.
"""
import asyncio
import base64
import json
import logging
import click
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.fastmcp.prompts.base import UserMessage
from mcp.server.session import ServerSession
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
from mcp.types import (
AudioContent,
Completion,
CompletionArgument,
CompletionContext,
EmbeddedResource,
ImageContent,
JSONRPCMessage,
PromptReference,
ResourceTemplateReference,
SamplingMessage,
TextContent,
TextResourceContents,
)
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
# Type aliases for event store
StreamId = str
EventId = str
class InMemoryEventStore(EventStore):
"""Simple in-memory event store for SSE resumability testing."""
def __init__(self) -> None:
self._events: list[tuple[StreamId, EventId, JSONRPCMessage | None]] = []
self._event_id_counter = 0
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
"""Store an event and return its ID."""
self._event_id_counter += 1
event_id = str(self._event_id_counter)
self._events.append((stream_id, event_id, message))
return event_id
async def replay_events_after(self, last_event_id: EventId, send_callback: EventCallback) -> StreamId | None:
"""Replay events after the specified ID."""
target_stream_id = None
for stream_id, event_id, _ in self._events:
if event_id == last_event_id:
target_stream_id = stream_id
break
if target_stream_id is None:
return None
last_event_id_int = int(last_event_id)
for stream_id, event_id, message in self._events:
if stream_id == target_stream_id and int(event_id) > last_event_id_int:
# Skip priming events (None message)
if message is not None:
await send_callback(EventMessage(message, event_id))
return target_stream_id
# Test data
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
TEST_AUDIO_BASE64 = "UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA="
# Server state
resource_subscriptions: set[str] = set()
watched_resource_content = "Watched resource content"
# Create event store for SSE resumability (SEP-1699)
event_store = InMemoryEventStore()
mcp = FastMCP(
name="mcp-conformance-test-server",
)
# Tools
@mcp.tool()
def test_simple_text() -> str:
"""Tests simple text content response"""
return "This is a simple text response for testing."
@mcp.tool()
def test_image_content() -> list[ImageContent]:
"""Tests image content response"""
return [ImageContent(type="image", data=TEST_IMAGE_BASE64, mimeType="image/png")]
@mcp.tool()
def test_audio_content() -> list[AudioContent]:
"""Tests audio content response"""
return [AudioContent(type="audio", data=TEST_AUDIO_BASE64, mimeType="audio/wav")]
@mcp.tool()
def test_embedded_resource() -> list[EmbeddedResource]:
"""Tests embedded resource content response"""
return [
EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri="test://embedded-resource",
mimeType="text/plain",
text="This is an embedded resource content.",
),
)
]
@mcp.tool()
def test_multiple_content_types() -> list[TextContent | ImageContent | EmbeddedResource]:
"""Tests response with multiple content types (text, image, resource)"""
return [
TextContent(type="text", text="Multiple content types test:"),
ImageContent(type="image", data=TEST_IMAGE_BASE64, mimeType="image/png"),
EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri="test://mixed-content-resource",
mimeType="application/json",
text='{"test": "data", "value": 123}',
),
),
]
@mcp.tool()
async def test_tool_with_logging(ctx: Context[ServerSession, None]) -> str:
"""Tests tool that emits log messages during execution"""
await ctx.info("Tool execution started")
await asyncio.sleep(0.05)
await ctx.info("Tool processing data")
await asyncio.sleep(0.05)
await ctx.info("Tool execution completed")
return "Tool with logging executed successfully"
@mcp.tool()
async def test_tool_with_progress(ctx: Context[ServerSession, None]) -> str:
"""Tests tool that reports progress notifications"""
await ctx.report_progress(progress=0, total=100, message="Completed step 0 of 100")
await asyncio.sleep(0.05)
await ctx.report_progress(progress=50, total=100, message="Completed step 50 of 100")
await asyncio.sleep(0.05)
await ctx.report_progress(progress=100, total=100, message="Completed step 100 of 100")
# Return progress token as string
progress_token = ctx.request_context.meta.progressToken if ctx.request_context and ctx.request_context.meta else 0
return str(progress_token)
@mcp.tool()
async def test_sampling(prompt: str, ctx: Context[ServerSession, None]) -> str:
"""Tests server-initiated sampling (LLM completion request)"""
try:
# Request sampling from client
result = await ctx.session.create_message(
messages=[SamplingMessage(role="user", content=TextContent(type="text", text=prompt))],
max_tokens=100,
)
# Since we're not passing tools param, result.content is single content
if result.content.type == "text":
model_response = result.content.text
else:
model_response = "No response"
return f"LLM response: {model_response}"
except Exception as e:
return f"Sampling not supported or error: {str(e)}"
class UserResponse(BaseModel):
response: str = Field(description="User's response")
@mcp.tool()
async def test_elicitation(message: str, ctx: Context[ServerSession, None]) -> str:
"""Tests server-initiated elicitation (user input request)"""
try:
# Request user input from client
result = await ctx.elicit(message=message, schema=UserResponse)
# Type-safe discriminated union narrowing using action field
if result.action == "accept":
content = result.data.model_dump_json()
else: # decline or cancel
content = "{}"
return f"User response: action={result.action}, content={content}"
except Exception as e:
return f"Elicitation not supported or error: {str(e)}"
class SEP1034DefaultsSchema(BaseModel):
"""Schema for testing SEP-1034 elicitation with default values for all primitive types"""
name: str = Field(default="John Doe", description="User name")
age: int = Field(default=30, description="User age")
score: float = Field(default=95.5, description="User score")
status: str = Field(
default="active",
description="User status",
json_schema_extra={"enum": ["active", "inactive", "pending"]},
)
verified: bool = Field(default=True, description="Verification status")
@mcp.tool()
async def test_elicitation_sep1034_defaults(ctx: Context[ServerSession, None]) -> str:
"""Tests elicitation with default values for all primitive types (SEP-1034)"""
try:
# Request user input with defaults for all primitive types
result = await ctx.elicit(message="Please provide user information", schema=SEP1034DefaultsSchema)
# Type-safe discriminated union narrowing using action field
if result.action == "accept":
content = result.data.model_dump_json()
else: # decline or cancel
content = "{}"
return f"Elicitation result: action={result.action}, content={content}"
except Exception as e:
return f"Elicitation not supported or error: {str(e)}"
class EnumSchemasTestSchema(BaseModel):
"""Schema for testing enum schema variations (SEP-1330)"""
untitledSingle: str = Field(
description="Simple enum without titles", json_schema_extra={"enum": ["active", "inactive", "pending"]}
)
titledSingle: str = Field(
description="Enum with titled options (oneOf)",
json_schema_extra={
"oneOf": [
{"const": "low", "title": "Low Priority"},
{"const": "medium", "title": "Medium Priority"},
{"const": "high", "title": "High Priority"},
]
},
)
untitledMulti: list[str] = Field(
description="Multi-select without titles",
json_schema_extra={"items": {"type": "string", "enum": ["read", "write", "execute"]}},
)
titledMulti: list[str] = Field(
description="Multi-select with titled options",
json_schema_extra={
"items": {
"anyOf": [
{"const": "feature", "title": "New Feature"},
{"const": "bug", "title": "Bug Fix"},
{"const": "docs", "title": "Documentation"},
]
}
},
)
legacyEnum: str = Field(
description="Legacy enum with enumNames",
json_schema_extra={
"enum": ["small", "medium", "large"],
"enumNames": ["Small Size", "Medium Size", "Large Size"],
},
)
@mcp.tool()
async def test_elicitation_sep1330_enums(ctx: Context[ServerSession, None]) -> str:
"""Tests elicitation with enum schema variations per SEP-1330"""
try:
result = await ctx.elicit(
message="Please select values using different enum schema types", schema=EnumSchemasTestSchema
)
if result.action == "accept":
content = result.data.model_dump_json()
else:
content = "{}"
return f"Elicitation completed: action={result.action}, content={content}"
except Exception as e:
return f"Elicitation not supported or error: {str(e)}"
@mcp.tool()
def test_error_handling() -> str:
"""Tests error response handling"""
raise RuntimeError("This tool intentionally returns an error for testing")
@mcp.tool()
async def test_reconnection(ctx: Context[ServerSession, None]) -> str:
"""Tests SSE polling by closing stream mid-call (SEP-1699)"""
await ctx.info("Before disconnect")
await ctx.close_sse_stream()
await asyncio.sleep(0.2) # Wait for client to reconnect
await ctx.info("After reconnect")
return "Reconnection test completed"
# Resources
@mcp.resource("test://static-text")
def static_text_resource() -> str:
"""A static text resource for testing"""
return "This is the content of the static text resource."
@mcp.resource("test://static-binary")
def static_binary_resource() -> bytes:
"""A static binary resource (image) for testing"""
return base64.b64decode(TEST_IMAGE_BASE64)
@mcp.resource("test://template/{id}/data")
def template_resource(id: str) -> str:
"""A resource template with parameter substitution"""
return json.dumps({"id": id, "templateTest": True, "data": f"Data for ID: {id}"})
@mcp.resource("test://watched-resource")
def watched_resource() -> str:
"""A resource that can be subscribed to for updates"""
return watched_resource_content
# Prompts
@mcp.prompt()
def test_simple_prompt() -> list[UserMessage]:
"""A simple prompt without arguments"""
return [UserMessage(role="user", content=TextContent(type="text", text="This is a simple prompt for testing."))]
@mcp.prompt()
def test_prompt_with_arguments(arg1: str, arg2: str) -> list[UserMessage]:
"""A prompt with required arguments"""
return [
UserMessage(
role="user", content=TextContent(type="text", text=f"Prompt with arguments: arg1='{arg1}', arg2='{arg2}'")
)
]
@mcp.prompt()
def test_prompt_with_embedded_resource(resourceUri: str) -> list[UserMessage]:
"""A prompt that includes an embedded resource"""
return [
UserMessage(
role="user",
content=EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=resourceUri,
mimeType="text/plain",
text="Embedded resource content for testing.",
),
),
),
UserMessage(role="user", content=TextContent(type="text", text="Please process the embedded resource above.")),
]
@mcp.prompt()
def test_prompt_with_image() -> list[UserMessage]:
"""A prompt that includes image content"""
return [
UserMessage(role="user", content=ImageContent(type="image", data=TEST_IMAGE_BASE64, mimeType="image/png")),
UserMessage(role="user", content=TextContent(type="text", text="Please analyze the image above.")),
]
# Custom request handlers
# TODO(felix): Add public APIs to FastMCP for subscribe_resource, unsubscribe_resource,
# and set_logging_level to avoid accessing protected _mcp_server attribute.
@mcp._mcp_server.set_logging_level() # pyright: ignore[reportPrivateUsage]
async def handle_set_logging_level(level: str) -> None:
"""Handle logging level changes"""
logger.info(f"Log level set to: {level}")
# In a real implementation, you would adjust the logging level here
# For conformance testing, we just acknowledge the request
async def handle_subscribe(uri: str) -> None:
"""Handle resource subscription"""
resource_subscriptions.add(str(uri))
logger.info(f"Subscribed to resource: {uri}")
async def handle_unsubscribe(uri: str) -> None:
"""Handle resource unsubscription"""
resource_subscriptions.discard(str(uri))
logger.info(f"Unsubscribed from resource: {uri}")
mcp._mcp_server.subscribe_resource()(handle_subscribe) # pyright: ignore[reportPrivateUsage]
mcp._mcp_server.unsubscribe_resource()(handle_unsubscribe) # pyright: ignore[reportPrivateUsage]
@mcp.completion()
async def _handle_completion(
ref: PromptReference | ResourceTemplateReference,
argument: CompletionArgument,
context: CompletionContext | None,
) -> Completion:
"""Handle completion requests"""
# Basic completion support - returns empty array for conformance
# Real implementations would provide contextual suggestions
return Completion(values=[], total=0, hasMore=False)
# CLI
@click.command()
@click.option("--port", default=3001, help="Port to listen on for HTTP")
@click.option(
"--log-level",
default="INFO",
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
)
def main(port: int, log_level: str) -> int:
"""Run the MCP Everything Server."""
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger.info(f"Starting MCP Everything Server on port {port}")
logger.info(f"Endpoint will be: http://localhost:{port}/mcp")
mcp.run(
transport="streamable-http",
port=port,
event_store=event_store,
retry_interval=100, # 100ms retry interval for SSE polling
)
return 0
if __name__ == "__main__":
main()