Skip to content

Commit c67d95b

Browse files
committed
xds: Implement Context propagation in ExternalProcessorClientInterceptor
Ensure request-scoped io.grpc.Context is correctly captured and propagated across asynchronous thread boundaries during the external processor client-side interceptor lifecycle. Previously, asynchronous boundaries—including delayed application call activation and downstream listener callbacks—ran on transport executor threads without the request-scoped context, leading to lost telemetry, tracing, and deadline propagation. This change: - Captures the active context as 'callContext' inside DataPlaneClientCall.start(), representing the exact point when the application starts the RPC. - Wraps delayed call activation execution in 'callContext.run(...)' to ensure that downstream transport activation runs under the captured context. - Refactors DataPlaneListener callback invocations to wrap downstream delegate calls (onHeaders, onMessage, onClose, onReady) in 'callContext.run(...)'. - Relies on implicit context propagation during stub.process() stream initiation on the calling application thread. - Adds Category 19 unit tests verifying context propagation to startCall, listener callbacks, and the outbound ext_proc stream stub. - Fixes checkstyle style compliance in main and test source classes.
1 parent 32532c2 commit c67d95b

2 files changed

Lines changed: 323 additions & 8 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorClientInterceptor.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.grpc.Channel;
4848
import io.grpc.ClientCall;
4949
import io.grpc.ClientInterceptor;
50+
import io.grpc.Context;
5051
import io.grpc.Deadline;
5152
import io.grpc.Detachable;
5253
import io.grpc.DoubleHistogramMetricInstrument;
@@ -434,6 +435,7 @@ private enum EventType {
434435
private final MetricRecorder metricsRecorder;
435436
private final String target;
436437
private final String backendService;
438+
private volatile Context callContext = Context.ROOT;
437439

438440
private long clientHeadersStartNanos;
439441
private long clientHalfCloseStartNanos;
@@ -526,7 +528,7 @@ private void activateCall() {
526528
}
527529
Runnable toRun = delayedCall.setCall(rawCall);
528530
if (toRun != null) {
529-
toRun.run();
531+
callContext.run(toRun);
530532
}
531533
drainPendingRequests();
532534
onReadyNotify();
@@ -611,6 +613,7 @@ private void applyHeaderMutations(Metadata metadata,
611613

612614
@Override
613615
public void start(Listener<InputStream> responseListener, Metadata headers) {
616+
this.callContext = Context.current();
614617
clientHeadersStartNanos = System.nanoTime();
615618
this.requestHeaders = headers;
616619
this.wrappedListener = new DataPlaneListener(responseListener, rawCall, this);
@@ -1204,7 +1207,7 @@ public void onHeaders(Metadata headers) {
12041207
@Override
12051208
public void onMessage(InputStream message) {
12061209
if (dataPlaneClientCall.passThroughMode.get()) {
1207-
delegate().onMessage(message);
1210+
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
12081211
return;
12091212
}
12101213

@@ -1216,7 +1219,7 @@ public void onMessage(InputStream message) {
12161219
if (dataPlaneClientCall.extProcStreamState.get().isCompleted()
12171220
|| dataPlaneClientCall.currentProcessingMode.getResponseBodyMode()
12181221
!= ProcessingMode.BodySendMode.GRPC) {
1219-
delegate().onMessage(message);
1222+
dataPlaneClientCall.callContext.run(() -> delegate().onMessage(message));
12201223
return;
12211224
}
12221225

@@ -1225,7 +1228,8 @@ public void onMessage(InputStream message) {
12251228
sendResponseBodyToExtProc(bodyByteString, false);
12261229

12271230
if (dataPlaneClientCall.config.getObservabilityMode()) {
1228-
delegate().onMessage(new InboundZeroCopyInputStream(bodyByteString));
1231+
dataPlaneClientCall.callContext.run(
1232+
() -> delegate().onMessage(new InboundZeroCopyInputStream(bodyByteString)));
12291233
}
12301234
} catch (IOException e) {
12311235
rawCall.cancel("Failed to read server response", e);
@@ -1275,7 +1279,7 @@ public void onClose(Status status, Metadata trailers) {
12751279
}
12761280

12771281
void onReadyNotify() {
1278-
delegate().onReady();
1282+
dataPlaneClientCall.callContext.run(() -> delegate().onReady());
12791283
}
12801284

12811285
void proceedWithHeaders() {
@@ -1299,7 +1303,7 @@ private void proceedWithHeaders(Metadata headers) {
12991303
dataPlaneClientCall.recordDuration(serverHeadersDuration, durationNanos);
13001304
dataPlaneClientCall.serverHeadersStartNanos = 0;
13011305
}
1302-
delegate().onHeaders(headers);
1306+
dataPlaneClientCall.callContext.run(() -> delegate().onHeaders(headers));
13031307
}
13041308

13051309
void proceedWithClose() {
@@ -1318,11 +1322,12 @@ private void proceedWithClose(Status status, Metadata trailers) {
13181322
dataPlaneClientCall.recordDuration(serverTrailersDuration, durationNanos);
13191323
dataPlaneClientCall.serverTrailersStartNanos = 0;
13201324
}
1321-
delegate().onClose(status, trailers);
1325+
dataPlaneClientCall.callContext.run(() -> delegate().onClose(status, trailers));
13221326
}
13231327

13241328
void onExternalBody(ByteString body) {
1325-
delegate().onMessage(new InboundZeroCopyInputStream(body));
1329+
dataPlaneClientCall.callContext.run(
1330+
() -> delegate().onMessage(new InboundZeroCopyInputStream(body)));
13261331
}
13271332

13281333
void unblockAfterStreamComplete() {

0 commit comments

Comments
 (0)