1515import io .envoyproxy .envoy .service .ext_proc .v3 .ExternalProcessorGrpc ;
1616import io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation ;
1717import io .envoyproxy .envoy .service .ext_proc .v3 .HeadersResponse ;
18+ import io .envoyproxy .envoy .service .ext_proc .v3 .ImmediateResponse ;
1819import io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest ;
1920import io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingResponse ;
2021import io .envoyproxy .envoy .service .ext_proc .v3 .StreamedBodyResponse ;
@@ -1266,7 +1267,6 @@ public void givenObservabilityModeTrue_whenExtProcBusy_thenAppRequestsAreBuffere
12661267 .thenReturn (mockRawCall );
12671268 Mockito .when (mockRawCall .isReady ()).thenReturn (true );
12681269
1269- // Sidecar is NOT ready
12701270 Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
12711271
12721272 CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
@@ -1275,7 +1275,6 @@ public void givenObservabilityModeTrue_whenExtProcBusy_thenAppRequestsAreBuffere
12751275
12761276 proxyCall .request (5 );
12771277
1278- // Verify raw call NOT requested yet
12791278 Mockito .verify (mockRawCall , Mockito .never ()).request (Mockito .anyInt ());
12801279 }
12811280
@@ -1311,7 +1310,6 @@ public void givenObservabilityModeFalse_whenExtProcBusy_thenAppRequestsAreNOTBuf
13111310 Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
13121311 .thenReturn (mockRawCall );
13131312
1314- // Sidecar is NOT ready
13151313 Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
13161314
13171315 CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
@@ -1320,7 +1318,6 @@ public void givenObservabilityModeFalse_whenExtProcBusy_thenAppRequestsAreNOTBuf
13201318
13211319 proxyCall .request (5 );
13221320
1323- // Verify raw call requested immediately because obs_mode is false
13241321 Mockito .verify (mockRawCall ).request (5 );
13251322 }
13261323
@@ -1363,12 +1360,10 @@ public void givenRequestDrainActive_whenAppRequestsMessages_thenRequestsAreBuffe
13631360
13641361 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
13651362
1366- // Enter drain
13671363 sidecarListenerCaptor .getValue ().onMessage (ProcessingResponse .newBuilder ().setRequestDrain (true ).build ());
13681364
13691365 proxyCall .request (3 );
13701366
1371- // Verify raw call NOT requested during drain
13721367 Mockito .verify (mockRawCall , Mockito .never ()).request (Mockito .anyInt ());
13731368 }
13741369
@@ -1405,7 +1400,6 @@ public void givenBufferedRequests_whenExtProcStreamBecomesReady_thenDataPlaneReq
14051400 .thenReturn (mockRawCall );
14061401 Mockito .when (mockRawCall .isReady ()).thenReturn (true );
14071402
1408- // Start with sidecar NOT ready
14091403 Mockito .when (mockSidecarCall .isReady ()).thenReturn (false );
14101404
14111405 ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
@@ -1417,11 +1411,9 @@ public void givenBufferedRequests_whenExtProcStreamBecomesReady_thenDataPlaneReq
14171411 proxyCall .request (10 );
14181412 Mockito .verify (mockRawCall , Mockito .never ()).request (Mockito .anyInt ());
14191413
1420- // Sidecar becomes ready
14211414 Mockito .when (mockSidecarCall .isReady ()).thenReturn (true );
14221415 sidecarListenerCaptor .getValue ().onReady ();
14231416
1424- // Verify buffered request drained
14251417 Mockito .verify (mockRawCall ).request (10 );
14261418 }
14271419
@@ -1462,15 +1454,228 @@ public void givenExtProcStreamCompleted_whenAppRequestsMessages_thenRequestsAreF
14621454 proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
14631455 Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
14641456
1465- // Sidecar stream completes
14661457 sidecarListenerCaptor .getValue ().onClose (Status .OK , new Metadata ());
14671458
14681459 proxyCall .request (7 );
14691460
1470- // Verify requested immediately after sidecar is gone
14711461 Mockito .verify (mockRawCall ).request (7 );
14721462 }
14731463
1464+ // --- Category 8: Error Handling & Security ---
1465+
1466+ @ Test
1467+ @ SuppressWarnings ("unchecked" )
1468+ public void givenFailureModeAllowFalse_whenExtProcStreamFails_thenDataPlaneCallIsCancelled () throws Exception {
1469+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1470+ .setGrpcService (GrpcService .newBuilder ()
1471+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1472+ .setTargetUri ("in-process:///sidecar" )
1473+ .addChannelCredentialsPlugin (Any .newBuilder ()
1474+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1475+ .build ())
1476+ .build ())
1477+ .build ())
1478+ .setFailureModeAllow (false ) // Fail Closed
1479+ .build ();
1480+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1481+
1482+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1483+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1484+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1485+ .thenReturn (mockSidecarCall );
1486+
1487+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1488+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1489+
1490+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1491+ filterConfig , mockChannelManager , scheduler );
1492+
1493+ Channel mockNextChannel = Mockito .mock (Channel .class );
1494+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1495+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1496+ .thenReturn (mockRawCall );
1497+
1498+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1499+
1500+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1501+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1502+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1503+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1504+
1505+ // Sidecar stream fails
1506+ sidecarListenerCaptor .getValue ().onClose (Status .INTERNAL .withDescription ("Sidecar Error" ), new Metadata ());
1507+
1508+ // Verify raw call cancelled
1509+ Mockito .verify (mockRawCall ).cancel (Mockito .contains ("External processor stream failed" ), Mockito .any ());
1510+ }
1511+
1512+ @ Test
1513+ @ SuppressWarnings ("unchecked" )
1514+ public void givenFailureModeAllowTrue_whenExtProcStreamFails_thenDataPlaneCallFailsOpen () throws Exception {
1515+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1516+ .setGrpcService (GrpcService .newBuilder ()
1517+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1518+ .setTargetUri ("in-process:///sidecar" )
1519+ .addChannelCredentialsPlugin (Any .newBuilder ()
1520+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1521+ .build ())
1522+ .build ())
1523+ .build ())
1524+ .setFailureModeAllow (true ) // Fail Open
1525+ .build ();
1526+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1527+
1528+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1529+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1530+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1531+ .thenReturn (mockSidecarCall );
1532+
1533+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1534+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1535+
1536+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1537+ filterConfig , mockChannelManager , scheduler );
1538+
1539+ Channel mockNextChannel = Mockito .mock (Channel .class );
1540+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1541+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1542+ .thenReturn (mockRawCall );
1543+
1544+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1545+
1546+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1547+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1548+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1549+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1550+
1551+ // Sidecar stream fails
1552+ sidecarListenerCaptor .getValue ().onClose (Status .INTERNAL .withDescription ("Sidecar Error" ), new Metadata ());
1553+
1554+ // Verify raw call NOT cancelled
1555+ Mockito .verify (mockRawCall , Mockito .never ()).cancel (Mockito .any (), Mockito .any ());
1556+
1557+ // Verify raw call started (failed open)
1558+ Mockito .verify (mockRawCall ).start (Mockito .any (), Mockito .any ());
1559+ }
1560+
1561+ @ Test
1562+ @ SuppressWarnings ("unchecked" )
1563+ public void givenImmediateResponse_whenReceived_thenDataPlaneCallIsCancelledWithProvidedStatus () throws Exception {
1564+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1565+ .setGrpcService (GrpcService .newBuilder ()
1566+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1567+ .setTargetUri ("in-process:///sidecar" )
1568+ .addChannelCredentialsPlugin (Any .newBuilder ()
1569+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1570+ .build ())
1571+ .build ())
1572+ .build ())
1573+ .build ();
1574+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1575+
1576+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1577+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1578+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1579+ .thenReturn (mockSidecarCall );
1580+
1581+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1582+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1583+
1584+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1585+ filterConfig , mockChannelManager , scheduler );
1586+
1587+ Channel mockNextChannel = Mockito .mock (Channel .class );
1588+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1589+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1590+ .thenReturn (mockRawCall );
1591+
1592+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1593+ ClientCall .Listener <String > mockAppListener = Mockito .mock (ClientCall .Listener .class );
1594+
1595+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1596+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1597+ proxyCall .start (mockAppListener , new Metadata ());
1598+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1599+
1600+ // Simulate sidecar sending ImmediateResponse (e.g., Unauthenticated)
1601+ ProcessingResponse resp = ProcessingResponse .newBuilder ()
1602+ .setImmediateResponse (ImmediateResponse .newBuilder ()
1603+ .setGrpcStatus (io .envoyproxy .envoy .service .ext_proc .v3 .GrpcStatus .newBuilder ()
1604+ .setStatus (Status .UNAUTHENTICATED .getCode ().value ())
1605+ .build ())
1606+ .build ())
1607+ .build ();
1608+ sidecarListenerCaptor .getValue ().onMessage (resp );
1609+
1610+ // Verify data plane call cancelled
1611+ Mockito .verify (mockRawCall ).cancel (Mockito .contains ("Rejected by ExtProc" ), Mockito .any ());
1612+
1613+ // Verify app listener notified with the correct status
1614+ Mockito .verify (mockAppListener ).onClose (Mockito .eq (Status .UNAUTHENTICATED ), Mockito .any ());
1615+ }
1616+
1617+ @ Test
1618+ @ SuppressWarnings ("unchecked" )
1619+ public void givenUnsupportedCompressionInResponse_whenReceived_thenExtProcStreamIsErroredAndCallIsCancelled () throws Exception {
1620+ ExternalProcessor proto = ExternalProcessor .newBuilder ()
1621+ .setGrpcService (GrpcService .newBuilder ()
1622+ .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
1623+ .setTargetUri ("in-process:///sidecar" )
1624+ .addChannelCredentialsPlugin (Any .newBuilder ()
1625+ .setTypeUrl ("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials" )
1626+ .build ())
1627+ .build ())
1628+ .build ())
1629+ .setProcessingMode (ProcessingMode .newBuilder ()
1630+ .setRequestBodyMode (ProcessingMode .BodySendMode .GRPC ).build ())
1631+ .build ();
1632+ ExternalProcessorFilterConfig filterConfig = provider .parseFilterConfig (Any .pack (proto ), filterContext ).config ;
1633+
1634+ ManagedChannel mockSidecarChannel = Mockito .mock (ManagedChannel .class );
1635+ ClientCall <ProcessingRequest , ProcessingResponse > mockSidecarCall = Mockito .mock (ClientCall .class );
1636+ Mockito .when (mockSidecarChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1637+ .thenReturn (mockSidecarCall );
1638+
1639+ CachedChannelManager mockChannelManager = Mockito .mock (CachedChannelManager .class );
1640+ Mockito .when (mockChannelManager .getChannel (Mockito .any ())).thenReturn (mockSidecarChannel );
1641+
1642+ ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor (
1643+ filterConfig , mockChannelManager , scheduler );
1644+
1645+ Channel mockNextChannel = Mockito .mock (Channel .class );
1646+ ClientCall <InputStream , InputStream > mockRawCall = Mockito .mock (ClientCall .class );
1647+ Mockito .when (mockNextChannel .newCall (Mockito .any (MethodDescriptor .class ), Mockito .any (CallOptions .class )))
1648+ .thenReturn (mockRawCall );
1649+
1650+ ArgumentCaptor <ClientCall .Listener <ProcessingResponse >> sidecarListenerCaptor = ArgumentCaptor .forClass (ClientCall .Listener .class );
1651+
1652+ CallOptions callOptions = CallOptions .DEFAULT .withExecutor (Executors .newSingleThreadExecutor ());
1653+ ClientCall <String , String > proxyCall = interceptor .interceptCall (METHOD_SAY_HELLO , callOptions , mockNextChannel );
1654+ proxyCall .start (Mockito .mock (ClientCall .Listener .class ), new Metadata ());
1655+ Mockito .verify (mockSidecarCall ).start (sidecarListenerCaptor .capture (), Mockito .any ());
1656+
1657+ // Simulate sidecar sending compressed body mutation (unsupported)
1658+ ProcessingResponse resp = ProcessingResponse .newBuilder ()
1659+ .setRequestBody (BodyResponse .newBuilder ()
1660+ .setResponse (CommonResponse .newBuilder ()
1661+ .setBodyMutation (BodyMutation .newBuilder ()
1662+ .setStreamedResponse (StreamedBodyResponse .newBuilder ()
1663+ .setGrpcMessageCompressed (true )
1664+ .build ())
1665+ .build ())
1666+ .build ())
1667+ .build ())
1668+ .build ();
1669+
1670+ sidecarListenerCaptor .getValue ().onMessage (resp );
1671+
1672+ // Verify sidecar stream was errored explicitly (cancelled by client with onError)
1673+ Mockito .verify (mockSidecarCall ).cancel (Mockito .contains ("Cancelled by client" ), Mockito .any ());
1674+
1675+ // Verify raw call cancelled
1676+ Mockito .verify (mockRawCall ).cancel (Mockito .contains ("External processor stream failed" ), Mockito .any ());
1677+ }
1678+
14741679 @ Test
14751680 public void requestHeadersMutated () throws Exception {
14761681 ExternalProcessor proto = ExternalProcessor .newBuilder ()
0 commit comments