Skip to content

Commit 82b01d9

Browse files
snazyvietj
authored andcommitted
Fix HttpClientResponse body()/end() race (5.0 backport)
This change fixes the race in `HttpClientResponseImpl` described in #6038: `handleTrailers()`/`handleException()` can run before `body()`/`end()` creates the promise. If that happens, the completion signal is dropped and the future can hang. This is observed from the call site as a HTTP request timeout. This fix is only applied to `HttpClientResponseImpl`: - keep ended/failure state on `HttpClientResponseImpl` - in `body()`/`end()`, create the `HttpEventHandler` promise while holding `synchronized(conn)` - after unlock, replay completion/failure via `handleEnd()`/`handleException()` when needed Although `HttpEventHandler` looks very similar (like it _could_ be affected by the same/similar race condition), it is not affected. I intentionally did not change `HttpEventHandler`, because it is also used in server request processing. The only case when server request processing could be affected , is when `body()` is called from a _different_ thread while `end()` is being processed - that is very unlikely (feels like a misuse). Also added two new tests to `Http1xTest` that reliably fail without the change to `HttpClientResponseImpl` and reliably pass with that change. The added regression tests are `testResponseBodyAfterResponseEnd` and `testResponseEndAfterResponseEnd`. Fixes #6038
1 parent 89878bd commit 82b01d9

2 files changed

Lines changed: 134 additions & 37 deletions

File tree

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.vertx.core.Future;
1616
import io.vertx.core.Handler;
1717
import io.vertx.core.MultiMap;
18+
import io.vertx.core.Promise;
1819
import io.vertx.core.buffer.Buffer;
1920
import io.vertx.core.http.*;
2021
import io.vertx.core.http.impl.headers.HeadersAdaptor;
@@ -45,21 +46,55 @@ public class HttpClientResponseImpl implements HttpClientResponse {
4546
private Handler<StreamPriority> priorityHandler;
4647

4748
// Cache these for performance
48-
private MultiMap headers;
49+
private final MultiMap headers;
4950
private MultiMap trailers;
5051
private List<String> cookies;
5152
private NetSocket netSocket;
5253

54+
/**
55+
* This {@code ended} field is used internally to track whether the response has ended.
56+
* The {@link #completion} promise is used to expose the response result, completed as failed
57+
* or succeeded. The promise's {@link Future} is exposed to external users via {@link #end()}.
58+
*
59+
* <p>The {@code ended} / {@link #completion} pair decouples the "response ended" state handling
60+
* from the potentially expensive work that might be incurred by handlers registered on the promise's
61+
* {@link Future}.
62+
*
63+
* <p>The pattern for both fields in {@link #handleException(Throwable)} and {@link #handleEnd(MultiMap)}
64+
* is:
65+
* <ol>
66+
* <li>Acquire lock ({@code synchronized (conn)}).
67+
* <li>Check {@code ended}, if {@code ended != null} return immediately.
68+
* <li>Set {@code ended}.
69+
* <li>Release lock.
70+
* <li>Complete the {@link #completion} promise.
71+
* </ol>
72+
*
73+
* <p>Possible states of {@code ended}:
74+
* <ul>
75+
* <li>{@code null} - the response has not ended yet</li>
76+
* <li>{@link #ENDED_SENTINEL ENDED_SENTINEL} - the response has ended successfully</li>
77+
* <li>any other {@link Throwable} - the response has ended with an exception</li>
78+
* </ul>
79+
*
80+
* <p>All accesses to this field must be guarded by {@code conn}.
81+
*/
82+
private Throwable ended;
83+
private static final Throwable ENDED_SENTINEL = new Throwable();
84+
private final Promise<Void> completion;
85+
5386
HttpClientResponseImpl(HttpClientRequestBase request, HttpVersion version, HttpClientStream stream, int statusCode, String statusMessage, MultiMap headers) {
5487
this.version = version;
5588
this.statusCode = statusCode;
5689
this.statusMessage = statusMessage;
5790
this.request = request;
5891
this.stream = stream;
5992
this.conn = stream.connection();
93+
this.completion = request.context.promise();
6094
this.headers = headers;
6195
}
6296

97+
// Must be guarded by a `synchronized (conn)` block.
6398
private HttpEventHandler eventHandler(boolean create) {
6499
if (eventHandler == null && create) {
65100
eventHandler = new HttpEventHandler(request.context);
@@ -122,6 +157,10 @@ public MultiMap trailers() {
122157

123158
@Override
124159
public String getTrailer(String trailerName) {
160+
MultiMap trailers;
161+
synchronized (conn) {
162+
trailers = this.trailers;
163+
}
125164
return trailers != null ? trailers.get(trailerName) : null;
126165
}
127166

@@ -139,9 +178,10 @@ public List<String> cookies() {
139178
}
140179
}
141180

181+
/** Must be called within a {@code synchronized (conn)} block. */
142182
private void checkEnded() {
143-
if (trailers != null) {
144-
throw new IllegalStateException();
183+
if (ended != null) {
184+
throw new IllegalStateException("Response already ended");
145185
}
146186
}
147187

@@ -236,23 +276,43 @@ void handleChunk(Buffer data) {
236276

237277
void handleEnd(MultiMap trailers) {
238278
HttpEventHandler handler;
279+
Throwable wasEnded;
280+
// This synchronized block is used to guarantee that a handler's `handleEnd()` is
281+
// called only once and that setting/updating `trailers` does not race with
282+
// `trailers()`, where the `trailers` field can escape.
239283
synchronized (conn) {
240-
this.trailers = trailers;
284+
wasEnded = this.ended;
285+
if (wasEnded != null) {
286+
return;
287+
}
288+
if (this.trailers == null) {
289+
this.trailers = trailers;
290+
} else if (this.trailers != trailers) {
291+
this.trailers.setAll(trailers);
292+
}
293+
this.ended = ENDED_SENTINEL;
241294
handler = eventHandler;
242295
}
296+
completion.tryComplete();
243297
if (handler != null) {
244298
handler.handleEnd();
245299
}
246300
}
247301

248302
void handleException(Throwable e) {
249303
HttpEventHandler handler;
304+
Throwable wasEnded;
250305
synchronized (conn) {
251-
if (trailers != null) {
306+
// Only report the first exception, and generally only report when the
307+
// response has not yet ended.
308+
wasEnded = this.ended;
309+
if (wasEnded != null) {
252310
return;
253311
}
312+
this.ended = e;
254313
handler = eventHandler;
255314
}
315+
completion.tryFail(e);
256316
if (handler != null) {
257317
handler.handleException(e);
258318
} else {
@@ -262,13 +322,25 @@ void handleException(Throwable e) {
262322

263323
@Override
264324
public Future<Buffer> body() {
265-
return eventHandler(true).body();
325+
HttpEventHandler eventHandler;
326+
Future<Buffer> bodyFuture;
327+
Throwable wasEnded;
328+
synchronized (conn) {
329+
eventHandler = eventHandler(true);
330+
bodyFuture = eventHandler.body();
331+
wasEnded = ended;
332+
}
333+
if (wasEnded == ENDED_SENTINEL) {
334+
eventHandler.handleEnd();
335+
} else if (wasEnded != null) {
336+
eventHandler.handleException(wasEnded);
337+
}
338+
return bodyFuture;
266339
}
267340

268341
@Override
269-
public synchronized Future<Void> end() {
270-
checkEnded();
271-
return eventHandler(true).end();
342+
public Future<Void> end() {
343+
return completion.future();
272344
}
273345

274346
@Override

vertx-core/src/test/java/io/vertx/tests/http/Http1xTest.java

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import java.util.concurrent.atomic.AtomicInteger;
4747
import java.util.concurrent.atomic.AtomicReference;
4848
import java.util.function.*;
49-
import java.util.stream.Collector;
5049
import java.util.stream.Collectors;
5150
import java.util.stream.IntStream;
5251
import java.util.stream.Stream;
@@ -72,6 +71,37 @@ protected VertxOptions getOptions() {
7271
return options;
7372
}
7473

74+
@Test
75+
// Regression test for https://github.com/eclipse-vertx/vert.x/issues/6038
76+
public void testResponseBodyAfterResponseEnd() throws Exception {
77+
server.requestHandler(req -> req.response().setStatusCode(204).end());
78+
startServer(testAddress);
79+
client.request(requestOptions)
80+
.compose(HttpClientRequest::send)
81+
// Ensure body() is invoked after response end has been processed.
82+
.compose(resp -> vertx.timer(50).compose(v -> resp.body()))
83+
.timeout(5, TimeUnit.SECONDS)
84+
.onComplete(onSuccess(body -> {
85+
assertEquals(0, body.length());
86+
testComplete();
87+
}));
88+
await();
89+
}
90+
91+
@Test
92+
// Regression test for https://github.com/eclipse-vertx/vert.x/issues/6038
93+
public void testResponseEndAfterResponseEnd() throws Exception {
94+
server.requestHandler(req -> req.response().setStatusCode(204).end());
95+
startServer(testAddress);
96+
client.request(requestOptions)
97+
.compose(HttpClientRequest::send)
98+
// Ensure end() is invoked after response end has been processed.
99+
.compose(resp -> vertx.timer(50).compose(v -> resp.end()))
100+
.timeout(5, TimeUnit.SECONDS)
101+
.onComplete(onSuccess(v -> testComplete()));
102+
await();
103+
}
104+
75105
@Test
76106
public void testClientOptions() {
77107
HttpClientOptions options = new HttpClientOptions();
@@ -3609,21 +3639,10 @@ public void testInvalidChunkInHttpClientResponse() throws Exception {
36093639
cont.whenComplete((v,e) -> {
36103640
so.write("invalid\r\n"); // Invalid chunk
36113641
});
3642+
36123643
}).listen(testAddress).onComplete(onSuccess(v -> listenLatch.countDown()));
36133644
awaitLatch(listenLatch);
3614-
AtomicInteger status = new AtomicInteger();
3615-
testHttpClientResponseDecodeError(cont::complete, err -> {
3616-
switch (status.incrementAndGet()) {
3617-
case 1:
3618-
assertTrue(err instanceof NumberFormatException);
3619-
break;
3620-
case 2:
3621-
assertTrue(err instanceof VertxException);
3622-
assertTrue(err.getMessage().equals("Connection was closed"));
3623-
testComplete();
3624-
break;
3625-
}
3626-
});
3645+
testHttpClientResponseDecodeError(cont::complete, err -> assertTrue(err instanceof NumberFormatException));
36273646
}
36283647

36293648
@Test
@@ -3647,26 +3666,32 @@ public void testInvalidTrailersInHttpClientResponse() throws Exception {
36473666
});
36483667
}).listen(testAddress).onComplete(onSuccess(v -> listenLatch.countDown()));
36493668
awaitLatch(listenLatch);
3650-
AtomicInteger status = new AtomicInteger();
3651-
testHttpClientResponseDecodeError(cont::complete, err -> {
3652-
switch (status.incrementAndGet()) {
3653-
case 1:
3654-
assertTrue(err instanceof TooLongFrameException);
3655-
break;
3656-
case 2:
3657-
assertTrue(err instanceof VertxException);
3658-
assertTrue(err.getMessage().equals("Connection was closed"));
3659-
testComplete();
3660-
break;
3661-
}
3662-
});
3669+
testHttpClientResponseDecodeError(cont::complete, err -> assertTrue(err instanceof TooLongFrameException));
36633670
}
36643671

36653672
private void testHttpClientResponseDecodeError(Handler<Void> continuation, Handler<Throwable> errorHandler) throws Exception {
3673+
AtomicInteger status = new AtomicInteger();
3674+
AtomicBoolean connectionClosed = new AtomicBoolean();
36663675
client.request(requestOptions)
36673676
.onComplete(onSuccess(req -> {
36683677
req.send().onComplete(onSuccess(resp -> {
3669-
resp.exceptionHandler(errorHandler);
3678+
resp.request().connection().closeHandler(v -> {
3679+
connectionClosed.set(true);
3680+
if (status.get() == 1) {
3681+
testComplete();
3682+
}
3683+
});
3684+
resp.exceptionHandler(err -> {
3685+
int current = status.incrementAndGet();
3686+
if (current == 1) {
3687+
errorHandler.handle(err);
3688+
if (connectionClosed.get()) {
3689+
testComplete();
3690+
}
3691+
} else {
3692+
fail("Unexpected extra response exception callback: " + err);
3693+
}
3694+
});
36703695
continuation.handle(null);
36713696
}));
36723697
}));

0 commit comments

Comments
 (0)