Skip to content

Commit b72e960

Browse files
committed
xds: synchronize savedMessages operations to prevent race conditions during fail-open transitions
Synchronize all enqueue, check, and drain operations on the savedMessages queue inside DataPlaneListener. Added an inboundPassThrough flag to ensure any concurrent incoming messages on the data plane transport thread do not get enqueued and left behind after the queue has been drained.
1 parent ac0eae1 commit b72e960

1 file changed

Lines changed: 33 additions & 19 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorClientInterceptor.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,10 +1115,7 @@ private void handleImmediateResponse(ImmediateResponse immediate, DataPlaneListe
11151115
applyHeaderMutations(trailers, immediate.getHeaders());
11161116
}
11171117

1118-
// ImmediateResponse should take precedence over any other closure
1119-
// if it arrives before the app is notified.
1120-
listener.savedStatus = status;
1121-
listener.savedTrailers = trailers;
1118+
listener.setImmediateResponse(status, trailers);
11221119

11231120
if (isProcessingTrailers.get()) {
11241121
// If sent in response to a server trailers event, sets the status and optionally
@@ -1175,6 +1172,7 @@ private static class DataPlaneListener extends SimpleForwardingClientCallListene
11751172
private final ClientCall<?, ?> rawCall;
11761173
private final DataPlaneClientCall dataPlaneClientCall;
11771174
private final Queue<InputStream> savedMessages = new ConcurrentLinkedQueue<>();
1175+
private boolean inboundPassThrough = false;
11781176
@Nullable private volatile Metadata savedHeaders;
11791177
@Nullable private volatile Metadata savedTrailers;
11801178
@Nullable private volatile Status savedStatus;
@@ -1203,6 +1201,11 @@ Metadata getSavedTrailers() {
12031201
return savedTrailers;
12041202
}
12051203

1204+
void setImmediateResponse(Status status, Metadata trailers) {
1205+
this.savedStatus = status;
1206+
this.savedTrailers = trailers;
1207+
}
1208+
12061209
@Override
12071210
public void onReady() {
12081211
dataPlaneClientCall.drainPendingRequests();
@@ -1211,7 +1214,6 @@ public void onReady() {
12111214

12121215
@Override
12131216
public void onHeaders(Metadata headers) {
1214-
System.out.println("=== CLIENT RECEIVED RESPONSE HEADERS: " + headers + " ===");
12151217
dataPlaneClientCall.serverHeadersStartNanos = System.nanoTime();
12161218
responseHeadersSent.set(true);
12171219
if (dataPlaneClientCall.extProcStreamState.get().isDraining()) {
@@ -1246,13 +1248,20 @@ public void onHeaders(Metadata headers) {
12461248

12471249
@Override
12481250
public void onMessage(InputStream message) {
1249-
if (dataPlaneClientCall.passThroughMode.get()) {
1250-
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
1251-
return;
1251+
synchronized (savedMessages) {
1252+
if (inboundPassThrough) {
1253+
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
1254+
return;
1255+
}
1256+
1257+
if (savedHeaders != null || dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1258+
savedMessages.add(message);
1259+
return;
1260+
}
12521261
}
12531262

1254-
if (savedHeaders != null || dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1255-
savedMessages.add(message);
1263+
if (dataPlaneClientCall.passThroughMode.get()) {
1264+
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
12561265
return;
12571266
}
12581267

@@ -1331,11 +1340,13 @@ void onReadyNotify() {
13311340
void proceedWithHeaders() {
13321341
if (savedHeaders != null) {
13331342
proceedWithHeaders(savedHeaders);
1334-
savedHeaders = null;
1335-
if (!dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1336-
InputStream msg;
1337-
while ((msg = savedMessages.poll()) != null) {
1338-
onMessage(msg);
1343+
synchronized (savedMessages) {
1344+
savedHeaders = null;
1345+
if (!dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1346+
InputStream msg;
1347+
while ((msg = savedMessages.poll()) != null) {
1348+
onMessage(msg);
1349+
}
13391350
}
13401351
}
13411352
onReadyNotify();
@@ -1386,10 +1397,13 @@ void unblockAfterStreamComplete() {
13861397
}
13871398

13881399
private void proceedWithSavedMessages() {
1389-
InputStream msg;
1390-
while ((msg = savedMessages.poll()) != null) {
1391-
final InputStream finalMsg = msg;
1392-
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(finalMsg));
1400+
synchronized (savedMessages) {
1401+
InputStream msg;
1402+
while ((msg = savedMessages.poll()) != null) {
1403+
final InputStream finalMsg = msg;
1404+
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(finalMsg));
1405+
}
1406+
inboundPassThrough = true;
13931407
}
13941408
}
13951409

0 commit comments

Comments
 (0)