Skip to content

Commit 135969f

Browse files
committed
clean up
1 parent bcda271 commit 135969f

10 files changed

Lines changed: 170 additions & 193 deletions

File tree

packages/client/src/client/client.ts

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import type {
3838
import {
3939
assertClientRequestTaskCapability,
4040
assertToolsCallTaskCapability,
41+
extractTaskManagerOptions,
4142
CallToolResultSchema,
4243
CompleteResultSchema,
4344
CreateMessageRequestSchema,
@@ -56,15 +57,13 @@ import {
5657
ListResourceTemplatesResultSchema,
5758
ListToolsResultSchema,
5859
mergeCapabilities,
59-
NullTaskManager,
6060
parseSchema,
6161
Protocol,
6262
ProtocolError,
6363
ProtocolErrorCode,
6464
ReadResourceResultSchema,
6565
SdkError,
66-
SdkErrorCode,
67-
TaskManager
66+
SdkErrorCode
6867
} from '@modelcontextprotocol/core';
6968

7069
import { ExperimentalClientTasks } from '../experimental/tasks/client.js';
@@ -147,8 +146,7 @@ export function getSupportedElicitationModes(capabilities: ClientCapabilities['e
147146
* Extended tasks capability that includes runtime configuration (store, messageQueue).
148147
* The runtime-only fields are stripped before advertising capabilities to servers.
149148
*/
150-
export type ClientTasksCapabilityWithRuntime = NonNullable<ClientCapabilities['tasks']> &
151-
Pick<TaskManagerOptions, 'taskStore' | 'taskMessageQueue' | 'defaultTaskPollInterval' | 'maxTaskQueueSize'>;
149+
export type ClientTasksCapabilityWithRuntime = NonNullable<ClientCapabilities['tasks']> & TaskManagerOptions;
152150

153151
export type ClientOptions = ProtocolOptions & {
154152
/**
@@ -216,7 +214,6 @@ export class Client extends Protocol<ClientContext> {
216214
private _listChangedDebounceTimers: Map<string, ReturnType<typeof setTimeout>> = new Map();
217215
private _pendingListChangedConfig?: ListChangedHandlers;
218216
private _enforceStrictCapabilities: boolean;
219-
private _taskModule: TaskManager;
220217

221218
/**
222219
* Initializes this client with the given name and version information.
@@ -225,45 +222,28 @@ export class Client extends Protocol<ClientContext> {
225222
private _clientInfo: Implementation,
226223
options?: ClientOptions
227224
) {
228-
super(options);
225+
super({
226+
...options,
227+
tasks: extractTaskManagerOptions(options?.capabilities?.tasks)
228+
});
229229
this._capabilities = options?.capabilities ? { ...options.capabilities } : {};
230230
this._jsonSchemaValidator = options?.jsonSchemaValidator ?? new DefaultJsonSchemaValidator();
231231
this._enforceStrictCapabilities = options?.enforceStrictCapabilities ?? false;
232232

233-
// Always create TaskManager (NullTaskManager pattern for streaming support)
233+
// Strip runtime-only fields from advertised capabilities
234234
if (options?.capabilities?.tasks) {
235+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
235236
const { taskStore, taskMessageQueue, defaultTaskPollInterval, maxTaskQueueSize, ...wireCapabilities } =
236237
options.capabilities.tasks;
237-
// Strip runtime-only config from advertised capabilities
238238
this._capabilities.tasks = wireCapabilities;
239-
this._taskModule = new TaskManager({
240-
taskStore,
241-
taskMessageQueue,
242-
defaultTaskPollInterval,
243-
maxTaskQueueSize,
244-
enforceStrictCapabilities: options?.enforceStrictCapabilities,
245-
assertTaskCapability: method => assertToolsCallTaskCapability(this._serverCapabilities?.tasks?.requests, method, 'Server'),
246-
assertTaskHandlerCapability: method =>
247-
assertClientRequestTaskCapability(this._capabilities.tasks?.requests, method, 'Client')
248-
});
249-
} else {
250-
this._taskModule = new NullTaskManager();
251239
}
252-
this.setTaskManager(this._taskModule);
253240

254241
// Store list changed config for setup after connection (when we know server capabilities)
255242
if (options?.listChanged) {
256243
this._pendingListChangedConfig = options.listChanged;
257244
}
258245
}
259246

260-
/**
261-
* Access the task module.
262-
*/
263-
get taskModule(): TaskManager {
264-
return this._taskModule;
265-
}
266-
267247
protected override buildContext(ctx: BaseContext, _transportInfo?: MessageExtraInfo): ClientContext {
268248
return ctx;
269249
}
@@ -714,6 +694,14 @@ export class Client extends Protocol<ClientContext> {
714694
}
715695
}
716696

697+
protected assertTaskCapability(method: string): void {
698+
assertToolsCallTaskCapability(this._serverCapabilities?.tasks?.requests, method, 'Server');
699+
}
700+
701+
protected assertTaskHandlerCapability(method: string): void {
702+
assertClientRequestTaskCapability(this._capabilities?.tasks?.requests, method, 'Client');
703+
}
704+
717705
async ping(options?: RequestOptions) {
718706
return this._requestWithSchema({ method: 'ping' }, EmptyResultSchema, options);
719707
}

packages/client/src/experimental/tasks/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export class ExperimentalClientTasks {
4848
constructor(private readonly _client: Client) {}
4949

5050
private get _module() {
51-
return this._client.taskModule;
51+
return this._client.taskManager;
5252
}
5353

5454
/**

packages/core/src/experimental/tasks/helpers.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ interface TaskRequestsCapability {
1717

1818
/**
1919
* Asserts that task creation is supported for `tools/call`.
20-
* Used as the `assertTaskCapability` or `assertTaskHandlerCapability` callback in `TaskManagerOptions`.
20+
* Used to implement the `assertTaskCapability` or `assertTaskHandlerCapability` abstract methods on Protocol.
2121
*
2222
* @param requests - The task requests capability object
2323
* @param method - The method being checked
@@ -52,7 +52,7 @@ export function assertToolsCallTaskCapability(
5252

5353
/**
5454
* Asserts that task creation is supported for `sampling/createMessage` or `elicitation/create`.
55-
* Used as the `assertTaskCapability` or `assertTaskHandlerCapability` callback in `TaskManagerOptions`.
55+
* Used to implement the `assertTaskCapability` or `assertTaskHandlerCapability` abstract methods on Protocol.
5656
*
5757
* @param requests - The task requests capability object
5858
* @param method - The method being checked

packages/core/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export * from './shared/protocol.js';
77
export * from './shared/responseMessage.js';
88
export * from './shared/stdio.js';
99
export type { RequestTaskStore, TaskContext, TaskManagerOptions, TaskRequestOptions } from './shared/taskManager.js';
10-
export { NullTaskManager, TaskManager } from './shared/taskManager.js';
10+
export { extractTaskManagerOptions, NullTaskManager, TaskManager } from './shared/taskManager.js';
1111
export * from './shared/toolNameValidation.js';
1212
export * from './shared/transport.js';
1313
export * from './shared/uriTemplate.js';

packages/core/src/shared/protocol.ts

Lines changed: 72 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ import {
4747
} from '../types/types.js';
4848
import type { AnySchema, SchemaOutput } from '../util/schema.js';
4949
import { parseSchema } from '../util/schema.js';
50-
import type { TaskContext, TaskManager, TaskManagerHost, TaskRequestOptions } from './taskManager.js';
50+
import type { TaskContext, TaskManagerHost, TaskManagerOptions, TaskRequestOptions } from './taskManager.js';
51+
import { NullTaskManager, TaskManager } from './taskManager.js';
5152
import type { Transport, TransportSendOptions } from './transport.js';
5253

5354
/**
@@ -82,6 +83,17 @@ export type ProtocolOptions = {
8283
* e.g., `['notifications/tools/list_changed']`
8384
*/
8485
debouncedNotificationMethods?: string[];
86+
87+
/**
88+
* Runtime configuration for task management.
89+
* If provided, creates a TaskManager with the given options; otherwise a NullTaskManager is used.
90+
*
91+
* Capability assertions are wired automatically from the abstract
92+
* {@linkcode Protocol.assertTaskCapability | assertTaskCapability()} and
93+
* {@linkcode Protocol.assertTaskHandlerCapability | assertTaskHandlerCapability()} methods,
94+
* so they should NOT be included here.
95+
*/
96+
tasks?: TaskManagerOptions;
8597
};
8698

8799
/**
@@ -281,10 +293,6 @@ type TimeoutInfo = {
281293
onTimeout: () => void;
282294
};
283295

284-
async function _noopRouteResponse(): Promise<boolean> {
285-
return false;
286-
}
287-
288296
/**
289297
* Implements MCP protocol framing on top of a pluggable transport, including
290298
* features like request/response linking, notifications, and progress.
@@ -300,7 +308,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
300308
private _timeoutInfo: Map<number, TimeoutInfo> = new Map();
301309
private _pendingDebouncedNotifications = new Set<string>();
302310

303-
private _taskManager?: TaskManager;
311+
private _taskManager: TaskManager;
304312

305313
protected _supportedProtocolVersions: string[];
306314

@@ -331,6 +339,10 @@ export abstract class Protocol<ContextT extends BaseContext> {
331339
constructor(private _options?: ProtocolOptions) {
332340
this._supportedProtocolVersions = _options?.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
333341

342+
// Create TaskManager from protocol options
343+
this._taskManager = _options?.tasks ? new TaskManager(_options.tasks) : new NullTaskManager();
344+
this._bindTaskManager();
345+
334346
this.setNotificationHandler('notifications/cancelled', notification => {
335347
this._oncancel(notification);
336348
});
@@ -347,11 +359,15 @@ export abstract class Protocol<ContextT extends BaseContext> {
347359
}
348360

349361
/**
350-
* Sets the TaskManager that hooks into the message lifecycle.
351-
* The TaskManager is bound to this Protocol and can register handlers, send messages, etc.
362+
* Access the TaskManager for task orchestration.
363+
* Always available; returns a NullTaskManager when no task store is configured.
352364
*/
353-
protected setTaskManager(taskManager: TaskManager): void {
354-
this._taskManager = taskManager;
365+
get taskManager(): TaskManager {
366+
return this._taskManager;
367+
}
368+
369+
private _bindTaskManager(): void {
370+
const taskManager = this._taskManager;
355371
const host: TaskManagerHost = {
356372
request: (request, resultSchema, options) => this._requestWithSchema(request, resultSchema, options),
357373
notification: (notification, options) => this.notification(notification, options),
@@ -367,7 +383,10 @@ export abstract class Protocol<ContextT extends BaseContext> {
367383
},
368384
sendOnResponseStream: async (message, relatedRequestId) => {
369385
await this._transport?.send(message, { relatedRequestId });
370-
}
386+
},
387+
enforceStrictCapabilities: this._options?.enforceStrictCapabilities === true,
388+
assertTaskCapability: method => this.assertTaskCapability(method),
389+
assertTaskHandlerCapability: method => this.assertTaskHandlerCapability(method)
371390
};
372391
taskManager.bind(host);
373392
}
@@ -473,7 +492,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
473492
const responseHandlers = this._responseHandlers;
474493
this._responseHandlers = new Map();
475494
this._progressHandlers.clear();
476-
this._taskManager?.onClose();
495+
this._taskManager.onClose();
477496
this._pendingDebouncedNotifications.clear();
478497

479498
const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed');
@@ -519,22 +538,14 @@ export abstract class Protocol<ContextT extends BaseContext> {
519538
this._requestWithSchema(r, resultSchema, { ...options, relatedRequestId: request.id })
520539
};
521540

522-
// Compose results from all modules
523-
let sendNotification: (notification: Notification) => Promise<void> = (notification: Notification) =>
524-
inboundCtx.sendNotification(notification);
525-
let sendRequest = inboundCtx.sendRequest;
526-
let routeResponse: (message: JSONRPCResponse | JSONRPCErrorResponse) => Promise<boolean> = _noopRouteResponse;
527-
let taskContext: BaseContext['task'] | undefined;
541+
// Delegate to TaskManager for task context, wrapped send/notify, and response routing
542+
const taskResult = this._taskManager.processInboundRequest(request, inboundCtx);
543+
const sendNotification = taskResult.sendNotification;
544+
const sendRequest = taskResult.sendRequest;
545+
const taskContext = taskResult.taskContext;
546+
const routeResponse = taskResult.routeResponse;
528547
const validators: Array<() => void> = [];
529-
530-
if (this._taskManager) {
531-
const taskResult = this._taskManager.processInboundRequest(request, inboundCtx);
532-
sendNotification = taskResult.sendNotification;
533-
sendRequest = taskResult.sendRequest;
534-
if (taskResult.taskContext !== undefined) taskContext = taskResult.taskContext;
535-
if (taskResult.validateInbound) validators.push(taskResult.validateInbound);
536-
routeResponse = taskResult.routeResponse;
537-
}
548+
if (taskResult.validateInbound) validators.push(taskResult.validateInbound);
538549

539550
if (handler === undefined) {
540551
const errorResponse: JSONRPCErrorResponse = {
@@ -669,12 +680,9 @@ export abstract class Protocol<ContextT extends BaseContext> {
669680
const messageId = Number(response.id);
670681

671682
// Delegate to TaskManager for task-related response handling
672-
let preserveProgress = false;
673-
if (this._taskManager) {
674-
const taskResult = this._taskManager.processInboundResponse(response, messageId);
675-
if (taskResult.consumed) return;
676-
preserveProgress = taskResult.preserveProgress;
677-
}
683+
const taskResult = this._taskManager.processInboundResponse(response, messageId);
684+
if (taskResult.consumed) return;
685+
const preserveProgress = taskResult.preserveProgress;
678686

679687
const handler = this._responseHandlers.get(messageId);
680688
if (handler === undefined) {
@@ -730,6 +738,22 @@ export abstract class Protocol<ContextT extends BaseContext> {
730738
*/
731739
protected abstract assertRequestHandlerCapability(method: string): void;
732740

741+
/**
742+
* A method to check if the remote side supports task creation for the given method.
743+
*
744+
* Called when sending a task-augmented outbound request (only when enforceStrictCapabilities is true).
745+
* This should be implemented by subclasses.
746+
*/
747+
protected abstract assertTaskCapability(method: string): void;
748+
749+
/**
750+
* A method to check if this side supports handling task creation for the given method.
751+
*
752+
* Called when receiving a task-augmented inbound request.
753+
* This should be implemented by subclasses.
754+
*/
755+
protected abstract assertTaskHandlerCapability(method: string): void;
756+
733757
/**
734758
* Sends a request and waits for a response, resolving the result schema
735759
* automatically from the method name.
@@ -862,28 +886,20 @@ export abstract class Protocol<ContextT extends BaseContext> {
862886
};
863887

864888
let outboundQueued = false;
865-
if (this._taskManager) {
866-
try {
867-
const taskResult = this._taskManager.processOutboundRequest(
868-
jsonrpcRequest,
869-
options,
870-
messageId,
871-
responseHandler,
872-
error => {
873-
this._cleanupTimeout(messageId);
874-
reject(error);
875-
}
876-
);
877-
if (taskResult.queued) {
878-
outboundQueued = true;
879-
}
880-
} catch (error) {
881-
this._responseHandlers.delete(messageId);
882-
this._progressHandlers.delete(messageId);
889+
try {
890+
const taskResult = this._taskManager.processOutboundRequest(jsonrpcRequest, options, messageId, responseHandler, error => {
883891
this._cleanupTimeout(messageId);
884892
reject(error);
885-
return;
893+
});
894+
if (taskResult.queued) {
895+
outboundQueued = true;
886896
}
897+
} catch (error) {
898+
this._responseHandlers.delete(messageId);
899+
this._progressHandlers.delete(messageId);
900+
this._cleanupTimeout(messageId);
901+
reject(error);
902+
return;
887903
}
888904

889905
if (!outboundQueued) {
@@ -906,20 +922,10 @@ export abstract class Protocol<ContextT extends BaseContext> {
906922

907923
this.assertNotificationCapability(notification.method as NotificationMethod);
908924

909-
// Delegate task-related notification routing and JSONRPC building to modules (if registered)
910-
let queued = false;
911-
let jsonrpcNotification: JSONRPCNotification | undefined;
912-
913-
if (this._taskManager) {
914-
const taskResult = await this._taskManager.processOutboundNotification(notification, options);
915-
if (taskResult.queued) {
916-
queued = true;
917-
} else {
918-
jsonrpcNotification = taskResult.jsonrpcNotification;
919-
}
920-
} else {
921-
jsonrpcNotification = { ...notification, jsonrpc: '2.0' };
922-
}
925+
// Delegate task-related notification routing and JSONRPC building to TaskManager
926+
const taskResult = await this._taskManager.processOutboundNotification(notification, options);
927+
const queued = taskResult.queued;
928+
const jsonrpcNotification = taskResult.queued ? undefined : taskResult.jsonrpcNotification;
923929

924930
if (queued) {
925931
// Don't send through transport - queued messages are delivered via tasks/result only

0 commit comments

Comments
 (0)