-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathcontext.py
More file actions
287 lines (232 loc) · 11.1 KB
/
context.py
File metadata and controls
287 lines (232 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
from __future__ import annotations
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Generic, Literal
from pydantic import AnyUrl, BaseModel
from mcp.server.context import LifespanContextT, RequestT, ServerRequestContext
from mcp.server.elicitation import (
ElicitationResult,
ElicitSchemaModelT,
UrlElicitationResult,
elicit_url,
elicit_with_validation,
)
from mcp.server.lowlevel.helper_types import ReadResourceContents
if TYPE_CHECKING:
from mcp.server.mcpserver.server import MCPServer
class Context(BaseModel, Generic[LifespanContextT, RequestT]):
    """Context object providing access to MCP capabilities.

    This provides a cleaner interface to MCP's RequestContext functionality.
    It gets injected into tool and resource functions that request it via type hints.

    To use context in a tool function, add a parameter with the Context type annotation:

    ```python
    @server.tool()
    async def my_tool(x: int, ctx: Context) -> str:
        # Log messages to the client
        await ctx.info(f"Processing {x}")
        await ctx.debug("Debug info")
        await ctx.warning("Warning message")
        await ctx.error("Error message")

        # Report progress
        await ctx.report_progress(50, 100)

        # Access resources
        data = await ctx.read_resource("resource://data")

        # Get request info
        request_id = ctx.request_id
        client_id = ctx.client_id

        return str(x)
    ```

    The context parameter name can be anything as long as it's annotated with Context.
    The context is optional - tools that don't need it can omit the parameter.
    """

    # Both are None when the Context is constructed without them; the
    # `request_context` / `mcp_server` properties raise ValueError in that case.
    _request_context: ServerRequestContext[LifespanContextT, RequestT] | None
    _mcp_server: MCPServer | None

    # TODO(maxisbey): Consider making request_context/mcp_server required, or refactor Context entirely.
    def __init__(
        self,
        *,
        request_context: ServerRequestContext[LifespanContextT, RequestT] | None = None,
        mcp_server: MCPServer | None = None,
        # TODO(Marcelo): We should drop this kwargs parameter.
        **kwargs: Any,
    ):
        """Create a context, optionally bound to a request and a server.

        Args:
            request_context: The underlying request context, if any.
            mcp_server: The owning MCPServer instance, if any.
            **kwargs: Forwarded unchanged to `BaseModel.__init__`.
        """
        super().__init__(**kwargs)
        # Assigned after BaseModel initialisation, which must run before attributes are set.
        self._request_context = request_context
        self._mcp_server = mcp_server

    @property
    def mcp_server(self) -> MCPServer:
        """Access to the MCPServer instance.

        Raises:
            ValueError: If no server was supplied to this context.
        """
        if self._mcp_server is None:  # pragma: no cover
            raise ValueError("Context is not available outside of a request")
        return self._mcp_server

    @property
    def request_context(self) -> ServerRequestContext[LifespanContextT, RequestT]:
        """Access to the underlying request context.

        Raises:
            ValueError: If no request context was supplied to this context.
        """
        if self._request_context is None:  # pragma: no cover
            raise ValueError("Context is not available outside of a request")
        return self._request_context

    async def report_progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
        """Report progress for the current operation.

        Does nothing when the request metadata is absent or carries no
        `progress_token`.

        Args:
            progress: Current progress value (e.g., 24)
            total: Optional total value (e.g., 100)
            message: Optional message (e.g., "Starting render...")

        Raises:
            ValueError: If called without a request context.
        """
        progress_token = self.request_context.meta.get("progress_token") if self.request_context.meta else None

        if progress_token is None:  # pragma: no cover
            return

        await self.request_context.session.send_progress_notification(
            progress_token=progress_token,
            progress=progress,
            total=total,
            message=message,
            related_request_id=self.request_id,
        )

    async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]:
        """Read a resource by URI.

        Args:
            uri: Resource URI to read

        Returns:
            The resource contents, as returned by `MCPServer.read_resource`.

        Raises:
            ValueError: If no server was supplied to this context.
        """
        # Use the `mcp_server` property instead of an `assert`: assertions are
        # removed under `python -O`, and the property raises the same ValueError
        # used by every other accessor on this class.
        return await self.mcp_server.read_resource(uri, self)

    async def elicit(
        self,
        message: str,
        schema: type[ElicitSchemaModelT],
    ) -> ElicitationResult[ElicitSchemaModelT]:
        """Elicit information from the client/user.

        This method can be used to interactively ask for additional information from the
        client within a tool's execution. The client might display the message to the
        user and collect a response according to the provided schema. If the client
        is an agent, it might decide how to handle the elicitation -- either by asking
        the user or automatically generating a response.

        Args:
            message: Message to present to the user
            schema: A Pydantic model class defining the expected response structure.
                According to the specification, only primitive types are allowed.

        Returns:
            An ElicitationResult containing the action taken and the data if accepted

        Raises:
            ValueError: If called without a request context.

        Note:
            Check the result.action to determine if the user accepted, declined, or cancelled.
            The result.data will only be populated if action is "accept" and validation succeeded.
        """
        return await elicit_with_validation(
            session=self.request_context.session,
            message=message,
            schema=schema,
            related_request_id=self.request_id,
        )

    async def elicit_url(
        self,
        message: str,
        url: str,
        elicitation_id: str,
    ) -> UrlElicitationResult:
        """Request URL mode elicitation from the client.

        This directs the user to an external URL for out-of-band interactions
        that must not pass through the MCP client. Use this for:
        - Collecting sensitive credentials (API keys, passwords)
        - OAuth authorization flows with third-party services
        - Payment and subscription flows
        - Any interaction where data should not pass through the LLM context

        The response indicates whether the user consented to navigate to the URL.
        The actual interaction happens out-of-band. When the elicitation completes,
        call `ctx.session.send_elicit_complete(elicitation_id)` to notify the client.

        Args:
            message: Human-readable explanation of why the interaction is needed
            url: The URL the user should navigate to
            elicitation_id: Unique identifier for tracking this elicitation

        Returns:
            UrlElicitationResult indicating accept, decline, or cancel

        Raises:
            ValueError: If called without a request context.
        """
        return await elicit_url(
            session=self.request_context.session,
            message=message,
            url=url,
            elicitation_id=elicitation_id,
            related_request_id=self.request_id,
        )

    async def log(
        self,
        level: Literal["debug", "info", "warning", "error"],
        message: Any,
        *,
        logger_name: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> None:
        """Send a log message to the client.

        Per the MCP spec, the data to be logged can be any JSON-serializable type
        (string, dict, list, number, bool, etc.), not just strings.

        Args:
            level: Log level (debug, info, warning, error)
            message: Any JSON-serializable data to log
            logger_name: Optional logger name
            extra: Optional dictionary with additional structured data to include.
                When provided (and non-empty), a dict `message` is merged with it
                (keys in `extra` win on conflict); any other `message` is wrapped
                as `{"message": message, **extra}`.

        Raises:
            ValueError: If called without a request context.
        """
        if extra:
            if isinstance(message, dict):
                # `extra` is unpacked last, so its keys override same-named keys in `message`.
                log_data = {**message, **extra}
            else:
                log_data = {"message": message, **extra}
        else:
            # No extra fields (None or empty dict): send the payload unchanged.
            log_data = message

        await self.request_context.session.send_log_message(
            level=level,
            data=log_data,
            logger=logger_name,
            related_request_id=self.request_id,
        )

    @property
    def client_id(self) -> str | None:
        """Get the client ID if available.

        Read from the `client_id` key of the request metadata; None when the
        request has no metadata.
        """
        return self.request_context.meta.get("client_id") if self.request_context.meta else None  # pragma: no cover

    @property
    def request_id(self) -> str:
        """Get the unique ID for this request, converted to a string."""
        return str(self.request_context.request_id)

    @property
    def session(self):
        """Access to the underlying session for advanced usage."""
        return self.request_context.session

    async def close_sse_stream(self) -> None:
        """Close the SSE stream to trigger client reconnection.

        This method closes the HTTP connection for the current request, triggering
        client reconnection. Events continue to be stored in the event store and will
        be replayed when the client reconnects with Last-Event-ID.

        Use this to implement polling behavior during long-running operations -
        the client will reconnect after the retry interval specified in the priming event.

        Note:
            This is a no-op if not using StreamableHTTP transport with event_store.
            The callback is only available when event_store is configured.
        """
        # Reads the private attribute directly so that a missing request context
        # or missing callback is silently ignored instead of raising.
        if self._request_context and self._request_context.close_sse_stream:  # pragma: no cover
            await self._request_context.close_sse_stream()

    async def close_standalone_sse_stream(self) -> None:
        """Close the standalone GET SSE stream to trigger client reconnection.

        This method closes the HTTP connection for the standalone GET stream used
        for unsolicited server-to-client notifications. The client SHOULD reconnect
        with Last-Event-ID to resume receiving notifications.

        Note:
            This is a no-op if not using StreamableHTTP transport with event_store.
            Currently, client reconnection for standalone GET streams is NOT
            implemented - this is a known gap.
        """
        # Same silent no-op policy as `close_sse_stream`.
        if self._request_context and self._request_context.close_standalone_sse_stream:  # pragma: no cover
            await self._request_context.close_standalone_sse_stream()

    # Convenience methods for common log levels
    async def debug(self, message: Any, *, logger_name: str | None = None, extra: dict[str, Any] | None = None) -> None:
        """Send a debug log message."""
        await self.log("debug", message, logger_name=logger_name, extra=extra)

    async def info(self, message: Any, *, logger_name: str | None = None, extra: dict[str, Any] | None = None) -> None:
        """Send an info log message."""
        await self.log("info", message, logger_name=logger_name, extra=extra)

    async def warning(
        self, message: Any, *, logger_name: str | None = None, extra: dict[str, Any] | None = None
    ) -> None:
        """Send a warning log message."""
        await self.log("warning", message, logger_name=logger_name, extra=extra)

    async def error(self, message: Any, *, logger_name: str | None = None, extra: dict[str, Any] | None = None) -> None:
        """Send an error log message."""
        await self.log("error", message, logger_name=logger_name, extra=extra)