Skip to content

Commit 7ba6fcd

Browse files
committed
TASK-032-002: Extend GroupByRequestKeyStage with multi-slot SubflowGroup
Refactors `GroupByRequestKeyStage` to support N substream slots per `RequestEndpoint` key, enabling parallel connections when a slot is backpressured (e.g. HTTP/2 MaxConcurrentStreams reached). Key changes: - New `SubflowGroup` nested class holds `List<SubflowState>` slots per key - `SubflowState.HasCapacity` property: `!IsDead && !Offering` - `_subflows` changed from `Dictionary<RequestEndpoint, SubflowState>` to `Dictionary<RequestEndpoint, SubflowGroup>` - New `maxSubstreamsPerKey` constructor parameter (default `1`) - `HandlePush` routing: find capacity slot → or create new slot → or route to least-loaded slot when all limits are reached - Dead slot handling via `HandleDeadSlot`: transfers pending items to an alive slot in the same group, or creates a replacement slot - `TryFinish` and `TryCompleteStage` iterate all slots across all groups - `_onOfferComplete` identifies slots by reference within the group - `maxSubstreamsPerKey = 1` is fully backward-compatible with prior behavior
1 parent 84ed440 commit 7ba6fcd

16 files changed

Lines changed: 235 additions & 112 deletions

.maggus/features/feature_032.md

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,19 @@ Two architectural improvements to TurboHttp's stream pipeline:
6161
**Parallel:** no
6262

6363
**Acceptance Criteria:**
64-
- [ ] New nested class `SubflowGroup` holds `List<SubflowState>` slots for one key
65-
- [ ] `SubflowState` gains `HasCapacity` property: `!IsDead && !Offering`
66-
- [ ] `_subflows` changes from `Dictionary<RequestEndpoint, SubflowState>` to `Dictionary<RequestEndpoint, SubflowGroup>`
67-
- [ ] New stage constructor parameter `maxSubstreamsPerKey` (int, default `1`)
68-
- [ ] `HandlePush` routing logic:
64+
- [x] New nested class `SubflowGroup` holds `List<SubflowState>` slots for one key
65+
- [x] `SubflowState` gains `HasCapacity` property: `!IsDead && !Offering`
66+
- [x] `_subflows` changes from `Dictionary<RequestEndpoint, SubflowState>` to `Dictionary<RequestEndpoint, SubflowGroup>`
67+
- [x] New stage constructor parameter `maxSubstreamsPerKey` (int, default `1`)
68+
- [x] `HandlePush` routing logic:
6969
- Find first slot with `HasCapacity = true` → route there
7070
- If none found: clean dead slots, check per-key and total limits → create new slot OR route to least-loaded slot
71-
- [ ] Dead slot handling: pending items transferred to another alive slot in the same group, or new slot created (replaces `ReplaceSubstream`)
72-
- [ ] `TryFinish`, `TryCompleteStage` iterate `group.Slots` instead of single `SubflowState`
73-
- [ ] `_onOfferComplete` correctly identifies the state by slot reference within the group
74-
- [ ] `maxSubstreamsPerKey = 1` behaves identically to the previous implementation (backward compat)
75-
- [ ] All existing `GroupByRequestKeyStage` tests pass unchanged
76-
- [ ] Build succeeds with zero errors
71+
- [x] Dead slot handling: pending items transferred to another alive slot in the same group, or new slot created (replaces `ReplaceSubstream`)
72+
- [x] `TryFinish`, `TryCompleteStage` iterate `group.Slots` instead of single `SubflowState`
73+
- [x] `_onOfferComplete` correctly identifies the state by slot reference within the group
74+
- [x] `maxSubstreamsPerKey = 1` behaves identically to the previous implementation (backward compat)
75+
- [x] All existing `GroupByRequestKeyStage` tests pass unchanged
76+
- [x] Build succeeds with zero errors
7777

7878
---
7979

src/TurboHttp.StreamTests/Streams/01_StageOrderingTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ byte[] ResponseWithCookieAndCache() =>
382382
RedirectPolicy: null,
383383
RetryPolicy: null,
384384
Expect100Policy: null,
385-
RequestCompressionPolicy: null,
385+
CompressionPolicy: null,
386386
CookieJar: jar,
387387
CacheStore: store,
388388
CachePolicy: null,
@@ -471,7 +471,7 @@ public async Task Should_CompleteRequest_When_FullPipelineWithCookieInjectionBef
471471
RedirectPolicy: null,
472472
RetryPolicy: null,
473473
Expect100Policy: null,
474-
RequestCompressionPolicy: null,
474+
CompressionPolicy: null,
475475
CookieJar: new CookieJar(),
476476
CacheStore: null,
477477
CachePolicy: null,
@@ -574,7 +574,7 @@ byte[] ResponseFactory()
574574
RedirectPolicy: new RedirectPolicy(),
575575
RetryPolicy: null,
576576
Expect100Policy: null,
577-
RequestCompressionPolicy: null,
577+
CompressionPolicy: null,
578578
CookieJar: null,
579579
CacheStore: null,
580580
CachePolicy: null,

src/TurboHttp.StreamTests/Streams/11_EnginePipelineDescriptorTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public async Task Should_InjectCookieHeader_When_CookieJarHasMatchingCookie()
139139
RedirectPolicy: null,
140140
RetryPolicy: null,
141141
Expect100Policy: null,
142-
RequestCompressionPolicy: null,
142+
CompressionPolicy: null,
143143
CookieJar: cookieJar,
144144
CacheStore: null,
145145
CachePolicy: null,
@@ -182,7 +182,7 @@ public async Task Should_RetryOn503AndReturn200_When_RetryPolicyIsSet()
182182
RedirectPolicy: null,
183183
RetryPolicy: new RetryPolicy(),
184184
Expect100Policy: null,
185-
RequestCompressionPolicy: null,
185+
CompressionPolicy: null,
186186
CookieJar: null,
187187
CacheStore: null,
188188
CachePolicy: null,
@@ -218,7 +218,7 @@ public async Task Should_FollowRedirectAndReturn200_When_RedirectPolicyIsSet()
218218
RedirectPolicy: new RedirectPolicy(),
219219
RetryPolicy: null,
220220
Expect100Policy: null,
221-
RequestCompressionPolicy: null,
221+
CompressionPolicy: null,
222222
CookieJar: null,
223223
CacheStore: null,
224224
CachePolicy: null,

src/TurboHttp.StreamTests/Streams/14_FeedbackBufferOptimizationTests.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public async Task Should_CompleteViaFeedbackBuffer_When_Single301RedirectOccurs(
6565
RedirectPolicy: new RedirectPolicy(),
6666
RetryPolicy: null,
6767
Expect100Policy: null,
68-
RequestCompressionPolicy: null,
68+
CompressionPolicy: null,
6969
CookieJar: null,
7070
CacheStore: null,
7171
CachePolicy: null,
@@ -97,7 +97,7 @@ public async Task Should_CompleteWithoutDeadlock_When_ThreeChainedRedirectsOccur
9797
RedirectPolicy: new RedirectPolicy { MaxRedirects = 10 },
9898
RetryPolicy: null,
9999
Expect100Policy: null,
100-
RequestCompressionPolicy: null,
100+
CompressionPolicy: null,
101101
CookieJar: null,
102102
CacheStore: null,
103103
CachePolicy: null,
@@ -133,7 +133,7 @@ public async Task Should_CompleteViaFeedbackBuffer_When_Single408RetryOccurs()
133133
RedirectPolicy: null,
134134
RetryPolicy: new RetryPolicy { MaxRetries = 3, RespectRetryAfter = false },
135135
Expect100Policy: null,
136-
RequestCompressionPolicy: null,
136+
CompressionPolicy: null,
137137
CookieJar: null,
138138
CacheStore: null,
139139
CachePolicy: null,
@@ -165,7 +165,7 @@ public async Task Should_CompleteSuccessfully_When_TwoRetriesThenOkReceived()
165165
RedirectPolicy: null,
166166
RetryPolicy: new RetryPolicy { MaxRetries = 3, RespectRetryAfter = false },
167167
Expect100Policy: null,
168-
RequestCompressionPolicy: null,
168+
CompressionPolicy: null,
169169
CookieJar: null,
170170
CacheStore: null,
171171
CachePolicy: null,
@@ -197,7 +197,7 @@ public async Task Should_PreserveOriginalMethod_When_307RedirectViaFeedbackLoop(
197197
RedirectPolicy: new RedirectPolicy(),
198198
RetryPolicy: null,
199199
Expect100Policy: null,
200-
RequestCompressionPolicy: null,
200+
CompressionPolicy: null,
201201
CookieJar: null,
202202
CacheStore: null,
203203
CachePolicy: null,

src/TurboHttp.StreamTests/Streams/16_EngineBidiFlowCompositionTests.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public async Task Should_StillDeliverResponse_When_AutomaticDecompressionDisable
113113
// from the feature chain — it does not prevent protocol-level decompression.
114114
// This test verifies the pipeline still works correctly with the flag disabled.
115115
var descriptor = new PipelineDescriptor(
116-
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, RequestCompressionPolicy: null, CookieJar: null,
116+
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, CompressionPolicy: null, CookieJar: null,
117117
CacheStore: null, CachePolicy: null, Handlers: [],
118118
AutomaticDecompression: false);
119119

@@ -137,7 +137,7 @@ public async Task Should_RetryOn503_When_OnlyRetryPolicyIsSet()
137137
byte[] Factory() => ++callCount == 1 ? Response503() : Ok200();
138138

139139
var descriptor = new PipelineDescriptor(
140-
RedirectPolicy: null, RetryPolicy: new RetryPolicy(), Expect100Policy: null, RequestCompressionPolicy: null,
140+
RedirectPolicy: null, RetryPolicy: new RetryPolicy(), Expect100Policy: null, CompressionPolicy: null,
141141
CookieJar: null, CacheStore: null, CachePolicy: null, Handlers: []);
142142

143143
var flow = BuildFlow(descriptor, Factory);
@@ -159,7 +159,7 @@ public async Task Should_FollowRedirect_When_OnlyRedirectPolicyIsSet()
159159
byte[] Factory() => ++callCount == 1 ? Response301() : Ok200();
160160

161161
var descriptor = new PipelineDescriptor(
162-
RedirectPolicy: new RedirectPolicy(), RetryPolicy: null, Expect100Policy: null, RequestCompressionPolicy: null,
162+
RedirectPolicy: new RedirectPolicy(), RetryPolicy: null, Expect100Policy: null, CompressionPolicy: null,
163163
CookieJar: null, CacheStore: null, CachePolicy: null, Handlers: []);
164164

165165
var flow = BuildFlow(descriptor, Factory);
@@ -179,7 +179,7 @@ public async Task Should_StoreCookieFromResponse_When_OnlyCookieJarIsSet()
179179
{
180180
var jar = new CookieJar();
181181
var descriptor = new PipelineDescriptor(
182-
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, RequestCompressionPolicy: null,
182+
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, CompressionPolicy: null,
183183
CookieJar: jar, CacheStore: null, CachePolicy: null, Handlers: []);
184184

185185
var flow = BuildFlow(descriptor, Response200WithSetCookie);
@@ -212,7 +212,7 @@ byte[] Factory()
212212
}
213213

214214
var descriptor = new PipelineDescriptor(
215-
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, RequestCompressionPolicy: null,
215+
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, CompressionPolicy: null,
216216
CookieJar: null, CacheStore: store, CachePolicy: null, Handlers: []);
217217

218218
var flow = BuildFlow(descriptor, Factory);
@@ -247,7 +247,7 @@ public async Task Should_Return200Ok_When_AllFeaturesEnabled()
247247
var descriptor = new PipelineDescriptor(
248248
RedirectPolicy: new RedirectPolicy(),
249249
RetryPolicy: new RetryPolicy(),
250-
Expect100Policy: null, RequestCompressionPolicy: null,
250+
Expect100Policy: null, CompressionPolicy: null,
251251
CookieJar: new CookieJar(),
252252
CacheStore: new CacheStore(),
253253
CachePolicy: null,
@@ -278,7 +278,7 @@ byte[] Factory() => ++callCount == 1 ? Response503() :
278278
var descriptor = new PipelineDescriptor(
279279
RedirectPolicy: new RedirectPolicy(),
280280
RetryPolicy: new RetryPolicy(),
281-
Expect100Policy: null, RequestCompressionPolicy: null,
281+
Expect100Policy: null, CompressionPolicy: null,
282282
CookieJar: jar,
283283
CacheStore: store,
284284
CachePolicy: null,
@@ -345,7 +345,7 @@ public async Task Should_ReturnRawCompressedBytes_When_AutomaticDecompressionDis
345345
compressedBody.CopyTo(responseBytes, headerBytes.Length);
346346

347347
var descriptor = new PipelineDescriptor(
348-
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, RequestCompressionPolicy: null, CookieJar: null,
348+
RedirectPolicy: null, RetryPolicy: null, Expect100Policy: null, CompressionPolicy: null, CookieJar: null,
349349
CacheStore: null, CachePolicy: null, Handlers: [],
350350
AutomaticDecompression: false);
351351

src/TurboHttp.Tests/Hosting/PipelineDescriptorTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void AutomaticDecompression_DefaultsToTrue()
3030
RedirectPolicy: null,
3131
RetryPolicy: null,
3232
Expect100Policy: null,
33-
RequestCompressionPolicy: null,
33+
CompressionPolicy: null,
3434
CookieJar: null,
3535
CacheStore: null,
3636
CachePolicy: null,
@@ -46,7 +46,7 @@ public void AutomaticDecompression_CanBeSetToFalse()
4646
RedirectPolicy: null,
4747
RetryPolicy: null,
4848
Expect100Policy: null,
49-
RequestCompressionPolicy: null,
49+
CompressionPolicy: null,
5050
CookieJar: null,
5151
CacheStore: null,
5252
CachePolicy: null,

src/TurboHttp.Tests/RFC9110/11_RequestCompressionTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace TurboHttp.Tests.RFC9110;
44

55
/// <summary>
6-
/// Tests for <see cref="ContentEncodingEncoder"/> and <see cref="RequestCompressionPolicy"/>.
6+
/// Tests for <see cref="ContentEncodingEncoder"/> and <see cref="CompressionPolicy"/>.
77
/// RFC 9110 §8.4 — A sender that applied content encoding MUST generate a Content-Encoding
88
/// header field listing the applied encodings.
99
/// </summary>
@@ -50,7 +50,7 @@ public void Should_SetHeader_When_Compressed()
5050
[Fact(DisplayName = "RFC9110-8.4-RC-003: Small body not compressed")]
5151
public void Should_NotCompress_When_BelowThreshold()
5252
{
53-
var policy = new RequestCompressionPolicy { MinBodySizeBytes = 1024 };
53+
var policy = new CompressionPolicy { MinBodySizeBytes = 1024 };
5454
using var request = new HttpRequestMessage(HttpMethod.Post, "http://example.com/upload");
5555
request.Content = new ByteArrayContent(new byte[512]);
5656
request.Content.Headers.ContentLength = 512;
@@ -98,7 +98,7 @@ public void Should_Throw_When_UnknownEncoding()
9898
[Fact(DisplayName = "RFC9110-8.4-RC-007: Default policy has 1024 threshold and gzip")]
9999
public void Should_HaveDefaults_When_DefaultPolicy()
100100
{
101-
var policy = RequestCompressionPolicy.Default;
101+
var policy = CompressionPolicy.Default;
102102

103103
Assert.Equal(1024, policy.MinBodySizeBytes);
104104
Assert.Equal("gzip", policy.Encoding);
@@ -123,7 +123,7 @@ public void Should_UpdateContentLength_When_Compressed()
123123
[Fact(DisplayName = "RFC9110-8.4-RC-009: No body passes through unchanged")]
124124
public void Should_PassThrough_When_NoBody()
125125
{
126-
var policy = new RequestCompressionPolicy { MinBodySizeBytes = 1024 };
126+
var policy = new CompressionPolicy { MinBodySizeBytes = 1024 };
127127
using var request = new HttpRequestMessage(HttpMethod.Get, "http://example.com/");
128128

129129
Assert.Null(request.Content);

src/TurboHttp/Protocol/RFC9110/RequestCompressionPolicy.cs renamed to src/TurboHttp/Protocol/RFC9110/CompressionPolicy.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ namespace TurboHttp.Protocol.RFC9110;
55
/// RFC 9110 §8.4 — A sender that applies content encoding MUST generate a Content-Encoding
66
/// header field listing the applied encodings.
77
/// </summary>
8-
public sealed record RequestCompressionPolicy
8+
public sealed record CompressionPolicy
99
{
1010
/// <summary>Default policy: gzip encoding, bodies >= 1024 bytes compressed.</summary>
11-
public static readonly RequestCompressionPolicy Default = new();
11+
public static readonly CompressionPolicy Default = new();
1212

1313
/// <summary>
1414
/// The content encoding to apply (e.g. "gzip", "deflate", "br").

src/TurboHttp/Protocol/RFC9110/IfRangeValidator.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ internal static class IfRangeValidator
1616
private static readonly string[] HttpDateFormats =
1717
[
1818
"r", // RFC 1123
19-
"dddd, dd-MMM-yy HH:mm:ss 'GMT'", // RFC 850
20-
"ddd MMM d HH:mm:ss yyyy", // asctime
21-
"ddd MMM dd HH:mm:ss yyyy", // asctime (two-digit day)
19+
"dddd, dd-MMM-yy HH:mm:ss 'GMT'", // RFC 850
20+
"ddd MMM d HH:mm:ss yyyy", // asctime
21+
"ddd MMM dd HH:mm:ss yyyy", // asctime (two-digit day)
2222
];
2323

2424
/// <summary>
@@ -70,7 +70,7 @@ public static void Validate(HttpRequestMessage request)
7070
}
7171
}
7272

73-
private static string? GetSingleValue(System.Collections.Generic.IEnumerable<string> values)
73+
private static string? GetSingleValue(IEnumerable<string> values)
7474
{
7575
using var enumerator = values.GetEnumerator();
7676
return enumerator.MoveNext() ? enumerator.Current : null;

src/TurboHttp/Protocol/RFC9110/RangeParser.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.Globalization;
2+
13
namespace TurboHttp.Protocol.RFC9110;
24

35
/// <summary>
@@ -69,7 +71,7 @@ internal sealed record ContentRangeValue(string Unit, long? First, long? Last, l
6971
long? length = null;
7072
if (lengthPart is not "*")
7173
{
72-
if (!long.TryParse(lengthPart, System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out var parsedLength) || parsedLength < 0)
74+
if (!long.TryParse(lengthPart, NumberStyles.None, CultureInfo.InvariantCulture, out var parsedLength) || parsedLength < 0)
7375
{
7476
return null;
7577
}
@@ -86,7 +88,7 @@ internal sealed record ContentRangeValue(string Unit, long? First, long? Last, l
8688
// Suffix range: "-NNN"
8789
if (rangePart.Length > 1 && rangePart[0] == '-' && rangePart[1..].IndexOf('-') < 0)
8890
{
89-
if (!long.TryParse(rangePart[1..], System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out var suffixLength) || suffixLength < 0)
91+
if (!long.TryParse(rangePart[1..], NumberStyles.None, CultureInfo.InvariantCulture, out var suffixLength) || suffixLength < 0)
9092
{
9193
return null;
9294
}
@@ -104,12 +106,12 @@ internal sealed record ContentRangeValue(string Unit, long? First, long? Last, l
104106
var firstPart = rangePart[..dashIndex].Trim();
105107
var lastPart = rangePart[(dashIndex + 1)..].Trim();
106108

107-
if (!long.TryParse(firstPart, System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out var first) || first < 0)
109+
if (!long.TryParse(firstPart, NumberStyles.None, CultureInfo.InvariantCulture, out var first) || first < 0)
108110
{
109111
return null;
110112
}
111113

112-
if (!long.TryParse(lastPart, System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out var last) || last < 0)
114+
if (!long.TryParse(lastPart, NumberStyles.None, CultureInfo.InvariantCulture, out var last) || last < 0)
113115
{
114116
return null;
115117
}

0 commit comments

Comments
 (0)