@@ -1035,7 +1035,6 @@ public void givenObservabilityModeTrue_whenExtProcBusy_thenIsReadyReturnsFalse()
10351035
10361036 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
10371037
1038- // Simulate sidecar is busy
10391038 Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
10401039
10411040 assertThat (proxyCall .isReady ()).isFalse ();
@@ -1082,10 +1081,8 @@ public void givenObservabilityModeFalse_whenExtProcBusy_thenIsReadyReturnsTrue()
10821081
10831082 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
10841083
1085- // Sidecar is busy
10861084 Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
10871085
1088- // Should still be ready because observability_mode is false
10891086 assertThat (proxyCall .isReady ()).isTrue ();
10901087 }
10911088
@@ -1129,11 +1126,9 @@ public void givenRequestDrainActive_whenIsReadyCalled_thenReturnsFalse() throws
11291126
11301127 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
11311128
1132- // Send request_drain: true
11331129 ProcessingResponse resp = ProcessingResponse .newBuilder ().setRequestDrain (true ).build ();
11341130 sidecarListenerCaptor .getValue ().onMessage (resp );
11351131
1136- // isReady() must return false during drain
11371132 assertThat (proxyCall .isReady ()).isFalse ();
11381133 }
11391134
@@ -1180,10 +1175,8 @@ public void givenCongestionInExtProc_whenExtProcBecomesReady_thenTriggersOnReady
11801175
11811176 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
11821177
1183- // Trigger sidecar onReady
11841178 sidecarListenerCaptor .getValue ().onReady ();
11851179
1186- // Verify app listener notified
11871180 Mockito .verify (mockAppListener ).onReady ();
11881181 }
11891182
@@ -1229,18 +1222,255 @@ public void givenDrainingStream_whenExtProcStreamCompletes_thenTriggersOnReady()
12291222
12301223 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
12311224
1232- // Enter drain
12331225 sidecarListenerCaptor .getValue ().onMessage (ProcessingResponse .newBuilder ().setRequestDrain (true ).build ());
12341226 assertThat (proxyCall .isReady ()).isFalse ();
12351227
1236- // Sidecar stream completes
12371228 sidecarListenerCaptor .getValue ().onClose (Status .OK , new Metadata ());
12381229
1239- // Verify app listener notified to resume flow
12401230 Mockito .verify (mockAppListener ).onReady ();
12411231 assertThat (proxyCall .isReady ()).isTrue ();
12421232 }
12431233
1234+ // --- Category 7: Inbound Backpressure (request(n) / pendingRequests) ---
1235+
1236+ @ Test
1237+ @ SuppressWarnings ("unchecked" )
1238+ public void givenObservabilityModeTrue_whenExtProcBusy_thenAppRequestsAreBuffered () throws Exception {
1239+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1240+ .setGrpcService (GrpcService .newBuilder ()
1241+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1242+ .setTargetUri ("in-process:///sidecar" )
1243+ .addChannelCredentialsPlugin (Any .newBuilder ()
1244+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1245+ .build ())
1246+ .build ())
1247+ .build ())
1248+ .setObservabilityMode (true )
1249+ .build ();
1250+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1251+
1252+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1253+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1254+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1255+ .thenReturn (mockSidecarCall );
1256+
1257+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1258+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1259+
1260+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1261+ filterConfig , mockChannelManager , scheduler );
1262+
1263+ Channel mockNextChannel = Mockito .mock (Channel .class );
1264+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1265+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1266+ .thenReturn (mockRawCall );
1267+ Mockito .when (mockRawCall .isReady ()).thenReturn (true );
1268+
1269+ // Sidecar is NOT ready
1270+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
1271+
1272+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1273+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1274+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1275+
1276+ proxyCall .request (5 );
1277+
1278+ // Verify raw call NOT requested yet
1279+ Mockito .verify (mockRawCall , Mockito .never ()).request (Mockito .anyInt ());
1280+ }
1281+
1282+ @ Test
1283+ @ SuppressWarnings ("unchecked" )
1284+ public void givenObservabilityModeFalse_whenExtProcBusy_thenAppRequestsAreNOTBuffered () throws Exception {
1285+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1286+ .setGrpcService (GrpcService .newBuilder ()
1287+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1288+ .setTargetUri ("in-process:///sidecar" )
1289+ .addChannelCredentialsPlugin (Any .newBuilder ()
1290+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1291+ .build ())
1292+ .build ())
1293+ .build ())
1294+ .setObservabilityMode (false )
1295+ .build ();
1296+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1297+
1298+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1299+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1300+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1301+ .thenReturn (mockSidecarCall );
1302+
1303+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1304+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1305+
1306+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1307+ filterConfig , mockChannelManager , scheduler );
1308+
1309+ Channel mockNextChannel = Mockito .mock (Channel .class );
1310+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1311+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1312+ .thenReturn (mockRawCall );
1313+
1314+ // Sidecar is NOT ready
1315+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
1316+
1317+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1318+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1319+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1320+
1321+ proxyCall .request (5 );
1322+
1323+ // Verify raw call requested immediately because obs_mode is false
1324+ Mockito .verify (mockRawCall ).request (5 );
1325+ }
1326+
1327+ @ Test
1328+ @ SuppressWarnings ("unchecked" )
1329+ public void givenRequestDrainActive_whenAppRequestsMessages_thenRequestsAreBuffered () throws Exception {
1330+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1331+ .setGrpcService (GrpcService .newBuilder ()
1332+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1333+ .setTargetUri ("in-process:///sidecar" )
1334+ .addChannelCredentialsPlugin (Any .newBuilder ()
1335+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1336+ .build ())
1337+ .build ())
1338+ .build ())
1339+ .build ();
1340+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1341+
1342+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1343+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1344+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1345+ .thenReturn (mockSidecarCall );
1346+
1347+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1348+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1349+
1350+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1351+ filterConfig , mockChannelManager , scheduler );
1352+
1353+ Channel mockNextChannel = Mockito .mock (Channel .class );
1354+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1355+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1356+ .thenReturn (mockRawCall );
1357+
1358+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1359+
1360+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1361+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1362+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1363+
1364+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1365+
1366+ // Enter drain
1367+ sidecarListenerCaptor .getValue ().onMessage (ProcessingResponse .newBuilder ().setRequestDrain (true ).build ());
1368+
1369+ proxyCall .request (3 );
1370+
1371+ // Verify raw call NOT requested during drain
1372+ Mockito .verify (mockRawCall , Mockito .never ()).request (Mockito .anyInt ());
1373+ }
1374+
1375+ @ Test
1376+ @ SuppressWarnings ("unchecked" )
1377+ public void givenBufferedRequests_whenExtProcStreamBecomesReady_thenDataPlaneRequestIsDrained () throws Exception {
1378+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1379+ .setGrpcService (GrpcService .newBuilder ()
1380+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1381+ .setTargetUri ("in-process:///sidecar" )
1382+ .addChannelCredentialsPlugin (Any .newBuilder ()
1383+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1384+ .build ())
1385+ .build ())
1386+ .build ())
1387+ .setObservabilityMode (true )
1388+ .build ();
1389+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1390+
1391+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1392+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1393+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1394+ .thenReturn (mockSidecarCall );
1395+
1396+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1397+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1398+
1399+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1400+ filterConfig , mockChannelManager , scheduler );
1401+
1402+ Channel mockNextChannel = Mockito .mock (Channel .class );
1403+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1404+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1405+ .thenReturn (mockRawCall );
1406+ Mockito .when (mockRawCall .isReady ()).thenReturn (true );
1407+
1408+ // Start with sidecar NOT ready
1409+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
1410+
1411+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1412+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1413+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1414+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1415+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1416+
1417+ proxyCall .request (10 );
1418+ Mockito .verify (mockRawCall , Mockito .never ()).request (Mockito .anyInt ());
1419+
1420+ // Sidecar becomes ready
1421+ Mockito .when (mockSidecarCall .isReady ()).thenReturn (true );
1422+ sidecarListenerCaptor .getValue ().onReady ();
1423+
1424+ // Verify buffered request drained
1425+ Mockito .verify (mockRawCall ).request (10 );
1426+ }
1427+
1428+ @ Test
1429+ @ SuppressWarnings ("unchecked" )
1430+ public void givenExtProcStreamCompleted_whenAppRequestsMessages_thenRequestsAreForwardedImmediately () throws Exception {
1431+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1432+ .setGrpcService (GrpcService .newBuilder ()
1433+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1434+ .setTargetUri ("in-process:///sidecar" )
1435+ .addChannelCredentialsPlugin (Any .newBuilder ()
1436+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1437+ .build ())
1438+ .build ())
1439+ .build ())
1440+ .build ();
1441+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1442+
1443+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1444+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1445+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1446+ .thenReturn (mockSidecarCall );
1447+
1448+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1449+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1450+
1451+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1452+ filterConfig , mockChannelManager , scheduler );
1453+
1454+ Channel mockNextChannel = Mockito .mock (Channel .class );
1455+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1456+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1457+ .thenReturn (mockRawCall );
1458+
1459+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1460+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1461+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1462+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1463+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1464+
1465+ // Sidecar stream completes
1466+ sidecarListenerCaptor .getValue ().onClose (Status .OK , new Metadata ());
1467+
1468+ proxyCall .request (7 );
1469+
1470+ // Verify requested immediately after sidecar is gone
1471+ Mockito .verify (mockRawCall ).request (7 );
1472+ }
1473+
12441474 @ Test
12451475 public void requestHeadersMutated () throws Exception {
12461476 ExternalProcessor proto = ExternalProcessor .newBuilder ()
0 commit comments