Skip to content

Commit 8b3b110

Browse files
committed
TASK-032-003: Thread maxSubstreamsPerKey and slotQueueSize through extensions
Add `maxSubstreamsPerKey` (default 1) and `slotQueueSize` (default 64, renamed from `queueSize`) parameters to `GroupByRequestKey<T, TMat>` extension and `HostKeyMergeBack<T, TMat>`. Both are forwarded to `GroupByRequestKeyStage` so callers can configure multi-slot routing without breaking existing call sites.
1 parent 7ba6fcd commit 8b3b110

4 files changed

Lines changed: 15 additions & 12 deletions

File tree

.maggus/features/feature_032.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ Two architectural improvements to TurboHttp's stream pipeline:
8787
**Parallel:** no
8888

8989
**Acceptance Criteria:**
90-
- [ ] `GroupByRequestKey<T, TMat>` extension gains `maxSubstreamsPerKey = 1` parameter
91-
- [ ] `GroupByRequestKey<T, TMat>` extension gains `slotQueueSize = 64` parameter
92-
- [ ] `HostKeyMergeBack<T, TMat>` stores and passes both parameters to `GroupByRequestKeyStage`
93-
- [ ] Existing call sites compile without changes (defaults preserve old behavior)
94-
- [ ] Build succeeds with zero errors
90+
- [x] `GroupByRequestKey<T, TMat>` extension gains `maxSubstreamsPerKey = 1` parameter
91+
- [x] `GroupByRequestKey<T, TMat>` extension gains `slotQueueSize = 64` parameter
92+
- [x] `HostKeyMergeBack<T, TMat>` stores and passes both parameters to `GroupByRequestKeyStage`
93+
- [x] Existing call sites compile without changes (defaults preserve old behavior)
94+
- [x] Build succeeds with zero errors
9595

9696
---
9797

src/TurboHttp.StreamTests/Streams/13_GroupByHostKeyQueueSizeTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public async Task Should_ControlQueueSize_When_ConstructorParameterSpecified()
5454

5555
var flow = (Flow<HttpRequestMessage, HttpRequestMessage, NotUsed>)
5656
Flow.Create<HttpRequestMessage>()
57-
.GroupByRequestKey(RequestEndpoint.FromRequest, maxSubstreams: 16, queueSize: 128)
57+
.GroupByRequestKey(RequestEndpoint.FromRequest, maxSubstreams: 16, slotQueueSize: 128)
5858
.MergeSubstreams();
5959

6060
var results = await Source.From(requests)

src/TurboHttp/Streams/Stages/Internal/HostKeyGroupByExtensions.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ public static SubFlow<T, TMat, Sink<T, TMat>> GroupByRequestKey<T, TMat>(
1919
this IFlow<T, TMat> flow,
2020
Func<T, RequestEndpoint> keyFunction,
2121
int maxSubstreams,
22-
int queueSize = 64)
22+
int slotQueueSize = 64,
23+
int maxSubstreamsPerKey = 1)
2324
{
24-
var mergeBack = new HostKeyMergeBack<T, TMat>(flow, keyFunction, maxSubstreams, queueSize);
25+
var mergeBack = new HostKeyMergeBack<T, TMat>(flow, keyFunction, maxSubstreams, slotQueueSize, maxSubstreamsPerKey);
2526

2627
// Flow.Create<T>() gives Flow<T,T,NotUsed>; cast is safe because callers always
2728
// start with a flow whose TMat is NotUsed (e.g. Flow.Create<HttpRequestMessage>()).

src/TurboHttp/Streams/Stages/Internal/HostKeyMergeBack.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@ internal sealed class HostKeyMergeBack<TIn, TMat> : IMergeBack<TIn, TMat>
1515
private readonly IFlow<TIn, TMat> _baseFlow;
1616
private readonly Func<TIn, RequestEndpoint> _keyFunction;
1717
private readonly int _maxSubstreams;
18-
private readonly int _queueSize;
18+
private readonly int _slotQueueSize;
19+
private readonly int _maxSubstreamsPerKey;
1920

2021
public HostKeyMergeBack(IFlow<TIn, TMat> baseFlow, Func<TIn, RequestEndpoint> keyFunction, int maxSubstreams,
21-
int queueSize = 64)
22+
int slotQueueSize = 64, int maxSubstreamsPerKey = 1)
2223
{
2324
_baseFlow = baseFlow;
2425
_keyFunction = keyFunction;
2526
_maxSubstreams = maxSubstreams;
26-
_queueSize = queueSize;
27+
_slotQueueSize = slotQueueSize;
28+
_maxSubstreamsPerKey = maxSubstreamsPerKey;
2729
}
2830

2931
// Called by SubFlowImpl.MergeSubstreamsWithParallelism(breadth).
@@ -36,7 +38,7 @@ public IFlow<TOut, TMat> Apply<TOut>(Flow<TIn, TOut, TMat> flow, int breadth)
3638
: breadth;
3739

3840
return _baseFlow
39-
.Via(new GroupByRequestKeyStage<TIn>(_keyFunction, _maxSubstreams, _queueSize))
41+
.Via(new GroupByRequestKeyStage<TIn>(_keyFunction, _maxSubstreams, _slotQueueSize, _maxSubstreamsPerKey))
4042
.Via(Flow.Create<Source<TIn, NotUsed>>()
4143
.Select(src => src.Via(flow)))
4244
.Via(new MergeSubstreamsStage<TOut>(effectiveBreadth));

0 commit comments

Comments
 (0)