Skip to content

Commit 12edbfa

Browse files
committed
xds: Fix ext_proc client stream protocol ordering validation
Split the single `expectedResponses` queue into `expectedRequestResponses` (for client-to-server request messages) and `expectedResponseResponses` (for server-to-client response messages). This decouples validation of the two independent directions on the bidirectional stream, allowing interleaved events to be validated and processed out-of-lockstep. Add `givenBidiStreamInterleavedEvents_whenExtProcRespondsOutOfLockstep_thenSucceeds` to `ExternalProcessorFilterTest` to verify that response headers can be processed independently before pending request body messages are resolved.
1 parent 2e9bd3f commit 12edbfa

2 files changed

Lines changed: 178 additions & 10 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -841,7 +841,8 @@ private enum EventType {
841841
private final DataPlaneDelayedCall<InputStream, InputStream> delayedCall;
842842
private final ScheduledExecutorService scheduler;
843843
private final Object streamLock = new Object();
844-
private final Queue<EventType> expectedResponses = new ConcurrentLinkedQueue<>();
844+
private final Queue<EventType> expectedRequestResponses = new ConcurrentLinkedQueue<>();
845+
private final Queue<EventType> expectedResponseResponses = new ConcurrentLinkedQueue<>();
845846
private volatile ClientCallStreamObserver<ProcessingRequest> extProcClientCallRequestObserver;
846847
private final Queue<InputStream> pendingDrainingMessages =
847848
new ConcurrentLinkedQueue<>();
@@ -1067,35 +1068,38 @@ public void onNext(ProcessingResponse response) {
10671068
return;
10681069
}
10691070

1070-
EventType expected = expectedResponses.peek();
10711071
if (response.hasRequestHeaders()) {
1072+
EventType expected = expectedRequestResponses.peek();
10721073
if (expected == null || expected != EventType.REQUEST_HEADERS) {
10731074
internalOnError(Status.UNAVAILABLE
10741075
.withDescription("Protocol error: received response out of order. Expected: "
10751076
+ expected + ", Received: REQUEST_HEADERS")
10761077
.asRuntimeException());
10771078
return;
10781079
}
1079-
expectedResponses.poll();
1080+
expectedRequestResponses.poll();
10801081
} else if (response.hasResponseHeaders()) {
1082+
EventType expected = expectedResponseResponses.peek();
10811083
if (expected == null || expected != EventType.RESPONSE_HEADERS) {
10821084
internalOnError(Status.UNAVAILABLE
10831085
.withDescription("Protocol error: received response out of order. Expected: "
10841086
+ expected + ", Received: RESPONSE_HEADERS")
10851087
.asRuntimeException());
10861088
return;
10871089
}
1088-
expectedResponses.poll();
1090+
expectedResponseResponses.poll();
10891091
} else if (response.hasResponseTrailers()) {
1092+
EventType expected = expectedResponseResponses.peek();
10901093
if (expected == null || expected != EventType.RESPONSE_TRAILERS) {
10911094
internalOnError(Status.UNAVAILABLE
10921095
.withDescription("Protocol error: received response out of order. Expected: "
10931096
+ expected + ", Received: RESPONSE_TRAILERS")
10941097
.asRuntimeException());
10951098
return;
10961099
}
1097-
expectedResponses.poll();
1100+
expectedResponseResponses.poll();
10981101
} else if (response.hasRequestBody()) {
1102+
EventType expected = expectedRequestResponses.peek();
10991103
if (expected == EventType.REQUEST_HEADERS) {
11001104
internalOnError(Status.UNAVAILABLE
11011105
.withDescription(
@@ -1104,8 +1108,8 @@ public void onNext(ProcessingResponse response) {
11041108
return;
11051109
}
11061110
} else if (response.hasResponseBody()) {
1107-
if (expected == EventType.REQUEST_HEADERS
1108-
|| expected == EventType.RESPONSE_HEADERS) {
1111+
EventType expected = expectedResponseResponses.peek();
1112+
if (expected == EventType.RESPONSE_HEADERS) {
11091113
internalOnError(Status.UNAVAILABLE
11101114
.withDescription(
11111115
"Protocol error: received response_body before headers response.")
@@ -1242,11 +1246,11 @@ private void sendToExtProc(ProcessingRequest request) {
12421246
}
12431247

12441248
if (request.hasRequestHeaders()) {
1245-
expectedResponses.add(EventType.REQUEST_HEADERS);
1249+
expectedRequestResponses.add(EventType.REQUEST_HEADERS);
12461250
} else if (request.hasResponseHeaders()) {
1247-
expectedResponses.add(EventType.RESPONSE_HEADERS);
1251+
expectedResponseResponses.add(EventType.RESPONSE_HEADERS);
12481252
} else if (request.hasResponseTrailers()) {
1249-
expectedResponses.add(EventType.RESPONSE_TRAILERS);
1253+
expectedResponseResponses.add(EventType.RESPONSE_TRAILERS);
12501254
}
12511255

12521256
ProcessingRequest requestToSend = request;

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8428,6 +8428,170 @@ public void onCompleted() {
84288428
channelManager.close();
84298429
}
84308430

8431+
@Test
8432+
public void givenBidiStreamInterleavedEvents_whenExtProcRespondsOutOfLockstep_thenSucceeds()
8433+
throws Exception {
8434+
String uniqueExtProcServerName = InProcessServerBuilder.generateName();
8435+
String uniqueDataPlaneServerName = InProcessServerBuilder.generateName();
8436+
ExecutorService bidiTestExecutor = Executors.newCachedThreadPool();
8437+
8438+
final CountDownLatch sidecarRequestBodyLatch = new CountDownLatch(1);
8439+
final CountDownLatch sidecarResponseHeadersLatch = new CountDownLatch(1);
8440+
final CountDownLatch allDoneLatch = new CountDownLatch(1);
8441+
8442+
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl =
8443+
new ExternalProcessorGrpc.ExternalProcessorImplBase() {
8444+
@Override
8445+
public StreamObserver<ProcessingRequest> process(
8446+
final StreamObserver<ProcessingResponse> responseObserver) {
8447+
((ServerCallStreamObserver<ProcessingResponse>) responseObserver).request(100);
8448+
final AtomicReference<StreamObserver<ProcessingResponse>> observerRef =
8449+
new AtomicReference<>(responseObserver);
8450+
return new StreamObserver<ProcessingRequest>() {
8451+
private ProcessingRequest savedRequestBody;
8452+
8453+
@Override
8454+
public void onNext(ProcessingRequest request) {
8455+
if (request.hasRequestBody()) {
8456+
if (request.getRequestBody().getEndOfStream() || request.getRequestBody().getEndOfStreamWithoutMessage()) {
8457+
// This is the half-close request!
8458+
observerRef.get().onNext(ProcessingResponse.newBuilder()
8459+
.setRequestBody(BodyResponse.newBuilder()
8460+
.setResponse(CommonResponse.newBuilder()
8461+
.setBodyMutation(BodyMutation.newBuilder()
8462+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
8463+
.setEndOfStream(true)
8464+
.build())
8465+
.build())
8466+
.build())
8467+
.build())
8468+
.build());
8469+
} else {
8470+
savedRequestBody = request;
8471+
sidecarRequestBodyLatch.countDown();
8472+
}
8473+
} else if (request.hasResponseHeaders()) {
8474+
// When RESPONSE_HEADERS is received, we respond to it first!
8475+
// This is out-of-lockstep because REQUEST_BODY response is still outstanding.
8476+
observerRef.get().onNext(ProcessingResponse.newBuilder()
8477+
.setResponseHeaders(HeadersResponse.newBuilder().build())
8478+
.build());
8479+
sidecarResponseHeadersLatch.countDown();
8480+
8481+
// Now send response to REQUEST_BODY with streamed response containing the body
8482+
if (savedRequestBody != null) {
8483+
observerRef.get().onNext(ProcessingResponse.newBuilder()
8484+
.setRequestBody(BodyResponse.newBuilder()
8485+
.setResponse(CommonResponse.newBuilder()
8486+
.setBodyMutation(BodyMutation.newBuilder()
8487+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
8488+
.setBody(savedRequestBody.getRequestBody().getBody())
8489+
.build())
8490+
.build())
8491+
.build())
8492+
.build())
8493+
.build());
8494+
}
8495+
}
8496+
}
8497+
8498+
@Override
8499+
public void onError(Throwable t) {}
8500+
8501+
@Override
8502+
public void onCompleted() {
8503+
observerRef.get().onCompleted();
8504+
}
8505+
};
8506+
}
8507+
};
8508+
8509+
grpcCleanup.register(InProcessServerBuilder.forName(uniqueExtProcServerName)
8510+
.addService(extProcImpl)
8511+
.executor(bidiTestExecutor)
8512+
.build().start());
8513+
8514+
MutableHandlerRegistry uniqueBidiRegistry = new MutableHandlerRegistry();
8515+
uniqueBidiRegistry.addService(ServerServiceDefinition.builder("test.TestService")
8516+
.addMethod(METHOD_BIDI_STREAMING, ServerCalls.asyncBidiStreamingCall(
8517+
new ServerCalls.BidiStreamingMethod<String, String>() {
8518+
@Override
8519+
public StreamObserver<String> invoke(StreamObserver<String> responseObserver) {
8520+
// When bidi stream starts on data plane, send headers immediately by sending a message
8521+
responseObserver.onNext("Welcome");
8522+
return new StreamObserver<String>() {
8523+
@Override public void onNext(String value) {}
8524+
@Override public void onError(Throwable t) {}
8525+
@Override public void onCompleted() {
8526+
responseObserver.onCompleted();
8527+
}
8528+
};
8529+
}
8530+
}))
8531+
.build());
8532+
8533+
grpcCleanup.register(InProcessServerBuilder.forName(uniqueDataPlaneServerName)
8534+
.fallbackHandlerRegistry(uniqueBidiRegistry)
8535+
.executor(bidiTestExecutor)
8536+
.build().start());
8537+
8538+
ExternalProcessor proto = createBaseProto(uniqueExtProcServerName)
8539+
.setProcessingMode(ProcessingMode.newBuilder()
8540+
.setRequestHeaderMode(ProcessingMode.HeaderSendMode.SKIP) // SKIP so data plane call starts immediately
8541+
.setRequestBodyMode(ProcessingMode.BodySendMode.GRPC) // GRPC body mode to trigger REQUEST_BODY
8542+
.setResponseHeaderMode(ProcessingMode.HeaderSendMode.SEND) // SEND to trigger RESPONSE_HEADERS
8543+
.build())
8544+
.build();
8545+
ExternalProcessorFilterConfig filterConfig =
8546+
provider.parseFilterConfig(Any.pack(proto), filterContext).config;
8547+
8548+
CachedChannelManager channelManager = new CachedChannelManager(config -> {
8549+
return grpcCleanup.register(InProcessChannelBuilder.forName(uniqueExtProcServerName)
8550+
.executor(bidiTestExecutor)
8551+
.build());
8552+
});
8553+
8554+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
8555+
filterConfig, channelManager, scheduler, FAKE_CONTEXT);
8556+
8557+
ManagedChannel dataPlaneChannel = grpcCleanup.register(
8558+
InProcessChannelBuilder.forName(uniqueDataPlaneServerName)
8559+
.executor(bidiTestExecutor)
8560+
.build());
8561+
8562+
ClientCall<String, String> clientCall = interceptCall(interceptor,
8563+
METHOD_BIDI_STREAMING,
8564+
DEFAULT_CALL_OPTIONS.withExecutor(bidiTestExecutor),
8565+
dataPlaneChannel);
8566+
8567+
StreamObserver<String> bidiRequestObserver = ClientCalls.asyncBidiStreamingCall(
8568+
clientCall,
8569+
new StreamObserver<String>() {
8570+
@Override public void onNext(String value) {}
8571+
@Override public void onError(Throwable t) {}
8572+
@Override public void onCompleted() {
8573+
allDoneLatch.countDown();
8574+
}
8575+
});
8576+
8577+
// Send client message to trigger REQUEST_BODY to ext_proc
8578+
bidiRequestObserver.onNext("ClientMsg");
8579+
8580+
// Wait for ext_proc to process out-of-lockstep events
8581+
assertThat(sidecarRequestBodyLatch.await(10, TimeUnit.SECONDS)).isTrue();
8582+
assertThat(sidecarResponseHeadersLatch.await(10, TimeUnit.SECONDS)).isTrue();
8583+
8584+
// Complete the bidi stream
8585+
bidiRequestObserver.onCompleted();
8586+
assertThat(allDoneLatch.await(10, TimeUnit.SECONDS)).isTrue();
8587+
8588+
// Clean up by cancelling the call explicitly
8589+
clientCall.cancel("Test finished", null);
8590+
8591+
channelManager.close();
8592+
bidiTestExecutor.shutdown();
8593+
}
8594+
84318595
// --- Category 19: Header Response Status Checks ---
84328596

84338597
@Test

0 commit comments

Comments
 (0)