Skip to content

Commit 9ae4189

Browse files
committed
GrpcServiceBridgeImpl handle HttpClosedException
fixes #101
1 parent 03b4cd5 commit 9ae4189

3 files changed

Lines changed: 116 additions & 0 deletions

File tree

vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.grpc.ServerServiceDefinition;
2626
import io.grpc.Status;
2727
import io.vertx.core.Vertx;
28+
import io.vertx.core.http.HttpClosedException;
2829
import io.vertx.core.net.SocketAddress;
2930
import io.vertx.grpc.common.GrpcError;
3031
import io.vertx.grpc.common.GrpcStatus;
@@ -152,6 +153,11 @@ void init(ServerCall.Listener<Req> listener) {
152153
listener.onCancel();
153154
}
154155
});
156+
req.exceptionHandler(throwable -> {
157+
if (throwable instanceof HttpClosedException && !closed) {
158+
listener.onCancel();
159+
}
160+
});
155161
readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor));
156162
writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor));
157163
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.vertx.grpc.server;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.Objects;
9+
10+
public class ProcessHelper {
11+
12+
private ProcessHelper() {}
13+
14+
public static Process exec(Class<?> main, List<String> args) throws IOException {
15+
16+
List<String> command = new ArrayList<>();
17+
// java binary executable
18+
command.add(System.getProperty("java.home") + File.separator + "bin" + File.separator + "java");
19+
command.add("-cp");
20+
// inherit classpath
21+
command.add(System.getProperty("java.class.path"));
22+
// main class name
23+
command.add(main.getName());
24+
// args
25+
command.addAll(Objects.requireNonNullElse(args, Collections.emptyList()));
26+
27+
return new ProcessBuilder(command).inheritIO().start();
28+
}
29+
}

vertx-grpc-server/src/test/java/io/vertx/tests/server/ServerBridgeTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,20 @@
1717
import io.grpc.examples.streaming.Empty;
1818
import io.grpc.examples.streaming.Item;
1919
import io.grpc.examples.streaming.StreamingGrpc;
20+
import io.grpc.protobuf.StatusProto;
21+
import io.grpc.examples.streaming.StreamingGrpc.StreamingImplBase;
22+
import io.grpc.protobuf.StatusProto;
2023
import io.grpc.stub.ServerCallStreamObserver;
2124
import io.grpc.stub.StreamObserver;
25+
import io.vertx.core.Promise;
2226
import io.vertx.ext.unit.Async;
2327
import io.vertx.ext.unit.TestContext;
2428
import io.vertx.grpc.server.GrpcServer;
2529
import io.vertx.grpc.server.GrpcServiceBridge;
2630
import org.junit.Ignore;
31+
import java.io.IOException;
32+
import java.util.Collections;
33+
import java.util.concurrent.TimeUnit;
2734
import org.junit.Test;
2835

2936
import java.util.concurrent.atomic.AtomicInteger;
@@ -475,4 +482,78 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
475482
HelloReply res = stub.sayHello(request);
476483
should.assertEquals(1, testAttributesStep.get());
477484
}
485+
486+
@Test
487+
public void testCallNetworkInterrupted(TestContext should) throws InterruptedException, IOException {
488+
AtomicInteger requestCount = new AtomicInteger();
489+
Promise<Void> completed = Promise.promise();
490+
Async async = should.async();
491+
492+
StreamingGrpc.StreamingImplBase impl = new StreamingImplBase() {
493+
@Override
494+
public StreamObserver<Item> pipe(StreamObserver<Item> responseObserver) {
495+
return new StreamObserver<Item>() {
496+
@Override
497+
public void onNext(Item item) {
498+
requestCount.incrementAndGet();
499+
}
500+
501+
@Override
502+
public void onError(Throwable throwable) {
503+
completed.fail(throwable);
504+
async.complete();
505+
}
506+
507+
@Override
508+
public void onCompleted() {
509+
completed.complete();
510+
async.complete();
511+
}
512+
};
513+
}
514+
};
515+
516+
GrpcServer server = GrpcServer.server(vertx);
517+
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(impl);
518+
serverStub.bind(server);
519+
startServer(server);
520+
521+
Process client = ProcessHelper.exec(ServerBridgeTest.class, Collections.singletonList(String.valueOf(port)));
522+
// waiting for doing request
523+
Thread.sleep(1_000);
524+
client.destroy();
525+
client.waitFor();
526+
527+
async.await(20_000);
528+
529+
should.assertEquals(requestCount.get(), 3);
530+
should.assertTrue(completed.future().failed());
531+
}
532+
533+
public static void main(String... args) throws InterruptedException {
534+
StreamObserver<Item> noop = new StreamObserver<Item>() {
535+
@Override public void onNext(Item item) {
536+
537+
}
538+
539+
@Override public void onError(Throwable throwable) {
540+
541+
}
542+
543+
@Override public void onCompleted() {
544+
545+
}
546+
};
547+
548+
Channel channel = ManagedChannelBuilder.forAddress("localhost", Integer.parseInt(args[0])).usePlaintext().build();
549+
StreamingGrpc.StreamingStub stub = StreamingGrpc.newStub(channel);
550+
StreamObserver<Item> requestObserver = stub.pipe(noop);
551+
Item request = Item.newBuilder().setValue("item").build();
552+
requestObserver.onNext(request);
553+
requestObserver.onNext(request);
554+
requestObserver.onNext(request);
555+
556+
// waiting to be killed
557+
Thread.currentThread().join();
558+
}
478559
}

0 commit comments

Comments
 (0)