Skip to content

Commit dd6f0bb

Browse files
committed
Fix HTTP-2 Semaphore Leak
1 parent 9ae7681 commit dd6f0bb

File tree

5 files changed

+1620
-18
lines changed

5 files changed

+1620
-18
lines changed

client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class Http2ConnectionState {
3535
private final AtomicBoolean draining = new AtomicBoolean(false);
3636
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
3737
private final ConcurrentLinkedQueue<Runnable> pendingOpeners = new ConcurrentLinkedQueue<>();
38+
private final Object pendingLock = new Object();
3839
private volatile Object partitionKey;
3940

4041
public boolean tryAcquireStream() {
@@ -54,22 +55,29 @@ public boolean tryAcquireStream() {
5455

5556
public void releaseStream() {
5657
activeStreams.decrementAndGet();
57-
// Try to dequeue and run a pending opener
58-
Runnable pending = pendingOpeners.poll();
59-
if (pending != null) {
60-
pending.run();
61-
}
58+
drainPendingOpeners();
6259
}
6360

6461
public void addPendingOpener(Runnable opener) {
65-
pendingOpeners.add(opener);
66-
// Re-check in case a stream was released between the failed tryAcquire and this enqueue
67-
if (tryAcquireStream()) {
68-
Runnable dequeued = pendingOpeners.poll();
69-
if (dequeued != null) {
70-
dequeued.run();
62+
synchronized (pendingLock) {
63+
if (tryAcquireStream()) {
64+
opener.run();
7165
} else {
72-
releaseStream();
66+
pendingOpeners.add(opener);
67+
}
68+
}
69+
}
70+
71+
private void drainPendingOpeners() {
72+
synchronized (pendingLock) {
73+
Runnable pending = pendingOpeners.poll();
74+
if (pending != null) {
75+
if (tryAcquireStream()) {
76+
pending.run();
77+
} else {
78+
// Put it back — another releaseStream() will pick it up
79+
pendingOpeners.offer(pending);
80+
}
7381
}
7482
}
7583
}

client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,16 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
302302

303303
@Override
304304
public void handleException(NettyResponseFuture<?> future, Throwable error) {
305+
if (!future.isDone()) {
306+
readFailed(future.channel(), future, error);
307+
}
305308
}
306309

307310
@Override
308311
public void handleChannelInactive(NettyResponseFuture<?> future) {
312+
if (!future.isDone()) {
313+
readFailed(future.channel(), future,
314+
new IOException("HTTP/2 stream channel closed unexpectedly"));
315+
}
309316
}
310317
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,12 @@ private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T
276276

277277
// channelInactive might be called between isChannelValid and writeRequest
278278
// so if we don't store the Future now, channelInactive won't perform
279-
// handleUnexpectedClosedChannel
280-
Channels.setAttribute(channel, future);
279+
// handleUnexpectedClosedChannel.
280+
// For HTTP/2 parent channels, skip this — each stream channel gets its own attribute
281+
// in openHttp2Stream(), and setting it on the parent would corrupt multiplexed state.
282+
if (!ChannelManager.isHttp2(channel)) {
283+
Channels.setAttribute(channel, future);
284+
}
281285

282286
if (Channels.isChannelActive(channel)) {
283287
writeRequest(future, channel);
@@ -485,7 +489,6 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
485489
*/
486490
private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parentChannel) {
487491
Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
488-
Runnable openStream = () -> openHttp2Stream(future, parentChannel);
489492

490493
if (state != null && !state.tryAcquireStream()) {
491494
if (state.isDraining()) {
@@ -495,13 +498,13 @@ private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parent
495498
return;
496499
}
497500
// Queue for later when a stream slot opens up
498-
state.addPendingOpener(openStream);
501+
state.addPendingOpener(() -> openHttp2Stream(future, parentChannel, state));
499502
return;
500503
}
501-
openStream.run();
504+
openHttp2Stream(future, parentChannel, state);
502505
}
503506

504-
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel) {
507+
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel, Http2ConnectionState state) {
505508
new Http2StreamChannelBootstrap(parentChannel)
506509
.handler(new ChannelInitializer<Http2StreamChannel>() {
507510
@Override
@@ -519,6 +522,7 @@ protected void initChannel(Http2StreamChannel streamCh) {
519522
Http2StreamChannel streamChannel = f.getNow();
520523
channelManager.registerOpenChannel(streamChannel);
521524
Channels.setAttribute(streamChannel, future);
525+
Channels.setActiveToken(streamChannel);
522526
future.attachChannel(streamChannel, false);
523527
try {
524528
AsyncHandler<T> asyncHandler = future.getAsyncHandler();
@@ -541,6 +545,10 @@ protected void initChannel(Http2StreamChannel streamCh) {
541545
abort(streamChannel, future, e);
542546
}
543547
} else {
548+
// Stream channel was never opened — release the acquired stream slot
549+
if (state != null) {
550+
state.releaseStream();
551+
}
544552
abort(parentChannel, future, f.cause());
545553
}
546554
});

0 commit comments

Comments
 (0)