Skip to content

Commit ec14b26

Browse files
committed
Identified some incorrect handlings of end of stream indicator and fixed them:
1. Client-to-Server EOS & Cardinality Decoupling (handleRequestBodyResponse) The Issue: Previously, the filter tracked application-initiated half-closes using an internal halfClosed atomic boolean. Once set, it assumed the very next ProcessingResponse from the sidecar was the final one and immediately half-closed the upstream RPC, violating gRFC A93’s 1-to-N / M-to-N streaming body specification. The Fix: Stateless Half-Close: Completely eliminated halfClosed state tracking; upstream half-closing is now stateless and driven exclusively by sidecar commands. Explicit A93 Protocol Boundaries: In handleRequestBodyResponse(), proceedWithHalfClose() is triggered strictly when the sidecar sends a ProcessingResponse explicitly marked with end_of_stream = true (piggybacked on a body chunk) or end_of_stream_without_message = true. Streaming Queue Decoupling: Refined the expectedResponses queue to enforce 1-to-1 synchronous ordering only for headers and trailers, allowing asynchronous streaming body chunks to be exchanged freely without triggering cardinality mismatch errors. 2. Strict proceedWithClose() Lifecycle on Trailers Response The Issue: handleResponseBodyResponse() previously triggered premature call closure during body processing. Furthermore, there was ambiguity around whether server-to-client body EOS indicators should trigger call completion. The Fix: Removed Body-Triggered Closure: Completely removed proceedWithClose() from server-to-client body processing (handleResponseBodyResponse()), ensuring body EOS indicators do not terminate the RPC. Trailers-Driven Completion: Enforced that client call completion (proceedWithClose()) relies strictly on the receipt of response trailers from the sidecar (hasResponseTrailers()). Clean Handshake for Skipped Trailers: If response_trailer_mode is set to SKIP (or default), the filter notifies the sidecar that the server stream is finished via an empty body carrying end_of_stream_without_message = true, and immediately invokes proceedWithClose() without waiting for a response.
1 parent 59be21b commit ec14b26

2 files changed

Lines changed: 561 additions & 80 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,6 @@ private enum EventType {
851851
final AtomicReference<ExtProcStreamState> extProcStreamState =
852852
new AtomicReference<>(ExtProcStreamState.ACTIVE);
853853
final AtomicBoolean passThroughMode = new AtomicBoolean(false);
854-
final AtomicBoolean halfClosed = new AtomicBoolean(false);
855854
final AtomicBoolean requestSideClosed = new AtomicBoolean(false);
856855
final AtomicBoolean isProcessingTrailers = new AtomicBoolean(false);
857856

@@ -1067,28 +1066,47 @@ public void onNext(ProcessingResponse response) {
10671066
}
10681067

10691068
EventType expected = expectedResponses.peek();
1070-
EventType received = null;
10711069
if (response.hasRequestHeaders()) {
1072-
received = EventType.REQUEST_HEADERS;
1073-
} else if (response.hasRequestBody()) {
1074-
received = EventType.REQUEST_BODY;
1070+
if (expected == null || expected != EventType.REQUEST_HEADERS) {
1071+
internalOnError(Status.UNAVAILABLE
1072+
.withDescription("Protocol error: received response out of order. Expected: "
1073+
+ expected + ", Received: REQUEST_HEADERS")
1074+
.asRuntimeException());
1075+
return;
1076+
}
1077+
expectedResponses.poll();
10751078
} else if (response.hasResponseHeaders()) {
1076-
received = EventType.RESPONSE_HEADERS;
1077-
} else if (response.hasResponseBody()) {
1078-
received = EventType.RESPONSE_BODY;
1079+
if (expected == null || expected != EventType.RESPONSE_HEADERS) {
1080+
internalOnError(Status.UNAVAILABLE
1081+
.withDescription("Protocol error: received response out of order. Expected: "
1082+
+ expected + ", Received: RESPONSE_HEADERS")
1083+
.asRuntimeException());
1084+
return;
1085+
}
1086+
expectedResponses.poll();
10791087
} else if (response.hasResponseTrailers()) {
1080-
received = EventType.RESPONSE_TRAILERS;
1081-
}
1082-
1083-
if (received != null) {
1084-
if (expected == null || expected != received) {
1088+
if (expected == null || expected != EventType.RESPONSE_TRAILERS) {
10851089
internalOnError(Status.UNAVAILABLE
10861090
.withDescription("Protocol error: received response out of order. Expected: "
1087-
+ expected + ", Received: " + received)
1091+
+ expected + ", Received: RESPONSE_TRAILERS")
10881092
.asRuntimeException());
10891093
return;
10901094
}
10911095
expectedResponses.poll();
1096+
} else if (response.hasRequestBody()) {
1097+
if (expected == EventType.REQUEST_HEADERS) {
1098+
internalOnError(Status.UNAVAILABLE
1099+
.withDescription("Protocol error: received request_body before request_headers response.")
1100+
.asRuntimeException());
1101+
return;
1102+
}
1103+
} else if (response.hasResponseBody()) {
1104+
if (expected == EventType.REQUEST_HEADERS || expected == EventType.RESPONSE_HEADERS) {
1105+
internalOnError(Status.UNAVAILABLE
1106+
.withDescription("Protocol error: received response_body before headers response.")
1107+
.asRuntimeException());
1108+
return;
1109+
}
10921110
}
10931111

10941112
if (response.getRequestDrain()) {
@@ -1154,6 +1172,7 @@ else if (response.hasResponseTrailers()) {
11541172
response.getResponseTrailers().getHeaderMutation()
11551173
);
11561174
}
1175+
wrappedListener.proceedWithClose();
11571176
}
11581177

11591178
checkEndOfStream(response);
@@ -1219,12 +1238,8 @@ private void sendToExtProc(ProcessingRequest request) {
12191238

12201239
if (request.hasRequestHeaders()) {
12211240
expectedResponses.add(EventType.REQUEST_HEADERS);
1222-
} else if (request.hasRequestBody()) {
1223-
expectedResponses.add(EventType.REQUEST_BODY);
12241241
} else if (request.hasResponseHeaders()) {
12251242
expectedResponses.add(EventType.RESPONSE_HEADERS);
1226-
} else if (request.hasResponseBody()) {
1227-
expectedResponses.add(EventType.RESPONSE_BODY);
12281243
} else if (request.hasResponseTrailers()) {
12291244
expectedResponses.add(EventType.RESPONSE_TRAILERS);
12301245
}
@@ -1396,7 +1411,6 @@ private void proceedWithHalfClose() {
13961411
@Override
13971412
public void halfClose() {
13981413
clientHalfCloseStartNanos = System.nanoTime();
1399-
halfClosed.set(true);
14001414
if (passThroughMode.get() || isExtProcStreamCompleted()) {
14011415
if (requestSideClosed.compareAndSet(false, true)) {
14021416
proceedWithHalfClose();
@@ -1443,13 +1457,11 @@ private void handleRequestBodyResponse(BodyResponse bodyResponse) {
14431457
if (!streamed.getBody().isEmpty()) {
14441458
super.sendMessage(streamed.getBody().newInput());
14451459
}
1446-
}
1447-
}
1448-
// If the application already half-closed, and we just received a response from
1449-
// the sidecar for the last part of the request body, we can now half-close the data plane.
1450-
if (halfClosed.get()) {
1451-
if (requestSideClosed.compareAndSet(false, true)) {
1452-
proceedWithHalfClose();
1460+
if (streamed.getEndOfStream() || streamed.getEndOfStreamWithoutMessage()) {
1461+
if (requestSideClosed.compareAndSet(false, true)) {
1462+
proceedWithHalfClose();
1463+
}
1464+
}
14531465
}
14541466
}
14551467
}
@@ -1463,9 +1475,6 @@ private void handleResponseBodyResponse(
14631475
if (!streamed.getBody().isEmpty()) {
14641476
listener.onExternalBody(streamed.getBody());
14651477
}
1466-
if (streamed.getEndOfStream() || streamed.getEndOfStreamWithoutMessage()) {
1467-
listener.proceedWithClose();
1468-
}
14691478
}
14701479
}
14711480
}
@@ -1777,10 +1786,7 @@ private void triggerCloseHandshake() {
17771786
.build())
17781787
.build());
17791788

1780-
if (dataPlaneClientCall.config.getObservabilityMode()) {
1781-
// In observability mode we don't wait for handshake response
1782-
proceedWithClose();
1783-
}
1789+
proceedWithClose();
17841790
}
17851791
}
17861792

0 commit comments

Comments
 (0)