Skip to content

Commit c590af4

Browse files
committed
Async resource support
1 parent 1cb7407 commit c590af4

7 files changed

Lines changed: 332 additions & 6 deletions

File tree

src/mcp/server/fastmcp/resources/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from .base import Resource
2+
from .async_resource import AsyncResource, AsyncStatus
23
from .resource_manager import ResourceManager
34
from .templates import ResourceTemplate
45
from .types import (
@@ -12,6 +13,8 @@
1213

1314
__all__ = [
1415
"Resource",
16+
"AsyncResource",
17+
"AsyncStatus",
1518
"TextResource",
1619
"BinaryResource",
1720
"FunctionResource",
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
"""Asynchronous resource implementation for long-running operations."""
2+
3+
import asyncio
4+
import enum
5+
# import time
6+
from typing import Any, Optional
7+
8+
import pydantic
9+
from pydantic import Field
10+
11+
from mcp.server.fastmcp.resources.base import Resource
12+
from mcp.server.fastmcp.utilities.logging import get_logger
13+
14+
logger = get_logger(__name__)
15+
16+
17+
class AsyncStatus(str, enum.Enum):
18+
"""Status of an asynchronous operation."""
19+
PENDING = "pending"
20+
RUNNING = "running"
21+
COMPLETED = "completed"
22+
FAILED = "failed"
23+
CANCELED = "canceled"
24+
25+
26+
class AsyncResource(Resource):
27+
"""A resource representing an asynchronous operation.
28+
29+
This resource type is used to track long-running operations that are executed
30+
asynchronously. It provides methods for updating progress, completing with a result,
31+
failing with an error, and canceling the operation.
32+
"""
33+
34+
status: AsyncStatus = Field(
35+
default=AsyncStatus.PENDING,
36+
description="Current status of the asynchronous operation"
37+
)
38+
# progress: float = Field(
39+
# default=0,
40+
# description="Current progress value (0-100 or raw count)"
41+
# )
42+
error: Optional[str] = Field(
43+
default=None,
44+
description="Error message if the operation failed"
45+
)
46+
# created_at: float = Field(
47+
# default_factory=time.time,
48+
# description="Timestamp when the resource was created"
49+
# )
50+
# started_at: Optional[float] = Field(
51+
# default=None,
52+
# description="Timestamp when the operation started running"
53+
# )
54+
# completed_at: Optional[float] = Field(
55+
# default=None,
56+
# description="Timestamp when the operation completed, failed, or was canceled"
57+
# )
58+
59+
# Fields not included in serialization
60+
_task: Optional[asyncio.Task[Any]] = pydantic.PrivateAttr(default=None)
61+
# _mcp_server = pydantic.PrivateAttr(default=None)
62+
63+
# def set_mcp_server(self, server: Any) -> None:
64+
# """Set the MCP server reference.
65+
66+
# Args:
67+
# server: The MCP server instance
68+
# """
69+
# self._mcp_server = server
70+
71+
async def read(self) -> str:
72+
"""Read the current state of the resource as JSON.
73+
74+
Returns the current status and progress information.
75+
"""
76+
# Convert the resource to a dictionary, excluding private fields
77+
data = self.model_dump(exclude={"_task"})
78+
79+
# Return status info as JSON
80+
import json
81+
return json.dumps(data, indent=2)
82+
83+
async def start(self, task: asyncio.Task[Any]) -> None:
84+
"""Mark the resource as running and store the task.
85+
86+
Args:
87+
task: The asyncio task that is executing the operation
88+
"""
89+
self._task = task
90+
self.status = AsyncStatus.RUNNING
91+
# self.started_at = time.time()
92+
# await self._notify_changed()
93+
94+
logger.debug(
95+
"Started async operation",
96+
extra={
97+
"uri": self.uri,
98+
}
99+
)
100+
101+
# async def update_progress(self, progress: float) -> None:
102+
# """Update the progress information.
103+
104+
# Args:
105+
# progress: Current progress value
106+
# total: Total expected progress value, if known
107+
# """
108+
# self.progress = progress
109+
# # await self._notify_changed()
110+
111+
# logger.debug(
112+
# "Updated async operation progress",
113+
# extra={
114+
# "uri": self.uri,
115+
# "progress": self.progress,
116+
# }
117+
# )
118+
119+
async def complete(self) -> None:
120+
"""Mark the resource as completed.
121+
"""
122+
self.status = AsyncStatus.COMPLETED
123+
# self.completed_at = time.time()
124+
125+
# await self._notify_changed()
126+
127+
logger.info(
128+
"Completed async operation",
129+
extra={
130+
"uri": self.uri,
131+
# "duration": self.completed_at - (self.started_at or self.created_at),
132+
}
133+
)
134+
135+
async def fail(self, error: str) -> None:
136+
"""Mark the resource as failed and store the error.
137+
138+
Args:
139+
error: Error message describing why the operation failed
140+
"""
141+
self.status = AsyncStatus.FAILED
142+
self.error = error
143+
# self.completed_at = time.time()
144+
# await self._notify_changed()
145+
146+
logger.error(
147+
"Failed async operation",
148+
extra={
149+
"uri": self.uri,
150+
"error": error,
151+
# "duration": self.completed_at - (self.started_at or self.created_at),
152+
}
153+
)
154+
155+
async def cancel(self) -> None:
156+
"""Cancel the operation if it's still running."""
157+
if self.status in (AsyncStatus.PENDING, AsyncStatus.RUNNING) and self._task:
158+
self._task.cancel()
159+
self.status = AsyncStatus.CANCELED
160+
# self.completed_at = time.time()
161+
# await self._notify_changed()
162+
163+
logger.info(
164+
"Canceled async operation",
165+
extra={
166+
"uri": self.uri,
167+
# "duration": self.completed_at - (self.started_at or self.created_at),
168+
}
169+
)
170+
171+
# async def _notify_changed(self) -> None:
172+
# """Notify subscribers that the resource has changed."""
173+
# if self._mcp_server:
174+
# # This will be implemented in the MCP server to notify clients
175+
# # of resource changes via the notification protocol
176+
# self._mcp_server.notify_resource_changed(self.uri)

src/mcp/server/fastmcp/resources/resource_manager.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
"""Resource manager functionality."""
22

3+
import uuid
34
from collections.abc import Callable
45
from typing import Any
56

67
from pydantic import AnyUrl
78

89
from mcp.server.fastmcp.resources.base import Resource
10+
from mcp.server.fastmcp.resources.async_resource import AsyncResource
911
from mcp.server.fastmcp.resources.templates import ResourceTemplate
1012
from mcp.server.fastmcp.utilities.logging import get_logger
1113

@@ -19,6 +21,7 @@ def __init__(self, warn_on_duplicate_resources: bool = True):
1921
self._resources: dict[str, Resource] = {}
2022
self._templates: dict[str, ResourceTemplate] = {}
2123
self.warn_on_duplicate_resources = warn_on_duplicate_resources
24+
# self._mcp_server = None
2225

2326
def add_resource(self, resource: Resource) -> Resource:
2427
"""Add a resource to the manager.
@@ -93,3 +96,41 @@ def list_templates(self) -> list[ResourceTemplate]:
9396
"""List all registered templates."""
9497
logger.debug("Listing templates", extra={"count": len(self._templates)})
9598
return list(self._templates.values())
99+
100+
# def set_mcp_server(self, server: Any) -> None:
101+
# """Set the MCP server reference.
102+
103+
# This allows resources to notify the server when they change.
104+
105+
# Args:
106+
# server: The MCP server instance
107+
# """
108+
# self._mcp_server = server
109+
110+
def create_async_resource(
111+
self,
112+
name: str | None = None,
113+
description: str | None = None,
114+
) -> AsyncResource:
115+
"""Create a new async resource.
116+
117+
Args:
118+
name: Optional name for the resource
119+
description: Optional description of the resource
120+
121+
Returns:
122+
A new AsyncResource instance
123+
"""
124+
resource_uri = f"resource://tasks/{uuid.uuid4()}"
125+
resource = AsyncResource(
126+
uri=AnyUrl(resource_uri),
127+
name=name,
128+
description=description,
129+
)
130+
131+
# # Set the MCP server reference if available
132+
# if self._mcp_server:
133+
# resource.set_mcp_server(self._mcp_server)
134+
135+
self.add_resource(resource)
136+
return resource

src/mcp/server/fastmcp/server.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ def __init__(
161161
self._prompt_manager = PromptManager(
162162
warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts
163163
)
164+
165+
# Connect the resource manager and tool manager
166+
self._tool_manager.set_resource_manager(self._resource_manager)
167+
self._resource_manager.set_mcp_server(self._mcp_server)
168+
164169
if (self.settings.auth is not None) != (auth_server_provider is not None):
165170
# TODO: after we support separate authorization servers (see
166171
# https://github.com/modelcontextprotocol/modelcontextprotocol/pull/284)
@@ -321,6 +326,7 @@ def add_tool(
321326
name: str | None = None,
322327
description: str | None = None,
323328
annotations: ToolAnnotations | None = None,
329+
async_supported: bool = False,
324330
) -> None:
325331
"""Add a tool to the server.
326332
@@ -332,16 +338,19 @@ def add_tool(
332338
name: Optional name for the tool (defaults to function name)
333339
description: Optional description of what the tool does
334340
annotations: Optional ToolAnnotations providing additional tool information
341+
async_supported: Whether this tool supports asynchronous execution
335342
"""
336343
self._tool_manager.add_tool(
337-
fn, name=name, description=description, annotations=annotations
344+
fn, name=name, description=description, annotations=annotations,
345+
async_supported=async_supported
338346
)
339347

340348
def tool(
341349
self,
342350
name: str | None = None,
343351
description: str | None = None,
344352
annotations: ToolAnnotations | None = None,
353+
async_supported: bool = False,
345354
) -> Callable[[AnyFunction], AnyFunction]:
346355
"""Decorator to register a tool.
347356
@@ -353,6 +362,7 @@ def tool(
353362
name: Optional name for the tool (defaults to function name)
354363
description: Optional description of what the tool does
355364
annotations: Optional ToolAnnotations providing additional tool information
365+
async_supported: Whether this tool supports asynchronous execution
356366
357367
Example:
358368
@server.tool()
@@ -368,6 +378,16 @@ def tool_with_context(x: int, ctx: Context) -> str:
368378
async def async_tool(x: int, context: Context) -> str:
369379
await context.report_progress(50, 100)
370380
return str(x)
381+
382+
@server.tool(async_supported=True)
383+
async def long_running_tool(x: int, context: Context) -> str:
384+
# This tool will be executed asynchronously
385+
# The client will receive a resource URI immediately
386+
# and can track progress through that resource
387+
for i in range(100):
388+
await asyncio.sleep(0.1)
389+
await context.report_progress(i, 100)
390+
return f"Processed {x}"
371391
"""
372392
# Check if user passed function directly instead of calling decorator
373393
if callable(name):
@@ -378,7 +398,8 @@ async def async_tool(x: int, context: Context) -> str:
378398

379399
def decorator(fn: AnyFunction) -> AnyFunction:
380400
self.add_tool(
381-
fn, name=name, description=description, annotations=annotations
401+
fn, name=name, description=description, annotations=annotations,
402+
async_supported=async_supported
382403
)
383404
return fn
384405

@@ -917,14 +938,38 @@ def my_tool(x: int, ctx: Context) -> str:
917938
client_id = ctx.client_id
918939
919940
return str(x)
941+
942+
@server.tool(async_supported=True)
943+
async def long_running_tool(x: int, ctx: Context) -> str:
944+
# For async tools, the context.resource will be set to an AsyncResource
945+
# that can be used to update progress and status
946+
947+
total_steps = 100
948+
for i in range(total_steps):
949+
# Do some work
950+
await asyncio.sleep(0.1)
951+
952+
# Update progress through the AsyncResource
953+
if ctx.resource:
954+
await ctx.resource.update_progress(i, total_steps)
955+
956+
# You can also use the standard progress reporting
957+
await ctx.report_progress(i, total_steps)
958+
959+
return f"Processed {x}"
920960
```
921961
922962
The context parameter name can be anything as long as it's annotated with Context.
923963
The context is optional - tools that don't need it can omit the parameter.
964+
965+
For asynchronous tools (marked with async_supported=True), the context will have
966+
a resource attribute set to an AsyncResource instance that can be used to update
967+
progress and status information.
924968
"""
925969

926970
_request_context: RequestContext[ServerSessionT, LifespanContextT] | None
927971
_fastmcp: FastMCP | None
972+
resource: Any = None # Can hold a reference to an AsyncResource for async operations
928973

929974
def __init__(
930975
self,

src/mcp/server/fastmcp/tools/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ class Tool(BaseModel):
3434
annotations: ToolAnnotations | None = Field(
3535
None, description="Optional annotations for the tool"
3636
)
37+
async_supported: bool = Field(
38+
False, description="Whether this tool supports asynchronous execution"
39+
)
3740

3841
@classmethod
3942
def from_function(
@@ -43,6 +46,7 @@ def from_function(
4346
description: str | None = None,
4447
context_kwarg: str | None = None,
4548
annotations: ToolAnnotations | None = None,
49+
async_supported: bool = False,
4650
) -> Tool:
4751
"""Create a Tool from a function."""
4852
from mcp.server.fastmcp.server import Context
@@ -79,6 +83,7 @@ def from_function(
7983
is_async=is_async,
8084
context_kwarg=context_kwarg,
8185
annotations=annotations,
86+
async_supported=async_supported,
8287
)
8388

8489
async def run(

0 commit comments

Comments
 (0)