@@ -1020,8 +1020,9 @@ public void testOrcaOob() throws Exception {
10201020 .build ();
10211021
10221022 final int retryLimit = 5 ;
1023+ final Object lastItem = new Object ();
10231024 StreamingOutputCallResponseObserver streamingOutputCallResponseObserver =
1024- new StreamingOutputCallResponseObserver ();
1025+ new StreamingOutputCallResponseObserver (lastItem );
10251026 StreamObserver <StreamingOutputCallRequest > streamObserver =
10261027 asyncStub .fullDuplexCall (streamingOutputCallResponseObserver );
10271028
@@ -1054,7 +1055,7 @@ public void testOrcaOob() throws Exception {
10541055 }
10551056 assertThat (i ).isLessThan (retryLimit );
10561057 streamObserver .onCompleted ();
1057- assertThat (streamingOutputCallResponseObserver .verifiedCompleted ()).isTrue ( );
1058+ assertThat (streamingOutputCallResponseObserver .take ()).isSameInstanceAs ( lastItem );
10581059 }
10591060
10601061 @ Override
@@ -1084,9 +1085,13 @@ protected int operationTimeoutMillis() {
10841085
10851086 class StreamingOutputCallResponseObserver implements
10861087 StreamObserver <StreamingOutputCallResponse > {
1087- private final Object lastItem = new Object () ;
1088+ private final Object lastItem ;
10881089 private final BlockingQueue <Object > queue = new LinkedBlockingQueue <>();
10891090
1091+ public StreamingOutputCallResponseObserver (Object lastItem ) {
1092+ this .lastItem = lastItem ;
1093+ }
1094+
10901095 @ Override
10911096 public void onNext (StreamingOutputCallResponse value ) {
10921097 queue .add (value );
@@ -1105,59 +1110,56 @@ public void onCompleted() {
11051110 Object take () throws InterruptedException {
11061111 return queue .take ();
11071112 }
1108-
1109- boolean verifiedCompleted () throws InterruptedException {
1110- return queue .take () == lastItem ;
1111- }
11121113 }
11131114
11141115 public void testMcs (TestServiceGrpc .TestServiceStub asyncStub ) throws Exception {
1116+ final Object lastItem = new Object ();
11151117 StreamingOutputCallResponseObserver responseObserver1 =
1116- new StreamingOutputCallResponseObserver ();
1118+ new StreamingOutputCallResponseObserver (lastItem );
11171119 StreamObserver <StreamingOutputCallRequest > streamObserver1 =
11181120 asyncStub .fullDuplexCall (responseObserver1 );
11191121 StreamingOutputCallRequest request = StreamingOutputCallRequest .newBuilder ()
11201122 .addResponseParameters (ResponseParameters .newBuilder ()
1121- .setSendClientSocketAddressInResponse (
1123+ .setFillPeerSocketAddress (
11221124 Messages .BoolValue .newBuilder ().setValue (true ).build ())
11231125 .build ())
11241126 .build ();
11251127 streamObserver1 .onNext (request );
11261128 Object responseObj = responseObserver1 .take ();
11271129 StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse ) responseObj ;
1128- String clientSocketAddressInCall1 = callResponse .getClientSocketAddress ();
1130+ String clientSocketAddressInCall1 = callResponse .getPeerSocketAddress ();
11291131 assertThat (clientSocketAddressInCall1 ).isNotEmpty ();
11301132
11311133 StreamingOutputCallResponseObserver responseObserver2 =
1132- new StreamingOutputCallResponseObserver ();
1134+ new StreamingOutputCallResponseObserver (lastItem );
11331135 StreamObserver <StreamingOutputCallRequest > streamObserver2 =
11341136 asyncStub .fullDuplexCall (responseObserver2 );
11351137 streamObserver2 .onNext (request );
11361138 callResponse = (StreamingOutputCallResponse ) responseObserver2 .take ();
1137- String clientSocketAddressInCall2 = callResponse .getClientSocketAddress ();
1139+ String clientSocketAddressInCall2 = callResponse .getPeerSocketAddress ();
11381140
11391141 assertThat (clientSocketAddressInCall1 ).isEqualTo (clientSocketAddressInCall2 );
11401142
11411143 // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new
11421144 // connection to be created in the same subchannel and not get queued.
11431145 StreamingOutputCallResponseObserver responseObserver3 =
1144- new StreamingOutputCallResponseObserver ();
1146+ new StreamingOutputCallResponseObserver (lastItem );
11451147 StreamObserver <StreamingOutputCallRequest > streamObserver3 =
11461148 asyncStub .fullDuplexCall (responseObserver3 );
11471149 streamObserver3 .onNext (request );
11481150 callResponse = (StreamingOutputCallResponse ) responseObserver3 .take ();
1149- String clientSocketAddressInCall3 = callResponse .getClientSocketAddress ();
1151+ String clientSocketAddressInCall3 = callResponse .getPeerSocketAddress ();
11501152
11511153 // This assertion is currently failing because connection scaling when MCS limit has been
11521154 // reached is not yet implemented in gRPC Java.
11531155 assertThat (clientSocketAddressInCall3 ).isNotEqualTo (clientSocketAddressInCall1 );
11541156
11551157 streamObserver1 .onCompleted ();
1156- assertThat (responseObserver1 .verifiedCompleted ()).isTrue ();
11571158 streamObserver2 .onCompleted ();
1158- assertThat (responseObserver2 .verifiedCompleted ()).isTrue ();
11591159 streamObserver3 .onCompleted ();
1160- assertThat (responseObserver3 .verifiedCompleted ()).isTrue ();
1160+ assertThat (responseObserver1 .take ()).isSameInstanceAs (lastItem );
1161+ assertThat (responseObserver2 .take ()).isSameInstanceAs (lastItem );
1162+ assertThat (responseObserver3 .take ()).isSameInstanceAs (lastItem );
11611163 }
11621164 }
11631165
0 commit comments