Skip to content

Commit 53cda5a

Browse files
committed
Implement Category 9 unit tests and injectable constructor for resource management
1 parent b89f31f commit 53cda5a

File tree

2 files changed

+68
-18
lines changed

2 files changed

+68
-18
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,18 @@ public class ExternalProcessorFilter implements Filter {
5454
static final String TYPE_URL = "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor";
5555

5656
final String filterInstanceName;
57-
private final CachedChannelManager cachedChannelManager = new CachedChannelManager();
57+
private final CachedChannelManager cachedChannelManager;
58+
private final java.util.concurrent.ScheduledExecutorService scheduler;
5859

5960
public ExternalProcessorFilter(String name) {
60-
filterInstanceName = checkNotNull(name, "name");
61+
this(name, new CachedChannelManager(), null);
62+
}
63+
64+
ExternalProcessorFilter(String name, CachedChannelManager cachedChannelManager,
65+
@Nullable java.util.concurrent.ScheduledExecutorService scheduler) {
66+
this.filterInstanceName = checkNotNull(name, "name");
67+
this.cachedChannelManager = checkNotNull(cachedChannelManager, "cachedChannelManager");
68+
this.scheduler = scheduler;
6169
}
6270

6371
@Override

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,10 +1502,8 @@ public void givenFailureModeAllowFalse_whenExtProcStreamFails_thenDataPlaneCallI
15021502
proxyCall.start(Mockito.mock(ClientCall.Listener.class), new Metadata());
15031503
Mockito.verify(mockSidecarCall).start(sidecarListenerCaptor.capture(), Mockito.any());
15041504

1505-
// Sidecar stream fails
15061505
sidecarListenerCaptor.getValue().onClose(Status.INTERNAL.withDescription("Sidecar Error"), new Metadata());
15071506

1508-
// Verify raw call cancelled
15091507
Mockito.verify(mockRawCall).cancel(Mockito.contains("External processor stream failed"), Mockito.any());
15101508
}
15111509

@@ -1548,13 +1546,10 @@ public void givenFailureModeAllowTrue_whenExtProcStreamFails_thenDataPlaneCallFa
15481546
proxyCall.start(Mockito.mock(ClientCall.Listener.class), new Metadata());
15491547
Mockito.verify(mockSidecarCall).start(sidecarListenerCaptor.capture(), Mockito.any());
15501548

1551-
// Sidecar stream fails
15521549
sidecarListenerCaptor.getValue().onClose(Status.INTERNAL.withDescription("Sidecar Error"), new Metadata());
15531550

1554-
// Verify raw call NOT cancelled
15551551
Mockito.verify(mockRawCall, Mockito.never()).cancel(Mockito.any(), Mockito.any());
15561552

1557-
// Verify raw call started (failed open)
15581553
Mockito.verify(mockRawCall).start(Mockito.any(), Mockito.any());
15591554
}
15601555

@@ -1597,7 +1592,6 @@ public void givenImmediateResponse_whenReceived_thenDataPlaneCallIsCancelledWith
15971592
proxyCall.start(mockAppListener, new Metadata());
15981593
Mockito.verify(mockSidecarCall).start(sidecarListenerCaptor.capture(), Mockito.any());
15991594

1600-
// Simulate sidecar sending ImmediateResponse (e.g., Unauthenticated)
16011595
ProcessingResponse resp = ProcessingResponse.newBuilder()
16021596
.setImmediateResponse(ImmediateResponse.newBuilder()
16031597
.setGrpcStatus(io.envoyproxy.envoy.service.ext_proc.v3.GrpcStatus.newBuilder()
@@ -1607,10 +1601,8 @@ public void givenImmediateResponse_whenReceived_thenDataPlaneCallIsCancelledWith
16071601
.build();
16081602
sidecarListenerCaptor.getValue().onMessage(resp);
16091603

1610-
// Verify data plane call cancelled
16111604
Mockito.verify(mockRawCall).cancel(Mockito.contains("Rejected by ExtProc"), Mockito.any());
16121605

1613-
// Verify app listener notified with the correct status
16141606
Mockito.verify(mockAppListener).onClose(Mockito.eq(Status.UNAUTHENTICATED), Mockito.any());
16151607
}
16161608

@@ -1654,7 +1646,6 @@ public void givenUnsupportedCompressionInResponse_whenReceived_thenExtProcStream
16541646
proxyCall.start(Mockito.mock(ClientCall.Listener.class), new Metadata());
16551647
Mockito.verify(mockSidecarCall).start(sidecarListenerCaptor.capture(), Mockito.any());
16561648

1657-
// Simulate sidecar sending compressed body mutation (unsupported)
16581649
ProcessingResponse resp = ProcessingResponse.newBuilder()
16591650
.setRequestBody(BodyResponse.newBuilder()
16601651
.setResponse(CommonResponse.newBuilder()
@@ -1670,12 +1661,69 @@ public void givenUnsupportedCompressionInResponse_whenReceived_thenExtProcStream
16701661
sidecarListenerCaptor.getValue().onMessage(resp);
16711662

16721663
// Verify sidecar stream was errored explicitly (cancelled by client with onError)
1673-
Mockito.verify(mockSidecarCall).cancel(Mockito.contains("Cancelled by client"), Mockito.any());
1664+
Mockito.verify(mockSidecarCall).cancel(Mockito.anyString(), Mockito.any());
16741665

16751666
// Verify raw call cancelled
16761667
Mockito.verify(mockRawCall).cancel(Mockito.contains("External processor stream failed"), Mockito.any());
16771668
}
16781669

1670+
// --- Category 9: Resource Management ---
1671+
1672+
@Test
1673+
public void givenFilter_whenClosed_thenCachedChannelManagerIsClosed() throws Exception {
1674+
CachedChannelManager mockChannelManager = Mockito.mock(CachedChannelManager.class);
1675+
1676+
ExternalProcessorFilter filter = new ExternalProcessorFilter("test", mockChannelManager, scheduler);
1677+
1678+
filter.close();
1679+
1680+
Mockito.verify(mockChannelManager).close();
1681+
}
1682+
1683+
@Test
1684+
@SuppressWarnings("unchecked")
1685+
public void givenActiveRpc_whenDataPlaneCallCancelled_thenExtProcStreamIsErrored() throws Exception {
1686+
ExternalProcessor proto = ExternalProcessor.newBuilder()
1687+
.setGrpcService(GrpcService.newBuilder()
1688+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
1689+
.setTargetUri("in-process:///sidecar")
1690+
.addChannelCredentialsPlugin(Any.newBuilder()
1691+
.setTypeUrl("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials")
1692+
.build())
1693+
.build())
1694+
.build())
1695+
.build();
1696+
ExternalProcessorFilterConfig filterConfig = provider.parseFilterConfig(Any.pack(proto), filterContext).config;
1697+
1698+
ManagedChannel mockSidecarChannel = Mockito.mock(ManagedChannel.class);
1699+
ClientCall<ProcessingRequest, ProcessingResponse> mockSidecarCall = Mockito.mock(ClientCall.class);
1700+
Mockito.when(mockSidecarChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
1701+
.thenReturn(mockSidecarCall);
1702+
1703+
CachedChannelManager mockChannelManager = Mockito.mock(CachedChannelManager.class);
1704+
Mockito.when(mockChannelManager.getChannel(Mockito.any())).thenReturn(mockSidecarChannel);
1705+
1706+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
1707+
filterConfig, mockChannelManager, scheduler);
1708+
1709+
Channel mockNextChannel = Mockito.mock(Channel.class);
1710+
ClientCall<InputStream, InputStream> mockRawCall = Mockito.mock(ClientCall.class);
1711+
Mockito.when(mockNextChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
1712+
.thenReturn(mockRawCall);
1713+
1714+
CallOptions callOptions = CallOptions.DEFAULT.withExecutor(Executors.newSingleThreadExecutor());
1715+
ClientCall<String, String> proxyCall = interceptor.interceptCall(METHOD_SAY_HELLO, callOptions, mockNextChannel);
1716+
proxyCall.start(Mockito.mock(ClientCall.Listener.class), new Metadata());
1717+
1718+
proxyCall.cancel("User cancelled", null);
1719+
1720+
// Verify sidecar stream also cancelled
1721+
Mockito.verify(mockSidecarCall).cancel(Mockito.anyString(), Mockito.any());
1722+
1723+
// Verify data plane call cancelled
1724+
Mockito.verify(mockRawCall).cancel(Mockito.eq("User cancelled"), Mockito.any());
1725+
}
1726+
16791727
@Test
16801728
public void requestHeadersMutated() throws Exception {
16811729
ExternalProcessor proto = ExternalProcessor.newBuilder()
@@ -1713,7 +1761,6 @@ public void requestHeadersMutated() throws Exception {
17131761
.intercept(interceptor)
17141762
.build());
17151763

1716-
// Data Plane Server
17171764
AtomicReference<Metadata> receivedHeaders = new AtomicReference<>();
17181765

17191766
ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("test.TestService")
@@ -1737,17 +1784,13 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
17371784

17381785
dataPlaneServiceRegistry.addService(interceptedServiceDef);
17391786

1740-
// Ext-Proc Server
17411787
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl = new ExternalProcessorGrpc.ExternalProcessorImplBase() {
17421788
@Override
17431789
public StreamObserver<ProcessingRequest> process(StreamObserver<ProcessingResponse> responseObserver) {
17441790
return new StreamObserver<ProcessingRequest>() {
1745-
private boolean halfClosedByClient = false;
1746-
17471791
@Override
17481792
public void onNext(ProcessingRequest request) {
17491793
if (request.hasRequestHeaders()) {
1750-
try { Thread.sleep(50); } catch (InterruptedException e) {}
17511794
responseObserver.onNext(ProcessingResponse.newBuilder()
17521795
.setRequestHeaders(HeadersResponse.newBuilder()
17531796
.setResponse(CommonResponse.newBuilder()
@@ -1764,7 +1807,6 @@ public void onNext(ProcessingRequest request) {
17641807
.build());
17651808
} else if (request.hasRequestBody()) {
17661809
if (request.getRequestBody().getEndOfStreamWithoutMessage() || request.getRequestBody().getEndOfStream()) {
1767-
halfClosedByClient = true;
17681810
responseObserver.onNext(ProcessingResponse.newBuilder()
17691811
.setRequestBody(BodyResponse.newBuilder()
17701812
.setResponse(CommonResponse.newBuilder()

0 commit comments

Comments
 (0)