Skip to content

Commit bffcc6f

Browse files
committed
Add HTTP/2 connection multiplexing with dedicated registry, semaphore bypass, and GOAWAY handling
1 parent a8aafb2 commit bffcc6f

File tree

10 files changed

+411
-144
lines changed

10 files changed

+411
-144
lines changed

client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,18 +282,39 @@ public interface AsyncHttpClientConfig {
282282
*/
283283
boolean isHttp2Enabled();
284284

285+
/**
286+
* @return the HTTP/2 initial window size in bytes, defaults to 65535
287+
*/
285288
int getHttp2InitialWindowSize();
286289

290+
/**
291+
* @return the HTTP/2 max frame size in bytes, must be between 16384 and 16777215 per RFC 7540 §4.2
292+
*/
287293
int getHttp2MaxFrameSize();
288294

295+
/**
296+
* @return the HTTP/2 HPACK header table size in bytes, defaults to 4096
297+
*/
289298
int getHttp2HeaderTableSize();
290299

300+
/**
301+
* @return the HTTP/2 max header list size in bytes, defaults to 8192
302+
*/
291303
int getHttp2MaxHeaderListSize();
292304

305+
/**
306+
* @return the HTTP/2 max concurrent streams per connection, -1 means unlimited (server-controlled)
307+
*/
293308
int getHttp2MaxConcurrentStreams();
294309

310+
/**
311+
* @return the interval between HTTP/2 PING keepalive frames, {@link Duration#ZERO} disables pinging
312+
*/
295313
Duration getHttp2PingInterval();
296314

315+
/**
316+
* @return true if cleartext HTTP/2 (h2c) via prior knowledge is enabled for non-TLS connections
317+
*/
297318
boolean isHttp2CleartextEnabled();
298319

299320
/**

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,14 @@ private DefaultAsyncHttpClientConfig(// http
413413
throw new IllegalArgumentException("Native Transport must be enabled to use Epoll Native Transport only");
414414
}
415415

416+
if (http2MaxFrameSize < 16384 || http2MaxFrameSize > 16777215) {
417+
throw new IllegalArgumentException("HTTP/2 max frame size must be between 16384 and 16777215 per RFC 7540 §4.2");
418+
}
419+
420+
if (http2InitialWindowSize < 0) {
421+
throw new IllegalArgumentException("HTTP/2 initial window size must be non-negative");
422+
}
423+
416424
this.allocator = allocator;
417425
this.nettyTimer = nettyTimer;
418426
this.threadFactory = threadFactory;

client/src/main/java/org/asynchttpclient/HttpProtocol.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public enum HttpProtocol {
2222

2323
HTTP_1_0("HTTP/1.0"),
2424
HTTP_1_1("HTTP/1.1"),
25-
HTTP_2("HTTP/2.0");
25+
HTTP_2("HTTP/2");
2626

2727
private final String text;
2828

@@ -31,7 +31,7 @@ public enum HttpProtocol {
3131
}
3232

3333
/**
34-
* @return the protocol version string (e.g. "HTTP/1.1", "HTTP/2.0")
34+
* @return the protocol version string (e.g. "HTTP/1.1", "HTTP/2")
3535
*/
3636
public String getText() {
3737
return text;

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelFactory;
2222
import io.netty.channel.ChannelHandler;
23+
import io.netty.channel.ChannelHandlerContext;
2324
import io.netty.channel.ChannelInboundHandlerAdapter;
2425
import io.netty.channel.ChannelInitializer;
2526
import io.netty.channel.ChannelOption;
@@ -41,16 +42,17 @@
4142
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
4243
import io.netty.handler.codec.http2.Http2MultiplexHandler;
4344
import io.netty.handler.codec.http2.Http2Settings;
45+
import io.netty.handler.codec.http2.Http2GoAwayFrame;
4446
import io.netty.handler.codec.http2.Http2SettingsFrame;
4547
import io.netty.handler.codec.http2.Http2StreamChannel;
4648
import io.netty.handler.logging.LogLevel;
4749
import io.netty.handler.logging.LoggingHandler;
48-
import io.netty.handler.timeout.IdleStateHandler;
4950
import io.netty.handler.proxy.ProxyHandler;
5051
import io.netty.handler.proxy.Socks4ProxyHandler;
5152
import io.netty.handler.proxy.Socks5ProxyHandler;
5253
import io.netty.handler.ssl.SslHandler;
5354
import io.netty.handler.stream.ChunkedWriteHandler;
55+
import io.netty.handler.timeout.IdleStateHandler;
5456
import io.netty.resolver.NameResolver;
5557
import io.netty.util.Timer;
5658
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -89,6 +91,7 @@
8991
import java.net.InetSocketAddress;
9092
import java.util.Map;
9193
import java.util.Map.Entry;
94+
import java.util.concurrent.ConcurrentHashMap;
9295
import java.util.concurrent.ThreadFactory;
9396
import java.util.concurrent.TimeUnit;
9497
import java.util.function.Function;
@@ -122,6 +125,7 @@ public class ChannelManager {
122125

123126
private final ChannelPool channelPool;
124127
private final ChannelGroup openChannels;
128+
private final ConcurrentHashMap<Object, Channel> http2Connections = new ConcurrentHashMap<>();
125129

126130
private AsyncHttpClientHandler wsHandler;
127131
private Http2Handler http2Handler;
@@ -338,6 +342,59 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> async
338342
}
339343
}
340344

345+
/**
346+
* Registers an HTTP/2 connection in the registry for the given partition key.
347+
* The connection stays in the registry (not the regular pool) to allow multiplexing —
348+
* multiple requests can share the same connection concurrently.
349+
*/
350+
public void registerHttp2Connection(Object partitionKey, Channel channel) {
351+
Http2ConnectionState state = channel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
352+
if (state != null) {
353+
state.setPartitionKey(partitionKey);
354+
}
355+
http2Connections.put(partitionKey, channel);
356+
// Auto-remove from registry when the connection closes
357+
channel.closeFuture().addListener(future -> removeHttp2Connection(partitionKey, channel));
358+
}
359+
360+
/**
361+
* Removes an HTTP/2 connection from the registry, but only if it's the currently registered
362+
* connection for that partition key (avoids removing a replacement connection).
363+
*/
364+
public void removeHttp2Connection(Object partitionKey, Channel channel) {
365+
http2Connections.remove(partitionKey, channel);
366+
}
367+
368+
/**
369+
* Returns an active, non-draining HTTP/2 connection for the given partition key, or {@code null}.
370+
* Unlike the regular pool, this does NOT remove the connection — it remains available for
371+
* concurrent multiplexed requests.
372+
*/
373+
public Channel pollHttp2Connection(Object partitionKey) {
374+
Channel channel = http2Connections.get(partitionKey);
375+
if (channel == null) {
376+
return null;
377+
}
378+
if (!channel.isActive()) {
379+
http2Connections.remove(partitionKey, channel);
380+
return null;
381+
}
382+
Http2ConnectionState state = channel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
383+
if (state != null && state.isDraining()) {
384+
return null;
385+
}
386+
return channel;
387+
}
388+
389+
/**
390+
* Polls for an HTTP/2 connection by URI/virtualHost/proxy, using the same partition key logic
391+
* as the regular pool. Returns the connection without removing it from the registry.
392+
*/
393+
public Channel pollHttp2(Uri uri, String virtualHost, ProxyServer proxy, ChannelPoolPartitioning connectionPoolPartitioning) {
394+
Object partitionKey = connectionPoolPartitioning.getPartitionKey(uri, virtualHost, proxy);
395+
return pollHttp2Connection(partitionKey);
396+
}
397+
341398
public Channel poll(Uri uri, String virtualHost, ProxyServer proxy, ChannelPoolPartitioning connectionPoolPartitioning) {
342399
Object partitionKey = connectionPoolPartitioning.getPartitionKey(uri, virtualHost, proxy);
343400
return channelPool.poll(partitionKey);
@@ -348,6 +405,7 @@ public void removeAll(Channel connection) {
348405
}
349406

350407
private void doClose() {
408+
http2Connections.clear();
351409
ChannelGroupFuture groupFuture = openChannels.close();
352410
channelPool.destroy();
353411
groupFuture.addListener(future -> sslEngineFactory.destroy());
@@ -644,7 +702,7 @@ protected void initChannel(Channel ch) {
644702
// Install SETTINGS listener to update MAX_CONCURRENT_STREAMS from server
645703
pipeline.addLast("http2-settings-listener", new ChannelInboundHandlerAdapter() {
646704
@Override
647-
public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {
705+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
648706
if (msg instanceof Http2SettingsFrame) {
649707
Http2SettingsFrame settingsFrame = (Http2SettingsFrame) msg;
650708
Long maxStreams = settingsFrame.settings().maxConcurrentStreams();
@@ -659,10 +717,38 @@ public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg)
659717
}
660718
});
661719

720+
// Install GOAWAY handler on the parent channel to mark the connection as draining
721+
// and remove it from the HTTP/2 registry. GOAWAY is a connection-level frame that
722+
// arrives on the parent channel, not on stream child channels.
723+
pipeline.addLast("http2-goaway-listener", new ChannelInboundHandlerAdapter() {
724+
@Override
725+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
726+
if (msg instanceof Http2GoAwayFrame) {
727+
Http2GoAwayFrame goAwayFrame = (Http2GoAwayFrame) msg;
728+
int lastStreamId = goAwayFrame.lastStreamId();
729+
Http2ConnectionState connState = ctx.channel().attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
730+
if (connState != null) {
731+
connState.setDraining(lastStreamId);
732+
Object pk = connState.getPartitionKey();
733+
if (pk != null) {
734+
removeHttp2Connection(pk, ctx.channel());
735+
}
736+
}
737+
LOGGER.debug("HTTP/2 GOAWAY received on {}, lastStreamId={}, errorCode={}",
738+
ctx.channel(), lastStreamId, goAwayFrame.errorCode());
739+
// Close the connection when no more active streams
740+
if (connState != null && connState.getActiveStreams() <= 0) {
741+
closeChannel(ctx.channel());
742+
}
743+
}
744+
ctx.fireChannelRead(msg);
745+
}
746+
});
747+
662748
// Install PING handler for keepalive if configured
663749
long pingIntervalMs = config.getHttp2PingInterval().toMillis();
664750
if (pingIntervalMs > 0) {
665-
pipeline.addLast("http2-idle-state", new IdleStateHandler(0, 0, pingIntervalMs, java.util.concurrent.TimeUnit.MILLISECONDS));
751+
pipeline.addLast("http2-idle-state", new IdleStateHandler(0, 0, pingIntervalMs, TimeUnit.MILLISECONDS));
666752
pipeline.addLast("http2-ping", new Http2PingHandler());
667753
}
668754
}
@@ -732,4 +818,4 @@ public boolean isOpen() {
732818
public boolean isHttp2CleartextEnabled() {
733819
return config.isHttp2Enabled() && config.isHttp2CleartextEnabled();
734820
}
735-
}
821+
}

client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class Http2ConnectionState {
3535
private final AtomicBoolean draining = new AtomicBoolean(false);
3636
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
3737
private final ConcurrentLinkedQueue<Runnable> pendingOpeners = new ConcurrentLinkedQueue<>();
38+
private volatile Object partitionKey;
3839

3940
public boolean tryAcquireStream() {
4041
if (draining.get()) {
@@ -97,4 +98,12 @@ public void setDraining(int lastStreamId) {
9798
public int getLastGoAwayStreamId() {
9899
return lastGoAwayStreamId;
99100
}
101+
102+
public void setPartitionKey(Object partitionKey) {
103+
this.partitionKey = partitionKey;
104+
}
105+
106+
public Object getPartitionKey() {
107+
return partitionKey;
108+
}
100109
}

client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.asynchttpclient.netty.NettyResponseFuture;
2525
import org.asynchttpclient.netty.SimpleFutureListener;
2626
import org.asynchttpclient.netty.future.StackTraceInspector;
27+
import org.asynchttpclient.channel.ChannelPoolPartitioning;
2728
import org.asynchttpclient.netty.request.NettyRequestSender;
2829
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
2930
import org.asynchttpclient.proxy.ProxyServer;
@@ -81,14 +82,9 @@ private void writeRequest(Channel channel) {
8182
}
8283

8384
public void onSuccess(Channel channel, InetSocketAddress remoteAddress) {
84-
if (connectionSemaphore != null) {
85-
// transfer lock from future to channel
86-
Object partitionKeyLock = future.takePartitionKeyLock();
87-
88-
if (partitionKeyLock != null) {
89-
channel.closeFuture().addListener(future -> connectionSemaphore.releaseChannelLock(partitionKeyLock));
90-
}
91-
}
85+
// Take the semaphore lock from the future. For HTTP/1.1, we'll transfer it to channel.closeFuture().
86+
// For HTTP/2, we release it immediately after ALPN negotiation since the connection is multiplexed.
87+
final Object partitionKeyLock = (connectionSemaphore != null) ? future.takePartitionKeyLock() : null;
9288

9389
Channels.setActiveToken(channel);
9490
TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder();
@@ -140,6 +136,7 @@ protected void onSuccess(Channel value) {
140136
return;
141137
}
142138
// After SSL handshake to proxy, continue with normal proxy request
139+
attachSemaphoreToChannelClose(channel, partitionKeyLock);
143140
writeRequest(channel);
144141
}
145142

@@ -190,6 +187,10 @@ protected void onSuccess(Channel value) {
190187
String alpnProtocol = sslHandler.applicationProtocol();
191188
if (ApplicationProtocolNames.HTTP_2.equals(alpnProtocol)) {
192189
channelManager.upgradePipelineToHttp2(channel.pipeline());
190+
registerHttp2AndReleaseSemaphore(channel);
191+
releaseSemaphoreImmediately(partitionKeyLock);
192+
} else {
193+
attachSemaphoreToChannelClose(channel, partitionKeyLock);
193194
}
194195
writeRequest(channel);
195196
}
@@ -211,11 +212,48 @@ protected void onFailure(Throwable cause) {
211212
// h2c (cleartext HTTP/2 prior knowledge): upgrade to HTTP/2 without TLS
212213
if (!uri.isSecured() && channelManager.isHttp2CleartextEnabled()) {
213214
channelManager.upgradePipelineToHttp2(channel.pipeline());
215+
registerHttp2AndReleaseSemaphore(channel);
216+
releaseSemaphoreImmediately(partitionKeyLock);
217+
} else {
218+
attachSemaphoreToChannelClose(channel, partitionKeyLock);
214219
}
215220
writeRequest(channel);
216221
}
217222
}
218223

224+
/**
225+
* Attaches the semaphore lock to the channel's close future (HTTP/1.1 behavior).
226+
* The semaphore slot is released when the connection closes.
227+
*/
228+
private void attachSemaphoreToChannelClose(Channel channel, Object partitionKeyLock) {
229+
if (connectionSemaphore != null && partitionKeyLock != null) {
230+
channel.closeFuture().addListener(f -> connectionSemaphore.releaseChannelLock(partitionKeyLock));
231+
}
232+
}
233+
234+
/**
235+
* Releases the semaphore lock immediately (HTTP/2 behavior).
236+
* HTTP/2 connections are multiplexed, so the semaphore should not be held
237+
* for the lifetime of the connection.
238+
*/
239+
private void releaseSemaphoreImmediately(Object partitionKeyLock) {
240+
if (connectionSemaphore != null && partitionKeyLock != null) {
241+
connectionSemaphore.releaseChannelLock(partitionKeyLock);
242+
}
243+
}
244+
245+
/**
246+
* Registers the HTTP/2 connection in the channel manager's H2 registry.
247+
*/
248+
private void registerHttp2AndReleaseSemaphore(Channel channel) {
249+
Request request = future.getTargetRequest();
250+
Uri uri = request.getUri();
251+
ProxyServer proxy = future.getProxyServer();
252+
ChannelPoolPartitioning partitioning = request.getChannelPoolPartitioning();
253+
Object partitionKey = partitioning.getPartitionKey(uri, request.getVirtualHost(), proxy);
254+
channelManager.registerHttp2Connection(partitionKey, channel);
255+
}
256+
219257
public void onFailure(Channel channel, Throwable cause) {
220258

221259
// beware, channel can be null

0 commit comments

Comments
 (0)