4141import io .grpc .inprocess .InProcessChannelBuilder ;
4242import io .grpc .inprocess .InProcessServerBuilder ;
4343import io .grpc .stub .ClientCalls ;
44+ import io .grpc .stub .ClientCallStreamObserver ;
45+ import io .grpc .stub .ClientResponseObserver ;
4446import io .grpc .stub .ServerCalls ;
4547import io .grpc .stub .StreamObserver ;
4648import io .grpc .testing .GrpcCleanupRule ;
@@ -411,10 +413,12 @@ public void givenRequestHeaderModeSend_whenStartCalled_thenExtProcReceivesHeader
411413
412414 proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
413415
416+ // Verify headers sent to sidecar
414417 ArgumentCaptor <ProcessingRequest > requestCaptor = ArgumentCaptor .forClass (ProcessingRequest .class );
415418 Mockito .verify (mockSidecarCall ).sendMessage (requestCaptor .capture ());
416419 assertThat (requestCaptor .getValue ().hasRequestHeaders ()).isTrue ();
417420
421+ // Verify main call NOT yet started
418422 Mockito .verify (mockRawCall , Mockito .never ()).start (Mockito .any (), Mockito .any ());
419423 }
420424
@@ -462,6 +466,7 @@ public void givenRequestHeaderModeSend_whenExtProcRespondsWithMutations_thenMuta
462466
463467 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
464468
469+ // Simulate sidecar response with header mutation
465470 ProcessingResponse resp = ProcessingResponse .newBuilder ()
466471 .setRequestHeaders (HeadersResponse .newBuilder ()
467472 .setResponse (CommonResponse .newBuilder ()
@@ -477,6 +482,7 @@ public void givenRequestHeaderModeSend_whenExtProcRespondsWithMutations_thenMuta
477482
478483 sidecarListenerCaptor .getValue ().onMessage (resp );
479484
485+ // Verify mutations applied and call started
480486 assertThat (headers .get (Metadata .Key .of ("x-mutated" , Metadata .ASCII_STRING_MARSHALLER ))).isEqualTo ("true" );
481487 Mockito .verify (mockRawCall ).start (Mockito .any (), Mockito .eq (headers ));
482488 }
@@ -521,7 +527,10 @@ public void givenRequestHeaderModeSkip_whenStartCalled_thenDataPlaneCallIsActiva
521527 Metadata headers = new Metadata ();
522528 proxyCall .start (Mockito .mock (ClientCall .Listener .class ), headers );
523529
530+ // Verify main call started immediately
524531 Mockito .verify (mockRawCall ).start (Mockito .any (), Mockito .eq (headers ));
532+
533+ // Verify sidecar NOT messaged about headers
525534 Mockito .verify (mockSidecarCall , Mockito .never ()).sendMessage (Mockito .any ());
526535 }
527536
@@ -845,10 +854,8 @@ public void givenResponseBodyModeGrpc_whenOnMessageCalled_thenMessageIsSentToExt
845854
846855 Mockito .verify (mockRawCall ).start (rawListenerCaptor .capture (), Mockito .any ());
847856
848- // Simulate server response message
849857 rawListenerCaptor .getValue ().onMessage (new ByteArrayInputStream ("Server Message" .getBytes ()));
850858
851- // Verify sent to sidecar
852859 ArgumentCaptor <ProcessingRequest > requestCaptor = ArgumentCaptor .forClass (ProcessingRequest .class );
853860 Mockito .verify (mockSidecarCall ).sendMessage (requestCaptor .capture ());
854861 assertThat (requestCaptor .getValue ().hasResponseBody ()).isTrue ();
@@ -903,7 +910,6 @@ public void givenResponseBodyModeGrpc_whenExtProcRespondsWithMutatedBody_thenMut
903910
904911 rawListenerCaptor .getValue ().onMessage (new ByteArrayInputStream ("Original" .getBytes ()));
905912
906- // Simulate sidecar response with mutated body
907913 ProcessingResponse resp = ProcessingResponse .newBuilder ()
908914 .setResponseBody (BodyResponse .newBuilder ()
909915 .setResponse (CommonResponse .newBuilder ()
@@ -917,7 +923,6 @@ public void givenResponseBodyModeGrpc_whenExtProcRespondsWithMutatedBody_thenMut
917923 .build ();
918924 sidecarListenerCaptor .getValue ().onMessage (resp );
919925
920- // Verify app listener received mutated body
921926 Mockito .verify (mockAppListener ).onMessage ("Mutated Server" );
922927 }
923928
@@ -967,13 +972,10 @@ public void givenResponseBodyModeGrpc_whenExtProcRespondsWithEndOfStream_thenCli
967972 Mockito .verify (mockRawCall ).start (rawListenerCaptor .capture (), Mockito .any ());
968973 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
969974
970- // Simulate server closing call
971975 rawListenerCaptor .getValue ().onClose (Status .OK , new Metadata ());
972976
973- // Verify app listener NOT closed yet (waiting for sidecar EOS)
974977 Mockito .verify (mockAppListener , Mockito .never ()).onClose (Mockito .any (), Mockito .any ());
975978
976- // Sidecar confirms EOS
977979 ProcessingResponse resp = ProcessingResponse .newBuilder ()
978980 .setResponseBody (BodyResponse .newBuilder ()
979981 .setResponse (CommonResponse .newBuilder ()
@@ -987,10 +989,258 @@ public void givenResponseBodyModeGrpc_whenExtProcRespondsWithEndOfStream_thenCli
987989 .build ();
988990 sidecarListenerCaptor .getValue ().onMessage (resp );
989991
990- // Verify app listener finally closed
991992 Mockito .verify (mockAppListener ).onClose (Mockito .eq (Status .OK ), Mockito .any ());
992993 }
993994
995+ // --- Category 6: Outbound Backpressure (isReady / onReady) ---
996+
997+ @ Test
998+ @ SuppressWarnings ("unchecked" )
999+ public void givenObservabilityModeTrue_whenExtProcBusy_thenIsReadyReturnsFalse () throws Exception {
1000+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1001+ .setGrpcService (GrpcService .newBuilder ()
1002+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1003+ .setTargetUri ("in-process:///sidecar" )
1004+ .addChannelCredentialsPlugin (Any .newBuilder ()
1005+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1006+ .build ())
1007+ .build ())
1008+ .build ())
1009+ .setObservabilityMode (true )
1010+ .build ();
1011+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1012+
1013+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1014+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1015+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1016+ .thenReturn (mockSidecarCall );
1017+
1018+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1019+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1020+
1021+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1022+ filterConfig , mockChannelManager , scheduler );
1023+
1024+ Channel mockNextChannel = Mockito .mock (Channel .class );
1025+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1026+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1027+ .thenReturn (mockRawCall );
1028+ Mockito .when (mockRawCall .isReady ()).thenReturn (true );
1029+
1030+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1031+
1032+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1033+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1034+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1035+
1036+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1037+
1038+ // Simulate sidecar is busy
1039+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
1040+
1041+ assertThat (proxyCall .isReady ()).isFalse ();
1042+ }
1043+
1044+ @ Test
1045+ @ SuppressWarnings ("unchecked" )
1046+ public void givenObservabilityModeFalse_whenExtProcBusy_thenIsReadyReturnsTrue () throws Exception {
1047+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1048+ .setGrpcService (GrpcService .newBuilder ()
1049+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1050+ .setTargetUri ("in-process:///sidecar" )
1051+ .addChannelCredentialsPlugin (Any .newBuilder ()
1052+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1053+ .build ())
1054+ .build ())
1055+ .build ())
1056+ .setObservabilityMode (false )
1057+ .build ();
1058+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1059+
1060+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1061+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1062+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1063+ .thenReturn (mockSidecarCall );
1064+
1065+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1066+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1067+
1068+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1069+ filterConfig , mockChannelManager , scheduler );
1070+
1071+ Channel mockNextChannel = Mockito .mock (Channel .class );
1072+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1073+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1074+ .thenReturn (mockRawCall );
1075+ Mockito .when (mockRawCall .isReady ()).thenReturn (true );
1076+
1077+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1078+
1079+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1080+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1081+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1082+
1083+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1084+
1085+ // Sidecar is busy
1086+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
1087+
1088+ // Should still be ready because observability_mode is false
1089+ assertThat (proxyCall .isReady ()).isTrue ();
1090+ }
1091+
1092+ @ Test
1093+ @ SuppressWarnings ("unchecked" )
1094+ public void givenRequestDrainActive_whenIsReadyCalled_thenReturnsFalse () throws Exception {
1095+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1096+ .setGrpcService (GrpcService .newBuilder ()
1097+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1098+ .setTargetUri ("in-process:///sidecar" )
1099+ .addChannelCredentialsPlugin (Any .newBuilder ()
1100+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1101+ .build ())
1102+ .build ())
1103+ .build ())
1104+ .build ();
1105+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1106+
1107+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1108+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1109+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1110+ .thenReturn (mockSidecarCall );
1111+
1112+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1113+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1114+
1115+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1116+ filterConfig , mockChannelManager , scheduler );
1117+
1118+ Channel mockNextChannel = Mockito .mock (Channel .class );
1119+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1120+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1121+ .thenReturn (mockRawCall );
1122+ Mockito .when (mockRawCall .isReady ()).thenReturn (true );
1123+
1124+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1125+
1126+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1127+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1128+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1129+
1130+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1131+
1132+ // Send request_drain: true
1133+ ProcessingResponse resp = ProcessingResponse .newBuilder ().setRequestDrain (true ).build ();
1134+ sidecarListenerCaptor .getValue ().onMessage (resp );
1135+
1136+ // isReady() must return false during drain
1137+ assertThat (proxyCall .isReady ()).isFalse ();
1138+ }
1139+
1140+ @ Test
1141+ @ SuppressWarnings ("unchecked" )
1142+ public void givenCongestionInExtProc_whenExtProcBecomesReady_thenTriggersOnReady () throws Exception {
1143+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1144+ .setGrpcService (GrpcService .newBuilder ()
1145+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1146+ .setTargetUri ("in-process:///sidecar" )
1147+ .addChannelCredentialsPlugin (Any .newBuilder ()
1148+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1149+ .build ())
1150+ .build ())
1151+ .build ())
1152+ .setObservabilityMode (true )
1153+ .build ();
1154+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1155+
1156+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1157+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1158+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1159+ .thenReturn (mockSidecarCall );
1160+
1161+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1162+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1163+
1164+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1165+ filterConfig , mockChannelManager , scheduler );
1166+
1167+ Channel mockNextChannel = Mockito .mock (Channel .class );
1168+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1169+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1170+ .thenReturn (mockRawCall );
1171+ Mockito .when (mockRawCall .isReady ()).thenReturn (true );
1172+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (true );
1173+
1174+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1175+ ClientCall .Listener <String > mockAppListener = Mockito .mock (ClientCall .Listener .class );
1176+
1177+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1178+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1179+ proxyCall .start (mockAppListener , new Metadata ());
1180+
1181+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1182+
1183+ // Trigger sidecar onReady
1184+ sidecarListenerCaptor .getValue ().onReady ();
1185+
1186+ // Verify app listener notified
1187+ Mockito .verify (mockAppListener ).onReady ();
1188+ }
1189+
1190+ @ Test
1191+ @ SuppressWarnings ("unchecked" )
1192+ public void givenDrainingStream_whenExtProcStreamCompletes_thenTriggersOnReady () throws Exception {
1193+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1194+ .setGrpcService (GrpcService .newBuilder ()
1195+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1196+ .setTargetUri ("in-process:///sidecar" )
1197+ .addChannelCredentialsPlugin (Any .newBuilder ()
1198+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1199+ .build ())
1200+ .build ())
1201+ .build ())
1202+ .build ();
1203+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1204+
1205+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1206+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1207+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1208+ .thenReturn (mockSidecarCall );
1209+
1210+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1211+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1212+
1213+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1214+ filterConfig , mockChannelManager , scheduler );
1215+
1216+ Channel mockNextChannel = Mockito .mock (Channel .class );
1217+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1218+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1219+ .thenReturn (mockRawCall );
1220+ Mockito .when (mockRawCall .isReady ()).thenReturn (true );
1221+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (true );
1222+
1223+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1224+ ClientCall .Listener <String > mockAppListener = Mockito .mock (ClientCall .Listener .class );
1225+
1226+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1227+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1228+ proxyCall .start (mockAppListener , new Metadata ());
1229+
1230+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1231+
1232+ // Enter drain
1233+ sidecarListenerCaptor .getValue ().onMessage (ProcessingResponse .newBuilder ().setRequestDrain (true ).build ());
1234+ assertThat (proxyCall .isReady ()).isFalse ();
1235+
1236+ // Sidecar stream completes
1237+ sidecarListenerCaptor .getValue ().onClose (Status .OK , new Metadata ());
1238+
1239+ // Verify app listener notified to resume flow
1240+ Mockito .verify (mockAppListener ).onReady ();
1241+ assertThat (proxyCall .isReady ()).isTrue ();
1242+ }
1243+
9941244 @ Test
9951245 public void requestHeadersMutated () throws Exception {
9961246 ExternalProcessor proto = ExternalProcessor .newBuilder ()
0 commit comments