Skip to content

Commit f95856c

Browse files
snazyvietj
authored andcommitted
Fix HttpClientResponse body()/end() race (4.x backport)
This change fixes the race in `HttpClientResponseImpl` described in #6038: `handleTrailers()`/`handleException()` can run before `body()`/`end()` creates the promise. If that happens, the completion signal is dropped and the future can hang. This is observed from the call site as a HTTP request timeout. This fix is only applied to `HttpClientResponseImpl`: - keep ended/failure state on `HttpClientResponseImpl` - in `body()`/`end()`, create the `HttpEventHandler` promise while holding `synchronized(conn)` - after unlock, replay completion/failure via `handleEnd()`/`handleException()` when needed Although `HttpEventHandler` looks very similar (like it _could_ be affected by the same/similar race condition), it is not affected. I intentionally did not change `HttpEventHandler`, because it is also used in server request processing. The only case when server request processing could be affected , is when `body()` is called from a _different_ thread while `end()` is being processed - that is very unlikely (feels like a misuse). Also added two new tests to `Http1xTest` that reliably fail without the change to `HttpClientResponseImpl` and reliably pass with that change. The added regression tests are `testResponseBodyAfterResponseEnd` and `testResponseEndAfterResponseEnd`. Fixes #6038
1 parent 9fe7732 commit f95856c

2 files changed

Lines changed: 135 additions & 38 deletions

File tree

src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.vertx.core.Future;
1616
import io.vertx.core.Handler;
1717
import io.vertx.core.MultiMap;
18+
import io.vertx.core.Promise;
1819
import io.vertx.core.buffer.Buffer;
1920
import io.vertx.core.http.*;
2021
import io.vertx.core.http.impl.headers.HeadersAdaptor;
@@ -51,21 +52,55 @@ public class HttpClientResponseImpl implements HttpClientResponse {
5152
private Handler<StreamPriority> priorityHandler;
5253

5354
// Cache these for performance
54-
private MultiMap headers;
55+
private final MultiMap headers;
5556
private MultiMap trailers;
5657
private List<String> cookies;
5758
private NetSocket netSocket;
5859

60+
/**
61+
* This {@code ended} field is used internally to track whether the response has ended.
62+
* The {@link #completion} promise is used to expose the response result, completed as failed
63+
* or succeeded. The promise's {@link Future} is exposed to external users via {@link #end()}.
64+
*
65+
* <p>The {@code ended} / {@link #completion} pair decouples the "response ended" state handling
66+
* from the potentially expensive work that might be incurred by handlers registered on the promise's
67+
* {@link Future}.
68+
*
69+
* <p>The pattern for both fields in {@link #handleException(Throwable)} and {@link #handleEnd(MultiMap)}
70+
* is:
71+
* <ol>
72+
* <li>Acquire lock ({@code synchronized (conn)}).
73+
* <li>Check {@code ended}, if {@code ended != null} return immediately.
74+
* <li>Set {@code ended}.
75+
* <li>Release lock.
76+
* <li>Complete the {@link #completion} promise.
77+
* </ol>
78+
*
79+
* <p>Possible states of {@code ended}:
80+
* <ul>
81+
* <li>{@code null} - the response has not ended yet</li>
82+
* <li>{@link #ENDED_SENTINEL ENDED_SENTINEL} - the response has ended successfully</li>
83+
* <li>any other {@link Throwable} - the response has ended with an exception</li>
84+
* </ul>
85+
*
86+
* <p>All accesses to this field must be guarded by {@code conn}.
87+
*/
88+
private Throwable ended;
89+
private static final Throwable ENDED_SENTINEL = new Throwable();
90+
private final Promise<Void> completion;
91+
5992
HttpClientResponseImpl(HttpClientRequestBase request, HttpVersion version, HttpClientStream stream, int statusCode, String statusMessage, MultiMap headers) {
6093
this.version = version;
6194
this.statusCode = statusCode;
6295
this.statusMessage = statusMessage;
6396
this.request = request;
6497
this.stream = stream;
6598
this.conn = stream.connection();
99+
this.completion = request.context.promise();
66100
this.headers = headers;
67101
}
68102

103+
// Must be guarded by a `synchronized (conn)` block.
69104
private HttpEventHandler eventHandler(boolean create) {
70105
if (eventHandler == null && create) {
71106
eventHandler = new HttpEventHandler(request.context);
@@ -128,6 +163,10 @@ public MultiMap trailers() {
128163

129164
@Override
130165
public String getTrailer(String trailerName) {
166+
MultiMap trailers;
167+
synchronized (conn) {
168+
trailers = this.trailers;
169+
}
131170
return trailers != null ? trailers.get(trailerName) : null;
132171
}
133172

@@ -145,9 +184,10 @@ public List<String> cookies() {
145184
}
146185
}
147186

187+
/** Must be called within a {@code synchronized (conn)} block. */
148188
private void checkEnded() {
149-
if (trailers != null) {
150-
throw new IllegalStateException();
189+
if (ended != null) {
190+
throw new IllegalStateException("Response already ended");
151191
}
152192
}
153193

@@ -242,23 +282,43 @@ void handleChunk(Buffer data) {
242282

243283
void handleEnd(MultiMap trailers) {
244284
HttpEventHandler handler;
285+
Throwable wasEnded;
286+
// This synchronized block is used to guarantee that a handler's `handleEnd()` is
287+
// called only once and that setting/updating `trailers` does not race with
288+
// `trailers()`, where the `trailers` field can escape.
245289
synchronized (conn) {
246-
this.trailers = trailers;
290+
wasEnded = this.ended;
291+
if (wasEnded != null) {
292+
return;
293+
}
294+
if (this.trailers == null) {
295+
this.trailers = trailers;
296+
} else if (this.trailers != trailers) {
297+
this.trailers.setAll(trailers);
298+
}
299+
this.ended = ENDED_SENTINEL;
247300
handler = eventHandler;
248301
}
302+
completion.tryComplete();
249303
if (handler != null) {
250304
handler.handleEnd();
251305
}
252306
}
253307

254308
void handleException(Throwable e) {
255309
HttpEventHandler handler;
310+
Throwable wasEnded;
256311
synchronized (conn) {
257-
if (trailers != null) {
312+
// Only report the first exception, and generally only report when the
313+
// response has not yet ended.
314+
wasEnded = this.ended;
315+
if (wasEnded != null) {
258316
return;
259317
}
318+
this.ended = e;
260319
handler = eventHandler;
261320
}
321+
completion.tryFail(e);
262322
if (handler != null) {
263323
handler.handleException(e);
264324
} else {
@@ -268,13 +328,25 @@ void handleException(Throwable e) {
268328

269329
@Override
270330
public Future<Buffer> body() {
271-
return eventHandler(true).body();
331+
HttpEventHandler eventHandler;
332+
Future<Buffer> bodyFuture;
333+
Throwable wasEnded;
334+
synchronized (conn) {
335+
eventHandler = eventHandler(true);
336+
bodyFuture = eventHandler.body();
337+
wasEnded = ended;
338+
}
339+
if (wasEnded == ENDED_SENTINEL) {
340+
eventHandler.handleEnd();
341+
} else if (wasEnded != null) {
342+
eventHandler.handleException(wasEnded);
343+
}
344+
return bodyFuture;
272345
}
273346

274347
@Override
275-
public synchronized Future<Void> end() {
276-
checkEnded();
277-
return eventHandler(true).end();
348+
public Future<Void> end() {
349+
return completion.future();
278350
}
279351

280352
@Override

src/test/java/io/vertx/core/http/Http1xTest.java

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,37 @@ protected VertxOptions getOptions() {
7070
return options;
7171
}
7272

73+
@Test
74+
// Regression test for https://github.com/eclipse-vertx/vert.x/issues/6038
75+
public void testResponseBodyAfterResponseEnd() throws Exception {
76+
server.requestHandler(req -> req.response().setStatusCode(204).end());
77+
startServer(testAddress);
78+
client.request(requestOptions)
79+
.compose(HttpClientRequest::send)
80+
// Ensure body() is invoked after response end has been processed.
81+
.compose(resp -> vertx.timer(50).compose(v -> resp.body()))
82+
.timeout(5, TimeUnit.SECONDS)
83+
.onComplete(onSuccess(body -> {
84+
assertEquals(0, body.length());
85+
testComplete();
86+
}));
87+
await();
88+
}
89+
90+
@Test
91+
// Regression test for https://github.com/eclipse-vertx/vert.x/issues/6038
92+
public void testResponseEndAfterResponseEnd() throws Exception {
93+
server.requestHandler(req -> req.response().setStatusCode(204).end());
94+
startServer(testAddress);
95+
client.request(requestOptions)
96+
.compose(HttpClientRequest::send)
97+
// Ensure end() is invoked after response end has been processed.
98+
.compose(resp -> vertx.timer(50).compose(v -> resp.end()))
99+
.timeout(5, TimeUnit.SECONDS)
100+
.onComplete(onSuccess(v -> testComplete()));
101+
await();
102+
}
103+
73104
@Test
74105
public void testClientOptions() {
75106
HttpClientOptions options = new HttpClientOptions();
@@ -3732,21 +3763,9 @@ public void testInvalidChunkInHttpClientResponse() throws Exception {
37323763
cont.whenComplete((v,e) -> {
37333764
so.write("invalid\r\n"); // Invalid chunk
37343765
});
3735-
}).listen(testAddress, onSuccess(v -> listenLatch.countDown()));
3766+
}).listen(testAddress).onComplete(onSuccess(v -> listenLatch.countDown()));
37363767
awaitLatch(listenLatch);
3737-
AtomicInteger status = new AtomicInteger();
3738-
testHttpClientResponseDecodeError(cont::complete, err -> {
3739-
switch (status.incrementAndGet()) {
3740-
case 1:
3741-
assertTrue(err instanceof NumberFormatException);
3742-
break;
3743-
case 2:
3744-
assertTrue(err instanceof VertxException);
3745-
assertTrue(err.getMessage().equals("Connection was closed"));
3746-
testComplete();
3747-
break;
3748-
}
3749-
});
3768+
testHttpClientResponseDecodeError(cont::complete, err -> assertTrue(err instanceof NumberFormatException));
37503769
}
37513770

37523771
@Test
@@ -3770,26 +3789,32 @@ public void testInvalidTrailersInHttpClientResponse() throws Exception {
37703789
});
37713790
}).listen(testAddress, onSuccess(v -> listenLatch.countDown()));
37723791
awaitLatch(listenLatch);
3773-
AtomicInteger status = new AtomicInteger();
3774-
testHttpClientResponseDecodeError(cont::complete, err -> {
3775-
switch (status.incrementAndGet()) {
3776-
case 1:
3777-
assertTrue(err instanceof TooLongFrameException);
3778-
break;
3779-
case 2:
3780-
assertTrue(err instanceof VertxException);
3781-
assertTrue(err.getMessage().equals("Connection was closed"));
3782-
testComplete();
3783-
break;
3784-
}
3785-
});
3792+
testHttpClientResponseDecodeError(cont::complete, err -> assertTrue(err instanceof TooLongFrameException));
37863793
}
37873794

37883795
private void testHttpClientResponseDecodeError(Handler<Void> continuation, Handler<Throwable> errorHandler) throws Exception {
3796+
AtomicInteger status = new AtomicInteger();
3797+
AtomicBoolean connectionClosed = new AtomicBoolean();
37893798
client.request(requestOptions)
37903799
.onComplete(onSuccess(req -> {
3791-
req.send(onSuccess(resp -> {
3792-
resp.exceptionHandler(errorHandler);
3800+
req.send().onComplete(onSuccess(resp -> {
3801+
resp.request().connection().closeHandler(v -> {
3802+
connectionClosed.set(true);
3803+
if (status.get() == 1) {
3804+
testComplete();
3805+
}
3806+
});
3807+
resp.exceptionHandler(err -> {
3808+
int current = status.incrementAndGet();
3809+
if (current == 1) {
3810+
errorHandler.handle(err);
3811+
if (connectionClosed.get()) {
3812+
testComplete();
3813+
}
3814+
} else {
3815+
fail("Unexpected extra response exception callback: " + err);
3816+
}
3817+
});
37933818
continuation.handle(null);
37943819
}));
37953820
}));

0 commit comments

Comments
 (0)