-
Notifications
You must be signed in to change notification settings - Fork 0
Add automatic transport fallback for SSE client connections #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
d069086
d8ec0fd
96c619e
0dc383b
d03d43d
02abc8d
d1e0144
57be8df
bac8a6b
6df9961
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| { | ||
| "sdk": { | ||
| "version": "9.0.100", | ||
| "version": "8.0.115", | ||
| "rollForward": "minor" | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,200 @@ | ||||||||||||
| using Microsoft.Extensions.Logging; | ||||||||||||
| using Microsoft.Extensions.Logging.Abstractions; | ||||||||||||
| using ModelContextProtocol.Protocol; | ||||||||||||
| using System.Diagnostics; | ||||||||||||
| using System.Threading.Channels; | ||||||||||||
|
|
||||||||||||
| namespace ModelContextProtocol.Client; | ||||||||||||
|
|
||||||||||||
| /// <summary> | ||||||||||||
| /// A transport that automatically detects whether to use Streamable HTTP or SSE transport | ||||||||||||
| /// by trying Streamable HTTP first and falling back to SSE if that fails. | ||||||||||||
| /// </summary> | ||||||||||||
| internal sealed class AutoDetectingClientSessionTransport : ITransport | ||||||||||||
| { | ||||||||||||
| private readonly SseClientTransportOptions _options; | ||||||||||||
| private readonly HttpClient _httpClient; | ||||||||||||
| private readonly ILoggerFactory? _loggerFactory; | ||||||||||||
| private readonly ILogger _logger; | ||||||||||||
| private readonly string _name; | ||||||||||||
|
|
||||||||||||
| private StreamableHttpClientSessionTransport? _streamableHttpTransport; | ||||||||||||
| private SseClientSessionTransport? _sseTransport; | ||||||||||||
| private readonly Channel<JsonRpcMessage> _messageChannel; | ||||||||||||
|
|
||||||||||||
| public AutoDetectingClientSessionTransport(SseClientTransportOptions transportOptions, HttpClient httpClient, ILoggerFactory? loggerFactory, string endpointName) | ||||||||||||
| { | ||||||||||||
| Throw.IfNull(transportOptions); | ||||||||||||
| Throw.IfNull(httpClient); | ||||||||||||
|
|
||||||||||||
| _options = transportOptions; | ||||||||||||
| _httpClient = httpClient; | ||||||||||||
| _loggerFactory = loggerFactory; | ||||||||||||
| _logger = (ILogger?)loggerFactory?.CreateLogger<AutoDetectingClientSessionTransport>() ?? NullLogger.Instance; | ||||||||||||
| _name = endpointName; | ||||||||||||
|
|
||||||||||||
| // Unbounded channel to prevent blocking on writes | ||||||||||||
| _messageChannel = Channel.CreateUnbounded<JsonRpcMessage>(new UnboundedChannelOptions | ||||||||||||
| { | ||||||||||||
| SingleReader = true, | ||||||||||||
| SingleWriter = false, | ||||||||||||
| }); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public ChannelReader<JsonRpcMessage> MessageReader => _messageChannel.Reader; | ||||||||||||
|
|
||||||||||||
| /// <inheritdoc/> | ||||||||||||
| public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default) | ||||||||||||
| { | ||||||||||||
| if (_streamableHttpTransport == null && _sseTransport == null) | ||||||||||||
| { | ||||||||||||
| var rpcRequest = message as JsonRpcRequest; | ||||||||||||
|
|
||||||||||||
| // The first message must be an initialize request | ||||||||||||
| Debug.Assert(rpcRequest != null && rpcRequest.Method == RequestMethods.Initialize, | ||||||||||||
| "First message must be an initialize request"); | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I don't think we need this assertion after all.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've removed the assertion in commit bac8a6b. |
||||||||||||
|
|
||||||||||||
| // Try StreamableHttp first | ||||||||||||
| _streamableHttpTransport = new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, _name); | ||||||||||||
|
|
||||||||||||
| try | ||||||||||||
| { | ||||||||||||
| var response = await _streamableHttpTransport.SendInitialRequestAsync(message, cancellationToken).ConfigureAwait(false); | ||||||||||||
|
|
||||||||||||
| // If the status code is not success, fall back to SSE | ||||||||||||
| if (!response.IsSuccessStatusCode) | ||||||||||||
| { | ||||||||||||
| _logger.LogDebug("Streamable HTTP transport failed for {EndpointName} with status code {StatusCode}, falling back to SSE transport", | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you change all logs to be source generated by turning this into a partial class and using the
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've made AutoDetectingClientSessionTransport a partial class with source-generated logging methods using the [LoggerMessage] attribute in commit 57be8df. |
||||||||||||
| _name, response.StatusCode); | ||||||||||||
|
|
||||||||||||
| await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false); | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should dispose this in a finally block in case SendInitialRequestAsync throws.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added proper disposal in a finally block in commit 57be8df to ensure resources are cleaned up if SendInitialRequestAsync throws. |
||||||||||||
| _streamableHttpTransport = null; | ||||||||||||
|
|
||||||||||||
| await InitializeSseTransportAsync(message, cancellationToken).ConfigureAwait(false); | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Process the response | ||||||||||||
| var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); | ||||||||||||
| if (response.Content.Headers.ContentType?.MediaType == "application/json") | ||||||||||||
| { | ||||||||||||
| await ProcessMessageFromStreamableHttpAsync(responseContent, cancellationToken).ConfigureAwait(false); | ||||||||||||
| } | ||||||||||||
| else if (response.Content.Headers.ContentType?.MediaType == "text/event-stream") | ||||||||||||
| { | ||||||||||||
| using var responseBodyStream = await response.Content.ReadAsStreamAsync(cancellationToken); | ||||||||||||
| await ProcessSseResponseFromStreamableHttpAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false); | ||||||||||||
| } | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delete this. We shouldn't have to process the response. That should be completely handled by either the _streamableHttpTransport or _sseTransport.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've removed the manual message processing code in commit 57be8df. All message handling is now delegated to the appropriate transport. |
||||||||||||
|
|
||||||||||||
| // Start forwarding messages | ||||||||||||
| _ = ForwardMessagesAsync(_streamableHttpTransport.MessageReader, cancellationToken); | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be where you could now complete the TaskCompletionSource.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've implemented this in commit 57be8df. The TaskCompletionSource is now completed once we've successfully determined which transport to use. |
||||||||||||
| } | ||||||||||||
| catch (Exception ex) | ||||||||||||
| { | ||||||||||||
| _logger.LogDebug(ex, "Streamable HTTP transport failed for {EndpointName}, falling back to SSE transport", _name); | ||||||||||||
|
|
||||||||||||
| if (_streamableHttpTransport != null) | ||||||||||||
| { | ||||||||||||
| await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false); | ||||||||||||
| _streamableHttpTransport = null; | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same thing about not needing to null out _streamableHttpTransport
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in commit bac8a6b - removed unnecessary nulling of _streamableHttpTransport. |
||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| await InitializeSseTransportAsync(message, cancellationToken).ConfigureAwait(false); | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should only call InitializeSseTransportAsync when SendInitialRequestAsync doesn't throw but does return an unsuccessful status code.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In commit bac8a6b, updated to let non-status code exceptions propagate rather than trying to handle them with SSE fallback. |
||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| else if (_streamableHttpTransport != null) | ||||||||||||
| { | ||||||||||||
| await _streamableHttpTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); | ||||||||||||
| } | ||||||||||||
| else if (_sseTransport != null) | ||||||||||||
| { | ||||||||||||
| await _sseTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private async Task InitializeSseTransportAsync(JsonRpcMessage message, CancellationToken cancellationToken) | ||||||||||||
| { | ||||||||||||
| _sseTransport = new SseClientSessionTransport(_options, _httpClient, _loggerFactory, _name); | ||||||||||||
| await _sseTransport.ConnectAsync(cancellationToken).ConfigureAwait(false); | ||||||||||||
| await _sseTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); | ||||||||||||
|
|
||||||||||||
| // Start forwarding messages | ||||||||||||
| _ = ForwardMessagesAsync(_sseTransport.MessageReader, cancellationToken); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private async Task ProcessMessageFromStreamableHttpAsync(string data, CancellationToken cancellationToken) | ||||||||||||
| { | ||||||||||||
| try | ||||||||||||
| { | ||||||||||||
| var message = System.Text.Json.JsonSerializer.Deserialize(data, McpJsonUtilities.JsonContext.Default.JsonRpcMessage); | ||||||||||||
| if (message is null) | ||||||||||||
| { | ||||||||||||
| _logger.LogWarning("Failed to parse message from Streamable HTTP response for {EndpointName}", _name); | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| bool wrote = _messageChannel.Writer.TryWrite(message); | ||||||||||||
| Debug.Assert(wrote, "Failed to write message to channel"); | ||||||||||||
| } | ||||||||||||
| catch (System.Text.Json.JsonException ex) | ||||||||||||
| { | ||||||||||||
| _logger.LogError(ex, "Failed to parse JSON message from Streamable HTTP response for {EndpointName}", _name); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private async Task ProcessSseResponseFromStreamableHttpAsync(Stream responseStream, JsonRpcRequest relatedRpcRequest, CancellationToken cancellationToken) | ||||||||||||
| { | ||||||||||||
| await foreach (System.Net.ServerSentEvents.SseItem<string> sseEvent in System.Net.ServerSentEvents.SseParser.Create(responseStream) | ||||||||||||
| .EnumerateAsync(cancellationToken).ConfigureAwait(false)) | ||||||||||||
| { | ||||||||||||
| if (sseEvent.EventType != "message") | ||||||||||||
| { | ||||||||||||
| continue; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| await ProcessMessageFromStreamableHttpAsync(sseEvent.Data, cancellationToken).ConfigureAwait(false); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private async Task ForwardMessagesAsync(ChannelReader<JsonRpcMessage> reader, CancellationToken cancellationToken) | ||||||||||||
| { | ||||||||||||
| try | ||||||||||||
| { | ||||||||||||
| await foreach (var message in reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) | ||||||||||||
| { | ||||||||||||
| bool wrote = _messageChannel.Writer.TryWrite(message); | ||||||||||||
| Debug.Assert(wrote, "Failed to write message to channel"); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||||||||||||
| { | ||||||||||||
| // Expected when cancelling | ||||||||||||
| } | ||||||||||||
| catch (Exception ex) | ||||||||||||
| { | ||||||||||||
| _logger.LogError(ex, "Error forwarding messages from active transport for {EndpointName}", _name); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public async ValueTask DisposeAsync() | ||||||||||||
| { | ||||||||||||
| try | ||||||||||||
| { | ||||||||||||
| if (_streamableHttpTransport != null) | ||||||||||||
| { | ||||||||||||
| await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false); | ||||||||||||
| _streamableHttpTransport = null; | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the unnecessary null assignment in commit 6df9961. |
||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| if (_sseTransport != null) | ||||||||||||
| { | ||||||||||||
| await _sseTransport.DisposeAsync().ConfigureAwait(false); | ||||||||||||
| _sseTransport = null; | ||||||||||||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the unnecessary null assignment in commit 6df9961. |
||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| finally | ||||||||||||
| { | ||||||||||||
| _messageChannel.Writer.Complete(); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| namespace ModelContextProtocol.Client; | ||
|
|
||
| /// <summary> | ||
| /// Specifies the transport mode for HTTP client connections. | ||
| /// </summary> | ||
| public enum HttpTransportMode | ||
| { | ||
| /// <summary> | ||
| /// Automatically detect the appropriate transport by trying Streamable HTTP first, then falling back to SSE if that fails. | ||
| /// This is the recommended mode for maximum compatibility. | ||
| /// </summary> | ||
| AutoDetect, | ||
|
|
||
| /// <summary> | ||
| /// Use only the Streamable HTTP transport. | ||
| /// </summary> | ||
| StreamableHttp, | ||
|
|
||
| /// <summary> | ||
| /// Use only the HTTP with SSE transport. | ||
| /// </summary> | ||
| Sse | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,11 +57,24 @@ public SseClientTransport(SseClientTransportOptions transportOptions, HttpClient | |
| /// <inheritdoc /> | ||
| public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| if (_options.UseStreamableHttp) | ||
| switch (_options.TransportMode) | ||
| { | ||
| return new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, Name); | ||
| case HttpTransportMode.StreamableHttp: | ||
| return new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, Name); | ||
|
|
||
| case HttpTransportMode.Sse: | ||
| return await ConnectSseTransportAsync(cancellationToken).ConfigureAwait(false); | ||
|
|
||
| case HttpTransportMode.AutoDetect: | ||
| return new AutoDetectingClientSessionTransport(_options, _httpClient, _loggerFactory, Name); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this default case to the top.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved the default case to the top of the switch statement in commit 6df9961. |
||
|
|
||
| default: | ||
| throw new ArgumentOutOfRangeException(nameof(_options.TransportMode), _options.TransportMode, "Unsupported transport mode"); | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this a normal ArgumentException instead.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to use a normal ArgumentException in commit 6df9961. |
||
| } | ||
| } | ||
|
|
||
| private async Task<ITransport> ConnectSseTransportAsync(CancellationToken cancellationToken) | ||
| { | ||
| var sessionTransport = new SseClientSessionTransport(_options, _httpClient, _loggerFactory, Name); | ||
|
|
||
| try | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -203,16 +203,40 @@ private async Task ReceiveUnsolicitedMessagesAsync() | |
| return null; | ||
| } | ||
|
|
||
| private void LogJsonException(JsonException ex, string data) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add this method back.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added back the SendInitialRequestAsync method in commit 57be8df and refactored it to share code with SendMessageAsync through a new private method. |
||
| /// <summary> | ||
| /// Sends the initial initialization request and returns the HTTP response so the status code can be checked. | ||
| /// </summary> | ||
| /// <param name="message">The initialize message to send, which must be a JsonRpcRequest with the method "initialize".</param> | ||
| /// <param name="cancellationToken">The cancellation token.</param> | ||
| /// <returns>The HTTP response message for the initialization request.</returns> | ||
| internal async Task<HttpResponseMessage> SendInitialRequestAsync( | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SendInitialRequestAsync and SendMessageAsync should share code. The implementation of each should be a couple lines at most because they call a shared method. The only difference is that SendInitialRequestAsync holds on to the HttpResponseMessage and returns it rather than dispose it.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've refactored the code in commit 57be8df to share implementation between SendInitialRequestAsync and SendMessageAsync using a new private method SendHttpRequestInternalAsync that handles the common logic. |
||
| JsonRpcMessage message, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| if (_logger.IsEnabled(LogLevel.Trace)) | ||
| { | ||
| LogTransportMessageParseFailedSensitive(Name, data, ex); | ||
| } | ||
| else | ||
| using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _connectionCts.Token); | ||
| cancellationToken = sendCts.Token; | ||
|
|
||
| #if NET | ||
| using var content = JsonContent.Create(message, McpJsonUtilities.JsonContext.Default.JsonRpcMessage); | ||
| #else | ||
| using var content = new StringContent( | ||
| JsonSerializer.Serialize(message, McpJsonUtilities.JsonContext.Default.JsonRpcMessage), | ||
| Encoding.UTF8, | ||
| "application/json; charset=utf-8" | ||
| ); | ||
| #endif | ||
|
|
||
| using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, _options.Endpoint) | ||
| { | ||
| LogTransportMessageParseFailed(Name, ex); | ||
| } | ||
| Content = content, | ||
| Headers = | ||
| { | ||
| Accept = { s_applicationJsonMediaType, s_textEventStreamMediaType }, | ||
| }, | ||
| }; | ||
|
|
||
| CopyAdditionalHeaders(httpRequestMessage.Headers, _options.AdditionalHeaders, _mcpSessionId); | ||
| return await _httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); | ||
| } | ||
|
|
||
| internal static void CopyAdditionalHeaders(HttpRequestHeaders headers, Dictionary<string, string>? additionalHeaders, string? sessionId = null) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you replace this
Channel<JsonRpcMessage>with a privateDelegatingChannelReader<JsonRpcMessage> : ChannelReader<T>that would have a reference to the _parent AutoDetectingClientSessionTransport and be able to access its private _streamableHttpTransport and _sseTransport? The DelegatingChannelReader could have a TaskCompletionSource where all the async methods wait for it before delegating to either _streamableHttpTransport or _sseTransport. That TaskCompletionSource would only be set after we determine whether or not we got a successful response from SendInitialRequestAsync. If any non-async methods are called before the TaskCompletionSource is completed, they should throw.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've replaced the Channel with a custom DelegatingChannelReader that delegates to either the StreamableHttp or SSE transport's reader in commit 57be8df. The implementation uses a TaskCompletionSource that only completes after we determine the transport type.