-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathtask_support.py
More file actions
116 lines (90 loc) · 3.7 KB
/
task_support.py
File metadata and controls
116 lines (90 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""TaskSupport - Configuration for experimental task support.
This module provides the TaskSupport class which encapsulates all the
infrastructure needed for task-augmented requests: store, queue, and handler.
"""
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from anyio.abc import TaskGroup
from mcp.server.experimental.task_result_handler import TaskResultHandler
from mcp.server.session import ServerSession
from mcp.shared._task_group import create_mcp_task_group
from mcp.shared.experimental.tasks.in_memory_task_store import InMemoryTaskStore
from mcp.shared.experimental.tasks.message_queue import InMemoryTaskMessageQueue, TaskMessageQueue
from mcp.shared.experimental.tasks.store import TaskStore
@dataclass
class TaskSupport:
"""Configuration for experimental task support.
Encapsulates the task store, message queue, result handler, and task group
for spawning background work.
When enabled on a server, this automatically:
- Configures response routing for each session
- Provides default handlers for task operations
- Manages a task group for background task execution
Example:
Simple in-memory setup:
```python
server.experimental.enable_tasks()
```
Custom store/queue for distributed systems:
```python
server.experimental.enable_tasks(
store=RedisTaskStore(redis_url),
queue=RedisTaskMessageQueue(redis_url),
)
```
"""
store: TaskStore
queue: TaskMessageQueue
handler: TaskResultHandler = field(init=False)
_task_group: TaskGroup | None = field(init=False, default=None)
def __post_init__(self) -> None:
"""Create the result handler from store and queue."""
self.handler = TaskResultHandler(self.store, self.queue)
@property
def task_group(self) -> TaskGroup:
"""Get the task group for spawning background work.
Raises:
RuntimeError: If not within a run() context
"""
if self._task_group is None:
raise RuntimeError("TaskSupport not running. Ensure Server.run() is active.")
return self._task_group
@asynccontextmanager
async def run(self) -> AsyncIterator[None]:
"""Run the task support lifecycle.
This creates a task group for spawning background task work.
Called automatically by Server.run().
Usage:
async with task_support.run():
# Task group is now available
...
"""
async with create_mcp_task_group() as tg:
self._task_group = tg # type: ignore[assignment]
try:
yield
finally:
self._task_group = None
def configure_session(self, session: ServerSession) -> None:
"""Configure a session for task support.
This registers the result handler as a response router so that
responses to queued requests (elicitation, sampling) are routed
back to the waiting resolvers.
Called automatically by Server.run() for each new session.
Args:
session: The session to configure
"""
session.add_response_router(self.handler)
@classmethod
def in_memory(cls) -> "TaskSupport":
"""Create in-memory task support.
Suitable for development, testing, and single-process servers.
For distributed systems, provide custom store and queue implementations.
Returns:
TaskSupport configured with in-memory store and queue
"""
return cls(
store=InMemoryTaskStore(),
queue=InMemoryTaskMessageQueue(),
)