Skip to content

Commit abcd19c

Browse files
authored
HttpClientStreamHttpTransport: add authorization error handler (#861)
HttpClientStreamHttpTransport: add authorization error handler - Closes #240 Signed-off-by: Daniel Garnier-Moiroux <git@garnier.wf>
1 parent 6e4ce1c commit abcd19c

File tree

5 files changed

+771
-131
lines changed

5 files changed

+771
-131
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 158 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024-2025 the original author or authors.
2+
* Copyright 2024-2026 the original author or authors.
33
*/
44

55
package io.modelcontextprotocol.client.transport;
@@ -23,6 +23,7 @@
2323
import io.modelcontextprotocol.client.McpAsyncClient;
2424
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
2525
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
26+
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
2627
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
2728
import io.modelcontextprotocol.common.McpTransportContext;
2829
import io.modelcontextprotocol.json.McpJsonDefaults;
@@ -50,6 +51,7 @@
5051
import reactor.core.publisher.Mono;
5152
import reactor.util.function.Tuple2;
5253
import reactor.util.function.Tuples;
54+
import reactor.util.retry.Retry;
5355

5456
/**
5557
* An implementation of the Streamable HTTP protocol as defined by the
@@ -72,6 +74,7 @@
7274
* </p>
7375
*
7476
* @author Christian Tzolov
77+
* @author Daniel Garnier-Moiroux
7578
* @see <a href=
7679
* "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
7780
* HTTP transport specification</a>
@@ -115,6 +118,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
115118

116119
private final boolean openConnectionOnStartup;
117120

121+
private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler;
122+
118123
private final boolean resumableStreams;
119124

120125
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
@@ -132,14 +137,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
132137
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
133138
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
134139
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
135-
List<String> supportedProtocolVersions) {
140+
McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List<String> supportedProtocolVersions) {
136141
this.jsonMapper = jsonMapper;
137142
this.httpClient = httpClient;
138143
this.requestBuilder = requestBuilder;
139144
this.baseUri = URI.create(baseUri);
140145
this.endpoint = endpoint;
141146
this.resumableStreams = resumableStreams;
142147
this.openConnectionOnStartup = openConnectionOnStartup;
148+
this.authorizationErrorHandler = authorizationErrorHandler;
143149
this.activeSession.set(createTransportSession());
144150
this.httpRequestCustomizer = httpRequestCustomizer;
145151
this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
@@ -239,7 +245,6 @@ public Mono<Void> closeGracefully() {
239245
}
240246

241247
private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
242-
243248
return Mono.deferContextual(ctx -> {
244249

245250
if (stream != null) {
@@ -275,121 +280,120 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
275280
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
276281
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
277282
})
278-
.flatMapMany(
279-
requestBuilder -> Flux.<ResponseEvent>create(
280-
sseSink -> this.httpClient
281-
.sendAsync(requestBuilder.build(),
282-
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo,
283-
sseSink))
284-
.whenComplete((response, throwable) -> {
285-
if (throwable != null) {
286-
sseSink.error(throwable);
287-
}
288-
else {
289-
logger.debug("SSE connection established successfully");
290-
}
291-
}))
292-
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
293-
.flatMap(responseEvent -> {
294-
int statusCode = responseEvent.responseInfo().statusCode();
295-
296-
if (statusCode >= 200 && statusCode < 300) {
297-
298-
if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
299-
String data = responseEvent.sseEvent().data();
300-
// Per 2025-11-25 spec (SEP-1699), servers may
301-
// send SSE events
302-
// with empty data to prime the client for
303-
// reconnection.
304-
// Skip these events as they contain no JSON-RPC
305-
// message.
306-
if (data == null || data.isBlank()) {
307-
logger.debug("Skipping SSE event with empty data (stream primer)");
308-
return Flux.empty();
309-
}
310-
try {
311-
// We don't support batching ATM and probably
312-
// won't since the next version considers
313-
// removing it.
314-
McpSchema.JSONRPCMessage message = McpSchema
315-
.deserializeJsonRpcMessage(this.jsonMapper, data);
316-
317-
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
318-
.of(Optional.ofNullable(responseEvent.sseEvent().id()),
319-
List.of(message));
320-
321-
McpTransportStream<Disposable> sessionStream = stream != null ? stream
322-
: new DefaultMcpTransportStream<>(this.resumableStreams,
323-
this::reconnect);
324-
logger.debug("Connected stream {}", sessionStream.streamId());
325-
326-
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
327-
328-
}
329-
catch (IOException ioException) {
330-
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
331-
"Error parsing JSON-RPC message: " + responseEvent, ioException));
332-
}
333-
}
334-
else {
335-
logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
336-
return Flux.empty();
337-
}
338-
}
339-
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
340-
logger
341-
.debug("The server does not support SSE streams, using request-response mode.");
283+
.flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(sseSink -> this.httpClient
284+
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(sseSink))
285+
.whenComplete((response, throwable) -> {
286+
if (throwable != null) {
287+
sseSink.error(throwable);
288+
}
289+
else {
290+
logger.debug("SSE connection established successfully");
291+
}
292+
})).flatMap(responseEvent -> {
293+
int statusCode = responseEvent.responseInfo().statusCode();
294+
if (statusCode == 401 || statusCode == 403) {
295+
logger.debug("Authorization error in reconnect with code {}", statusCode);
296+
return Mono.<McpSchema.JSONRPCMessage>error(
297+
new McpHttpClientTransportAuthorizationException(
298+
"Authorization error connecting to SSE stream",
299+
responseEvent.responseInfo()));
300+
}
301+
302+
if (!(responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent)) {
303+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
304+
"Unrecognized server error when connecting to SSE stream, status code: "
305+
+ statusCode));
306+
}
307+
else if (statusCode >= 200 && statusCode < 300) {
308+
if (MESSAGE_EVENT_TYPE.equals(sseResponseEvent.sseEvent().event())) {
309+
String data = sseResponseEvent.sseEvent().data();
310+
// Per 2025-11-25 spec (SEP-1699), servers may
311+
// send SSE events
312+
// with empty data to prime the client for
313+
// reconnection.
314+
// Skip these events as they contain no JSON-RPC
315+
// message.
316+
if (data == null || data.isBlank()) {
317+
logger.debug("Skipping SSE event with empty data (stream primer)");
342318
return Flux.empty();
343319
}
344-
else if (statusCode == NOT_FOUND) {
345-
346-
if (transportSession != null && transportSession.sessionId().isPresent()) {
347-
// only if the request was sent with a session id
348-
// and the response is 404, we consider it a
349-
// session not found error.
350-
logger.debug("Session not found for session ID: {}",
351-
transportSession.sessionId().get());
352-
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
353-
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
354-
"Session not found for session ID: " + sessionIdRepresentation);
355-
return Flux.<McpSchema.JSONRPCMessage>error(exception);
356-
}
357-
return Flux.<McpSchema.JSONRPCMessage>error(
358-
new McpTransportException("Server Not Found. Status code:" + statusCode
359-
+ ", response-event:" + responseEvent));
360-
}
361-
else if (statusCode == BAD_REQUEST) {
362-
if (transportSession != null && transportSession.sessionId().isPresent()) {
363-
// only if the request was sent with a session id
364-
// and thre response is 404, we consider it a
365-
// session not found error.
366-
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
367-
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
368-
"Session not found for session ID: " + sessionIdRepresentation);
369-
return Flux.<McpSchema.JSONRPCMessage>error(exception);
370-
}
371-
return Flux.<McpSchema.JSONRPCMessage>error(
372-
new McpTransportException("Bad Request. Status code:" + statusCode
373-
+ ", response-event:" + responseEvent));
320+
try {
321+
// We don't support batching ATM and probably
322+
// won't since the next version considers
323+
// removing it.
324+
McpSchema.JSONRPCMessage message = McpSchema
325+
.deserializeJsonRpcMessage(this.jsonMapper, data);
374326

375-
}
327+
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
328+
.of(Optional.ofNullable(sseResponseEvent.sseEvent().id()), List.of(message));
329+
330+
McpTransportStream<Disposable> sessionStream = stream != null ? stream
331+
: new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect);
332+
logger.debug("Connected stream {}", sessionStream.streamId());
376333

377-
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
378-
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
379-
}).<McpSchema
380-
.JSONRPCMessage>flatMap(
381-
jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
382-
.onErrorMap(CompletionException.class, t -> t.getCause())
383-
.onErrorComplete(t -> {
384-
this.handleException(t);
385-
return true;
386-
})
387-
.doFinally(s -> {
388-
Disposable ref = disposableRef.getAndSet(null);
389-
if (ref != null) {
390-
transportSession.removeConnection(ref);
334+
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
335+
336+
}
337+
catch (IOException ioException) {
338+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
339+
"Error parsing JSON-RPC message: " + responseEvent, ioException));
391340
}
392-
}))
341+
}
342+
else {
343+
logger.debug("Received SSE event with type: {}", sseResponseEvent.sseEvent());
344+
return Flux.empty();
345+
}
346+
}
347+
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
348+
logger.debug("The server does not support SSE streams, using request-response mode.");
349+
return Flux.empty();
350+
}
351+
else if (statusCode == NOT_FOUND) {
352+
353+
if (transportSession != null && transportSession.sessionId().isPresent()) {
354+
// only if the request was sent with a session id
355+
// and the response is 404, we consider it a
356+
// session not found error.
357+
logger.debug("Session not found for session ID: {}",
358+
transportSession.sessionId().get());
359+
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
360+
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
361+
"Session not found for session ID: " + sessionIdRepresentation);
362+
return Flux.<McpSchema.JSONRPCMessage>error(exception);
363+
}
364+
return Flux.<McpSchema.JSONRPCMessage>error(
365+
new McpTransportException("Server Not Found. Status code:" + statusCode
366+
+ ", response-event:" + responseEvent));
367+
}
368+
else if (statusCode == BAD_REQUEST) {
369+
if (transportSession != null && transportSession.sessionId().isPresent()) {
370+
// only if the request was sent with a session id
371+
// and thre response is 404, we consider it a
372+
// session not found error.
373+
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
374+
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
375+
"Session not found for session ID: " + sessionIdRepresentation);
376+
return Flux.<McpSchema.JSONRPCMessage>error(exception);
377+
}
378+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
379+
"Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent));
380+
}
381+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
382+
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
383+
})
384+
.retryWhen(authorizationErrorRetrySpec())
385+
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
386+
.onErrorMap(CompletionException.class, t -> t.getCause())
387+
.onErrorComplete(t -> {
388+
this.handleException(t);
389+
return true;
390+
})
391+
.doFinally(s -> {
392+
Disposable ref = disposableRef.getAndSet(null);
393+
if (ref != null) {
394+
transportSession.removeConnection(ref);
395+
}
396+
}))
393397
.contextWrite(ctx)
394398
.subscribe();
395399

@@ -400,6 +404,25 @@ else if (statusCode == BAD_REQUEST) {
400404

401405
}
402406

407+
private Retry authorizationErrorRetrySpec() {
408+
return Retry.from(companion -> companion.flatMap(retrySignal -> {
409+
if (!(retrySignal.failure() instanceof McpHttpClientTransportAuthorizationException authException)) {
410+
return Mono.error(retrySignal.failure());
411+
}
412+
if (retrySignal.totalRetriesInARow() >= this.authorizationErrorHandler.maxRetries()) {
413+
return Mono.error(retrySignal.failure());
414+
}
415+
return Mono.deferContextual(ctx -> {
416+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
417+
return Mono
418+
.from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), transportContext))
419+
.switchIfEmpty(Mono.just(false))
420+
.flatMap(shouldRetry -> shouldRetry ? Mono.just(retrySignal.totalRetries())
421+
: Mono.error(retrySignal.failure()));
422+
});
423+
}));
424+
}
425+
403426
private BodyHandler<Void> toSendMessageBodySubscriber(FluxSink<ResponseEvent> sink) {
404427

405428
BodyHandler<Void> responseBodyHandler = responseInfo -> {
@@ -478,6 +501,13 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
478501
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
479502

480503
})).flatMap(responseEvent -> {
504+
int statusCode = responseEvent.responseInfo().statusCode();
505+
if (statusCode == 401 || statusCode == 403) {
506+
logger.debug("Authorization error in sendMessage with code {}", statusCode);
507+
return Mono.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportAuthorizationException(
508+
"Authorization error when sending message", responseEvent.responseInfo()));
509+
}
510+
481511
if (transportSession.markInitialized(
482512
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
483513
// Once we have a session, we try to open an async stream for
@@ -488,8 +518,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
488518

489519
String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
490520

491-
int statusCode = responseEvent.responseInfo().statusCode();
492-
493521
if (statusCode >= 200 && statusCode < 300) {
494522

495523
String contentType = responseEvent.responseInfo()
@@ -605,6 +633,7 @@ else if (statusCode == BAD_REQUEST) {
605633
return Flux.<McpSchema.JSONRPCMessage>error(
606634
new RuntimeException("Failed to send message: " + responseEvent));
607635
})
636+
.retryWhen(authorizationErrorRetrySpec())
608637
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
609638
.onErrorMap(CompletionException.class, t -> t.getCause())
610639
.onErrorComplete(t -> {
@@ -664,6 +693,8 @@ public static class Builder {
664693
private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
665694
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);
666695

696+
private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;
697+
667698
/**
668699
* Creates a new builder with the specified base URI.
669700
* @param baseUri the base URI of the MCP server
@@ -801,6 +832,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
801832
return this;
802833
}
803834

835+
/**
836+
* Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
837+
* when sending a message.
838+
* @param authorizationErrorHandler the handler
839+
* @return this builder
840+
*/
841+
public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
842+
this.authorizationErrorHandler = authorizationErrorHandler;
843+
return this;
844+
}
845+
804846
/**
805847
* Sets the connection timeout for the HTTP client.
806848
* @param connectTimeout the connection timeout duration
@@ -845,7 +887,7 @@ public HttpClientStreamableHttpTransport build() {
845887
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
846888
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
847889
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
848-
httpRequestCustomizer, supportedProtocolVersions);
890+
httpRequestCustomizer, authorizationErrorHandler, supportedProtocolVersions);
849891
}
850892

851893
}

0 commit comments

Comments
 (0)