Skip to content

Commit 41ded4c

Browse files
committed
HttpClientStreamHttpTransport: add authorization error handler
- Closes #240 Signed-off-by: Daniel Garnier-Moiroux <git@garnier.wf>
1 parent 6e4ce1c commit 41ded4c

File tree

5 files changed

+661
-131
lines changed

5 files changed

+661
-131
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 149 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024-2025 the original author or authors.
2+
* Copyright 2024-2026 the original author or authors.
33
*/
44

55
package io.modelcontextprotocol.client.transport;
@@ -23,6 +23,7 @@
2323
import io.modelcontextprotocol.client.McpAsyncClient;
2424
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
2525
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
26+
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
2627
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
2728
import io.modelcontextprotocol.common.McpTransportContext;
2829
import io.modelcontextprotocol.json.McpJsonDefaults;
@@ -72,6 +73,7 @@
7273
* </p>
7374
*
7475
* @author Christian Tzolov
76+
* @author Daniel Garnier-Moiroux
7577
* @see <a href=
7678
* "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
7779
* HTTP transport specification</a>
@@ -115,6 +117,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
115117

116118
private final boolean openConnectionOnStartup;
117119

120+
private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler;
121+
118122
private final boolean resumableStreams;
119123

120124
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
@@ -132,14 +136,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
132136
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
133137
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
134138
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
135-
List<String> supportedProtocolVersions) {
139+
McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List<String> supportedProtocolVersions) {
136140
this.jsonMapper = jsonMapper;
137141
this.httpClient = httpClient;
138142
this.requestBuilder = requestBuilder;
139143
this.baseUri = URI.create(baseUri);
140144
this.endpoint = endpoint;
141145
this.resumableStreams = resumableStreams;
142146
this.openConnectionOnStartup = openConnectionOnStartup;
147+
this.authorizationErrorHandler = authorizationErrorHandler;
143148
this.activeSession.set(createTransportSession());
144149
this.httpRequestCustomizer = httpRequestCustomizer;
145150
this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
@@ -239,7 +244,6 @@ public Mono<Void> closeGracefully() {
239244
}
240245

241246
private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
242-
243247
return Mono.deferContextual(ctx -> {
244248

245249
if (stream != null) {
@@ -275,121 +279,125 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
275279
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
276280
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
277281
})
278-
.flatMapMany(
279-
requestBuilder -> Flux.<ResponseEvent>create(
280-
sseSink -> this.httpClient
281-
.sendAsync(requestBuilder.build(),
282-
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo,
283-
sseSink))
284-
.whenComplete((response, throwable) -> {
285-
if (throwable != null) {
286-
sseSink.error(throwable);
287-
}
288-
else {
289-
logger.debug("SSE connection established successfully");
290-
}
291-
}))
292-
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
293-
.flatMap(responseEvent -> {
294-
int statusCode = responseEvent.responseInfo().statusCode();
295-
296-
if (statusCode >= 200 && statusCode < 300) {
297-
298-
if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
299-
String data = responseEvent.sseEvent().data();
300-
// Per 2025-11-25 spec (SEP-1699), servers may
301-
// send SSE events
302-
// with empty data to prime the client for
303-
// reconnection.
304-
// Skip these events as they contain no JSON-RPC
305-
// message.
306-
if (data == null || data.isBlank()) {
307-
logger.debug("Skipping SSE event with empty data (stream primer)");
308-
return Flux.empty();
309-
}
310-
try {
311-
// We don't support batching ATM and probably
312-
// won't since the next version considers
313-
// removing it.
314-
McpSchema.JSONRPCMessage message = McpSchema
315-
.deserializeJsonRpcMessage(this.jsonMapper, data);
316-
317-
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
318-
.of(Optional.ofNullable(responseEvent.sseEvent().id()),
319-
List.of(message));
320-
321-
McpTransportStream<Disposable> sessionStream = stream != null ? stream
322-
: new DefaultMcpTransportStream<>(this.resumableStreams,
323-
this::reconnect);
324-
logger.debug("Connected stream {}", sessionStream.streamId());
325-
326-
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
327-
328-
}
329-
catch (IOException ioException) {
330-
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
331-
"Error parsing JSON-RPC message: " + responseEvent, ioException));
332-
}
333-
}
334-
else {
335-
logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
336-
return Flux.empty();
337-
}
338-
}
339-
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
340-
logger
341-
.debug("The server does not support SSE streams, using request-response mode.");
282+
.flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(sseSink -> this.httpClient
283+
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(sseSink))
284+
.whenComplete((response, throwable) -> {
285+
if (throwable != null) {
286+
sseSink.error(throwable);
287+
}
288+
else {
289+
logger.debug("SSE connection established successfully");
290+
}
291+
})).flatMap(responseEvent -> {
292+
int statusCode = responseEvent.responseInfo().statusCode();
293+
if (statusCode == 401 || statusCode == 403) {
294+
// Authorization error
295+
return Mono.deferContextual(innerCtx -> {
296+
var transportContext = innerCtx.getOrDefault(McpTransportContext.KEY,
297+
McpTransportContext.EMPTY);
298+
return Mono.from(this.authorizationErrorHandler.onAuthorizationError(
299+
responseEvent.responseInfo(), transportContext, this.reconnect(stream).then(),
300+
Mono.error(new McpHttpClientTransportException(
301+
"Authorization error connecting to SSE stream",
302+
responseEvent.responseInfo()))))
303+
.then(Mono.empty());
304+
});
305+
}
306+
307+
if (!(responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent)) {
308+
return Flux.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportException(
309+
"Unrecognized server error when connecting to SSE stream",
310+
responseEvent.responseInfo()));
311+
}
312+
else if (statusCode >= 200 && statusCode < 300) {
313+
if (MESSAGE_EVENT_TYPE.equals(sseResponseEvent.sseEvent().event())) {
314+
String data = sseResponseEvent.sseEvent().data();
315+
// Per 2025-11-25 spec (SEP-1699), servers may
316+
// send SSE events
317+
// with empty data to prime the client for
318+
// reconnection.
319+
// Skip these events as they contain no JSON-RPC
320+
// message.
321+
if (data == null || data.isBlank()) {
322+
logger.debug("Skipping SSE event with empty data (stream primer)");
342323
return Flux.empty();
343324
}
344-
else if (statusCode == NOT_FOUND) {
345-
346-
if (transportSession != null && transportSession.sessionId().isPresent()) {
347-
// only if the request was sent with a session id
348-
// and the response is 404, we consider it a
349-
// session not found error.
350-
logger.debug("Session not found for session ID: {}",
351-
transportSession.sessionId().get());
352-
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
353-
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
354-
"Session not found for session ID: " + sessionIdRepresentation);
355-
return Flux.<McpSchema.JSONRPCMessage>error(exception);
356-
}
357-
return Flux.<McpSchema.JSONRPCMessage>error(
358-
new McpTransportException("Server Not Found. Status code:" + statusCode
359-
+ ", response-event:" + responseEvent));
360-
}
361-
else if (statusCode == BAD_REQUEST) {
362-
if (transportSession != null && transportSession.sessionId().isPresent()) {
363-
// only if the request was sent with a session id
364-
// and thre response is 404, we consider it a
365-
// session not found error.
366-
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
367-
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
368-
"Session not found for session ID: " + sessionIdRepresentation);
369-
return Flux.<McpSchema.JSONRPCMessage>error(exception);
370-
}
371-
return Flux.<McpSchema.JSONRPCMessage>error(
372-
new McpTransportException("Bad Request. Status code:" + statusCode
373-
+ ", response-event:" + responseEvent));
325+
try {
326+
// We don't support batching ATM and probably
327+
// won't since the next version considers
328+
// removing it.
329+
McpSchema.JSONRPCMessage message = McpSchema
330+
.deserializeJsonRpcMessage(this.jsonMapper, data);
374331

375-
}
332+
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
333+
.of(Optional.ofNullable(sseResponseEvent.sseEvent().id()), List.of(message));
334+
335+
McpTransportStream<Disposable> sessionStream = stream != null ? stream
336+
: new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect);
337+
logger.debug("Connected stream {}", sessionStream.streamId());
376338

377-
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
378-
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
379-
}).<McpSchema
380-
.JSONRPCMessage>flatMap(
381-
jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
382-
.onErrorMap(CompletionException.class, t -> t.getCause())
383-
.onErrorComplete(t -> {
384-
this.handleException(t);
385-
return true;
386-
})
387-
.doFinally(s -> {
388-
Disposable ref = disposableRef.getAndSet(null);
389-
if (ref != null) {
390-
transportSession.removeConnection(ref);
339+
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
340+
341+
}
342+
catch (IOException ioException) {
343+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
344+
"Error parsing JSON-RPC message: " + responseEvent, ioException));
391345
}
392-
}))
346+
}
347+
else {
348+
logger.debug("Received SSE event with type: {}", sseResponseEvent.sseEvent());
349+
return Flux.empty();
350+
}
351+
}
352+
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
353+
logger.debug("The server does not support SSE streams, using request-response mode.");
354+
return Flux.empty();
355+
}
356+
else if (statusCode == NOT_FOUND) {
357+
358+
if (transportSession != null && transportSession.sessionId().isPresent()) {
359+
// only if the request was sent with a session id
360+
// and the response is 404, we consider it a
361+
// session not found error.
362+
logger.debug("Session not found for session ID: {}",
363+
transportSession.sessionId().get());
364+
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
365+
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
366+
"Session not found for session ID: " + sessionIdRepresentation);
367+
return Flux.<McpSchema.JSONRPCMessage>error(exception);
368+
}
369+
return Flux.<McpSchema.JSONRPCMessage>error(
370+
new McpTransportException("Server Not Found. Status code:" + statusCode
371+
+ ", response-event:" + responseEvent));
372+
}
373+
else if (statusCode == BAD_REQUEST) {
374+
if (transportSession != null && transportSession.sessionId().isPresent()) {
375+
// only if the request was sent with a session id
376+
// and thre response is 404, we consider it a
377+
// session not found error.
378+
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
379+
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
380+
"Session not found for session ID: " + sessionIdRepresentation);
381+
return Flux.<McpSchema.JSONRPCMessage>error(exception);
382+
}
383+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
384+
"Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent));
385+
}
386+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
387+
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
388+
})
389+
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
390+
.onErrorMap(CompletionException.class, t -> t.getCause())
391+
.onErrorComplete(t -> {
392+
this.handleException(t);
393+
return true;
394+
})
395+
.doFinally(s -> {
396+
Disposable ref = disposableRef.getAndSet(null);
397+
if (ref != null) {
398+
transportSession.removeConnection(ref);
399+
}
400+
}))
393401
.contextWrite(ctx)
394402
.subscribe();
395403

@@ -478,6 +486,20 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
478486
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
479487

480488
})).flatMap(responseEvent -> {
489+
int statusCode = responseEvent.responseInfo().statusCode();
490+
if (statusCode == 401 || statusCode == 403) {
491+
return Mono.deferContextual(ctx -> {
492+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
493+
return Mono
494+
.from(this.authorizationErrorHandler.onAuthorizationError(responseEvent.responseInfo(),
495+
transportContext, Mono.defer(() -> this.sendMessage(sentMessage)),
496+
Mono.error(new McpHttpClientTransportException(
497+
"Authorization error when sending message", responseEvent.responseInfo()))))
498+
.doOnSuccess(s -> deliveredSink.success())
499+
.then(Mono.empty());
500+
});
501+
}
502+
481503
if (transportSession.markInitialized(
482504
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
483505
// Once we have a session, we try to open an async stream for
@@ -488,8 +510,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
488510

489511
String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
490512

491-
int statusCode = responseEvent.responseInfo().statusCode();
492-
493513
if (statusCode >= 200 && statusCode < 300) {
494514

495515
String contentType = responseEvent.responseInfo()
@@ -664,6 +684,8 @@ public static class Builder {
664684
private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
665685
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);
666686

687+
private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;
688+
667689
/**
668690
* Creates a new builder with the specified base URI.
669691
* @param baseUri the base URI of the MCP server
@@ -801,6 +823,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
801823
return this;
802824
}
803825

826+
/**
827+
* Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
828+
* when sending a message.
829+
* @param authorizationErrorHandler the handler
830+
* @return this builder
831+
*/
832+
public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
833+
this.authorizationErrorHandler = authorizationErrorHandler;
834+
return this;
835+
}
836+
804837
/**
805838
* Sets the connection timeout for the HTTP client.
806839
* @param connectTimeout the connection timeout duration
@@ -845,7 +878,7 @@ public HttpClientStreamableHttpTransport build() {
845878
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
846879
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
847880
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
848-
httpRequestCustomizer, supportedProtocolVersions);
881+
httpRequestCustomizer, authorizationErrorHandler, supportedProtocolVersions);
849882
}
850883

851884
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2026-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.net.http.HttpResponse;
8+
9+
import io.modelcontextprotocol.spec.McpTransportException;
10+
11+
/**
12+
* Authorization-related exception for {@link java.net.http.HttpClient}-based client
13+
* transport. Thrown when the server responds with HTTP 401 or HTTP 403. Wraps the
14+
* response info for further inspection of the headers and the status code.
15+
*
16+
* @see <a href=
17+
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
18+
* Specification: Authorization</a>
19+
* @author Daniel Garnier-Moiroux
20+
*/
21+
public class McpHttpClientTransportException extends McpTransportException {
22+
23+
private final HttpResponse.ResponseInfo responseInfo;
24+
25+
public McpHttpClientTransportException(String message, HttpResponse.ResponseInfo responseInfo) {
26+
super(message);
27+
this.responseInfo = responseInfo;
28+
}
29+
30+
public HttpResponse.ResponseInfo getResponseInfo() {
31+
return responseInfo;
32+
}
33+
34+
}

0 commit comments

Comments
 (0)