Skip to content

Commit 462c3fc

Browse files
v2 - RFC Extract Tasks out of protocol.ts into TaskManager (#1673)
Co-authored-by: Felix Weinberger <fweinberger@anthropic.com>
1 parent 7d9c72e commit 462c3fc

File tree

19 files changed

+2122
-1942
lines changed

19 files changed

+2122
-1942
lines changed

.changeset/extract-task-manager.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"@modelcontextprotocol/core": minor
3+
"@modelcontextprotocol/client": minor
4+
"@modelcontextprotocol/server": minor
5+
---
6+
7+
refactor: extract task orchestration from Protocol into TaskManager
8+
9+
**Breaking changes:**
10+
- `taskStore`, `taskMessageQueue`, `defaultTaskPollInterval`, and `maxTaskQueueSize` moved from `ProtocolOptions` to `capabilities.tasks` on `ClientOptions`/`ServerOptions`

examples/client/src/simpleStreamableHttp.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,14 @@ async function connect(url?: string): Promise<void> {
265265
form: {}
266266
},
267267
tasks: {
268+
taskStore: clientTaskStore,
268269
requests: {
269270
elicitation: {
270271
create: {}
271272
}
272273
}
273274
}
274-
},
275-
taskStore: clientTaskStore
275+
}
276276
}
277277
);
278278
client.onerror = error => {

examples/server/src/simpleStreamableHttp.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,14 @@ const getServer = () => {
4141
websiteUrl: 'https://github.com/modelcontextprotocol/typescript-sdk'
4242
},
4343
{
44-
capabilities: { logging: {}, tasks: { requests: { tools: { call: {} } } } },
45-
taskStore, // Enable task support
46-
taskMessageQueue: new InMemoryTaskMessageQueue()
44+
capabilities: {
45+
logging: {},
46+
tasks: {
47+
requests: { tools: { call: {} } },
48+
taskStore,
49+
taskMessageQueue: new InMemoryTaskMessageQueue()
50+
}
51+
}
4752
}
4853
);
4954

packages/client/src/client/client.ts

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import type {
3030
ResultTypeMap,
3131
ServerCapabilities,
3232
SubscribeRequest,
33+
TaskManagerOptions,
3334
Tool,
3435
Transport,
3536
UnsubscribeRequest
@@ -46,6 +47,7 @@ import {
4647
ElicitRequestSchema,
4748
ElicitResultSchema,
4849
EmptyResultSchema,
50+
extractTaskManagerOptions,
4951
GetPromptResultSchema,
5052
InitializeResultSchema,
5153
LATEST_PROTOCOL_VERSION,
@@ -140,11 +142,19 @@ export function getSupportedElicitationModes(capabilities: ClientCapabilities['e
140142
return { supportsFormMode, supportsUrlMode };
141143
}
142144

145+
/**
146+
* Extended tasks capability that includes runtime configuration (store, messageQueue).
147+
* The runtime-only fields are stripped before advertising capabilities to servers.
148+
*/
149+
export type ClientTasksCapabilityWithRuntime = NonNullable<ClientCapabilities['tasks']> & TaskManagerOptions;
150+
143151
export type ClientOptions = ProtocolOptions & {
144152
/**
145153
* Capabilities to advertise as being supported by this client.
146154
*/
147-
capabilities?: ClientCapabilities;
155+
capabilities?: Omit<ClientCapabilities, 'tasks'> & {
156+
tasks?: ClientTasksCapabilityWithRuntime;
157+
};
148158

149159
/**
150160
* JSON Schema validator for tool output validation.
@@ -213,11 +223,22 @@ export class Client extends Protocol<ClientContext> {
213223
private _clientInfo: Implementation,
214224
options?: ClientOptions
215225
) {
216-
super(options);
217-
this._capabilities = options?.capabilities ?? {};
226+
super({
227+
...options,
228+
tasks: extractTaskManagerOptions(options?.capabilities?.tasks)
229+
});
230+
this._capabilities = options?.capabilities ? { ...options.capabilities } : {};
218231
this._jsonSchemaValidator = options?.jsonSchemaValidator ?? new DefaultJsonSchemaValidator();
219232
this._enforceStrictCapabilities = options?.enforceStrictCapabilities ?? false;
220233

234+
// Strip runtime-only fields from advertised capabilities
235+
if (options?.capabilities?.tasks) {
236+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
237+
const { taskStore, taskMessageQueue, defaultTaskPollInterval, maxTaskQueueSize, ...wireCapabilities } =
238+
options.capabilities.tasks;
239+
this._capabilities.tasks = wireCapabilities;
240+
}
241+
221242
// Store list changed config for setup after connection (when we know server capabilities)
222243
if (options?.listChanged) {
223244
this._pendingListChangedConfig = options.listChanged;
@@ -650,12 +671,6 @@ export class Client extends Protocol<ClientContext> {
650671
}
651672

652673
protected assertRequestHandlerCapability(method: string): void {
653-
// Task handlers are registered in Protocol constructor before _capabilities is initialized
654-
// Skip capability check for task methods during initialization
655-
if (!this._capabilities) {
656-
return;
657-
}
658-
659674
switch (method) {
660675
case 'sampling/createMessage': {
661676
if (!this._capabilities.sampling) {
@@ -687,19 +702,6 @@ export class Client extends Protocol<ClientContext> {
687702
break;
688703
}
689704

690-
case 'tasks/get':
691-
case 'tasks/list':
692-
case 'tasks/result':
693-
case 'tasks/cancel': {
694-
if (!this._capabilities.tasks) {
695-
throw new SdkError(
696-
SdkErrorCode.CapabilityNotSupported,
697-
`Client does not support tasks capability (required for ${method})`
698-
);
699-
}
700-
break;
701-
}
702-
703705
case 'ping': {
704706
// No specific capability required for ping
705707
break;
@@ -712,16 +714,9 @@ export class Client extends Protocol<ClientContext> {
712714
}
713715

714716
protected assertTaskHandlerCapability(method: string): void {
715-
// Task handlers are registered in Protocol constructor before _capabilities is initialized
716-
// Skip capability check for task methods during initialization
717-
if (!this._capabilities) {
718-
return;
719-
}
720-
721-
assertClientRequestTaskCapability(this._capabilities.tasks?.requests, method, 'Client');
717+
assertClientRequestTaskCapability(this._capabilities?.tasks?.requests, method, 'Client');
722718
}
723719

724-
/** Sends a ping to the server to check connectivity. */
725720
async ping(options?: RequestOptions) {
726721
return this._requestWithSchema({ method: 'ping' }, EmptyResultSchema, options);
727722
}

packages/client/src/experimental/tasks/client.ts

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,27 @@
66
*/
77

88
import type {
9+
AnyObjectSchema,
910
CallToolRequest,
1011
CallToolResult,
1112
CancelTaskResult,
1213
CreateTaskResult,
1314
GetTaskPayloadResult,
1415
GetTaskResult,
1516
ListTasksResult,
17+
Request,
1618
RequestMethod,
1719
RequestOptions,
1820
ResponseMessage,
1921
ResultTypeMap
2022
} from '@modelcontextprotocol/core';
21-
import { GetTaskPayloadResultSchema, ProtocolError, ProtocolErrorCode } from '@modelcontextprotocol/core';
23+
import {
24+
CallToolResultSchema,
25+
getResultSchema,
26+
GetTaskPayloadResultSchema,
27+
ProtocolError,
28+
ProtocolErrorCode
29+
} from '@modelcontextprotocol/core';
2230

2331
import type { Client } from '../../client/client.js';
2432

@@ -27,10 +35,6 @@ import type { Client } from '../../client/client.js';
2735
* @internal
2836
*/
2937
interface ClientInternal {
30-
requestStream<M extends RequestMethod>(
31-
request: { method: M; params?: Record<string, unknown> },
32-
options?: RequestOptions
33-
): AsyncGenerator<ResponseMessage<ResultTypeMap[M]>, void, void>;
3438
isToolTask(toolName: string): boolean;
3539
getToolOutputValidator(toolName: string): ((data: unknown) => { valid: boolean; errorMessage?: string }) | undefined;
3640
}
@@ -49,6 +53,10 @@ interface ClientInternal {
4953
export class ExperimentalClientTasks {
5054
constructor(private readonly _client: Client) {}
5155

56+
private get _module() {
57+
return this._client.taskManager;
58+
}
59+
5260
/**
5361
* Calls a tool and returns an AsyncGenerator that yields response messages.
5462
* The generator is guaranteed to end with either a `'result'` or `'error'` message.
@@ -103,7 +111,7 @@ export class ExperimentalClientTasks {
103111
task: options?.task ?? (clientInternal.isToolTask(params.name) ? {} : undefined)
104112
};
105113

106-
const stream = clientInternal.requestStream({ method: 'tools/call', params }, optionsWithTask);
114+
const stream = this._module.requestStream({ method: 'tools/call', params }, CallToolResultSchema, optionsWithTask);
107115

108116
// Get the validator for this tool (if it has an output schema)
109117
const validator = clientInternal.getToolOutputValidator(params.name);
@@ -175,9 +183,7 @@ export class ExperimentalClientTasks {
175183
* @experimental
176184
*/
177185
async getTask(taskId: string, options?: RequestOptions): Promise<GetTaskResult> {
178-
// Delegate to the client's underlying Protocol method
179-
type ClientWithGetTask = { getTask(params: { taskId: string }, options?: RequestOptions): Promise<GetTaskResult> };
180-
return (this._client as unknown as ClientWithGetTask).getTask({ taskId }, options);
186+
return this._module.getTask({ taskId }, options);
181187
}
182188

183189
/**
@@ -191,15 +197,7 @@ export class ExperimentalClientTasks {
191197
* @experimental
192198
*/
193199
async getTaskResult(taskId: string, options?: RequestOptions): Promise<GetTaskPayloadResult> {
194-
return (
195-
this._client as unknown as {
196-
getTaskResult: (
197-
params: { taskId: string },
198-
resultSchema: typeof GetTaskPayloadResultSchema,
199-
options?: RequestOptions
200-
) => Promise<GetTaskPayloadResult>;
201-
}
202-
).getTaskResult({ taskId }, GetTaskPayloadResultSchema, options);
200+
return this._module.getTaskResult({ taskId }, GetTaskPayloadResultSchema, options);
203201
}
204202

205203
/**
@@ -212,12 +210,7 @@ export class ExperimentalClientTasks {
212210
* @experimental
213211
*/
214212
async listTasks(cursor?: string, options?: RequestOptions): Promise<ListTasksResult> {
215-
// Delegate to the client's underlying Protocol method
216-
return (
217-
this._client as unknown as {
218-
listTasks: (params?: { cursor?: string }, options?: RequestOptions) => Promise<ListTasksResult>;
219-
}
220-
).listTasks(cursor ? { cursor } : undefined, options);
213+
return this._module.listTasks(cursor ? { cursor } : undefined, options);
221214
}
222215

223216
/**
@@ -229,12 +222,7 @@ export class ExperimentalClientTasks {
229222
* @experimental
230223
*/
231224
async cancelTask(taskId: string, options?: RequestOptions): Promise<CancelTaskResult> {
232-
// Delegate to the client's underlying Protocol method
233-
return (
234-
this._client as unknown as {
235-
cancelTask: (params: { taskId: string }, options?: RequestOptions) => Promise<CancelTaskResult>;
236-
}
237-
).cancelTask({ taskId }, options);
225+
return this._module.cancelTask({ taskId }, options);
238226
}
239227

240228
/**
@@ -279,7 +267,11 @@ export class ExperimentalClientTasks {
279267
request: { method: M; params?: Record<string, unknown> },
280268
options?: RequestOptions
281269
): AsyncGenerator<ResponseMessage<ResultTypeMap[M]>, void, void> {
282-
// Delegate to the client's underlying Protocol method
283-
return (this._client as unknown as ClientInternal).requestStream(request, options);
270+
const resultSchema = getResultSchema(request.method) as unknown as AnyObjectSchema;
271+
return this._module.requestStream(request as Request, resultSchema, options) as AsyncGenerator<
272+
ResponseMessage<ResultTypeMap[M]>,
273+
void,
274+
void
275+
>;
284276
}
285277
}

packages/core/src/experimental/tasks/helpers.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ interface TaskRequestsCapability {
1717

1818
/**
1919
* Asserts that task creation is supported for `tools/call`.
20-
* Used by {@linkcode @modelcontextprotocol/client!client/client.Client.assertTaskCapability | Client.assertTaskCapability} and {@linkcode @modelcontextprotocol/server!server/server.Server.assertTaskHandlerCapability | Server.assertTaskHandlerCapability}.
20+
* Used to implement the `assertTaskCapability` or `assertTaskHandlerCapability` abstract methods on Protocol.
2121
*
2222
* @param requests - The task requests capability object
2323
* @param method - The method being checked
@@ -52,7 +52,7 @@ export function assertToolsCallTaskCapability(
5252

5353
/**
5454
* Asserts that task creation is supported for `sampling/createMessage` or `elicitation/create`.
55-
* Used by {@linkcode @modelcontextprotocol/server!server/server.Server.assertTaskCapability | Server.assertTaskCapability} and {@linkcode @modelcontextprotocol/client!client/client.Client.assertTaskHandlerCapability | Client.assertTaskHandlerCapability}.
55+
* Used to implement the `assertTaskCapability` or `assertTaskHandlerCapability` abstract methods on Protocol.
5656
*
5757
* @param requests - The task requests capability object
5858
* @param method - The method being checked

packages/core/src/experimental/tasks/interfaces.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
* WARNING: These APIs are experimental and may change without notice.
44
*/
55

6-
import type { RequestTaskStore, ServerContext } from '../../shared/protocol.js';
6+
import type { ServerContext } from '../../shared/protocol.js';
7+
import type { RequestTaskStore } from '../../shared/taskManager.js';
78
import type {
89
JSONRPCErrorResponse,
910
JSONRPCNotification,

packages/core/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ export * from './shared/metadataUtils.js';
66
export * from './shared/protocol.js';
77
export * from './shared/responseMessage.js';
88
export * from './shared/stdio.js';
9+
export type { RequestTaskStore, TaskContext, TaskManagerOptions, TaskRequestOptions } from './shared/taskManager.js';
10+
export { extractTaskManagerOptions, NullTaskManager, TaskManager } from './shared/taskManager.js';
911
export * from './shared/toolNameValidation.js';
1012
export * from './shared/transport.js';
1113
export * from './shared/uriTemplate.js';

0 commit comments

Comments
 (0)