2222import static org .junit .Assert .assertFalse ;
2323import static org .junit .Assert .assertNotEquals ;
2424import static org .junit .Assert .assertTrue ;
25- import static org .junit .Assert .fail ;
2625
2726import com .google .auth .oauth2 .AccessToken ;
2827import com .google .auth .oauth2 .ComputeEngineCredentials ;
4039import io .grpc .Grpc ;
4140import io .grpc .InsecureChannelCredentials ;
4241import io .grpc .InsecureServerCredentials ;
43- import io .grpc .InternalManagedChannelBuilder ;
4442import io .grpc .LoadBalancerProvider ;
4543import io .grpc .LoadBalancerRegistry ;
46- import io .grpc .LongUpDownCounterMetricInstrument ;
4744import io .grpc .ManagedChannel ;
4845import io .grpc .ManagedChannelBuilder ;
4946import io .grpc .Metadata ;
5047import io .grpc .MethodDescriptor ;
51- import io .grpc .MetricInstrument ;
52- import io .grpc .MetricSink ;
5348import io .grpc .ServerBuilder ;
5449import io .grpc .TlsChannelCredentials ;
5550import io .grpc .alts .AltsChannelCredentials ;
5853import io .grpc .auth .MoreCallCredentials ;
5954import io .grpc .internal .GrpcUtil ;
6055import io .grpc .internal .JsonParser ;
61- import io .grpc .internal .testing .StreamRecorder ;
6256import io .grpc .netty .InsecureFromHttp1ChannelCredentials ;
6357import io .grpc .netty .InternalNettyChannelBuilder ;
6458import io .grpc .netty .NettyChannelBuilder ;
7266import io .grpc .testing .integration .Messages .ResponseParameters ;
7367import io .grpc .testing .integration .Messages .SimpleRequest ;
7468import io .grpc .testing .integration .Messages .SimpleResponse ;
75- import io .grpc .testing .integration .Messages .StreamingInputCallRequest ;
76- import io .grpc .testing .integration .Messages .StreamingInputCallResponse ;
7769import io .grpc .testing .integration .Messages .StreamingOutputCallRequest ;
7870import io .grpc .testing .integration .Messages .StreamingOutputCallResponse ;
7971import io .grpc .testing .integration .Messages .TestOrcaReport ;
8375import java .io .InputStream ;
8476import java .nio .charset .Charset ;
8577import java .util .Arrays ;
86- import java .util .List ;
8778import java .util .Map ;
88- import java .util .Set ;
8979import java .util .concurrent .BlockingQueue ;
9080import java .util .concurrent .LinkedBlockingQueue ;
9181import java .util .concurrent .TimeUnit ;
@@ -612,12 +602,10 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor(
612602 }
613603
614604 private class Tester extends AbstractInteropTest {
615- private FakeMetricsSink fakeMetricsSink = new FakeMetricsSink ();
616605
617606 @ Override
618607 protected ManagedChannelBuilder <?> createChannelBuilder () {
619- boolean useSubchannelMetricsSink = testCase .equals (MCS_CS .toString ());
620- boolean useGeneric = testCase .equals (MCS_CS .toString ())? true : false ;
608+ boolean useGeneric = testCase .equals (MCS_CS .toString ()) ? true : false ;
621609 ChannelCredentials channelCredentials ;
622610 if (customCredentialsType != null ) {
623611 useGeneric = true ; // Retain old behavior; avoids erroring if incompatible
@@ -694,9 +682,6 @@ protected ManagedChannelBuilder<?> createChannelBuilder() {
694682 if (addMdInterceptor != null ) {
695683 channelBuilder .intercept (addMdInterceptor );
696684 }
697- if (useSubchannelMetricsSink ) {
698- InternalManagedChannelBuilder .addMetricSink (channelBuilder , fakeMetricsSink );
699- }
700685 return channelBuilder ;
701686 }
702687 if (!useOkHttp ) {
@@ -1070,7 +1055,8 @@ protected int operationTimeoutMillis() {
10701055 return 15000 ;
10711056 }
10721057
1073- class StreamingOutputCallResponseObserver implements StreamObserver <StreamingOutputCallResponse > {
1058+ class StreamingOutputCallResponseObserver implements
1059+ StreamObserver <StreamingOutputCallResponse > {
10741060 private final BlockingQueue <Object > queue = new LinkedBlockingQueue <>();
10751061 private volatile boolean isCompleted = true ;
10761062
@@ -1101,30 +1087,35 @@ public void testMcs() throws Exception {
11011087 asyncStub .fullDuplexCall (responseObserver1 );
11021088 StreamingOutputCallRequest request = StreamingOutputCallRequest .newBuilder ()
11031089 .setPayload (Payload .newBuilder ().setBody (
1104- ByteString .copyFrom (MCS_CS .description (). getBytes ())).build ()).build ();
1090+ ByteString .copyFromUtf8 (MCS_CS .description ())).build ()).build ();
11051091 streamObserver1 .onNext (request );
11061092 Object responseObj = responseObserver1 .take ();
11071093 StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse ) responseObj ;
1108- String clientSocketAddressInCall1 = new String (callResponse .getPayload ().getBody ().toByteArray ());
1094+ String clientSocketAddressInCall1 = new String (callResponse .getPayload ().getBody ()
1095+ .toByteArray (), UTF_8 );
11091096 assertThat (clientSocketAddressInCall1 ).isNotEmpty ();
11101097
1111- StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver ();
1098+ StreamingOutputCallResponseObserver responseObserver2 =
1099+ new StreamingOutputCallResponseObserver ();
11121100 StreamObserver <StreamingOutputCallRequest > streamObserver2 =
11131101 asyncStub .fullDuplexCall (responseObserver2 );
11141102 streamObserver2 .onNext (request );
11151103 callResponse = (StreamingOutputCallResponse ) responseObserver2 .take ();
1116- String clientSocketAddressInCall2 = new String (callResponse .getPayload ().getBody ().toByteArray ());
1104+ String clientSocketAddressInCall2 =
1105+ new String (callResponse .getPayload ().getBody ().toByteArray (), UTF_8 );
11171106
11181107 assertThat (clientSocketAddressInCall1 ).isEqualTo (clientSocketAddressInCall2 );
11191108
11201109 // The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new
11211110 // connection to be created in the same subchannel and not get queued.
1122- StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver ();
1111+ StreamingOutputCallResponseObserver responseObserver3 =
1112+ new StreamingOutputCallResponseObserver ();
11231113 StreamObserver <StreamingOutputCallRequest > streamObserver3 =
11241114 asyncStub .fullDuplexCall (responseObserver3 );
11251115 streamObserver3 .onNext (request );
11261116 callResponse = (StreamingOutputCallResponse ) responseObserver3 .take ();
1127- String clientSocketAddressInCall3 = new String (callResponse .getPayload ().getBody ().toByteArray ());
1117+ String clientSocketAddressInCall3 =
1118+ new String (callResponse .getPayload ().getBody ().toByteArray (), UTF_8 );
11281119
11291120 assertThat (clientSocketAddressInCall3 ).isNotEqualTo (clientSocketAddressInCall1 );
11301121
@@ -1135,8 +1126,7 @@ public void testMcs() throws Exception {
11351126 streamObserver3 .onCompleted ();
11361127 assertThat (responseObserver3 .isCompleted ).isTrue ();
11371128 }
1138-
1139- }
1129+ }
11401130
11411131 private static String validTestCasesHelpText () {
11421132 StringBuilder builder = new StringBuilder ();
@@ -1149,39 +1139,4 @@ private static String validTestCasesHelpText() {
11491139 }
11501140 return builder .toString ();
11511141 }
1152-
1153- static class FakeMetricsSink implements MetricSink {
1154- private volatile long openConnectionCount ;
1155-
1156- @ Override
1157- public Map <String , Boolean > getEnabledMetrics () {
1158- return null ;
1159- }
1160-
1161- @ Override
1162- public Set <String > getOptionalLabels () {
1163- return null ;
1164- }
1165-
1166- @ Override
1167- public int getMeasuresSize () {
1168- return 0 ;
1169- }
1170-
1171- @ Override
1172- public void updateMeasures (List <MetricInstrument > instruments ) {}
1173-
1174- @ Override
1175- public void addLongUpDownCounter (LongUpDownCounterMetricInstrument metricInstrument , long value ,
1176- List <String > requiredLabelValues ,
1177- List <String > optionalLabelValues ) {
1178- if (metricInstrument .getName ().equals ("grpc.subchannel.open_connections" )) {
1179- openConnectionCount = value ;
1180- }
1181- }
1182-
1183- synchronized long getOpenConnectionCount () {
1184- return openConnectionCount ;
1185- }
1186- }
11871142}
0 commit comments