Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
Expand Down Expand Up @@ -45,21 +46,55 @@ public class HttpClientResponseImpl implements HttpClientResponse {
private Handler<StreamPriority> priorityHandler;

// Cache these for performance
private MultiMap headers;
private final MultiMap headers;
private MultiMap trailers;
private List<String> cookies;
private NetSocket netSocket;

/**
* This {@code ended} field is used internally to track whether the response has ended.
* The {@link #completion} promise is used to expose the response result, completed as failed
* or succeeded. The promise's {@link Future} is exposed to external users via {@link #end()}.
*
* <p>The {@code ended} / {@link #completion} pair decouples the "response ended" state handling
* from the potentially expensive work that might be incurred by handlers registered on the promise's
* {@link Future}.
*
* <p>The pattern for both fields in {@link #handleException(Throwable)} and {@link #handleEnd(MultiMap)}
* is:
* <ol>
* <li>Acquire lock ({@code synchronized (conn)}).
* <li>Check {@code ended}, if {@code ended != null} return immediately.
* <li>Set {@code ended}.
* <li>Release lock.
* <li>Complete the {@link #completion} promise.
* </ol>
*
* <p>Possible states of {@code ended}:
* <ul>
* <li>{@code null} - the response has not ended yet</li>
* <li>{@link #ENDED_SENTINEL ENDED_SENTINEL} - the response has ended successfully</li>
* <li>any other {@link Throwable} - the response has ended with an exception</li>
* </ul>
*
* <p>All accesses to this field must be guarded by {@code conn}.
*/
private Throwable ended;
private static final Throwable ENDED_SENTINEL = new Throwable();
private final Promise<Void> completion;

HttpClientResponseImpl(HttpClientRequestBase request, HttpVersion version, HttpClientStream stream, int statusCode, String statusMessage, MultiMap headers) {
this.version = version;
this.statusCode = statusCode;
this.statusMessage = statusMessage;
this.request = request;
this.stream = stream;
this.conn = stream.connection();
this.completion = request.context.promise();
this.headers = headers;
}

// Must be guarded by a `synchronized (conn)` block.
private HttpEventHandler eventHandler(boolean create) {
if (eventHandler == null && create) {
eventHandler = new HttpEventHandler(request.context);
Expand Down Expand Up @@ -122,6 +157,10 @@ public MultiMap trailers() {

@Override
public String getTrailer(String trailerName) {
MultiMap trailers;
synchronized (conn) {
trailers = this.trailers;
}
return trailers != null ? trailers.get(trailerName) : null;
}

Expand All @@ -139,9 +178,10 @@ public List<String> cookies() {
}
}

/** Must be called within a {@code synchronized (conn)} block. */
private void checkEnded() {
if (trailers != null) {
throw new IllegalStateException();
if (ended != null) {
throw new IllegalStateException("Response already ended");
}
}

Expand Down Expand Up @@ -236,23 +276,43 @@ void handleChunk(Buffer data) {

void handleEnd(MultiMap trailers) {
HttpEventHandler handler;
Throwable wasEnded;
// This synchronized block is used to guarantee that a handler's `handleEnd()` is
// called only once and that setting/updating `trailers` does not race with
// `trailers()`, where the `trailers` field can escape.
synchronized (conn) {
this.trailers = trailers;
wasEnded = this.ended;
if (wasEnded != null) {
return;
}
if (this.trailers == null) {
this.trailers = trailers;
} else if (this.trailers != trailers) {
this.trailers.setAll(trailers);
}
this.ended = ENDED_SENTINEL;
handler = eventHandler;
}
completion.tryComplete();
if (handler != null) {
handler.handleEnd();
}
}

void handleException(Throwable e) {
HttpEventHandler handler;
Throwable wasEnded;
synchronized (conn) {
if (trailers != null) {
// Only report the first exception, and generally only report when the
// response has not yet ended.
wasEnded = this.ended;
if (wasEnded != null) {
return;
}
this.ended = e;
handler = eventHandler;
}
completion.tryFail(e);
if (handler != null) {
handler.handleException(e);
} else {
Expand All @@ -262,13 +322,25 @@ void handleException(Throwable e) {

@Override
public Future<Buffer> body() {
return eventHandler(true).body();
HttpEventHandler eventHandler;
Future<Buffer> bodyFuture;
Throwable wasEnded;
synchronized (conn) {
eventHandler = eventHandler(true);
bodyFuture = eventHandler.body();
wasEnded = ended;
}
if (wasEnded == ENDED_SENTINEL) {
eventHandler.handleEnd();
} else if (wasEnded != null) {
eventHandler.handleException(wasEnded);
}
return bodyFuture;
}

@Override
public synchronized Future<Void> end() {
checkEnded();
return eventHandler(true).end();
public Future<Void> end() {
return completion.future();
}

@Override
Expand Down
81 changes: 53 additions & 28 deletions vertx-core/src/test/java/io/vertx/tests/http/Http1xTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -72,6 +71,37 @@ protected VertxOptions getOptions() {
return options;
}

@Test
// Regression test for https://github.com/eclipse-vertx/vert.x/issues/6038
public void testResponseBodyAfterResponseEnd() throws Exception {
server.requestHandler(req -> req.response().setStatusCode(204).end());
startServer(testAddress);
client.request(requestOptions)
.compose(HttpClientRequest::send)
// Ensure body() is invoked after response end has been processed.
.compose(resp -> vertx.timer(50).compose(v -> resp.body()))
.timeout(5, TimeUnit.SECONDS)
.onComplete(onSuccess(body -> {
assertEquals(0, body.length());
testComplete();
}));
await();
}

@Test
// Regression test for https://github.com/eclipse-vertx/vert.x/issues/6038
public void testResponseEndAfterResponseEnd() throws Exception {
server.requestHandler(req -> req.response().setStatusCode(204).end());
startServer(testAddress);
client.request(requestOptions)
.compose(HttpClientRequest::send)
// Ensure end() is invoked after response end has been processed.
.compose(resp -> vertx.timer(50).compose(v -> resp.end()))
.timeout(5, TimeUnit.SECONDS)
.onComplete(onSuccess(v -> testComplete()));
await();
}

@Test
public void testClientOptions() {
HttpClientOptions options = new HttpClientOptions();
Expand Down Expand Up @@ -3609,21 +3639,10 @@ public void testInvalidChunkInHttpClientResponse() throws Exception {
cont.whenComplete((v,e) -> {
so.write("invalid\r\n"); // Invalid chunk
});

}).listen(testAddress).onComplete(onSuccess(v -> listenLatch.countDown()));
awaitLatch(listenLatch);
AtomicInteger status = new AtomicInteger();
testHttpClientResponseDecodeError(cont::complete, err -> {
switch (status.incrementAndGet()) {
case 1:
assertTrue(err instanceof NumberFormatException);
break;
case 2:
assertTrue(err instanceof VertxException);
assertTrue(err.getMessage().equals("Connection was closed"));
testComplete();
break;
}
});
testHttpClientResponseDecodeError(cont::complete, err -> assertTrue(err instanceof NumberFormatException));
}

@Test
Expand All @@ -3647,26 +3666,32 @@ public void testInvalidTrailersInHttpClientResponse() throws Exception {
});
}).listen(testAddress).onComplete(onSuccess(v -> listenLatch.countDown()));
awaitLatch(listenLatch);
AtomicInteger status = new AtomicInteger();
testHttpClientResponseDecodeError(cont::complete, err -> {
switch (status.incrementAndGet()) {
case 1:
assertTrue(err instanceof TooLongFrameException);
break;
case 2:
assertTrue(err instanceof VertxException);
assertTrue(err.getMessage().equals("Connection was closed"));
testComplete();
break;
}
});
testHttpClientResponseDecodeError(cont::complete, err -> assertTrue(err instanceof TooLongFrameException));
}

private void testHttpClientResponseDecodeError(Handler<Void> continuation, Handler<Throwable> errorHandler) throws Exception {
AtomicInteger status = new AtomicInteger();
AtomicBoolean connectionClosed = new AtomicBoolean();
client.request(requestOptions)
.onComplete(onSuccess(req -> {
req.send().onComplete(onSuccess(resp -> {
resp.exceptionHandler(errorHandler);
resp.request().connection().closeHandler(v -> {
connectionClosed.set(true);
if (status.get() == 1) {
testComplete();
}
});
resp.exceptionHandler(err -> {
int current = status.incrementAndGet();
if (current == 1) {
errorHandler.handle(err);
if (connectionClosed.get()) {
testComplete();
}
} else {
fail("Unexpected extra response exception callback: " + err);
}
});
continuation.handle(null);
}));
}));
Expand Down
Loading