Skip to content

Commit 1a64f83

Browse files
committed
Fix HTTP/2 silent request timeouts under connection disruptions (#2160)
1 parent 9ae7681 commit 1a64f83

7 files changed

Lines changed: 2111 additions & 23 deletions

File tree

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787

8888
import javax.net.ssl.SSLEngine;
8989
import javax.net.ssl.SSLException;
90+
import java.io.IOException;
9091
import java.net.InetAddress;
9192
import java.net.InetSocketAddress;
9293
import java.util.Map;
@@ -353,8 +354,17 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
353354
state.setPartitionKey(partitionKey);
354355
}
355356
http2Connections.put(partitionKey, channel);
356-
// Auto-remove from registry when the connection closes
357-
channel.closeFuture().addListener(future -> removeHttp2Connection(partitionKey, channel));
357+
// When the connection closes, remove it from the registry AND fail any requests still queued
358+
// for a stream slot. Without the latter, requests sitting in pendingOpeners when the parent
359+
// connection drops have no stream channel (so no channelInactive is ever delivered for them)
360+
// and would survive only until the request timeout fires — the silent-timeout bug of #2160.
361+
channel.closeFuture().addListener(future -> {
362+
removeHttp2Connection(partitionKey, channel);
363+
if (state != null) {
364+
state.failPendingOpeners(orphan ->
365+
orphan.abort(new IOException("HTTP/2 connection closed before a stream could be opened")));
366+
}
367+
});
358368
}
359369

360370
/**

client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
package org.asynchttpclient.netty.channel;
1717

1818
import io.netty.util.AttributeKey;
19+
import org.asynchttpclient.netty.NettyResponseFuture;
1920

21+
import java.util.ArrayList;
22+
import java.util.List;
2023
import java.util.concurrent.ConcurrentLinkedQueue;
2124
import java.util.concurrent.atomic.AtomicBoolean;
2225
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.function.Consumer;
2327

2428
/**
2529
* Tracks per-connection HTTP/2 state: active stream count, max concurrent streams,
@@ -30,15 +34,31 @@ public class Http2ConnectionState {
3034
public static final AttributeKey<Http2ConnectionState> HTTP2_STATE_KEY =
3135
AttributeKey.valueOf("http2ConnectionState");
3236

37+
/**
38+
* A request waiting for a free stream slot. Carries both the future (so it can be failed if the
39+
* connection dies before a slot frees up) and the action that actually opens the stream.
40+
*/
41+
private static final class PendingOpener {
42+
final NettyResponseFuture<?> future;
43+
final Runnable opener;
44+
45+
PendingOpener(NettyResponseFuture<?> future, Runnable opener) {
46+
this.future = future;
47+
this.opener = opener;
48+
}
49+
}
50+
3351
private final AtomicInteger activeStreams = new AtomicInteger(0);
3452
private volatile int maxConcurrentStreams = Integer.MAX_VALUE;
3553
private final AtomicBoolean draining = new AtomicBoolean(false);
3654
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
37-
private final ConcurrentLinkedQueue<Runnable> pendingOpeners = new ConcurrentLinkedQueue<>();
55+
private final ConcurrentLinkedQueue<PendingOpener> pendingOpeners = new ConcurrentLinkedQueue<>();
56+
private final Object pendingLock = new Object();
57+
private final AtomicBoolean closed = new AtomicBoolean(false);
3858
private volatile Object partitionKey;
3959

4060
public boolean tryAcquireStream() {
41-
if (draining.get()) {
61+
if (draining.get() || closed.get()) {
4262
return false;
4363
}
4464
while (true) {
@@ -54,22 +74,63 @@ public boolean tryAcquireStream() {
5474

5575
public void releaseStream() {
5676
activeStreams.decrementAndGet();
57-
// Try to dequeue and run a pending opener
58-
Runnable pending = pendingOpeners.poll();
59-
if (pending != null) {
60-
pending.run();
61-
}
77+
drainPendingOpeners();
6278
}
6379

6480
public void addPendingOpener(Runnable opener) {
65-
pendingOpeners.add(opener);
66-
// Re-check in case a stream was released between the failed tryAcquire and this enqueue
67-
if (tryAcquireStream()) {
68-
Runnable dequeued = pendingOpeners.poll();
69-
if (dequeued != null) {
70-
dequeued.run();
81+
addPendingOpener(null, opener);
82+
}
83+
84+
public void addPendingOpener(NettyResponseFuture<?> future, Runnable opener) {
85+
synchronized (pendingLock) {
86+
if (tryAcquireStream()) {
87+
opener.run();
7188
} else {
72-
releaseStream();
89+
pendingOpeners.add(new PendingOpener(future, opener));
90+
}
91+
}
92+
}
93+
94+
private void drainPendingOpeners() {
95+
synchronized (pendingLock) {
96+
PendingOpener pending = pendingOpeners.poll();
97+
if (pending != null) {
98+
if (tryAcquireStream()) {
99+
pending.opener.run();
100+
} else {
101+
// Put it back — another releaseStream() will pick it up
102+
pendingOpeners.offer(pending);
103+
}
104+
}
105+
}
106+
}
107+
108+
/**
109+
* Permanently marks the connection unusable and hands every queued (never-started) request to
110+
* {@code failer}. After this call {@link #tryAcquireStream()} returns {@code false}, so a request
111+
* enqueued concurrently with the close is failed by its own caller's post-enqueue active-channel
112+
* check rather than being silently orphaned.
113+
* <p>
114+
* Without this, requests sitting in {@link #pendingOpeners} when the parent connection drops are
115+
* never completed and survive only until the request timeout fires — the silent-timeout
116+
* regression of Issue #2160 (a queued request has no stream channel, hence no channelInactive
117+
* is ever delivered for it).
118+
*
119+
* @param failer invoked once per orphaned request future (e.g. to abort it)
120+
*/
121+
public void failPendingOpeners(Consumer<NettyResponseFuture<?>> failer) {
122+
closed.set(true);
123+
List<PendingOpener> drained = new ArrayList<>();
124+
synchronized (pendingLock) {
125+
PendingOpener p;
126+
while ((p = pendingOpeners.poll()) != null) {
127+
drained.add(p);
128+
}
129+
}
130+
// Fail outside the lock — failer may re-enter client code.
131+
for (PendingOpener p : drained) {
132+
if (p.future != null) {
133+
failer.accept(p.future);
73134
}
74135
}
75136
}

client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,10 @@ private void handleHttp2TrailingHeadersFrame(Http2HeadersFrame headersFrame, Cha
196196
*/
197197
private void handleHttp2ResetFrame(Http2ResetFrame resetFrame, Channel channel, NettyResponseFuture<?> future) {
198198
long errorCode = resetFrame.errorCode();
199-
readFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
199+
// RFC 7540 §5.4.2/§6.4: RST_STREAM is stream-scoped and MUST NOT terminate the connection.
200+
// Fail only this stream's future and close only the (single-use) stream child channel —
201+
// sibling streams multiplexed on the same parent connection must be left untouched.
202+
streamFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
200203
}
201204

202205
/**
@@ -300,11 +303,41 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
300303
}
301304
}
302305

306+
/**
307+
* Fails a single stream's future WITHOUT closing the parent connection. Used for stream-scoped
308+
* events (RST_STREAM, and the {@code channelInactive}/exception Netty delivers when one stream
309+
* dies). {@link #finishUpdate} with {@code close=false} closes only the single-use stream child
310+
* channel and releases its stream slot, leaving the parent connection and its sibling multiplexed
311+
* streams untouched.
312+
* <p>
313+
* When the PARENT connection genuinely drops, Netty fires {@code channelInactive} on every child
314+
* stream, so each in-flight future is still failed individually and promptly.
315+
*/
316+
private void streamFailed(Channel channel, NettyResponseFuture<?> future, Throwable t) {
317+
if (future.isDone()) {
318+
return;
319+
}
320+
try {
321+
requestSender.abort(channel, future, t);
322+
} catch (Exception abortException) {
323+
logger.debug("Abort failed", abortException);
324+
} finally {
325+
finishUpdate(future, channel, false);
326+
}
327+
}
328+
303329
@Override
304330
public void handleException(NettyResponseFuture<?> future, Throwable error) {
331+
// Stream-scoped: an exception on one stream child channel must not tear down the parent
332+
// connection that sibling multiplexed streams share (see streamFailed).
333+
streamFailed(future.channel(), future, error);
305334
}
306335

307336
@Override
308337
public void handleChannelInactive(NettyResponseFuture<?> future) {
338+
// Stream-scoped (see streamFailed): closing the parent here would fail unrelated sibling
339+
// streams on the same connection — the RST_STREAM/single-stream-close blast-radius bug.
340+
streamFailed(future.channel(), future,
341+
new IOException("HTTP/2 stream channel closed unexpectedly"));
309342
}
310343
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,14 @@ private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T
276276

277277
// channelInactive might be called between isChannelValid and writeRequest
278278
// so if we don't store the Future now, channelInactive won't perform
279-
// handleUnexpectedClosedChannel
280-
Channels.setAttribute(channel, future);
279+
// handleUnexpectedClosedChannel.
280+
// For HTTP/2, skip this: the parent connection multiplexes many concurrent requests, so a
281+
// single per-request Future on the parent channel is meaningless — each request's Future is
282+
// stored on its own stream child channel in openHttp2Stream(). The parent also has no
283+
// AsyncHttpClientHandler after the H2 upgrade, so nothing reads this attribute anyway.
284+
if (!ChannelManager.isHttp2(channel)) {
285+
Channels.setAttribute(channel, future);
286+
}
281287

282288
if (Channels.isChannelActive(channel)) {
283289
writeRequest(future, channel);
@@ -485,7 +491,6 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
485491
*/
486492
private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parentChannel) {
487493
Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
488-
Runnable openStream = () -> openHttp2Stream(future, parentChannel);
489494

490495
if (state != null && !state.tryAcquireStream()) {
491496
if (state.isDraining()) {
@@ -495,13 +500,27 @@ private <T> void writeHttp2Request(NettyResponseFuture<T> future, Channel parent
495500
return;
496501
}
497502
// Queue for later when a stream slot opens up
498-
state.addPendingOpener(openStream);
503+
state.addPendingOpener(future, () -> openHttp2Stream(future, parentChannel, state));
504+
// The parent connection may have closed concurrently with the enqueue above; if so its
505+
// close listener already drained the queue and this future would be orphaned (it has no
506+
// stream channel, so no channelInactive is ever delivered for it). Detect that race and
507+
// fail it here so it never survives only to the request timeout (Issue #2160).
508+
if (!parentChannel.isActive() && !future.isDone()) {
509+
abort(parentChannel, future,
510+
new java.io.IOException("HTTP/2 connection closed while request was queued"));
511+
}
499512
return;
500513
}
501-
openStream.run();
514+
openHttp2Stream(future, parentChannel, state);
515+
}
516+
517+
private static void releaseHttp2Stream(Http2ConnectionState state) {
518+
if (state != null) {
519+
state.releaseStream();
520+
}
502521
}
503522

504-
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel) {
523+
private <T> void openHttp2Stream(NettyResponseFuture<T> future, Channel parentChannel, Http2ConnectionState state) {
505524
new Http2StreamChannelBootstrap(parentChannel)
506525
.handler(new ChannelInitializer<Http2StreamChannel>() {
507526
@Override
@@ -519,13 +538,19 @@ protected void initChannel(Http2StreamChannel streamCh) {
519538
Http2StreamChannel streamChannel = f.getNow();
520539
channelManager.registerOpenChannel(streamChannel);
521540
Channels.setAttribute(streamChannel, future);
541+
Channels.setActiveToken(streamChannel);
522542
future.attachChannel(streamChannel, false);
523543
try {
524544
AsyncHandler<T> asyncHandler = future.getAsyncHandler();
525545
try {
526546
asyncHandler.onRequestSend(future.getNettyRequest());
527547
} catch (Exception e) {
528548
LOGGER.error("onRequestSend crashed", e);
549+
// The slot was acquired before open(); aborting here completes the
550+
// future, so the stream's later channelInactive won't run finishUpdate
551+
// (its !future.isDone() guard fails). Release the slot explicitly,
552+
// otherwise activeStreams leaks and eventually wedges the connection.
553+
releaseHttp2Stream(state);
529554
abort(streamChannel, future, e);
530555
return;
531556
}
@@ -538,9 +563,15 @@ protected void initChannel(Http2StreamChannel streamCh) {
538563
scheduleReadTimeout(future);
539564
} catch (Exception e) {
540565
LOGGER.error("Can't write HTTP/2 request", e);
566+
// See above: release the slot the failed stream will never release itself.
567+
releaseHttp2Stream(state);
541568
abort(streamChannel, future, e);
542569
}
543570
} else {
571+
// Stream channel was never opened — release the acquired stream slot
572+
if (state != null) {
573+
state.releaseStream();
574+
}
544575
abort(parentChannel, future, f.cause());
545576
}
546577
});

0 commit comments

Comments
 (0)