Skip to content

Commit 872aff7

Browse files
committed
fix: add related-task metadata to task-related messages
1 parent 6ea5216 commit 872aff7

File tree

4 files changed

+335
-6
lines changed

4 files changed

+335
-6
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ private RequestHandler<McpSchema.Result> samplingCreateMessageHandler() {
661661
}
662662

663663
// Non-task-augmented request - execute directly
664-
return this.samplingHandler.apply(request).map(result -> (McpSchema.Result) result);
664+
return this.samplingHandler.apply(request).map(result -> processClientResult(request.meta(), result));
665665
};
666666
}
667667

@@ -681,10 +681,45 @@ private RequestHandler<McpSchema.Result> elicitationCreateHandler() {
681681
}
682682

683683
// Non-task-augmented request - execute directly
684-
return this.elicitationHandler.apply(request).map(result -> (McpSchema.Result) result);
684+
return this.elicitationHandler.apply(request).map(result -> processClientResult(request.meta(), result));
685685
};
686686
}
687687

688+
/**
689+
* Processes a client result before returning it to the server. Currently, echoes
690+
* related-task metadata from the request to the response, which is necessary for the
691+
* server to associate elicitation/sampling responses with their originating task
692+
* during side-channeling.
693+
* @param requestMeta the request's _meta field
694+
* @param result the handler's result
695+
* @return the processed result
696+
*/
697+
private McpSchema.Result processClientResult(Map<String, Object> requestMeta, McpSchema.Result result) {
698+
if (requestMeta == null || !requestMeta.containsKey(McpSchema.RELATED_TASK_META_KEY)) {
699+
return result;
700+
}
701+
702+
Object relatedTask = requestMeta.get(McpSchema.RELATED_TASK_META_KEY);
703+
Map<String, Object> newMeta = new java.util.HashMap<>();
704+
newMeta.put(McpSchema.RELATED_TASK_META_KEY, relatedTask);
705+
706+
if (result instanceof McpSchema.ElicitResult elicitResult) {
707+
if (elicitResult.meta() != null) {
708+
newMeta.putAll(elicitResult.meta());
709+
}
710+
return new McpSchema.ElicitResult(elicitResult.action(), elicitResult.content(), newMeta);
711+
}
712+
else if (result instanceof McpSchema.CreateMessageResult messageResult) {
713+
if (messageResult.meta() != null) {
714+
newMeta.putAll(messageResult.meta());
715+
}
716+
return new McpSchema.CreateMessageResult(messageResult.role(), messageResult.content(),
717+
messageResult.model(), messageResult.stopReason(), newMeta);
718+
}
719+
720+
return result;
721+
}
722+
688723
// --------------------------
689724
// Client-Side Task Hosting
690725
// --------------------------

mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,7 +1664,7 @@ private Mono<Void> processMessage(McpAsyncServerExchange exchange, QueuedMessage
16641664

16651665
// Handle Notification messages (no response expected)
16661666
if (msg instanceof QueuedMessage.Notification notif) {
1667-
return sendNotificationToClient(exchange, notif);
1667+
return sendNotificationToClient(exchange, notif, taskId);
16681668
}
16691669

16701670
// Response messages should never be returned by dequeue() - but handle gracefully
@@ -1711,8 +1711,43 @@ private TypeRef<? extends McpSchema.Result> getResultTypeRef(String method) {
17111711
/**
17121712
* Sends a notification to the client without waiting for a response.
17131713
*/
1714-
private Mono<Void> sendNotificationToClient(McpAsyncServerExchange exchange, QueuedMessage.Notification notif) {
1715-
return exchange.getSession().sendNotification(notif.method(), notif.notification());
1714+
private Mono<Void> sendNotificationToClient(McpAsyncServerExchange exchange, QueuedMessage.Notification notif,
1715+
String taskId) {
1716+
Object notification = addRelatedTaskMetadataToNotification(taskId, notif.notification());
1717+
return exchange.getSession().sendNotification(notif.method(), notification);
1718+
}
1719+
1720+
/**
1721+
* Adds related-task metadata to a notification. Task status notifications are
1722+
* excluded as they already contain the taskId in their params.
1723+
*/
1724+
private Object addRelatedTaskMetadataToNotification(String taskId, Object notification) {
1725+
// TaskStatusNotification already has taskId in params - spec says SHOULD NOT
1726+
// include metadata
1727+
if (notification instanceof McpSchema.TaskStatusNotification) {
1728+
return notification;
1729+
}
1730+
1731+
Map<String, Object> relatedTask = Map.of("taskId", taskId);
1732+
Map<String, Object> newMeta = new java.util.HashMap<>();
1733+
newMeta.put(McpSchema.RELATED_TASK_META_KEY, relatedTask);
1734+
1735+
if (notification instanceof McpSchema.ProgressNotification pn) {
1736+
if (pn.meta() != null) {
1737+
newMeta.putAll(pn.meta());
1738+
}
1739+
return new McpSchema.ProgressNotification(pn.progressToken(), pn.progress(), pn.total(), pn.message(),
1740+
newMeta);
1741+
}
1742+
else if (notification instanceof McpSchema.LoggingMessageNotification ln) {
1743+
if (ln.meta() != null) {
1744+
newMeta.putAll(ln.meta());
1745+
}
1746+
return new McpSchema.LoggingMessageNotification(ln.level(), ln.logger(), ln.data(), newMeta);
1747+
}
1748+
1749+
// For other notification types, return as-is
1750+
return notification;
17161751
}
17171752

17181753
/**
@@ -1750,13 +1785,37 @@ private Mono<McpSchema.Result> pollAndProcessUntilTerminal(McpAsyncServerExchang
17501785
@SuppressWarnings("unchecked")
17511786
private Mono<McpSchema.Result> fetchTaskResult(String taskId, String sessionId) {
17521787
return this.taskStore.getTaskResult(taskId, sessionId)
1753-
.map(result -> (McpSchema.Result) result)
1788+
.map(result -> addRelatedTaskMetadata(taskId, (McpSchema.Result) result))
17541789
.switchIfEmpty(Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
17551790
.message("Task result not available")
17561791
.data("Task ID: " + taskId)
17571792
.build()));
17581793
}
17591794

1795+
/**
1796+
* Adds the related-task metadata to a result. The tasks/result operation MUST include
1797+
* this metadata in its response, as the result structure itself does not contain the
1798+
* task ID.
1799+
* @param taskId the task ID to include in the metadata
1800+
* @param result the result to add metadata to
1801+
* @return the result with related-task metadata added
1802+
*/
1803+
private McpSchema.Result addRelatedTaskMetadata(String taskId, McpSchema.Result result) {
1804+
Map<String, Object> relatedTask = Map.of("taskId", taskId);
1805+
Map<String, Object> newMeta = new java.util.HashMap<>();
1806+
newMeta.put(McpSchema.RELATED_TASK_META_KEY, relatedTask);
1807+
1808+
if (result instanceof McpSchema.CallToolResult ctr) {
1809+
if (ctr.meta() != null) {
1810+
newMeta.putAll(ctr.meta());
1811+
}
1812+
return new McpSchema.CallToolResult(ctr.content(), ctr.isError(), ctr.structuredContent(), newMeta);
1813+
}
1814+
1815+
// Fallback for any other result types (shouldn't happen for server-side tasks)
1816+
return result;
1817+
}
1818+
17601819
/**
17611820
* Returns the effective automatic polling timeout, using the configured value or the
17621821
* default if not configured.

mcp-core/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,202 @@ void testElicitationCreateRequestHandlingWithNullHandler() {
530530
.hasMessage("Elicitation handler must not be null when client capabilities include elicitation");
531531
}
532532

533+
@Test
534+
void testElicitationResponseIncludesRelatedTaskMetadata() {
535+
MockMcpClientTransport transport = initializationEnabledTransport();
536+
537+
// Create a test elicitation handler that returns a simple response
538+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
539+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), null));
540+
541+
// Create client with elicitation capability and handler
542+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
543+
.capabilities(ClientCapabilities.builder().elicitation().build())
544+
.elicitation(elicitationHandler)
545+
.build();
546+
547+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
548+
549+
// Create elicitation request WITH related-task metadata (simulating
550+
// side-channeling)
551+
String taskId = "test-task-123";
552+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
553+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
554+
555+
var elicitRequest = new McpSchema.ElicitRequest("What is your favorite number?",
556+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))), null, requestMeta);
557+
558+
// Simulate incoming request
559+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
560+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
561+
transport.simulateIncomingMessage(request);
562+
563+
// Verify response
564+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
565+
assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
566+
567+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
568+
assertThat(response.id()).isEqualTo("test-id");
569+
assertThat(response.error()).isNull();
570+
571+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
572+
});
573+
assertThat(result).isNotNull();
574+
assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT);
575+
assertThat(result.content()).isEqualTo(Map.of("value", "42"));
576+
577+
// Verify related-task metadata was echoed back
578+
assertThat(result.meta()).isNotNull();
579+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
580+
@SuppressWarnings("unchecked")
581+
Map<String, Object> echoedRelatedTask = (Map<String, Object>) result.meta()
582+
.get(McpSchema.RELATED_TASK_META_KEY);
583+
assertThat(echoedRelatedTask.get("taskId")).isEqualTo(taskId);
584+
585+
asyncMcpClient.closeGracefully();
586+
}
587+
588+
@Test
589+
void testElicitationResponseWithoutRelatedTaskMetadata() {
590+
MockMcpClientTransport transport = initializationEnabledTransport();
591+
592+
// Create a test elicitation handler that returns a response with custom meta
593+
Map<String, Object> handlerMeta = Map.of("custom-key", "custom-value");
594+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
595+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), handlerMeta));
596+
597+
// Create client with elicitation capability and handler
598+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
599+
.capabilities(ClientCapabilities.builder().elicitation().build())
600+
.elicitation(elicitationHandler)
601+
.build();
602+
603+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
604+
605+
// Create elicitation request WITHOUT related-task metadata
606+
var elicitRequest = new McpSchema.ElicitRequest("What is your favorite number?",
607+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))));
608+
609+
// Simulate incoming request
610+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
611+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
612+
transport.simulateIncomingMessage(request);
613+
614+
// Verify response
615+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
616+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
617+
618+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
619+
});
620+
assertThat(result).isNotNull();
621+
622+
// Verify handler's meta is preserved and no related-task was added
623+
assertThat(result.meta()).isEqualTo(handlerMeta);
624+
assertThat(result.meta()).doesNotContainKey(McpSchema.RELATED_TASK_META_KEY);
625+
626+
asyncMcpClient.closeGracefully();
627+
}
628+
629+
@Test
630+
void testSamplingResponseIncludesRelatedTaskMetadata() {
631+
MockMcpClientTransport transport = initializationEnabledTransport();
632+
633+
// Create a test sampling handler that returns a simple response
634+
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler = request -> Mono
635+
.just(new McpSchema.CreateMessageResult(McpSchema.Role.ASSISTANT, new McpSchema.TextContent("Response"),
636+
"test-model", McpSchema.CreateMessageResult.StopReason.END_TURN, null));
637+
638+
// Create client with sampling capability and handler
639+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
640+
.capabilities(ClientCapabilities.builder().sampling().build())
641+
.sampling(samplingHandler)
642+
.build();
643+
644+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
645+
646+
// Create sampling request WITH related-task metadata (simulating side-channeling)
647+
String taskId = "test-task-456";
648+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
649+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
650+
651+
var messageRequest = new McpSchema.CreateMessageRequest(
652+
List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, new McpSchema.TextContent("Test message"))),
653+
null, "Test system prompt", McpSchema.CreateMessageRequest.ContextInclusionStrategy.NONE, 0.7, 100,
654+
null, null, null, requestMeta);
655+
656+
// Simulate incoming request
657+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
658+
McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, "test-id", messageRequest);
659+
transport.simulateIncomingMessage(request);
660+
661+
// Verify response
662+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
663+
assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
664+
665+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
666+
assertThat(response.id()).isEqualTo("test-id");
667+
assertThat(response.error()).isNull();
668+
669+
McpSchema.CreateMessageResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
670+
});
671+
assertThat(result).isNotNull();
672+
assertThat(result.role()).isEqualTo(McpSchema.Role.ASSISTANT);
673+
674+
// Verify related-task metadata was echoed back
675+
assertThat(result.meta()).isNotNull();
676+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
677+
@SuppressWarnings("unchecked")
678+
Map<String, Object> echoedRelatedTask = (Map<String, Object>) result.meta()
679+
.get(McpSchema.RELATED_TASK_META_KEY);
680+
assertThat(echoedRelatedTask.get("taskId")).isEqualTo(taskId);
681+
682+
asyncMcpClient.closeGracefully();
683+
}
684+
685+
@Test
686+
void testElicitationResponseMergesHandlerMetaWithRelatedTask() {
687+
MockMcpClientTransport transport = initializationEnabledTransport();
688+
689+
// Create a test elicitation handler that returns a response with custom meta
690+
Map<String, Object> handlerMeta = Map.of("custom-key", "custom-value");
691+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
692+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), handlerMeta));
693+
694+
// Create client with elicitation capability and handler
695+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
696+
.capabilities(ClientCapabilities.builder().elicitation().build())
697+
.elicitation(elicitationHandler)
698+
.build();
699+
700+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
701+
702+
// Create elicitation request WITH related-task metadata
703+
String taskId = "test-task-789";
704+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
705+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
706+
707+
var elicitRequest = new McpSchema.ElicitRequest("Test?",
708+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))), null, requestMeta);
709+
710+
// Simulate incoming request
711+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
712+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
713+
transport.simulateIncomingMessage(request);
714+
715+
// Verify response
716+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) transport.getLastSentMessage();
717+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
718+
});
719+
720+
// Verify both handler's meta AND related-task are present (merged)
721+
assertThat(result.meta()).isNotNull();
722+
assertThat(result.meta()).containsKey("custom-key");
723+
assertThat(result.meta().get("custom-key")).isEqualTo("custom-value");
724+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
725+
726+
asyncMcpClient.closeGracefully();
727+
}
728+
533729
@Test
534730
void testPingMessageRequestHandling() {
535731
MockMcpClientTransport transport = initializationEnabledTransport();

0 commit comments

Comments
 (0)