Skip to content

Commit 8f5c30a

Browse files
committed
HttpClientStreamHttpTransport: add authorization error handler
- Closes #240 Signed-off-by: Daniel Garnier-Moiroux <git@garnier.wf>
1 parent 6e4ce1c commit 8f5c30a

File tree

5 files changed

+542
-129
lines changed

5 files changed

+542
-129
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 149 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.modelcontextprotocol.client.McpAsyncClient;
2424
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
2525
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
26+
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
2627
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
2728
import io.modelcontextprotocol.common.McpTransportContext;
2829
import io.modelcontextprotocol.json.McpJsonDefaults;
@@ -72,6 +73,7 @@
7273
* </p>
7374
*
7475
* @author Christian Tzolov
76+
* @author Daniel Garnier-Moiroux
7577
* @see <a href=
7678
* "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
7779
* HTTP transport specification</a>
@@ -115,6 +117,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
115117

116118
private final boolean openConnectionOnStartup;
117119

120+
private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler;
121+
118122
private final boolean resumableStreams;
119123

120124
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
@@ -132,14 +136,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
132136
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
133137
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
134138
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
135-
List<String> supportedProtocolVersions) {
139+
McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List<String> supportedProtocolVersions) {
136140
this.jsonMapper = jsonMapper;
137141
this.httpClient = httpClient;
138142
this.requestBuilder = requestBuilder;
139143
this.baseUri = URI.create(baseUri);
140144
this.endpoint = endpoint;
141145
this.resumableStreams = resumableStreams;
142146
this.openConnectionOnStartup = openConnectionOnStartup;
147+
this.authorizationErrorHandler = authorizationErrorHandler;
143148
this.activeSession.set(createTransportSession());
144149
this.httpRequestCustomizer = httpRequestCustomizer;
145150
this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
@@ -239,7 +244,6 @@ public Mono<Void> closeGracefully() {
239244
}
240245

241246
private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
242-
243247
return Mono.deferContextual(ctx -> {
244248

245249
if (stream != null) {
@@ -275,121 +279,126 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
275279
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
276280
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
277281
})
278-
.flatMapMany(
279-
requestBuilder -> Flux.<ResponseEvent>create(
280-
sseSink -> this.httpClient
281-
.sendAsync(requestBuilder.build(),
282-
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo,
283-
sseSink))
284-
.whenComplete((response, throwable) -> {
285-
if (throwable != null) {
286-
sseSink.error(throwable);
287-
}
288-
else {
289-
logger.debug("SSE connection established successfully");
290-
}
291-
}))
292-
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
293-
.flatMap(responseEvent -> {
294-
int statusCode = responseEvent.responseInfo().statusCode();
295-
296-
if (statusCode >= 200 && statusCode < 300) {
297-
298-
if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
299-
String data = responseEvent.sseEvent().data();
300-
// Per 2025-11-25 spec (SEP-1699), servers may
301-
// send SSE events
302-
// with empty data to prime the client for
303-
// reconnection.
304-
// Skip these events as they contain no JSON-RPC
305-
// message.
306-
if (data == null || data.isBlank()) {
307-
logger.debug("Skipping SSE event with empty data (stream primer)");
308-
return Flux.empty();
309-
}
310-
try {
311-
// We don't support batching ATM and probably
312-
// won't since the next version considers
313-
// removing it.
314-
McpSchema.JSONRPCMessage message = McpSchema
315-
.deserializeJsonRpcMessage(this.jsonMapper, data);
316-
317-
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
318-
.of(Optional.ofNullable(responseEvent.sseEvent().id()),
319-
List.of(message));
320-
321-
McpTransportStream<Disposable> sessionStream = stream != null ? stream
322-
: new DefaultMcpTransportStream<>(this.resumableStreams,
323-
this::reconnect);
324-
logger.debug("Connected stream {}", sessionStream.streamId());
325-
326-
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
327-
328-
}
329-
catch (IOException ioException) {
330-
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
331-
"Error parsing JSON-RPC message: " + responseEvent, ioException));
332-
}
333-
}
334-
else {
335-
logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
336-
return Flux.empty();
337-
}
338-
}
339-
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
340-
logger
341-
.debug("The server does not support SSE streams, using request-response mode.");
282+
.flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(sseSink -> this.httpClient
283+
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(sseSink))
284+
.whenComplete((response, throwable) -> {
285+
if (throwable != null) {
286+
sseSink.error(throwable);
287+
}
288+
else {
289+
logger.debug("SSE connection established successfully");
290+
}
291+
})).flatMap(responseEvent -> {
292+
int statusCode = responseEvent.responseInfo().statusCode();
293+
if (statusCode == 401 || statusCode == 403) {
294+
// Authorization error
295+
return Mono.deferContextual(innerCtx -> {
296+
var transportContext = innerCtx.getOrDefault(McpTransportContext.KEY,
297+
McpTransportContext.EMPTY);
298+
return Mono.from(this.authorizationErrorHandler.handle(responseEvent.responseInfo(),
299+
transportContext, this.reconnect(stream).then(),
300+
Mono.error(new McpHttpClientTransportException(
301+
"Authorization error connecting to SSE stream",
302+
responseEvent.responseInfo()))))
303+
.then(Mono.empty());
304+
});
305+
}
306+
307+
if (statusCode >= 200 && statusCode < 300
308+
&& responseEvent instanceof ResponseSubscribers.SseResponseEvent responseSseEvent) {
309+
if (MESSAGE_EVENT_TYPE.equals(responseSseEvent.sseEvent().event())) {
310+
String data = responseSseEvent.sseEvent().data();
311+
// Per 2025-11-25 spec (SEP-1699), servers may
312+
// send SSE events
313+
// with empty data to prime the client for
314+
// reconnection.
315+
// Skip these events as they contain no JSON-RPC
316+
// message.
317+
if (data == null || data.isBlank()) {
318+
logger.debug("Skipping SSE event with empty data (stream primer)");
342319
return Flux.empty();
343320
}
344-
else if (statusCode == NOT_FOUND) {
345-
346-
if (transportSession != null && transportSession.sessionId().isPresent()) {
347-
// only if the request was sent with a session id
348-
// and the response is 404, we consider it a
349-
// session not found error.
350-
logger.debug("Session not found for session ID: {}",
351-
transportSession.sessionId().get());
352-
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
353-
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
354-
"Session not found for session ID: " + sessionIdRepresentation);
355-
return Flux.<McpSchema.JSONRPCMessage>error(exception);
356-
}
357-
return Flux.<McpSchema.JSONRPCMessage>error(
358-
new McpTransportException("Server Not Found. Status code:" + statusCode
359-
+ ", response-event:" + responseEvent));
360-
}
361-
else if (statusCode == BAD_REQUEST) {
362-
if (transportSession != null && transportSession.sessionId().isPresent()) {
363-
// only if the request was sent with a session id
364-
// and thre response is 404, we consider it a
365-
// session not found error.
366-
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
367-
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
368-
"Session not found for session ID: " + sessionIdRepresentation);
369-
return Flux.<McpSchema.JSONRPCMessage>error(exception);
370-
}
371-
return Flux.<McpSchema.JSONRPCMessage>error(
372-
new McpTransportException("Bad Request. Status code:" + statusCode
373-
+ ", response-event:" + responseEvent));
321+
try {
322+
// We don't support batching ATM and probably
323+
// won't since the next version considers
324+
// removing it.
325+
McpSchema.JSONRPCMessage message = McpSchema
326+
.deserializeJsonRpcMessage(this.jsonMapper, data);
374327

375-
}
328+
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
329+
.of(Optional.ofNullable(responseSseEvent.sseEvent().id()), List.of(message));
376330

377-
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
378-
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
379-
}).<McpSchema
380-
.JSONRPCMessage>flatMap(
381-
jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
382-
.onErrorMap(CompletionException.class, t -> t.getCause())
383-
.onErrorComplete(t -> {
384-
this.handleException(t);
385-
return true;
386-
})
387-
.doFinally(s -> {
388-
Disposable ref = disposableRef.getAndSet(null);
389-
if (ref != null) {
390-
transportSession.removeConnection(ref);
331+
McpTransportStream<Disposable> sessionStream = stream != null ? stream
332+
: new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect);
333+
logger.debug("Connected stream {}", sessionStream.streamId());
334+
335+
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
336+
337+
}
338+
catch (IOException ioException) {
339+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
340+
"Error parsing JSON-RPC message: " + responseEvent, ioException));
391341
}
392-
}))
342+
}
343+
else {
344+
logger.debug("Received SSE event with type: {}", responseSseEvent.sseEvent());
345+
return Flux.empty();
346+
}
347+
}
348+
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
349+
logger.debug("The server does not support SSE streams, using request-response mode.");
350+
return Flux.empty();
351+
}
352+
else if (statusCode == NOT_FOUND) {
353+
354+
if (transportSession != null && transportSession.sessionId().isPresent()) {
355+
// only if the request was sent with a session id
356+
// and the response is 404, we consider it a
357+
// session not found error.
358+
logger.debug("Session not found for session ID: {}",
359+
transportSession.sessionId().get());
360+
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
361+
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
362+
"Session not found for session ID: " + sessionIdRepresentation);
363+
return Flux.<McpSchema.JSONRPCMessage>error(exception);
364+
}
365+
return Flux.<McpSchema.JSONRPCMessage>error(
366+
new McpTransportException("Server Not Found. Status code:" + statusCode
367+
+ ", response-event:" + responseEvent));
368+
}
369+
else if (statusCode == BAD_REQUEST) {
370+
if (transportSession != null && transportSession.sessionId().isPresent()) {
371+
// only if the request was sent with a session id
372+
// and thre response is 404, we consider it a
373+
// session not found error.
374+
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
375+
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
376+
"Session not found for session ID: " + sessionIdRepresentation);
377+
return Flux.<McpSchema.JSONRPCMessage>error(exception);
378+
}
379+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
380+
"Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent));
381+
}
382+
else if (responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent) {
383+
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
384+
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
385+
}
386+
return Flux.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportException(
387+
"Unrecognized server error when connecting to SSE stream",
388+
responseEvent.responseInfo()));
389+
})
390+
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
391+
.onErrorMap(CompletionException.class, t -> t.getCause())
392+
.onErrorComplete(t -> {
393+
this.handleException(t);
394+
return true;
395+
})
396+
.doFinally(s -> {
397+
Disposable ref = disposableRef.getAndSet(null);
398+
if (ref != null) {
399+
transportSession.removeConnection(ref);
400+
}
401+
}))
393402
.contextWrite(ctx)
394403
.subscribe();
395404

@@ -478,6 +487,20 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
478487
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
479488

480489
})).flatMap(responseEvent -> {
490+
int statusCode = responseEvent.responseInfo().statusCode();
491+
if (statusCode == 401 || statusCode == 403) {
492+
return Mono.deferContextual(ctx -> {
493+
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
494+
return Mono
495+
.from(this.authorizationErrorHandler.handle(responseEvent.responseInfo(), transportContext,
496+
Mono.defer(() -> this.sendMessage(sentMessage)),
497+
Mono.error(new McpHttpClientTransportException(
498+
"Authorization error when sending message", responseEvent.responseInfo()))))
499+
.doOnSuccess(s -> deliveredSink.success())
500+
.then(Mono.empty());
501+
});
502+
}
503+
481504
if (transportSession.markInitialized(
482505
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
483506
// Once we have a session, we try to open an async stream for
@@ -488,8 +511,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
488511

489512
String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
490513

491-
int statusCode = responseEvent.responseInfo().statusCode();
492-
493514
if (statusCode >= 200 && statusCode < 300) {
494515

495516
String contentType = responseEvent.responseInfo()
@@ -664,6 +685,8 @@ public static class Builder {
664685
private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
665686
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);
666687

688+
private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;
689+
667690
/**
668691
* Creates a new builder with the specified base URI.
669692
* @param baseUri the base URI of the MCP server
@@ -801,6 +824,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
801824
return this;
802825
}
803826

827+
/**
828+
* Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
829+
* when sending a message.
830+
* @param authorizationErrorHandler the handler
831+
* @return this builder
832+
*/
833+
public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
834+
this.authorizationErrorHandler = authorizationErrorHandler;
835+
return this;
836+
}
837+
804838
/**
805839
* Sets the connection timeout for the HTTP client.
806840
* @param connectTimeout the connection timeout duration
@@ -845,7 +879,7 @@ public HttpClientStreamableHttpTransport build() {
845879
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
846880
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
847881
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
848-
httpRequestCustomizer, supportedProtocolVersions);
882+
httpRequestCustomizer, authorizationErrorHandler, supportedProtocolVersions);
849883
}
850884

851885
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2026-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.net.http.HttpResponse;
8+
9+
import io.modelcontextprotocol.spec.McpTransportException;
10+
11+
/**
12+
* Authorization-related exception for {@link java.net.http.HttpClient}-based client
13+
* transport. Thrown when the server responds with HTTP 401 or HTTP 403. Wraps the
14+
* response info for further inspection of the headers and the status code.
15+
*
16+
* @see <a href=
17+
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
18+
* Specification: Authorization</a>
19+
* @author Daniel Garnier-Moiroux
20+
*/
21+
public class McpHttpClientTransportException extends McpTransportException {
22+
23+
private final HttpResponse.ResponseInfo responseInfo;
24+
25+
public McpHttpClientTransportException(String message, HttpResponse.ResponseInfo responseInfo) {
26+
super(message);
27+
this.responseInfo = responseInfo;
28+
}
29+
30+
public HttpResponse.ResponseInfo getResponseInfo() {
31+
return responseInfo;
32+
}
33+
34+
}

0 commit comments

Comments
 (0)