@@ -360,20 +360,44 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
360360 if (state != null ) {
361361 state .setPartitionKey (partitionKey );
362362 }
363- http2Connections .put (partitionKey , channel );
363+ // Coalesce connections opened concurrently for the same partition (thundering herd): keep the
364+ // first live one canonical in the registry. A connection that lost the race is marked redundant,
365+ // so it serves only its own opening request and then closes (its stream's closeFuture listener
366+ // closes the parent once activeStreams hits 0) instead of lingering open and unregistered. A dead
367+ // registered entry is replaced.
368+ Channel existing = http2Connections .putIfAbsent (partitionKey , channel );
369+ if (existing != null && existing != channel ) {
370+ boolean replacedDead = !existing .isActive () && http2Connections .replace (partitionKey , existing , channel );
371+ if (!replacedDead && state != null ) {
372+ state .markRedundant ();
373+ }
374+ }
364375 // When the connection closes, remove it from the registry AND fail any requests still queued
365376 // for a stream slot. Without the latter, requests sitting in pendingOpeners when the parent
366377 // connection drops have no stream channel (so no channelInactive is ever delivered for them)
367378 // and would survive only until the request timeout fires — the silent-timeout bug of #2160.
368379 channel .closeFuture ().addListener (future -> {
369380 removeHttp2Connection (partitionKey , channel );
370381 if (state != null ) {
371- state .failPendingOpeners (orphan ->
372- orphan . abort ( new IOException ( "HTTP/2 connection closed before a stream could be opened" ) ));
382+ state .failPendingOpeners (orphan -> failOrphanedH2Opener ( orphan ,
383+ "HTTP/2 connection closed before a stream could be opened" ));
373384 }
374385 });
375386 }
376387
388+ /**
389+ * Fails a request that was queued in {@link Http2ConnectionState} waiting for a stream slot but can
390+ * never get one (the connection dropped or started draining). Releases its request body first —
391+ * {@code sendHttp2Frames} never ran for it, so nothing else will — to avoid leaking the body ByteBuf.
392+ * {@code NettyRequest.release()} is idempotent, so this is safe even if another path also releases.
393+ */
394+ private static void failOrphanedH2Opener (NettyResponseFuture <?> orphan , String message ) {
395+ if (orphan .getNettyRequest () != null ) {
396+ orphan .getNettyRequest ().release ();
397+ }
398+ orphan .abort (new IOException (message ));
399+ }
400+
377401 /**
378402 * Removes an HTTP/2 connection from the registry, but only if it's the currently registered
379403 * connection for that partition key (avoids removing a replacement connection).
@@ -466,8 +490,8 @@ private HttpClientCodec newHttpClientCodec() {
466490 config .getHttpClientCodecInitialBufferSize ());
467491 }
468492
469- private SslHandler createSslHandler (String peerHost , int peerPort ) {
470- SSLEngine sslEngine = sslEngineFactory .newSslEngine (config , peerHost , peerPort );
493+ private SslHandler createSslHandler (String peerHost , int peerPort , boolean http2Allowed ) {
494+ SSLEngine sslEngine = sslEngineFactory .newSslEngine (config , peerHost , peerPort , http2Allowed );
471495 SslHandler sslHandler = new SslHandler (sslEngine );
472496 if (handshakeTimeout > 0 ) {
473497 sslHandler .setHandshakeTimeoutMillis (handshakeTimeout );
@@ -489,7 +513,7 @@ public Future<Channel> updatePipelineForHttpTunneling(ChannelPipeline pipeline,
489513 // Remove existing SSL handler (for proxy) and replace with SSL handler for target
490514 pipeline .remove (SSL_HANDLER );
491515 }
492- SslHandler sslHandler = createSslHandler (requestUri .getHost (), requestUri .getExplicitPort ());
516+ SslHandler sslHandler = createSslHandler (requestUri .getHost (), requestUri .getExplicitPort (), ! requestUri . isWebSocket () );
493517 whenHandshaked = sslHandler .handshakeFuture ();
494518 pipeline .addBefore (INFLATER_HANDLER , SSL_HANDLER , sslHandler );
495519 pipeline .addAfter (SSL_HANDLER , HTTP_CLIENT_CODEC , newHttpClientCodec ());
@@ -527,9 +551,9 @@ public Future<Channel> updatePipelineForHttpsTunneling(ChannelPipeline pipeline,
527551 // The proxy SSL handler should remain as it provides the tunnel transport
528552 // We need to add target SSL handler that will negotiate with the target through the tunnel
529553
530- SslHandler sslHandler = createSslHandler (requestUri .getHost (), requestUri .getExplicitPort ());
554+ SslHandler sslHandler = createSslHandler (requestUri .getHost (), requestUri .getExplicitPort (), ! requestUri . isWebSocket () );
531555 whenHandshaked = sslHandler .handshakeFuture ();
532-
556+
533557 // For HTTPS proxy tunnel, add target SSL handler after the existing proxy SSL handler
534558 // This creates a nested SSL setup: Target SSL -> Proxy SSL -> Network
535559 if (isSslHandlerConfigured (pipeline )) {
@@ -580,7 +604,8 @@ public SslHandler addSslHandler(ChannelPipeline pipeline, Uri uri, String virtua
580604 peerPort = uri .getExplicitPort ();
581605 }
582606
583- SslHandler sslHandler = createSslHandler (peerHost , peerPort );
607+ // A WebSocket connection must not negotiate h2 (no RFC 8441 support), so advertise only http/1.1 in ALPN.
608+ SslHandler sslHandler = createSslHandler (peerHost , peerPort , !uri .isWebSocket ());
584609 // Check if SOCKS handler actually exists in the pipeline before trying to add after it
585610 if (hasSocksProxyHandler && pipeline .get (SOCKS_HANDLER ) != null ) {
586611 pipeline .addAfter (SOCKS_HANDLER , SSL_HANDLER , sslHandler );
@@ -710,7 +735,11 @@ public void upgradePipelineToHttp2(ChannelPipeline pipeline) {
710735 .initialWindowSize (config .getHttp2InitialWindowSize ())
711736 .maxFrameSize (config .getHttp2MaxFrameSize ())
712737 .headerTableSize (config .getHttp2HeaderTableSize ())
713- .maxHeaderListSize (config .getHttp2MaxHeaderListSize ());
738+ .maxHeaderListSize (config .getHttp2MaxHeaderListSize ())
739+ // RFC 9113 §8.4: AsyncHttpClient never consumes server push, so advertise ENABLE_PUSH=0.
740+ // A conformant server then never opens push streams; without this the client relies on
741+ // Netty's default and a pushing server could trip a connection-level PROTOCOL_ERROR.
742+ .pushEnabled (false );
714743
715744 Http2FrameCodec frameCodec = Http2FrameCodecBuilder .forClient ()
716745 .initialSettings (settings )
@@ -734,7 +763,9 @@ protected void initChannel(Channel ch) {
734763 Http2ConnectionState state = new Http2ConnectionState ();
735764 int configMaxStreams = config .getHttp2MaxConcurrentStreams ();
736765 if (configMaxStreams > 0 ) {
737- state .updateMaxConcurrentStreams (configMaxStreams );
766+ // Client's own cap; the server-advertised value (applied by the http2-settings-listener below)
767+ // can only lower the effective limit, never raise it above this.
768+ state .setClientMaxConcurrentStreams (configMaxStreams );
738769 }
739770 pipeline .channel ().attr (Http2ConnectionState .HTTP2_STATE_KEY ).set (state );
740771
@@ -772,6 +803,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
772803 if (pk != null ) {
773804 removeHttp2Connection (pk , ctx .channel ());
774805 }
806+ // Fail requests still queued for a stream slot: a draining connection accepts no
807+ // new streams, so they can never be opened here and would otherwise wait until the
808+ // connection finally closes. Fail them now so they retry on a fresh connection (the
809+ // registry no longer offers this one). Already-open streams below lastStreamId are
810+ // untouched and complete normally. #12
811+ connState .failPendingOpeners (orphan -> failOrphanedH2Opener (orphan ,
812+ "HTTP/2 connection received GOAWAY; request must retry on a new connection" ));
775813 }
776814 LOGGER .debug ("HTTP/2 GOAWAY received on {}, lastStreamId={}, errorCode={}" ,
777815 ctx .channel (), lastStreamId , goAwayFrame .errorCode ());
0 commit comments