@@ -12,15 +12,14 @@ namespace ModelContextProtocol.Server;
1212/// Handles processing the request/response body pairs for the Streamable HTTP transport.
1313/// This is typically used via <see cref="JsonRpcMessageContext.RelatedTransport"/>.
1414/// </summary>
15- internal sealed partial class StreamableHttpPostTransport (
16- StreamableHttpServerTransport parentTransport ,
17- Stream responseStream ,
18- CancellationToken sessionCancellationToken ,
19- ILogger logger ) : ITransport
15+ internal sealed partial class StreamableHttpPostTransport : ITransport
2016{
17+ private readonly StreamableHttpServerTransport _parentTransport ;
18+ private readonly CancellationToken _sessionCancellationToken ;
19+ private readonly ILogger _logger ;
2120 private readonly SemaphoreSlim _messageLock = new ( 1 , 1 ) ;
2221 private readonly TaskCompletionSource < bool > _httpResponseTcs = new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
23- private readonly SseEventWriter _httpSseWriter = new ( responseStream ) ;
22+ private readonly SseEventWriter _httpSseWriter ;
2423
2524 private TaskCompletionSource < bool > ? _storeStreamTcs ;
2625 private ISseEventStreamWriter ? _storeSseWriter ;
@@ -29,9 +28,21 @@ internal sealed partial class StreamableHttpPostTransport(
2928 private bool _finalResponseMessageSent ;
3029 private bool _httpResponseCompleted ;
3130
31+ public StreamableHttpPostTransport (
32+ StreamableHttpServerTransport parentTransport ,
33+ Stream responseStream ,
34+ CancellationToken sessionCancellationToken ,
35+ ILogger logger )
36+ {
37+ _parentTransport = parentTransport ;
38+ _sessionCancellationToken = sessionCancellationToken ;
39+ _logger = logger ;
40+ _httpSseWriter = new ( responseStream ) ;
41+ }
42+
3243 public ChannelReader < JsonRpcMessage > MessageReader => throw new NotSupportedException ( "JsonRpcMessage.Context.RelatedTransport should only be used for sending messages." ) ;
3344
34- string ? ITransport . SessionId => parentTransport . SessionId ;
45+ string ? ITransport . SessionId => _parentTransport . SessionId ;
3546
3647 /// <returns>
3748 /// True, if data was written to the response body.
@@ -50,21 +61,21 @@ public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, Cancellatio
5061 if ( request . Method == RequestMethods . Initialize )
5162 {
5263 var initializeRequest = JsonSerializer . Deserialize ( request . Params , McpJsonUtilities . JsonContext . Default . InitializeRequestParams ) ;
53- await parentTransport . HandleInitRequestAsync ( initializeRequest ) . ConfigureAwait ( false ) ;
64+ await _parentTransport . HandleInitRequestAsync ( initializeRequest ) . ConfigureAwait ( false ) ;
5465 }
5566 }
5667
5768 message . Context ??= new JsonRpcMessageContext ( ) ;
5869 message . Context . RelatedTransport = this ;
5970
60- if ( parentTransport . FlowExecutionContextFromRequests )
71+ if ( _parentTransport . FlowExecutionContextFromRequests )
6172 {
6273 message . Context . ExecutionContext = ExecutionContext . Capture ( ) ;
6374 }
6475
6576 if ( _pendingRequest . Id is null )
6677 {
67- await parentTransport . MessageWriter . WriteAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
78+ await _parentTransport . MessageWriter . WriteAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
6879 return false ;
6980 }
7081
@@ -77,7 +88,7 @@ public async ValueTask<bool> HandlePostAsync(JsonRpcMessage message, Cancellatio
7788 }
7889
7990 // Ensure that we've sent the priming event before processing the incoming request.
80- await parentTransport . MessageWriter . WriteAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
91+ await _parentTransport . MessageWriter . WriteAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
8192 }
8293
8394 // Wait for the response to be written before returning from the handler.
@@ -91,7 +102,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
91102 {
92103 Throw . IfNull ( message ) ;
93104
94- if ( parentTransport . Stateless && message is JsonRpcRequest )
105+ if ( _parentTransport . Stateless && message is JsonRpcRequest )
95106 {
96107 throw new InvalidOperationException ( "Server to client requests are not supported in stateless mode." ) ;
97108 }
@@ -105,7 +116,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
105116 {
106117 // The final response message has already been sent.
107118 // Rather than drop the message, fall back to sending it via the parent transport.
108- await parentTransport . SendMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
119+ await _parentTransport . SendMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
109120 return ;
110121 }
111122
@@ -144,7 +155,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
144155
145156 public async ValueTask EnablePollingAsync ( TimeSpan retryInterval , CancellationToken cancellationToken )
146157 {
147- if ( parentTransport . Stateless )
158+ if ( _parentTransport . Stateless )
148159 {
149160 throw new InvalidOperationException ( "Polling is not supported in stateless mode." ) ;
150161 }
@@ -180,9 +191,9 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo
180191 {
181192 Debug . Assert ( _storeSseWriter is null ) ;
182193
183- _storeSseWriter = await parentTransport . TryCreateEventStreamAsync (
194+ _storeSseWriter = await _parentTransport . TryCreateEventStreamAsync (
184195 streamId : requestId . Id ! . ToString ( ) ! ,
185- cancellationToken : sessionCancellationToken )
196+ cancellationToken : _sessionCancellationToken )
186197 . ConfigureAwait ( false ) ;
187198
188199 if ( _storeSseWriter is null )
@@ -193,13 +204,13 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo
193204 _storeStreamTcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
194205 _ = HandleStoreStreamDisposalAsync ( _storeStreamTcs . Task ) ;
195206
196- return await _storeSseWriter . WriteEventAsync ( SseItem . Prime < JsonRpcMessage > ( ) , sessionCancellationToken ) . ConfigureAwait ( false ) ;
207+ return await _storeSseWriter . WriteEventAsync ( SseItem . Prime < JsonRpcMessage > ( ) , _sessionCancellationToken ) . ConfigureAwait ( false ) ;
197208
198209 async Task HandleStoreStreamDisposalAsync ( Task streamTask )
199210 {
200211 try
201212 {
202- await streamTask . WaitAsync ( sessionCancellationToken ) . ConfigureAwait ( false ) ;
213+ await streamTask . WaitAsync ( _sessionCancellationToken ) . ConfigureAwait ( false ) ;
203214 }
204215 catch ( OperationCanceledException )
205216 {
0 commit comments