Skip to content

Commit e944535

Browse files
authored
Fix HTTP/2 silent request timeouts under connection disruptions (#2162)
Motivation: Under load, ~0.5% of HTTP/2 requests silently timed out (#2160). `Http2Handler.handleChannelInactive()` and `handleException()` were no-ops, so when a connection dropped, in-flight stream futures were never failed and hung until the request timeout fired.
Modification: Implement both handlers; fail requests still queued in `pendingOpeners` when the parent connection closes; treat a single stream's RST_STREAM/close as stream-scoped (`close=false`) instead of tearing down the whole connection (RFC 7540 §6.4); release the stream slot when a post-open hook fails.
Result: In-flight and queued HTTP/2 requests fail fast when the connection drops, and one stream's RST_STREAM no longer kills its sibling streams.
1 parent 3a6fb5d commit e944535

7 files changed

Lines changed: 2111 additions & 23 deletions

File tree

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090

9191
import javax.net.ssl.SSLEngine;
9292
import javax.net.ssl.SSLException;
93+
import java.io.IOException;
9394
import java.net.InetAddress;
9495
import java.net.InetSocketAddress;
9596
import java.util.Map;
@@ -360,8 +361,17 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
360361
state.setPartitionKey(partitionKey);
361362
}
362363
http2Connections.put(partitionKey, channel);
363-
// Auto-remove from registry when the connection closes
364-
channel.closeFuture().addListener(future -> removeHttp2Connection(partitionKey, channel));
364+
// When the connection closes, remove it from the registry AND fail any requests still queued
365+
// for a stream slot. Without the latter, requests sitting in pendingOpeners when the parent
366+
// connection drops have no stream channel (so no channelInactive is ever delivered for them)
367+
// and would survive only until the request timeout fires — the silent-timeout bug of #2160.
368+
channel.closeFuture().addListener(future -> {
369+
removeHttp2Connection(partitionKey, channel);
370+
if (state != null) {
371+
state.failPendingOpeners(orphan ->
372+
orphan.abort(new IOException("HTTP/2 connection closed before a stream could be opened")));
373+
}
374+
});
365375
}
366376

367377
/**

client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
package org.asynchttpclient.netty.channel;
1717

1818
import io.netty.util.AttributeKey;
19+
import org.asynchttpclient.netty.NettyResponseFuture;
1920

21+
import java.util.ArrayList;
22+
import java.util.List;
2023
import java.util.concurrent.ConcurrentLinkedQueue;
2124
import java.util.concurrent.atomic.AtomicBoolean;
2225
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.function.Consumer;
2327

2428
/**
2529
* Tracks per-connection HTTP/2 state: active stream count, max concurrent streams,
@@ -30,15 +34,31 @@ public class Http2ConnectionState {
3034
public static final AttributeKey<Http2ConnectionState> HTTP2_STATE_KEY =
3135
AttributeKey.valueOf("http2ConnectionState");
3236

37+
/**
38+
* A request waiting for a free stream slot. Carries both the future (so it can be failed if the
39+
* connection dies before a slot frees up) and the action that actually opens the stream.
40+
*/
41+
private static final class PendingOpener {
42+
final NettyResponseFuture<?> future;
43+
final Runnable opener;
44+
45+
PendingOpener(NettyResponseFuture<?> future, Runnable opener) {
46+
this.future = future;
47+
this.opener = opener;
48+
}
49+
}
50+
3351
private final AtomicInteger activeStreams = new AtomicInteger(0);
3452
private volatile int maxConcurrentStreams = Integer.MAX_VALUE;
3553
private final AtomicBoolean draining = new AtomicBoolean(false);
3654
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
37-
private final ConcurrentLinkedQueue<Runnable> pendingOpeners = new ConcurrentLinkedQueue<>();
55+
private final ConcurrentLinkedQueue<PendingOpener> pendingOpeners = new ConcurrentLinkedQueue<>();
56+
private final Object pendingLock = new Object();
57+
private final AtomicBoolean closed = new AtomicBoolean(false);
3858
private volatile Object partitionKey;
3959

4060
public boolean tryAcquireStream() {
41-
if (draining.get()) {
61+
if (draining.get() || closed.get()) {
4262
return false;
4363
}
4464
while (true) {
@@ -54,22 +74,63 @@ public boolean tryAcquireStream() {
5474

5575
public void releaseStream() {
5676
activeStreams.decrementAndGet();
57-
// Try to dequeue and run a pending opener
58-
Runnable pending = pendingOpeners.poll();
59-
if (pending != null) {
60-
pending.run();
61-
}
77+
drainPendingOpeners();
6278
}
6379

6480
public void addPendingOpener(Runnable opener) {
65-
pendingOpeners.add(opener);
66-
// Re-check in case a stream was released between the failed tryAcquire and this enqueue
67-
if (tryAcquireStream()) {
68-
Runnable dequeued = pendingOpeners.poll();
69-
if (dequeued != null) {
70-
dequeued.run();
81+
addPendingOpener(null, opener);
82+
}
83+
84+
public void addPendingOpener(NettyResponseFuture<?> future, Runnable opener) {
85+
synchronized (pendingLock) {
86+
if (tryAcquireStream()) {
87+
opener.run();
7188
} else {
72-
releaseStream();
89+
pendingOpeners.add(new PendingOpener(future, opener));
90+
}
91+
}
92+
}
93+
94+
private void drainPendingOpeners() {
95+
synchronized (pendingLock) {
96+
PendingOpener pending = pendingOpeners.poll();
97+
if (pending != null) {
98+
if (tryAcquireStream()) {
99+
pending.opener.run();
100+
} else {
101+
// Put it back — another releaseStream() will pick it up
102+
pendingOpeners.offer(pending);
103+
}
104+
}
105+
}
106+
}
107+
108+
/**
109+
* Permanently marks the connection unusable and hands every queued (never-started) request to
110+
* {@code failer}. After this call {@link #tryAcquireStream()} returns {@code false}, so a request
111+
* enqueued concurrently with the close is failed by its own caller's post-enqueue active-channel
112+
* check rather than being silently orphaned.
113+
* <p>
114+
* Without this, requests sitting in {@link #pendingOpeners} when the parent connection drops are
115+
* never completed and survive only until the request timeout fires — the silent-timeout
116+
* regression of Issue #2160 (a queued request has no stream channel, hence no channelInactive
117+
* is ever delivered for it).
118+
*
119+
* @param failer invoked once per orphaned request future (e.g. to abort it)
120+
*/
121+
public void failPendingOpeners(Consumer<NettyResponseFuture<?>> failer) {
122+
closed.set(true);
123+
List<PendingOpener> drained = new ArrayList<>();
124+
synchronized (pendingLock) {
125+
PendingOpener p;
126+
while ((p = pendingOpeners.poll()) != null) {
127+
drained.add(p);
128+
}
129+
}
130+
// Fail outside the lock — failer may re-enter client code.
131+
for (PendingOpener p : drained) {
132+
if (p.future != null) {
133+
failer.accept(p.future);
73134
}
74135
}
75136
}

client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,10 @@ private void handleHttp2TrailingHeadersFrame(Http2HeadersFrame headersFrame, Cha
196196
*/
197197
private void handleHttp2ResetFrame(Http2ResetFrame resetFrame, Channel channel, NettyResponseFuture<?> future) {
198198
long errorCode = resetFrame.errorCode();
199-
readFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
199+
// RFC 7540 §5.4.2/§6.4: RST_STREAM terminates a single stream; a stream error does not affect other streams.
200+
// Fail only this stream's future and close only the (single-use) stream child channel —
201+
// sibling streams multiplexed on the same parent connection must be left untouched.
202+
streamFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
200203
}
201204

202205
/**
@@ -300,11 +303,41 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
300303
}
301304
}
302305

306+
/**
307+
* Fails a single stream's future WITHOUT closing the parent connection. Used for stream-scoped
308+
* events (RST_STREAM, and the {@code channelInactive}/exception Netty delivers when one stream
309+
* dies). {@link #finishUpdate} with {@code close=false} closes only the single-use stream child
310+
* channel and releases its stream slot, leaving the parent connection and its sibling multiplexed
311+
* streams untouched.
312+
* <p>
313+
* When the PARENT connection genuinely drops, Netty fires {@code channelInactive} on every child
314+
* stream, so each in-flight future is still failed individually and promptly.
315+
*/
316+
private void streamFailed(Channel channel, NettyResponseFuture<?> future, Throwable t) {
317+
if (future.isDone()) {
318+
return;
319+
}
320+
try {
321+
requestSender.abort(channel, future, t);
322+
} catch (Exception abortException) {
323+
logger.debug("Abort failed", abortException);
324+
} finally {
325+
finishUpdate(future, channel, false);
326+
}
327+
}
328+
303329
@Override
304330
public void handleException(NettyResponseFuture<?> future, Throwable error) {
331+
// Stream-scoped: an exception on one stream child channel must not tear down the parent
332+
// connection that sibling multiplexed streams share (see streamFailed).
333+
streamFailed(future.channel(), future, error);
305334
}
306335

307336
@Override
308337
public void handleChannelInactive(NettyResponseFuture<?> future) {
338+
// Stream-scoped (see streamFailed): closing the parent here would fail unrelated sibling
339+
// streams on the same connection — the RST_STREAM/single-stream-close blast-radius bug.
340+
streamFailed(future.channel(), future,
341+
new IOException("HTTP/2 stream channel closed unexpectedly"));
309342
}
310343
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,14 @@ private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T
282282

283283
// channelInactive might be called between isChannelValid and writeRequest
284284
// so if we don't store the Future now, channelInactive won't perform
285-
// handleUnexpectedClosedChannel
286-
Channels.setAttribute(channel, future);
285+
// handleUnexpectedClosedChannel.
286+
// For HTTP/2, skip this: the parent connection multiplexes many concurrent requests, so a
287+
// single per-request Future on the parent channel is meaningless — each request's Future is
288+
// stored on its own stream child channel in openHttp2Stream(). The parent also has no
289+
// AsyncHttpClientHandler after the H2 upgrade, so nothing reads this attribute anyway.
290+
if (!ChannelManager.isHttp2(channel)) {
291+
Channels.setAttribute(channel, future);
292+
}
287293

288294
if (Channels.isChannelActive(channel)) {
289295
writeRequest(future, channel);
@@ -520,7 +526,6 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
520526
*/
521527
private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parentChannel) {
522528
Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
523-
Runnable openStream = () -> openHttp2Stream(future, parentChannel);
524529

525530
if (state != null && !state.tryAcquireStream()) {
526531
if (state.isDraining()) {
@@ -530,13 +535,27 @@ private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parent
530535
return;
531536
}
532537
// Queue for later when a stream slot opens up
533-
state.addPendingOpener(openStream);
538+
state.addPendingOpener(future, () -> openHttp2Stream(future, parentChannel, state));
539+
// The parent connection may have closed concurrently with the enqueue above; if so its
540+
// close listener already drained the queue and this future would be orphaned (it has no
541+
// stream channel, so no channelInactive is ever delivered for it). Detect that race and
542+
// fail it here so it never survives only to the request timeout (Issue #2160).
543+
if (!parentChannel.isActive() && !future.isDone()) {
544+
abort(parentChannel, future,
545+
new java.io.IOException("HTTP/2 connection closed while request was queued"));
546+
}
534547
return;
535548
}
536-
openStream.run();
549+
openHttp2Stream(future, parentChannel, state);
550+
}
551+
552+
private static void releaseHttp2Stream(Http2ConnectionState state) {
553+
if (state != null) {
554+
state.releaseStream();
555+
}
537556
}
538557

539-
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel) {
558+
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel, Http2ConnectionState state) {
540559
new Http2StreamChannelBootstrap(parentChannel)
541560
.handler(new ChannelInitializer<Http2StreamChannel>() {
542561
@Override
@@ -554,13 +573,19 @@ protected void initChannel(Http2StreamChannel streamCh) {
554573
Http2StreamChannel streamChannel = f.getNow();
555574
channelManager.registerOpenChannel(streamChannel);
556575
Channels.setAttribute(streamChannel, future);
576+
Channels.setActiveToken(streamChannel);
557577
future.attachChannel(streamChannel, false);
558578
try {
559579
AsyncHandler<T> asyncHandler = future.getAsyncHandler();
560580
try {
561581
asyncHandler.onRequestSend(future.getNettyRequest());
562582
} catch (Exception e) {
563583
LOGGER.error("onRequestSend crashed", e);
584+
// The slot was acquired before open(); aborting here completes the
585+
// future, so the stream's later channelInactive won't run finishUpdate
586+
// (its !future.isDone() guard fails). Release the slot explicitly,
587+
// otherwise activeStreams leaks and eventually wedges the connection.
588+
releaseHttp2Stream(state);
564589
abort(streamChannel, future, e);
565590
return;
566591
}
@@ -573,9 +598,15 @@ protected void initChannel(Http2StreamChannel streamCh) {
573598
scheduleReadTimeout(future);
574599
} catch (Exception e) {
575600
LOGGER.error("Can't write HTTP/2 request", e);
601+
// See above: release the slot the failed stream will never release itself.
602+
releaseHttp2Stream(state);
576603
abort(streamChannel, future, e);
577604
}
578605
} else {
606+
// Stream channel was never opened — release the acquired stream slot
607+
if (state != null) {
608+
state.releaseStream();
609+
}
579610
abort(parentChannel, future, f.cause());
580611
}
581612
});

0 commit comments

Comments
 (0)