-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathcontext.py
More file actions
99 lines (79 loc) · 3.05 KB
/
context.py
File metadata and controls
99 lines (79 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""TaskContext - Pure task state management.

This module provides TaskContext, which manages task state without any
server/session dependencies. It can be used standalone for distributed
workers or wrapped by ServerTaskContext for full server integration.
"""
from mcp.shared.experimental.tasks.store import TaskStore
from mcp.types import TASK_STATUS_COMPLETED, TASK_STATUS_FAILED, Result, Task
class TaskContext:
    """Session-independent holder for a single task's state.

    Responsibilities:
    - Holding the most recent Task object handed back by the store
    - Remembering whether cancellation has been requested
    - Forwarding status/result changes to the TaskStore

    Anything that needs server integration (elicit, create_message,
    notifications) belongs to ServerTaskContext from mcp.server.experimental.

    Example (distributed worker):
        async def worker_job(task_id: str, session_id: str | None):
            store = RedisTaskStore(redis_url)
            task = await store.get_task(task_id, session_id=session_id)
            ctx = TaskContext(task=task, store=store, session_id=session_id)
            await ctx.update_status("Working...")
            result = await do_work()
            await ctx.complete(result)
    """

    def __init__(self, task: Task, store: TaskStore, *, session_id: str | None):
        # A fresh context never starts out cancelled.
        self._cancelled = False
        self._session_id = session_id
        self._store = store
        self._task = task

    @property
    def task_id(self) -> str:
        """Identifier of the task this context wraps."""
        current = self._task
        return current.task_id

    @property
    def task(self) -> Task:
        """The Task object currently held by this context."""
        return self._task

    @property
    def is_cancelled(self) -> bool:
        """True once request_cancellation() has been called."""
        return self._cancelled

    def request_cancellation(self) -> None:
        """Flag this task as cancellation-requested.

        Only the flag on this context is flipped; the store is not
        contacted. Code doing the task's work is expected to poll
        is_cancelled and wind down gracefully when it becomes True.
        """
        self._cancelled = True

    async def update_status(self, message: str) -> None:
        """Push a new status message to the store.

        Args:
            message: The new status message
        """
        refreshed = await self._store.update_task(
            self.task_id,
            status_message=message,
            session_id=self._session_id,
        )
        # Keep the Task returned by the store as the current state.
        self._task = refreshed

    async def complete(self, result: Result) -> None:
        """Record the result, then move the task to the completed status.

        Args:
            result: The task result
        """
        store, session = self._store, self._session_id
        # The result is stored before the status is switched to completed.
        await store.store_result(self.task_id, result, session_id=session)
        finished = await store.update_task(
            self.task_id,
            status=TASK_STATUS_COMPLETED,
            session_id=session,
        )
        self._task = finished

    async def fail(self, error: str) -> None:
        """Move the task to the failed status, using the error as its message.

        Args:
            error: The error message
        """
        store = self._store
        failed = await store.update_task(
            self.task_id,
            status=TASK_STATUS_FAILED,
            status_message=error,
            session_id=self._session_id,
        )
        self._task = failed