Skip to content

Commit 8a56acd

Browse files
committed
TASK-038-002: Rewrite Http1XCorrelationStage — Strict One-Request-In-Flight
Rewrites Http1XCorrelationStage to enforce strict HTTP/1.x back-pressure per RFC 9112 §9. Only one request is in-flight at a time; the next request is not pulled until its response has been delivered on OutResponse. Changes: - Http1XCorrelationShape: InReset inlet removed; shape is now 2 inlets + 2 outlets. - Http1XCorrelationStage.Logic: removed _pending queue, _waiting queue, _pipelineUnlocked flag, and PreStart(). Replaced with a single _inFlightRequest field. - Back-pressure contract: InRequest is pulled only when _inFlightRequest == null. onPush(InRequest) stores the request, emits StreamAcquireItem, then pulls InResponse. onPush(InResponse) matches the in-flight request, sets _inFlightRequest = null, and pushes the response — only then does the downstream demand trigger the next InRequest pull. - Completion: stage completes when both upstreams finish and _inFlightRequest == null, OR gracefully when InResponse closes while a request is in-flight (no deadlock). - Http10Engine.cs / Http11Engine.cs: removed the two dead InReset wiring lines (Source.Empty<NotUsed>() + b.From(resetSrc).To(correlation.InReset)). - Test suite updated: removed InReset wiring from all test helpers and inline graphs, removed tests that verified removed behavior (pipelining, InReset reconnect), updated descriptions to reflect strict serial model. 831/831 stream tests pass.
1 parent 1291185 commit 8a56acd

7 files changed

Lines changed: 105 additions & 354 deletions

File tree

.maggus/features/feature_038.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,21 +74,21 @@ Two correctness bugs in the Streams layer that both cause request-ordering viola
7474
**Parallel:** yes — can run alongside TASK-038-001
7575

7676
**Acceptance Criteria:**
77-
- [ ] `Http1XCorrelationShape` loses the `InReset` inlet: shape becomes 2 inlets (`InRequest`, `InResponse`) + 2 outlets (`OutResponse`, `OutControl`).
78-
- [ ] `_inReset` field removed from `Http1XCorrelationStage`.
79-
- [ ] Internal `Logic` state simplified to:
77+
- [x] `Http1XCorrelationShape` loses the `InReset` inlet: shape becomes 2 inlets (`InRequest`, `InResponse`) + 2 outlets (`OutResponse`, `OutControl`).
78+
- [x] `_inReset` field removed from `Http1XCorrelationStage`.
79+
- [x] Internal `Logic` state simplified to:
8080
- `_inFlightRequest`: `HttpRequestMessage?` — the one pending request, or `null`.
8181
- `_requestUpstreamFinished`, `_responseUpstreamFinished`: completion booleans (unchanged if present).
82-
- [ ] `_pending` queue, `_waiting` queue, and `_pipelineUnlocked` flag are all removed.
83-
- [ ] Back-pressure contract enforced:
82+
- [x] `_pending` queue, `_waiting` queue, and `_pipelineUnlocked` flag are all removed.
83+
- [x] Back-pressure contract enforced:
8484
- A new request is pulled from `InRequest` only when `_inFlightRequest == null`.
8585
- When a request arrives on `InRequest`, it is stored in `_inFlightRequest` and `StreamAcquireItem` is emitted on `OutControl`. The stage does NOT pull another request.
8686
- When a response arrives on `InResponse`, it is matched to `_inFlightRequest`, pushed on `OutResponse`, and `_inFlightRequest` is set to `null`. Only then is the next request pulled.
87-
- [ ] `PreStart()` removed (no longer needs to pull `_inReset`).
88-
- [ ] `onPull(OutResponse)` pulls `InResponse` and pulls `InRequest` only if `_inFlightRequest == null`.
89-
- [ ] Completion logic: stage completes only when both upstreams finish and `_inFlightRequest == null`.
90-
- [ ] `Roslyn MCP get_diagnostics` on `Http1XCorrelationStage.cs` returns zero errors.
91-
- [ ] Existing stream tests that cover correlation still pass (run `dotnet test --project TurboHttp.StreamTests/TurboHttp.StreamTests.csproj`).
87+
- [x] `PreStart()` removed (no longer needs to pull `_inReset`).
88+
- [x] `onPull(OutResponse)` pulls `InRequest` only if `_inFlightRequest == null` (InResponse is pulled from `onPush(InRequest)` after the request is stored, ensuring serial ordering).
89+
- [x] Completion logic: stage completes only when both upstreams finish and `_inFlightRequest == null`.
90+
- [x] `Roslyn MCP get_diagnostics` on `Http1XCorrelationStage.cs` returns zero errors.
91+
- [x] Existing stream tests that cover correlation still pass (run `dotnet test --project TurboHttp.StreamTests/TurboHttp.StreamTests.csproj`).
9292

9393
---
9494

src/TurboHttp.StreamTests/Concurrency/Http1XCorrelationRaceTests.cs

Lines changed: 39 additions & 124 deletions
Large diffs are not rendered by default.

src/TurboHttp.StreamTests/RFC9112/06_Http11CorrelationStageTests.cs

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ namespace TurboHttp.StreamTests.RFC9112;
1212
/// Verifies that responses are matched to requests in FIFO order and that RequestMessage is correctly set.
1313
/// </summary>
1414
/// <remarks>
15-
/// Stage under test: <see cref="CorrelationHttp1XStage"/>.
16-
/// RFC 9112 §9.3: HTTP/1.1 pipeline ordering and request-response pairing.
15+
/// Stage under test: <see cref="Http1XCorrelationStage"/>.
16+
/// RFC 9112 §9: HTTP/1.x strict one-request-in-flight ordering and request-response pairing.
1717
/// </remarks>
1818
public sealed class Http11CorrelationStageTests : StreamTestBase
1919
{
@@ -35,10 +35,8 @@ private async Task<List<HttpResponseMessage>> RunStageAsync(
3535
var resSrc = b.Add(responseSource);
3636
var signalSink = b.Add(Sink.Ignore<IControlItem>().MapMaterializedValue(_ => NotUsed.Instance));
3737

38-
var resetSrc = b.Add(Source.Never<NotUsed>());
3938
b.From(reqSrc).To(corr.InRequest);
4039
b.From(resSrc).To(corr.InResponse);
41-
b.From(resetSrc).To(corr.InReset);
4240
b.From(corr.OutResponse).To(s);
4341
b.From(corr.OutControl).To(signalSink);
4442

@@ -108,17 +106,17 @@ public async Task Should_PreserveRequestReference_WhenCorrelated()
108106
"response.RequestMessage must be the exact same object reference as the sent request.");
109107
}
110108

111-
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3-11CR-004: Response arrives before request → correctly buffered and correlated")]
112-
public async Task Should_BufferAndCorrelate_WhenResponseArrivesBeforeRequest()
109+
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3-11CR-004: Request arrives, response arrives later → correctly correlated")]
110+
public async Task Should_Correlate_WhenResponseArrivesAfterRequest()
113111
{
114112
var request = new HttpRequestMessage(HttpMethod.Get, "http://example.com/delayed");
115113
var response = OkResponse();
116114

117-
// Response source emits immediately; request source delayed by 300ms.
118-
// The response will be buffered in the _waiting queue until the request arrives.
115+
// Request is sent first; response is delayed by 300ms.
116+
// Stage pulls InRequest first, then pulls InResponse after request arrives.
119117
var results = await RunStageAsync(
120-
Source.Single(request).InitialDelay(TimeSpan.FromMilliseconds(300)),
121-
Source.Single(response));
118+
Source.Single(request),
119+
Source.Single(response).InitialDelay(TimeSpan.FromMilliseconds(300)));
122120

123121
Assert.Single(results);
124122
Assert.Same(request, results[0].RequestMessage);
@@ -132,7 +130,7 @@ public async Task Should_BufferAndCorrelate_WhenRequestArrivesBeforeResponse()
132130
var response = OkResponse();
133131

134132
// Request source emits immediately; response source delayed by 300ms.
135-
// The request will be buffered in the _pending queue until the response arrives.
133+
// Stage waits for the response after pulling and storing the request.
136134
var results = await RunStageAsync(
137135
Source.Single(request),
138136
Source.Single(response).InitialDelay(TimeSpan.FromMilliseconds(300)));
@@ -164,10 +162,8 @@ public async Task Should_StayAlive_WhenQueuesEmptyButUpstreamOpen()
164162
var resSrc = b.Add(responseSource);
165163
var signalSink = b.Add(Sink.Ignore<IControlItem>().MapMaterializedValue(_ => NotUsed.Instance));
166164

167-
var resetSrc = b.Add(Source.Never<NotUsed>());
168165
b.From(reqSrc).To(corr.InRequest);
169166
b.From(resSrc).To(corr.InResponse);
170-
b.From(resetSrc).To(corr.InReset);
171167
b.From(corr.OutResponse).To(s);
172168
b.From(corr.OutControl).To(signalSink);
173169

@@ -180,19 +176,19 @@ public async Task Should_StayAlive_WhenQueuesEmptyButUpstreamOpen()
180176
await Assert.ThrowsAsync<TimeoutException>(() => task.WaitAsync(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken));
181177
}
182178

183-
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3-11CR-007: Stage remains open while pending requests still exist")]
184-
public async Task Should_RemainOpen_WhenPendingRequestsExist()
179+
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3-11CR-007: Stage remains open while in-flight request awaits response")]
180+
public async Task Should_RemainOpen_WhenInFlightRequestAwaitingResponse()
185181
{
186-
// Send 2 requests but only 1 response.
187-
// The response source stays open (via Concat+Never) so the stage cannot
188-
// complete due to upstream finish — only the pending request queue keeps it alive.
182+
// Send 2 requests but keep the response source open after delivering only 1 response.
183+
// The second request is pulled after the first response; it waits for its response
184+
// indefinitely (Never) — the stage must remain open.
189185
var request1 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/1");
190186
var request2 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/2");
191187
var response1 = OkResponse();
192188

193189
var sink = Sink.Seq<HttpResponseMessage>();
194190

195-
// Keep the response source open so only pending-request logic matters.
191+
// Keep the response source open so only the in-flight back-pressure matters.
196192
var neverEndingResponses = Source.Single(response1)
197193
.Concat(Source.Never<HttpResponseMessage>());
198194

@@ -203,10 +199,8 @@ public async Task Should_RemainOpen_WhenPendingRequestsExist()
203199
var resSrc = b.Add(neverEndingResponses);
204200
var signalSink = b.Add(Sink.Ignore<IControlItem>().MapMaterializedValue(_ => NotUsed.Instance));
205201

206-
var resetSrc = b.Add(Source.Never<NotUsed>());
207202
b.From(reqSrc).To(corr.InRequest);
208203
b.From(resSrc).To(corr.InResponse);
209-
b.From(resetSrc).To(corr.InReset);
210204
b.From(corr.OutResponse).To(s);
211205
b.From(corr.OutControl).To(signalSink);
212206

@@ -215,8 +209,7 @@ public async Task Should_RemainOpen_WhenPendingRequestsExist()
215209

216210
var task = graph.Run(Materializer);
217211

218-
// The stream should NOT complete because there is still a pending request
219-
// with no matching response. Wait briefly and verify it's still running.
212+
// The stream should NOT complete because request2 is in flight with no matching response.
220213
var completed = task.WaitAsync(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken);
221214

222215
await Assert.ThrowsAsync<TimeoutException>(() => completed);
@@ -237,10 +230,8 @@ public async Task Should_EmitOneStreamAcquireItem_WhenSingleRequestPushed()
237230
var resSrc = b.Add(Source.Single(response));
238231
var responseSink = b.Add(Sink.Ignore<HttpResponseMessage>().MapMaterializedValue(_ => NotUsed.Instance));
239232

240-
var resetSrc = b.Add(Source.Never<NotUsed>());
241233
b.From(reqSrc).To(corr.InRequest);
242234
b.From(resSrc).To(corr.InResponse);
243-
b.From(resetSrc).To(corr.InReset);
244235
b.From(corr.OutResponse).To(responseSink);
245236
b.From(corr.OutControl).To(s);
246237

@@ -278,10 +269,8 @@ public async Task Should_EmitTwoStreamAcquireItems_WhenTwoRequestsPushed()
278269
var resSrc = b.Add(Source.From(responses));
279270
var responseSink = b.Add(Sink.Ignore<HttpResponseMessage>().MapMaterializedValue(_ => NotUsed.Instance));
280271

281-
var resetSrc = b.Add(Source.Never<NotUsed>());
282272
b.From(reqSrc).To(corr.InRequest);
283273
b.From(resSrc).To(corr.InResponse);
284-
b.From(resetSrc).To(corr.InReset);
285274
b.From(corr.OutResponse).To(responseSink);
286275
b.From(corr.OutControl).To(s);
287276

src/TurboHttp.StreamTests/RFC9112/14_Http11PipelineReconnectTests.cs

Lines changed: 11 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,22 @@
88
namespace TurboHttp.StreamTests.RFC9112;
99

1010
/// <summary>
11-
/// Tests the no-pipelining-after-reconnect guard per RFC 9112 §9.3.2.
12-
/// Verifies that the first request after (re)connect waits for a response before pipelining is allowed,
13-
/// and that reconnect resets the guard.
11+
/// Tests the strict one-request-in-flight back-pressure contract per RFC 9112 §9.
12+
/// Verifies that each request waits for its own response before the next request is pulled.
1413
/// </summary>
1514
/// <remarks>
1615
/// Stage under test: <see cref="Http1XCorrelationStage"/>.
17-
/// RFC 9112 §9.3.2: A client SHOULD NOT pipeline requests on a newly opened connection
18-
/// until it knows the connection is persistent.
16+
/// RFC 9112 §9: HTTP/1.x requests and responses MUST be sent and received in order.
17+
/// The InReset inlet has been removed; strict serial back-pressure is always enforced.
1918
/// </remarks>
2019
public sealed class Http11PipelineReconnectTests : StreamTestBase
2120
{
2221
private static HttpResponseMessage OkResponse()
2322
=> new(HttpStatusCode.OK);
2423

25-
/// <summary>
26-
/// Builds and runs a closed graph that wires requestSource → InRequest, responseSource → InResponse,
27-
/// resetSource → InReset, Out → responseSink, OutSignal → signalSink.
28-
/// Returns the collected signals and responses once the stream completes.
29-
/// </summary>
3024
private async Task<(List<IControlItem> Signals, List<HttpResponseMessage> Responses)> RunStageAsync(
3125
Source<HttpRequestMessage, NotUsed> requestSource,
3226
Source<HttpResponseMessage, NotUsed> responseSource,
33-
Source<NotUsed, NotUsed> resetSource,
3427
TimeSpan? timeout = null)
3528
{
3629
var signalSink = Sink.Seq<IControlItem>();
@@ -41,11 +34,9 @@ private static HttpResponseMessage OkResponse()
4134
var corr = b.Add(new Http1XCorrelationStage());
4235
var reqSrc = b.Add(requestSource);
4336
var resSrc = b.Add(responseSource);
44-
var rstSrc = b.Add(resetSource);
4537

4638
b.From(reqSrc).To(corr.InRequest);
4739
b.From(resSrc).To(corr.InResponse);
48-
b.From(rstSrc).To(corr.InReset);
4940
b.From(corr.OutControl).To(sigSink);
5041
b.From(corr.OutResponse).To(resSink);
5142

@@ -59,113 +50,31 @@ private static HttpResponseMessage OkResponse()
5950
return (signals.ToList(), responses.ToList());
6051
}
6152

62-
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3.2-PL-001: First request after connect is not pipelined")]
63-
public async Task Should_WaitForResponse_When_FirstRequestAfterConnect()
53+
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3.2-PL-001: Each request waits for its response — two requests yield two signals")]
54+
public async Task Should_EmitSignalPerRequest_WhenEachRequestWaitsForResponse()
6455
{
6556
// Two requests emitted immediately, responses delayed and spaced.
66-
// Guard locks after req1 → req2 cannot be pulled until resp1 arrives and unlocks the guard.
67-
// After unlock, req2 becomes first-pending → signal #2.
57+
// With strict back-pressure: req1 is pulled → blocked on InResponse.
58+
// resp1 arrives → req1 paired, req2 pulled → signal #2.
59+
// resp2 arrives → req2 paired.
6860
var requests = new[]
6961
{
7062
new HttpRequestMessage(HttpMethod.Get, "http://example.com/1"),
7163
new HttpRequestMessage(HttpMethod.Get, "http://example.com/2")
7264
};
7365

74-
// resp1 at ~200ms, resp2 at ~800ms — spaced so req2 processes before resp2 arrives
66+
// resp1 at ~200ms, resp2 at ~800ms
7567
var responseSource = Source.Single(OkResponse())
7668
.InitialDelay(TimeSpan.FromMilliseconds(200))
7769
.Concat(Source.Single(OkResponse())
7870
.InitialDelay(TimeSpan.FromMilliseconds(600)));
7971

8072
var (signals, responses) = await RunStageAsync(
8173
Source.From(requests),
82-
responseSource,
83-
Source.Never<NotUsed>());
74+
responseSource);
8475

8576
Assert.Equal(2, responses.Count);
8677
Assert.Equal(2, signals.Count);
8778
Assert.All(signals, s => Assert.IsType<StreamAcquireItem>(s));
8879
}
89-
90-
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3.2-PL-002: Second request may pipeline after first response")]
91-
public async Task Should_Pipeline_When_FirstResponseReceived()
92-
{
93-
// Three requests emitted immediately, responses delayed and spaced.
94-
// Guard locks after req1 → req2 blocked.
95-
// At ~200ms: resp1 → unlock → pull req2 → req2 arrives (signal #2), pull req3 (unlocked).
96-
// req3 arrives with pending=1 → NO signal (pipelined behind req2).
97-
// At ~800ms: resp2+resp3 → correlate remaining.
98-
// Result: 2 signals proves pipelining occurred (req3 did not become first-pending).
99-
var requests = new[]
100-
{
101-
new HttpRequestMessage(HttpMethod.Get, "http://example.com/1"),
102-
new HttpRequestMessage(HttpMethod.Get, "http://example.com/2"),
103-
new HttpRequestMessage(HttpMethod.Get, "http://example.com/3")
104-
};
105-
106-
// resp1 at ~200ms, resp2+resp3 at ~800ms — req2+req3 arrive and pipeline before resp2
107-
var responseSource = Source.Single(OkResponse())
108-
.InitialDelay(TimeSpan.FromMilliseconds(200))
109-
.Concat(Source.From(new[] { OkResponse(), OkResponse() })
110-
.InitialDelay(TimeSpan.FromMilliseconds(600)));
111-
112-
var (signals, responses) = await RunStageAsync(
113-
Source.From(requests),
114-
responseSource,
115-
Source.Never<NotUsed>());
116-
117-
Assert.Equal(3, responses.Count);
118-
// req1 → signal (first-pending, guard locked)
119-
// req2 → signal (first-pending after resp1 unlocks guard)
120-
// req3 → no signal (pipelined behind req2 while guard is unlocked)
121-
Assert.Equal(2, signals.Count);
122-
Assert.All(signals, s => Assert.IsType<StreamAcquireItem>(s));
123-
}
124-
125-
[Fact(Timeout = 10_000, DisplayName = "RFC9112-9.3.2-PL-003: Reconnect resets pipeline guard")]
126-
public async Task Should_ResetGuard_When_ConnectionReestablished()
127-
{
128-
// req1 immediate, req2+req3 delayed 400ms (arrive after reset).
129-
// resp1 at ~100ms, resp2+resp3 at ~600ms.
130-
// Reset at ~250ms (after resp1 unlocks, before req2 arrives).
131-
//
132-
// Timeline:
133-
// t=0: req1 → signal #1, guard locked
134-
// t=100: resp1 → correlate → unlock → pull (no req available yet)
135-
// t=250: reset → re-lock guard
136-
// t=400: req2 → signal #2, guard locked → don't pull req3
137-
// t=600: resp2 → correlate → unlock → pull req3
138-
// req3 → signal #3
139-
// resp3 → correlate
140-
//
141-
// Result: 3 signals proves reset re-locked the guard.
142-
var req1 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/1");
143-
var req2 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/2");
144-
var req3 = new HttpRequestMessage(HttpMethod.Get, "http://example.com/3");
145-
146-
var requestSource = Source.Single(req1)
147-
.Concat(Source.From(new[] { req2, req3 })
148-
.InitialDelay(TimeSpan.FromMilliseconds(400)));
149-
150-
var responseSource = Source.Single(OkResponse())
151-
.InitialDelay(TimeSpan.FromMilliseconds(100))
152-
.Concat(Source.From(new[] { OkResponse(), OkResponse() })
153-
.InitialDelay(TimeSpan.FromMilliseconds(500)));
154-
155-
var resetSource = Source.Single(NotUsed.Instance)
156-
.InitialDelay(TimeSpan.FromMilliseconds(250));
157-
158-
var (signals, responses) = await RunStageAsync(
159-
requestSource,
160-
responseSource,
161-
resetSource);
162-
163-
Assert.Equal(3, responses.Count);
164-
// req1 → signal (first-pending, guard locked)
165-
// reset → guard re-locked
166-
// req2 → signal (first-pending after reset)
167-
// req3 → signal (first-pending after resp2 unlocks, guard was re-locked)
168-
Assert.Equal(3, signals.Count);
169-
Assert.All(signals, s => Assert.IsType<StreamAcquireItem>(s));
170-
}
17180
}

src/TurboHttp/Streams/Http10Engine.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ public BidiFlow<HttpRequestMessage, IOutputItem, IInputItem, HttpResponseMessage
2828

2929
b.From(decoder.Outlet).To(correlation.InResponse);
3030

31-
var resetSrc = b.Add(Source.Empty<NotUsed>());
32-
b.From(resetSrc).To(correlation.InReset);
33-
3431
return new BidiShape<
3532
HttpRequestMessage,
3633
IOutputItem,

src/TurboHttp/Streams/Http11Engine.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ public BidiFlow<HttpRequestMessage, IOutputItem, IInputItem, HttpResponseMessage
3030

3131
b.From(decoder.Outlet).To(correlation.InResponse);
3232

33-
var resetSrc = b.Add(Source.Empty<NotUsed>());
34-
b.From(resetSrc).To(correlation.InReset);
35-
3633
var signalCast = b.Add(Flow.Create<IControlItem>().Select(IOutputItem (x) => x));
3734

3835
var batchFlow = b.Add(

0 commit comments

Comments
 (0)