11/*
2- * Copyright 2024-2025 the original author or authors.
2+ * Copyright 2024-2026 the original author or authors.
33 */
44
55package io .modelcontextprotocol .client .transport ;
2323import io .modelcontextprotocol .client .McpAsyncClient ;
2424import io .modelcontextprotocol .client .transport .ResponseSubscribers .ResponseEvent ;
2525import io .modelcontextprotocol .client .transport .customizer .McpAsyncHttpClientRequestCustomizer ;
26+ import io .modelcontextprotocol .client .transport .customizer .McpHttpClientAuthorizationErrorHandler ;
2627import io .modelcontextprotocol .client .transport .customizer .McpSyncHttpClientRequestCustomizer ;
2728import io .modelcontextprotocol .common .McpTransportContext ;
2829import io .modelcontextprotocol .json .McpJsonDefaults ;
7273 * </p>
7374 *
7475 * @author Christian Tzolov
76+ * @author Daniel Garnier-Moiroux
7577 * @see <a href=
7678 * "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
7779 * HTTP transport specification</a>
@@ -115,6 +117,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
115117
116118 private final boolean openConnectionOnStartup ;
117119
120+ private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ;
121+
118122 private final boolean resumableStreams ;
119123
120124 private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ;
@@ -132,14 +136,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
132136 private HttpClientStreamableHttpTransport (McpJsonMapper jsonMapper , HttpClient httpClient ,
133137 HttpRequest .Builder requestBuilder , String baseUri , String endpoint , boolean resumableStreams ,
134138 boolean openConnectionOnStartup , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ,
135- List <String > supportedProtocolVersions ) {
139+ McpHttpClientAuthorizationErrorHandler authorizationErrorHandler , List <String > supportedProtocolVersions ) {
136140 this .jsonMapper = jsonMapper ;
137141 this .httpClient = httpClient ;
138142 this .requestBuilder = requestBuilder ;
139143 this .baseUri = URI .create (baseUri );
140144 this .endpoint = endpoint ;
141145 this .resumableStreams = resumableStreams ;
142146 this .openConnectionOnStartup = openConnectionOnStartup ;
147+ this .authorizationErrorHandler = authorizationErrorHandler ;
143148 this .activeSession .set (createTransportSession ());
144149 this .httpRequestCustomizer = httpRequestCustomizer ;
145150 this .supportedProtocolVersions = Collections .unmodifiableList (supportedProtocolVersions );
@@ -239,7 +244,6 @@ public Mono<Void> closeGracefully() {
239244 }
240245
241246 private Mono <Disposable > reconnect (McpTransportStream <Disposable > stream ) {
242-
243247 return Mono .deferContextual (ctx -> {
244248
245249 if (stream != null ) {
@@ -275,121 +279,125 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
275279 var transportContext = connectionCtx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
276280 return Mono .from (this .httpRequestCustomizer .customize (builder , "GET" , uri , null , transportContext ));
277281 })
278- .flatMapMany (
279- requestBuilder -> Flux .<ResponseEvent >create (
280- sseSink -> this .httpClient
281- .sendAsync (requestBuilder .build (),
282- responseInfo -> ResponseSubscribers .sseToBodySubscriber (responseInfo ,
283- sseSink ))
284- .whenComplete ((response , throwable ) -> {
285- if (throwable != null ) {
286- sseSink .error (throwable );
287- }
288- else {
289- logger .debug ("SSE connection established successfully" );
290- }
291- }))
292- .map (responseEvent -> (ResponseSubscribers .SseResponseEvent ) responseEvent )
293- .flatMap (responseEvent -> {
294- int statusCode = responseEvent .responseInfo ().statusCode ();
295-
296- if (statusCode >= 200 && statusCode < 300 ) {
297-
298- if (MESSAGE_EVENT_TYPE .equals (responseEvent .sseEvent ().event ())) {
299- String data = responseEvent .sseEvent ().data ();
300- // Per 2025-11-25 spec (SEP-1699), servers may
301- // send SSE events
302- // with empty data to prime the client for
303- // reconnection.
304- // Skip these events as they contain no JSON-RPC
305- // message.
306- if (data == null || data .isBlank ()) {
307- logger .debug ("Skipping SSE event with empty data (stream primer)" );
308- return Flux .empty ();
309- }
310- try {
311- // We don't support batching ATM and probably
312- // won't since the next version considers
313- // removing it.
314- McpSchema .JSONRPCMessage message = McpSchema
315- .deserializeJsonRpcMessage (this .jsonMapper , data );
316-
317- Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >> idWithMessages = Tuples
318- .of (Optional .ofNullable (responseEvent .sseEvent ().id ()),
319- List .of (message ));
320-
321- McpTransportStream <Disposable > sessionStream = stream != null ? stream
322- : new DefaultMcpTransportStream <>(this .resumableStreams ,
323- this ::reconnect );
324- logger .debug ("Connected stream {}" , sessionStream .streamId ());
325-
326- return Flux .from (sessionStream .consumeSseStream (Flux .just (idWithMessages )));
327-
328- }
329- catch (IOException ioException ) {
330- return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
331- "Error parsing JSON-RPC message: " + responseEvent , ioException ));
332- }
333- }
334- else {
335- logger .debug ("Received SSE event with type: {}" , responseEvent .sseEvent ());
336- return Flux .empty ();
337- }
338- }
339- else if (statusCode == METHOD_NOT_ALLOWED ) { // NotAllowed
340- logger
341- .debug ("The server does not support SSE streams, using request-response mode." );
282+ .flatMapMany (requestBuilder -> Flux .<ResponseEvent >create (sseSink -> this .httpClient
283+ .sendAsync (requestBuilder .build (), this .toSendMessageBodySubscriber (sseSink ))
284+ .whenComplete ((response , throwable ) -> {
285+ if (throwable != null ) {
286+ sseSink .error (throwable );
287+ }
288+ else {
289+ logger .debug ("SSE connection established successfully" );
290+ }
291+ })).flatMap (responseEvent -> {
292+ int statusCode = responseEvent .responseInfo ().statusCode ();
293+ if (statusCode == 401 || statusCode == 403 ) {
294+ // Authorization error
295+ return Mono .deferContextual (innerCtx -> {
296+ var transportContext = innerCtx .getOrDefault (McpTransportContext .KEY ,
297+ McpTransportContext .EMPTY );
298+ return Mono .from (this .authorizationErrorHandler .onAuthorizationError (
299+ responseEvent .responseInfo (), transportContext , this .reconnect (stream ).then (),
300+ Mono .error (new McpHttpClientTransportException (
301+ "Authorization error connecting to SSE stream" ,
302+ responseEvent .responseInfo ()))))
303+ .then (Mono .empty ());
304+ });
305+ }
306+
307+ if (!(responseEvent instanceof ResponseSubscribers .SseResponseEvent sseResponseEvent )) {
308+ return Flux .<McpSchema .JSONRPCMessage >error (new McpHttpClientTransportException (
309+ "Unrecognized server error when connecting to SSE stream" ,
310+ responseEvent .responseInfo ()));
311+ }
312+ else if (statusCode >= 200 && statusCode < 300 ) {
313+ if (MESSAGE_EVENT_TYPE .equals (sseResponseEvent .sseEvent ().event ())) {
314+ String data = sseResponseEvent .sseEvent ().data ();
315+ // Per 2025-11-25 spec (SEP-1699), servers may
316+ // send SSE events
317+ // with empty data to prime the client for
318+ // reconnection.
319+ // Skip these events as they contain no JSON-RPC
320+ // message.
321+ if (data == null || data .isBlank ()) {
322+ logger .debug ("Skipping SSE event with empty data (stream primer)" );
342323 return Flux .empty ();
343324 }
344- else if (statusCode == NOT_FOUND ) {
345-
346- if (transportSession != null && transportSession .sessionId ().isPresent ()) {
347- // only if the request was sent with a session id
348- // and the response is 404, we consider it a
349- // session not found error.
350- logger .debug ("Session not found for session ID: {}" ,
351- transportSession .sessionId ().get ());
352- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
353- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
354- "Session not found for session ID: " + sessionIdRepresentation );
355- return Flux .<McpSchema .JSONRPCMessage >error (exception );
356- }
357- return Flux .<McpSchema .JSONRPCMessage >error (
358- new McpTransportException ("Server Not Found. Status code:" + statusCode
359- + ", response-event:" + responseEvent ));
360- }
361- else if (statusCode == BAD_REQUEST ) {
362- if (transportSession != null && transportSession .sessionId ().isPresent ()) {
363- // only if the request was sent with a session id
364- // and thre response is 404, we consider it a
365- // session not found error.
366- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
367- McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
368- "Session not found for session ID: " + sessionIdRepresentation );
369- return Flux .<McpSchema .JSONRPCMessage >error (exception );
370- }
371- return Flux .<McpSchema .JSONRPCMessage >error (
372- new McpTransportException ("Bad Request. Status code:" + statusCode
373- + ", response-event:" + responseEvent ));
325+ try {
326+ // We don't support batching ATM and probably
327+ // won't since the next version considers
328+ // removing it.
329+ McpSchema .JSONRPCMessage message = McpSchema
330+ .deserializeJsonRpcMessage (this .jsonMapper , data );
374331
375- }
332+ Tuple2 <Optional <String >, Iterable <McpSchema .JSONRPCMessage >> idWithMessages = Tuples
333+ .of (Optional .ofNullable (sseResponseEvent .sseEvent ().id ()), List .of (message ));
334+
335+ McpTransportStream <Disposable > sessionStream = stream != null ? stream
336+ : new DefaultMcpTransportStream <>(this .resumableStreams , this ::reconnect );
337+ logger .debug ("Connected stream {}" , sessionStream .streamId ());
376338
377- return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
378- "Received unrecognized SSE event type: " + responseEvent .sseEvent ().event ()));
379- }).<McpSchema
380- .JSONRPCMessage >flatMap (
381- jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
382- .onErrorMap (CompletionException .class , t -> t .getCause ())
383- .onErrorComplete (t -> {
384- this .handleException (t );
385- return true ;
386- })
387- .doFinally (s -> {
388- Disposable ref = disposableRef .getAndSet (null );
389- if (ref != null ) {
390- transportSession .removeConnection (ref );
339+ return Flux .from (sessionStream .consumeSseStream (Flux .just (idWithMessages )));
340+
341+ }
342+ catch (IOException ioException ) {
343+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
344+ "Error parsing JSON-RPC message: " + responseEvent , ioException ));
391345 }
392- }))
346+ }
347+ else {
348+ logger .debug ("Received SSE event with type: {}" , sseResponseEvent .sseEvent ());
349+ return Flux .empty ();
350+ }
351+ }
352+ else if (statusCode == METHOD_NOT_ALLOWED ) { // NotAllowed
353+ logger .debug ("The server does not support SSE streams, using request-response mode." );
354+ return Flux .empty ();
355+ }
356+ else if (statusCode == NOT_FOUND ) {
357+
358+ if (transportSession != null && transportSession .sessionId ().isPresent ()) {
359+ // only if the request was sent with a session id
360+ // and the response is 404, we consider it a
361+ // session not found error.
362+ logger .debug ("Session not found for session ID: {}" ,
363+ transportSession .sessionId ().get ());
364+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
365+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
366+ "Session not found for session ID: " + sessionIdRepresentation );
367+ return Flux .<McpSchema .JSONRPCMessage >error (exception );
368+ }
369+ return Flux .<McpSchema .JSONRPCMessage >error (
370+ new McpTransportException ("Server Not Found. Status code:" + statusCode
371+ + ", response-event:" + responseEvent ));
372+ }
373+ else if (statusCode == BAD_REQUEST ) {
374+ if (transportSession != null && transportSession .sessionId ().isPresent ()) {
375+ // only if the request was sent with a session id
376+ // and thre response is 404, we consider it a
377+ // session not found error.
378+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
379+ McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException (
380+ "Session not found for session ID: " + sessionIdRepresentation );
381+ return Flux .<McpSchema .JSONRPCMessage >error (exception );
382+ }
383+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
384+ "Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent ));
385+ }
386+ return Flux .<McpSchema .JSONRPCMessage >error (new McpTransportException (
387+ "Received unrecognized SSE event type: " + sseResponseEvent .sseEvent ().event ()));
388+ })
389+ .flatMap (jsonrpcMessage -> this .handler .get ().apply (Mono .just (jsonrpcMessage )))
390+ .onErrorMap (CompletionException .class , t -> t .getCause ())
391+ .onErrorComplete (t -> {
392+ this .handleException (t );
393+ return true ;
394+ })
395+ .doFinally (s -> {
396+ Disposable ref = disposableRef .getAndSet (null );
397+ if (ref != null ) {
398+ transportSession .removeConnection (ref );
399+ }
400+ }))
393401 .contextWrite (ctx )
394402 .subscribe ();
395403
@@ -478,6 +486,20 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
478486 })).onErrorMap (CompletionException .class , t -> t .getCause ()).onErrorComplete ().subscribe ();
479487
480488 })).flatMap (responseEvent -> {
489+ int statusCode = responseEvent .responseInfo ().statusCode ();
490+ if (statusCode == 401 || statusCode == 403 ) {
491+ return Mono .deferContextual (ctx -> {
492+ var transportContext = ctx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
493+ return Mono
494+ .from (this .authorizationErrorHandler .onAuthorizationError (responseEvent .responseInfo (),
495+ transportContext , Mono .defer (() -> this .sendMessage (sentMessage )),
496+ Mono .error (new McpHttpClientTransportException (
497+ "Authorization error when sending message" , responseEvent .responseInfo ()))))
498+ .doOnSuccess (s -> deliveredSink .success ())
499+ .then (Mono .empty ());
500+ });
501+ }
502+
481503 if (transportSession .markInitialized (
482504 responseEvent .responseInfo ().headers ().firstValue ("mcp-session-id" ).orElseGet (() -> null ))) {
483505 // Once we have a session, we try to open an async stream for
@@ -488,8 +510,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
488510
489511 String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
490512
491- int statusCode = responseEvent .responseInfo ().statusCode ();
492-
493513 if (statusCode >= 200 && statusCode < 300 ) {
494514
495515 String contentType = responseEvent .responseInfo ()
@@ -664,6 +684,8 @@ public static class Builder {
664684 private List <String > supportedProtocolVersions = List .of (ProtocolVersions .MCP_2024_11_05 ,
665685 ProtocolVersions .MCP_2025_03_26 , ProtocolVersions .MCP_2025_06_18 , ProtocolVersions .MCP_2025_11_25 );
666686
687+ private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler .NOOP ;
688+
667689 /**
668690 * Creates a new builder with the specified base URI.
669691 * @param baseUri the base URI of the MCP server
@@ -801,6 +823,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
801823 return this ;
802824 }
803825
826+ /**
827+ * Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
828+ * when sending a message.
829+ * @param authorizationErrorHandler the handler
830+ * @return this builder
831+ */
832+ public Builder authorizationErrorHandler (McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ) {
833+ this .authorizationErrorHandler = authorizationErrorHandler ;
834+ return this ;
835+ }
836+
804837 /**
805838 * Sets the connection timeout for the HTTP client.
806839 * @param connectTimeout the connection timeout duration
@@ -845,7 +878,7 @@ public HttpClientStreamableHttpTransport build() {
845878 HttpClient httpClient = this .clientBuilder .connectTimeout (this .connectTimeout ).build ();
846879 return new HttpClientStreamableHttpTransport (jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper ,
847880 httpClient , requestBuilder , baseUri , endpoint , resumableStreams , openConnectionOnStartup ,
848- httpRequestCustomizer , supportedProtocolVersions );
881+ httpRequestCustomizer , authorizationErrorHandler , supportedProtocolVersions );
849882 }
850883
851884 }
0 commit comments