Skip to content

Commit 3a7729f

Browse files
committed
In observability mode all ProcessingResponses should be ignored including the one with immediate_response.
1 parent 102991d commit 3a7729f

2 files changed

Lines changed: 115 additions & 4 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,10 @@ public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStrea
10511051
@Override
10521052
public void onNext(ProcessingResponse response) {
10531053
try {
1054+
if (config.getObservabilityMode()) {
1055+
return;
1056+
}
1057+
10541058
if (response.hasImmediateResponse()) {
10551059
if (config.getDisableImmediateResponse()) {
10561060
internalOnError(Status.UNAVAILABLE
@@ -1063,10 +1067,6 @@ public void onNext(ProcessingResponse response) {
10631067
return;
10641068
}
10651069

1066-
if (config.getObservabilityMode()) {
1067-
return;
1068-
}
1069-
10701070
EventType expected = expectedResponses.peek();
10711071
if (response.hasRequestHeaders()) {
10721072
if (expected == null || expected != EventType.REQUEST_HEADERS) {

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5187,6 +5187,117 @@ public void onClose(Status status, Metadata trailers) {
51875187
channelManager.close();
51885188
}
51895189

5190+
@Test
5191+
@SuppressWarnings("unchecked")
5192+
public void givenImmediateResponseAndObservabilityTrue_whenReceived_thenImmediateResponseIgnored()
5193+
throws Exception {
5194+
ExternalProcessor proto = ExternalProcessor.newBuilder()
5195+
.setGrpcService(GrpcService.newBuilder()
5196+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
5197+
.setTargetUri("in-process:///" + extProcServerName)
5198+
.addChannelCredentialsPlugin(Any.newBuilder()
5199+
.setTypeUrl("type.googleapis.com/envoy.extensions.grpc_service."
5200+
+ "channel_credentials.insecure.v3.InsecureCredentials")
5201+
.build())
5202+
.build())
5203+
.build())
5204+
.setObservabilityMode(true)
5205+
.build();
5206+
ConfigOrError<ExternalProcessorFilterConfig> configOrError =
5207+
provider.parseFilterConfig(Any.pack(proto), filterContext);
5208+
assertThat(configOrError.errorDetail).isNull();
5209+
ExternalProcessorFilterConfig filterConfig = configOrError.config;
5210+
5211+
// External Processor Server sends ImmediateResponse
5212+
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl;
5213+
extProcImpl = new ExternalProcessorGrpc.ExternalProcessorImplBase() {
5214+
@Override
5215+
@SuppressWarnings("unchecked")
5216+
public StreamObserver<ProcessingRequest> process(
5217+
final StreamObserver<ProcessingResponse> responseObserver) {
5218+
((ServerCallStreamObserver<ProcessingResponse>) responseObserver).request(100);
5219+
return new StreamObserver<ProcessingRequest>() {
5220+
@Override
5221+
public void onNext(ProcessingRequest request) {
5222+
if (request.hasRequestHeaders()) {
5223+
responseObserver.onNext(ProcessingResponse.newBuilder()
5224+
.setImmediateResponse(ImmediateResponse.newBuilder()
5225+
.setGrpcStatus(
5226+
io.envoyproxy.envoy.service.ext_proc.v3.GrpcStatus.newBuilder()
5227+
.setStatus(Status.UNAUTHENTICATED.getCode().value())
5228+
.build())
5229+
.setDetails("Custom security rejection")
5230+
.build())
5231+
.build());
5232+
responseObserver.onCompleted();
5233+
}
5234+
}
5235+
5236+
@Override
5237+
public void onError(Throwable t) {}
5238+
5239+
@Override
5240+
public void onCompleted() {}
5241+
};
5242+
}
5243+
};
5244+
grpcCleanup.register(InProcessServerBuilder.forName(extProcServerName)
5245+
.addService(extProcImpl)
5246+
.directExecutor()
5247+
.build().start());
5248+
5249+
CachedChannelManager channelManager = new CachedChannelManager(config -> {
5250+
return grpcCleanup.register(
5251+
InProcessChannelBuilder.forName(extProcServerName)
5252+
.directExecutor()
5253+
.build());
5254+
});
5255+
5256+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
5257+
filterConfig, channelManager, scheduler, FAKE_CONTEXT);
5258+
5259+
final CountDownLatch dataPlaneLatch = new CountDownLatch(1);
5260+
dataPlaneServiceRegistry.addService(ServerServiceDefinition.builder("test.TestService")
5261+
.addMethod(METHOD_SAY_HELLO, ServerCalls.asyncUnaryCall(
5262+
(request, responseObserver) -> {
5263+
responseObserver.onNext("Hello " + request);
5264+
responseObserver.onCompleted();
5265+
dataPlaneLatch.countDown();
5266+
}))
5267+
.build());
5268+
5269+
ManagedChannel dataPlaneChannel = grpcCleanup.register(
5270+
InProcessChannelBuilder.forName(dataPlaneServerName).directExecutor().build());
5271+
5272+
final CountDownLatch closedLatch = new CountDownLatch(1);
5273+
final AtomicReference<Status> closedStatus = new AtomicReference<>();
5274+
ClientCall.Listener<String> appListener = new ClientCall.Listener<String>() {
5275+
@Override
5276+
public void onClose(Status status, Metadata trailers) {
5277+
closedStatus.set(status);
5278+
closedLatch.countDown();
5279+
}
5280+
};
5281+
5282+
CallOptions callOptions = DEFAULT_CALL_OPTIONS.withExecutor(MoreExecutors.directExecutor());
5283+
ClientCall<String, String> proxyCall =
5284+
interceptCall(interceptor, METHOD_SAY_HELLO, callOptions, dataPlaneChannel);
5285+
proxyCall.start(appListener, new Metadata());
5286+
5287+
proxyCall.request(1);
5288+
proxyCall.sendMessage("test");
5289+
proxyCall.halfClose();
5290+
5291+
// In observability mode, the call should NOT be cancelled by the immediate response.
5292+
// It should proceed normally to the data plane and finish successfully (Status.OK).
5293+
assertThat(dataPlaneLatch.await(5, TimeUnit.SECONDS)).isTrue();
5294+
assertThat(closedLatch.await(5, TimeUnit.SECONDS)).isTrue();
5295+
assertThat(closedStatus.get().isOk()).isTrue();
5296+
5297+
proxyCall.cancel("Cleanup", null);
5298+
channelManager.close();
5299+
}
5300+
51905301
@Test
51915302
@SuppressWarnings("unchecked")
51925303
public void givenImmediateResponseDisabled_whenReceived_thenSidecarStreamErrored()

0 commit comments

Comments
 (0)