Skip to content

Commit 2e9bd3f

Browse files
committed
Introduce delays in ext_proc response for request headers so that eager calls from the application getting buffered and delivered after DelayedClientCall start get tested.
Remove unreachable statement checking for ext-proc stream state already closed when data plane onClose happens. Because the same serializing executor is used for both, and if ext-proc stream closed, the passthrough mode would have been set, this condition can never evaluate to true. In observability mode all ProcessingResponses should be ignored including the one with immediate_response.
1 parent 102991d commit 2e9bd3f

2 files changed

Lines changed: 128 additions & 12 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,10 @@ public void beforeStart(ClientCallStreamObserver<ProcessingRequest> requestStrea
10511051
@Override
10521052
public void onNext(ProcessingResponse response) {
10531053
try {
1054+
if (config.getObservabilityMode()) {
1055+
return;
1056+
}
1057+
10541058
if (response.hasImmediateResponse()) {
10551059
if (config.getDisableImmediateResponse()) {
10561060
internalOnError(Status.UNAVAILABLE
@@ -1063,10 +1067,6 @@ public void onNext(ProcessingResponse response) {
10631067
return;
10641068
}
10651069

1066-
if (config.getObservabilityMode()) {
1067-
return;
1068-
}
1069-
10701070
EventType expected = expectedResponses.peek();
10711071
if (response.hasRequestHeaders()) {
10721072
if (expected == null || expected != EventType.REQUEST_HEADERS) {
@@ -1653,8 +1653,8 @@ public void onMessage(InputStream message) {
16531653
@Override
16541654
public void onClose(Status status, Metadata trailers) {
16551655
dataPlaneClientCall.serverTrailersStartNanos = System.nanoTime();
1656-
ExtProcStreamState state = dataPlaneClientCall.extProcStreamState.get();
1657-
if (state.isFailed()
1656+
ExtProcStreamState extProcStreamState = dataPlaneClientCall.extProcStreamState.get();
1657+
if (extProcStreamState.isFailed()
16581658
&& !dataPlaneClientCall.config.getFailureModeAllow()) {
16591659
if (dataPlaneClientCall.markDataPlaneCallClosed()) {
16601660
proceedWithClose(Status.UNAVAILABLE.withDescription("External processor stream failed")
@@ -1672,11 +1672,6 @@ public void onClose(Status status, Metadata trailers) {
16721672
this.savedStatus = status;
16731673
this.savedTrailers = trailers;
16741674

1675-
if (state.isCompleted()) {
1676-
proceedWithClose();
1677-
return;
1678-
}
1679-
16801675
if (savedHeaders != null) {
16811676
return;
16821677
}

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1406,6 +1406,8 @@ public void givenRequestHeaderModeSend_whenExtProcRespondsWithMutations_thenCall
14061406
assertThat(configOrError.errorDetail).isNull();
14071407
ExternalProcessorFilterConfig filterConfig = configOrError.config;
14081408

1409+
final CountDownLatch appFinishedLatch = new CountDownLatch(1);
1410+
14091411
// External Processor Server
14101412
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl;
14111413
extProcImpl = new ExternalProcessorGrpc.ExternalProcessorImplBase() {
@@ -1419,6 +1421,11 @@ public StreamObserver<ProcessingRequest> process(
14191421
public void onNext(ProcessingRequest request) {
14201422
new Thread(() -> {
14211423
if (request.hasRequestHeaders()) {
1424+
try {
1425+
appFinishedLatch.await();
1426+
} catch (InterruptedException e) {
1427+
Thread.currentThread().interrupt();
1428+
}
14221429
responseObserver.onNext(ProcessingResponse.newBuilder()
14231430
.setRequestHeaders(HeadersResponse.newBuilder()
14241431
.setResponse(CommonResponse.newBuilder()
@@ -1500,11 +1507,14 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
15001507
Metadata headers = new Metadata();
15011508
proxyCall.start(new ClientCall.Listener<String>() {}, headers);
15021509

1503-
// Send message and half-close to trigger unary call
1510+
// Send message and half-close to trigger unary call while the call is buffered (since ext-proc is waiting)
15041511
proxyCall.request(1);
15051512
proxyCall.sendMessage("test");
15061513
proxyCall.halfClose();
15071514

1515+
// Release the ext-proc response now that all app events are buffered
1516+
appFinishedLatch.countDown();
1517+
15081518
// Verify main call started with mutated headers
15091519
assertThat(dataPlaneLatch.await(5, TimeUnit.SECONDS)).isTrue();
15101520
Metadata finalHeaders = capturedHeaders.get();
@@ -5187,6 +5197,117 @@ public void onClose(Status status, Metadata trailers) {
51875197
channelManager.close();
51885198
}
51895199

5200+
@Test
5201+
@SuppressWarnings("unchecked")
5202+
public void givenImmediateResponseAndObservabilityTrue_whenReceived_thenImmediateResponseIgnored()
5203+
throws Exception {
5204+
ExternalProcessor proto = ExternalProcessor.newBuilder()
5205+
.setGrpcService(GrpcService.newBuilder()
5206+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
5207+
.setTargetUri("in-process:///" + extProcServerName)
5208+
.addChannelCredentialsPlugin(Any.newBuilder()
5209+
.setTypeUrl("type.googleapis.com/envoy.extensions.grpc_service."
5210+
+ "channel_credentials.insecure.v3.InsecureCredentials")
5211+
.build())
5212+
.build())
5213+
.build())
5214+
.setObservabilityMode(true)
5215+
.build();
5216+
ConfigOrError<ExternalProcessorFilterConfig> configOrError =
5217+
provider.parseFilterConfig(Any.pack(proto), filterContext);
5218+
assertThat(configOrError.errorDetail).isNull();
5219+
ExternalProcessorFilterConfig filterConfig = configOrError.config;
5220+
5221+
// External Processor Server sends ImmediateResponse
5222+
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl;
5223+
extProcImpl = new ExternalProcessorGrpc.ExternalProcessorImplBase() {
5224+
@Override
5225+
@SuppressWarnings("unchecked")
5226+
public StreamObserver<ProcessingRequest> process(
5227+
final StreamObserver<ProcessingResponse> responseObserver) {
5228+
((ServerCallStreamObserver<ProcessingResponse>) responseObserver).request(100);
5229+
return new StreamObserver<ProcessingRequest>() {
5230+
@Override
5231+
public void onNext(ProcessingRequest request) {
5232+
if (request.hasRequestHeaders()) {
5233+
responseObserver.onNext(ProcessingResponse.newBuilder()
5234+
.setImmediateResponse(ImmediateResponse.newBuilder()
5235+
.setGrpcStatus(
5236+
io.envoyproxy.envoy.service.ext_proc.v3.GrpcStatus.newBuilder()
5237+
.setStatus(Status.UNAUTHENTICATED.getCode().value())
5238+
.build())
5239+
.setDetails("Custom security rejection")
5240+
.build())
5241+
.build());
5242+
responseObserver.onCompleted();
5243+
}
5244+
}
5245+
5246+
@Override
5247+
public void onError(Throwable t) {}
5248+
5249+
@Override
5250+
public void onCompleted() {}
5251+
};
5252+
}
5253+
};
5254+
grpcCleanup.register(InProcessServerBuilder.forName(extProcServerName)
5255+
.addService(extProcImpl)
5256+
.directExecutor()
5257+
.build().start());
5258+
5259+
CachedChannelManager channelManager = new CachedChannelManager(config -> {
5260+
return grpcCleanup.register(
5261+
InProcessChannelBuilder.forName(extProcServerName)
5262+
.directExecutor()
5263+
.build());
5264+
});
5265+
5266+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
5267+
filterConfig, channelManager, scheduler, FAKE_CONTEXT);
5268+
5269+
final CountDownLatch dataPlaneLatch = new CountDownLatch(1);
5270+
dataPlaneServiceRegistry.addService(ServerServiceDefinition.builder("test.TestService")
5271+
.addMethod(METHOD_SAY_HELLO, ServerCalls.asyncUnaryCall(
5272+
(request, responseObserver) -> {
5273+
responseObserver.onNext("Hello " + request);
5274+
responseObserver.onCompleted();
5275+
dataPlaneLatch.countDown();
5276+
}))
5277+
.build());
5278+
5279+
ManagedChannel dataPlaneChannel = grpcCleanup.register(
5280+
InProcessChannelBuilder.forName(dataPlaneServerName).directExecutor().build());
5281+
5282+
final CountDownLatch closedLatch = new CountDownLatch(1);
5283+
final AtomicReference<Status> closedStatus = new AtomicReference<>();
5284+
ClientCall.Listener<String> appListener = new ClientCall.Listener<String>() {
5285+
@Override
5286+
public void onClose(Status status, Metadata trailers) {
5287+
closedStatus.set(status);
5288+
closedLatch.countDown();
5289+
}
5290+
};
5291+
5292+
CallOptions callOptions = DEFAULT_CALL_OPTIONS.withExecutor(MoreExecutors.directExecutor());
5293+
ClientCall<String, String> proxyCall =
5294+
interceptCall(interceptor, METHOD_SAY_HELLO, callOptions, dataPlaneChannel);
5295+
proxyCall.start(appListener, new Metadata());
5296+
5297+
proxyCall.request(1);
5298+
proxyCall.sendMessage("test");
5299+
proxyCall.halfClose();
5300+
5301+
// In observability mode, the call should NOT be cancelled by the immediate response.
5302+
// It should proceed normally to the data plane and finish successfully (Status.OK).
5303+
assertThat(dataPlaneLatch.await(5, TimeUnit.SECONDS)).isTrue();
5304+
assertThat(closedLatch.await(5, TimeUnit.SECONDS)).isTrue();
5305+
assertThat(closedStatus.get().isOk()).isTrue();
5306+
5307+
proxyCall.cancel("Cleanup", null);
5308+
channelManager.close();
5309+
}
5310+
51905311
@Test
51915312
@SuppressWarnings("unchecked")
51925313
public void givenImmediateResponseDisabled_whenReceived_thenSidecarStreamErrored()

0 commit comments

Comments
 (0)