11/*
2- * Copyright 2024-2025 the original author or authors.
2+ * Copyright 2024-2026 the original author or authors.
33 */
44
55package io .modelcontextprotocol .client .transport ;
2323import io .modelcontextprotocol .client .McpAsyncClient ;
2424import io .modelcontextprotocol .client .transport .ResponseSubscribers .ResponseEvent ;
2525import io .modelcontextprotocol .client .transport .customizer .McpAsyncHttpClientRequestCustomizer ;
26+ import io .modelcontextprotocol .client .transport .customizer .McpHttpClientAuthorizationErrorHandler ;
2627import io .modelcontextprotocol .client .transport .customizer .McpSyncHttpClientRequestCustomizer ;
2728import io .modelcontextprotocol .common .McpTransportContext ;
2829import io .modelcontextprotocol .json .McpJsonDefaults ;
7273 * </p>
7374 *
7475 * @author Christian Tzolov
76+ * @author Daniel Garnier-Moiroux
7577 * @see <a href=
7678 * "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
7779 * HTTP transport specification</a>
@@ -115,6 +117,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
115117
116118 private final boolean openConnectionOnStartup ;
117119
120+ private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ;
121+
118122 private final boolean resumableStreams ;
119123
120124 private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ;
@@ -132,14 +136,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
132136 private HttpClientStreamableHttpTransport (McpJsonMapper jsonMapper , HttpClient httpClient ,
133137 HttpRequest .Builder requestBuilder , String baseUri , String endpoint , boolean resumableStreams ,
134138 boolean openConnectionOnStartup , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ,
135- List <String > supportedProtocolVersions ) {
139+ McpHttpClientAuthorizationErrorHandler authorizationErrorHandler , List <String > supportedProtocolVersions ) {
136140 this .jsonMapper = jsonMapper ;
137141 this .httpClient = httpClient ;
138142 this .requestBuilder = requestBuilder ;
139143 this .baseUri = URI .create (baseUri );
140144 this .endpoint = endpoint ;
141145 this .resumableStreams = resumableStreams ;
142146 this .openConnectionOnStartup = openConnectionOnStartup ;
147+ this .authorizationErrorHandler = authorizationErrorHandler ;
143148 this .activeSession .set (createTransportSession ());
144149 this .httpRequestCustomizer = httpRequestCustomizer ;
145150 this .supportedProtocolVersions = Collections .unmodifiableList (supportedProtocolVersions );
@@ -239,7 +244,6 @@ public Mono<Void> closeGracefully() {
239244 }
240245
241246 private Mono <Disposable > reconnect (McpTransportStream <Disposable > stream ) {
242-
243247 return Mono .deferContextual (ctx -> {
244248
245249 if (stream != null ) {
@@ -275,121 +279,126 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
275279 var transportContext = connectionCtx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
276280 return Mono .from (this .httpRequestCustomizer .customize (builder , "GET" , uri , null , transportContext ));
277281 })
278- .flatMapMany (
279- requestBuilder -> Flux .<ResponseEvent >create (
280- sseSink -> this .httpClient
281- .sendAsync (requestBuilder .build (),
282- responseInfo -> ResponseSubscribers .sseToBodySubscriber (responseInfo ,
283- sseSink ))
284- .whenComplete ((response , throwable ) -> {
285- if (throwable != null ) {
286- sseSink .error (throwable );
287- }
288- else {
289- logger .debug ("SSE connection established successfully" );
290- }
291- }))
292- .map (responseEvent -> (ResponseSubscribers .SseResponseEvent ) responseEvent )
293- .flatMap (responseEvent -> {
294- int statusCode = responseEvent .responseInfo ().statusCode ();
295-
296- if (statusCode >= 200 && statusCode < 300 ) {
297-
298- if (MESSAGE_EVENT_TYPE .equals (responseEvent .sseEvent ().event ())) {
299- String data = responseEvent .sseEvent ().data ();
300- // Per 2025-11-25 spec (SEP-1699), servers may
301- // send SSE events
302- // with empty data to prime the client for
303- // reconnection.
304- // Skip these events as they contain no JSON-RPC
305- // message.
306- if (data == null || data .isBlank ()) {
307- logger .debug ("Skipping SSE event with empty data (stream primer)" );
308- return Flux .empty ();
309- }
310- try {
311- // We don't support batching ATM and probably
312- // won't since the next version considers
313- // removing it.
314- McpSchema .JSONRPCMessage message = McpSchema
315- .deserializeJsonRpcMessage (this .jsonMapper , data );
316-
317- Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >> idWithMessages = Tuples
318- .of (Optional .ofNullable (responseEvent .sseEvent ().id ()),
319- List .of (message ));
320-
321- McpTransportStream <Disposable > sessionStream = stream != null ? stream
322- : new DefaultMcpTransportStream <>(this .resumableStreams ,
323- this ::reconnect );
324- logger .debug ("Connected stream {}" , sessionStream .streamId ());
325-
326- return Flux .from (sessionStream .consumeSseStream (Flux .just (idWithMessages )));
327-
328- }
329- catch (IOException ioException ) {
330- return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
331- "Error parsing JSON-RPC message: " + responseEvent , ioException ));
332- }
333- }
334- else {
335- logger .debug ("Received SSE event with type: {}" , responseEvent .sseEvent ());
336- return Flux .empty ();
337- }
338- }
339- else if (statusCode == METHOD_NOT_ALLOWED ) { // NotAllowed
340- logger
341- .debug ("The server does not support SSE streams, using request-response mode." );
282+ .flatMapMany (requestBuilder -> Flux .<ResponseEvent >create (sseSink -> this .httpClient
283+ .sendAsync (requestBuilder .build (), this .toSendMessageBodySubscriber (sseSink ))
284+ .whenComplete ((response , throwable ) -> {
285+ if (throwable != null ) {
286+ sseSink .error (throwable );
287+ }
288+ else {
289+ logger .debug ("SSE connection established successfully" );
290+ }
291+ })).flatMap (responseEvent -> {
292+ int statusCode = responseEvent .responseInfo ().statusCode ();
293+ if (statusCode == 401 || statusCode == 403 ) {
294+ // Authorization error
295+ return Mono .deferContextual (innerCtx -> {
296+ var transportContext = innerCtx .getOrDefault (McpTransportContext .KEY ,
297+ McpTransportContext .EMPTY );
298+ return Mono .from (this .authorizationErrorHandler .onAuthorizationError (
299+ responseEvent .responseInfo (), transportContext , this .reconnect (stream ).then (),
300+ Mono .error (new McpHttpClientTransportException (
301+ "Authorization error connecting to SSE stream" ,
302+ responseEvent .responseInfo ()))))
303+ .then (Mono .empty ());
304+ });
305+ }
306+
307+ if (statusCode >= 200 && statusCode < 300
308+ && responseEvent instanceof ResponseSubscribers .SseResponseEvent responseSseEvent ) {
309+ if (MESSAGE_EVENT_TYPE .equals (responseSseEvent .sseEvent ().event ())) {
310+ String data = responseSseEvent .sseEvent ().data ();
311+ // Per 2025-11-25 spec (SEP-1699), servers may
312+ // send SSE events
313+ // with empty data to prime the client for
314+ // reconnection.
315+ // Skip these events as they contain no JSON-RPC
316+ // message.
317+ if (data == null || data .isBlank ()) {
318+ logger .debug ("Skipping SSE event with empty data (stream primer)" );
342319 return Flux .empty ();
343320 }
344- else if (statusCode == NOT_FOUND ) {
345-
346- if (transportSession != null && transportSession .sessionId ().isPresent ()) {
347- // only if the request was sent with a session id
348- // and the response is 404, we consider it a
349- // session not found error.
350- logger .debug ("Session not found for session ID: {}" ,
351- transportSession .sessionId ().get ());
352- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
353- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
354- "Session not found for session ID: " + sessionIdRepresentation );
355- return Flux .<McpSchema .JSONRPCMessage >error (exception );
356- }
357- return Flux .<McpSchema .JSONRPCMessage >error (
358- new McpTransportException ("Server Not Found. Status code:" + statusCode
359- + ", response-event:" + responseEvent ));
360- }
361- else if (statusCode == BAD_REQUEST ) {
362- if (transportSession != null && transportSession .sessionId ().isPresent ()) {
363- // only if the request was sent with a session id
364- // and thre response is 404, we consider it a
365- // session not found error.
366- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
367- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
368- "Session not found for session ID: " + sessionIdRepresentation );
369- return Flux .<McpSchema .JSONRPCMessage >error (exception );
370- }
371- return Flux .<McpSchema .JSONRPCMessage >error (
372- new McpTransportException ("Bad Request. Status code:" + statusCode
373- + ", response-event:" + responseEvent ));
321+ try {
322+ // We don't support batching ATM and probably
323+ // won't since the next version considers
324+ // removing it.
325+ McpSchema .JSONRPCMessage message = McpSchema
326+ .deserializeJsonRpcMessage (this .jsonMapper , data );
374327
375- }
328+ Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >> idWithMessages = Tuples
329+ .of (Optional .ofNullable (responseSseEvent .sseEvent ().id ()), List .of (message ));
376330
377- return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
378- "Received unrecognized SSE event type: " + responseEvent .sseEvent ().event ()));
379- }).<McpSchema
380- .JSONRPCMessage >flatMap (
381- jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
382- .onErrorMap (CompletionException .class , t -> t .getCause ())
383- .onErrorComplete (t -> {
384- this .handleException (t );
385- return true ;
386- })
387- .doFinally (s -> {
388- Disposable ref = disposableRef .getAndSet (null );
389- if (ref != null ) {
390- transportSession .removeConnection (ref );
331+ McpTransportStream <Disposable > sessionStream = stream != null ? stream
332+ : new DefaultMcpTransportStream <>(this .resumableStreams , this ::reconnect );
333+ logger .debug ("Connected stream {}" , sessionStream .streamId ());
334+
335+ return Flux .from (sessionStream .consumeSseStream (Flux .just (idWithMessages )));
336+
337+ }
338+ catch (IOException ioException ) {
339+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
340+ "Error parsing JSON-RPC message: " + responseEvent , ioException ));
391341 }
392- }))
342+ }
343+ else {
344+ logger .debug ("Received SSE event with type: {}" , responseSseEvent .sseEvent ());
345+ return Flux .empty ();
346+ }
347+ }
348+ else if (statusCode == METHOD_NOT_ALLOWED ) { // NotAllowed
349+ logger .debug ("The server does not support SSE streams, using request-response mode." );
350+ return Flux .empty ();
351+ }
352+ else if (statusCode == NOT_FOUND ) {
353+
354+ if (transportSession != null && transportSession .sessionId ().isPresent ()) {
355+ // only if the request was sent with a session id
356+ // and the response is 404, we consider it a
357+ // session not found error.
358+ logger .debug ("Session not found for session ID: {}" ,
359+ transportSession .sessionId ().get ());
360+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
361+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
362+ "Session not found for session ID: " + sessionIdRepresentation );
363+ return Flux .<McpSchema .JSONRPCMessage >error (exception );
364+ }
365+ return Flux .<McpSchema .JSONRPCMessage >error (
366+ new McpTransportException ("Server Not Found. Status code:" + statusCode
367+ + ", response-event:" + responseEvent ));
368+ }
369+ else if (statusCode == BAD_REQUEST ) {
370+ if (transportSession != null && transportSession .sessionId ().isPresent ()) {
371+ // only if the request was sent with a session id
372+ // and thre response is 404, we consider it a
373+ // session not found error.
374+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
375+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
376+ "Session not found for session ID: " + sessionIdRepresentation );
377+ return Flux .<McpSchema .JSONRPCMessage >error (exception );
378+ }
379+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
380+ "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent ));
381+ }
382+ else if (responseEvent instanceof ResponseSubscribers .SseResponseEvent sseResponseEvent ) {
383+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
384+ "Received unrecognized SSE event type: " + sseResponseEvent .sseEvent ().event ()));
385+ }
386+ return Flux .<McpSchema .JSONRPCMessage >error (new McpHttpClientTransportException (
387+ "Unrecognized server error when connecting to SSE stream" ,
388+ responseEvent .responseInfo ()));
389+ })
390+ .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
391+ .onErrorMap (CompletionException .class , t -> t .getCause ())
392+ .onErrorComplete (t -> {
393+ this .handleException (t );
394+ return true ;
395+ })
396+ .doFinally (s -> {
397+ Disposable ref = disposableRef .getAndSet (null );
398+ if (ref != null ) {
399+ transportSession .removeConnection (ref );
400+ }
401+ }))
393402 .contextWrite (ctx )
394403 .subscribe ();
395404
@@ -478,6 +487,20 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
478487 })).onErrorMap (CompletionException .class , t -> t .getCause ()).onErrorComplete ().subscribe ();
479488
480489 })).flatMap (responseEvent -> {
490+ int statusCode = responseEvent .responseInfo ().statusCode ();
491+ if (statusCode == 401 || statusCode == 403 ) {
492+ return Mono .deferContextual (ctx -> {
493+ var transportContext = ctx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
494+ return Mono
495+ .from (this .authorizationErrorHandler .onAuthorizationError (responseEvent .responseInfo (),
496+ transportContext , Mono .defer (() -> this .sendMessage (sentMessage )),
497+ Mono .error (new McpHttpClientTransportException (
498+ "Authorization error when sending message" , responseEvent .responseInfo ()))))
499+ .doOnSuccess (s -> deliveredSink .success ())
500+ .then (Mono .empty ());
501+ });
502+ }
503+
481504 if (transportSession .markInitialized (
482505 responseEvent .responseInfo ().headers ().firstValue ("mcp-session-id" ).orElseGet (() -> null ))) {
483506 // Once we have a session, we try to open an async stream for
@@ -488,8 +511,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
488511
489512 String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
490513
491- int statusCode = responseEvent .responseInfo ().statusCode ();
492-
493514 if (statusCode >= 200 && statusCode < 300 ) {
494515
495516 String contentType = responseEvent .responseInfo ()
@@ -664,6 +685,8 @@ public static class Builder {
664685 private List <String > supportedProtocolVersions = List .of (ProtocolVersions .MCP_2024_11_05 ,
665686 ProtocolVersions .MCP_2025_03_26 , ProtocolVersions .MCP_2025_06_18 , ProtocolVersions .MCP_2025_11_25 );
666687
688+ private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler .NOOP ;
689+
667690 /**
668691 * Creates a new builder with the specified base URI.
669692 * @param baseUri the base URI of the MCP server
@@ -801,6 +824,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
801824 return this ;
802825 }
803826
827+ /**
828+ * Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
829+ * when sending a message.
830+ * @param authorizationErrorHandler the handler
831+ * @return this builder
832+ */
833+ public Builder authorizationErrorHandler (McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ) {
834+ this .authorizationErrorHandler = authorizationErrorHandler ;
835+ return this ;
836+ }
837+
804838 /**
805839 * Sets the connection timeout for the HTTP client.
806840 * @param connectTimeout the connection timeout duration
@@ -845,7 +879,7 @@ public HttpClientStreamableHttpTransport build() {
845879 HttpClient httpClient = this .clientBuilder .connectTimeout (this .connectTimeout ).build ();
846880 return new HttpClientStreamableHttpTransport (jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper ,
847881 httpClient , requestBuilder , baseUri , endpoint , resumableStreams , openConnectionOnStartup ,
848- httpRequestCustomizer , supportedProtocolVersions );
882+ httpRequestCustomizer , authorizationErrorHandler , supportedProtocolVersions );
849883 }
850884
851885 }
0 commit comments