-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathserver.py
More file actions
84 lines (64 loc) · 2.77 KB
/
server.py
File metadata and controls
84 lines (64 loc) · 2.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""Simple task server demonstrating MCP tasks over streamable HTTP."""
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any
import anyio
import click
import uvicorn
from mcp import types
from mcp.server.experimental.task_context import ServerTaskContext
from mcp.server.lowlevel import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from starlette.applications import Starlette
from starlette.routing import Mount
server = Server("simple-task-server")
# One-line setup: auto-registers get_task, get_task_result, list_tasks, cancel_task
server.experimental.enable_tasks()
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="long_running_task",
description="A task that takes a few seconds to complete with status updates",
input_schema={"type": "object", "properties": {}},
execution=types.ToolExecution(task_support=types.TASK_REQUIRED),
)
]
async def handle_long_running_task(arguments: dict[str, Any]) -> types.CreateTaskResult:
"""Handle the long_running_task tool - demonstrates status updates."""
ctx = server.request_context
ctx.experimental.validate_task_mode(types.TASK_REQUIRED)
async def work(task: ServerTaskContext) -> types.CallToolResult:
await task.update_status("Starting work...")
await anyio.sleep(1)
await task.update_status("Processing step 1...")
await anyio.sleep(1)
await task.update_status("Processing step 2...")
await anyio.sleep(1)
return types.CallToolResult(content=[types.TextContent(type="text", text="Task completed!")])
return await ctx.experimental.run_task(work)
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> types.CallToolResult | types.CreateTaskResult:
"""Dispatch tool calls to their handlers."""
if name == "long_running_task":
return await handle_long_running_task(arguments)
else:
return types.CallToolResult(
content=[types.TextContent(type="text", text=f"Unknown tool: {name}")],
is_error=True,
)
@click.command()
@click.option("--port", default=8000, help="Port to listen on")
def main(port: int) -> int:
session_manager = StreamableHTTPSessionManager(app=server)
@asynccontextmanager
async def app_lifespan(app: Starlette) -> AsyncIterator[None]:
async with session_manager.run():
yield
starlette_app = Starlette(
routes=[Mount("/mcp", app=session_manager.handle_request)],
lifespan=app_lifespan,
)
print(f"Starting server on http://localhost:{port}/mcp")
uvicorn.run(starlette_app, host="127.0.0.1", port=port)
return 0