55using TurboHttp . Protocol . RFC9113 ;
66using TurboHttp . Streams . Stages . Decoding ;
77
8- namespace TurboHttp . StreamTests . RFC9113 ;
8+ namespace TurboHttp . StreamTests . Http2 . Connection ;
99
1010/// <summary>
1111/// Tests backpressure behaviour in the HTTP/2 connection stage per RFC 9113.
1212/// Verifies that the stage correctly applies flow control and does not emit frames faster than the downstream can consume.
1313/// </summary>
14- /// <remarks>
15- /// Stage under test: <see cref="Http20ConnectionStage"/>.
16- /// RFC 9113 §5.2: HTTP/2 flow control and backpressure in connection-level frame processing.
17- /// </remarks>
18- public sealed class Http20ConnectionStageBackpressureTests : StreamTestBase
14+ [ Trait ( "RFC" , "RFC9113-5.2" ) ]
15+ public sealed class Http2ConnectionBackpressureSpec : StreamTestBase
1916{
20- /// <summary>
21- /// Creates an Http20ConnectionStage graph with a <see cref="Source.Queue{T}"/> for InApp
22- /// and a <see cref="TestPublisher.ManualProbe{T}"/> for InServer.
23- /// Subscriber probes capture all three outlets for assertion.
24- /// </summary>
2517 private (
2618 ISourceQueueWithComplete < HttpRequestMessage > RequestQueue ,
2719 TestPublisher . ManualProbe < Http2Frame > ServerProbe ,
@@ -57,20 +49,12 @@ public sealed class Http20ConnectionStageBackpressureTests : StreamTestBase
5749 return ( requestQueue , serverProbe , serverBoundProbe , appOutProbe , signalProbe ) ;
5850 }
5951
60- /// <summary>
61- /// Offers a request to the queue and asserts it is enqueued (accepted).
62- /// </summary>
6352 private static async Task OfferAsync ( ISourceQueueWithComplete < HttpRequestMessage > queue , HttpRequestMessage request )
6453 {
6554 var result = await queue . OfferAsync ( request ) . WaitAsync ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
6655 Assert . IsType < QueueOfferResult . Enqueued > ( result ) ;
6756 }
6857
69- /// <summary>
70- /// Sends <paramref name="count"/> GET requests through the queue and verifies
71- /// each one is forwarded to OutServer and emits a StreamAcquireItem signal.
72- /// Returns the next available odd stream ID.
73- /// </summary>
7458 private static async Task < int > FillStreamsAsync (
7559 ISourceQueueWithComplete < HttpRequestMessage > queue ,
7660 TestSubscriber . ManualProbe < Http2Frame > serverBoundProbe ,
@@ -89,8 +73,9 @@ private static async Task<int> FillStreamsAsync(
8973 return streamId ;
9074 }
9175
92- [ Fact ( Timeout = 10_000 , DisplayName = "RFC9113-5.1.2-20CS-BP-001: Backpressure gates request inlet at max concurrent streams" ) ]
93- public async Task Should_Stop_Pulling_When_At_MaxConcurrentStreams_Limit ( )
76+ [ Fact ( Timeout = 10_000 ) ]
77+ [ Trait ( "RFC" , "RFC9113-5.1.2" ) ]
78+ public async Task Http2ConnectionBackpressure_should_stop_pulling_when_at_max_concurrent_streams_limit ( )
9479 {
9580 var ( requestQueue , _, serverBoundProbe , appOutProbe , signalProbe ) = CreateProbes ( 3 ) ;
9681
@@ -102,18 +87,16 @@ public async Task Should_Stop_Pulling_When_At_MaxConcurrentStreams_Limit()
10287 serverBoundSub . Request ( 100 ) ;
10388 signalSub . Request ( 100 ) ;
10489
105- // Fill 3 streams (the limit)
106- var nextId = await FillStreamsAsync ( requestQueue , serverBoundProbe , signalProbe , 3 ) ;
90+ await FillStreamsAsync ( requestQueue , serverBoundProbe , signalProbe , 3 ) ;
10791
108- // Offer a 4th request — it enters the queue buffer but the stage won't pull it
10992 await OfferAsync ( requestQueue , new HttpRequestMessage ( HttpMethod . Get , "http://example.com/" ) ) ;
11093
111- // The 4th frame should NOT appear on OutServer because the stage is gating _inApp
11294 serverBoundProbe . ExpectNoMsg ( TimeSpan . FromMilliseconds ( 300 ) , TestContext . Current . CancellationToken ) ;
11395 }
11496
115- [ Fact ( Timeout = 10_000 , DisplayName = "RFC9113-5.1.2-20CS-BP-002: END_STREAM decrements active streams and resumes pull" ) ]
116- public async Task Should_Decrement_And_Resume_Pull_When_EndStream_Received ( )
97+ [ Fact ( Timeout = 10_000 ) ]
98+ [ Trait ( "RFC" , "RFC9113-5.1.2" ) ]
99+ public async Task Http2ConnectionBackpressure_should_decrement_and_resume_pull_when_end_stream_received ( )
117100 {
118101 var ( requestQueue , serverProbe , serverBoundProbe , appOutProbe , signalProbe ) = CreateProbes ( 3 ) ;
119102
@@ -127,27 +110,23 @@ public async Task Should_Decrement_And_Resume_Pull_When_EndStream_Received()
127110
128111 var srvSub = serverProbe . ExpectSubscription ( TestContext . Current . CancellationToken ) ;
129112
130- // Fill 3 streams (the limit)
131- var nextId = await FillStreamsAsync ( requestQueue , serverBoundProbe , signalProbe , 3 ) ;
113+ await FillStreamsAsync ( requestQueue , serverBoundProbe , signalProbe , 3 ) ;
132114
133- // Offer a 4th request — queued but not yet pulled by stage
134115 await OfferAsync ( requestQueue , new HttpRequestMessage ( HttpMethod . Get , "http://example.com/" ) ) ;
135116
136- // Verify gated
137117 serverBoundProbe . ExpectNoMsg ( TimeSpan . FromMilliseconds ( 200 ) , TestContext . Current . CancellationToken ) ;
138118
139- // Server sends END_STREAM on stream 1 (zero-length DataFrame).
140119 // DATA without prior HEADERS produces no HttpResponseMessage on OutResponse;
141120 // CloseStream still decrements _activeStreams and TryPullRequest resumes the gate.
142121 srvSub . SendNext ( new DataFrame ( streamId : 1 , data : Array . Empty < byte > ( ) , endStream : true ) ) ;
143122
144- // Pull resumes — the 4th frame now appears on OutServer
145123 serverBoundProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
146124 signalProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
147125 }
148126
149- [ Fact ( Timeout = 10_000 , DisplayName = "RFC9113-5.1.2-20CS-BP-003: RstStreamFrame decrements active streams and resumes pull" ) ]
150- public async Task Should_Decrement_And_Resume_Pull_When_RstStream_Received ( )
127+ [ Fact ( Timeout = 10_000 ) ]
128+ [ Trait ( "RFC" , "RFC9113-5.1.2" ) ]
129+ public async Task Http2ConnectionBackpressure_should_decrement_and_resume_pull_when_rst_stream_received ( )
151130 {
152131 var ( requestQueue , serverProbe , serverBoundProbe , appOutProbe , signalProbe ) = CreateProbes ( 3 ) ;
153132
@@ -161,26 +140,21 @@ public async Task Should_Decrement_And_Resume_Pull_When_RstStream_Received()
161140
162141 var srvSub = serverProbe . ExpectSubscription ( TestContext . Current . CancellationToken ) ;
163142
164- // Fill 3 streams (the limit)
165- var nextId = await FillStreamsAsync ( requestQueue , serverBoundProbe , signalProbe , 3 ) ;
143+ await FillStreamsAsync ( requestQueue , serverBoundProbe , signalProbe , 3 ) ;
166144
167- // Offer a 4th request — queued but gated
168145 await OfferAsync ( requestQueue , new HttpRequestMessage ( HttpMethod . Get , "http://example.com/" ) ) ;
169146 serverBoundProbe . ExpectNoMsg ( TimeSpan . FromMilliseconds ( 200 ) , TestContext . Current . CancellationToken ) ;
170147
171- // Server sends RST_STREAM on stream 3.
172- // RST_STREAM is consumed internally; CloseStream decrements _activeStreams and resumes the gate.
173148 srvSub . SendNext ( new RstStreamFrame ( streamId : 3 , Http2ErrorCode . Cancel ) ) ;
174149
175- // Pull resumes — the 4th frame now appears on OutServer
176150 serverBoundProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
177151 signalProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
178152 }
179153
180- [ Fact ( Timeout = 10_000 , DisplayName = "RFC9113-5.1.2-20CS-BP-004: SETTINGS MAX_CONCURRENT_STREAMS mid-session enforces new limit immediately" ) ]
181- public async Task Should_Enforce_New_ConcurrentStreams_Limit_When_Settings_Updated_MidSession ( )
154+ [ Fact ( Timeout = 10_000 ) ]
155+ [ Trait ( "RFC" , "RFC9113-5.1.2" ) ]
156+ public async Task Http2ConnectionBackpressure_should_enforce_new_concurrent_streams_limit_when_settings_updated_mid_session ( )
182157 {
183- // Start with limit=100, open 2 streams, then SETTINGS lowers limit to 2
184158 var ( requestQueue , serverProbe , serverBoundProbe , appOutProbe , signalProbe ) = CreateProbes ( 100 ) ;
185159
186160 var appOutSub = appOutProbe . ExpectSubscription ( TestContext . Current . CancellationToken ) ;
@@ -193,11 +167,8 @@ public async Task Should_Enforce_New_ConcurrentStreams_Limit_When_Settings_Updat
193167
194168 var srvSub = serverProbe . ExpectSubscription ( TestContext . Current . CancellationToken ) ;
195169
196- // Open 2 streams
197170 await FillStreamsAsync ( requestQueue , serverBoundProbe , signalProbe , 2 ) ;
198171
199- // Server sends SETTINGS lowering MAX_CONCURRENT_STREAMS to 2.
200- // SETTINGS is consumed internally; OutResponse receives nothing.
201172 srvSub . SendNext ( new SettingsFrame (
202173 [ ( SettingsParameter . MaxConcurrentStreams , 2u ) ] ) ) ;
203174
@@ -208,7 +179,6 @@ public async Task Should_Enforce_New_ConcurrentStreams_Limit_When_Settings_Updat
208179
209180 // The stage had an outstanding pull from when limit was 100.
210181 // That in-flight pull will be satisfied by the next offered element regardless of the new limit.
211- // Offer the 3rd request — it passes through on the pre-existing pull.
212182 await OfferAsync ( requestQueue , new HttpRequestMessage ( HttpMethod . Get , "http://example.com/" ) ) ;
213183 serverBoundProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
214184 signalProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
@@ -218,11 +188,9 @@ public async Task Should_Enforce_New_ConcurrentStreams_Limit_When_Settings_Updat
218188 serverBoundProbe . ExpectNoMsg ( TimeSpan . FromMilliseconds ( 300 ) , TestContext . Current . CancellationToken ) ;
219189
220190 // Close streams 1 and 3 to drop to activeStreams=1 < limit=2 → pull resumes.
221- // DATA/RST_STREAM are consumed internally; CloseStream manages _activeStreams.
222191 srvSub . SendNext ( new DataFrame ( streamId : 1 , data : Array . Empty < byte > ( ) , endStream : true ) ) ;
223192 srvSub . SendNext ( new RstStreamFrame ( streamId : 3 , Http2ErrorCode . Cancel ) ) ;
224193
225- // The 4th frame should now flow through
226194 serverBoundProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
227195 signalProbe . ExpectNext ( TimeSpan . FromSeconds ( 3 ) , TestContext . Current . CancellationToken ) ;
228196 }
0 commit comments