Skip to content

Commit 291f784

Browse files
committed
Added an explicit call to drainPendingRequests() directly inside handleFailOpen() in ExternalProcessorFilter.java, ensuring that all accumulated/buffered message requests are successfully flushed to the underlying call upon transitioning to fail-open/pass-through mode.
Also handle passing along empty request or response messages.
1 parent 5f8c2b6 commit 291f784

2 files changed

Lines changed: 182 additions & 13 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,7 +1239,14 @@ private void sendToExtProc(ProcessingRequest request) {
12391239
expectedResponses.add(EventType.RESPONSE_TRAILERS);
12401240
}
12411241

1242-
extProcClientCallRequestObserver.onNext(request);
1242+
ProcessingRequest requestToSend = request;
1243+
if (config.getObservabilityMode()) {
1244+
requestToSend = ProcessingRequest.newBuilder(request)
1245+
.setObservabilityMode(true)
1246+
.build();
1247+
}
1248+
1249+
extProcClientCallRequestObserver.onNext(requestToSend);
12431250
}
12441251
}
12451252

@@ -1445,7 +1452,7 @@ private void handleRequestBodyResponse(BodyResponse bodyResponse) {
14451452
BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
14461453
if (mutation.hasStreamedResponse()) {
14471454
StreamedBodyResponse streamed = mutation.getStreamedResponse();
1448-
if (!streamed.getBody().isEmpty()) {
1455+
if (!streamed.getEndOfStreamWithoutMessage()) {
14491456
super.sendMessage(streamed.getBody().newInput());
14501457
}
14511458
if (streamed.getEndOfStream() || streamed.getEndOfStreamWithoutMessage()) {
@@ -1463,9 +1470,7 @@ private void handleResponseBodyResponse(
14631470
BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
14641471
if (mutation.hasStreamedResponse()) {
14651472
StreamedBodyResponse streamed = mutation.getStreamedResponse();
1466-
if (!streamed.getBody().isEmpty()) {
1467-
listener.onExternalBody(streamed.getBody());
1468-
}
1473+
listener.onExternalBody(streamed.getBody());
14691474
}
14701475
}
14711476
}
@@ -1513,6 +1518,7 @@ private void drainPendingDrainingMessages() {
15131518

15141519
private void handleFailOpen(DataPlaneListener listener) {
15151520
activateCall();
1521+
drainPendingRequests();
15161522
listener.unblockAfterStreamComplete();
15171523
closeExtProcStream();
15181524
}

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 171 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1931,6 +1931,7 @@ public void onNext(ProcessingRequest request) {
19311931
.setBodyMutation(BodyMutation.newBuilder()
19321932
.setStreamedResponse(StreamedBodyResponse.newBuilder()
19331933
.setEndOfStream(true)
1934+
.setEndOfStreamWithoutMessage(true)
19341935
.build())
19351936
.build())
19361937
.build());
@@ -2011,6 +2012,153 @@ public void onCompleted() {
20112012
channelManager.close();
20122013
}
20132014

2015+
@Test
2016+
@SuppressWarnings("unchecked")
2017+
public void givenRequestBodyModeGrpc_whenClientSendsEmptyMessage_thenEmptyMessageIsDelivered()
2018+
throws Exception {
2019+
String uniqueExtProcServerName =
2020+
"extProc-emptyMsg-" + InProcessServerBuilder.generateName();
2021+
String uniqueDataPlaneServerName =
2022+
"dataPlane-emptyMsg-" + InProcessServerBuilder.generateName();
2023+
ExternalProcessor proto = ExternalProcessor.newBuilder()
2024+
.setGrpcService(GrpcService.newBuilder()
2025+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
2026+
.setTargetUri("in-process:///" + uniqueExtProcServerName)
2027+
.addChannelCredentialsPlugin(Any.newBuilder()
2028+
.setTypeUrl("type.googleapis.com/envoy.extensions.grpc_service."
2029+
+ "channel_credentials.insecure.v3.InsecureCredentials")
2030+
.build())
2031+
.build())
2032+
.build())
2033+
.setProcessingMode(ProcessingMode.newBuilder()
2034+
.setRequestHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
2035+
.setResponseHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
2036+
.setResponseTrailerMode(ProcessingMode.HeaderSendMode.SKIP)
2037+
.setRequestBodyMode(ProcessingMode.BodySendMode.GRPC).build())
2038+
.build();
2039+
ConfigOrError<ExternalProcessorFilterConfig> configOrError =
2040+
provider.parseFilterConfig(Any.pack(proto), filterContext);
2041+
assertThat(configOrError.errorDetail).isNull();
2042+
ExternalProcessorFilterConfig filterConfig = configOrError.config;
2043+
2044+
// External Processor Server
2045+
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl;
2046+
extProcImpl = new ExternalProcessorGrpc.ExternalProcessorImplBase() {
2047+
@Override
2048+
@SuppressWarnings("unchecked")
2049+
public StreamObserver<ProcessingRequest> process(
2050+
final StreamObserver<ProcessingResponse> responseObserver) {
2051+
((ServerCallStreamObserver<ProcessingResponse>) responseObserver).request(100);
2052+
return new StreamObserver<ProcessingRequest>() {
2053+
@Override
2054+
public void onNext(ProcessingRequest request) {
2055+
if (request.hasRequestBody()) {
2056+
BodyResponse.Builder bodyResponse = BodyResponse.newBuilder();
2057+
if (request.getRequestBody().getBody().isEmpty()
2058+
&& request.getRequestBody().getEndOfStreamWithoutMessage()) {
2059+
bodyResponse.setResponse(CommonResponse.newBuilder()
2060+
.setBodyMutation(BodyMutation.newBuilder()
2061+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
2062+
.setEndOfStream(true)
2063+
.setEndOfStreamWithoutMessage(true)
2064+
.build())
2065+
.build())
2066+
.build());
2067+
} else {
2068+
bodyResponse.setResponse(CommonResponse.newBuilder()
2069+
.setBodyMutation(BodyMutation.newBuilder()
2070+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
2071+
.setBody(ByteString.EMPTY)
2072+
.setEndOfStream(request.getRequestBody().getEndOfStream())
2073+
.build())
2074+
.build())
2075+
.build());
2076+
}
2077+
responseObserver.onNext(ProcessingResponse.newBuilder()
2078+
.setRequestBody(bodyResponse.build())
2079+
.build());
2080+
}
2081+
}
2082+
2083+
@Override
2084+
public void onError(Throwable t) {
2085+
}
2086+
2087+
@Override
2088+
public void onCompleted() {
2089+
responseObserver.onCompleted();
2090+
}
2091+
};
2092+
}
2093+
};
2094+
grpcCleanup.register(InProcessServerBuilder.forName(uniqueExtProcServerName)
2095+
.addService(extProcImpl)
2096+
.directExecutor()
2097+
.build().start());
2098+
2099+
CachedChannelManager channelManager = new CachedChannelManager(config -> {
2100+
return grpcCleanup.register(
2101+
InProcessChannelBuilder.forName(uniqueExtProcServerName).directExecutor().build());
2102+
});
2103+
2104+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
2105+
filterConfig, channelManager, scheduler, FAKE_CONTEXT);
2106+
2107+
final AtomicReference<String> receivedBody = new AtomicReference<>();
2108+
final CountDownLatch dataPlaneLatch = new CountDownLatch(1);
2109+
MutableHandlerRegistry uniqueRegistry = new MutableHandlerRegistry();
2110+
grpcCleanup.register(InProcessServerBuilder.forName(uniqueDataPlaneServerName)
2111+
.fallbackHandlerRegistry(uniqueRegistry)
2112+
.directExecutor()
2113+
.build().start());
2114+
2115+
uniqueRegistry.addService(ServerServiceDefinition.builder("test.TestService")
2116+
.addMethod(METHOD_SAY_HELLO, ServerCalls.asyncUnaryCall(
2117+
(request, responseObserver) -> {
2118+
receivedBody.set(request);
2119+
responseObserver.onNext("Hello");
2120+
responseObserver.onCompleted();
2121+
dataPlaneLatch.countDown();
2122+
}))
2123+
.build());
2124+
2125+
ManagedChannel dataPlaneChannel = grpcCleanup.register(
2126+
InProcessChannelBuilder.forName(uniqueDataPlaneServerName).directExecutor().build());
2127+
2128+
CallOptions callOptions = DEFAULT_CALL_OPTIONS.withExecutor(MoreExecutors.directExecutor());
2129+
ClientCall<String, String> proxyCall =
2130+
interceptCall(interceptor, METHOD_SAY_HELLO, callOptions, dataPlaneChannel);
2131+
final AtomicReference<Status> clientStatus = new AtomicReference<>();
2132+
final CountDownLatch clientCloseLatch = new CountDownLatch(1);
2133+
proxyCall.start(new ClientCall.Listener<String>() {
2134+
@Override
2135+
public void onClose(Status status, Metadata trailers) {
2136+
clientStatus.set(status);
2137+
clientCloseLatch.countDown();
2138+
}
2139+
}, new Metadata());
2140+
2141+
proxyCall.request(1);
2142+
proxyCall.sendMessage("");
2143+
proxyCall.halfClose();
2144+
2145+
boolean dataPlaneOk = dataPlaneLatch.await(5, TimeUnit.SECONDS);
2146+
boolean clientClosedOk = clientCloseLatch.await(5, TimeUnit.SECONDS);
2147+
2148+
System.out.println("--- dataPlaneOk: " + dataPlaneOk);
2149+
System.out.println("--- clientClosedOk: " + clientClosedOk);
2150+
System.out.println("--- clientStatus: " + clientStatus.get());
2151+
System.out.println("--- receivedBody: " + receivedBody.get());
2152+
2153+
assertThat(dataPlaneOk).isTrue();
2154+
assertThat(receivedBody.get()).isEqualTo("");
2155+
assertThat(clientClosedOk).isTrue();
2156+
assertThat(clientStatus.get().isOk()).isTrue();
2157+
2158+
proxyCall.cancel("Cleanup", null);
2159+
channelManager.close();
2160+
}
2161+
20142162
@Test
20152163
@SuppressWarnings("unchecked")
20162164
public void givenExtProcSignaledEndOfStream_whenClientSendsMoreMessages_thenMessagesDiscarded()
@@ -3084,7 +3232,6 @@ public void onNext(ProcessingRequest request) {
30843232
.setBodyMutation(BodyMutation.newBuilder()
30853233
.setStreamedResponse(StreamedBodyResponse.newBuilder()
30863234
.setBody(request.getResponseBody().getBody())
3087-
.setEndOfStream(request.getResponseBody().getEndOfStream())
30883235
.build())
30893236
.build())
30903237
.build())
@@ -3230,7 +3377,6 @@ public void onNext(ProcessingRequest request) {
32303377
.setBodyMutation(BodyMutation.newBuilder()
32313378
.setStreamedResponse(StreamedBodyResponse.newBuilder()
32323379
.setBody(ByteString.copyFromUtf8("Mutated Server"))
3233-
.setEndOfStream(request.getResponseBody().getEndOfStream())
32343380
.build())
32353381
.build())
32363382
.build())
@@ -3494,6 +3640,7 @@ public void givenObservabilityTrue_whenExtProcBusy_thenIsReadyReturnsFalse()
34943640
assertThat(configOrError.errorDetail).isNull();
34953641
ExternalProcessorFilterConfig filterConfig = configOrError.config;
34963642

3643+
final List<ProcessingRequest> extProcRequests = new java.util.concurrent.CopyOnWriteArrayList<>();
34973644
// External Processor Server
34983645
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl;
34993646
extProcImpl = new ExternalProcessorGrpc.ExternalProcessorImplBase() {
@@ -3505,6 +3652,7 @@ public StreamObserver<ProcessingRequest> process(
35053652
return new StreamObserver<ProcessingRequest>() {
35063653
@Override
35073654
public void onNext(ProcessingRequest request) {
3655+
extProcRequests.add(request);
35083656
if (request.hasRequestHeaders()) {
35093657
responseObserver.onNext(ProcessingResponse.newBuilder()
35103658
.setRequestHeaders(HeadersResponse.newBuilder().build())
@@ -3578,6 +3726,11 @@ public boolean isReady() {
35783726
// Sidecar busy
35793727
sidecarReady.set(false);
35803728
assertThat(proxyCall.isReady()).isFalse();
3729+
3730+
assertThat(extProcRequests).isNotEmpty();
3731+
for (ProcessingRequest request : extProcRequests) {
3732+
assertThat(request.getObservabilityMode()).isTrue();
3733+
}
35813734

35823735
proxyCall.cancel("Cleanup", null);
35833736
channelManager.close();
@@ -4083,6 +4236,9 @@ public void onMessage(String message) {
40834236
}
40844237
assertThat(proxyCall.isReady()).isFalse();
40854238

4239+
// Request messages from server while stream is draining (and sidecar not ready)
4240+
proxyCall.request(1);
4241+
40864242
// Now let sidecar complete
40874243
sidecarFinishLatch.countDown();
40884244

@@ -4093,9 +4249,6 @@ public void onMessage(String message) {
40934249
}
40944250
assertThat(proxyCall.isReady()).isTrue();
40954251

4096-
// Request messages from server
4097-
proxyCall.request(1);
4098-
40994252
// 1. Verify application message is forwarded to data plane WITHOUT sidecar contact
41004253
proxyCall.sendMessage("Direct Message");
41014254
proxyCall.halfClose();
@@ -6015,6 +6168,7 @@ public void givenObservabilityModeFalse_whenExtProcBusy_thenIsReadyReturnsFalse(
60156168
assertThat(configOrError.errorDetail).isNull();
60166169
ExternalProcessorFilterConfig filterConfig = configOrError.config;
60176170

6171+
final List<ProcessingRequest> extProcRequests = new java.util.concurrent.CopyOnWriteArrayList<>();
60186172
// Sidecar server
60196173
final CountDownLatch sidecarActionLatch = new CountDownLatch(1);
60206174
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl;
@@ -6027,6 +6181,7 @@ public StreamObserver<ProcessingRequest> process(
60276181
return new StreamObserver<ProcessingRequest>() {
60286182
@Override
60296183
public void onNext(ProcessingRequest request) {
6184+
extProcRequests.add(request);
60306185
new Thread(() -> {
60316186
if (request.hasRequestHeaders()) {
60326187
sidecarActionLatch.countDown();
@@ -6127,6 +6282,11 @@ public boolean isReady() {
61276282
dataPlaneReady.set(false);
61286283
assertThat(proxyCall.isReady()).isTrue();
61296284

6285+
assertThat(extProcRequests).isNotEmpty();
6286+
for (ProcessingRequest request : extProcRequests) {
6287+
assertThat(request.getObservabilityMode()).isFalse();
6288+
}
6289+
61306290
proxyCall.cancel("Cleanup", null);
61316291
channelManager.close();
61326292
}
@@ -6359,6 +6519,9 @@ public void onNext(ProcessingRequest request) {
63596519
.setStreamedResponse(
63606520
StreamedBodyResponse.newBuilder()
63616521
.setEndOfStream(true)
6522+
.setEndOfStreamWithoutMessage(
6523+
request.getRequestBody()
6524+
.getEndOfStreamWithoutMessage())
63626525
.build())
63636526
.build())
63646527
.build())
@@ -6390,7 +6553,6 @@ public void onNext(ProcessingRequest request) {
63906553
.setBodyMutation(BodyMutation.newBuilder()
63916554
.setStreamedResponse(StreamedBodyResponse.newBuilder()
63926555
.setBody(request.getResponseBody().getBody())
6393-
.setEndOfStream(request.getResponseBody().getEndOfStream())
63946556
.build())
63956557
.build())
63966558
.build())
@@ -6635,6 +6797,9 @@ public void onNext(ProcessingRequest request) {
66356797
.setStreamedResponse(
66366798
StreamedBodyResponse.newBuilder()
66376799
.setEndOfStream(true)
6800+
.setEndOfStreamWithoutMessage(
6801+
request.getRequestBody()
6802+
.getEndOfStreamWithoutMessage())
66386803
.build())
66396804
.build())
66406805
.build())
@@ -6670,8 +6835,6 @@ public void onNext(ProcessingRequest request) {
66706835
.setStreamedResponse(
66716836
StreamedBodyResponse.newBuilder()
66726837
.setBody(request.getResponseBody().getBody())
6673-
.setEndOfStream(
6674-
request.getResponseBody().getEndOfStream())
66756838
.build())
66766839
.build())
66776840
.build())

0 commit comments

Comments
 (0)