Skip to content

Commit bdcf52c

Browse files
committed
xds: Fix ext_proc client stream protocol ordering validation
Split the single `expectedResponses` queue into `expectedRequestResponses` (for client-to-server request messages) and `expectedResponseResponses` (for server-to-client response messages). This decouples validation of the two independent directions on the bidirectional stream, allowing interleaved events to be validated and processed out-of-lockstep. Add `givenBidiStreamInterleavedEvents_whenExtProcRespondsOutOfLockstep_thenSucceeds` to `ExternalProcessorFilterTest` to verify that response headers can be processed independently before pending request body messages are resolved.
1 parent 2e9bd3f commit bdcf52c

2 files changed

Lines changed: 192 additions & 10 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -841,7 +841,8 @@ private enum EventType {
841841
private final DataPlaneDelayedCall<InputStream, InputStream> delayedCall;
842842
private final ScheduledExecutorService scheduler;
843843
private final Object streamLock = new Object();
844-
private final Queue<EventType> expectedResponses = new ConcurrentLinkedQueue<>();
844+
private final Queue<EventType> expectedRequestResponses = new ConcurrentLinkedQueue<>();
845+
private final Queue<EventType> expectedResponseResponses = new ConcurrentLinkedQueue<>();
845846
private volatile ClientCallStreamObserver<ProcessingRequest> extProcClientCallRequestObserver;
846847
private final Queue<InputStream> pendingDrainingMessages =
847848
new ConcurrentLinkedQueue<>();
@@ -1067,35 +1068,38 @@ public void onNext(ProcessingResponse response) {
10671068
return;
10681069
}
10691070

1070-
EventType expected = expectedResponses.peek();
10711071
if (response.hasRequestHeaders()) {
1072+
EventType expected = expectedRequestResponses.peek();
10721073
if (expected == null || expected != EventType.REQUEST_HEADERS) {
10731074
internalOnError(Status.UNAVAILABLE
10741075
.withDescription("Protocol error: received response out of order. Expected: "
10751076
+ expected + ", Received: REQUEST_HEADERS")
10761077
.asRuntimeException());
10771078
return;
10781079
}
1079-
expectedResponses.poll();
1080+
expectedRequestResponses.poll();
10801081
} else if (response.hasResponseHeaders()) {
1082+
EventType expected = expectedResponseResponses.peek();
10811083
if (expected == null || expected != EventType.RESPONSE_HEADERS) {
10821084
internalOnError(Status.UNAVAILABLE
10831085
.withDescription("Protocol error: received response out of order. Expected: "
10841086
+ expected + ", Received: RESPONSE_HEADERS")
10851087
.asRuntimeException());
10861088
return;
10871089
}
1088-
expectedResponses.poll();
1090+
expectedResponseResponses.poll();
10891091
} else if (response.hasResponseTrailers()) {
1092+
EventType expected = expectedResponseResponses.peek();
10901093
if (expected == null || expected != EventType.RESPONSE_TRAILERS) {
10911094
internalOnError(Status.UNAVAILABLE
10921095
.withDescription("Protocol error: received response out of order. Expected: "
10931096
+ expected + ", Received: RESPONSE_TRAILERS")
10941097
.asRuntimeException());
10951098
return;
10961099
}
1097-
expectedResponses.poll();
1100+
expectedResponseResponses.poll();
10981101
} else if (response.hasRequestBody()) {
1102+
EventType expected = expectedRequestResponses.peek();
10991103
if (expected == EventType.REQUEST_HEADERS) {
11001104
internalOnError(Status.UNAVAILABLE
11011105
.withDescription(
@@ -1104,8 +1108,8 @@ public void onNext(ProcessingResponse response) {
11041108
return;
11051109
}
11061110
} else if (response.hasResponseBody()) {
1107-
if (expected == EventType.REQUEST_HEADERS
1108-
|| expected == EventType.RESPONSE_HEADERS) {
1111+
EventType expected = expectedResponseResponses.peek();
1112+
if (expected == EventType.RESPONSE_HEADERS) {
11091113
internalOnError(Status.UNAVAILABLE
11101114
.withDescription(
11111115
"Protocol error: received response_body before headers response.")
@@ -1242,11 +1246,11 @@ private void sendToExtProc(ProcessingRequest request) {
12421246
}
12431247

12441248
if (request.hasRequestHeaders()) {
1245-
expectedResponses.add(EventType.REQUEST_HEADERS);
1249+
expectedRequestResponses.add(EventType.REQUEST_HEADERS);
12461250
} else if (request.hasResponseHeaders()) {
1247-
expectedResponses.add(EventType.RESPONSE_HEADERS);
1251+
expectedResponseResponses.add(EventType.RESPONSE_HEADERS);
12481252
} else if (request.hasResponseTrailers()) {
1249-
expectedResponses.add(EventType.RESPONSE_TRAILERS);
1253+
expectedResponseResponses.add(EventType.RESPONSE_TRAILERS);
12501254
}
12511255

12521256
ProcessingRequest requestToSend = request;

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8428,6 +8428,184 @@ public void onCompleted() {
84288428
channelManager.close();
84298429
}
84308430

8431+
@Test
8432+
public void givenBidiStreamInterleavedEvents_whenExtProcRespondsOutOfLockstep_thenSucceeds()
8433+
throws Exception {
8434+
String uniqueExtProcServerName = InProcessServerBuilder.generateName();
8435+
String uniqueDataPlaneServerName = InProcessServerBuilder.generateName();
8436+
ExecutorService bidiTestExecutor = Executors.newCachedThreadPool();
8437+
8438+
final CountDownLatch sidecarRequestBodyLatch = new CountDownLatch(1);
8439+
final CountDownLatch sidecarResponseHeadersLatch = new CountDownLatch(1);
8440+
final CountDownLatch allDoneLatch = new CountDownLatch(1);
8441+
8442+
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl =
8443+
new ExternalProcessorGrpc.ExternalProcessorImplBase() {
8444+
@Override
8445+
public StreamObserver<ProcessingRequest> process(
8446+
final StreamObserver<ProcessingResponse> responseObserver) {
8447+
((ServerCallStreamObserver<ProcessingResponse>) responseObserver).request(100);
8448+
final AtomicReference<StreamObserver<ProcessingResponse>> observerRef =
8449+
new AtomicReference<>(responseObserver);
8450+
return new StreamObserver<ProcessingRequest>() {
8451+
private ProcessingRequest savedRequestBody;
8452+
8453+
@Override
8454+
public void onNext(ProcessingRequest request) {
8455+
if (request.hasRequestBody()) {
8456+
if (request.getRequestBody().getEndOfStream()
8457+
|| request.getRequestBody().getEndOfStreamWithoutMessage()) {
8458+
// This is the half-close request!
8459+
observerRef.get().onNext(ProcessingResponse.newBuilder()
8460+
.setRequestBody(BodyResponse.newBuilder()
8461+
.setResponse(CommonResponse.newBuilder()
8462+
.setBodyMutation(BodyMutation.newBuilder()
8463+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
8464+
.setEndOfStream(true)
8465+
.build())
8466+
.build())
8467+
.build())
8468+
.build())
8469+
.build());
8470+
} else {
8471+
savedRequestBody = request;
8472+
sidecarRequestBodyLatch.countDown();
8473+
}
8474+
} else if (request.hasResponseHeaders()) {
8475+
// When RESPONSE_HEADERS is received, we respond to it first!
8476+
// This is out-of-lockstep because REQUEST_BODY response is still outstanding.
8477+
observerRef.get().onNext(ProcessingResponse.newBuilder()
8478+
.setResponseHeaders(HeadersResponse.newBuilder().build())
8479+
.build());
8480+
sidecarResponseHeadersLatch.countDown();
8481+
8482+
// Now send response to REQUEST_BODY with streamed response containing the body
8483+
if (savedRequestBody != null) {
8484+
observerRef.get().onNext(ProcessingResponse.newBuilder()
8485+
.setRequestBody(BodyResponse.newBuilder()
8486+
.setResponse(CommonResponse.newBuilder()
8487+
.setBodyMutation(BodyMutation.newBuilder()
8488+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
8489+
.setBody(savedRequestBody.getRequestBody().getBody())
8490+
.build())
8491+
.build())
8492+
.build())
8493+
.build())
8494+
.build());
8495+
}
8496+
}
8497+
}
8498+
8499+
@Override
8500+
public void onError(Throwable t) {}
8501+
8502+
@Override
8503+
public void onCompleted() {
8504+
observerRef.get().onCompleted();
8505+
}
8506+
};
8507+
}
8508+
};
8509+
8510+
grpcCleanup.register(InProcessServerBuilder.forName(uniqueExtProcServerName)
8511+
.addService(extProcImpl)
8512+
.executor(bidiTestExecutor)
8513+
.build().start());
8514+
8515+
MutableHandlerRegistry uniqueBidiRegistry = new MutableHandlerRegistry();
8516+
uniqueBidiRegistry.addService(ServerServiceDefinition.builder("test.TestService")
8517+
.addMethod(METHOD_BIDI_STREAMING, ServerCalls.asyncBidiStreamingCall(
8518+
new ServerCalls.BidiStreamingMethod<String, String>() {
8519+
@Override
8520+
public StreamObserver<String> invoke(StreamObserver<String> responseObserver) {
8521+
// Send headers immediately by sending a message when stream starts
8522+
responseObserver.onNext("Welcome");
8523+
return new StreamObserver<String>() {
8524+
@Override
8525+
public void onNext(String value) {}
8526+
8527+
@Override
8528+
public void onError(Throwable t) {}
8529+
8530+
@Override
8531+
public void onCompleted() {
8532+
responseObserver.onCompleted();
8533+
}
8534+
};
8535+
}
8536+
}))
8537+
.build());
8538+
8539+
grpcCleanup.register(InProcessServerBuilder.forName(uniqueDataPlaneServerName)
8540+
.fallbackHandlerRegistry(uniqueBidiRegistry)
8541+
.executor(bidiTestExecutor)
8542+
.build().start());
8543+
8544+
ExternalProcessor proto = createBaseProto(uniqueExtProcServerName)
8545+
.setProcessingMode(ProcessingMode.newBuilder()
8546+
// SKIP so data plane call starts immediately
8547+
.setRequestHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
8548+
// GRPC body mode to trigger REQUEST_BODY
8549+
.setRequestBodyMode(ProcessingMode.BodySendMode.GRPC)
8550+
// SEND to trigger RESPONSE_HEADERS
8551+
.setResponseHeaderMode(ProcessingMode.HeaderSendMode.SEND)
8552+
.build())
8553+
.build();
8554+
ExternalProcessorFilterConfig filterConfig =
8555+
provider.parseFilterConfig(Any.pack(proto), filterContext).config;
8556+
8557+
CachedChannelManager channelManager = new CachedChannelManager(config -> {
8558+
return grpcCleanup.register(InProcessChannelBuilder.forName(uniqueExtProcServerName)
8559+
.executor(bidiTestExecutor)
8560+
.build());
8561+
});
8562+
8563+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
8564+
filterConfig, channelManager, scheduler, FAKE_CONTEXT);
8565+
8566+
ManagedChannel dataPlaneChannel = grpcCleanup.register(
8567+
InProcessChannelBuilder.forName(uniqueDataPlaneServerName)
8568+
.executor(bidiTestExecutor)
8569+
.build());
8570+
8571+
ClientCall<String, String> clientCall = interceptCall(interceptor,
8572+
METHOD_BIDI_STREAMING,
8573+
DEFAULT_CALL_OPTIONS.withExecutor(bidiTestExecutor),
8574+
dataPlaneChannel);
8575+
8576+
StreamObserver<String> bidiRequestObserver = ClientCalls.asyncBidiStreamingCall(
8577+
clientCall,
8578+
new StreamObserver<String>() {
8579+
@Override
8580+
public void onNext(String value) {}
8581+
8582+
@Override
8583+
public void onError(Throwable t) {}
8584+
8585+
@Override
8586+
public void onCompleted() {
8587+
allDoneLatch.countDown();
8588+
}
8589+
});
8590+
8591+
// Send client message to trigger REQUEST_BODY to ext_proc
8592+
bidiRequestObserver.onNext("ClientMsg");
8593+
8594+
// Wait for ext_proc to process out-of-lockstep events
8595+
assertThat(sidecarRequestBodyLatch.await(10, TimeUnit.SECONDS)).isTrue();
8596+
assertThat(sidecarResponseHeadersLatch.await(10, TimeUnit.SECONDS)).isTrue();
8597+
8598+
// Complete the bidi stream
8599+
bidiRequestObserver.onCompleted();
8600+
assertThat(allDoneLatch.await(10, TimeUnit.SECONDS)).isTrue();
8601+
8602+
// Clean up by cancelling the call explicitly
8603+
clientCall.cancel("Test finished", null);
8604+
8605+
channelManager.close();
8606+
bidiTestExecutor.shutdown();
8607+
}
8608+
84318609
// --- Category 19: Header Response Status Checks ---
84328610

84338611
@Test

0 commit comments

Comments
 (0)