11/*
2- * Copyright 2024-2025 the original author or authors.
2+ * Copyright 2024-2026 the original author or authors.
33 */
44
55package io .modelcontextprotocol .client .transport ;
2323import io .modelcontextprotocol .client .McpAsyncClient ;
2424import io .modelcontextprotocol .client .transport .ResponseSubscribers .ResponseEvent ;
2525import io .modelcontextprotocol .client .transport .customizer .McpAsyncHttpClientRequestCustomizer ;
26+ import io .modelcontextprotocol .client .transport .customizer .McpHttpClientAuthorizationErrorHandler ;
2627import io .modelcontextprotocol .client .transport .customizer .McpSyncHttpClientRequestCustomizer ;
2728import io .modelcontextprotocol .common .McpTransportContext ;
2829import io .modelcontextprotocol .json .McpJsonDefaults ;
7273 * </p>
7374 *
7475 * @author Christian Tzolov
76+ * @author Daniel Garnier-Moiroux
7577 * @see <a href=
7678 * "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
7779 * HTTP transport specification</a>
@@ -115,6 +117,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
115117
116118 private final boolean openConnectionOnStartup ;
117119
120+ private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ;
121+
118122 private final boolean resumableStreams ;
119123
120124 private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ;
@@ -132,14 +136,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
132136 private HttpClientStreamableHttpTransport (McpJsonMapper jsonMapper , HttpClient httpClient ,
133137 HttpRequest .Builder requestBuilder , String baseUri , String endpoint , boolean resumableStreams ,
134138 boolean openConnectionOnStartup , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ,
135- List <String > supportedProtocolVersions ) {
139+ McpHttpClientAuthorizationErrorHandler authorizationErrorHandler , List <String > supportedProtocolVersions ) {
136140 this .jsonMapper = jsonMapper ;
137141 this .httpClient = httpClient ;
138142 this .requestBuilder = requestBuilder ;
139143 this .baseUri = URI .create (baseUri );
140144 this .endpoint = endpoint ;
141145 this .resumableStreams = resumableStreams ;
142146 this .openConnectionOnStartup = openConnectionOnStartup ;
147+ this .authorizationErrorHandler = authorizationErrorHandler ;
143148 this .activeSession .set (createTransportSession ());
144149 this .httpRequestCustomizer = httpRequestCustomizer ;
145150 this .supportedProtocolVersions = Collections .unmodifiableList (supportedProtocolVersions );
@@ -239,7 +244,6 @@ public Mono<Void> closeGracefully() {
239244 }
240245
241246 private Mono <Disposable > reconnect (McpTransportStream <Disposable > stream ) {
242-
243247 return Mono .deferContextual (ctx -> {
244248
245249 if (stream != null ) {
@@ -275,121 +279,128 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
275279 var transportContext = connectionCtx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
276280 return Mono .from (this .httpRequestCustomizer .customize (builder , "GET" , uri , null , transportContext ));
277281 })
278- .flatMapMany (
279- requestBuilder -> Flux .<ResponseEvent >create (
280- sseSink -> this .httpClient
281- .sendAsync (requestBuilder .build (),
282- responseInfo -> ResponseSubscribers .sseToBodySubscriber (responseInfo ,
283- sseSink ))
284- .whenComplete ((response , throwable ) -> {
285- if (throwable != null ) {
286- sseSink .error (throwable );
287- }
288- else {
289- logger .debug ("SSE connection established successfully" );
290- }
291- }))
292- .map (responseEvent -> (ResponseSubscribers .SseResponseEvent ) responseEvent )
293- .flatMap (responseEvent -> {
294- int statusCode = responseEvent .responseInfo ().statusCode ();
295-
296- if (statusCode >= 200 && statusCode < 300 ) {
297-
298- if (MESSAGE_EVENT_TYPE .equals (responseEvent .sseEvent ().event ())) {
299- String data = responseEvent .sseEvent ().data ();
300- // Per 2025-11-25 spec (SEP-1699), servers may
301- // send SSE events
302- // with empty data to prime the client for
303- // reconnection.
304- // Skip these events as they contain no JSON-RPC
305- // message.
306- if (data == null || data .isBlank ()) {
307- logger .debug ("Skipping SSE event with empty data (stream primer)" );
308- return Flux .empty ();
309- }
310- try {
311- // We don't support batching ATM and probably
312- // won't since the next version considers
313- // removing it.
314- McpSchema .JSONRPCMessage message = McpSchema
315- .deserializeJsonRpcMessage (this .jsonMapper , data );
316-
317- Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >> idWithMessages = Tuples
318- .of (Optional .ofNullable (responseEvent .sseEvent ().id ()),
319- List .of (message ));
320-
321- McpTransportStream <Disposable > sessionStream = stream != null ? stream
322- : new DefaultMcpTransportStream <>(this .resumableStreams ,
323- this ::reconnect );
324- logger .debug ("Connected stream {}" , sessionStream .streamId ());
325-
326- return Flux .from (sessionStream .consumeSseStream (Flux .just (idWithMessages )));
327-
328- }
329- catch (IOException ioException ) {
330- return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
331- "Error parsing JSON-RPC message: " + responseEvent , ioException ));
332- }
333- }
334- else {
335- logger .debug ("Received SSE event with type: {}" , responseEvent .sseEvent ());
336- return Flux .empty ();
337- }
338- }
339- else if (statusCode == METHOD_NOT_ALLOWED ) { // NotAllowed
340- logger
341- .debug ("The server does not support SSE streams, using request-response mode." );
282+ .flatMapMany (requestBuilder -> Flux .<ResponseEvent >create (sseSink -> this .httpClient
283+ .sendAsync (requestBuilder .build (), this .toSendMessageBodySubscriber (sseSink ))
284+ .whenComplete ((response , throwable ) -> {
285+ if (throwable != null ) {
286+ sseSink .error (throwable );
287+ }
288+ else {
289+ logger .debug ("SSE connection established successfully" );
290+ }
291+ })).flatMap (responseEvent -> {
292+ int statusCode = responseEvent .responseInfo ().statusCode ();
293+ if (statusCode == 401 || statusCode == 403 ) {
294+ logger .debug ("Authorization error in sendMessage with code {}" , statusCode );
295+ return Mono .deferContextual (innerCtx -> {
296+ var transportContext = innerCtx .getOrDefault (McpTransportContext .KEY ,
297+ McpTransportContext .EMPTY );
298+ return Mono .from (this .authorizationErrorHandler .onAuthorizationError (
299+ responseEvent .responseInfo (), transportContext , Mono .defer (() -> {
300+ logger .debug ("Authorization error handled, retrying original request" );
301+ return this .reconnect (stream ).then ();
302+ }),
303+ Mono .error (new McpHttpClientTransportException (
304+ "Authorization error connecting to SSE stream" ,
305+ responseEvent .responseInfo ()))))
306+ .then (Mono .empty ());
307+ });
308+ }
309+
310+ if (!(responseEvent instanceof ResponseSubscribers .SseResponseEvent sseResponseEvent )) {
311+ return Flux .<McpSchema .JSONRPCMessage >error (new McpHttpClientTransportException (
312+ "Unrecognized server error when connecting to SSE stream" ,
313+ responseEvent .responseInfo ()));
314+ }
315+ else if (statusCode >= 200 && statusCode < 300 ) {
316+ if (MESSAGE_EVENT_TYPE .equals (sseResponseEvent .sseEvent ().event ())) {
317+ String data = sseResponseEvent .sseEvent ().data ();
318+ // Per 2025-11-25 spec (SEP-1699), servers may
319+ // send SSE events
320+ // with empty data to prime the client for
321+ // reconnection.
322+ // Skip these events as they contain no JSON-RPC
323+ // message.
324+ if (data == null || data .isBlank ()) {
325+ logger .debug ("Skipping SSE event with empty data (stream primer)" );
342326 return Flux .empty ();
343327 }
344- else if (statusCode == NOT_FOUND ) {
345-
346- if (transportSession != null && transportSession .sessionId ().isPresent ()) {
347- // only if the request was sent with a session id
348- // and the response is 404, we consider it a
349- // session not found error.
350- logger .debug ("Session not found for session ID: {}" ,
351- transportSession .sessionId ().get ());
352- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
353- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
354- "Session not found for session ID: " + sessionIdRepresentation );
355- return Flux .<McpSchema .JSONRPCMessage >error (exception );
356- }
357- return Flux .<McpSchema .JSONRPCMessage >error (
358- new McpTransportException ("Server Not Found. Status code:" + statusCode
359- + ", response-event:" + responseEvent ));
360- }
361- else if (statusCode == BAD_REQUEST ) {
362- if (transportSession != null && transportSession .sessionId ().isPresent ()) {
363- // only if the request was sent with a session id
364- // and thre response is 404, we consider it a
365- // session not found error.
366- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
367- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
368- "Session not found for session ID: " + sessionIdRepresentation );
369- return Flux .<McpSchema .JSONRPCMessage >error (exception );
370- }
371- return Flux .<McpSchema .JSONRPCMessage >error (
372- new McpTransportException ("Bad Request. Status code:" + statusCode
373- + ", response-event:" + responseEvent ));
328+ try {
329+ // We don't support batching ATM and probably
330+ // won't since the next version considers
331+ // removing it.
332+ McpSchema .JSONRPCMessage message = McpSchema
333+ .deserializeJsonRpcMessage (this .jsonMapper , data );
374334
375- }
335+ Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >> idWithMessages = Tuples
336+ .of (Optional .ofNullable (sseResponseEvent .sseEvent ().id ()), List .of (message ));
337+
338+ McpTransportStream <Disposable > sessionStream = stream != null ? stream
339+ : new DefaultMcpTransportStream <>(this .resumableStreams , this ::reconnect );
340+ logger .debug ("Connected stream {}" , sessionStream .streamId ());
376341
377- return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
378- "Received unrecognized SSE event type: " + responseEvent .sseEvent ().event ()));
379- }).<McpSchema
380- .JSONRPCMessage >flatMap (
381- jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
382- .onErrorMap (CompletionException .class , t -> t .getCause ())
383- .onErrorComplete (t -> {
384- this .handleException (t );
385- return true ;
386- })
387- .doFinally (s -> {
388- Disposable ref = disposableRef .getAndSet (null );
389- if (ref != null ) {
390- transportSession .removeConnection (ref );
342+ return Flux .from (sessionStream .consumeSseStream (Flux .just (idWithMessages )));
343+
344+ }
345+ catch (IOException ioException ) {
346+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
347+ "Error parsing JSON-RPC message: " + responseEvent , ioException ));
391348 }
392- }))
349+ }
350+ else {
351+ logger .debug ("Received SSE event with type: {}" , sseResponseEvent .sseEvent ());
352+ return Flux .empty ();
353+ }
354+ }
355+ else if (statusCode == METHOD_NOT_ALLOWED ) { // NotAllowed
356+ logger .debug ("The server does not support SSE streams, using request-response mode." );
357+ return Flux .empty ();
358+ }
359+ else if (statusCode == NOT_FOUND ) {
360+
361+ if (transportSession != null && transportSession .sessionId ().isPresent ()) {
362+ // only if the request was sent with a session id
363+ // and the response is 404, we consider it a
364+ // session not found error.
365+ logger .debug ("Session not found for session ID: {}" ,
366+ transportSession .sessionId ().get ());
367+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
368+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
369+ "Session not found for session ID: " + sessionIdRepresentation );
370+ return Flux .<McpSchema .JSONRPCMessage >error (exception );
371+ }
372+ return Flux .<McpSchema .JSONRPCMessage >error (
373+ new McpTransportException ("Server Not Found. Status code:" + statusCode
374+ + ", response-event:" + responseEvent ));
375+ }
376+ else if (statusCode == BAD_REQUEST ) {
377+ if (transportSession != null && transportSession .sessionId ().isPresent ()) {
378+ // only if the request was sent with a session id
379+ // and thre response is 404, we consider it a
380+ // session not found error.
381+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
382+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
383+ "Session not found for session ID: " + sessionIdRepresentation );
384+ return Flux .<McpSchema .JSONRPCMessage >error (exception );
385+ }
386+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
387+ "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent ));
388+ }
389+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
390+ "Received unrecognized SSE event type: " + sseResponseEvent .sseEvent ().event ()));
391+ })
392+ .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
393+ .onErrorMap (CompletionException .class , t -> t .getCause ())
394+ .onErrorComplete (t -> {
395+ this .handleException (t );
396+ return true ;
397+ })
398+ .doFinally (s -> {
399+ Disposable ref = disposableRef .getAndSet (null );
400+ if (ref != null ) {
401+ transportSession .removeConnection (ref );
402+ }
403+ }))
393404 .contextWrite (ctx )
394405 .subscribe ();
395406
@@ -478,6 +489,22 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
478489 })).onErrorMap (CompletionException .class , t -> t .getCause ()).onErrorComplete ().subscribe ();
479490
480491 })).flatMap (responseEvent -> {
492+ int statusCode = responseEvent .responseInfo ().statusCode ();
493+ if (statusCode == 401 || statusCode == 403 ) {
494+ logger .debug ("Authorization error in sendMessage with code {}" , statusCode );
495+ return Mono .deferContextual (ctx -> {
496+ var transportContext = ctx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
497+ return Mono .from (this .authorizationErrorHandler
498+ .onAuthorizationError (responseEvent .responseInfo (), transportContext , Mono .defer (() -> {
499+ logger .debug ("Authorization error handled, retrying original request" );
500+ return this .sendMessage (sentMessage );
501+ }), Mono .error (new McpHttpClientTransportException (
502+ "Authorization error when sending message" , responseEvent .responseInfo ()))))
503+ .doOnSuccess (s -> deliveredSink .success ())
504+ .then (Mono .empty ());
505+ });
506+ }
507+
481508 if (transportSession .markInitialized (
482509 responseEvent .responseInfo ().headers ().firstValue ("mcp-session-id" ).orElseGet (() -> null ))) {
483510 // Once we have a session, we try to open an async stream for
@@ -488,8 +515,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
488515
489516 String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
490517
491- int statusCode = responseEvent .responseInfo ().statusCode ();
492-
493518 if (statusCode >= 200 && statusCode < 300 ) {
494519
495520 String contentType = responseEvent .responseInfo ()
@@ -664,6 +689,8 @@ public static class Builder {
664689 private List <String > supportedProtocolVersions = List .of (ProtocolVersions .MCP_2024_11_05 ,
665690 ProtocolVersions .MCP_2025_03_26 , ProtocolVersions .MCP_2025_06_18 , ProtocolVersions .MCP_2025_11_25 );
666691
692+ private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler .NOOP ;
693+
667694 /**
668695 * Creates a new builder with the specified base URI.
669696 * @param baseUri the base URI of the MCP server
@@ -801,6 +828,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
801828 return this ;
802829 }
803830
831+ /**
832+ * Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
833+ * when sending a message.
834+ * @param authorizationErrorHandler the handler
835+ * @return this builder
836+ */
837+ public Builder authorizationErrorHandler (McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ) {
838+ this .authorizationErrorHandler = authorizationErrorHandler ;
839+ return this ;
840+ }
841+
804842 /**
805843 * Sets the connection timeout for the HTTP client.
806844 * @param connectTimeout the connection timeout duration
@@ -845,7 +883,7 @@ public HttpClientStreamableHttpTransport build() {
845883 HttpClient httpClient = this .clientBuilder .connectTimeout (this .connectTimeout ).build ();
846884 return new HttpClientStreamableHttpTransport (jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper ,
847885 httpClient , requestBuilder , baseUri , endpoint , resumableStreams , openConnectionOnStartup ,
848- httpRequestCustomizer , supportedProtocolVersions );
886+ httpRequestCustomizer , authorizationErrorHandler , supportedProtocolVersions );
849887 }
850888
851889 }
0 commit comments