Skip to content

Commit 26cdc6d

Browse files
refactor: update experimental wrappers and fix task context extraction
- Rewrite ExperimentalClientTasks to delegate to client.tasks (no more unsafe casts) - Rewrite ExperimentalServerTasks to delegate to server.tasks (no more unsafe casts) - Make taskStore optional in TaskManagerOptions (clients need TaskManager for outbound API) - Always create TaskManager in Client/Server (outbound API always available) - Fix extractInboundTaskContext to create context for task creation requests too - Update integration test references (extra.task!.taskStore etc.) Core: 430/430 pass. Integration: 475/487 pass (12 failures in message queuing/auto-poll).
1 parent 14635f7 commit 26cdc6d

File tree

10 files changed

+262
-377
lines changed

10 files changed

+262
-377
lines changed

packages/client/src/client/client.ts

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -264,31 +264,28 @@ export class Client<
264264
private _clientInfo: Implementation,
265265
options?: ClientOptions
266266
) {
267-
// Create TaskManager if taskStore is provided
268-
const taskManager = options?.taskStore
269-
? new TaskManager({
270-
taskStore: options.taskStore,
271-
taskMessageQueue: options.taskMessageQueue,
272-
defaultTaskPollInterval: options.defaultTaskPollInterval,
273-
maxTaskQueueSize: options.maxTaskQueueSize
274-
})
275-
: undefined;
267+
// Always create a TaskManager for outbound task API (getTask, requestStream, etc.)
268+
// If taskStore is provided, it also enables handling incoming task requests
269+
const taskManager = new TaskManager({
270+
taskStore: options?.taskStore,
271+
taskMessageQueue: options?.taskMessageQueue,
272+
defaultTaskPollInterval: options?.defaultTaskPollInterval,
273+
maxTaskQueueSize: options?.maxTaskQueueSize
274+
});
276275

277276
super(options, taskManager);
278277
this._capabilities = options?.capabilities ?? {};
279278
this._jsonSchemaValidator = options?.jsonSchemaValidator ?? new AjvJsonSchemaValidator();
280279
this._enforceStrictCapabilities = options?.enforceStrictCapabilities ?? false;
281280

282281
// Wire task capability assertions
283-
if (taskManager) {
284-
taskManager.assertTaskCapability = (method: string) => {
285-
assertToolsCallTaskCapability(this._serverCapabilities?.tasks?.requests, method, 'Server');
286-
};
287-
taskManager.assertTaskHandlerCapability = (method: string) => {
288-
if (!this._capabilities) return;
289-
assertClientRequestTaskCapability(this._capabilities.tasks?.requests, method, 'Client');
290-
};
291-
}
282+
taskManager.assertTaskCapability = (method: string) => {
283+
assertToolsCallTaskCapability(this._serverCapabilities?.tasks?.requests, method, 'Server');
284+
};
285+
taskManager.assertTaskHandlerCapability = (method: string) => {
286+
if (!this._capabilities) return;
287+
assertClientRequestTaskCapability(this._capabilities.tasks?.requests, method, 'Client');
288+
};
292289

293290
// Store list changed config for setup after connection (when we know server capabilities)
294291
if (options?.listChanged) {

packages/client/src/experimental/tasks/client.ts

Lines changed: 28 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77

88
import type {
9-
AnyObjectSchema,
109
CallToolRequest,
1110
CancelTaskResult,
1211
ClientRequest,
@@ -21,19 +20,15 @@ import type {
2120
SchemaOutput
2221
} from '@modelcontextprotocol/core';
2322
import { CallToolResultSchema, ErrorCode, McpError } from '@modelcontextprotocol/core';
23+
import type { AnyObjectSchema } from '@modelcontextprotocol/core';
2424

2525
import type { Client } from '../../client/client.js';
2626

2727
/**
2828
* Internal interface for accessing Client's private methods.
2929
* @internal
3030
*/
31-
interface ClientInternal<RequestT extends Request> {
32-
requestStream<T extends AnyObjectSchema>(
33-
request: ClientRequest | RequestT,
34-
resultSchema: T,
35-
options?: RequestOptions
36-
): AsyncGenerator<ResponseMessage<SchemaOutput<T>>, void, void>;
31+
interface ClientInternal {
3732
isToolTask(toolName: string): boolean;
3833
getToolOutputValidator(toolName: string): ((data: unknown) => { valid: boolean; errorMessage?: string }) | undefined;
3934
}
@@ -58,57 +53,33 @@ export class ExperimentalClientTasks<
5853

5954
/**
6055
* Calls a tool and returns an AsyncGenerator that yields response messages.
61-
* The generator is guaranteed to end with either a 'result' or 'error' message.
62-
*
63-
* This method provides streaming access to tool execution, allowing you to
64-
* observe intermediate task status updates for long-running tool calls.
6556
* Automatically validates structured output if the tool has an outputSchema.
6657
*
67-
* @example
68-
* ```typescript
69-
* const stream = client.experimental.tasks.callToolStream({ name: 'myTool', arguments: {} });
70-
* for await (const message of stream) {
71-
* switch (message.type) {
72-
* case 'taskCreated':
73-
* console.log('Tool execution started:', message.task.taskId);
74-
* break;
75-
* case 'taskStatus':
76-
* console.log('Tool status:', message.task.status);
77-
* break;
78-
* case 'result':
79-
* console.log('Tool result:', message.result);
80-
* break;
81-
* case 'error':
82-
* console.error('Tool error:', message.error);
83-
* break;
84-
* }
85-
* }
86-
* ```
87-
*
88-
* @param params - Tool call parameters (name and arguments)
89-
* @param resultSchema - Zod schema for validating the result (defaults to CallToolResultSchema)
90-
* @param options - Optional request options (timeout, signal, task creation params, etc.)
91-
* @returns AsyncGenerator that yields ResponseMessage objects
92-
*
9358
* @experimental
9459
*/
9560
async *callToolStream<T extends typeof CallToolResultSchema | typeof CompatibilityCallToolResultSchema>(
9661
params: CallToolRequest['params'],
9762
resultSchema: T = CallToolResultSchema as T,
9863
options?: RequestOptions
9964
): AsyncGenerator<ResponseMessage<SchemaOutput<T>>, void, void> {
100-
// Access Client's internal methods
101-
const clientInternal = this._client as unknown as ClientInternal<RequestT>;
65+
const tasks = this._client.tasks;
66+
if (!tasks) {
67+
throw new McpError(ErrorCode.InternalError, 'Task support is not enabled on this client');
68+
}
69+
70+
const clientInternal = this._client as unknown as ClientInternal;
10271

10372
// Add task creation parameters if server supports it and not explicitly provided
10473
const optionsWithTask = {
10574
...options,
106-
// We check if the tool is known to be a task during auto-configuration, but assume
107-
// the caller knows what they're doing if they pass this explicitly
10875
task: options?.task ?? (clientInternal.isToolTask(params.name) ? {} : undefined)
10976
};
11077

111-
const stream = clientInternal.requestStream({ method: 'tools/call', params }, resultSchema, optionsWithTask);
78+
const stream = tasks.requestStream(
79+
{ method: 'tools/call', params } as ClientRequest | RequestT,
80+
resultSchema,
81+
optionsWithTask
82+
);
11283

11384
// Get the validator for this tool (if it has an output schema)
11485
const validator = clientInternal.getToolOutputValidator(params.name);
@@ -134,7 +105,6 @@ export class ExperimentalClientTasks<
134105
// Only validate structured content if present (not when there's an error)
135106
if (result.structuredContent) {
136107
try {
137-
// Validate the structured content against the schema
138108
const validationResult = validator(result.structuredContent);
139109

140110
if (!validationResult.valid) {
@@ -171,104 +141,55 @@ export class ExperimentalClientTasks<
171141

172142
/**
173143
* Gets the current status of a task.
174-
*
175-
* @param taskId - The task identifier
176-
* @param options - Optional request options
177-
* @returns The task status
178-
*
179144
* @experimental
180145
*/
181146
async getTask(taskId: string, options?: RequestOptions): Promise<GetTaskResult> {
182-
// Delegate to the client's underlying Protocol method
183-
type ClientWithGetTask = { getTask(params: { taskId: string }, options?: RequestOptions): Promise<GetTaskResult> };
184-
return (this._client as unknown as ClientWithGetTask).getTask({ taskId }, options);
147+
const tasks = this._client.tasks;
148+
if (!tasks) throw new McpError(ErrorCode.InternalError, 'Task support is not enabled');
149+
return tasks.getTask({ taskId }, options);
185150
}
186151

187152
/**
188153
* Retrieves the result of a completed task.
189-
*
190-
* @param taskId - The task identifier
191-
* @param resultSchema - Zod schema for validating the result
192-
* @param options - Optional request options
193-
* @returns The task result
194-
*
195154
* @experimental
196155
*/
197156
async getTaskResult<T extends AnyObjectSchema>(taskId: string, resultSchema?: T, options?: RequestOptions): Promise<SchemaOutput<T>> {
198-
// Delegate to the client's underlying Protocol method
199-
return (
200-
this._client as unknown as {
201-
getTaskResult: <U extends AnyObjectSchema>(
202-
params: { taskId: string },
203-
resultSchema?: U,
204-
options?: RequestOptions
205-
) => Promise<SchemaOutput<U>>;
206-
}
207-
).getTaskResult({ taskId }, resultSchema, options);
157+
const tasks = this._client.tasks;
158+
if (!tasks) throw new McpError(ErrorCode.InternalError, 'Task support is not enabled');
159+
return tasks.getTaskResult({ taskId }, resultSchema!, options);
208160
}
209161

210162
/**
211163
* Lists tasks with optional pagination.
212-
*
213-
* @param cursor - Optional pagination cursor
214-
* @param options - Optional request options
215-
* @returns List of tasks with optional next cursor
216-
*
217164
* @experimental
218165
*/
219166
async listTasks(cursor?: string, options?: RequestOptions): Promise<ListTasksResult> {
220-
// Delegate to the client's underlying Protocol method
221-
return (
222-
this._client as unknown as {
223-
listTasks: (params?: { cursor?: string }, options?: RequestOptions) => Promise<ListTasksResult>;
224-
}
225-
).listTasks(cursor ? { cursor } : undefined, options);
167+
const tasks = this._client.tasks;
168+
if (!tasks) throw new McpError(ErrorCode.InternalError, 'Task support is not enabled');
169+
return tasks.listTasks(cursor ? { cursor } : undefined, options);
226170
}
227171

228172
/**
229173
* Cancels a running task.
230-
*
231-
* @param taskId - The task identifier
232-
* @param options - Optional request options
233-
*
234174
* @experimental
235175
*/
236176
async cancelTask(taskId: string, options?: RequestOptions): Promise<CancelTaskResult> {
237-
// Delegate to the client's underlying Protocol method
238-
return (
239-
this._client as unknown as {
240-
cancelTask: (params: { taskId: string }, options?: RequestOptions) => Promise<CancelTaskResult>;
241-
}
242-
).cancelTask({ taskId }, options);
177+
const tasks = this._client.tasks;
178+
if (!tasks) throw new McpError(ErrorCode.InternalError, 'Task support is not enabled');
179+
return tasks.cancelTask({ taskId }, options);
243180
}
244181

245182
/**
246183
* Sends a request and returns an AsyncGenerator that yields response messages.
247-
* The generator is guaranteed to end with either a 'result' or 'error' message.
248-
*
249-
* This method provides streaming access to request processing, allowing you to
250-
* observe intermediate task status updates for task-augmented requests.
251-
*
252-
* @param request - The request to send
253-
* @param resultSchema - Zod schema for validating the result
254-
* @param options - Optional request options (timeout, signal, task creation params, etc.)
255-
* @returns AsyncGenerator that yields ResponseMessage objects
256-
*
257184
* @experimental
258185
*/
259186
requestStream<T extends AnyObjectSchema>(
260187
request: ClientRequest | RequestT,
261188
resultSchema: T,
262189
options?: RequestOptions
263190
): AsyncGenerator<ResponseMessage<SchemaOutput<T>>, void, void> {
264-
// Delegate to the client's underlying Protocol method
265-
type ClientWithRequestStream = {
266-
requestStream<U extends AnyObjectSchema>(
267-
request: ClientRequest | RequestT,
268-
resultSchema: U,
269-
options?: RequestOptions
270-
): AsyncGenerator<ResponseMessage<SchemaOutput<U>>, void, void>;
271-
};
272-
return (this._client as unknown as ClientWithRequestStream).requestStream(request, resultSchema, options);
191+
const tasks = this._client.tasks;
192+
if (!tasks) throw new McpError(ErrorCode.InternalError, 'Task support is not enabled');
193+
return tasks.requestStream(request, resultSchema, options);
273194
}
274195
}

packages/core/src/shared/protocol.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,10 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
291291
reportError: (error) => this._onerror(error),
292292
removeProgressHandler: (token) => this._progressHandlers.delete(token)
293293
});
294-
this._registerTaskHandlers();
294+
// Only register task handlers if a task store is configured
295+
if (taskManager.taskStore) {
296+
this._registerTaskHandlers();
297+
}
295298
}
296299
}
297300

0 commit comments

Comments
 (0)