@@ -97,7 +97,8 @@ private static Flow<HttpRequestMessage, HttpResponseMessage, NotUsed> BuildExten
9797 Func < Flow < IOutputItem , IInputItem , NotUsed > > ? http30Factory = null )
9898 {
9999 // Protocol engine core (endpoint grouping + version routing + encode/decode).
100- var engineFlow = BuildProtocolCore ( transportStageFactory , options , http10Factory , http11Factory , http20Factory , http30Factory ) ;
100+ var engineFlow = BuildProtocolCore ( transportStageFactory , options , http10Factory , http11Factory , http20Factory ,
101+ http30Factory ) ;
101102
102103 // Build feature BidiFlow chain via conditional Atop stacking.
103104 // Build from innermost to outermost so that each new layer wraps the previous.
@@ -189,27 +190,51 @@ private static Flow<HttpRequestMessage, HttpResponseMessage, NotUsed> BuildProto
189190 {
190191 var highThroughputBuffer = Attributes . CreateInputBuffer ( 32 , 128 ) ;
191192
193+ var versionDispatch = Flow . FromGraph ( new VersionDispatchStage ( CreateFlowForVersion ) )
194+ . WithAttributes ( highThroughputBuffer ) ;
195+
196+ var maxConnsH1 = clientOptions . MaxH1ConnectionsPerServer ;
197+ var maxConnsH2 = clientOptions . MaxH2ConnectionsPerServer ;
198+
199+ var maxConcurrentH2Streams = clientOptions . MaxH2ConcurrentStreams ;
200+
201+ var core = ( Flow < HttpRequestMessage , HttpResponseMessage , NotUsed > )
202+ Flow . Create < HttpRequestMessage > ( )
203+ . GroupByRequestEndpoint ( RequestEndpoint . FromRequest , maxSubstreams : clientOptions . MaxEndpointSubstreams ,
204+ maxSubstreamsPerKey : MaxSubstreamsPerKey ,
205+ maxConcurrencyPerSlot : MaxConcurrencyPerSlot )
206+ . ViaSubFlow ( versionDispatch )
207+ . MergeSubstreams ( ) ;
208+
209+ return core . WithAttributes ( highThroughputBuffer ) ;
210+
192211 Flow < IOutputItem , IInputItem , NotUsed > QuicTransport ( )
193212 => Flow . FromGraph ( new QuicConnectionStage ( ) ) ;
194213
214+ int MaxConcurrencyPerSlot ( RequestEndpoint endpoint )
215+ => endpoint . Version . Major >= 2 ? maxConcurrentH2Streams : 1 ;
216+
217+ int MaxSubstreamsPerKey ( RequestEndpoint endpoint )
218+ => endpoint . Version . Major >= 2 ? maxConnsH2 : maxConnsH1 ;
219+
195220 // Version-specific flow factory — called once per substream on first element.
196221 // Since GroupByRequestEndpoint already groups by version, each substream
197222 // contains a single version — no Partition/Merge needed.
198223 Flow < HttpRequestMessage , HttpResponseMessage , NotUsed > CreateFlowForVersion ( Version version )
199224 {
200- // Transport factories that create the correct stage with clientOptions for auto-connect.
201- Flow < IOutputItem , IInputItem , NotUsed > TcpTransportWithOptions ( )
202- => transportStageFactory ( ) ;
203- Flow < IOutputItem , IInputItem , NotUsed > QuicTransportStage ( )
204- => QuicTransport ( ) ;
205-
206225 var ( engineFactory , transport , testTransport ) = version switch
207226 {
208- { Major : 1 , Minor : 0 } => ( ( Func < IHttpProtocolEngine > ) ( ( ) => new Http10Engine ( clientOptions . MaxPipelineDepth ) ) , ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) TcpTransportWithOptions , http10Factory ) ,
209- { Major : 1 , Minor : 1 } => ( ( ) => new Http11Engine ( clientOptions . MaxPipelineDepth ) , ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) TcpTransportWithOptions , http11Factory ) ,
210- { Major : 2 , Minor : 0 } => ( ( Func < IHttpProtocolEngine > ) ( ( ) => new Http20Engine ( ) ) , ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) TcpTransportWithOptions , http20Factory ) ,
211- { Major : 3 , Minor : 0 } => ( ( Func < IHttpProtocolEngine > ) ( ( ) => new Http30Engine ( ) ) , ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) QuicTransportStage , http30Factory ) ,
212- _ => throw new ArgumentOutOfRangeException ( nameof ( version ) , version , $ "Unsupported HTTP version: { version } ")
227+ { Major : 1 , Minor : 0 } => (
228+ ( Func < IHttpProtocolEngine > ) ( ( ) => new Http10Engine ( clientOptions . MaxPipelineDepth ) ) ,
229+ ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) TcpTransportWithOptions , http10Factory ) ,
230+ { Major : 1 , Minor : 1 } => ( ( ) => new Http11Engine ( clientOptions . MaxPipelineDepth ) ,
231+ ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) TcpTransportWithOptions , http11Factory ) ,
232+ { Major : 2 , Minor : 0 } => ( ( Func < IHttpProtocolEngine > ) ( ( ) => new Http20Engine ( ) ) ,
233+ ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) TcpTransportWithOptions , http20Factory ) ,
234+ { Major : 3 , Minor : 0 } => ( ( Func < IHttpProtocolEngine > ) ( ( ) => new Http30Engine ( ) ) ,
235+ ( Func < Flow < IOutputItem , IInputItem , NotUsed > > ) QuicTransportStage , http30Factory ) ,
236+ _ => throw new ArgumentOutOfRangeException ( nameof ( version ) , version ,
237+ $ "Unsupported HTTP version: { version } ")
213238 } ;
214239
215240 var engine = engineFactory ( ) ;
@@ -220,29 +245,14 @@ Flow<IOutputItem, IInputItem, NotUsed> QuicTransportStage()
220245 }
221246
222247 return BuildConnectionFlow ( transport ( ) , engine ) ;
223- }
224-
225- var versionDispatch = Flow . FromGraph ( new VersionDispatchStage ( CreateFlowForVersion ) )
226- . WithAttributes ( highThroughputBuffer ) ;
227-
228- var maxConnsH1 = clientOptions . MaxH1ConnectionsPerServer ;
229- var maxConnsH2 = clientOptions . MaxH2ConnectionsPerServer ;
230- int MaxSubstreamsPerKey ( RequestEndpoint endpoint )
231- => endpoint . Version . Major >= 2 ? maxConnsH2 : maxConnsH1 ;
232-
233- var maxConcurrentH2Streams = clientOptions . MaxH2ConcurrentStreams ;
234- int MaxConcurrencyPerSlot ( RequestEndpoint endpoint )
235- => endpoint . Version . Major >= 2 ? maxConcurrentH2Streams : 1 ;
236248
237- var core = ( Flow < HttpRequestMessage , HttpResponseMessage , NotUsed > )
238- Flow . Create < HttpRequestMessage > ( )
239- . GroupByRequestEndpoint ( RequestEndpoint . FromRequest , maxSubstreams : clientOptions . MaxEndpointSubstreams ,
240- maxSubstreamsPerKey : MaxSubstreamsPerKey ,
241- maxConcurrencyPerSlot : MaxConcurrencyPerSlot )
242- . ViaSubFlow ( versionDispatch )
243- . MergeSubstreams ( ) ;
249+ // Transport factories that create the correct stage with clientOptions for auto-connect.
250+ Flow < IOutputItem , IInputItem , NotUsed > TcpTransportWithOptions ( )
251+ => transportStageFactory ( ) ;
244252
245- return core . WithAttributes ( highThroughputBuffer ) ;
253+ Flow < IOutputItem , IInputItem , NotUsed > QuicTransportStage ( )
254+ => QuicTransport ( ) ;
255+ }
246256 }
247257
248258 /// <summary>
0 commit comments