Skip to content

Commit fd6abf6

Browse files
committed
otel: fix baggage propagation to metrics
1 parent 02cb1de commit fd6abf6

2 files changed

Lines changed: 79 additions & 7 deletions

File tree

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
159159
return isGeneratedMethod ? fullMethodName : "other";
160160
}
161161

162-
private static Context otelContextWithBaggage() {
163-
Baggage baggage = BAGGAGE_KEY.get();
162+
private static Context otelContextWithBaggage(@Nullable Baggage baggage) {
164163
if (baggage == null) {
165164
return Context.current();
166165
}
@@ -206,16 +205,19 @@ private static final class ClientTracer extends ClientStreamTracer {
206205
volatile String backendService;
207206
long attemptNanos;
208207
Code statusCode;
208+
final Baggage baggage;
209209

210210
ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
211211
StreamInfo info, String target, String fullMethodName,
212-
List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
212+
List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins,
213+
Baggage baggage) {
213214
this.attemptsState = attemptsState;
214215
this.module = module;
215216
this.info = info;
216217
this.target = target;
217218
this.fullMethodName = fullMethodName;
218219
this.streamPlugins = streamPlugins;
220+
this.baggage = baggage;
219221
this.stopwatch = module.stopwatchSupplier.get().start();
220222
}
221223

@@ -282,7 +284,7 @@ public void streamClosed(Status status) {
282284
}
283285

284286
void recordFinishedAttempt() {
285-
Context otelContext = otelContextWithBaggage();
287+
Context otelContext = otelContextWithBaggage(baggage);
286288
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
287289
.put(METHOD_KEY, fullMethodName)
288290
.put(TARGET_KEY, target)
@@ -301,6 +303,10 @@ void recordFinishedAttempt() {
301303
}
302304
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
303305
}
306+
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry :
307+
baggage.asMap().entrySet()) {
308+
builder.put(entry.getKey(), entry.getValue().getValue());
309+
}
304310
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
305311
plugin.addLabels(builder);
306312
}
@@ -342,6 +348,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
342348
private int activeStreams;
343349
@GuardedBy("lock")
344350
private boolean finishedCallToBeRecorded;
351+
private final Baggage baggage;
345352

346353
CallAttemptsTracerFactory(
347354
OpenTelemetryMetricsModule module,
@@ -354,6 +361,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
354361
this.callPlugins = checkNotNull(callPlugins, "callPlugins");
355362
this.attemptDelayStopwatch = module.stopwatchSupplier.get();
356363
this.callStopWatch = module.stopwatchSupplier.get().start();
364+
this.baggage = Baggage.fromContext(Context.current());
357365

358366
io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of(
359367
METHOD_KEY, fullMethodName,
@@ -407,7 +415,8 @@ private ClientTracer newClientTracer(StreamInfo info) {
407415
}
408416
streamPlugins = Collections.unmodifiableList(streamPlugins);
409417
}
410-
return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
418+
return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins,
419+
baggage);
411420
}
412421

413422
// Called whenever each attempt is ended.
@@ -448,7 +457,7 @@ void callEnded(Status status) {
448457
}
449458

450459
void recordFinishedCall() {
451-
Context otelContext = otelContextWithBaggage();
460+
Context otelContext = otelContextWithBaggage(baggage);
452461
if (attemptsPerCall.get() == 0) {
453462
ClientTracer tracer = newClientTracer(null);
454463
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
@@ -463,6 +472,12 @@ void recordFinishedCall() {
463472
METHOD_KEY, fullMethodName,
464473
TARGET_KEY, target
465474
);
475+
AttributesBuilder baseAttributesBuilder = baseAttributes.toBuilder();
476+
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry
477+
: baggage.asMap().entrySet()) {
478+
baseAttributesBuilder.put(entry.getKey(), entry.getValue().getValue());
479+
}
480+
baseAttributes = baseAttributesBuilder.build();
466481

467482
// Duration
468483
if (module.resource.clientCallDurationCounter() != null) {
@@ -606,7 +621,8 @@ public void inboundWireSize(long bytes) {
606621
*/
607622
@Override
608623
public void streamClosed(Status status) {
609-
Context otelContext = otelContextWithBaggage();
624+
Baggage baggage = BAGGAGE_KEY.get();
625+
Context otelContext = otelContextWithBaggage(baggage);
610626
if (streamClosedUpdater != null) {
611627
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
612628
return;
@@ -622,6 +638,12 @@ public void streamClosed(Status status) {
622638
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
623639
.put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
624640
.put(STATUS_KEY, status.getCode().toString());
641+
if (baggage != null) {
642+
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry
643+
: baggage.asMap().entrySet()) {
644+
builder.put(entry.getKey(), entry.getValue().getValue());
645+
}
646+
}
625647
for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
626648
plugin.addLabels(builder);
627649
}

opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import io.opentelemetry.api.common.AttributeKey;
6666
import io.opentelemetry.api.metrics.DoubleHistogram;
6767
import io.opentelemetry.api.metrics.Meter;
68+
import io.opentelemetry.context.Scope;
6869
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
6970
import io.opentelemetry.sdk.metrics.data.MetricData;
7071
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@@ -1629,6 +1630,55 @@ public void serverBasicMetrics() {
16291630

16301631
}
16311632

1633+
@Test
1634+
public void clientBaggagePropagationToMetrics() {
1635+
// Create module and tracer factory
1636+
// We use a custom resource with a mock counter to check the Context passed to
1637+
// record()
1638+
DoubleHistogram mockClientAttemptDurationCounter = org.mockito.Mockito
1639+
.mock(DoubleHistogram.class);
1640+
OpenTelemetryMetricsResource customResource = OpenTelemetryMetricsResource.builder()
1641+
.clientAttemptDurationCounter(mockClientAttemptDurationCounter)
1642+
.build();
1643+
1644+
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
1645+
fakeClock.getStopwatchSupplier(), customResource, emptyList(), emptyList());
1646+
1647+
// Define the test baggage and create a Context with it
1648+
Baggage testBaggage = Baggage.builder()
1649+
.put("user-id", "42")
1650+
.build();
1651+
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context
1652+
.current().with(testBaggage);
1653+
1654+
// Create Tracer Factory within the Scope of the Context (simulating
1655+
// application thread)
1656+
CallAttemptsTracerFactory tracerFactory;
1657+
try (Scope scope = otelContext.makeCurrent()) {
1658+
tracerFactory = new CallAttemptsTracerFactory(
1659+
module, "target", method.getFullMethodName(), emptyList());
1660+
}
1661+
1662+
// 4. Create a stream tracer (simulating an attempt)
1663+
ClientStreamTracer.StreamInfo streamInfo = ClientStreamTracer.StreamInfo.newBuilder().build();
1664+
ClientStreamTracer tracer = tracerFactory.newClientStreamTracer(streamInfo, new Metadata());
1665+
1666+
// 5. Trigger metric recording
1667+
tracer.streamClosed(Status.OK);
1668+
1669+
// Verify the record call and capture the OTel Context
1670+
verify(mockClientAttemptDurationCounter).record(
1671+
anyDouble(),
1672+
any(io.opentelemetry.api.common.Attributes.class),
1673+
contextCaptor.capture());
1674+
1675+
// Assert on the captured OTel Context
1676+
io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue();
1677+
Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext);
1678+
1679+
assertEquals("42", capturedBaggage.getEntryValue("user-id"));
1680+
}
1681+
16321682
@Test
16331683
public void serverBaggagePropagationToMetrics() {
16341684
// 1. Create module and tracer factory using the mock resource

0 commit comments

Comments
 (0)