Skip to content

Commit d44a783

Browse files
Propagate body-level errors as synthetic JSON-RPC responses
Change onErrorComplete to onErrorResume in sendMessage() so that body-level errors (DataBufferLimitException, parse errors, etc.) emit a synthetic JSON-RPC error response to the handler. This resolves pending responses in McpClientSession immediately instead of hanging until requestTimeout. Fixes #889
1 parent 3a78182 commit d44a783

File tree

2 files changed

+207
-4
lines changed

2 files changed

+207
-4
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,8 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
464464
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
465465
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
466466

467+
Object requestId = (sentMessage instanceof McpSchema.JSONRPCRequest req) ? req.id() : null;
468+
467469
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
468470
String jsonBody = this.toString(sentMessage);
469471

@@ -636,12 +638,19 @@ else if (statusCode == BAD_REQUEST) {
636638
.retryWhen(authorizationErrorRetrySpec())
637639
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
638640
.onErrorMap(CompletionException.class, t -> t.getCause())
639-
.onErrorComplete(t -> {
640-
// handle the error first
641+
.onErrorResume(t -> {
641642
this.handleException(t);
642-
// inform the caller of sendMessage
643643
deliveredSink.error(t);
644-
return true;
644+
if (requestId != null) {
645+
// Emit synthetic error so pending response is resolved
646+
logger.warn("Body-level error for request {}, emitting synthetic error response", requestId, t);
647+
McpSchema.JSONRPCResponse errorResponse = new McpSchema.JSONRPCResponse(
648+
McpSchema.JSONRPC_VERSION, requestId, null,
649+
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
650+
"Transport error during response streaming: " + t.getMessage(), null));
651+
return this.handler.get().apply(Mono.just(errorResponse));
652+
}
653+
return Flux.empty();
645654
})
646655
.doFinally(s -> {
647656
logger.debug("SendMessage finally: {}", s);
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright 2025-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import static org.assertj.core.api.Assertions.assertThat;
8+
9+
import java.io.IOException;
10+
import java.io.OutputStream;
11+
import java.net.InetSocketAddress;
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.concurrent.CopyOnWriteArrayList;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
18+
import com.sun.net.httpserver.HttpServer;
19+
20+
import io.modelcontextprotocol.server.transport.TomcatTestUtil;
21+
import io.modelcontextprotocol.spec.McpClientTransport;
22+
import io.modelcontextprotocol.spec.McpSchema;
23+
import io.modelcontextprotocol.spec.McpTransportException;
24+
import io.modelcontextprotocol.spec.ProtocolVersions;
25+
import org.junit.jupiter.api.AfterEach;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.api.Timeout;
29+
import reactor.test.StepVerifier;
30+
31+
/**
32+
* Tests for body-level error handling in {@link HttpClientStreamableHttpTransport}.
33+
*
34+
* @author James Kennedy
35+
* @see <a href="https://github.com/modelcontextprotocol/java-sdk/issues/889">#889</a>
36+
*/
37+
@Timeout(15)
38+
public class HttpClientStreamableHttpTransportBodyErrorTest {
39+
40+
private static final int PORT = TomcatTestUtil.findAvailablePort();
41+
42+
private static final String HOST = "http://localhost:" + PORT;
43+
44+
private HttpServer server;
45+
46+
private McpClientTransport transport;
47+
48+
private final AtomicBoolean returnMalformedSse = new AtomicBoolean(false);
49+
50+
@BeforeEach
51+
void startServer() throws IOException {
52+
server = HttpServer.create(new InetSocketAddress(PORT), 0);
53+
54+
server.createContext("/mcp", exchange -> {
55+
String method = exchange.getRequestMethod();
56+
57+
if ("DELETE".equals(method)) {
58+
exchange.sendResponseHeaders(200, 0);
59+
exchange.close();
60+
return;
61+
}
62+
63+
if ("GET".equals(method)) {
64+
exchange.sendResponseHeaders(405, 0);
65+
exchange.close();
66+
return;
67+
}
68+
69+
if (returnMalformedSse.get()) {
70+
exchange.getResponseHeaders().set("Content-Type", "text/event-stream");
71+
exchange.sendResponseHeaders(200, 0);
72+
OutputStream os = exchange.getResponseBody();
73+
os.write("event: message\ndata: {not valid json\n\n".getBytes(StandardCharsets.UTF_8));
74+
os.flush();
75+
exchange.close();
76+
return;
77+
}
78+
79+
exchange.getResponseHeaders().set("Content-Type", "application/json");
80+
String response = "{\"jsonrpc\":\"2.0\",\"result\":{},\"id\":\"init-id\"}";
81+
exchange.sendResponseHeaders(200, response.length());
82+
exchange.getResponseBody().write(response.getBytes(StandardCharsets.UTF_8));
83+
exchange.close();
84+
});
85+
86+
server.setExecutor(null);
87+
server.start();
88+
89+
transport = HttpClientStreamableHttpTransport.builder(HOST).build();
90+
}
91+
92+
@AfterEach
93+
void stopServer() {
94+
if (server != null) {
95+
server.stop(0);
96+
}
97+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
98+
}
99+
100+
@Test
101+
void bodyErrorOnSseStreamPropagatesError() {
102+
StepVerifier.create(transport.connect(msg -> msg)).verifyComplete();
103+
104+
returnMalformedSse.set(true);
105+
106+
var request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE, "req-123",
107+
new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_03_26,
108+
McpSchema.ClientCapabilities.builder().roots(true).build(),
109+
new McpSchema.Implementation("Test Client", "1.0.0")));
110+
111+
StepVerifier.create(transport.sendMessage(request)).expectError(McpTransportException.class).verify();
112+
}
113+
114+
@Test
115+
void bodyErrorOnJsonResponseEmitsSyntheticErrorResponse() throws InterruptedException {
116+
var handlerMessages = new CopyOnWriteArrayList<McpSchema.JSONRPCMessage>();
117+
CountDownLatch errorResponseLatch = new CountDownLatch(1);
118+
119+
StepVerifier.create(transport.connect(msg -> msg.doOnNext(m -> {
120+
handlerMessages.add(m);
121+
if (m instanceof McpSchema.JSONRPCResponse resp && resp.error() != null) {
122+
errorResponseLatch.countDown();
123+
}
124+
}))).verifyComplete();
125+
126+
server.removeContext("/mcp");
127+
server.createContext("/mcp", exchange -> {
128+
String method = exchange.getRequestMethod();
129+
130+
if ("DELETE".equals(method)) {
131+
exchange.sendResponseHeaders(200, 0);
132+
exchange.close();
133+
return;
134+
}
135+
136+
if ("GET".equals(method)) {
137+
exchange.sendResponseHeaders(405, 0);
138+
exchange.close();
139+
return;
140+
}
141+
142+
exchange.getResponseHeaders().set("Content-Type", "application/json");
143+
byte[] malformed = "{not valid json".getBytes(StandardCharsets.UTF_8);
144+
exchange.sendResponseHeaders(200, malformed.length);
145+
exchange.getResponseBody().write(malformed);
146+
exchange.close();
147+
});
148+
149+
var request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE, "req-456",
150+
new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_03_26,
151+
McpSchema.ClientCapabilities.builder().roots(true).build(),
152+
new McpSchema.Implementation("Test Client", "1.0.0")));
153+
154+
StepVerifier.create(transport.sendMessage(request)).verifyComplete();
155+
156+
assertThat(errorResponseLatch.await(5, TimeUnit.SECONDS))
157+
.as("Handler should receive synthetic error response within 5 seconds")
158+
.isTrue();
159+
160+
var errorResponses = handlerMessages.stream()
161+
.filter(m -> m instanceof McpSchema.JSONRPCResponse resp && resp.error() != null)
162+
.map(m -> (McpSchema.JSONRPCResponse) m)
163+
.toList();
164+
165+
assertThat(errorResponses).hasSize(1);
166+
McpSchema.JSONRPCResponse errorResponse = errorResponses.get(0);
167+
assertThat(errorResponse.id()).isEqualTo("req-456");
168+
assertThat(errorResponse.error().code()).isEqualTo(McpSchema.ErrorCodes.INTERNAL_ERROR);
169+
assertThat(errorResponse.error().message()).contains("Transport error");
170+
}
171+
172+
@Test
173+
void bodyErrorOnNotificationDoesNotEmitSyntheticResponse() throws InterruptedException {
174+
var handlerMessages = new CopyOnWriteArrayList<McpSchema.JSONRPCMessage>();
175+
176+
StepVerifier.create(transport.connect(msg -> msg.doOnNext(handlerMessages::add))).verifyComplete();
177+
178+
returnMalformedSse.set(true);
179+
180+
var notification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, "notifications/cancelled",
181+
null);
182+
183+
StepVerifier.create(transport.sendMessage(notification)).expectError(McpTransportException.class).verify();
184+
185+
Thread.sleep(500);
186+
187+
var errorResponses = handlerMessages.stream()
188+
.filter(m -> m instanceof McpSchema.JSONRPCResponse resp && resp.error() != null)
189+
.toList();
190+
191+
assertThat(errorResponses).isEmpty();
192+
}
193+
194+
}

0 commit comments

Comments
 (0)