Skip to content

Commit d173b6e

Browse files
committed
Address review comments.
1 parent b72e960 commit d173b6e

2 files changed

Lines changed: 241 additions & 156 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorClientInterceptor.java

Lines changed: 98 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,8 @@ private enum EventType {
424424
private final Object streamLock = new Object();
425425
@Nullable private volatile EventType expectedRequestResponse;
426426
@Nullable private volatile EventType expectedResponseResponse;
427-
@Nullable private volatile ClientCallStreamObserver<ProcessingRequest> extProcClientCallRequestObserver;
427+
@Nullable private volatile ClientCallStreamObserver<ProcessingRequest>
428+
extProcClientCallRequestObserver;
428429
private final Queue<InputStream> pendingDrainingMessages =
429430
new ConcurrentLinkedQueue<>();
430431
@Nullable private volatile DataPlaneListener wrappedListener;
@@ -1166,6 +1167,50 @@ private void checkEndOfStream(ProcessingResponse response) {
11661167
closeExtProcStream();
11671168
}
11681169
}
1170+
1171+
long getServerHeadersStartNanos() {
1172+
return serverHeadersStartNanos;
1173+
}
1174+
1175+
void setServerHeadersStartNanos(long serverHeadersStartNanos) {
1176+
this.serverHeadersStartNanos = serverHeadersStartNanos;
1177+
}
1178+
1179+
long getServerTrailersStartNanos() {
1180+
return serverTrailersStartNanos;
1181+
}
1182+
1183+
void setServerTrailersStartNanos(long serverTrailersStartNanos) {
1184+
this.serverTrailersStartNanos = serverTrailersStartNanos;
1185+
}
1186+
1187+
AtomicReference<ExtProcStreamState> getExtProcStreamState() {
1188+
return extProcStreamState;
1189+
}
1190+
1191+
ProcessingMode getCurrentProcessingMode() {
1192+
return currentProcessingMode;
1193+
}
1194+
1195+
AtomicBoolean getPassThroughMode() {
1196+
return passThroughMode;
1197+
}
1198+
1199+
ExternalProcessorFilterConfig getConfig() {
1200+
return config;
1201+
}
1202+
1203+
Context getCallContext() {
1204+
return callContext;
1205+
}
1206+
1207+
ScheduledExecutorService getScheduler() {
1208+
return scheduler;
1209+
}
1210+
1211+
AtomicBoolean getIsProcessingTrailers() {
1212+
return isProcessingTrailers;
1213+
}
11691214
}
11701215

11711216
private static class DataPlaneListener extends SimpleForwardingClientCallListener<InputStream> {
@@ -1214,20 +1259,20 @@ public void onReady() {
12141259

12151260
@Override
12161261
public void onHeaders(Metadata headers) {
1217-
dataPlaneClientCall.serverHeadersStartNanos = System.nanoTime();
1262+
dataPlaneClientCall.setServerHeadersStartNanos(System.nanoTime());
12181263
responseHeadersSent.set(true);
1219-
if (dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1264+
if (dataPlaneClientCall.getExtProcStreamState().get().isDraining()) {
12201265
this.savedHeaders = headers;
12211266
return;
12221267
}
12231268
boolean sendResponseHeaders =
1224-
dataPlaneClientCall.currentProcessingMode.getResponseHeaderMode()
1269+
dataPlaneClientCall.getCurrentProcessingMode().getResponseHeaderMode()
12251270
== ProcessingMode.HeaderSendMode.SEND
1226-
|| dataPlaneClientCall.currentProcessingMode.getResponseHeaderMode()
1271+
|| dataPlaneClientCall.getCurrentProcessingMode().getResponseHeaderMode()
12271272
== ProcessingMode.HeaderSendMode.DEFAULT;
12281273

1229-
if (dataPlaneClientCall.passThroughMode.get()
1230-
|| dataPlaneClientCall.extProcStreamState.get().isCompleted()
1274+
if (dataPlaneClientCall.getPassThroughMode().get()
1275+
|| dataPlaneClientCall.getExtProcStreamState().get().isCompleted()
12311276
|| !sendResponseHeaders) {
12321277
proceedWithHeaders(headers);
12331278
return;
@@ -1237,11 +1282,11 @@ public void onHeaders(Metadata headers) {
12371282
dataPlaneClientCall.sendToExtProc(ProcessingRequest.newBuilder()
12381283
.setResponseHeaders(HttpHeaders.newBuilder()
12391284
.setHeaders(
1240-
toHeaderMap(headers, dataPlaneClientCall.config.getForwardRulesConfig()))
1285+
toHeaderMap(headers, dataPlaneClientCall.getConfig().getForwardRulesConfig()))
12411286
.build())
12421287
.build());
12431288

1244-
if (dataPlaneClientCall.config.getObservabilityMode()) {
1289+
if (dataPlaneClientCall.getConfig().getObservabilityMode()) {
12451290
proceedWithHeaders();
12461291
}
12471292
}
@@ -1250,34 +1295,35 @@ public void onHeaders(Metadata headers) {
12501295
public void onMessage(InputStream message) {
12511296
synchronized (savedMessages) {
12521297
if (inboundPassThrough) {
1253-
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
1298+
dataPlaneClientCall.getCallContext().run(() -> delegate().onMessage(message));
12541299
return;
12551300
}
12561301

1257-
if (savedHeaders != null || dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1302+
if (savedHeaders != null
1303+
|| dataPlaneClientCall.getExtProcStreamState().get().isDraining()) {
12581304
savedMessages.add(message);
12591305
return;
12601306
}
12611307
}
12621308

1263-
if (dataPlaneClientCall.passThroughMode.get()) {
1264-
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
1309+
if (dataPlaneClientCall.getPassThroughMode().get()) {
1310+
dataPlaneClientCall.getCallContext().run(() -> delegate().onMessage(message));
12651311
return;
12661312
}
12671313

1268-
if (dataPlaneClientCall.extProcStreamState.get().isCompleted()
1269-
|| dataPlaneClientCall.currentProcessingMode.getResponseBodyMode()
1314+
if (dataPlaneClientCall.getExtProcStreamState().get().isCompleted()
1315+
|| dataPlaneClientCall.getCurrentProcessingMode().getResponseBodyMode()
12701316
!= ProcessingMode.BodySendMode.GRPC) {
1271-
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
1317+
dataPlaneClientCall.getCallContext().run(() -> delegate().onMessage(message));
12721318
return;
12731319
}
12741320

12751321
try {
12761322
ByteString bodyByteString = inboundStreamToByteString(message);
12771323
sendResponseBodyToExtProc(bodyByteString, false);
12781324

1279-
if (dataPlaneClientCall.config.getObservabilityMode()) {
1280-
dataPlaneClientCall.callContext.run(
1325+
if (dataPlaneClientCall.getConfig().getObservabilityMode()) {
1326+
dataPlaneClientCall.getCallContext().run(
12811327
() -> delegate().onMessage(new InboundZeroCopyInputStream(bodyByteString)));
12821328
}
12831329
} catch (IOException e) {
@@ -1287,17 +1333,18 @@ public void onMessage(InputStream message) {
12871333

12881334
@Override
12891335
public void onClose(Status status, Metadata trailers) {
1290-
dataPlaneClientCall.serverTrailersStartNanos = System.nanoTime();
1291-
DataPlaneClientCall.ExtProcStreamState extProcStreamState = dataPlaneClientCall.extProcStreamState.get();
1336+
dataPlaneClientCall.setServerTrailersStartNanos(System.nanoTime());
1337+
DataPlaneClientCall.ExtProcStreamState extProcStreamState =
1338+
dataPlaneClientCall.getExtProcStreamState().get();
12921339
if (extProcStreamState.isFailed()
1293-
&& !dataPlaneClientCall.config.getFailureModeAllow()) {
1340+
&& !dataPlaneClientCall.getConfig().getFailureModeAllow()) {
12941341
if (dataPlaneClientCall.markDataPlaneCallClosed()) {
12951342
proceedWithClose(Status.UNAVAILABLE.withDescription("External processor stream failed")
12961343
.withCause(status.getCause()), new Metadata());
12971344
}
12981345
return;
12991346
}
1300-
if (dataPlaneClientCall.passThroughMode.get()) {
1347+
if (dataPlaneClientCall.getPassThroughMode().get()) {
13011348
if (dataPlaneClientCall.markDataPlaneCallClosed()) {
13021349
proceedWithClose(status, trailers);
13031350
}
@@ -1309,11 +1356,13 @@ public void onClose(Status status, Metadata trailers) {
13091356
this.savedTrailers = trailers;
13101357
}
13111358

1359+
// If we are still waiting for the external processor to validate response headers,
1360+
// buffer the close status/trailers and defer the close until headers are processed.
13121361
if (savedHeaders != null) {
13131362
return;
13141363
}
13151364

1316-
if (dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1365+
if (dataPlaneClientCall.getExtProcStreamState().get().isDraining()) {
13171366
return;
13181367
}
13191368

@@ -1323,26 +1372,26 @@ public void onClose(Status status, Metadata trailers) {
13231372

13241373
triggerCloseHandshake();
13251374

1326-
if (dataPlaneClientCall.config.getObservabilityMode()) {
1375+
if (dataPlaneClientCall.getConfig().getObservabilityMode()) {
13271376
proceedWithClose();
13281377
@SuppressWarnings("unused")
1329-
ScheduledFuture<?> unused = dataPlaneClientCall.scheduler.schedule(
1378+
ScheduledFuture<?> unused = dataPlaneClientCall.getScheduler().schedule(
13301379
dataPlaneClientCall::closeExtProcStream,
1331-
dataPlaneClientCall.config.getDeferredCloseTimeoutNanos(),
1380+
dataPlaneClientCall.getConfig().getDeferredCloseTimeoutNanos(),
13321381
TimeUnit.NANOSECONDS);
13331382
}
13341383
}
13351384

13361385
void onReadyNotify() {
1337-
dataPlaneClientCall.callContext.run(() -> delegate().onReady());
1386+
dataPlaneClientCall.getCallContext().run(() -> delegate().onReady());
13381387
}
13391388

13401389
void proceedWithHeaders() {
13411390
if (savedHeaders != null) {
13421391
proceedWithHeaders(savedHeaders);
13431392
synchronized (savedMessages) {
13441393
savedHeaders = null;
1345-
if (!dataPlaneClientCall.extProcStreamState.get().isDraining()) {
1394+
if (!dataPlaneClientCall.getExtProcStreamState().get().isDraining()) {
13461395
InputStream msg;
13471396
while ((msg = savedMessages.poll()) != null) {
13481397
onMessage(msg);
@@ -1357,12 +1406,12 @@ void proceedWithHeaders() {
13571406
}
13581407

13591408
private void proceedWithHeaders(Metadata headers) {
1360-
if (dataPlaneClientCall.serverHeadersStartNanos > 0) {
1361-
long durationNanos = System.nanoTime() - dataPlaneClientCall.serverHeadersStartNanos;
1409+
if (dataPlaneClientCall.getServerHeadersStartNanos() > 0) {
1410+
long durationNanos = System.nanoTime() - dataPlaneClientCall.getServerHeadersStartNanos();
13621411
dataPlaneClientCall.recordDuration(serverHeadersDuration, durationNanos);
1363-
dataPlaneClientCall.serverHeadersStartNanos = 0;
1412+
dataPlaneClientCall.setServerHeadersStartNanos(0);
13641413
}
1365-
dataPlaneClientCall.callContext.run(() -> delegate().onHeaders(headers));
1414+
dataPlaneClientCall.getCallContext().run(() -> delegate().onHeaders(headers));
13661415
}
13671416

13681417
void proceedWithClose() {
@@ -1376,16 +1425,16 @@ void proceedWithClose() {
13761425
}
13771426

13781427
private void proceedWithClose(Status status, Metadata trailers) {
1379-
if (dataPlaneClientCall.serverTrailersStartNanos > 0) {
1380-
long durationNanos = System.nanoTime() - dataPlaneClientCall.serverTrailersStartNanos;
1428+
if (dataPlaneClientCall.getServerTrailersStartNanos() > 0) {
1429+
long durationNanos = System.nanoTime() - dataPlaneClientCall.getServerTrailersStartNanos();
13811430
dataPlaneClientCall.recordDuration(serverTrailersDuration, durationNanos);
1382-
dataPlaneClientCall.serverTrailersStartNanos = 0;
1431+
dataPlaneClientCall.setServerTrailersStartNanos(0);
13831432
}
1384-
dataPlaneClientCall.callContext.run(() -> delegate().onClose(status, trailers));
1433+
dataPlaneClientCall.getCallContext().run(() -> delegate().onClose(status, trailers));
13851434
}
13861435

13871436
void onExternalBody(ByteString body) {
1388-
dataPlaneClientCall.callContext.run(
1437+
dataPlaneClientCall.getCallContext().run(
13891438
() -> delegate().onMessage(new InboundZeroCopyInputStream(body)));
13901439
}
13911440

@@ -1401,26 +1450,26 @@ private void proceedWithSavedMessages() {
14011450
InputStream msg;
14021451
while ((msg = savedMessages.poll()) != null) {
14031452
final InputStream finalMsg = msg;
1404-
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(finalMsg));
1453+
dataPlaneClientCall.getCallContext().run(() -> delegate().onMessage(finalMsg));
14051454
}
14061455
inboundPassThrough = true;
14071456
}
14081457
}
14091458

14101459
private void triggerCloseHandshake() {
1411-
if (dataPlaneClientCall.extProcStreamState.get().isCompleted()
1460+
if (dataPlaneClientCall.getExtProcStreamState().get().isCompleted()
14121461
|| !terminationTriggered.compareAndSet(false, true)) {
14131462
return;
14141463
}
14151464

14161465
boolean sendResponseHeaders =
1417-
dataPlaneClientCall.currentProcessingMode.getResponseHeaderMode()
1466+
dataPlaneClientCall.getCurrentProcessingMode().getResponseHeaderMode()
14181467
== ProcessingMode.HeaderSendMode.SEND
1419-
|| dataPlaneClientCall.currentProcessingMode.getResponseHeaderMode()
1468+
|| dataPlaneClientCall.getCurrentProcessingMode().getResponseHeaderMode()
14201469
== ProcessingMode.HeaderSendMode.DEFAULT;
14211470

14221471
boolean sendResponseTrailers =
1423-
dataPlaneClientCall.currentProcessingMode.getResponseTrailerMode()
1472+
dataPlaneClientCall.getCurrentProcessingMode().getResponseTrailerMode()
14241473
== ProcessingMode.HeaderSendMode.SEND;
14251474

14261475
if (trailersOnly.get()) {
@@ -1430,38 +1479,38 @@ private void triggerCloseHandshake() {
14301479
.setHeaders(
14311480
toHeaderMap(
14321481
savedTrailers,
1433-
dataPlaneClientCall.config.getForwardRulesConfig()))
1482+
dataPlaneClientCall.getConfig().getForwardRulesConfig()))
14341483
.setEndOfStream(true)
14351484
.build())
14361485
.build());
14371486
} else {
14381487
proceedWithClose();
1439-
if (!dataPlaneClientCall.config.getObservabilityMode()) {
1488+
if (!dataPlaneClientCall.getConfig().getObservabilityMode()) {
14401489
dataPlaneClientCall.closeExtProcStream();
14411490
}
14421491
}
14431492
} else if (sendResponseTrailers) {
1444-
dataPlaneClientCall.isProcessingTrailers.set(true);
1493+
dataPlaneClientCall.getIsProcessingTrailers().set(true);
14451494
dataPlaneClientCall.sendToExtProc(ProcessingRequest.newBuilder()
14461495
.setResponseTrailers(HttpTrailers.newBuilder()
14471496
.setTrailers(
14481497
toHeaderMap(
14491498
savedTrailers,
1450-
dataPlaneClientCall.config.getForwardRulesConfig()))
1499+
dataPlaneClientCall.getConfig().getForwardRulesConfig()))
14511500
.build())
14521501
.build());
14531502
} else {
14541503
proceedWithClose();
1455-
if (!dataPlaneClientCall.config.getObservabilityMode()) {
1504+
if (!dataPlaneClientCall.getConfig().getObservabilityMode()) {
14561505
dataPlaneClientCall.closeExtProcStream();
14571506
}
14581507
}
14591508
}
14601509

14611510
private void sendResponseBodyToExtProc(
14621511
@Nullable ByteString bodyByteString, boolean endOfStream) {
1463-
if (dataPlaneClientCall.extProcStreamState.get().isCompleted()
1464-
|| dataPlaneClientCall.currentProcessingMode.getResponseBodyMode()
1512+
if (dataPlaneClientCall.getExtProcStreamState().get().isCompleted()
1513+
|| dataPlaneClientCall.getCurrentProcessingMode().getResponseBodyMode()
14651514
!= ProcessingMode.BodySendMode.GRPC) {
14661515
return;
14671516
}

0 commit comments

Comments
 (0)