Skip to content

Commit 15788b7

Browse files
authored
openTelemetry: fix baggage prop (#12665)
1 parent a8f2271 commit 15788b7

File tree

2 files changed

+176
-142
lines changed

2 files changed

+176
-142
lines changed

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
163163
return isGeneratedMethod ? fullMethodName : "other";
164164
}
165165

166-
private static Context otelContextWithBaggage() {
167-
Baggage baggage = BAGGAGE_KEY.get();
168-
if (baggage == null) {
169-
return Context.current();
170-
}
171-
return Context.current().with(baggage);
172-
}
173-
174166
private static final class ClientTracer extends ClientStreamTracer {
175167
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
176168
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
@@ -286,7 +278,6 @@ public void streamClosed(Status status) {
286278
}
287279

288280
void recordFinishedAttempt() {
289-
Context otelContext = otelContextWithBaggage();
290281
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
291282
.put(METHOD_KEY, fullMethodName)
292283
.put(TARGET_KEY, target)
@@ -316,15 +307,15 @@ void recordFinishedAttempt() {
316307

317308
if (module.resource.clientAttemptDurationCounter() != null ) {
318309
module.resource.clientAttemptDurationCounter()
319-
.record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
310+
.record(attemptNanos * SECONDS_PER_NANO, attribute, attemptsState.otelContext);
320311
}
321312
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
322313
module.resource.clientTotalSentCompressedMessageSizeCounter()
323-
.record(outboundWireSize, attribute, otelContext);
314+
.record(outboundWireSize, attribute, attemptsState.otelContext);
324315
}
325316
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
326317
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
327-
.record(inboundWireSize, attribute, otelContext);
318+
.record(inboundWireSize, attribute, attemptsState.otelContext);
328319
}
329320
}
330321
}
@@ -339,6 +330,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
339330
private boolean callEnded;
340331
private final String fullMethodName;
341332
private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
333+
private final Context otelContext;
342334
private Status status;
343335
private long retryDelayNanos;
344336
private long callLatencyNanos;
@@ -356,11 +348,12 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
356348
String target,
357349
CallOptions callOptions,
358350
String fullMethodName,
359-
List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
351+
List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins, Context otelContext) {
360352
this.module = checkNotNull(module, "module");
361353
this.target = checkNotNull(target, "target");
362354
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
363355
this.callPlugins = checkNotNull(callPlugins, "callPlugins");
356+
this.otelContext = checkNotNull(otelContext, "otelContext");
364357
this.attemptDelayStopwatch = module.stopwatchSupplier.get();
365358
this.callStopWatch = module.stopwatchSupplier.get().start();
366359

@@ -375,7 +368,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
375368

376369
// Record here in case mewClientStreamTracer() would never be called.
377370
if (module.resource.clientAttemptCountCounter() != null) {
378-
module.resource.clientAttemptCountCounter().add(1, attribute);
371+
module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
379372
}
380373
}
381374

@@ -404,7 +397,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
404397
}
405398
io.opentelemetry.api.common.Attributes attribute = builder.build();
406399
if (module.resource.clientAttemptCountCounter() != null) {
407-
module.resource.clientAttemptCountCounter().add(1, attribute);
400+
module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
408401
}
409402
}
410403
if (info.isTransparentRetry()) {
@@ -467,7 +460,6 @@ void callEnded(Status status, CallOptions callOptions) {
467460
}
468461

469462
void recordFinishedCall(CallOptions callOptions) {
470-
Context otelContext = otelContextWithBaggage();
471463
if (attemptsPerCall.get() == 0) {
472464
ClientTracer tracer = newClientTracer(null);
473465
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
@@ -569,6 +561,7 @@ private static final class ServerTracer extends ServerStreamTracer {
569561
private final OpenTelemetryMetricsModule module;
570562
private final String fullMethodName;
571563
private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
564+
private Context otelContext = Context.root();
572565
private volatile boolean isGeneratedMethod;
573566
private volatile int streamClosed;
574567
private final Stopwatch stopwatch;
@@ -583,19 +576,31 @@ private static final class ServerTracer extends ServerStreamTracer {
583576
this.stopwatch = module.stopwatchSupplier.get().start();
584577
}
585578

579+
@Override
580+
public io.grpc.Context filterContext(io.grpc.Context context) {
581+
Baggage baggage = BAGGAGE_KEY.get(context);
582+
if (baggage != null) {
583+
otelContext = Context.current().with(baggage);
584+
} else {
585+
otelContext = Context.current();
586+
}
587+
return context;
588+
}
589+
586590
@Override
587591
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
588592
// Only record method name as an attribute if isSampledToLocalTracing is set to true,
589593
// which is true for all generated methods. Otherwise, programmatically
590594
// created methods result in high cardinality metrics.
591595
boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
592596
isGeneratedMethod = isSampledToLocalTracing;
597+
593598
io.opentelemetry.api.common.Attributes attribute =
594599
io.opentelemetry.api.common.Attributes.of(
595600
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
596601

597602
if (module.resource.serverCallCountCounter() != null) {
598-
module.resource.serverCallCountCounter().add(1, attribute);
603+
module.resource.serverCallCountCounter().add(1, attribute, otelContext);
599604
}
600605
}
601606

@@ -627,7 +632,6 @@ public void inboundWireSize(long bytes) {
627632
*/
628633
@Override
629634
public void streamClosed(Status status) {
630-
Context otelContext = otelContextWithBaggage();
631635
if (streamClosedUpdater != null) {
632636
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
633637
return;
@@ -678,7 +682,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
678682
}
679683
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
680684
}
681-
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
685+
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName,
686+
streamPlugins);
682687
}
683688
}
684689

@@ -716,7 +721,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
716721
final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
717722
OpenTelemetryMetricsModule.this, target, callOptions,
718723
recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
719-
callPlugins);
724+
callPlugins, Context.current());
720725
ClientCall<ReqT, RespT> call =
721726
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
722727
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@@ -739,3 +744,4 @@ public void onClose(Status status, Metadata trailers) {
739744
}
740745
}
741746
}
747+

0 commit comments

Comments
 (0)