@@ -5,52 +5,111 @@ namespace Open.ChannelExtensions.Tests;
55public static class HangReproTest
66{
77 /// <summary>
8- /// Reproduction test for the hanging issue described in GitHub.
9- /// This test should fail if the bug exists (by timing out).
8+ /// Simple test to verify batch with timeout completes correctly.
109 /// </summary>
1110 [ Fact ]
11+ public static async Task SimpleBatchWithTimeoutCompletes ( )
12+ {
13+ // Source yields 75 items, batch size 50 = 1 full batch + 25 remaining
14+ // Timeout should flush the remaining 25 items
15+ var items = Enumerable . Range ( 0 , 75 ) ;
16+ var batches = new List < int > ( ) ;
17+
18+ await items
19+ . ToChannel ( singleReader : true )
20+ . Batch ( 50 )
21+ . WithTimeout ( TimeSpan . FromMilliseconds ( 100 ) )
22+ . ReadAllAsync ( batch =>
23+ {
24+ batches . Add ( batch . Count ) ;
25+ return default ;
26+ } )
27+ . AsTask ( )
28+ . WaitAsync ( TimeSpan . FromSeconds ( 5 ) ) ;
29+
30+ Assert . Equal ( 2 , batches . Count ) ;
31+ Assert . Equal ( 50 , batches [ 0 ] ) ;
32+ Assert . Equal ( 25 , batches [ 1 ] ) ;
33+ }
34+
35+ /// <summary>
36+ /// Stress test for concurrent batch readers with timeout.
37+ /// Runs 10 concurrent readers × 1000 iterations = 10,000 pipeline executions
38+ /// with randomized batch sizes, source sizes, and delays.
39+ /// </summary>
40+ /// <remarks>
41+ /// <para>
42+ /// Uses bounded channel (capacity: 10000) instead of unbounded to avoid
43+ /// a known .NET runtime bug: https://github.com/dotnet/runtime/issues/123544
44+ /// </para>
45+ /// <para>
46+ /// The <c>.WithTimeout()</c> is required when batching to ensure partial batches
47+ /// are flushed when the source completes. Without it, if the remaining items
48+ /// don't fill a complete batch, the reader will wait indefinitely.
49+ /// </para>
50+ /// </remarks>
51+ [ Fact ]
1252 public static async Task MultipleBatchReadersDoNotHang ( )
1353 {
14- var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 30 ) ) ; // 30 second timeout
15- var token = cts . Token ;
54+ const int readerCount = 10 ;
55+ const int iterations = 1000 ;
1656
17- var counts = new int [ 5 ] ; // Using 5 tasks instead of 10 for faster testing
18- var tasks = Enumerable . Range ( 0 , 5 )
57+ var completedIterations = new int [ readerCount ] ;
58+ var lastProcessedBatchTs = new long [ readerCount ] ;
59+ var tasks = Enumerable . Range ( 0 , readerCount )
1960 . Select ( x => Task . Run ( async ( ) =>
2061 {
21- for ( int i = 0 ; i < 50 ; i ++ ) // Run 50 iterations instead of infinite
62+ for ( int i = 0 ; i < iterations ; i ++ )
2263 {
23- if ( token . IsCancellationRequested )
64+ try
65+ {
66+ await GetSource ( )
67+ // Use bounded channel to avoid .NET runtime bug with unbounded channels.
68+ // See: https://github.com/dotnet/runtime/issues/123544
69+ . ToChannel ( capacity : 10000 , singleReader : true )
70+ . Batch ( Random . Shared . Next ( 25 , 50 ) )
71+ // WithTimeout is required to flush partial batches when source completes.
72+ . WithTimeout ( TimeSpan . FromMilliseconds ( 100 ) )
73+ . ReadAllAsync ( async _ =>
74+ {
75+ await Task . Delay ( Random . Shared . Next ( 1 , 10 ) ) ;
76+ lastProcessedBatchTs [ x ] = Stopwatch . GetTimestamp ( ) ;
77+ } )
78+ . AsTask ( )
79+ . WaitAsync ( TimeSpan . FromSeconds ( 10 ) ) ;
80+ }
81+ catch ( TimeoutException )
82+ {
2483 break ;
25-
26- await GetSource ( )
27- . ToChannel ( singleReader : true , cancellationToken : token )
28- . Batch ( Random . Shared . Next ( 15 , 30 ) , singleReader : true )
29- . WithTimeout ( TimeSpan . FromMilliseconds ( 100 ) )
30- . ReadAllAsync ( async batch => await Task . Delay ( Random . Shared . Next ( 2 , 15 ) , token ) , token ) ;
31-
32- counts [ x ] += 1 ;
84+ }
85+
86+ completedIterations [ x ] += 1 ;
3387 }
34- } , token ) ) . ToArray ( ) ;
88+ } ) ) . ToArray ( ) ;
3589
36- // Wait for all tasks to complete or timeout
3790 await Task . WhenAll ( tasks ) ;
3891
39- // Verify all tasks completed at least some iterations
40- Assert . All ( counts , count => Assert . True ( count >= 40 , $ "Count was { count } , expected at least 40") ) ;
92+ Assert . All ( completedIterations , ( count , index ) =>
93+ {
94+ var elapsedSinceLastProcessedBatch = Stopwatch . GetElapsedTime ( lastProcessedBatchTs [ index ] ) ;
95+
96+ Assert . True ( count == iterations ,
97+ $ "Reader completed { count } /{ iterations } iterations. " +
98+ $ "Time since last processed batch: { elapsedSinceLastProcessedBatch } ") ;
99+ } ) ;
41100 }
42101
43102 private static async IAsyncEnumerable < int > GetSource ( [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
44103 {
45- foreach ( var value in Enumerable . Range ( 0 , Random . Shared . Next ( 80 , 120 ) ) )
104+ foreach ( var value in Enumerable . Range ( 0 , Random . Shared . Next ( 50 , 100 ) ) )
46105 {
47106 if ( cancellationToken . IsCancellationRequested )
48107 yield break ;
49108
50109 yield return value ;
51110
52111 if ( value % Random . Shared . Next ( 15 , 25 ) == 0 )
53- await Task . Delay ( Random . Shared . Next ( 2 , 15 ) , cancellationToken ) ;
112+ await Task . Delay ( Random . Shared . Next ( 1 , 10 ) , cancellationToken ) ;
54113 }
55114 }
56115}
0 commit comments