Skip to content

Commit 3a6d136

Browse files
committed
Remove PipelineRetryItem
1 parent 32ac9a1 commit 3a6d136

9 files changed

Lines changed: 96 additions & 291 deletions

File tree

src/TurboHTTP.StreamTests/Http10/Http10ConnectionStageReconnectSpec.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public async Task Http10ConnectionStage_should_reconnect_and_replay_request_on_c
8787

8888
[Fact(Timeout = 10000)]
8989
[Trait("RFC", "RFC1945-8")]
90-
public async Task Http10ConnectionStage_should_fail_stage_when_max_reconnect_attempts_exceeded()
90+
public async Task Http10ConnectionStage_should_complete_stage_when_max_reconnect_attempts_exceeded()
9191
{
9292
var stage = new Http10ConnectionStage(maxReconnectAttempts: 1);
9393

@@ -126,8 +126,8 @@ public async Task Http10ConnectionStage_should_fail_stage_when_max_reconnect_att
126126
// Reconnect fails → CloseSignalItem again (attempt 2 exceeds max of 1)
127127
serverSub.SendNext(new CloseSignalItem(TlsCloseKind.AbruptClose));
128128

129-
// Stage should fail — error propagates to response subscriber
130-
await Task.Run(() => responseSub.ExpectError(), TestContext.Current.CancellationToken);
129+
// Stage should complete
130+
await Task.Run(() => responseSub.ExpectComplete(), TestContext.Current.CancellationToken);
131131
}
132132

133133
[Fact(Timeout = 10000)]
@@ -170,4 +170,4 @@ public async Task Http10ConnectionStage_should_not_reconnect_when_no_inflight_re
170170
serverSub.SendComplete();
171171
await Task.Run(() => networkSub.ExpectComplete(), TestContext.Current.CancellationToken);
172172
}
173-
}
173+
}

src/TurboHTTP.StreamTests/Http10/Http10ConnectionStageSpec.cs

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -191,56 +191,4 @@ public async Task Http10ConnectionStage_should_emit_connection_reuse_close_for_h
191191
// HTTP/1.0 default is close (RFC 1945)
192192
Assert.False(connectionReuse.Decision.CanReuse);
193193
}
194-
195-
[Fact(Timeout = 10_000)]
196-
[Trait("RFC", "RFC1945-7.2")]
197-
public async Task Http10ConnectionStage_should_emit_pipeline_retry_when_server_closes_with_inflight_request()
198-
{
199-
var stage = new Http10ConnectionStage();
200-
201-
var appProbe = this.CreateManualPublisherProbe<HttpRequestMessage>();
202-
var serverProbe = this.CreateManualPublisherProbe<IInputItem>();
203-
var networkSub = this.CreateManualSubscriberProbe<IOutputItem>();
204-
var responseSub = this.CreateManualSubscriberProbe<HttpResponseMessage>();
205-
206-
RunnableGraph.FromGraph(GraphDsl.Create(b =>
207-
{
208-
var s = b.Add(stage);
209-
var app = b.Add(Source.FromPublisher(appProbe));
210-
var server = b.Add(Source.FromPublisher(serverProbe));
211-
var netSink = b.Add(Sink.FromSubscriber(networkSub));
212-
var resSink = b.Add(Sink.FromSubscriber(responseSub));
213-
214-
b.From(app).To(s.InApp);
215-
b.From(server).To(s.InServer);
216-
b.From(s.OutNetwork).To(netSink);
217-
b.From(s.OutResponse).To(resSink);
218-
219-
return ClosedShape.Instance;
220-
})).Run(Materializer);
221-
222-
var netSubscription = await networkSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
223-
var resSubscription = await responseSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
224-
var appSubscription = await appProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
225-
var serverSubscription = await serverProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
226-
227-
netSubscription.Request(10);
228-
resSubscription.Request(10);
229-
230-
// Send a request
231-
var request = MakeRequest("/test");
232-
appSubscription.SendNext(request);
233-
234-
// Consume StreamAcquire + NetworkBuffer
235-
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
236-
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
237-
238-
// Server closes abruptly without sending a response
239-
serverSubscription.SendComplete();
240-
241-
// Should emit PipelineRetryItem with the original request
242-
var retryItem = await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
243-
var pipelineRetry = Assert.IsType<PipelineRetryItem>(retryItem);
244-
Assert.Same(request, pipelineRetry.Request);
245-
}
246194
}

src/TurboHTTP.StreamTests/Http11/Http11ConnectionStageReconnectSpec.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public async Task Http11ConnectionStage_should_reconnect_and_replay_request_on_c
8787

8888
[Fact(Timeout = 10000)]
8989
[Trait("RFC", "RFC9112-9.3")]
90-
public async Task Http11ConnectionStage_should_fail_stage_when_max_reconnect_attempts_exceeded()
90+
public async Task Http11ConnectionStage_should_complete_stage_when_max_reconnect_attempts_exceeded()
9191
{
9292
var stage = new Http11ConnectionStage(maxPipelineDepth: 1, maxReconnectAttempts: 1);
9393

@@ -126,8 +126,8 @@ public async Task Http11ConnectionStage_should_fail_stage_when_max_reconnect_att
126126
// Reconnect fails → CloseSignalItem again (attempt 2 exceeds max of 1)
127127
serverSub.SendNext(new CloseSignalItem(TlsCloseKind.AbruptClose));
128128

129-
// Stage should fail — error propagates to response subscriber
130-
await Task.Run(() => responseSub.ExpectError(), TestContext.Current.CancellationToken);
129+
// Stage should complete
130+
await Task.Run(() => responseSub.ExpectComplete(), TestContext.Current.CancellationToken);
131131
}
132132

133133
[Fact(Timeout = 10000)]

src/TurboHTTP.StreamTests/Http11/Http11ConnectionStageSpec.cs

Lines changed: 47 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -117,24 +117,24 @@ public async Task Http11ConnectionStage_should_decode_response_and_correlate_wit
117117
return ClosedShape.Instance;
118118
})).Run(Materializer);
119119

120-
var netSubscription = await networkSub.ExpectSubscriptionAsync();
121-
var resSubscription = await responseSub.ExpectSubscriptionAsync();
122-
var appSubscription = await appProbe.ExpectSubscriptionAsync();
123-
var serverSubscription = await serverProbe.ExpectSubscriptionAsync();
120+
var netSubscription = await networkSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
121+
var resSubscription = await responseSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
122+
var appSubscription = await appProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
123+
var serverSubscription = await serverProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
124124

125125
netSubscription.Request(10);
126126
resSubscription.Request(10);
127127

128128
appSubscription.SendNext(MakeRequest("/hello"));
129129

130130
// Consume outbound
131-
await networkSub.ExpectNextAsync();
132-
await networkSub.ExpectNextAsync();
131+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
132+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
133133

134134
serverSubscription.SendNext(MakeResponseBuffer(
135135
"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello"));
136136

137-
var response = await responseSub.ExpectNextAsync();
137+
var response = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
138138
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
139139
Assert.NotNull(response.RequestMessage);
140140
Assert.Equal("/hello", response.RequestMessage!.RequestUri!.AbsolutePath);
@@ -167,41 +167,41 @@ public async Task Http11ConnectionStage_should_support_pipelining_multiple_reque
167167
return ClosedShape.Instance;
168168
})).Run(Materializer);
169169

170-
var netSubscription = await networkSub.ExpectSubscriptionAsync();
171-
var resSubscription = await responseSub.ExpectSubscriptionAsync();
172-
var appSubscription = await appProbe.ExpectSubscriptionAsync();
173-
var serverSubscription = await serverProbe.ExpectSubscriptionAsync();
170+
var netSubscription = await networkSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
171+
var resSubscription = await responseSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
172+
var appSubscription = await appProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
173+
var serverSubscription = await serverProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
174174

175175
netSubscription.Request(20);
176176
resSubscription.Request(10);
177177

178178
// Send two requests (pipelined)
179179
appSubscription.SendNext(MakeRequest("/first"));
180180
// StreamAcquire + NetworkBuffer for first request
181-
await networkSub.ExpectNextAsync();
182-
await networkSub.ExpectNextAsync();
181+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
182+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
183183

184184
appSubscription.SendNext(MakeRequest("/second"));
185185
// StreamAcquire + NetworkBuffer for second request
186-
await networkSub.ExpectNextAsync();
187-
await networkSub.ExpectNextAsync();
186+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
187+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
188188

189189
// Send first response
190190
serverSubscription.SendNext(MakeResponseBuffer(
191191
"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nfirst"));
192192

193-
var resp1 = await responseSub.ExpectNextAsync();
193+
var resp1 = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
194194
Assert.Equal("/first", resp1.RequestMessage!.RequestUri!.AbsolutePath);
195195

196196
// ConnectionReuseItem for first response
197-
var reuse1 = await networkSub.ExpectNextAsync();
197+
var reuse1 = await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
198198
Assert.IsType<ConnectionReuseItem>(reuse1);
199199

200200
// Send second response
201201
serverSubscription.SendNext(MakeResponseBuffer(
202202
"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nsecond"));
203203

204-
var resp2 = await responseSub.ExpectNextAsync();
204+
var resp2 = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
205205
Assert.Equal("/second", resp2.RequestMessage!.RequestUri!.AbsolutePath);
206206
}
207207

@@ -232,10 +232,10 @@ public async Task Http11ConnectionStage_should_pipeline_requests_up_to_max_depth
232232
return ClosedShape.Instance;
233233
})).Run(Materializer);
234234

235-
var netSubscription = await networkSub.ExpectSubscriptionAsync();
236-
var resSubscription = await responseSub.ExpectSubscriptionAsync();
237-
var appSubscription = await appProbe.ExpectSubscriptionAsync();
238-
var serverSubscription = await serverProbe.ExpectSubscriptionAsync();
235+
var netSubscription = await networkSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
236+
var resSubscription = await responseSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
237+
var appSubscription = await appProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
238+
var serverSubscription = await serverProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
239239

240240
netSubscription.Request(100);
241241
resSubscription.Request(100);
@@ -246,9 +246,9 @@ public async Task Http11ConnectionStage_should_pipeline_requests_up_to_max_depth
246246
appSubscription.SendNext(MakeRequest("/req3"));
247247

248248
// Consume all 6 items (StreamAcquire + NetworkBuffer for each request)
249-
for (int i = 0; i < 6; i++)
249+
for (var i = 0; i < 6; i++)
250250
{
251-
await networkSub.ExpectNextAsync();
251+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
252252
}
253253

254254
// All 3 requests should have been accepted and encoded.
@@ -258,15 +258,15 @@ public async Task Http11ConnectionStage_should_pipeline_requests_up_to_max_depth
258258
serverSubscription.SendNext(MakeResponseBuffer("HTTP/1.1 200 OK\r\nContent-Length: 4\r\n\r\nres3"));
259259

260260
// Should get 3 responses
261-
var resp1 = await responseSub.ExpectNextAsync();
261+
var resp1 = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
262262
Assert.Equal(HttpStatusCode.OK, resp1.StatusCode);
263263
Assert.Equal("/req1", resp1.RequestMessage!.RequestUri!.AbsolutePath);
264264

265-
var resp2 = await responseSub.ExpectNextAsync();
265+
var resp2 = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
266266
Assert.Equal(HttpStatusCode.OK, resp2.StatusCode);
267267
Assert.Equal("/req2", resp2.RequestMessage!.RequestUri!.AbsolutePath);
268268

269-
var resp3 = await responseSub.ExpectNextAsync();
269+
var resp3 = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
270270
Assert.Equal(HttpStatusCode.OK, resp3.StatusCode);
271271
Assert.Equal("/req3", resp3.RequestMessage!.RequestUri!.AbsolutePath);
272272
}
@@ -298,10 +298,10 @@ public async Task Http11ConnectionStage_should_reduce_pipeline_depth_when_connec
298298
return ClosedShape.Instance;
299299
})).Run(Materializer);
300300

301-
var netSubscription = await networkSub.ExpectSubscriptionAsync();
302-
var resSubscription = await responseSub.ExpectSubscriptionAsync();
303-
var appSubscription = await appProbe.ExpectSubscriptionAsync();
304-
var serverSubscription = await serverProbe.ExpectSubscriptionAsync();
301+
var netSubscription = await networkSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
302+
var resSubscription = await responseSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
303+
var appSubscription = await appProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
304+
var serverSubscription = await serverProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
305305

306306
netSubscription.Request(100);
307307
resSubscription.Request(100);
@@ -310,19 +310,19 @@ public async Task Http11ConnectionStage_should_reduce_pipeline_depth_when_connec
310310
appSubscription.SendNext(MakeRequest("/req1"));
311311

312312
// Consume StreamAcquire + NetworkBuffer
313-
await networkSub.ExpectNextAsync();
314-
await networkSub.ExpectNextAsync();
313+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
314+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
315315

316316
// Send response with Connection: close header
317317
var responseWithClose = "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 4\r\n\r\nres1";
318318
serverSubscription.SendNext(MakeResponseBuffer(responseWithClose));
319319

320320
// Get response
321-
var response = await responseSub.ExpectNextAsync();
321+
var response = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
322322
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
323323

324324
// Get ConnectionReuseItem on network outlet
325-
var reuseItem = await networkSub.ExpectNextAsync();
325+
var reuseItem = await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
326326
var connectionReuse = Assert.IsType<ConnectionReuseItem>(reuseItem);
327327
// Connection: close means cannot reuse
328328
Assert.False(connectionReuse.Decision.CanReuse);
@@ -332,13 +332,13 @@ public async Task Http11ConnectionStage_should_reduce_pipeline_depth_when_connec
332332
appSubscription.SendNext(MakeRequest("/req2"));
333333

334334
// Consume StreamAcquire + NetworkBuffer for req2
335-
await networkSub.ExpectNextAsync();
336-
await networkSub.ExpectNextAsync();
335+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
336+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
337337

338338
// Send response for req2
339339
serverSubscription.SendNext(MakeResponseBuffer("HTTP/1.1 200 OK\r\nContent-Length: 4\r\n\r\nres2"));
340340

341-
var response2 = await responseSub.ExpectNextAsync();
341+
var response2 = await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
342342
Assert.Equal(HttpStatusCode.OK, response2.StatusCode);
343343
Assert.Equal("/req2", response2.RequestMessage!.RequestUri!.AbsolutePath);
344344
}
@@ -370,88 +370,28 @@ public async Task Http11ConnectionStage_should_emit_connection_reuse_keep_alive_
370370
return ClosedShape.Instance;
371371
})).Run(Materializer);
372372

373-
var netSubscription = await networkSub.ExpectSubscriptionAsync();
374-
var resSubscription = await responseSub.ExpectSubscriptionAsync();
375-
var appSubscription = await appProbe.ExpectSubscriptionAsync();
376-
var serverSubscription = await serverProbe.ExpectSubscriptionAsync();
373+
var netSubscription = await networkSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
374+
var resSubscription = await responseSub.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
375+
var appSubscription = await appProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
376+
var serverSubscription = await serverProbe.ExpectSubscriptionAsync(TestContext.Current.CancellationToken);
377377

378378
netSubscription.Request(10);
379379
resSubscription.Request(10);
380380

381381
appSubscription.SendNext(MakeRequest());
382382

383383
// StreamAcquire + NetworkBuffer
384-
await networkSub.ExpectNextAsync();
385-
await networkSub.ExpectNextAsync();
384+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
385+
await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
386386

387387
serverSubscription.SendNext(MakeResponseBuffer(
388388
"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"));
389389

390-
await responseSub.ExpectNextAsync();
390+
await responseSub.ExpectNextAsync(TestContext.Current.CancellationToken);
391391

392-
var reuseItem = await networkSub.ExpectNextAsync();
392+
var reuseItem = await networkSub.ExpectNextAsync(TestContext.Current.CancellationToken);
393393
var connectionReuse = Assert.IsType<ConnectionReuseItem>(reuseItem);
394394
// HTTP/1.1 default is keep-alive (RFC 9112)
395395
Assert.True(connectionReuse.Decision.CanReuse);
396396
}
397-
398-
[Fact(Timeout = 10_000)]
399-
[Trait("RFC", "RFC9112-7")]
400-
public async Task Http11ConnectionStage_should_emit_pipeline_retry_when_server_closes_with_inflight_requests()
401-
{
402-
var stage = new Http11ConnectionStage();
403-
404-
var appProbe = this.CreateManualPublisherProbe<HttpRequestMessage>();
405-
var serverProbe = this.CreateManualPublisherProbe<IInputItem>();
406-
var networkSub = this.CreateManualSubscriberProbe<IOutputItem>();
407-
var responseSub = this.CreateManualSubscriberProbe<HttpResponseMessage>();
408-
409-
RunnableGraph.FromGraph(GraphDsl.Create(b =>
410-
{
411-
var s = b.Add(stage);
412-
var app = b.Add(Source.FromPublisher(appProbe));
413-
var server = b.Add(Source.FromPublisher(serverProbe));
414-
var netSink = b.Add(Sink.FromSubscriber(networkSub));
415-
var resSink = b.Add(Sink.FromSubscriber(responseSub));
416-
417-
b.From(app).To(s.InApp);
418-
b.From(server).To(s.InServer);
419-
b.From(s.OutNetwork).To(netSink);
420-
b.From(s.OutResponse).To(resSink);
421-
422-
return ClosedShape.Instance;
423-
})).Run(Materializer);
424-
425-
var netSubscription = await networkSub.ExpectSubscriptionAsync();
426-
var resSubscription = await responseSub.ExpectSubscriptionAsync();
427-
var appSubscription = await appProbe.ExpectSubscriptionAsync();
428-
var serverSubscription = await serverProbe.ExpectSubscriptionAsync();
429-
430-
netSubscription.Request(100);
431-
resSubscription.Request(100);
432-
433-
// Send 2 pipelined requests
434-
var request1 = MakeRequest("/req1");
435-
var request2 = MakeRequest("/req2");
436-
appSubscription.SendNext(request1);
437-
appSubscription.SendNext(request2);
438-
439-
// Consume StreamAcquire + NetworkBuffer for both
440-
await networkSub.ExpectNextAsync(); // StreamAcquire 1
441-
await networkSub.ExpectNextAsync(); // NetworkBuffer 1
442-
await networkSub.ExpectNextAsync(); // StreamAcquire 2
443-
await networkSub.ExpectNextAsync(); // NetworkBuffer 2
444-
445-
// Server closes abruptly without sending responses
446-
serverSubscription.SendComplete();
447-
448-
// Should emit PipelineRetryItem for both orphaned requests
449-
var retryItem1 = await networkSub.ExpectNextAsync();
450-
var pipelineRetry1 = Assert.IsType<PipelineRetryItem>(retryItem1);
451-
Assert.Same(request1, pipelineRetry1.Request);
452-
453-
var retryItem2 = await networkSub.ExpectNextAsync();
454-
var pipelineRetry2 = Assert.IsType<PipelineRetryItem>(retryItem2);
455-
Assert.Same(request2, pipelineRetry2.Request);
456-
}
457-
}
397+
}

0 commit comments

Comments
 (0)