Skip to content

Commit 80b6c45

Browse files
committed
Fix request-body double-free and harden HTTP/2 lifecycle
Fix the cancel/timeout-vs-write request-body double-free with an atomic NettyRequest state, bound the pendingOpeners queue, keep a stream-open failure from closing the multiplexed parent, skip interceptors on interim 1xx, and build the WebSocket http/1.1 SSL context lazily. Document the setBody(ByteBuf) ownership contract and strengthen the request-body-leak regression tests.
1 parent 7a71276 commit 80b6c45

12 files changed

Lines changed: 225 additions & 70 deletions

client/src/main/java/org/asynchttpclient/RequestBuilderBase.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,18 @@ public T setBody(ByteBuffer data) {
478478
return asDerivedType();
479479
}
480480

481+
/**
482+
* Sets the request body from a Netty {@link ByteBuf}.
483+
* <p>
484+
* <strong>Ownership:</strong> the caller retains ownership of {@code data}. AsyncHttpClient sends a
485+
* retained duplicate per attempt (so redirects, auth replays and retries each get their own reference and
486+
* the body survives across them) and never releases {@code data} itself. The caller is responsible for
487+
* releasing {@code data} once the request has completed. (This differs from older releases, which consumed
488+
* and released the buffer on the first send — and double-freed it on any retry.)
489+
*
490+
* @param data the request body; the caller keeps ownership and must release it after the request completes
491+
* @return this builder
492+
*/
481493
public T setBody(ByteBuf data) {
482494
resetBody();
483495
byteBufData = data;

client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,9 @@ private boolean terminateAndExit() {
276276
*/
277277
private void releaseRequestIfNotHandedToChannel() {
278278
NettyRequest request = nettyRequest;
279-
if (request != null && !request.isHandedToChannel()) {
279+
if (request != null) {
280+
// release() atomically no-ops if the request was already handed to the channel encoder (which then
281+
// owns the release), so there is no check-then-act race with the concurrent event-loop write.
280282
request.release();
281283
}
282284
}

client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,15 @@ private static final class PendingOpener {
5858
private volatile int maxConcurrentStreams = Integer.MAX_VALUE;
5959
private final AtomicBoolean draining = new AtomicBoolean(false);
6060
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
61+
// Hard cap on requests queued waiting for a free stream slot. A peer that accepts the connection but never
62+
// grants slots — SETTINGS_MAX_CONCURRENT_STREAMS=0, or a small limit with streams it never completes — would
63+
// otherwise make every subsequent request queue forever, each pinning a NettyResponseFuture and its request
64+
// body buffer, until the client OOMs. Past the cap, offerPendingOpener rejects and the caller fails the
65+
// request fast. All queue mutations happen under pendingLock, so pendingCount tracks size in O(1)
66+
// (ConcurrentLinkedQueue.size() is O(n)).
67+
private static final int MAX_PENDING_OPENERS = 10_000;
6168
private final ConcurrentLinkedQueue<PendingOpener> pendingOpeners = new ConcurrentLinkedQueue<>();
69+
private int pendingCount;
6270
private final Object pendingLock = new Object();
6371
private final AtomicBoolean closed = new AtomicBoolean(false);
6472
// Set when this connection lost the per-partition registration race (thundering herd): it is not in
@@ -106,17 +114,19 @@ public boolean offerPendingOpener(Runnable opener) {
106114
/**
107115
* Runs {@code opener} immediately if a stream slot is free, otherwise queues it for a later
108116
* {@link #releaseStream()}. Returns {@code false} — <em>without</em> queuing — when the connection is
109-
* already draining or closed: such a connection never runs a queued opener ({@link #drainPendingOpeners}
110-
* only re-offers it, and {@link #failPendingOpeners} has already drained the queue), so the caller MUST
111-
* fail the request itself rather than let it sit until the request timeout fires (Issue #2160).
117+
* already draining or closed, or when the pending queue is already at {@link #MAX_PENDING_OPENERS}: in
118+
* each case the caller MUST fail the request itself rather than let it sit until the request timeout fires
119+
* (Issue #2160). A draining/closed connection never runs a queued opener ({@link #drainPendingOpeners} only
120+
* re-offers it, and {@link #failPendingOpeners} has already drained the queue); a full queue means the peer
121+
* is starving slots and the request would otherwise grow heap without bound.
112122
* <p>
113-
* Race-free against {@link #failPendingOpeners}: that method sets {@code closed} before draining under
114-
* {@code pendingLock}. An opener enqueued before the drain acquires the lock is caught by the drain; an
115-
* enqueue attempt sequenced after the drain observes {@code closed} here (the lock provides the
116-
* happens-before) and is rejected. Either way no opener is left stranded.
123+
* Race-free against {@link #failPendingOpeners}: that method sets {@code closed} and drains the queue under
124+
* {@code pendingLock}. An opener enqueued before the drain runs is caught by the drain; an enqueue attempt
125+
* sequenced after it observes {@code closed} here (the lock provides the happens-before) and is rejected.
126+
* Either way no opener is left stranded.
117127
*
118128
* @return {@code true} if the opener was run inline or queued; {@code false} if rejected because the
119-
* connection is draining/closed (caller must fail the request)
129+
* connection is draining/closed or the pending queue is full (caller must fail the request)
120130
*/
121131
public boolean offerPendingOpener(NettyResponseFuture<?> future, Runnable opener) {
122132
synchronized (pendingLock) {
@@ -126,7 +136,11 @@ public boolean offerPendingOpener(NettyResponseFuture<?> future, Runnable opener
126136
if (tryAcquireStream()) {
127137
opener.run();
128138
} else {
139+
if (pendingCount >= MAX_PENDING_OPENERS) {
140+
return false;
141+
}
129142
pendingOpeners.add(new PendingOpener(future, opener));
143+
pendingCount++;
130144
}
131145
return true;
132146
}
@@ -142,6 +156,7 @@ private void drainPendingOpeners() {
142156
// this never over-opens; every poll is under pendingLock, so a non-empty queue always yields a
143157
// non-null opener.
144158
while (!pendingOpeners.isEmpty() && tryAcquireStream()) {
159+
pendingCount--;
145160
pendingOpeners.poll().opener.run();
146161
}
147162
}
@@ -161,13 +176,18 @@ private void drainPendingOpeners() {
161176
* @param failer invoked once per orphaned request future (e.g. to abort it)
162177
*/
163178
public void failPendingOpeners(Consumer<NettyResponseFuture<?>> failer) {
164-
closed.set(true);
165179
List<PendingOpener> drained = new ArrayList<>();
166180
synchronized (pendingLock) {
181+
// Set closed UNDER pendingLock, before draining: an offerPendingOpener that already holds the lock
182+
// finishes its enqueue and is caught by the drain below; one that has not yet acquired the lock
183+
// observes closed==true (lock happens-before) and is rejected. Setting it outside the lock would
184+
// leave the invariant the offerPendingOpener javadoc relies on resting on luck, not the lock.
185+
closed.set(true);
167186
PendingOpener p;
168187
while ((p = pendingOpeners.poll()) != null) {
169188
drained.add(p);
170189
}
190+
pendingCount = 0;
171191
}
172192
// Fail outside the lock — failer may re-enter client code.
173193
for (PendingOpener p : drained) {

client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
134134
totalDecompressedBytes += decompressed.readableBytes();
135135
if (totalDecompressedBytes > maxDecompressedBytes) {
136136
decompressed.release();
137-
releaseDecompressor();
137+
// Swallow any teardown throw (finishAndReleaseAll() can re-run the decoder over a leftover
138+
// and throw a second DecompressionException) so the caller sees the bomb-limit message below,
139+
// not the raw codec error. The decoder's own cumulation is freed in channelInputClosed's
140+
// finally regardless, so swallowing here leaks nothing — same as the corrupt-body catch.
141+
try {
142+
releaseDecompressor();
143+
} catch (Throwable cleanupError) {
144+
// best-effort decoder teardown
145+
}
138146
throw new DecompressionException(
139147
"HTTP/2 response body exceeds the maximum decompressed size of "
140148
+ maxDecompressedBytes + " bytes");

client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,20 @@ private void handleHttp2HeadersFrame(Http2HeadersFrame headersFrame, Channel cha
163163

164164
NettyResponseStatus status = new NettyResponseStatus(future.getUri(), syntheticResponse, channel);
165165

166+
// RFC 9110 §15.2: a 1xx is an INTERIM response, not the final one. 100-continue must still run the
167+
// interceptor chain (Continue100Interceptor resumes the deferred body), but any OTHER interim — 102
168+
// Processing, 103 Early Hints — must NOT touch the chain at all: running it would persist the interim's
169+
// Set-Cookie into the CookieStore and execute response filters against a non-final response, then do it
170+
// again on the real response. The interim HEADERS has endStream=false, so just wait for the final frame.
171+
if (statusCode > 100 && statusCode < 200) {
172+
return;
173+
}
174+
166175
if (!interceptors.exitAfterIntercept(channel, future, handler, syntheticResponse, status, responseHeaders)) {
167-
// RFC 9113 §8.1 / RFC 9110 §15.2: a 1xx is an INTERIM response, not the final one. Any 1xx not
168-
// consumed by an interceptor (100-continue is handled in exitAfterIntercept above; 102 Processing
169-
// and 103 Early Hints are not) must NOT be delivered to the AsyncHandler as the final status —
170-
// that fires onStatusReceived/onHeadersReceived a second time when the real response arrives.
171-
// The interim HEADERS has endStream=false, so just keep waiting for the final HEADERS frame.
172-
if (statusCode >= 100 && statusCode < 200) {
176+
// A 100 that the interceptor chain did not consume (no Expect/100-continue in flight) is still
177+
// interim and must not be delivered to the AsyncHandler as the final status — that would fire
178+
// onStatusReceived/onHeadersReceived a second time when the real response arrives.
179+
if (statusCode == 100) {
173180
return;
174181
}
175182
boolean abort = handler.onStatusReceived(status) == State.ABORT;

client/src/main/java/org/asynchttpclient/netty/request/NettyRequest.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,21 @@
2424
public final class NettyRequest {
2525

2626
@SuppressWarnings("rawtypes")
27-
private static final AtomicIntegerFieldUpdater<NettyRequest> RELEASED_UPDATER =
28-
AtomicIntegerFieldUpdater.newUpdater(NettyRequest.class, "released");
27+
private static final AtomicIntegerFieldUpdater<NettyRequest> STATE_UPDATER =
28+
AtomicIntegerFieldUpdater.newUpdater(NettyRequest.class, "state");
29+
30+
// Single atomic state machine guarding who releases httpRequest (and the request-body ByteBuf it holds),
31+
// so it is released EXACTLY once — no double-free, no leak — even when an abort/cancel/timeout on one
32+
// thread races the channel write on the event loop. The two transitions out of OWNED_BY_AHC are mutually
33+
// exclusive (CAS), so exactly one of {Netty's encoder, AHC} ends up owning the release.
34+
private static final int OWNED_BY_AHC = 0; // initial: AHC owns the release
35+
private static final int HANDED_TO_CHANNEL = 1; // handed to Netty's HTTP/1.1 encoder; IT releases
36+
private static final int RELEASED = 2; // AHC released it
2937

3038
private final HttpRequest httpRequest;
3139
private final NettyBody body;
3240
@SuppressWarnings("unused")
33-
private volatile int released;
34-
// Set once httpRequest has been handed to a channel write (HTTP/1.1): from then on Netty's encoder owns
35-
// and releases it, so AsyncHttpClient must NOT also release it. Until then — and always in the HTTP/2
36-
// path, where httpRequest is never written to a channel but re-encoded as frames — AHC owns the release.
37-
private volatile boolean handedToChannel;
41+
private volatile int state;
3842

3943
NettyRequest(HttpRequest httpRequest, NettyBody body) {
4044
this.httpRequest = httpRequest;
@@ -51,30 +55,35 @@ public NettyBody getBody() {
5155

5256
/**
5357
* Releases the underlying HTTP/1.1 {@link HttpRequest} (and the request-body {@link io.netty.buffer.ByteBuf}
54-
* it holds) exactly once. In the HTTP/2 path the {@code httpRequest} object itself is never written to a
55-
* channel — its content is re-encoded as HTTP/2 frames — so AsyncHttpClient owns its release.
56-
* {@code NettyRequestSender.sendHttp2Frames} releases it on the normal send path; this lets the
57-
* early-abort paths (a draining/closed/queued-then-dropped connection, or a crashing {@code onRequestSend})
58-
* release it too, so the request body buffer never leaks. Idempotent and thread-safe — extra calls are no-ops.
58+
* it holds) — but only while AsyncHttpClient still owns it (it was never handed to a channel write). Once
59+
* {@link #markHandedToChannel()} has claimed it, Netty's encoder owns the release and this becomes a no-op,
60+
* so the two can never double-free. The atomic CAS also closes the check-then-release TOCTOU an external
61+
* {@code isHandedToChannel()} test would leave between an abort thread and the event-loop write.
62+
* <p>
63+
* In the HTTP/2 path the {@code httpRequest} object is never written to a channel — its content is
64+
* re-encoded as HTTP/2 frames — so AHC always owns its release and this performs it. The early-abort paths
65+
* (a draining/closed/queued-then-dropped connection, or a crashing {@code onRequestSend}) call it too.
66+
* Idempotent and thread-safe — extra calls are no-ops.
5967
*/
6068
public void release() {
61-
if (RELEASED_UPDATER.compareAndSet(this, 0, 1)) {
69+
if (STATE_UPDATER.compareAndSet(this, OWNED_BY_AHC, RELEASED)) {
6270
ReferenceCountUtil.release(httpRequest);
6371
}
6472
}
6573

6674
/**
67-
* Marks that {@link #getHttpRequest()} has been handed to a channel write (the HTTP/1.1 path), after which
68-
* Netty's encoder owns its release. This lets the abort/terminate paths release the request body only when
69-
* it was never written — preventing a double-free with the encoder while still freeing it on an
70-
* abort-before-write (e.g. a {@code setBody(ByteBuf)} request whose connection fails before the request is
71-
* sent). Idempotent.
75+
* Atomically claims {@link #getHttpRequest()} for a channel write (the HTTP/1.1 path), after which Netty's
76+
* encoder owns its release. Returns {@code true} if the caller may proceed with the write; returns
77+
* {@code false} when a concurrent abort/cancel/timeout has ALREADY released the request body — in which case
78+
* the caller MUST NOT write the now-freed buffer. This single CAS resolves the double-free/use-after-free
79+
* race a separate {@code handedToChannel} flag plus a non-atomic {@code release()} would otherwise allow
80+
* (e.g. a {@code setBody(ByteBuf)} request cancelled on one thread while the event loop writes it).
7281
*/
73-
public void markHandedToChannel() {
74-
handedToChannel = true;
82+
public boolean markHandedToChannel() {
83+
return STATE_UPDATER.compareAndSet(this, OWNED_BY_AHC, HANDED_TO_CHANNEL);
7584
}
7685

7786
public boolean isHandedToChannel() {
78-
return handedToChannel;
87+
return state == HANDED_TO_CHANNEL;
7988
}
8089
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,10 +508,14 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
508508
return;
509509
}
510510

511-
// About to hand httpRequest to the channel — from here Netty's HTTP/1.1 encoder owns and
512-
// releases it, so a later abort must not release it again (NettyRequest.release() is gated on
513-
// isHandedToChannel()). Before this point an abort releases the request body itself.
514-
nettyRequest.markHandedToChannel();
511+
// Atomically claim httpRequest for the channel write — from here Netty's HTTP/1.1 encoder owns
512+
// and releases it, so a later abort must not release it again (NettyRequest.release() then
513+
// no-ops). If the claim FAILS, a concurrent abort/cancel/timeout has already released the
514+
// request body on another thread; writing the freed buffer would be a use-after-free, so bail
515+
// (the future is already terminal). Before this point an abort releases the request body itself.
516+
if (!nettyRequest.markHandedToChannel()) {
517+
return;
518+
}
515519

516520
// if the request has a body, we want to track progress
517521
if (writeBody) {
@@ -676,7 +680,13 @@ protected void initChannel(Http2StreamChannel streamCh) {
676680
state.releaseStream();
677681
}
678682
releaseHttp2Request(future);
679-
abort(parentChannel, future, f.cause());
683+
// Fail ONLY this future (future.abort, not abort(parentChannel, ...)): opening one stream
684+
// can fail for a stream-local reason (e.g. Netty rejecting it as the outbound max-streams
685+
// bookkeeping races AHC's own cap) while the parent connection is healthy with sibling
686+
// streams. abort(channel, ...) would closeChannel(parentChannel) and take them all down —
687+
// the multiplexed-connection blast radius. Keep it stream-scoped, like the draining/queued
688+
// paths above and Http2Handler.streamFailed(close=false).
689+
future.abort(f.cause());
680690
}
681691
});
682692
}

client/src/main/java/org/asynchttpclient/netty/request/body/Http2BodyWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@
4747
* the stream pipeline resumes it from {@code channelWritabilityChanged}.</li>
4848
* <li>It reads one chunk ahead so the <em>final</em> DATA frame can carry {@code endStream=true};
4949
* an empty body still sends a single empty DATA frame with {@code endStream=true}.</li>
50-
* <li>In-flight heap is bounded to O(chunk size): at most one buffered "pending" chunk is held across
51-
* a writability wait.</li>
50+
* <li>This writer holds at most one buffered "pending" chunk across a writability wait. Production stops
51+
* once the channel goes unwritable, so total in-flight heap is bounded by the channel's write
52+
* high-water mark (the already-written chunks the channel's outbound buffer still owns) plus that one
53+
* pending chunk — not by the size of the whole body.</li>
5254
* </ul>
5355
* <p>
5456
* <strong>Lifecycle / cleanup.</strong> Because the pump completes asynchronously (after {@code writeHttp2}

0 commit comments

Comments
 (0)