|
| 1 | +--- |
| 2 | +title: "Feature 025: Clean Protocol Core — Single GroupByRequestKey" |
| 3 | +description: "Invert the protocol-core topology so GroupByRequestKey is called once at the top level, with HTTP version routing and engine connection flows living inside each substream" |
| 4 | +tags: [features, architecture, streams, protocol-core, refactoring] |
| 5 | +status: planned |
| 6 | +--- |
| 7 | + |
| 8 | +# Feature 025: Clean Protocol Core — Single GroupByRequestKey |
| 9 | + |
| 10 | +## Summary |
| 11 | + |
| 12 | +| Field | Value | |
| 13 | +|-------|-------| |
| 14 | +| **Status** | 🟡 Planned | |
| 15 | +| **Category** | Architecture Refactoring | |
| 16 | +| **Scope** | 2 files (delete 1, rewrite 1) | |
| 17 | + |
| 18 | +## Problem |
| 19 | + |
| 20 | +`ProtocolCoreGraphBuilder` inverts the natural execution order. The current topology is: |
| 21 | + |
| 22 | +``` |
| 23 | +Partition (by HTTP version) |
| 24 | + ├─ GroupByRequestKey(256) → ConnectionFlow<Http10Engine> → MergeSubstreams |
| 25 | + ├─ GroupByRequestKey(256) → ConnectionFlow<Http11Engine> → MergeSubstreams |
| 26 | + ├─ GroupByRequestKey(64) → ConnectionFlow<Http20Engine> → MergeSubstreams |
| 27 | + └─ GroupByRequestKey(64) → ConnectionFlow<Http30Engine> → MergeSubstreams |
| 28 | +Merge |
| 29 | +``` |
| 30 | + |
| 31 | +`GroupByRequestKey` is instantiated **four times** — once per HTTP version lane. The grouping key (`RequestEndpoint`) already contains the HTTP version, so the Partition and the per-lane GroupBy are doing redundant work at different levels of the graph. |
| 32 | + |
| 33 | +## Target Topology |
| 34 | + |
| 35 | +Invert: group first, then route by version inside each substream. |
| 36 | + |
| 37 | +``` |
| 38 | +GroupByRequestKey(host:port:scheme:version, maxSubstreams=256) ← called once |
| 39 | + └─ substream per endpoint (all requests have the same version) |
| 40 | + Partition (by HTTP version) |
| 41 | + ├─ ConnectionFlow<Http10Engine> |
| 42 | + ├─ ConnectionFlow<Http11Engine> |
| 43 | + ├─ ConnectionFlow<Http20Engine> |
| 44 | + └─ ConnectionFlow<Http30Engine> |
| 45 | + Merge |
| 46 | +MergeSubstreams |
| 47 | +``` |
| 48 | + |
| 49 | +Because `Version` is part of the `RequestEndpoint` key, every substream carries requests of exactly one HTTP version. The inner Partition always routes to a single branch — it is explicit rather than clever. |
| 50 | + |
| 51 | +## Design Decisions |
| 52 | + |
| 53 | +### Version stays in RequestEndpoint key |
| 54 | + |
| 55 | +`RequestEndpoint = (host, port, scheme, version)` is unchanged. Removing version from the key would be a semantic change: it would collapse HTTP/1.1 and HTTP/2 connections to the same host into one substream, which introduces mixed-version connection management complexity. The structural refactor is sufficient without changing semantics. |
| 56 | + |
| 57 | +### Single maxSubstreams = 256 |
| 58 | + |
| 59 | +Previously each HTTP version had its own GroupByRequestKey with a separate limit: |
| 60 | + |
| 61 | +| Version | Old limit | |
| 62 | +|---------|-----------| |
| 63 | +| HTTP/1.0 | 256 | |
| 64 | +| HTTP/1.1 | 256 | |
| 65 | +| HTTP/2 | 64 | |
| 66 | +| HTTP/3 | 64 | |
| 67 | + |
| 68 | +With one GroupByRequestKey the limit is shared across all versions. `256` is used as the default — it matches the existing HTTP/1.x ceiling and is a reasonable upper bound for distinct endpoints. Because version is in the key, an HTTP/2 + HTTP/1.1 dual-stack host counts as two substreams, preserving relative separation. |
| 69 | + |
| 70 | +## Files |
| 71 | + |
| 72 | +| Action | File | |
| 73 | +|--------|------| |
| 74 | +| **Delete** | `src/TurboHttp/Streams/ProtocolCoreGraphBuilder.cs` | |
| 75 | +| **Rewrite** | `src/TurboHttp/Streams/Engine.cs` | |
| 76 | +| Keep | `src/TurboHttp/Internal/RequestEndpoint.cs` | |
| 77 | +| Keep | `src/TurboHttp/Streams/Stages/Internal/GroupByRequestKeyStage.cs` | |
| 78 | +| Keep | `src/TurboHttp/Streams/Stages/Internal/HostKeyGroupByExtensions.cs` | |
| 79 | +| Keep | `src/TurboHttp.StreamTests/Streams/10_EngineVersionRoutingTests.cs` | |
| 80 | + |
| 81 | +## Implementation Sketch |
| 82 | + |
| 83 | +### `Engine.cs` changes |
| 84 | + |
| 85 | +Replace the `ProtocolCoreGraphBuilder.Build(...)` call in `BuildExtendedPipeline` with a call to a new private `BuildProtocolCore` method: |
| 86 | + |
| 87 | +```csharp |
| 88 | +private static IGraph<FlowShape<HttpRequestMessage, HttpResponseMessage>, NotUsed> |
| 89 | + BuildProtocolCore( |
| 90 | + ConnectionPool pool, |
| 91 | + TurboClientOptions clientOptions, |
| 92 | + Func<Flow<IOutputItem, IInputItem, NotUsed>>? http10Factory, |
| 93 | + Func<Flow<IOutputItem, IInputItem, NotUsed>>? http11Factory, |
| 94 | + Func<Flow<IOutputItem, IInputItem, NotUsed>>? http20Factory, |
| 95 | + Func<Flow<IOutputItem, IInputItem, NotUsed>>? http30Factory) |
| 96 | +{ |
| 97 | + var http10 = BuildConnectionFlow<Http10Engine>(pool, http10Factory, clientOptions); |
| 98 | + var http11 = BuildConnectionFlow<Http11Engine>(pool, http11Factory, clientOptions); |
| 99 | + var http20 = BuildConnectionFlow<Http20Engine>(pool, http20Factory, clientOptions); |
| 100 | + var http30 = BuildConnectionFlow<Http30Engine>(pool, http30Factory, clientOptions); |
| 101 | + |
| 102 | + var versionRouter = BuildVersionRouter(http10, http11, http20, http30); |
| 103 | + var highThroughputBuffer = Attributes.CreateInputBuffer(16, 64); |
| 104 | + |
| 105 | + return (Flow<HttpRequestMessage, HttpResponseMessage, NotUsed>) |
| 106 | + Flow.Create<HttpRequestMessage>() |
| 107 | + .GroupByRequestKey(RequestEndpoint.FromRequest, maxSubstreams: 256) |
| 108 | + .ViaSubFlow(versionRouter) |
| 109 | + .MergeSubstreams() |
| 110 | + .WithAttributes(highThroughputBuffer); |
| 111 | +} |
| 112 | + |
| 113 | +private static IGraph<FlowShape<HttpRequestMessage, HttpResponseMessage>, NotUsed> |
| 114 | + BuildVersionRouter(/* four ConnectionFlow graphs */) |
| 115 | +{ |
| 116 | + return GraphDsl.Create(b => |
| 117 | + { |
| 118 | + var partition = b.Add(new Partition<HttpRequestMessage>(4, msg |
| 119 | + => msg.Version switch |
| 120 | + { |
| 121 | + { Major: 3, Minor: 0 } => 3, |
| 122 | + { Major: 2, Minor: 0 } => 2, |
| 123 | + { Major: 1, Minor: 1 } => 1, |
| 124 | + { Major: 1, Minor: 0 } => 0, |
| 125 | + _ => throw new ArgumentOutOfRangeException(...) |
| 126 | + })); |
| 127 | + |
| 128 | + var merge = b.Add(new Merge<HttpResponseMessage>(4)); |
| 129 | + |
| 130 | + b.From(partition.Out(0)).Via(b.Add(http10)).To(merge); |
| 131 | + b.From(partition.Out(1)).Via(b.Add(http11)).To(merge); |
| 132 | + b.From(partition.Out(2)).Via(b.Add(http20)).To(merge); |
| 133 | + b.From(partition.Out(3)).Via(b.Add(http30)).To(merge); |
| 134 | + |
| 135 | + return new FlowShape<HttpRequestMessage, HttpResponseMessage>(partition.In, merge.Out); |
| 136 | + }); |
| 137 | +} |
| 138 | +``` |
| 139 | + |
| 140 | +`BuildConnectionFlow<TEngine>` moves from `ProtocolCoreGraphBuilder` into `Engine` unchanged. |
| 141 | + |
| 142 | +## Verification |
| 143 | + |
| 144 | +```bash |
| 145 | +dotnet build --configuration Release ./src/TurboHttp.sln |
| 146 | + |
| 147 | +dotnet test ./src/TurboHttp.StreamTests/TurboHttp.StreamTests.csproj \ |
| 148 | + -- --filter-class "TurboHttp.StreamTests.Streams.EngineVersionRoutingTests" |
| 149 | + |
| 150 | +dotnet test ./src/TurboHttp.sln |
| 151 | +``` |
| 152 | + |
| 153 | +## See Also |
| 154 | + |
| 155 | +- [[Architecture/Design/01-LAYERED_ARCHITECTURE|Layered Architecture]] — pipeline layer overview |
| 156 | +- [[Architecture/Design/02-STAGE_PATTERNS|Stage Patterns]] — GraphStage conventions |
0 commit comments