-
Notifications
You must be signed in to change notification settings - Fork 687
Expand file tree
/
Copy pathStreamableHttpServerTransport.cs
More file actions
287 lines (249 loc) · 13 KB
/
StreamableHttpServerTransport.cs
File metadata and controls
287 lines (249 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ModelContextProtocol.Protocol;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Net.ServerSentEvents;
using System.Security.Claims;
using System.Threading.Channels;
namespace ModelContextProtocol.Server;
/// <summary>
/// Provides an <see cref="ITransport"/> implementation using Server-Sent Events (SSE) for server-to-client communication.
/// </summary>
/// <remarks>
/// <para>
/// This transport provides one-way communication from server to client using the SSE protocol over HTTP,
/// while receiving client messages through a separate mechanism. It writes messages as
/// SSE events to a response stream, typically associated with an HTTP response.
/// </para>
/// <para>
/// This transport is used in scenarios where the server needs to push messages to the client in real-time,
/// such as when streaming completion results or providing progress updates during long-running operations.
/// </para>
/// </remarks>
public sealed partial class StreamableHttpServerTransport : ITransport
{
/// <summary>
/// The stream ID used for unsolicited messages sent via the standalone GET SSE stream.
/// </summary>
public static readonly string UnsolicitedMessageStreamId = "__get__";
private readonly Channel<JsonRpcMessage> _incomingChannel = Channel.CreateBounded<JsonRpcMessage>(new BoundedChannelOptions(1)
{
SingleReader = true,
SingleWriter = false,
});
private readonly CancellationTokenSource _transportDisposedCts = new();
private readonly SemaphoreSlim _unsolicitedMessageLock = new(1, 1);
private readonly ILogger _logger;
private SseEventWriter? _httpSseWriter;
private ISseEventStreamWriter? _storeSseWriter;
private TaskCompletionSource<bool>? _httpResponseTcs;
private bool _getHttpRequestStarted;
private bool _getHttpResponseCompleted;
/// <summary>
/// Initializes a new instance of the <see cref="StreamableHttpServerTransport"/> class.
/// </summary>
/// <param name="loggerFactory">Optional logger factory used for logging employed by the transport.</param>
public StreamableHttpServerTransport(ILoggerFactory? loggerFactory = null)
{
_logger = loggerFactory?.CreateLogger<StreamableHttpServerTransport>() ?? NullLogger<StreamableHttpServerTransport>.Instance;
}
/// <inheritdoc/>
public string? SessionId { get; init; }
/// <summary>
/// Gets or initializes a value that indicates whether the transport should be in stateless mode that does not require all requests for a given session
/// to arrive to the same ASP.NET Core application process. Unsolicited server-to-client messages are not supported in this mode,
/// so calling <see cref="HandleGetRequestAsync(Stream, CancellationToken)"/> results in an <see cref="InvalidOperationException"/>.
/// Server-to-client requests are also unsupported, because the responses might arrive at another ASP.NET Core application process.
/// Client sampling and roots capabilities are also disabled in stateless mode, because the server cannot make requests.
/// </summary>
public bool Stateless { get; init; }
/// <summary>
/// Gets or initializes a value indicating whether the execution context should flow from the calls to <see cref="HandlePostRequestAsync(JsonRpcMessage, Stream, CancellationToken)"/>
/// to the corresponding <see cref="JsonRpcMessageContext.ExecutionContext"/> property contained in the <see cref="JsonRpcMessage"/> instances returned by the <see cref="MessageReader"/>.
/// </summary>
/// <value>
/// The default is <see langword="false"/>.
/// </value>
public bool FlowExecutionContextFromRequests { get; init; }
/// <summary>
/// Gets or sets the event store for resumability support.
/// When set, events are stored and can be replayed when clients reconnect with a Last-Event-ID header.
/// </summary>
public ISseEventStreamStore? EventStreamStore { get; init; }
/// <summary>
/// Gets or sets the negotiated protocol version for this session.
/// </summary>
internal string? NegotiatedProtocolVersion { get; private set; }
/// <inheritdoc/>
public ChannelReader<JsonRpcMessage> MessageReader => _incomingChannel.Reader;
internal ChannelWriter<JsonRpcMessage> MessageWriter => _incomingChannel.Writer;
/// <summary>
/// Handles the initialize request by capturing the protocol version and invoking the user callback.
/// </summary>
internal async ValueTask HandleInitRequestAsync(InitializeRequestParams? initParams)
{
// Capture the negotiated protocol version for resumability checks
NegotiatedProtocolVersion = initParams?.ProtocolVersion;
}
/// <summary>
/// Handles an optional SSE GET request a client using the Streamable HTTP transport might make by
/// writing any unsolicited JSON-RPC messages sent via <see cref="SendMessageAsync"/>
/// to the SSE response stream until cancellation is requested or the transport is disposed.
/// </summary>
/// <param name="sseResponseStream">The response stream to write MCP JSON-RPC messages as SSE events to.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the send loop that writes JSON-RPC messages to the SSE response stream.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sseResponseStream"/> is <see langword="null"/>.</exception>
/// <exception cref="InvalidOperationException">
/// <see cref="Stateless"/> is <see langword="true"/> and GET requests are not supported in stateless mode.
/// </exception>
public async Task HandleGetRequestAsync(Stream sseResponseStream, CancellationToken cancellationToken = default)
{
Throw.IfNull(sseResponseStream);
if (Stateless)
{
throw new InvalidOperationException("GET requests are not supported in stateless mode.");
}
using (await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false))
{
if (_getHttpRequestStarted)
{
throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session.");
}
_getHttpRequestStarted = true;
_httpSseWriter = new SseEventWriter(sseResponseStream);
_httpResponseTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_storeSseWriter = await TryCreateEventStreamAsync(streamId: UnsolicitedMessageStreamId, cancellationToken).ConfigureAwait(false);
if (_storeSseWriter is not null)
{
var primingItem = await _storeSseWriter.WriteEventAsync(SseItem.Prime<JsonRpcMessage>(), cancellationToken).ConfigureAwait(false);
await _httpSseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false);
}
else
{
// If there's no priming write, flush the stream to ensure HTTP response headers are
// sent to the client now that the transport is ready to accept messages via SendMessageAsync.
await sseResponseStream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
// Wait for the response to be written before returning from the handler.
// This keeps the HTTP response open until the final response message is sent.
await _httpResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Handles a Streamable HTTP POST request processing both the request body and response body ensuring that
/// <see cref="JsonRpcResponse"/> and other correlated messages are sent back to the client directly in response
/// to the <see cref="JsonRpcRequest"/> that initiated the message.
/// </summary>
/// <param name="message">The JSON-RPC message received from the client via the POST request body.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <param name="responseStream">The POST response body to write MCP JSON-RPC messages to.</param>
/// <returns>
/// <see langword="true"/> if data was written to the response body.
/// <see false="false"/> if nothing was written because the request body did not contain any <see cref="JsonRpcRequest"/> messages to respond to.
/// The HTTP application should typically respond with an empty "202 Accepted" response in this scenario.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> or <paramref name="responseStream"/> is <see langword="null"/>.</exception>
/// <remarks>
/// If an authenticated <see cref="ClaimsPrincipal"/> sent the message, that can be included in the <see cref="JsonRpcMessage.Context"/>.
/// No other part of the context should be set.
/// </remarks>
public async Task<bool> HandlePostRequestAsync(JsonRpcMessage message, Stream responseStream, CancellationToken cancellationToken = default)
{
Throw.IfNull(message);
Throw.IfNull(responseStream);
var postTransport = new StreamableHttpPostTransport(this, responseStream, _transportDisposedCts.Token, _logger);
using var postCts = CancellationTokenSource.CreateLinkedTokenSource(_transportDisposedCts.Token, cancellationToken);
await using (postTransport.ConfigureAwait(false))
{
return await postTransport.HandlePostAsync(
message,
cancellationToken: postCts.Token)
.ConfigureAwait(false);
}
}
/// <inheritdoc/>
public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
{
Throw.IfNull(message);
if (Stateless)
{
throw new InvalidOperationException("Unsolicited server to client messages are not supported in stateless mode.");
}
using var _ = await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false);
if (!_getHttpRequestStarted)
{
// Clients are not required to make a GET request for unsolicited messages.
// If no GET request has been made, drop the message.
return;
}
Debug.Assert(_httpSseWriter is not null);
Debug.Assert(_httpResponseTcs is not null);
var item = SseItem.Message(message);
if (_storeSseWriter is not null)
{
item = await _storeSseWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false);
}
if (!_getHttpResponseCompleted)
{
// Only write the message to the response if the response has not completed.
try
{
await _httpSseWriter!.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
{
_httpResponseTcs!.TrySetException(ex);
}
}
}
/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
using var _ = await _unsolicitedMessageLock.LockAsync().ConfigureAwait(false);
if (_getHttpResponseCompleted)
{
return;
}
_getHttpResponseCompleted = true;
try
{
_incomingChannel.Writer.TryComplete();
await _transportDisposedCts.CancelAsync().ConfigureAwait(false);
}
finally
{
try
{
_httpResponseTcs?.TrySetResult(true);
_httpSseWriter?.Dispose();
if (_storeSseWriter is not null)
{
await _storeSseWriter.DisposeAsync().ConfigureAwait(false);
}
}
finally
{
_transportDisposedCts.Dispose();
}
}
}
internal async ValueTask<ISseEventStreamWriter?> TryCreateEventStreamAsync(string streamId, CancellationToken cancellationToken)
{
if (EventStreamStore is null || !McpSessionHandler.SupportsPrimingEvent(NegotiatedProtocolVersion))
{
return null;
}
// We use the 'Streaming' stream mode so that in the case of an unexpected network disconnection,
// the client can continue reading the remaining messages in a single, streamed response.
const SseEventStreamMode Mode = SseEventStreamMode.Streaming;
var sseEventStreamWriter = await EventStreamStore.CreateStreamAsync(new SseEventStreamOptions
{
SessionId = SessionId ?? Guid.NewGuid().ToString("N"),
StreamId = streamId,
Mode = Mode,
}, cancellationToken).ConfigureAwait(false);
return sseEventStreamWriter;
}
}