Skip to content
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "9.0.100",
"version": "8.0.115",
"rollForward": "minor"
}
}
200 changes: 200 additions & 0 deletions src/ModelContextProtocol/Client/AutoDetectingClientSessionTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ModelContextProtocol.Protocol;
using System.Diagnostics;
using System.Threading.Channels;

namespace ModelContextProtocol.Client;

/// <summary>
/// A transport that automatically detects whether to use Streamable HTTP or SSE transport
/// by trying Streamable HTTP first and falling back to SSE if that fails.
/// </summary>
internal sealed class AutoDetectingClientSessionTransport : ITransport
{
private readonly SseClientTransportOptions _options;
private readonly HttpClient _httpClient;
private readonly ILoggerFactory? _loggerFactory;
private readonly ILogger _logger;
private readonly string _name;

private StreamableHttpClientSessionTransport? _streamableHttpTransport;
private SseClientSessionTransport? _sseTransport;
private readonly Channel<JsonRpcMessage> _messageChannel;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you replace this Channel<JsonRpcMessage> with a private DelegatingChannelReader<JsonRpcMessage> : ChannelReader<T> that would have a reference to the _parent AutoDetectingClientSessionTransport and be able to access its private _streamableHttpTransport and _sseTransport? The DelegatingChannelReader could have a TaskCompletionSource where all the async methods wait for it before delegating to either _streamableHttpTransport or _sseTransport. That TaskCompletionSource would only be set after we determine whether or not we got a successful response from SendInitialRequestAsync. If any non-async methods are called before the TaskCompletionSource is completed, they should throw.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've replaced the Channel with a custom DelegatingChannelReader that delegates to either the StreamableHttp or SSE transport's reader in commit 57be8df. The implementation uses a TaskCompletionSource that only completes after we determine the transport type.


public AutoDetectingClientSessionTransport(SseClientTransportOptions transportOptions, HttpClient httpClient, ILoggerFactory? loggerFactory, string endpointName)
{
Throw.IfNull(transportOptions);
Throw.IfNull(httpClient);

_options = transportOptions;
_httpClient = httpClient;
_loggerFactory = loggerFactory;
_logger = (ILogger?)loggerFactory?.CreateLogger<AutoDetectingClientSessionTransport>() ?? NullLogger.Instance;
_name = endpointName;

// Unbounded channel to prevent blocking on writes
_messageChannel = Channel.CreateUnbounded<JsonRpcMessage>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
});
}

public ChannelReader<JsonRpcMessage> MessageReader => _messageChannel.Reader;

/// <inheritdoc/>
public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
{
if (_streamableHttpTransport == null && _sseTransport == null)
{
var rpcRequest = message as JsonRpcRequest;

// The first message must be an initialize request
Debug.Assert(rpcRequest != null && rpcRequest.Method == RequestMethods.Initialize,
"First message must be an initialize request");
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var rpcRequest = message as JsonRpcRequest;
// The first message must be an initialize request
Debug.Assert(rpcRequest != null && rpcRequest.Method == RequestMethods.Initialize,
"First message must be an initialize request");

I don't think we need this assertion after all.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the assertion in commit bac8a6b.


// Try StreamableHttp first
_streamableHttpTransport = new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, _name);

try
{
var response = await _streamableHttpTransport.SendInitialRequestAsync(message, cancellationToken).ConfigureAwait(false);

// If the status code is not success, fall back to SSE
if (!response.IsSuccessStatusCode)
{
_logger.LogDebug("Streamable HTTP transport failed for {EndpointName} with status code {StatusCode}, falling back to SSE transport",
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change all logs to be source generated by turning this into a partial class and using the [LoggerMessage( attribute similar to how TransportBase does.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made AutoDetectingClientSessionTransport a partial class with source-generated logging methods using the [LoggerMessage] attribute in commit 57be8df.

_name, response.StatusCode);

await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should dispose this in a finally block in case SendInitialRequestAsync throws.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added proper disposal in a finally block in commit 57be8df to ensure resources are cleaned up if SendInitialRequestAsync throws.

_streamableHttpTransport = null;

await InitializeSseTransportAsync(message, cancellationToken).ConfigureAwait(false);
return;
}

// Process the response
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
if (response.Content.Headers.ContentType?.MediaType == "application/json")
{
await ProcessMessageFromStreamableHttpAsync(responseContent, cancellationToken).ConfigureAwait(false);
}
else if (response.Content.Headers.ContentType?.MediaType == "text/event-stream")
{
using var responseBodyStream = await response.Content.ReadAsStreamAsync(cancellationToken);
await ProcessSseResponseFromStreamableHttpAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false);
}
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this. We shouldn't have to process the response. That should be completely handled by either the _streamableHttpTransport or _sseTransport.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the manual message processing code in commit 57be8df. All message handling is now delegated to the appropriate transport.


// Start forwarding messages
_ = ForwardMessagesAsync(_streamableHttpTransport.MessageReader, cancellationToken);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be where you could now complete the TaskCompletionSource.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented this in commit 57be8df. The TaskCompletionSource is now completed once we've successfully determined which transport to use.

}
catch (Exception ex)
{
_logger.LogDebug(ex, "Streamable HTTP transport failed for {EndpointName}, falling back to SSE transport", _name);

if (_streamableHttpTransport != null)
{
await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false);
_streamableHttpTransport = null;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing about not needing to null out _streamableHttpTransport

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit bac8a6b - removed unnecessary nulling of _streamableHttpTransport.

}

await InitializeSseTransportAsync(message, cancellationToken).ConfigureAwait(false);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only call InitializeSseTransportAsync when SendInitialRequestAsync doesn't throw but does return an unsuccessful status code.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In commit bac8a6b, updated to let non-status code exceptions propagate rather than trying to handle them with SSE fallback.

}
}
else if (_streamableHttpTransport != null)
{
await _streamableHttpTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
else if (_sseTransport != null)
{
await _sseTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
}

private async Task InitializeSseTransportAsync(JsonRpcMessage message, CancellationToken cancellationToken)
{
_sseTransport = new SseClientSessionTransport(_options, _httpClient, _loggerFactory, _name);
await _sseTransport.ConnectAsync(cancellationToken).ConfigureAwait(false);
await _sseTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);

// Start forwarding messages
_ = ForwardMessagesAsync(_sseTransport.MessageReader, cancellationToken);
}

private async Task ProcessMessageFromStreamableHttpAsync(string data, CancellationToken cancellationToken)
{
try
{
var message = System.Text.Json.JsonSerializer.Deserialize(data, McpJsonUtilities.JsonContext.Default.JsonRpcMessage);
if (message is null)
{
_logger.LogWarning("Failed to parse message from Streamable HTTP response for {EndpointName}", _name);
return;
}

bool wrote = _messageChannel.Writer.TryWrite(message);
Debug.Assert(wrote, "Failed to write message to channel");
}
catch (System.Text.Json.JsonException ex)
{
_logger.LogError(ex, "Failed to parse JSON message from Streamable HTTP response for {EndpointName}", _name);
}
}

private async Task ProcessSseResponseFromStreamableHttpAsync(Stream responseStream, JsonRpcRequest relatedRpcRequest, CancellationToken cancellationToken)
{
await foreach (System.Net.ServerSentEvents.SseItem<string> sseEvent in System.Net.ServerSentEvents.SseParser.Create(responseStream)
.EnumerateAsync(cancellationToken).ConfigureAwait(false))
{
if (sseEvent.EventType != "message")
{
continue;
}

await ProcessMessageFromStreamableHttpAsync(sseEvent.Data, cancellationToken).ConfigureAwait(false);
}
}

private async Task ForwardMessagesAsync(ChannelReader<JsonRpcMessage> reader, CancellationToken cancellationToken)
{
try
{
await foreach (var message in reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
bool wrote = _messageChannel.Writer.TryWrite(message);
Debug.Assert(wrote, "Failed to write message to channel");
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected when cancelling
}
catch (Exception ex)
{
_logger.LogError(ex, "Error forwarding messages from active transport for {EndpointName}", _name);
}
}

public async ValueTask DisposeAsync()
{
try
{
if (_streamableHttpTransport != null)
{
await _streamableHttpTransport.DisposeAsync().ConfigureAwait(false);
_streamableHttpTransport = null;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_streamableHttpTransport = null;

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the unnecessary null assignment in commit 6df9961.

}

if (_sseTransport != null)
{
await _sseTransport.DisposeAsync().ConfigureAwait(false);
_sseTransport = null;
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_sseTransport = null;

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the unnecessary null assignment in commit 6df9961.

}
}
finally
{
_messageChannel.Writer.Complete();
}
}
}
23 changes: 23 additions & 0 deletions src/ModelContextProtocol/Client/HttpTransportMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace ModelContextProtocol.Client;

/// <summary>
/// Specifies the transport mode for HTTP client connections.
/// </summary>
public enum HttpTransportMode
{
/// <summary>
/// Automatically detect the appropriate transport by trying Streamable HTTP first, then falling back to SSE if that fails.
/// This is the recommended mode for maximum compatibility.
/// </summary>
AutoDetect,

/// <summary>
/// Use only the Streamable HTTP transport.
/// </summary>
StreamableHttp,

/// <summary>
/// Use only the HTTP with SSE transport.
/// </summary>
Sse
}
17 changes: 15 additions & 2 deletions src/ModelContextProtocol/Client/SseClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,24 @@ public SseClientTransport(SseClientTransportOptions transportOptions, HttpClient
/// <inheritdoc />
public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken = default)
{
if (_options.UseStreamableHttp)
switch (_options.TransportMode)
{
return new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, Name);
case HttpTransportMode.StreamableHttp:
return new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, Name);

case HttpTransportMode.Sse:
return await ConnectSseTransportAsync(cancellationToken).ConfigureAwait(false);

case HttpTransportMode.AutoDetect:
return new AutoDetectingClientSessionTransport(_options, _httpClient, _loggerFactory, Name);
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this default case to the top.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the default case to the top of the switch statement in commit 6df9961.


default:
throw new ArgumentOutOfRangeException(nameof(_options.TransportMode), _options.TransportMode, "Unsupported transport mode");
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this a normal ArgumentException instead.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use a normal ArgumentException in commit 6df9961.

}
}

private async Task<ITransport> ConnectSseTransportAsync(CancellationToken cancellationToken)
{
var sessionTransport = new SseClientSessionTransport(_options, _httpClient, _loggerFactory, Name);

try
Expand Down
15 changes: 11 additions & 4 deletions src/ModelContextProtocol/Client/SseClientTransportOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@ public required Uri Endpoint
}
}



/// <summary>
/// Gets or sets a value indicating whether to use "Streamable HTTP" for the transport rather than "HTTP with SSE". Defaults to false.
/// <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable HTTP transport specification</see>.
/// <see href="https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">HTTP with SSE transport specification</see>.
/// Gets or sets the transport mode to use for the connection. Defaults to <see cref="HttpTransportMode.AutoDetect"/>.
/// </summary>
public bool UseStreamableHttp { get; init; }
/// <remarks>
/// <para>
/// When set to <see cref="HttpTransportMode.AutoDetect"/> (the default), the client will first attempt to use
/// Streamable HTTP transport and automatically fall back to SSE transport if the server doesn't support it.
/// This provides the best compatibility and matches the behavior of VS Code.
/// </para>
/// </remarks>
public HttpTransportMode TransportMode { get; init; } = HttpTransportMode.AutoDetect;

/// <summary>
/// Gets a transport identifier used for logging purposes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,40 @@ private async Task ReceiveUnsolicitedMessagesAsync()
return null;
}

private void LogJsonException(JsonException ex, string data)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this method back.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added back the SendInitialRequestAsync method in commit 57be8df and refactored it to share code with SendMessageAsync through a new private method.

/// <summary>
/// Sends the initial initialization request and returns the HTTP response so the status code can be checked.
/// </summary>
/// <param name="message">The initialize message to send, which must be a JsonRpcRequest with the method "initialize".</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The HTTP response message for the initialization request.</returns>
internal async Task<HttpResponseMessage> SendInitialRequestAsync(
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendInitialRequestAsync and SendMessageAsync should share code. The implementation of each should be a couple lines at most because they call a shared method. The only difference is that SendInitialRequestAsync holds on to the HttpResponseMessage and returns it rather than dispose it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've refactored the code in commit 57be8df to share implementation between SendInitialRequestAsync and SendMessageAsync using a new private method SendHttpRequestInternalAsync that handles the common logic.

JsonRpcMessage message,
CancellationToken cancellationToken = default)
{
if (_logger.IsEnabled(LogLevel.Trace))
{
LogTransportMessageParseFailedSensitive(Name, data, ex);
}
else
using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _connectionCts.Token);
cancellationToken = sendCts.Token;

#if NET
using var content = JsonContent.Create(message, McpJsonUtilities.JsonContext.Default.JsonRpcMessage);
#else
using var content = new StringContent(
JsonSerializer.Serialize(message, McpJsonUtilities.JsonContext.Default.JsonRpcMessage),
Encoding.UTF8,
"application/json; charset=utf-8"
);
#endif

using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, _options.Endpoint)
{
LogTransportMessageParseFailed(Name, ex);
}
Content = content,
Headers =
{
Accept = { s_applicationJsonMediaType, s_textEventStreamMediaType },
},
};

CopyAdditionalHeaders(httpRequestMessage.Headers, _options.AdditionalHeaders, _mcpSessionId);
return await _httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
}

internal static void CopyAdditionalHeaders(HttpRequestHeaders headers, Dictionary<string, string>? additionalHeaders, string? sessionId = null)
Expand Down
Loading
Loading