Skip to content

Commit 9e67ddd

Browse files
committed
netty: Propagate first handshake failure from buffering handler
When a handshake failure occurs before any writes are buffered on the server side, WriteBufferingAndExceptionHandler can record the failure internally but never surface it to downstream inbound handlers. This makes the original handshake error unobservable and complicates debugging and instrumentation. Propagate only the first failure via exceptionCaught, gated on the absence of a previous failure, so that the canonical error becomes observable while avoiding duplicate propagation and preserving existing close semantics.
1 parent 9d5842b commit 9e67ddd

2 files changed

Lines changed: 63 additions & 0 deletions

File tree

netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
9999
// 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write.
100100
// 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect.
101101
if (ctx.channel().isActive() && previousFailure == null) {
102+
ctx.fireExceptionCaught(cause);
103+
102104
final class LogOnFailure implements ChannelFutureListener {
103105
@Override
104106
public void operationComplete(ChannelFuture future) {

netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@
3131
import io.netty.channel.Channel;
3232
import io.netty.channel.ChannelDuplexHandler;
3333
import io.netty.channel.ChannelFuture;
34+
import io.netty.channel.ChannelHandler;
3435
import io.netty.channel.ChannelHandlerAdapter;
3536
import io.netty.channel.ChannelHandlerContext;
37+
import io.netty.channel.ChannelInboundHandlerAdapter;
38+
import io.netty.channel.ChannelInitializer;
3639
import io.netty.channel.ChannelOutboundHandlerAdapter;
3740
import io.netty.channel.ChannelPromise;
3841
import io.netty.channel.DefaultEventLoop;
@@ -41,6 +44,7 @@
4144
import io.netty.channel.local.LocalChannel;
4245
import io.netty.channel.local.LocalServerChannel;
4346
import java.net.ConnectException;
47+
import java.util.concurrent.CountDownLatch;
4448
import java.util.concurrent.TimeUnit;
4549
import java.util.concurrent.atomic.AtomicBoolean;
4650
import java.util.concurrent.atomic.AtomicInteger;
@@ -381,4 +385,61 @@ public void uncaughtReadFails() throws Exception {
381385
assertThat(status.getDescription()).contains("channelRead() missed");
382386
}
383387
}
388+
389+
@Test
390+
public void handshakeFailure_isPropagatedOnce() throws Exception {
391+
AtomicInteger exceptionCount = new AtomicInteger();
392+
CountDownLatch latch = new CountDownLatch(1);
393+
394+
ChannelHandler observer =
395+
new ChannelInboundHandlerAdapter() {
396+
@Override
397+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
398+
exceptionCount.incrementAndGet();
399+
latch.countDown();
400+
}
401+
};
402+
403+
WriteBufferingAndExceptionHandler handler =
404+
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
405+
406+
LocalAddress addr = new LocalAddress("local");
407+
408+
ChannelFuture cf =
409+
new Bootstrap()
410+
.channel(LocalChannel.class)
411+
.group(group)
412+
.handler(
413+
new ChannelInitializer<Channel>() {
414+
@Override
415+
protected void initChannel(Channel ch) {
416+
ch.pipeline().addLast(handler);
417+
ch.pipeline().addLast(observer);
418+
}
419+
})
420+
.register();
421+
422+
chan = cf.channel();
423+
cf.sync();
424+
425+
ChannelFuture sf =
426+
new ServerBootstrap()
427+
.group(group)
428+
.channel(LocalServerChannel.class)
429+
.childHandler(new ChannelInboundHandlerAdapter() {})
430+
.bind(addr);
431+
server = sf.channel();
432+
sf.sync();
433+
434+
chan.connect(addr).sync();
435+
436+
RuntimeException handshakeFailure =
437+
Status.UNAVAILABLE.withDescription("handshake failed").asRuntimeException();
438+
439+
chan.pipeline().fireExceptionCaught(handshakeFailure);
440+
chan.pipeline().fireExceptionCaught(new RuntimeException("Second"));
441+
442+
assertTrue(latch.await(5, TimeUnit.SECONDS));
443+
assertThat(exceptionCount.get()).isEqualTo(1);
444+
}
384445
}

0 commit comments

Comments
 (0)