2323import io .grpc .stub .ServerCallStreamObserver ;
2424import io .grpc .stub .StreamObserver ;
2525import io .vertx .core .Promise ;
26+ import io .vertx .core .Vertx ;
27+ import io .vertx .core .net .NetClient ;
28+ import io .vertx .core .net .NetServer ;
29+ import io .vertx .core .net .NetSocket ;
2630import io .vertx .ext .unit .Async ;
2731import io .vertx .ext .unit .TestContext ;
2832import io .vertx .grpc .server .GrpcServer ;
2933import io .vertx .grpc .server .GrpcServiceBridge ;
3034import org .junit .Ignore ;
3135import java .io .IOException ;
36+ import java .net .InetSocketAddress ;
37+ import java .net .SocketAddress ;
38+ import java .util .ArrayList ;
3239import java .util .Collections ;
33- import java .util .concurrent .TimeUnit ;
40+ import java .util .List ;
41+ import java .util .Map ;
42+ import org .jetbrains .annotations .Nullable ;
3443import org .junit .Test ;
3544
3645import java .util .concurrent .atomic .AtomicInteger ;
@@ -492,7 +501,7 @@ public void testCallNetworkInterrupted(TestContext should) throws InterruptedExc
492501 StreamingGrpc .StreamingImplBase impl = new StreamingImplBase () {
493502 @ Override
494503 public StreamObserver <Item > pipe (StreamObserver <Item > responseObserver ) {
495- return new StreamObserver <Item >() {
504+ return new StreamObserver <>() {
496505 @ Override
497506 public void onNext (Item item ) {
498507 requestCount .incrementAndGet ();
@@ -518,42 +527,92 @@ public void onCompleted() {
518527 serverStub .bind (server );
519528 startServer (server );
520529
521- Process client = ProcessHelper .exec (ServerBridgeTest .class , Collections .singletonList (String .valueOf (port )));
522- // waiting for doing request
523- Thread .sleep (1_000 );
524- client .destroy ();
525- client .waitFor ();
530+ try (var proxyServer = new ProxyServer (vertx , port + 1 , port )) {
531+ proxyServer .start ();
532+
533+ int proxyPort = proxyServer .proxyServer .actualPort ();
534+ Channel channel = ManagedChannelBuilder .forAddress ("localhost" , proxyPort ).usePlaintext ().build ();
535+ StreamingGrpc .StreamingStub stub = StreamingGrpc .newStub (channel );
536+ StreamObserver <Item > requestObserver = stub .pipe (new NoopStreamObserver <>());
537+ Item request = Item .newBuilder ().setValue ("item" ).build ();
538+ requestObserver .onNext (request );
539+ requestObserver .onNext (request );
540+ requestObserver .onNext (request );
541+
542+ // waiting for the connection to be established.
543+ Thread .sleep (1000 );
544+ }
526545
527546 async .await (20_000 );
528547
529548 should .assertEquals (requestCount .get (), 3 );
530549 should .assertTrue (completed .future ().failed ());
531550 }
532551
533- public static void main (String ... args ) throws InterruptedException {
534- StreamObserver <Item > noop = new StreamObserver <Item >() {
535- @ Override public void onNext (Item item ) {
552+ static class NoopStreamObserver <T > implements StreamObserver <T > {
553+ @ Override public void onNext (T ignored ) {}
536554
537- }
555+ @ Override public void onError ( Throwable ignored ) { }
538556
539- @ Override public void onError (Throwable throwable ) {
557+ @ Override public void onCompleted () {}
558+ }
540559
541- }
560+ static class ProxyServer implements AutoCloseable {
542561
543- @ Override public void onCompleted () {
562+ private final int listenPort ;
544563
545- }
546- };
564+ private final int targetPort ;
565+
566+ private final NetServer proxyServer ;
547567
548- Channel channel = ManagedChannelBuilder .forAddress ("localhost" , Integer .parseInt (args [0 ])).usePlaintext ().build ();
549- StreamingGrpc .StreamingStub stub = StreamingGrpc .newStub (channel );
550- StreamObserver <Item > requestObserver = stub .pipe (noop );
551- Item request = Item .newBuilder ().setValue ("item" ).build ();
552- requestObserver .onNext (request );
553- requestObserver .onNext (request );
554- requestObserver .onNext (request );
568+ private final NetClient proxyClient ;
555569
556- // waiting to be killed
557- Thread .currentThread ().join ();
570+ // live or dead
571+ private final List <Map .Entry <NetSocket , NetSocket >> sockets = new ArrayList <>();
572+
573+ ProxyServer (Vertx vertx , int listenPort , int targetPort ) {
574+ this .listenPort = listenPort ;
575+ this .targetPort = targetPort ;
576+ this .proxyServer = vertx .createNetServer ().connectHandler (this ::handle );
577+ this .proxyClient = vertx .createNetClient ();
578+ }
579+
580+ void start () {
581+ this .proxyServer .listen (listenPort ).toCompletionStage ().toCompletableFuture ().join ();
582+ }
583+
584+ void handle (NetSocket socket ) {
585+ socket .pause ();
586+
587+ proxyClient .connect (targetPort , "localhost" )
588+ .onComplete (ar -> {
589+ if (ar .succeeded ()) {
590+ NetSocket proxySocket = ar .result ();
591+ proxySocket .pause ();
592+
593+ socket .handler (proxySocket ::write );
594+ proxySocket .handler (socket ::write );
595+ socket .closeHandler (ignored -> proxySocket .close ());
596+ proxySocket .closeHandler (ignored -> socket .close ());
597+
598+ sockets .add (Map .entry (socket , proxySocket ));
599+
600+ proxySocket .resume ();
601+ socket .resume ();
602+ } else {
603+ socket .close ();
604+ }
605+ });
606+ }
607+
608+ @ Override
609+ public void close () {
610+ this .sockets .forEach (entry -> {
611+ entry .getKey ().close ();
612+ entry .getValue ().close ();
613+ });
614+ this .proxyClient .close ();
615+ this .proxyServer .close ();
616+ }
558617 }
559618}
0 commit comments