Skip to content

Commit ac0eae1

Browse files
committed
Updated onClose(Status, Metadata) inside DataPlaneListener to only assign savedStatus and savedTrailers if they are currently null.
This ensures that if an ImmediateResponse is received and sets the initial status and trailers, they are preserved and not overwritten by the cancellation onClose callback triggered subsequently on the transport thread.
1 parent 2d49512 commit ac0eae1

1 file changed

Lines changed: 49 additions & 45 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorClientInterceptor.java

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -102,30 +102,7 @@
102102
*/
103103
final class ExternalProcessorClientInterceptor implements ClientInterceptor {
104104

105-
enum ExtProcStreamState {
106-
ACTIVE,
107-
DRAINING,
108-
COMPLETED,
109-
FAILED;
110105

111-
boolean isCompleted() {
112-
return this == COMPLETED || this == FAILED;
113-
}
114-
115-
boolean isFailed() {
116-
return this == FAILED;
117-
}
118-
119-
boolean isDraining() {
120-
return this == DRAINING;
121-
}
122-
}
123-
124-
enum DataPlaneCallState {
125-
IDLE,
126-
ACTIVE,
127-
CLOSED
128-
}
129106

130107
@VisibleForTesting
131108
static final DoubleHistogramMetricInstrument clientHeadersDuration;
@@ -406,6 +383,31 @@ private static class DataPlaneDelayedCall<ReqT, RespT> extends DelayedClientCall
406383
*/
407384
private static class DataPlaneClientCall
408385
extends SimpleForwardingClientCall<InputStream, InputStream> {
386+
enum ExtProcStreamState {
387+
ACTIVE,
388+
DRAINING,
389+
COMPLETED,
390+
FAILED;
391+
392+
boolean isCompleted() {
393+
return this == COMPLETED || this == FAILED;
394+
}
395+
396+
boolean isFailed() {
397+
return this == FAILED;
398+
}
399+
400+
boolean isDraining() {
401+
return this == DRAINING;
402+
}
403+
}
404+
405+
enum DataPlaneCallState {
406+
IDLE,
407+
ACTIVE,
408+
CLOSED
409+
}
410+
409411
private enum EventType {
410412
REQUEST_HEADERS,
411413
REQUEST_BODY,
@@ -420,12 +422,12 @@ private enum EventType {
420422
private final DataPlaneDelayedCall<InputStream, InputStream> delayedCall;
421423
private final ScheduledExecutorService scheduler;
422424
private final Object streamLock = new Object();
423-
private final Queue<EventType> expectedRequestResponses = new ConcurrentLinkedQueue<>();
424-
private final Queue<EventType> expectedResponseResponses = new ConcurrentLinkedQueue<>();
425-
private volatile ClientCallStreamObserver<ProcessingRequest> extProcClientCallRequestObserver;
425+
@Nullable private volatile EventType expectedRequestResponse;
426+
@Nullable private volatile EventType expectedResponseResponse;
427+
@Nullable private volatile ClientCallStreamObserver<ProcessingRequest> extProcClientCallRequestObserver;
426428
private final Queue<InputStream> pendingDrainingMessages =
427429
new ConcurrentLinkedQueue<>();
428-
private volatile DataPlaneListener wrappedListener;
430+
@Nullable private volatile DataPlaneListener wrappedListener;
429431
private final HeaderMutationFilter mutationFilter;
430432
private final HeaderMutator mutator = HeaderMutator.create();
431433
private final AtomicInteger pendingRequests = new AtomicInteger(0);
@@ -445,7 +447,7 @@ private enum EventType {
445447
private boolean protocolConfigSent = false;
446448
private ImmutableMap<String, Struct> collectedAttributes;
447449
private boolean requestAttributesSent = false;
448-
private volatile Metadata requestHeaders;
450+
@Nullable private volatile Metadata requestHeaders;
449451
final AtomicReference<DataPlaneCallState> dataPlaneCallState =
450452
new AtomicReference<>(DataPlaneCallState.IDLE);
451453
final AtomicReference<ExtProcStreamState> extProcStreamState =
@@ -654,37 +656,37 @@ public void onNext(ProcessingResponse response) {
654656
}
655657

656658
if (response.hasRequestHeaders()) {
657-
EventType expected = expectedRequestResponses.peek();
659+
EventType expected = expectedRequestResponse;
658660
if (expected == null || expected != EventType.REQUEST_HEADERS) {
659661
internalOnError(Status.UNAVAILABLE
660662
.withDescription("Protocol error: received response out of order. Expected: "
661663
+ expected + ", Received: REQUEST_HEADERS")
662664
.asRuntimeException());
663665
return;
664666
}
665-
expectedRequestResponses.poll();
667+
expectedRequestResponse = null;
666668
} else if (response.hasResponseHeaders()) {
667-
EventType expected = expectedResponseResponses.peek();
669+
EventType expected = expectedResponseResponse;
668670
if (expected == null || expected != EventType.RESPONSE_HEADERS) {
669671
internalOnError(Status.UNAVAILABLE
670672
.withDescription("Protocol error: received response out of order. Expected: "
671673
+ expected + ", Received: RESPONSE_HEADERS")
672674
.asRuntimeException());
673675
return;
674676
}
675-
expectedResponseResponses.poll();
677+
expectedResponseResponse = null;
676678
} else if (response.hasResponseTrailers()) {
677-
EventType expected = expectedResponseResponses.peek();
679+
EventType expected = expectedResponseResponse;
678680
if (expected == null || expected != EventType.RESPONSE_TRAILERS) {
679681
internalOnError(Status.UNAVAILABLE
680682
.withDescription("Protocol error: received response out of order. Expected: "
681683
+ expected + ", Received: RESPONSE_TRAILERS")
682684
.asRuntimeException());
683685
return;
684686
}
685-
expectedResponseResponses.poll();
687+
expectedResponseResponse = null;
686688
} else if (response.hasRequestBody()) {
687-
EventType expected = expectedRequestResponses.peek();
689+
EventType expected = expectedRequestResponse;
688690
if (expected == EventType.REQUEST_HEADERS) {
689691
internalOnError(Status.UNAVAILABLE
690692
.withDescription(
@@ -693,7 +695,7 @@ public void onNext(ProcessingResponse response) {
693695
return;
694696
}
695697
} else if (response.hasResponseBody()) {
696-
EventType expected = expectedResponseResponses.peek();
698+
EventType expected = expectedResponseResponse;
697699
if (expected == EventType.RESPONSE_HEADERS) {
698700
internalOnError(Status.UNAVAILABLE
699701
.withDescription(
@@ -828,11 +830,11 @@ private void sendToExtProc(ProcessingRequest request) {
828830
}
829831

830832
if (request.hasRequestHeaders()) {
831-
expectedRequestResponses.add(EventType.REQUEST_HEADERS);
833+
expectedRequestResponse = EventType.REQUEST_HEADERS;
832834
} else if (request.hasResponseHeaders()) {
833-
expectedResponseResponses.add(EventType.RESPONSE_HEADERS);
835+
expectedResponseResponse = EventType.RESPONSE_HEADERS;
834836
} else if (request.hasResponseTrailers()) {
835-
expectedResponseResponses.add(EventType.RESPONSE_TRAILERS);
837+
expectedResponseResponse = EventType.RESPONSE_TRAILERS;
836838
}
837839

838840
ProcessingRequest requestToSend = request;
@@ -1173,9 +1175,9 @@ private static class DataPlaneListener extends SimpleForwardingClientCallListene
11731175
private final ClientCall<?, ?> rawCall;
11741176
private final DataPlaneClientCall dataPlaneClientCall;
11751177
private final Queue<InputStream> savedMessages = new ConcurrentLinkedQueue<>();
1176-
private volatile Metadata savedHeaders;
1177-
private volatile Metadata savedTrailers;
1178-
private volatile Status savedStatus;
1178+
@Nullable private volatile Metadata savedHeaders;
1179+
@Nullable private volatile Metadata savedTrailers;
1180+
@Nullable private volatile Status savedStatus;
11791181
private final AtomicBoolean terminationTriggered = new AtomicBoolean(false);
11801182
private final AtomicBoolean responseHeadersSent = new AtomicBoolean(false);
11811183
private final AtomicBoolean trailersOnly = new AtomicBoolean(false);
@@ -1277,7 +1279,7 @@ public void onMessage(InputStream message) {
12771279
@Override
12781280
public void onClose(Status status, Metadata trailers) {
12791281
dataPlaneClientCall.serverTrailersStartNanos = System.nanoTime();
1280-
ExtProcStreamState extProcStreamState = dataPlaneClientCall.extProcStreamState.get();
1282+
DataPlaneClientCall.ExtProcStreamState extProcStreamState = dataPlaneClientCall.extProcStreamState.get();
12811283
if (extProcStreamState.isFailed()
12821284
&& !dataPlaneClientCall.config.getFailureModeAllow()) {
12831285
if (dataPlaneClientCall.markDataPlaneCallClosed()) {
@@ -1293,8 +1295,10 @@ public void onClose(Status status, Metadata trailers) {
12931295
return;
12941296
}
12951297

1296-
this.savedStatus = status;
1297-
this.savedTrailers = trailers;
1298+
if (this.savedStatus == null) {
1299+
this.savedStatus = status;
1300+
this.savedTrailers = trailers;
1301+
}
12981302

12991303
if (savedHeaders != null) {
13001304
return;

0 commit comments

Comments
 (0)