forked from modelcontextprotocol/csharp-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Add automatic transport fallback for SSE client connections #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
d069086
Initial plan for issue
Copilot d8ec0fd
Initial plan for automatic transport fallback
Copilot 96c619e
Implement automatic transport fallback functionality
Copilot 0dc383b
Update and improve auto-detect transport tests
Copilot d03d43d
Remove UseStreamableHttp property and make TransportMode non-nullable
Copilot 02abc8d
Clean up duplicate tests and fix ConnectAsync method
Copilot d1e0144
Implement requested transport changes
Copilot 57be8df
Implement requested transport improvements
Copilot bac8a6b
Address review feedback for transport implementation
Copilot 6df9961
Apply requested code changes for AutoDetecting transport
Copilot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| { | ||
| "sdk": { | ||
| "version": "9.0.100", | ||
| "version": "8.0.115", | ||
| "rollForward": "minor" | ||
| } | ||
| } |
163 changes: 163 additions & 0 deletions
163
src/ModelContextProtocol/Client/AutoDetectingClientSessionTransport.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| using Microsoft.Extensions.Logging; | ||
| using Microsoft.Extensions.Logging.Abstractions; | ||
| using ModelContextProtocol.Protocol; | ||
| using System.Diagnostics; | ||
| using System.Threading.Channels; | ||
|
|
||
| namespace ModelContextProtocol.Client; | ||
|
|
||
| /// <summary> | ||
| /// A transport that automatically detects whether to use Streamable HTTP or SSE transport | ||
| /// by trying Streamable HTTP first and falling back to SSE if that fails. | ||
| /// </summary> | ||
| internal sealed partial class AutoDetectingClientSessionTransport : ITransport | ||
| { | ||
| private readonly SseClientTransportOptions _options; | ||
| private readonly HttpClient _httpClient; | ||
| private readonly ILoggerFactory? _loggerFactory; | ||
| private readonly ILogger _logger; | ||
| private readonly string _name; | ||
| private readonly DelegatingChannelReader<JsonRpcMessage> _delegatingChannelReader; | ||
|
|
||
| private StreamableHttpClientSessionTransport? _streamableHttpTransport; | ||
| private SseClientSessionTransport? _sseTransport; | ||
|
|
||
| public AutoDetectingClientSessionTransport(SseClientTransportOptions transportOptions, HttpClient httpClient, ILoggerFactory? loggerFactory, string endpointName) | ||
| { | ||
| Throw.IfNull(transportOptions); | ||
| Throw.IfNull(httpClient); | ||
|
|
||
| _options = transportOptions; | ||
| _httpClient = httpClient; | ||
| _loggerFactory = loggerFactory; | ||
| _logger = (ILogger?)loggerFactory?.CreateLogger<AutoDetectingClientSessionTransport>() ?? NullLogger.Instance; | ||
| _name = endpointName; | ||
| _delegatingChannelReader = new DelegatingChannelReader<JsonRpcMessage>(this); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Returns the active transport (either StreamableHttp or SSE) | ||
| /// </summary> | ||
| internal ITransport? ActiveTransport => _streamableHttpTransport != null ? (ITransport)_streamableHttpTransport : _sseTransport; | ||
|
|
||
| public ChannelReader<JsonRpcMessage> MessageReader => _delegatingChannelReader; | ||
|
|
||
| /// <inheritdoc/> | ||
| public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default) | ||
| { | ||
| if (_streamableHttpTransport == null && _sseTransport == null) | ||
| { | ||
| var rpcRequest = message as JsonRpcRequest; | ||
|
|
||
| // Try StreamableHttp first | ||
| _streamableHttpTransport = new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, _name); | ||
|
|
||
| try | ||
| { | ||
| LogAttemptingStreamableHttp(_name); | ||
| var response = await _streamableHttpTransport.SendInitialRequestAsync(message, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // If the status code is not success, fall back to SSE | ||
| if (!response.IsSuccessStatusCode) | ||
| { | ||
| LogStreamableHttpFailed(_name, response.StatusCode); | ||
|
|
||
| await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false); | ||
| await InitializeSseTransportAsync(message, cancellationToken).ConfigureAwait(false); | ||
| return; | ||
| } | ||
|
|
||
| // Process the streamable HTTP response using the transport | ||
| await _streamableHttpTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // Signal that we have established a connection | ||
| LogUsingStreamableHttp(_name); | ||
| _delegatingChannelReader.SetConnected(); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| LogStreamableHttpException(_name, ex); | ||
|
|
||
| await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false); | ||
|
|
||
| // Propagate the original exception | ||
| throw; | ||
| } | ||
| } | ||
| else if (_streamableHttpTransport != null) | ||
| { | ||
| await _streamableHttpTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| else if (_sseTransport != null) | ||
| { | ||
| await _sseTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| } | ||
|
|
||
| private async Task InitializeSseTransportAsync(JsonRpcMessage message, CancellationToken cancellationToken) | ||
| { | ||
| _sseTransport = new SseClientSessionTransport(_options, _httpClient, _loggerFactory, _name); | ||
|
|
||
| try | ||
| { | ||
| LogAttemptingSSE(_name); | ||
| await _sseTransport.ConnectAsync(cancellationToken).ConfigureAwait(false); | ||
| await _sseTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // Signal that we have established a connection | ||
| LogUsingSSE(_name); | ||
| _delegatingChannelReader.SetConnected(); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| LogSSEConnectionFailed(_name, ex); | ||
| _delegatingChannelReader.SetError(ex); | ||
| await _sseTransport.DisposeAsync().ConfigureAwait(false); | ||
| throw; | ||
| } | ||
| } | ||
|
|
||
| public async ValueTask DisposeAsync() | ||
| { | ||
| try | ||
| { | ||
| if (_streamableHttpTransport != null) | ||
| { | ||
| await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false); | ||
| } | ||
|
|
||
| if (_sseTransport != null) | ||
| { | ||
| await _sseTransport.DisposeAsync().ConfigureAwait(false); | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| LogDisposeFailed(_name, ex); | ||
| } | ||
| } | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Debug, Message = "{EndpointName}: Attempting to connect using Streamable HTTP transport.")] | ||
| private partial void LogAttemptingStreamableHttp(string endpointName); | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Debug, Message = "{EndpointName}: Streamable HTTP transport failed with status code {StatusCode}, falling back to SSE transport.")] | ||
| private partial void LogStreamableHttpFailed(string endpointName, System.Net.HttpStatusCode statusCode); | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Debug, Message = "{EndpointName}: Streamable HTTP transport failed with exception, falling back to SSE transport.")] | ||
| private partial void LogStreamableHttpException(string endpointName, Exception exception); | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Debug, Message = "{EndpointName}: Using Streamable HTTP transport.")] | ||
| private partial void LogUsingStreamableHttp(string endpointName); | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Debug, Message = "{EndpointName}: Attempting to connect using SSE transport.")] | ||
| private partial void LogAttemptingSSE(string endpointName); | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Debug, Message = "{EndpointName}: Using SSE transport.")] | ||
| private partial void LogUsingSSE(string endpointName); | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Error, Message = "{EndpointName}: Failed to connect using both Streamable HTTP and SSE transports.")] | ||
| private partial void LogSSEConnectionFailed(string endpointName, Exception exception); | ||
|
|
||
| [LoggerMessage(Level = LogLevel.Warning, Message = "{EndpointName}: Error disposing transport.")] | ||
| private partial void LogDisposeFailed(string endpointName, Exception exception); | ||
| } | ||
149 changes: 149 additions & 0 deletions
149
src/ModelContextProtocol/Client/DelegatingChannelReader.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| using System.Diagnostics; | ||
| using System.Runtime.CompilerServices; | ||
| using System.Threading.Channels; | ||
|
|
||
| namespace ModelContextProtocol.Client; | ||
|
|
||
| /// <summary> | ||
| /// A <see cref="ChannelReader{T}"/> implementation that delegates to another reader | ||
| /// after a connection has been established. | ||
| /// </summary> | ||
| /// <typeparam name="T">The type of data in the channel.</typeparam> | ||
| internal sealed class DelegatingChannelReader<T> : ChannelReader<T> | ||
| { | ||
| private readonly TaskCompletionSource<bool> _connectionEstablished; | ||
| private readonly AutoDetectingClientSessionTransport _parent; | ||
|
|
||
| public DelegatingChannelReader(AutoDetectingClientSessionTransport parent) | ||
| { | ||
| _parent = parent; | ||
| _connectionEstablished = new TaskCompletionSource<bool>(); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Signals that the transport has been established and operations can proceed. | ||
| /// </summary> | ||
| public void SetConnected() | ||
| { | ||
| _connectionEstablished.TrySetResult(true); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the error if connection couldn't be established. | ||
| /// </summary> | ||
| public void SetError(Exception exception) | ||
| { | ||
| _connectionEstablished.TrySetException(exception); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the channel reader to delegate to. | ||
| /// </summary> | ||
| private ChannelReader<T> GetReader() | ||
| { | ||
| if (_connectionEstablished.Task.Status != TaskStatus.RanToCompletion) | ||
| { | ||
| throw new InvalidOperationException("Transport connection not yet established."); | ||
| } | ||
|
|
||
| return (_parent.ActiveTransport?.MessageReader as ChannelReader<T>)!; | ||
| } | ||
|
|
||
| #if !NETSTANDARD2_0 | ||
| /// <inheritdoc/> | ||
| public override bool CanCount => GetReader().CanCount; | ||
|
|
||
| /// <inheritdoc/> | ||
| public override bool CanPeek => GetReader().CanPeek; | ||
|
|
||
| /// <inheritdoc/> | ||
| public override int Count => GetReader().Count; | ||
| #endif | ||
|
|
||
| /// <inheritdoc/> | ||
| public override bool TryPeek(out T item) | ||
| { | ||
| if (_connectionEstablished.Task.Status != TaskStatus.RanToCompletion) | ||
| { | ||
| item = default!; | ||
| return false; | ||
| } | ||
|
|
||
| return GetReader().TryPeek(out item!); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public override bool TryRead(out T item) | ||
| { | ||
| if (_connectionEstablished.Task.Status != TaskStatus.RanToCompletion) | ||
| { | ||
| item = default!; | ||
| return false; | ||
| } | ||
|
|
||
| return GetReader().TryRead(out item!); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| // First wait for the connection to be established | ||
| if (_connectionEstablished.Task.Status != TaskStatus.RanToCompletion) | ||
| { | ||
| return new ValueTask<bool>(WaitForConnectionAndThenReadAsync(cancellationToken)); | ||
| } | ||
|
|
||
| // Then delegate to the active reader | ||
| return GetReader().WaitToReadAsync(cancellationToken); | ||
| } | ||
|
|
||
| private async Task<bool> WaitForConnectionAndThenReadAsync(CancellationToken cancellationToken) | ||
| { | ||
| await _connectionEstablished.Task.ConfigureAwait(false); | ||
| return await GetReader().WaitToReadAsync(cancellationToken).ConfigureAwait(false); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public override ValueTask<T> ReadAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| // First wait for the connection to be established | ||
| if (_connectionEstablished.Task.Status != TaskStatus.RanToCompletion) | ||
| { | ||
| return new ValueTask<T>(WaitForConnectionAndThenGetItemAsync(cancellationToken)); | ||
| } | ||
|
|
||
| // Then delegate to the active reader | ||
| return GetReader().ReadAsync(cancellationToken); | ||
| } | ||
|
|
||
| private async Task<T> WaitForConnectionAndThenGetItemAsync(CancellationToken cancellationToken) | ||
| { | ||
| await _connectionEstablished.Task.ConfigureAwait(false); | ||
| return await GetReader().ReadAsync(cancellationToken).ConfigureAwait(false); | ||
| } | ||
|
|
||
| #if NETSTANDARD2_0 | ||
| public IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| // Create a simple async enumerable implementation | ||
| async IAsyncEnumerable<T> ReadAllAsyncImplementation() | ||
| { | ||
| while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| while (TryRead(out var item)) | ||
| { | ||
| yield return item; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return ReadAllAsyncImplementation(); | ||
| } | ||
| #else | ||
| /// <inheritdoc/> | ||
| public override IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default) | ||
| { | ||
| return base.ReadAllAsync(cancellationToken); | ||
| } | ||
| #endif | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| namespace ModelContextProtocol.Client; | ||
|
|
||
| /// <summary> | ||
| /// Specifies the transport mode for HTTP client connections. | ||
| /// </summary> | ||
| public enum HttpTransportMode | ||
| { | ||
| /// <summary> | ||
| /// Automatically detect the appropriate transport by trying Streamable HTTP first, then falling back to SSE if that fails. | ||
| /// This is the recommended mode for maximum compatibility. | ||
| /// </summary> | ||
| AutoDetect, | ||
|
|
||
| /// <summary> | ||
| /// Use only the Streamable HTTP transport. | ||
| /// </summary> | ||
| StreamableHttp, | ||
|
|
||
| /// <summary> | ||
| /// Use only the HTTP with SSE transport. | ||
| /// </summary> | ||
| Sse | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should dispose this in a finally block in case SendInitialRequestAsync throws.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added proper disposal in a finally block in commit 57be8df to ensure resources are cleaned up if SendInitialRequestAsync throws.