Skip to content

Commit ca726c5

Browse files
committed
tasks extraction - execute on capability declaration, accept task store and task MQ on constructor
1 parent 108f2f3 commit ca726c5

18 files changed

Lines changed: 2395 additions & 1922 deletions

File tree

.changeset/extract-task-manager.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
"@modelcontextprotocol/core": minor
3+
"@modelcontextprotocol/client": minor
4+
"@modelcontextprotocol/server": minor
5+
---
6+
7+
refactor: extract task orchestration from Protocol into TaskManager
8+
9+
**Breaking changes:**
10+
- `extra.taskId``extra.task?.taskId`
11+
- `extra.taskStore``extra.task?.taskStore`
12+
- `extra.taskRequestedTtl``extra.task?.requestedTtl`
13+
- `ProtocolOptions` no longer accepts `taskStore`/`taskMessageQueue` — pass via `TaskManagerOptions` in `ClientOptions`/`ServerOptions`
14+
- Abstract methods `assertTaskCapability`/`assertTaskHandlerCapability` removed from Protocol

examples/server/src/simpleStreamableHttp.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,14 @@ const getServer = () => {
4141
websiteUrl: 'https://github.com/modelcontextprotocol/typescript-sdk'
4242
},
4343
{
44-
capabilities: { logging: {}, tasks: { requests: { tools: { call: {} } } } },
45-
taskStore, // Enable task support
46-
taskMessageQueue: new InMemoryTaskMessageQueue()
44+
capabilities: {
45+
logging: {},
46+
tasks: {
47+
requests: { tools: { call: {} } },
48+
taskStore,
49+
taskMessageQueue: new InMemoryTaskMessageQueue()
50+
}
51+
}
4752
}
4853
);
4954

packages/client/src/client/client.ts

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import type {
3030
ResultTypeMap,
3131
ServerCapabilities,
3232
SubscribeRequest,
33+
TaskManagerOptions,
3334
Tool,
3435
Transport,
3536
UnsubscribeRequest
@@ -61,7 +62,8 @@ import {
6162
ProtocolErrorCode,
6263
ReadResourceResultSchema,
6364
SdkError,
64-
SdkErrorCode
65+
SdkErrorCode,
66+
TaskManager
6567
} from '@modelcontextprotocol/core';
6668

6769
import { ExperimentalClientTasks } from '../experimental/tasks/client.js';
@@ -140,11 +142,20 @@ export function getSupportedElicitationModes(capabilities: ClientCapabilities['e
140142
return { supportsFormMode, supportsUrlMode };
141143
}
142144

145+
/**
146+
* Extended tasks capability that includes runtime configuration (store, messageQueue).
147+
* The runtime-only fields are stripped before advertising capabilities to servers.
148+
*/
149+
export type ClientTasksCapabilityWithRuntime = NonNullable<ClientCapabilities['tasks']> &
150+
Pick<TaskManagerOptions, 'taskStore' | 'taskMessageQueue'>;
151+
143152
export type ClientOptions = ProtocolOptions & {
144153
/**
145154
* Capabilities to advertise as being supported by this client.
146155
*/
147-
capabilities?: ClientCapabilities;
156+
capabilities?: Omit<ClientCapabilities, 'tasks'> & {
157+
tasks?: ClientTasksCapabilityWithRuntime;
158+
};
148159

149160
/**
150161
* JSON Schema validator for tool output validation.
@@ -204,6 +215,7 @@ export class Client extends Protocol<ClientContext> {
204215
private _listChangedDebounceTimers: Map<string, ReturnType<typeof setTimeout>> = new Map();
205216
private _pendingListChangedConfig?: ListChangedHandlers;
206217
private _enforceStrictCapabilities: boolean;
218+
private _taskModule?: TaskManager;
207219

208220
/**
209221
* Initializes this client with the given name and version information.
@@ -213,16 +225,38 @@ export class Client extends Protocol<ClientContext> {
213225
options?: ClientOptions
214226
) {
215227
super(options);
216-
this._capabilities = options?.capabilities ?? {};
228+
this._capabilities = options?.capabilities ? { ...options.capabilities } : {};
217229
this._jsonSchemaValidator = options?.jsonSchemaValidator ?? new DefaultJsonSchemaValidator();
218230
this._enforceStrictCapabilities = options?.enforceStrictCapabilities ?? false;
219231

232+
// If tasks capability is declared, create and register the task module
233+
if (options?.capabilities?.tasks) {
234+
const { taskStore, taskMessageQueue, ...wireCapabilities } = options.capabilities.tasks;
235+
// Strip runtime-only config from advertised capabilities
236+
this._capabilities.tasks = wireCapabilities;
237+
this._taskModule = new TaskManager({
238+
taskStore,
239+
taskMessageQueue,
240+
assertTaskCapability: method => assertToolsCallTaskCapability(this._serverCapabilities?.tasks?.requests, method, 'Server'),
241+
assertTaskHandlerCapability: method =>
242+
assertClientRequestTaskCapability(this._capabilities.tasks?.requests, method, 'Client')
243+
});
244+
this.registerModule(this._taskModule);
245+
}
246+
220247
// Store list changed config for setup after connection (when we know server capabilities)
221248
if (options?.listChanged) {
222249
this._pendingListChangedConfig = options.listChanged;
223250
}
224251
}
225252

253+
/**
254+
* Access the task module, if tasks capability is configured.
255+
*/
256+
get taskModule(): TaskManager | undefined {
257+
return this._taskModule;
258+
}
259+
226260
protected override buildContext(ctx: BaseContext, _transportInfo?: MessageExtraInfo): ClientContext {
227261
return ctx;
228262
}
@@ -635,12 +669,6 @@ export class Client extends Protocol<ClientContext> {
635669
}
636670

637671
protected assertRequestHandlerCapability(method: string): void {
638-
// Task handlers are registered in Protocol constructor before _capabilities is initialized
639-
// Skip capability check for task methods during initialization
640-
if (!this._capabilities) {
641-
return;
642-
}
643-
644672
switch (method) {
645673
case 'sampling/createMessage': {
646674
if (!this._capabilities.sampling) {
@@ -672,41 +700,13 @@ export class Client extends Protocol<ClientContext> {
672700
break;
673701
}
674702

675-
case 'tasks/get':
676-
case 'tasks/list':
677-
case 'tasks/result':
678-
case 'tasks/cancel': {
679-
if (!this._capabilities.tasks) {
680-
throw new SdkError(
681-
SdkErrorCode.CapabilityNotSupported,
682-
`Client does not support tasks capability (required for ${method})`
683-
);
684-
}
685-
break;
686-
}
687-
688703
case 'ping': {
689704
// No specific capability required for ping
690705
break;
691706
}
692707
}
693708
}
694709

695-
protected assertTaskCapability(method: string): void {
696-
assertToolsCallTaskCapability(this._serverCapabilities?.tasks?.requests, method, 'Server');
697-
}
698-
699-
protected assertTaskHandlerCapability(method: string): void {
700-
// Task handlers are registered in Protocol constructor before _capabilities is initialized
701-
// Skip capability check for task methods during initialization
702-
if (!this._capabilities) {
703-
return;
704-
}
705-
706-
assertClientRequestTaskCapability(this._capabilities.tasks?.requests, method, 'Client');
707-
}
708-
709-
/** Sends a ping to the server to check connectivity. */
710710
async ping(options?: RequestOptions) {
711711
return this._requestWithSchema({ method: 'ping' }, EmptyResultSchema, options);
712712
}

packages/client/src/experimental/tasks/client.ts

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import type {
1919
ResultTypeMap,
2020
SchemaOutput
2121
} from '@modelcontextprotocol/core';
22-
import { ProtocolError, ProtocolErrorCode } from '@modelcontextprotocol/core';
22+
import { CallToolResultSchema, getResultSchema, ProtocolError, ProtocolErrorCode } from '@modelcontextprotocol/core';
23+
import type { Request } from '@modelcontextprotocol/core';
2324

2425
import type { Client } from '../../client/client.js';
2526

@@ -28,10 +29,6 @@ import type { Client } from '../../client/client.js';
2829
* @internal
2930
*/
3031
interface ClientInternal {
31-
requestStream<M extends RequestMethod>(
32-
request: { method: M; params?: Record<string, unknown> },
33-
options?: RequestOptions
34-
): AsyncGenerator<ResponseMessage<ResultTypeMap[M]>, void, void>;
3532
isToolTask(toolName: string): boolean;
3633
getToolOutputValidator(toolName: string): ((data: unknown) => { valid: boolean; errorMessage?: string }) | undefined;
3734
}
@@ -50,6 +47,14 @@ interface ClientInternal {
5047
export class ExperimentalClientTasks {
5148
constructor(private readonly _client: Client) {}
5249

50+
private get _module() {
51+
const module = this._client.taskModule;
52+
if (!module) {
53+
throw new Error('Tasks capability is not configured. Declare tasks in capabilities to use task features.');
54+
}
55+
return module;
56+
}
57+
5358
/**
5459
* Calls a tool and returns an AsyncGenerator that yields response messages.
5560
* The generator is guaranteed to end with either a `'result'` or `'error'` message.
@@ -104,7 +109,7 @@ export class ExperimentalClientTasks {
104109
task: options?.task ?? (clientInternal.isToolTask(params.name) ? {} : undefined)
105110
};
106111

107-
const stream = clientInternal.requestStream({ method: 'tools/call', params }, optionsWithTask);
112+
const stream = this._module.requestStream({ method: 'tools/call', params }, CallToolResultSchema, optionsWithTask);
108113

109114
// Get the validator for this tool (if it has an output schema)
110115
const validator = clientInternal.getToolOutputValidator(params.name);
@@ -176,9 +181,7 @@ export class ExperimentalClientTasks {
176181
* @experimental
177182
*/
178183
async getTask(taskId: string, options?: RequestOptions): Promise<GetTaskResult> {
179-
// Delegate to the client's underlying Protocol method
180-
type ClientWithGetTask = { getTask(params: { taskId: string }, options?: RequestOptions): Promise<GetTaskResult> };
181-
return (this._client as unknown as ClientWithGetTask).getTask({ taskId }, options);
184+
return this._module.getTask({ taskId }, options);
182185
}
183186

184187
/**
@@ -192,16 +195,7 @@ export class ExperimentalClientTasks {
192195
* @experimental
193196
*/
194197
async getTaskResult<T extends AnyObjectSchema>(taskId: string, resultSchema?: T, options?: RequestOptions): Promise<SchemaOutput<T>> {
195-
// Delegate to the client's underlying Protocol method
196-
return (
197-
this._client as unknown as {
198-
getTaskResult: <U extends AnyObjectSchema>(
199-
params: { taskId: string },
200-
resultSchema?: U,
201-
options?: RequestOptions
202-
) => Promise<SchemaOutput<U>>;
203-
}
204-
).getTaskResult({ taskId }, resultSchema, options);
198+
return this._module.getTaskResult({ taskId }, resultSchema!, options);
205199
}
206200

207201
/**
@@ -214,12 +208,7 @@ export class ExperimentalClientTasks {
214208
* @experimental
215209
*/
216210
async listTasks(cursor?: string, options?: RequestOptions): Promise<ListTasksResult> {
217-
// Delegate to the client's underlying Protocol method
218-
return (
219-
this._client as unknown as {
220-
listTasks: (params?: { cursor?: string }, options?: RequestOptions) => Promise<ListTasksResult>;
221-
}
222-
).listTasks(cursor ? { cursor } : undefined, options);
211+
return this._module.listTasks(cursor ? { cursor } : undefined, options);
223212
}
224213

225214
/**
@@ -231,12 +220,7 @@ export class ExperimentalClientTasks {
231220
* @experimental
232221
*/
233222
async cancelTask(taskId: string, options?: RequestOptions): Promise<CancelTaskResult> {
234-
// Delegate to the client's underlying Protocol method
235-
return (
236-
this._client as unknown as {
237-
cancelTask: (params: { taskId: string }, options?: RequestOptions) => Promise<CancelTaskResult>;
238-
}
239-
).cancelTask({ taskId }, options);
223+
return this._module.cancelTask({ taskId }, options);
240224
}
241225

242226
/**
@@ -281,7 +265,11 @@ export class ExperimentalClientTasks {
281265
request: { method: M; params?: Record<string, unknown> },
282266
options?: RequestOptions
283267
): AsyncGenerator<ResponseMessage<ResultTypeMap[M]>, void, void> {
284-
// Delegate to the client's underlying Protocol method
285-
return (this._client as unknown as ClientInternal).requestStream(request, options);
268+
const resultSchema = getResultSchema(request.method) as unknown as AnyObjectSchema;
269+
return this._module.requestStream(request as Request, resultSchema, options) as AsyncGenerator<
270+
ResponseMessage<ResultTypeMap[M]>,
271+
void,
272+
void
273+
>;
286274
}
287275
}

packages/core/src/experimental/tasks/interfaces.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
* WARNING: These APIs are experimental and may change without notice.
44
*/
55

6-
import type { RequestTaskStore, ServerContext } from '../../shared/protocol.js';
6+
import type { ServerContext } from '../../shared/protocol.js';
7+
import type { RequestTaskStore } from '../../shared/taskManager.js';
78
import type {
89
JSONRPCErrorResponse,
910
JSONRPCNotification,

packages/core/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ export * from './shared/auth.js';
44
export * from './shared/authUtils.js';
55
export * from './shared/metadataUtils.js';
66
export * from './shared/protocol.js';
7+
export * from './shared/protocolModule.js';
78
export * from './shared/responseMessage.js';
89
export * from './shared/stdio.js';
10+
export * from './shared/taskManager.js';
911
export * from './shared/toolNameValidation.js';
1012
export * from './shared/transport.js';
1113
export * from './shared/uriTemplate.js';

0 commit comments

Comments
 (0)