-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathserver.py
More file actions
91 lines (76 loc) · 2.55 KB
/
server.py
File metadata and controls
91 lines (76 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from urllib.parse import urlparse
import anyio
import click
from mcp import types
from mcp.server import Server, ServerRequestContext
# In-memory catalogue of the sample resources this server exposes.
# Each key is the resource name: handle_list_resources publishes it as
# ``file:///<name>.txt`` and handle_read_resource looks it up again from the
# requested URI. Each value holds the text served for that resource
# ("content") and its human-readable title ("title").
SAMPLE_RESOURCES = {
    "greeting": {
        "content": "Hello! This is a sample text resource.",
        "title": "Welcome Message",
    },
    "help": {
        "content": "This server provides a few sample text resources for testing.",
        "title": "Help Documentation",
    },
    "about": {
        "content": "This is the simple-resource MCP server implementation.",
        "title": "About This Server",
    },
}
async def handle_list_resources(
    ctx: ServerRequestContext, params: types.PaginatedRequestParams | None
) -> types.ListResourcesResult:
    """Describe every entry of ``SAMPLE_RESOURCES`` as a listed resource.

    Args:
        ctx: Request context passed in by the server; not used here.
        params: Pagination parameters, possibly ``None``; not used here, so
            all sample resources are returned in a single result.

    Returns:
        A ``ListResourcesResult`` holding one ``text/plain`` resource per
        sample entry, addressed as ``file:///<name>.txt``.
    """
    listed = []
    for key, entry in SAMPLE_RESOURCES.items():
        listed.append(
            types.Resource(
                uri=f"file:///{key}.txt",
                name=key,
                title=entry["title"],
                description=f"A sample text resource named {key}",
                mime_type="text/plain",
            )
        )
    return types.ListResourcesResult(resources=listed)
async def handle_read_resource(
    ctx: ServerRequestContext, params: types.ReadResourceRequestParams
) -> types.ReadResourceResult:
    """Return the text of the sample resource addressed by ``params.uri``.

    The resource name is taken from the URI path: leading slashes are
    dropped and a trailing ``.txt`` extension, if present, is removed. The
    remaining name must be a key of ``SAMPLE_RESOURCES``.

    Args:
        ctx: Request context passed in by the server; not used here.
        params: Read request whose ``uri`` identifies the resource.

    Returns:
        A ``ReadResourceResult`` with a single ``text/plain`` content item
        echoing the requested URI and carrying the resource's text.

    Raises:
        ValueError: If the URI has no path component, or if the path does
            not name a known sample resource.
    """
    path = urlparse(str(params.uri)).path
    if not path:
        raise ValueError(f"Invalid resource path: {params.uri}")

    # Strip only a *trailing* ".txt". A blanket str.replace would also delete
    # ".txt" from the middle of the path, letting malformed URIs such as
    # "file:///gree.txtting" resolve to a real resource.
    name = path.lstrip("/").removesuffix(".txt")

    entry = SAMPLE_RESOURCES.get(name)
    if entry is None:
        raise ValueError(f"Unknown resource: {params.uri}")

    return types.ReadResourceResult(
        contents=[
            types.TextResourceContents(
                uri=str(params.uri),
                text=entry["content"],
                mime_type="text/plain",
            )
        ]
    )
# Command-line entry point: builds the resource server and serves it over the
# transport chosen with --transport (stdio by default, or streamable HTTP on
# 127.0.0.1 at --port).
# NOTE: documented with comments rather than a docstring on purpose; click
# would surface a docstring as the command's --help text.
@click.command()
@click.option("--port", default=8000, help="Port to listen on for HTTP")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "streamable-http"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    server = Server(
        "mcp-simple-resource",
        on_list_resources=handle_list_resources,
        on_read_resource=handle_read_resource,
    )

    if transport == "streamable-http":
        # Imported lazily so uvicorn is only loaded when HTTP is requested.
        import uvicorn

        http_app = server.streamable_http_app()
        uvicorn.run(http_app, host="127.0.0.1", port=port)
        return 0

    # Default transport: talk to the client over stdin/stdout.
    from mcp.server.stdio import stdio_server

    async def serve_over_stdio():
        async with stdio_server() as streams:
            first_stream = streams[0]
            second_stream = streams[1]
            init_options = server.create_initialization_options()
            await server.run(first_stream, second_stream, init_options)

    anyio.run(serve_over_stdio)
    return 0