44using ModelContextProtocol . Client ;
55using ModelContextProtocol . Protocol ;
66using ModelContextProtocol . Server ;
7- using System . Collections . Concurrent ;
87using System . Diagnostics ;
98using System . Net ;
109
@@ -193,18 +192,22 @@ public async Task ScopedServices_Resolve_FromRequestScope()
193192 [ Fact ]
194193 public async Task ProgressNotifications_Work_InStatelessMode ( )
195194 {
195+ // Use TCS to coordinate: the tool reports progress, then waits for the test to confirm
196+ // the notification arrived before completing. This avoids the race where fire-and-forget
197+ // NotifyProgressAsync hasn't flushed before the SSE stream closes.
198+ var progressReceived = new TaskCompletionSource ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
199+ var toolCanComplete = new TaskCompletionSource ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
200+
196201 Builder . Services . AddMcpServer ( )
197202 . WithHttpTransport ( options =>
198203 {
199204 options . Stateless = true ;
200205 } )
201206 . WithTools ( [ McpServerTool . Create (
202- [ System . ComponentModel . Description ( "Reports progress" ) ] ( IProgress < ProgressNotificationValue > progress ) =>
207+ async ( IProgress < ProgressNotificationValue > progress ) =>
203208 {
204- for ( int i = 0 ; i < 5 ; i ++ )
205- {
206- progress . Report ( new ( ) { Progress = i , Total = 5 , Message = $ "Step { i } " } ) ;
207- }
209+ progress . Report ( new ( ) { Progress = 0 , Total = 1 , Message = "Working" } ) ;
210+ await toolCanComplete . Task ;
208211 return "complete" ;
209212 } , new ( ) { Name = "progressTool" } ) ] ) ;
210213
@@ -217,23 +220,21 @@ public async Task ProgressNotifications_Work_InStatelessMode()
217220
218221 await using var client = await ConnectMcpClientAsync ( ) ;
219222
220- var progressMessages = new ConcurrentBag < string > ( ) ;
221- var toolResponse = await client . CallToolAsync (
223+ // Use a custom IProgress<T> that sets the TCS synchronously (no thread pool posting).
224+ var callTask = client . CallToolAsync (
222225 "progressTool" ,
223- progress : new Progress < ProgressNotificationValue > ( p => progressMessages . Add ( p . Message ! ) ) ,
226+ progress : new SynchronousProgress < ProgressNotificationValue > ( _ => progressReceived . TrySetResult ( ) ) ,
224227 cancellationToken : TestContext . Current . CancellationToken ) ;
225228
229+ // Wait for the progress notification to arrive at the client.
230+ await progressReceived . Task . WaitAsync ( TimeSpan . FromSeconds ( 10 ) , TestContext . Current . CancellationToken ) ;
231+
232+ // Let the tool complete now that we've confirmed progress was received.
233+ toolCanComplete . SetResult ( ) ;
234+
235+ var toolResponse = await callTask ;
226236 var content = Assert . Single ( toolResponse . Content ) ;
227237 Assert . Equal ( "complete" , Assert . IsType < TextContentBlock > ( content ) . Text ) ;
228- // Progress<T> posts callbacks to the thread pool asynchronously, so we need to wait
229- // briefly for them to fire after CallToolAsync returns the tool response.
230- var sw = Stopwatch . StartNew ( ) ;
231- while ( progressMessages . IsEmpty && sw . Elapsed < TimeSpan . FromSeconds ( 5 ) )
232- {
233- await Task . Delay ( 50 , TestContext . Current . CancellationToken ) ;
234- }
235-
236- Assert . NotEmpty ( progressMessages ) ;
237238 }
238239
239240 [ Fact ]
@@ -350,4 +351,9 @@ public class ScopedService
350351 {
351352 public string ? State { get ; set ; }
352353 }
354+
355+ private class SynchronousProgress < T > ( Action < T > handler ) : IProgress < T >
356+ {
357+ public void Report ( T value ) => handler ( value ) ;
358+ }
353359}
0 commit comments