forked from modelcontextprotocol/csharp-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStreamableHttpServerTransport.cs
More file actions
99 lines (90 loc) · 4.78 KB
/
StreamableHttpServerTransport.cs
File metadata and controls
99 lines (90 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
using ModelContextProtocol.Protocol.Messages;
using System.IO.Pipelines;
using System.Threading.Channels;
namespace ModelContextProtocol.Protocol.Transport;
/// <summary>
/// Provides an <see cref="ITransport"/> implementation using Server-Sent Events (SSE) for server-to-client communication.
/// </summary>
/// <remarks>
/// <para>
/// This transport provides one-way communication from server to client using the SSE protocol over HTTP,
/// while receiving client messages through a separate mechanism. It writes messages as
/// SSE events to a response stream, typically associated with an HTTP response.
/// </para>
/// <para>
/// This transport is used in scenarios where the server needs to push messages to the client in real-time,
/// such as when streaming completion results or providing progress updates during long-running operations.
/// </para>
/// </remarks>
public sealed class StreamableHttpServerTransport : ITransport
{
// For JsonRpcMessages without a RelatedTransport, we don't want to block just because the client didn't make a GET request to handle unsolicited messages.
private readonly SseWriter _sseWriter = new(channelOptions: new BoundedChannelOptions(1)
{
SingleReader = true,
SingleWriter = false,
FullMode = BoundedChannelFullMode.DropOldest,
});
private readonly Channel<JsonRpcMessage> _incomingChannel = Channel.CreateBounded<JsonRpcMessage>(new BoundedChannelOptions(1)
{
SingleReader = true,
SingleWriter = false,
});
private readonly CancellationTokenSource _disposeCts = new();
private int _getRequestStarted;
/// <summary>
/// Handles an optional SSE GET request a client using the Streamable HTTP transport might make by
/// writing any unsolicited JSON-RPC messages sent via <see cref="SendMessageAsync"/>
/// to the SSE response stream until cancellation is requested or the transport is disposed.
/// </summary>
/// <param name="sseResponseStream">The response stream to write MCP JSON-RPC messages as SSE events to.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the send loop that writes JSON-RPC messages to the SSE response stream.</returns>
public async Task HandleGetRequest(Stream sseResponseStream, CancellationToken cancellationToken)
{
if (Interlocked.Exchange(ref _getRequestStarted, 1) == 1)
{
throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session.");
}
// We do not need to reference _disposeCts like in HandlePostRequest, because the session ending completes the _sseWriter gracefully.
await _sseWriter.WriteAllAsync(sseResponseStream, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Handles a Streamable HTTP POST request processing both the request body and response body ensuring that
/// <see cref="JsonRpcResponse"/> and other correlated messages are sent back to the client directly in response
/// to the <see cref="JsonRpcRequest"/> that initiated the message.
/// </summary>
/// <param name="httpBodies">The duplex pipe facilitates the reading and writing of HTTP request and response data.</param>
/// <param name="cancellationToken">This token allows for the operation to be canceled if needed.</param>
/// <returns>
/// True, if data was written to the respond body.
/// False, if nothing was written because the request body did not contain any <see cref="JsonRpcRequest"/> messages to respond to.
/// The HTTP application should typically respond with an empty "202 Accepted" response in this scenario.
/// </returns>
public async Task<bool> HandlePostRequest(IDuplexPipe httpBodies, CancellationToken cancellationToken)
{
using var postCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
await using var postTransport = new StreamableHttpPostTransport(_incomingChannel.Writer, httpBodies);
return await postTransport.RunAsync(postCts.Token).ConfigureAwait(false);
}
/// <inheritdoc/>
public ChannelReader<JsonRpcMessage> MessageReader => _incomingChannel.Reader;
/// <inheritdoc/>
public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
{
await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
_disposeCts.Cancel();
try
{
await _sseWriter.DisposeAsync().ConfigureAwait(false);
}
finally
{
_disposeCts.Dispose();
}
}
}