Skip to content

Commit a937b81

Browse files
asalajanAlexandru Salajan
authored andcommitted
Flush pending writes before closing the connection on caught exceptions
Motivation: When an exception is caught on a connection, VertxHandler#exceptionCaught dispatches the throwable to the user handlers and then closes the channel with chctx.close(). A response written from the handler -- for instance a 400 written from request().exceptionHandler() when HttpContentDecompressor throws a DecoderException on a malformed compressed body -- is enqueued in the outbound message queue but not flushed, because VertxConnection#write skips the flush while a read is in progress. chctx.close() then causes Netty to discard the unflushed entries from the ChannelOutboundBuffer, so the response never reaches the client: the client observes a connection reset instead of the intended HTTP response. Changes: Route the exception-driven close through the channel so it traverses the pipeline tail and invokes VertxHandler#close -> VertxConnection#writeClose, which flushes pending writes (an empty buffer with forceFlush=true) and closes the channel only once the flush has completed. This is the same flush-then-close path already used by graceful shutdown, idle close and the regular close(); the exception path simply was not using it. The bare chctx.close() is kept as a fallback for the case where no connection has been set yet (handlerAdded has not run), since VertxHandler#close dereferences it. Add a regression test, Http1xTest#testRequestDecompressionInvalidBodyDeliversErrorResponse, that sends a malformed gzip body to a server with decompression enabled and asserts the client receives the response written from the request exception handler. HTTP/2 is unaffected: its codec replaces VertxHandler in the pipeline, so connection-level exceptions do not go through this path. Signed-off-by: Alexandru Salajan <alexandru17@gmail.com>
1 parent d4e4bfa commit a937b81

2 files changed

Lines changed: 53 additions & 1 deletion

File tree

vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,15 @@ public void exceptionCaught(ChannelHandlerContext chctx, final Throwable t) {
151151
close = true;
152152
}
153153
if (close) {
154-
chctx.close();
154+
if (connection != null) {
155+
// Route the close through the pipeline tail so VertxHandler#close -> VertxConnection#writeClose
156+
// flushes any pending writes (e.g. a response written from the user's exception handler) before
157+
// closing the channel. A bare chctx.close() would let Netty's ChannelOutboundBuffer discard the
158+
// unflushed entries.
159+
chctx.channel().close();
160+
} else {
161+
chctx.close();
162+
}
155163
}
156164
}
157165

vertx-core/src/test/java/io/vertx/tests/http/Http1xTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2875,6 +2875,50 @@ public void testContentDecompression() throws Exception {
28752875
.await();
28762876
}
28772877

2878+
@Test
2879+
// Regression test: when an inbound request body fails to decompress, the application's
2880+
// exception handler must still be able to deliver an error response to the client. Before
2881+
// the VertxHandler#exceptionCaught flush-before-close fix, the response written from the
2882+
// exception handler was discarded and the client only saw a connection reset.
2883+
public void testRequestDecompressionInvalidBodyDeliversErrorResponse() throws Exception {
2884+
server = vertx.createHttpServer(new HttpServerOptions().setDecompressionSupported(true));
2885+
// A valid gzip header (the first 10 bytes) followed by a corrupted deflate payload, so that
2886+
// Netty's HttpContentDecompressor accepts the stream as gzip and then fails while decoding it.
2887+
byte[] corrupted = TestUtils.compressGzip(TestUtils.randomAlphaString(256));
2888+
for (int i = 10; i < corrupted.length; i++) {
2889+
corrupted[i] = (byte) ~corrupted[i];
2890+
}
2891+
server.requestHandler(req -> {
2892+
req.exceptionHandler(err -> {
2893+
if (!req.response().ended()) {
2894+
req.response().setStatusCode(400).end("decode-failed");
2895+
}
2896+
});
2897+
// Should not be reached for a malformed body.
2898+
req.bodyHandler(body -> req.response().end());
2899+
});
2900+
startServer(testAddress);
2901+
NetClient netClient = vertx.createNetClient();
2902+
CompletableFuture<String> result = new CompletableFuture<>();
2903+
netClient.connect(testAddress).onComplete(TestUtils.onSuccess(so -> {
2904+
Buffer respBuff = Buffer.buffer();
2905+
so.handler(respBuff::appendBuffer);
2906+
so.closeHandler(v -> result.complete(respBuff.toString()));
2907+
so.exceptionHandler(result::completeExceptionally);
2908+
Buffer request = Buffer.buffer()
2909+
.appendString("PUT /somepath HTTP/1.1\r\n")
2910+
.appendString("Host: localhost\r\n")
2911+
.appendString("Content-Encoding: gzip\r\n")
2912+
.appendString("Content-Length: " + corrupted.length + "\r\n")
2913+
.appendString("\r\n")
2914+
.appendBytes(corrupted);
2915+
so.write(request);
2916+
}));
2917+
String resp = result.get(20, TimeUnit.SECONDS);
2918+
assertTrue("Expected the response to start with \"HTTP/1.1 400\" but got: [" + resp + "]", resp.startsWith("HTTP/1.1 400"));
2919+
assertTrue("Expected the error body in the response but got: [" + resp + "]", resp.contains("decode-failed"));
2920+
}
2921+
28782922
@Test
28792923
public void testResetClientRequestNotYetSent(Checkpoint checkpoint) throws Exception {
28802924
testResetClientRequestNotYetSent(checkpoint, false, false);

0 commit comments

Comments
 (0)