diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java index 490db9ef4..41df346b0 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java @@ -25,6 +25,7 @@ import io.grpc.ServerServiceDefinition; import io.grpc.Status; import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClosedException; import io.vertx.core.net.SocketAddress; import io.vertx.grpc.common.GrpcError; import io.vertx.grpc.common.GrpcStatus; @@ -152,6 +153,11 @@ void init(ServerCall.Listener listener) { listener.onCancel(); } }); + req.exceptionHandler(throwable -> { + if (throwable instanceof HttpClosedException && !closed) { + listener.onCancel(); + } + }); readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor)); writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor)); } diff --git a/vertx-grpc-server/src/test/java/io/vertx/tests/server/ServerBridgeTest.java b/vertx-grpc-server/src/test/java/io/vertx/tests/server/ServerBridgeTest.java index 23b913798..6874497c4 100644 --- a/vertx-grpc-server/src/test/java/io/vertx/tests/server/ServerBridgeTest.java +++ b/vertx-grpc-server/src/test/java/io/vertx/tests/server/ServerBridgeTest.java @@ -17,13 +17,29 @@ import io.grpc.examples.streaming.Empty; import io.grpc.examples.streaming.Item; import io.grpc.examples.streaming.StreamingGrpc; +import io.grpc.protobuf.StatusProto; +import io.grpc.examples.streaming.StreamingGrpc.StreamingImplBase; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetSocket; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.grpc.server.GrpcServer; import io.vertx.grpc.server.GrpcServiceBridge; import org.junit.Ignore; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import java.util.concurrent.atomic.AtomicInteger; @@ -475,4 +491,128 @@ public ServerCall.Listener interceptCall(ServerCall completed = Promise.promise(); + Async async = should.async(); + + StreamingGrpc.StreamingImplBase impl = new StreamingImplBase() { + @Override + public StreamObserver pipe(StreamObserver responseObserver) { + return new StreamObserver<>() { + @Override + public void onNext(Item item) { + requestCount.incrementAndGet(); + } + + @Override + public void onError(Throwable throwable) { + completed.fail(throwable); + async.complete(); + } + + @Override + public void onCompleted() { + completed.complete(); + async.complete(); + } + }; + } + }; + + GrpcServer server = GrpcServer.server(vertx); + GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(impl); + serverStub.bind(server); + startServer(server); + + try (var proxyServer = new ProxyServer(vertx, port + 1, port)) { + proxyServer.start(); + + int proxyPort = proxyServer.proxyServer.actualPort(); + Channel channel = ManagedChannelBuilder.forAddress("localhost", proxyPort).usePlaintext().build(); + StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel); + StreamObserver requestObserver = stub.pipe(new NoopStreamObserver<>()); + Item request = Item.newBuilder().setValue("item").build(); + requestObserver.onNext(request); + requestObserver.onNext(request); + requestObserver.onNext(request); + + // waiting for the connection to be established. + Thread.sleep(1000); + } + + async.await(20_000); + + should.assertEquals(requestCount.get(), 3); + should.assertTrue(completed.future().failed()); + } + + static class NoopStreamObserver implements StreamObserver { + @Override public void onNext(T ignored) {} + + @Override public void onError(Throwable ignored) {} + + @Override public void onCompleted() {} + } + + static class ProxyServer implements AutoCloseable { + + private final int listenPort; + + private final int targetPort; + + private final NetServer proxyServer; + + private final NetClient proxyClient; + + // live or dead + private final List> sockets = new ArrayList<>(); + + ProxyServer(Vertx vertx, int listenPort, int targetPort) { + this.listenPort = listenPort; + this.targetPort = targetPort; + this.proxyServer = vertx.createNetServer().connectHandler(this::handle); + this.proxyClient = vertx.createNetClient(); + } + + void start() { + this.proxyServer.listen(listenPort).toCompletionStage().toCompletableFuture().join(); + } + + void handle(NetSocket socket) { + socket.pause(); + + proxyClient.connect(targetPort, "localhost") + .onComplete(ar -> { + if (ar.succeeded()) { + NetSocket proxySocket = ar.result(); + proxySocket.pause(); + + socket.handler(proxySocket::write); + proxySocket.handler(socket::write); + socket.closeHandler(ignored -> proxySocket.close()); + proxySocket.closeHandler(ignored -> socket.close()); + + sockets.add(Map.entry(socket, proxySocket)); + + proxySocket.resume(); + socket.resume(); + } else { + socket.close(); + } + }); + } + + @Override + public void close() { + this.sockets.forEach(entry -> { + entry.getKey().close(); + entry.getValue().close(); + }); + this.proxyClient.close(); + this.proxyServer.close(); + } + } }