33using System . Text . Json ;
44using System . Threading . Channels ;
55using ModelContextProtocol . Protocol . Messages ;
6- using ModelContextProtocol . Protocol . Transport ;
76using ModelContextProtocol . Utils . Json ;
87
9- namespace AspNetCoreSseServer ;
8+ namespace ModelContextProtocol . Protocol . Transport ;
109
11- public class SseServerStreamTransport ( Stream sseResponseStream ) : ITransport
10+ /// <summary>
11+ /// Implements the MCP SSE server transport protocol using the SSE response <see cref="Stream"/>.
12+ /// </summary>
13+ /// <param name="sseResponseStream">The stream to write the SSE response body to.</param>
14+ public sealed class SseResponseStreamTransport ( Stream sseResponseStream ) : ITransport
1215{
1316 private readonly Channel < IJsonRpcMessage > _incomingChannel = CreateSingleItemChannel < IJsonRpcMessage > ( ) ;
1417 private readonly Channel < SseItem < IJsonRpcMessage ? > > _outgoingSseChannel = CreateSingleItemChannel < SseItem < IJsonRpcMessage ? > > ( ) ;
1518
1619 private Task ? _sseWriteTask ;
1720 private Utf8JsonWriter ? _jsonWriter ;
1821
22+ /// <inherityydoc/>
1923 public bool IsConnected => _sseWriteTask ? . IsCompleted == false ;
2024
25+ /// <summary>
26+ /// Starts the transport and writes the JSON-RPC messages sent via <see cref="SendMessageAsync(IJsonRpcMessage, CancellationToken)"/>
27+ /// to the SSE response stream until cancelled or disposed.
28+ /// </summary>
29+ /// <param name="cancellationToken">A token to cancel writing to the SSE response stream.</param>
30+ /// <returns>A task representing the send loop that writes JSON-RPC messages to the SSE response stream.</returns>
2131 public Task RunAsync ( CancellationToken cancellationToken )
2232 {
2333 void WriteJsonRpcMessageToBuffer ( SseItem < IJsonRpcMessage ? > item , IBufferWriter < byte > writer )
@@ -28,7 +38,7 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
2838 return ;
2939 }
3040
31- JsonSerializer . Serialize ( GetUtf8JsonWriter ( writer ) , item . Data , McpJsonUtilities . DefaultOptions ) ;
41+ JsonSerializer . Serialize ( GetUtf8JsonWriter ( writer ) , item . Data , McpJsonUtilities . DefaultOptions . GetTypeInfo < IJsonRpcMessage ? > ( ) ) ;
3242 }
3343
3444 // The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type,
@@ -39,23 +49,40 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
3949 return _sseWriteTask = SseFormatter . WriteAsync ( sseItems , sseResponseStream , WriteJsonRpcMessageToBuffer , cancellationToken ) ;
4050 }
4151
52+ /// <inheritdoc/>
4253 public ChannelReader < IJsonRpcMessage > MessageReader => _incomingChannel . Reader ;
4354
55+ /// <inheritdoc/>
4456 public ValueTask DisposeAsync ( )
4557 {
4658 _incomingChannel . Writer . TryComplete ( ) ;
4759 _outgoingSseChannel . Writer . TryComplete ( ) ;
4860 return new ValueTask ( _sseWriteTask ?? Task . CompletedTask ) ;
4961 }
5062
51- public Task SendMessageAsync ( IJsonRpcMessage message , CancellationToken cancellationToken = default ) =>
52- _outgoingSseChannel . Writer . WriteAsync ( new SseItem < IJsonRpcMessage ? > ( message ) , cancellationToken ) . AsTask ( ) ;
63+ /// <inheritdoc/>
64+ public Task SendMessageAsync ( IJsonRpcMessage message , CancellationToken cancellationToken = default )
65+ {
66+ if ( _sseWriteTask is null )
67+ {
68+ throw new InvalidOperationException ( $ "Transport is not connected. Make sure to call { nameof ( RunAsync ) } first.") ;
69+ }
70+
71+ return _outgoingSseChannel . Writer . WriteAsync ( new SseItem < IJsonRpcMessage ? > ( message ) , cancellationToken ) . AsTask ( ) ;
72+ }
5373
74+ /// <summary>
75+ /// Handles incoming JSON-RPC messages received on the /message endpoint.
76+ /// </summary>
77+ /// <param name="message">The JSON-RPC message received.</param>
78+ /// <param name="cancellationToken">A token to cancel the operation.</param>
79+ /// <returns>A task representing the potentially asynchronous operation to buffer or process the JSON-RPC message.</returns>
80+ /// <exception cref="InvalidOperationException">Thrown when there is an attempt to process a message before calling <see cref="RunAsync(CancellationToken)"/>.</exception>
5481 public Task OnMessageReceivedAsync ( IJsonRpcMessage message , CancellationToken cancellationToken )
5582 {
56- if ( ! IsConnected )
83+ if ( _sseWriteTask is null )
5784 {
58- throw new McpTransportException ( "Transport is not connected" ) ;
85+ throw new InvalidOperationException ( $ "Transport is not connected. Make sure to call { nameof ( RunAsync ) } first. ") ;
5986 }
6087
6188 return _incomingChannel . Writer . WriteAsync ( message , cancellationToken ) . AsTask ( ) ;
0 commit comments