4848import io .grpc .opentelemetry .GrpcOpenTelemetry .TargetFilter ;
4949import io .opentelemetry .api .baggage .Baggage ;
5050import io .opentelemetry .api .common .AttributesBuilder ;
51+ import io .opentelemetry .context .Context ;
5152import java .util .ArrayList ;
5253import java .util .Collection ;
5354import java .util .Collections ;
@@ -298,15 +299,15 @@ void recordFinishedAttempt() {
298299
299300 if (module .resource .clientAttemptDurationCounter () != null ) {
300301 module .resource .clientAttemptDurationCounter ()
301- .record (attemptNanos * SECONDS_PER_NANO , attribute );
302+ .record (attemptNanos * SECONDS_PER_NANO , attribute , attemptsState . otelContext );
302303 }
303304 if (module .resource .clientTotalSentCompressedMessageSizeCounter () != null ) {
304305 module .resource .clientTotalSentCompressedMessageSizeCounter ()
305- .record (outboundWireSize , attribute );
306+ .record (outboundWireSize , attribute , attemptsState . otelContext );
306307 }
307308 if (module .resource .clientTotalReceivedCompressedMessageSizeCounter () != null ) {
308309 module .resource .clientTotalReceivedCompressedMessageSizeCounter ()
309- .record (inboundWireSize , attribute );
310+ .record (inboundWireSize , attribute , attemptsState . otelContext );
310311 }
311312 }
312313 }
@@ -321,6 +322,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
321322 private boolean callEnded ;
322323 private final String fullMethodName ;
323324 private final List <OpenTelemetryPlugin .ClientCallPlugin > callPlugins ;
325+ private final Context otelContext ;
324326 private Status status ;
325327 private long retryDelayNanos ;
326328 private long callLatencyNanos ;
@@ -337,11 +339,12 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
337339 OpenTelemetryMetricsModule module ,
338340 String target ,
339341 String fullMethodName ,
340- List <OpenTelemetryPlugin .ClientCallPlugin > callPlugins ) {
342+ List <OpenTelemetryPlugin .ClientCallPlugin > callPlugins , Context otelContext ) {
341343 this .module = checkNotNull (module , "module" );
342344 this .target = checkNotNull (target , "target" );
343345 this .fullMethodName = checkNotNull (fullMethodName , "fullMethodName" );
344346 this .callPlugins = checkNotNull (callPlugins , "callPlugins" );
347+ this .otelContext = checkNotNull (otelContext , "otelContext" );
345348 this .attemptDelayStopwatch = module .stopwatchSupplier .get ();
346349 this .callStopWatch = module .stopwatchSupplier .get ().start ();
347350
@@ -351,7 +354,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
351354
352355 // Record here in case mewClientStreamTracer() would never be called.
353356 if (module .resource .clientAttemptCountCounter () != null ) {
354- module .resource .clientAttemptCountCounter ().add (1 , attribute );
357+ module .resource .clientAttemptCountCounter ().add (1 , attribute , otelContext );
355358 }
356359 }
357360
@@ -375,7 +378,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
375378 io .opentelemetry .api .common .Attributes .of (METHOD_KEY , fullMethodName ,
376379 TARGET_KEY , target );
377380 if (module .resource .clientAttemptCountCounter () != null ) {
378- module .resource .clientAttemptCountCounter ().add (1 , attribute );
381+ module .resource .clientAttemptCountCounter ().add (1 , attribute , otelContext );
379382 }
380383 }
381384 if (info .isTransparentRetry ()) {
@@ -459,7 +462,8 @@ void recordFinishedCall() {
459462 callLatencyNanos * SECONDS_PER_NANO ,
460463 baseAttributes .toBuilder ()
461464 .put (STATUS_KEY , status .getCode ().toString ())
462- .build ()
465+ .build (),
466+ otelContext
463467 );
464468 }
465469
@@ -468,7 +472,7 @@ void recordFinishedCall() {
468472 long retriesPerCall = Math .max (attemptsPerCall .get () - 1 , 0 );
469473 if (retriesPerCall > 0 ) {
470474 module .resource .clientCallRetriesCounter ()
471- .record (retriesPerCall , baseAttributes );
475+ .record (retriesPerCall , baseAttributes , otelContext );
472476 }
473477 }
474478
@@ -477,7 +481,7 @@ void recordFinishedCall() {
477481 long hedges = hedgedAttemptsPerCall .get ();
478482 if (hedges > 0 ) {
479483 module .resource .clientCallHedgesCounter ()
480- .record (hedges , baseAttributes );
484+ .record (hedges , baseAttributes , otelContext );
481485 }
482486 }
483487
@@ -486,15 +490,16 @@ void recordFinishedCall() {
486490 long transparentRetries = transparentRetriesPerCall .get ();
487491 if (transparentRetries > 0 ) {
488492 module .resource .clientCallTransparentRetriesCounter ()
489- .record (transparentRetries , baseAttributes );
493+ .record (transparentRetries , baseAttributes , otelContext );
490494 }
491495 }
492496
493497 // Retry delay
494498 if (module .resource .clientCallRetryDelayCounter () != null ) {
495499 module .resource .clientCallRetryDelayCounter ().record (
496500 retryDelayNanos * SECONDS_PER_NANO ,
497- baseAttributes
501+ baseAttributes ,
502+ otelContext
498503 );
499504 }
500505 }
@@ -535,12 +540,12 @@ private static final class ServerTracer extends ServerStreamTracer {
535540 private final OpenTelemetryMetricsModule module ;
536541 private final String fullMethodName ;
537542 private final List <OpenTelemetryPlugin .ServerStreamPlugin > streamPlugins ;
543+ private Context otelContext = Context .root ();
538544 private volatile boolean isGeneratedMethod ;
539545 private volatile int streamClosed ;
540546 private final Stopwatch stopwatch ;
541547 private volatile long outboundWireSize ;
542548 private volatile long inboundWireSize ;
543- private volatile Baggage baggage ;
544549
545550 ServerTracer (OpenTelemetryMetricsModule module , String fullMethodName ,
546551 List <OpenTelemetryPlugin .ServerStreamPlugin > streamPlugins ) {
@@ -550,29 +555,31 @@ private static final class ServerTracer extends ServerStreamTracer {
550555 this .stopwatch = module .stopwatchSupplier .get ().start ();
551556 }
552557
558+ @ Override
559+ public io .grpc .Context filterContext (io .grpc .Context context ) {
560+ Baggage baggage = BAGGAGE_KEY .get (context );
561+ if (baggage == null ) {
562+ throw new IllegalStateException ("Baggage from OpenTelemetryTracingModule is missing" );
563+ }
564+ otelContext = Context .current ().with (baggage );
565+ return context ;
566+ }
567+
553568 @ Override
554569 public void serverCallStarted (ServerCallInfo <?, ?> callInfo ) {
555570 // Only record method name as an attribute if isSampledToLocalTracing is set to true,
556571 // which is true for all generated methods. Otherwise, programmatically
557572 // created methods result in high cardinality metrics.
558573 boolean isSampledToLocalTracing = callInfo .getMethodDescriptor ().isSampledToLocalTracing ();
559- baggage = BAGGAGE_KEY .get (io .grpc .Context .current ());
560574 isGeneratedMethod = isSampledToLocalTracing ;
561575
562576 AttributesBuilder builder = io .opentelemetry .api .common .Attributes .builder ()
563577 .put (METHOD_KEY , recordMethodName (fullMethodName , isSampledToLocalTracing ));
564578
565- if (baggage != null ) {
566- for (java .util .Map .Entry <String , io .opentelemetry .api .baggage .BaggageEntry > entry :
567- baggage .asMap ().entrySet ()) {
568- builder .put (entry .getKey (), entry .getValue ().getValue ());
569- }
570- }
571-
572579 io .opentelemetry .api .common .Attributes attributes = builder .build ();
573580
574581 if (module .resource .serverCallCountCounter () != null ) {
575- module .resource .serverCallCountCounter ().add (1 , attributes );
582+ module .resource .serverCallCountCounter ().add (1 , attributes , otelContext );
576583 }
577584 }
578585
@@ -620,29 +627,22 @@ public void streamClosed(Status status) {
620627 .put (METHOD_KEY , recordMethodName (fullMethodName , isGeneratedMethod ))
621628 .put (STATUS_KEY , status .getCode ().toString ());
622629
623- if (baggage != null ) {
624- for (java .util .Map .Entry <String , io .opentelemetry .api .baggage .BaggageEntry > entry :
625- baggage .asMap ().entrySet ()) {
626- builder .put (entry .getKey (), entry .getValue ().getValue ());
627- }
628- }
629-
630630 for (OpenTelemetryPlugin .ServerStreamPlugin plugin : streamPlugins ) {
631631 plugin .addLabels (builder );
632632 }
633633 io .opentelemetry .api .common .Attributes attributes = builder .build ();
634634
635635 if (module .resource .serverCallDurationCounter () != null ) {
636636 module .resource .serverCallDurationCounter ()
637- .record (elapsedTimeNanos * SECONDS_PER_NANO , attributes );
637+ .record (elapsedTimeNanos * SECONDS_PER_NANO , attributes , otelContext );
638638 }
639639 if (module .resource .serverTotalSentCompressedMessageSizeCounter () != null ) {
640640 module .resource .serverTotalSentCompressedMessageSizeCounter ()
641- .record (outboundWireSize , attributes );
641+ .record (outboundWireSize , attributes , otelContext );
642642 }
643643 if (module .resource .serverTotalReceivedCompressedMessageSizeCounter () != null ) {
644644 module .resource .serverTotalReceivedCompressedMessageSizeCounter ()
645- .record (inboundWireSize , attributes );
645+ .record (inboundWireSize , attributes , otelContext );
646646 }
647647 }
648648 }
@@ -700,7 +700,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
700700 final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory (
701701 OpenTelemetryMetricsModule .this , target ,
702702 recordMethodName (method .getFullMethodName (), method .isSampledToLocalTracing ()),
703- callPlugins );
703+ callPlugins , Context . current () );
704704 ClientCall <ReqT , RespT > call =
705705 next .newCall (method , callOptions .withStreamTracerFactory (tracerFactory ));
706706 return new SimpleForwardingClientCall <ReqT , RespT >(call ) {
0 commit comments