Skip to content

Commit cd33249

Browse files
committed
Add server Streamable HTTP transport
- Changes IJsonRpcMessage to an abstract base class so RelatedTransport will always be available - Streamable HTTP supports multiple concurrent HTTP request with their own indpendent SSE response streams - RelatedTransport indicates the source or destination of the JsonRpcMessage - Changes the default RequestId to a JSON number for better compatibility with MCP servers in the wild
1 parent b0cd654 commit cd33249

44 files changed

Lines changed: 1473 additions & 384 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,20 @@ public static class HttpMcpServerBuilderExtensions
1111
{
1212
/// <summary>
1313
/// Adds the services necessary for <see cref="M:McpEndpointRouteBuilderExtensions.MapMcp"/>
14-
/// to handle MCP requests and sessions using the MCP HTTP Streaming transport. For more information on configuring the underlying HTTP server
14+
/// to handle MCP requests and sessions using the MCP Streamable HTTP transport. For more information on configuring the underlying HTTP server
1515
/// to control things like port binding custom TLS certificates, see the <see href="https://learn.microsoft.com/aspnet/core/fundamentals/minimal-apis">Minimal APIs quick reference</see>.
1616
/// </summary>
1717
/// <param name="builder">The builder instance.</param>
18-
/// <param name="configureOptions">Configures options for the HTTP Streaming transport. This allows configuring per-session
18+
/// <param name="configureOptions">Configures options for the Streamable HTTP transport. This allows configuring per-session
1919
/// <see cref="McpServerOptions"/> and running logic before and after a session.</param>
2020
/// <returns>The builder provided in <paramref name="builder"/>.</returns>
2121
/// <exception cref="ArgumentNullException"><paramref name="builder"/> is <see langword="null"/>.</exception>
2222
public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action<HttpServerTransportOptions>? configureOptions = null)
2323
{
2424
ArgumentNullException.ThrowIfNull(builder);
2525
builder.Services.TryAddSingleton<StreamableHttpHandler>();
26+
builder.Services.TryAddSingleton<SseHandler>();
27+
builder.Services.AddHostedService<IdleSessionBackgroundService>();
2628

2729
if (configureOptions is not null)
2830
{

src/ModelContextProtocol.AspNetCore/HttpMcpSession.cs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
1-
using ModelContextProtocol.Protocol.Transport;
1+
using ModelContextProtocol.Server;
22
using System.Security.Claims;
33

44
namespace ModelContextProtocol.AspNetCore;
55

6-
internal class HttpMcpSession
6+
internal sealed class HttpMcpSession<TTransport>(string sessionId, TTransport transport, ClaimsPrincipal user, TimeProvider timeProvider)
77
{
8-
public HttpMcpSession(SseResponseStreamTransport transport, ClaimsPrincipal user)
8+
private int _getStarted;
9+
private int _referenceCount;
10+
11+
public string Id { get; } = sessionId;
12+
public TTransport Transport { get; } = transport;
13+
public (string Type, string Value, string Issuer)? UserIdClaim { get; } = GetUserIdClaim(user);
14+
15+
public bool IsActive => _referenceCount > 0;
16+
public long LastActivityTicks { get; private set; } = timeProvider.GetUtcNow().UtcTicks;
17+
18+
public bool TryStartGet() => Interlocked.Exchange(ref _getStarted, 1) == 0;
19+
20+
public IMcpServer? Server { get; init; }
21+
public Task? ServerRunTask { get; init; }
22+
23+
public IDisposable AcquireReference()
924
{
10-
Transport = transport;
11-
UserIdClaim = GetUserIdClaim(user);
25+
Interlocked.Increment(ref _referenceCount);
26+
return new UnreferenceDisposable(this, timeProvider);
1227
}
1328

14-
public SseResponseStreamTransport Transport { get; }
15-
public (string Type, string Value, string Issuer)? UserIdClaim { get; }
16-
1729
public bool HasSameUserId(ClaimsPrincipal user)
1830
=> UserIdClaim == GetUserIdClaim(user);
1931

@@ -36,4 +48,15 @@ private static (string Type, string Value, string Issuer)? GetUserIdClaim(Claims
3648

3749
return null;
3850
}
51+
52+
private sealed class UnreferenceDisposable(HttpMcpSession<TTransport> session, TimeProvider timeProvider) : IDisposable
53+
{
54+
public void Dispose()
55+
{
56+
if (Interlocked.Decrement(ref session._referenceCount) == 0)
57+
{
58+
session.LastActivityTicks = timeProvider.GetUtcNow().UtcTicks;
59+
}
60+
}
61+
}
3962
}

src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,18 @@ public class HttpServerTransportOptions
2121
/// This is useful for running logic before a sessions starts and after it completes.
2222
/// </summary>
2323
public Func<HttpContext, IMcpServer, CancellationToken, Task>? RunSessionHandler { get; set; }
24+
25+
/// <summary>
26+
/// Represents the duration of time the server will wait between any active requests before timing out an
27+
/// MCP session. This is checked in background every 5 seconds. A client trying to resume a session will
28+
/// receive a 404 status code and should restart their session. A client can keep their session open by
29+
/// keeping a GET request open. The default value is set to 2 minutes.
30+
/// minutes.
31+
/// </summary>
32+
public TimeSpan IdleTimeout { get; set; } = TimeSpan.FromMinutes(2);
33+
34+
/// <summary>
35+
/// Used for testing the <see cref="IdleTimeout"/>.
36+
/// </summary>
37+
public TimeProvider TimeProvider { get; set; } = TimeProvider.System;
2438
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using Microsoft.Extensions.Hosting;
2+
using Microsoft.Extensions.Options;
3+
4+
namespace ModelContextProtocol.AspNetCore;
5+
6+
internal sealed class IdleSessionBackgroundService(StreamableHttpHandler handler, IOptions<HttpServerTransportOptions> options) : BackgroundService
7+
{
8+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
9+
{
10+
var timeProvider = options.Value.TimeProvider;
11+
var timer = new PeriodicTimer(TimeSpan.FromSeconds(5), timeProvider);
12+
13+
while (!stoppingToken.IsCancellationRequested && await timer.WaitForNextTickAsync(stoppingToken))
14+
{
15+
var idleActivityCutoff = timeProvider.GetUtcNow().Ticks - options.Value.IdleTimeout.Ticks;
16+
17+
foreach (var (_, session) in handler.Sessions)
18+
{
19+
if (session.IsActive || session.LastActivityTicks > idleActivityCutoff)
20+
{
21+
continue;
22+
}
23+
24+
if (handler.Sessions.TryRemove(session.Id, out var removedSession))
25+
{
26+
await removedSession.Transport.DisposeAsync();
27+
}
28+
}
29+
}
30+
}
31+
32+
public override async Task StopAsync(CancellationToken cancellationToken)
33+
{
34+
try
35+
{
36+
foreach (var (sessionKey, _) in handler.Sessions)
37+
{
38+
if (handler.Sessions.TryRemove(sessionKey, out var session))
39+
{
40+
await session.Transport.DisposeAsync();
41+
}
42+
}
43+
}
44+
finally
45+
{
46+
await base.StopAsync(cancellationToken);
47+
}
48+
}
49+
}
Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
using Microsoft.AspNetCore.Routing;
1+
using Microsoft.AspNetCore.Http;
2+
using Microsoft.AspNetCore.Http.Metadata;
3+
using Microsoft.AspNetCore.Routing;
24
using Microsoft.Extensions.DependencyInjection;
35
using ModelContextProtocol.AspNetCore;
6+
using ModelContextProtocol.Protocol.Messages;
47
using System.Diagnostics.CodeAnalysis;
58

69
namespace Microsoft.AspNetCore.Builder;
@@ -11,21 +14,42 @@ namespace Microsoft.AspNetCore.Builder;
1114
public static class McpEndpointRouteBuilderExtensions
1215
{
1316
/// <summary>
14-
/// Sets up endpoints for handling MCP HTTP Streaming transport.
15-
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the protocol specification</see> for details about the Streamable HTTP transport.
17+
/// Sets up endpoints for handling MCP Streamable HTTP transport.
18+
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the 2025-03-26 protocol specification</see> for details about the Streamable HTTP transport.
19+
/// Also maps legacy SSE endpoints for backward compatibility at the path "/sse" and "/message". <see href="https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">the 2024-11-05 protocol specification</see> for details about the HTTP with SSE transport.
1620
/// </summary>
1721
/// <param name="endpoints">The web application to attach MCP HTTP endpoints.</param>
1822
/// <param name="pattern">The route pattern prefix to map to.</param>
1923
/// <returns>Returns a builder for configuring additional endpoint conventions like authorization policies.</returns>
2024
public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpoints, [StringSyntax("Route")] string pattern = "")
2125
{
22-
var handler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
26+
var streamableHttpHandler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
2327
throw new InvalidOperationException("You must call WithHttpTransport(). Unable to find required services. Call builder.Services.AddMcpServer().WithHttpTransport() in application startup code.");
2428

25-
var routeGroup = endpoints.MapGroup(pattern);
26-
routeGroup.MapGet("", handler.HandleRequestAsync);
27-
routeGroup.MapGet("/sse", handler.HandleRequestAsync);
28-
routeGroup.MapPost("/message", handler.HandleRequestAsync);
29-
return routeGroup;
29+
var mcpGroup = endpoints.MapGroup(pattern);
30+
var streamableHttpGroup = mcpGroup.MapGroup("")
31+
.WithDisplayName(b => $"MCP Streamable HTTP | {b.DisplayName}")
32+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status404NotFound, typeof(JsonRpcError), contentTypes: ["application/json"]));
33+
34+
streamableHttpGroup.MapPost("", streamableHttpHandler.HandlePostRequestAsync)
35+
.WithMetadata(new AcceptsMetadata(["application/json"]))
36+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]))
37+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));
38+
streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync)
39+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
40+
streamableHttpGroup.MapDelete("", streamableHttpHandler.HandleDeleteRequestAsync);
41+
42+
// Map legacy HTTP with SSE endpoints.
43+
var sseHandler = endpoints.ServiceProvider.GetRequiredService<SseHandler>();
44+
var sseGroup = mcpGroup.MapGroup("")
45+
.WithDisplayName(b => $"MCP HTTP with SSE | {b.DisplayName}");
46+
47+
sseGroup.MapGet("/sse", sseHandler.HandleSseRequestAsync)
48+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
49+
sseGroup.MapPost("/message", sseHandler.HandleMessageRequestAsync)
50+
.WithMetadata(new AcceptsMetadata(["application/json"]))
51+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));
52+
53+
return mcpGroup;
3054
}
3155
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
using Microsoft.AspNetCore.Http;
2+
using Microsoft.Extensions.Hosting;
3+
using Microsoft.Extensions.Logging;
4+
using Microsoft.Extensions.Options;
5+
using ModelContextProtocol.Protocol.Messages;
6+
using ModelContextProtocol.Protocol.Transport;
7+
using ModelContextProtocol.Server;
8+
using ModelContextProtocol.Utils.Json;
9+
using System.Collections.Concurrent;
10+
using System.Diagnostics;
11+
12+
namespace ModelContextProtocol.AspNetCore;
13+
14+
internal sealed class SseHandler(
15+
IOptions<McpServerOptions> mcpServerOptionsSnapshot,
16+
IOptionsFactory<McpServerOptions> mcpServerOptionsFactory,
17+
IOptions<HttpServerTransportOptions> httpMcpServerOptions,
18+
IHostApplicationLifetime hostApplicationLifetime,
19+
ILoggerFactory loggerFactory)
20+
{
21+
private readonly ConcurrentDictionary<string, HttpMcpSession<SseResponseStreamTransport>> _sessions = new(StringComparer.Ordinal);
22+
23+
public async Task HandleSseRequestAsync(HttpContext context)
24+
{
25+
var sessionId = StreamableHttpHandler.MakeNewSessionId();
26+
27+
// If the server is shutting down, we need to cancel all SSE connections immediately without waiting for HostOptions.ShutdownTimeout
28+
// which defaults to 30 seconds.
29+
using var sseCts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted, hostApplicationLifetime.ApplicationStopping);
30+
var cancellationToken = sseCts.Token;
31+
32+
StreamableHttpHandler.InitializeSseResponse(context);
33+
34+
await using var transport = new SseResponseStreamTransport(context.Response.Body, $"message?sessionId={sessionId}");
35+
var httpMcpSession = new HttpMcpSession<SseResponseStreamTransport>(sessionId, transport, context.User, httpMcpServerOptions.Value.TimeProvider);
36+
if (!_sessions.TryAdd(sessionId, httpMcpSession))
37+
{
38+
throw new UnreachableException($"Unreachable given good entropy! Session with ID '{sessionId}' has already been created.");
39+
}
40+
41+
try
42+
{
43+
var mcpServerOptions = mcpServerOptionsSnapshot.Value;
44+
if (httpMcpServerOptions.Value.ConfigureSessionOptions is { } configureSessionOptions)
45+
{
46+
mcpServerOptions = mcpServerOptionsFactory.Create(Options.DefaultName);
47+
await configureSessionOptions(context, mcpServerOptions, cancellationToken);
48+
}
49+
50+
var transportTask = transport.RunAsync(cancellationToken);
51+
52+
try
53+
{
54+
await using var mcpServer = McpServerFactory.Create(transport, mcpServerOptions, loggerFactory, context.RequestServices);
55+
context.Features.Set(mcpServer);
56+
57+
var runSessionAsync = httpMcpServerOptions.Value.RunSessionHandler ?? StreamableHttpHandler.RunSessionAsync;
58+
await runSessionAsync(context, mcpServer, cancellationToken);
59+
}
60+
finally
61+
{
62+
await transport.DisposeAsync();
63+
await transportTask;
64+
}
65+
}
66+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
67+
{
68+
// RequestAborted always triggers when the client disconnects before a complete response body is written,
69+
// but this is how SSE connections are typically closed.
70+
}
71+
finally
72+
{
73+
_sessions.TryRemove(sessionId, out _);
74+
}
75+
}
76+
77+
public async Task HandleMessageRequestAsync(HttpContext context)
78+
{
79+
if (!context.Request.Query.TryGetValue("sessionId", out var sessionId))
80+
{
81+
await Results.BadRequest("Missing sessionId query parameter.").ExecuteAsync(context);
82+
return;
83+
}
84+
85+
if (!_sessions.TryGetValue(sessionId.ToString(), out var httpMcpSession))
86+
{
87+
await Results.BadRequest($"Session ID not found.").ExecuteAsync(context);
88+
return;
89+
}
90+
91+
if (!httpMcpSession.HasSameUserId(context.User))
92+
{
93+
await Results.Forbid().ExecuteAsync(context);
94+
return;
95+
}
96+
97+
var message = (JsonRpcMessage?)await context.Request.ReadFromJsonAsync(McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage)), context.RequestAborted);
98+
if (message is null)
99+
{
100+
await Results.BadRequest("No message in request body.").ExecuteAsync(context);
101+
return;
102+
}
103+
104+
await httpMcpSession.Transport.OnMessageReceivedAsync(message, context.RequestAborted);
105+
context.Response.StatusCode = StatusCodes.Status202Accepted;
106+
await context.Response.WriteAsync("Accepted");
107+
}
108+
}

0 commit comments

Comments
 (0)