Skip to content

Commit 3e52ee5

Browse files
committed
feat: implement support for SEP-1686 Tasks
1 parent 46bacda commit 3e52ee5

File tree

50 files changed

+14179
-200
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+14179
-200
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 784 additions & 8 deletions
Large diffs are not rendered by default.

mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java

Lines changed: 149 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.modelcontextprotocol.client;
66

77
import io.modelcontextprotocol.common.McpTransportContext;
8+
import io.modelcontextprotocol.experimental.tasks.TaskStore;
89
import io.modelcontextprotocol.json.McpJsonDefaults;
910
import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
1011
import io.modelcontextprotocol.spec.McpClientTransport;
@@ -185,6 +186,8 @@ class SyncSpec {
185186

186187
private final List<Consumer<McpSchema.ProgressNotification>> progressConsumers = new ArrayList<>();
187188

189+
private final List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers = new ArrayList<>();
190+
188191
private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;
189192

190193
private Function<ElicitRequest, ElicitResult> elicitationHandler;
@@ -195,6 +198,10 @@ class SyncSpec {
195198

196199
private boolean enableCallToolSchemaCaching = false; // Default to false
197200

201+
private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;
202+
203+
private Duration taskPollTimeout; // null = use default (5 minutes)
204+
198205
private SyncSpec(McpClientTransport transport) {
199206
Assert.notNull(transport, "Transport must not be null");
200207
this.transport = transport;
@@ -318,6 +325,44 @@ public SyncSpec elicitation(Function<ElicitRequest, ElicitResult> elicitationHan
318325
return this;
319326
}
320327

328+
/**
329+
* Sets the task store for client-side task hosting. When set, the client can host
330+
* tasks for task-augmented sampling and elicitation requests from the server.
331+
*
332+
* <p>
333+
* This is an experimental feature that may change in future releases.
334+
* @param taskStore The task store implementation. Must not be null.
335+
* @return This builder instance for method chaining
336+
* @throws IllegalArgumentException if taskStore is null
337+
*/
338+
public SyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
339+
Assert.notNull(taskStore, "Task store must not be null");
340+
this.taskStore = taskStore;
341+
return this;
342+
}
343+
344+
/**
345+
* Sets the maximum time to wait for a task to reach a terminal state during task
346+
* result polling.
347+
*
348+
* <p>
349+
* When using task-augmented requests (e.g., long-running tool calls), the client
350+
* polls the server for task status updates. This timeout limits how long the
351+
* client will wait for the task to complete, fail, or be cancelled.
352+
*
353+
* <p>
354+
* If not set, defaults to 5 minutes to prevent infinite polling loops.
355+
*
356+
* <p>
357+
* This is an experimental feature that may change in future releases.
358+
* @param timeout maximum poll duration, or null to use the default (5 minutes)
359+
* @return This builder instance for method chaining
360+
*/
361+
public SyncSpec taskPollTimeout(Duration timeout) {
362+
this.taskPollTimeout = timeout;
363+
return this;
364+
}
365+
321366
/**
322367
* Adds a consumer to be notified when the available tools change. This allows the
323368
* client to react to changes in the server's tool capabilities, such as tools
@@ -429,14 +474,42 @@ public SyncSpec progressConsumer(Consumer<McpSchema.ProgressNotification> progre
429474
* @param progressConsumers A list of consumers that receives progress
430475
* notifications. Must not be null.
431476
* @return This builder instance for method chaining
432-
* @throws IllegalArgumentException if progressConsumer is null
477+
* @throws IllegalArgumentException if progressConsumers is null
433478
*/
434479
public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>> progressConsumers) {
435480
Assert.notNull(progressConsumers, "Progress consumers must not be null");
436481
this.progressConsumers.addAll(progressConsumers);
437482
return this;
438483
}
439484

485+
/**
486+
* Adds a consumer to be notified of task status notifications from the server.
487+
* This enables clients to receive updates about task progress and status changes.
488+
* @param taskStatusConsumer A consumer that receives task status notifications.
489+
* Must not be null.
490+
* @return This builder instance for method chaining
491+
* @throws IllegalArgumentException if taskStatusConsumer is null
492+
*/
493+
public SyncSpec taskStatusConsumer(Consumer<McpSchema.TaskStatusNotification> taskStatusConsumer) {
494+
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
495+
this.taskStatusConsumers.add(taskStatusConsumer);
496+
return this;
497+
}
498+
499+
/**
500+
* Adds multiple consumers to be notified of task status notifications from the
501+
* server.
502+
* @param taskStatusConsumers A list of consumers that receive task status
503+
* notifications. Must not be null.
504+
* @return This builder instance for method chaining
505+
* @throws IllegalArgumentException if taskStatusConsumers is null
506+
*/
507+
public SyncSpec taskStatusConsumers(List<Consumer<McpSchema.TaskStatusNotification>> taskStatusConsumers) {
508+
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
509+
this.taskStatusConsumers.addAll(taskStatusConsumers);
510+
return this;
511+
}
512+
440513
/**
441514
* Add a provider of {@link McpTransportContext}, providing a context before
442515
* calling any client operation. This allows to extract thread-locals and hand
@@ -487,14 +560,15 @@ public SyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching)
487560
public McpSyncClient build() {
488561
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
489562
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
490-
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler,
491-
this.elicitationHandler, this.enableCallToolSchemaCaching);
563+
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
564+
this.taskStatusConsumers, this.samplingHandler, this.elicitationHandler,
565+
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null);
492566

493567
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
494568

495569
return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
496-
jsonSchemaValidator != null ? jsonSchemaValidator : McpJsonDefaults.getSchemaValidator(),
497-
asyncFeatures), this.contextProvider);
570+
jsonSchemaValidator != null ? jsonSchemaValidator : McpJsonDefaults.getSchemaValidator(), asyncFeatures,
571+
this.taskStore), this.contextProvider);
498572
}
499573

500574
}
@@ -541,6 +615,8 @@ class AsyncSpec {
541615

542616
private final List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers = new ArrayList<>();
543617

618+
private final List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers = new ArrayList<>();
619+
544620
private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;
545621

546622
private Function<ElicitRequest, Mono<ElicitResult>> elicitationHandler;
@@ -549,6 +625,10 @@ class AsyncSpec {
549625

550626
private boolean enableCallToolSchemaCaching = false; // Default to false
551627

628+
private TaskStore<McpSchema.ClientTaskPayloadResult> taskStore;
629+
630+
private Duration taskPollTimeout; // null = use default (5 minutes)
631+
552632
private AsyncSpec(McpClientTransport transport) {
553633
Assert.notNull(transport, "Transport must not be null");
554634
this.transport = transport;
@@ -672,6 +752,22 @@ public AsyncSpec elicitation(Function<ElicitRequest, Mono<ElicitResult>> elicita
672752
return this;
673753
}
674754

755+
/**
756+
* Sets the task store for client-side task hosting. When set, the client can host
757+
* tasks for task-augmented sampling and elicitation requests from the server.
758+
*
759+
* <p>
760+
* This is an experimental feature that may change in future releases.
761+
* @param taskStore The task store implementation. Must not be null.
762+
* @return This builder instance for method chaining
763+
* @throws IllegalArgumentException if taskStore is null
764+
*/
765+
public AsyncSpec taskStore(TaskStore<McpSchema.ClientTaskPayloadResult> taskStore) {
766+
Assert.notNull(taskStore, "Task store must not be null");
767+
this.taskStore = taskStore;
768+
return this;
769+
}
770+
675771
/**
676772
* Adds a consumer to be notified when the available tools change. This allows the
677773
* client to react to changes in the server's tool capabilities, such as tools
@@ -786,7 +882,7 @@ public AsyncSpec progressConsumer(Function<McpSchema.ProgressNotification, Mono<
786882
* @param progressConsumers A list of consumers that receives progress
787883
* notifications. Must not be null.
788884
* @return This builder instance for method chaining
789-
* @throws IllegalArgumentException if progressConsumer is null
885+
* @throws IllegalArgumentException if progressConsumers is null
790886
*/
791887
public AsyncSpec progressConsumers(
792888
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers) {
@@ -795,6 +891,35 @@ public AsyncSpec progressConsumers(
795891
return this;
796892
}
797893

894+
/**
895+
* Adds a consumer to be notified of task status notifications from the server.
896+
* This enables clients to receive updates about task progress and status changes.
897+
* @param taskStatusConsumer A consumer that receives task status notifications.
898+
* Must not be null.
899+
* @return This builder instance for method chaining
900+
* @throws IllegalArgumentException if taskStatusConsumer is null
901+
*/
902+
public AsyncSpec taskStatusConsumer(Function<McpSchema.TaskStatusNotification, Mono<Void>> taskStatusConsumer) {
903+
Assert.notNull(taskStatusConsumer, "Task status consumer must not be null");
904+
this.taskStatusConsumers.add(taskStatusConsumer);
905+
return this;
906+
}
907+
908+
/**
909+
* Adds multiple consumers to be notified of task status notifications from the
910+
* server.
911+
* @param taskStatusConsumers A list of consumers that receive task status
912+
* notifications. Must not be null.
913+
* @return This builder instance for method chaining
914+
* @throws IllegalArgumentException if taskStatusConsumers is null
915+
*/
916+
public AsyncSpec taskStatusConsumers(
917+
List<Function<McpSchema.TaskStatusNotification, Mono<Void>>> taskStatusConsumers) {
918+
Assert.notNull(taskStatusConsumers, "Task status consumers must not be null");
919+
this.taskStatusConsumers.addAll(taskStatusConsumers);
920+
return this;
921+
}
922+
798923
/**
799924
* Sets the JSON schema validator to use for validating tool responses against
800925
* output schemas.
@@ -820,6 +945,21 @@ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching
820945
return this;
821946
}
822947

948+
/**
949+
* Sets the maximum duration to poll for task completion in
950+
* {@code callToolStream()}. If not set, defaults to 5 minutes to prevent infinite
951+
* polling loops.
952+
*
953+
* <p>
954+
* This is an experimental feature that may change in future releases.
955+
* @param timeout maximum poll duration, or null to use the default (5 minutes)
956+
* @return This builder instance for method chaining
957+
*/
958+
public AsyncSpec taskPollTimeout(Duration timeout) {
959+
this.taskPollTimeout = timeout;
960+
return this;
961+
}
962+
823963
/**
824964
* Create an instance of {@link McpAsyncClient} with the provided configurations
825965
* or sensible defaults.
@@ -833,7 +973,9 @@ public McpAsyncClient build() {
833973
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
834974
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
835975
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
836-
this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
976+
this.taskStatusConsumers, this.samplingHandler, this.elicitationHandler,
977+
this.enableCallToolSchemaCaching, this.taskPollTimeout, this.taskStore != null),
978+
this.taskStore);
837979
}
838980

839981
}

0 commit comments

Comments
 (0)