Skip to content

Commit 1f26e84

Browse files
committed
TASK-038-004: Add Http1XCorrelation Back-Pressure Stream Tests
Add deterministic stream tests for the one-request-in-flight back-pressure contract of Http1XCorrelationStage (RFC 9112 §9.3). - bp-001: three serial requests flow in FIFO order with one StreamAcquireItem each - bp-002: two simultaneous requests — second is gated until first response arrives - bp-003: upstream completion mid-flight — stage waits for response before completing - bp-004: response arrives with no in-flight request — stage does not pull until new request All 4 tests use Akka.Streams TestKit manual probes; no Thread.Sleep.
1 parent ccc5237 commit 1f26e84

2 files changed

Lines changed: 262 additions & 8 deletions

File tree

.maggus/features/feature_038.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,14 @@ Two correctness bugs in the Streams layer that both cause request-ordering viola
120120
**Parallel:** no
121121

122122
**Acceptance Criteria:**
123-
- [ ] Test file: `TurboHttp.StreamTests/RFC9112/08_Http1XCorrelationBackPressureTests.cs`.
124-
- [ ] **Test `bp-001` — Serial ordering:** Send 3 requests sequentially. Verify each response is delivered in FIFO order and no second request is pushed to the encoder before the first response arrives.
125-
- [ ] **Test `bp-002` — Back-pressure gate:** Provide 2 requests simultaneously at `InRequest`. Verify only the first request flows to `OutControl` (`StreamAcquireItem` emitted once). The second request is NOT pulled until the first response is delivered on `InResponse`.
126-
- [ ] **Test `bp-003` — Upstream completion mid-flight:** Send 1 request, complete `InRequest` upstream before response arrives. Verify stage does not complete prematurely; stage completes only after the response is forwarded.
127-
- [ ] **Test `bp-004` — Response without in-flight request:** Regression guard — verify defined behavior (error or ignore) when a response arrives with `_inFlightRequest == null`.
128-
- [ ] Each test uses `DisplayName("RFC9112-correlation-bp-NNN: ...")` and `[Fact(Timeout = 5000)]`.
129-
- [ ] No `Thread.Sleep`. Akka.Streams test probes used for all synchronization.
130-
- [ ] All 4 tests green on a clean run.
123+
- [x] Test file: `TurboHttp.StreamTests/RFC9112/08_Http1XCorrelationBackPressureTests.cs`.
124+
- [x] **Test `bp-001` — Serial ordering:** Send 3 requests sequentially. Verify each response is delivered in FIFO order and no second request is pushed to the encoder before the first response arrives.
125+
- [x] **Test `bp-002` — Back-pressure gate:** Provide 2 requests simultaneously at `InRequest`. Verify only the first request flows to `OutControl` (`StreamAcquireItem` emitted once). The second request is NOT pulled until the first response is delivered on `InResponse`.
126+
- [x] **Test `bp-003` — Upstream completion mid-flight:** Send 1 request, complete `InRequest` upstream before response arrives. Verify stage does not complete prematurely; stage completes only after the response is forwarded.
127+
- [x] **Test `bp-004` — Response without in-flight request:** Regression guard — verify defined behavior (error or ignore) when a response arrives with `_inFlightRequest == null`.
128+
- [x] Each test uses `DisplayName("RFC9112-correlation-bp-NNN: ...")` and `[Fact(Timeout = 5000)]`.
129+
- [x] No `Thread.Sleep`. Akka.Streams test probes used for all synchronization.
130+
- [x] All 4 tests green on a clean run.
131131

132132
---
133133

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
using System.Net;
2+
using Akka.Streams;
3+
using Akka.Streams.Dsl;
4+
using Akka.Streams.TestKit;
5+
using TurboHttp.Internal;
6+
using TurboHttp.Streams.Stages.Routing;
7+
8+
namespace TurboHttp.StreamTests.RFC9112;
9+
10+
/// <summary>
11+
/// Tests strict one-request-in-flight back-pressure in Http1XCorrelationStage per RFC 9112.
12+
/// Verifies that the stage gates InRequest to allow only one request in flight at a time,
13+
/// and only pulls another request after the previous response has been delivered.
14+
/// </summary>
15+
/// <remarks>
16+
/// Stage under test: <see cref="Http1XCorrelationStage"/>.
17+
/// RFC 9112 §9.3: HTTP/1.x strict request-response ordering with one-request-in-flight guarantee.
18+
/// </remarks>
19+
public sealed class Http1XCorrelationBackPressureTests : StreamTestBase
20+
{
21+
private static HttpResponseMessage OkResponse()
22+
=> new(HttpStatusCode.OK);
23+
24+
/// <summary>
25+
/// Creates manual probes for request, response, and two outlets (OutResponse, OutControl).
26+
/// Returns (requestProbe, responseProbe, responseOut, signalOut).
27+
/// </summary>
28+
private (
29+
TestPublisher.ManualProbe<HttpRequestMessage> RequestProbe,
30+
TestPublisher.ManualProbe<HttpResponseMessage> ResponseProbe,
31+
TestSubscriber.ManualProbe<HttpResponseMessage> ResponseOut,
32+
TestSubscriber.ManualProbe<IControlItem> SignalOut)
33+
CreateProbes()
34+
{
35+
var requestProbe = this.CreateManualPublisherProbe<HttpRequestMessage>();
36+
var responseProbe = this.CreateManualPublisherProbe<HttpResponseMessage>();
37+
var responseOut = this.CreateManualSubscriberProbe<HttpResponseMessage>();
38+
var signalOut = this.CreateManualSubscriberProbe<IControlItem>();
39+
40+
var graph = RunnableGraph.FromGraph(
41+
GraphDsl.Create(
42+
(b) =>
43+
{
44+
var stage = b.Add(new Http1XCorrelationStage());
45+
var reqSrc = b.Add(Source.FromPublisher(requestProbe));
46+
var resSrc = b.Add(Source.FromPublisher(responseProbe));
47+
48+
b.From(reqSrc).To(stage.InRequest);
49+
b.From(resSrc).To(stage.InResponse);
50+
b.From(stage.OutResponse).To(Sink.FromSubscriber(responseOut));
51+
b.From(stage.OutControl).To(Sink.FromSubscriber(signalOut));
52+
53+
return ClosedShape.Instance;
54+
}));
55+
56+
graph.Run(Materializer);
57+
58+
return (requestProbe, responseProbe, responseOut, signalOut);
59+
}
60+
61+
[Fact(Timeout = 5000, DisplayName = "RFC9112-9.3-bp-001: Three requests sent serially → FIFO order with StreamAcquireItem per request")]
62+
public async Task Should_MaintainFifoOrder_WhenThreeRequestsSerialSend()
63+
{
64+
var (requestProbe, responseProbe, responseOut, signalOut) = CreateProbes();
65+
66+
var responseOutSub = responseOut.ExpectSubscription(TestContext.Current.CancellationToken);
67+
var signalOutSub = signalOut.ExpectSubscription(TestContext.Current.CancellationToken);
68+
69+
var requestPubSub = requestProbe.ExpectSubscription(TestContext.Current.CancellationToken);
70+
var responsePubSub = responseProbe.ExpectSubscription(TestContext.Current.CancellationToken);
71+
72+
// Request demand: 1 on OutResponse + high on OutControl
73+
responseOutSub.Request(1);
74+
signalOutSub.Request(100);
75+
76+
// Build requests and responses
77+
var request1 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/1");
78+
var request2 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/2");
79+
var request3 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/3");
80+
var response1 = OkResponse();
81+
var response2 = OkResponse();
82+
var response3 = OkResponse();
83+
84+
// Cycle 1: Request1
85+
requestPubSub.SendNext(request1);
86+
var sig1 = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
87+
Assert.IsType<StreamAcquireItem>(sig1);
88+
89+
responsePubSub.SendNext(response1);
90+
var resp1 = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
91+
Assert.Same(request1, resp1.RequestMessage);
92+
Assert.Same(response1, resp1);
93+
94+
// Request more on OutResponse for next response
95+
responseOutSub.Request(1);
96+
97+
// Cycle 2: Request2
98+
requestPubSub.SendNext(request2);
99+
var sig2 = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
100+
Assert.IsType<StreamAcquireItem>(sig2);
101+
102+
responsePubSub.SendNext(response2);
103+
var resp2 = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
104+
Assert.Same(request2, resp2.RequestMessage);
105+
Assert.Same(response2, resp2);
106+
107+
// Request more on OutResponse for next response
108+
responseOutSub.Request(1);
109+
110+
// Cycle 3: Request3
111+
requestPubSub.SendNext(request3);
112+
var sig3 = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
113+
Assert.IsType<StreamAcquireItem>(sig3);
114+
115+
responsePubSub.SendNext(response3);
116+
var resp3 = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
117+
Assert.Same(request3, resp3.RequestMessage);
118+
Assert.Same(response3, resp3);
119+
}
120+
121+
[Fact(Timeout = 5000, DisplayName = "RFC9112-9.3-bp-002: Two requests queued → only first flows until response, second gated")]
122+
public async Task Should_Gate_SecondRequest_Until_FirstResponseDelivered()
123+
{
124+
var (requestProbe, responseProbe, responseOut, signalOut) = CreateProbes();
125+
126+
var responseOutSub = responseOut.ExpectSubscription(TestContext.Current.CancellationToken);
127+
var signalOutSub = signalOut.ExpectSubscription(TestContext.Current.CancellationToken);
128+
129+
var requestPubSub = requestProbe.ExpectSubscription(TestContext.Current.CancellationToken);
130+
var responsePubSub = responseProbe.ExpectSubscription(TestContext.Current.CancellationToken);
131+
132+
// Request 2 items upfront on both outlets to allow flow
133+
responseOutSub.Request(2);
134+
signalOutSub.Request(100);
135+
136+
var request1 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/1");
137+
var request2 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/2");
138+
var response1 = OkResponse();
139+
var response2 = OkResponse();
140+
141+
// Send both requests (queue them at the source)
142+
requestPubSub.SendNext(request1);
143+
requestPubSub.SendNext(request2);
144+
145+
// First request flows: StreamAcquireItem emitted
146+
var sig1 = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
147+
Assert.IsType<StreamAcquireItem>(sig1);
148+
149+
// Second request is gated — no StreamAcquireItem yet
150+
// Wait to verify no immediate second StreamAcquireItem
151+
signalOut.ExpectNoMsg(TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken);
152+
153+
// Now send response for request1 — this clears the gate
154+
responsePubSub.SendNext(response1);
155+
var resp1 = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
156+
Assert.Same(request1, resp1.RequestMessage);
157+
158+
// Now the second request can flow
159+
var sig2 = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
160+
Assert.IsType<StreamAcquireItem>(sig2);
161+
162+
// Send response for request2
163+
responsePubSub.SendNext(response2);
164+
var resp2 = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
165+
Assert.Same(request2, resp2.RequestMessage);
166+
}
167+
168+
[Fact(Timeout = 5000, DisplayName = "RFC9112-9.3-bp-003: Upstream request completion mid-flight → stage waits for response before completing")]
169+
public async Task Should_NotComplete_Until_InFlightResponseArrives_WhenRequestCompletes()
170+
{
171+
var (requestProbe, responseProbe, responseOut, signalOut) = CreateProbes();
172+
173+
var responseOutSub = responseOut.ExpectSubscription(TestContext.Current.CancellationToken);
174+
var signalOutSub = signalOut.ExpectSubscription(TestContext.Current.CancellationToken);
175+
176+
var requestPubSub = requestProbe.ExpectSubscription(TestContext.Current.CancellationToken);
177+
var responsePubSub = responseProbe.ExpectSubscription(TestContext.Current.CancellationToken);
178+
179+
responseOutSub.Request(1);
180+
signalOutSub.Request(100);
181+
182+
var request = new HttpRequestMessage(HttpMethod.Get, "http://example.com/");
183+
var response = OkResponse();
184+
185+
// Send 1 request
186+
requestPubSub.SendNext(request);
187+
var sig = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
188+
Assert.IsType<StreamAcquireItem>(sig);
189+
190+
// Complete the request upstream BEFORE response arrives
191+
requestPubSub.SendComplete();
192+
193+
// Stage must NOT complete prematurely — response is still in flight
194+
responseOut.ExpectNoMsg(TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken);
195+
196+
// Now send the response
197+
responsePubSub.SendNext(response);
198+
var resp = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
199+
Assert.Same(request, resp.RequestMessage);
200+
201+
// Complete response upstream
202+
responsePubSub.SendComplete();
203+
204+
// Stage should now complete (both upstreams done, no in-flight request)
205+
responseOut.ExpectComplete(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
206+
}
207+
208+
[Fact(Timeout = 5000, DisplayName = "RFC9112-9.3-bp-004: Excess response without in-flight request → not pulled until new request arrives")]
209+
public async Task Should_NotPullResponse_Until_RequestInFlight()
210+
{
211+
var (requestProbe, responseProbe, responseOut, signalOut) = CreateProbes();
212+
213+
var responseOutSub = responseOut.ExpectSubscription(TestContext.Current.CancellationToken);
214+
var signalOutSub = signalOut.ExpectSubscription(TestContext.Current.CancellationToken);
215+
216+
var requestPubSub = requestProbe.ExpectSubscription(TestContext.Current.CancellationToken);
217+
var responsePubSub = responseProbe.ExpectSubscription(TestContext.Current.CancellationToken);
218+
219+
responseOutSub.Request(3);
220+
signalOutSub.Request(100);
221+
222+
var request1 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/1");
223+
var request2 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/2");
224+
var response1 = OkResponse();
225+
var response2 = OkResponse();
226+
227+
// Send request1
228+
requestPubSub.SendNext(request1);
229+
var sig1 = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
230+
Assert.IsType<StreamAcquireItem>(sig1);
231+
232+
// Send response1
233+
responsePubSub.SendNext(response1);
234+
var resp1 = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
235+
Assert.Same(request1, resp1.RequestMessage);
236+
237+
// Now _inFlightRequest == null. Queue a second response (even though stage has no request in flight)
238+
responsePubSub.SendNext(response2);
239+
240+
// Stage should NOT pull response2 yet because no request is in flight
241+
// Verify responseOut does not receive response2 within the gate period
242+
responseOut.ExpectNoMsg(TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken);
243+
244+
// Now send request2 — this should pull the queued response2 from upstream
245+
requestPubSub.SendNext(request2);
246+
var sig2 = signalOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
247+
Assert.IsType<StreamAcquireItem>(sig2);
248+
249+
// The excess response2 is now pulled and delivered
250+
var resp2 = responseOut.ExpectNext(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
251+
Assert.Same(request2, resp2.RequestMessage);
252+
Assert.Same(response2, resp2);
253+
}
254+
}

0 commit comments

Comments
 (0)