Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void exceptionCaught(ChannelHandlerContext chctx, final Throwable t) {
close = true;
}
if (close) {
chctx.close();
chctx.channel().close();
}
}

Expand Down
27 changes: 27 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/net/NetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.*;
import io.vertx.core.net.impl.HAProxyMessageCompletionHandler;
import io.vertx.core.net.impl.VertxConnection;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.net.impl.tcp.CleanableNetClient;
import io.vertx.core.internal.net.NetServerInternal;
Expand Down Expand Up @@ -4641,4 +4642,30 @@ public void testCloseServerUnwritableSocket(Checkpoint checkpoint1, Checkpoint c
TestUtils.assertWaitUntil(closing::get);
TestUtils.assertWaitUntil(closed::get);
}

@Test
public void testFlushPendingWriteOnExceptionClose() throws Exception {
server.connectHandler(so -> {
so.exceptionHandler(err -> {});
so.handler(data -> {
VertxConnection conn = (VertxConnection) so;
// A response queued on the connection but not yet flushed (the state produced when a
// response is written while a read is in progress), then a pipeline failure that tears
// the connection down.
conn.unsafeWrite(Unpooled.copiedBuffer("pending-response", StandardCharsets.UTF_8), false);
conn.channelHandlerContext().pipeline().fireExceptionCaught(new RuntimeException("boom"));
});
});
startServer();
Buffer received = Buffer.buffer();
CompletableFuture<String> result = new CompletableFuture<>();
NetSocket socket = client.connect(testAddress).await();
socket.handler(received::appendBuffer);
socket.closeHandler(v -> result.complete(received.toString()));
socket.exceptionHandler(result::completeExceptionally);
socket.write("ping").await();
// With the fix the pending response is flushed before the connection closes; without it the
// client only sees the close (Netty discards the unflushed entry).
assertEquals("pending-response", result.get(20, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -922,4 +922,27 @@ public void testResumeWhenReadInProgress() {
outbound = ch.readOutbound();
assertSame(expected, outbound);
}

@Test
public void testFlushPendingWriteOnExceptionClose() {
EmbeddedChannel ch = new EmbeddedChannel();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(VertxHandler.create(chctx -> new TestConnection(chctx)));
TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection();
connection.exceptionHandler(err -> {});
// Queue a write without flushing it: this is the state produced when a response is written
// while a read is in progress (e.g. an inbound codec failing mid-decode, which is how a
// request-body decompression failure surfaces). The message sits in the channel outbound
// buffer as an unflushed entry.
Object response = new Object();
connection.unsafeWrite(response, false);
assertNull(ch.readOutbound());
// The failure surfaces as a caught exception that asks the connection to close. The close
// must flush the pending write before closing the channel, otherwise Netty's
// ChannelOutboundBuffer discards the unflushed entry and the peer never sees the response.
pipeline.fireExceptionCaught(new RuntimeException());
ch.runPendingTasks();
assertSame(response, ch.readOutbound());
assertFalse(ch.isOpen());
}
}
Loading