|
49 | 49 | import io.opentelemetry.api.baggage.Baggage; |
50 | 50 | import io.opentelemetry.api.common.AttributesBuilder; |
51 | 51 | import io.opentelemetry.context.Context; |
| 52 | +import io.opentelemetry.context.propagation.ContextPropagators; |
52 | 53 | import java.util.ArrayList; |
53 | 54 | import java.util.Collection; |
54 | 55 | import java.util.Collections; |
@@ -100,24 +101,28 @@ final class OpenTelemetryMetricsModule { |
100 | 101 | private final boolean localityEnabled; |
101 | 102 | private final boolean backendServiceEnabled; |
102 | 103 | private final ImmutableList<OpenTelemetryPlugin> plugins; |
| 104 | + private final ContextPropagators aggregators; |
103 | 105 | @Nullable |
104 | 106 | private final TargetFilter targetAttributeFilter; |
105 | 107 |
|
106 | 108 | OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier, |
107 | 109 | OpenTelemetryMetricsResource resource, |
108 | | - Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) { |
109 | | - this(stopwatchSupplier, resource, optionalLabels, plugins, null); |
| 110 | + Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins, |
| 111 | + ContextPropagators aggregators) { |
| 112 | + this(stopwatchSupplier, resource, optionalLabels, plugins, aggregators, null); |
110 | 113 | } |
111 | 114 |
|
112 | 115 | OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier, |
113 | 116 | OpenTelemetryMetricsResource resource, |
114 | 117 | Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins, |
| 118 | + ContextPropagators aggregators, |
115 | 119 | @Nullable TargetFilter targetAttributeFilter) { |
116 | 120 | this.resource = checkNotNull(resource, "resource"); |
117 | 121 | this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); |
118 | 122 | this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); |
119 | 123 | this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); |
120 | 124 | this.plugins = ImmutableList.copyOf(plugins); |
| 125 | + this.aggregators = checkNotNull(aggregators, "aggregators"); |
121 | 126 | this.targetAttributeFilter = targetAttributeFilter; |
122 | 127 | } |
123 | 128 |
|
@@ -159,8 +164,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) |
159 | 164 | return isGeneratedMethod ? fullMethodName : "other"; |
160 | 165 | } |
161 | 166 |
|
162 | | - private static Context otelContextWithBaggage() { |
163 | | - Baggage baggage = BAGGAGE_KEY.get(); |
| 167 | + private static Context otelContextWithBaggage(Baggage baggage) { |
164 | 168 | if (baggage == null) { |
165 | 169 | return Context.current(); |
166 | 170 | } |
@@ -282,7 +286,7 @@ public void streamClosed(Status status) { |
282 | 286 | } |
283 | 287 |
|
284 | 288 | void recordFinishedAttempt() { |
285 | | - Context otelContext = otelContextWithBaggage(); |
| 289 | + Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); |
286 | 290 | AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() |
287 | 291 | .put(METHOD_KEY, fullMethodName) |
288 | 292 | .put(TARGET_KEY, target) |
@@ -448,7 +452,7 @@ void callEnded(Status status) { |
448 | 452 | } |
449 | 453 |
|
450 | 454 | void recordFinishedCall() { |
451 | | - Context otelContext = otelContextWithBaggage(); |
| 455 | + Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); |
452 | 456 | if (attemptsPerCall.get() == 0) { |
453 | 457 | ClientTracer tracer = newClientTracer(null); |
454 | 458 | tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); |
@@ -553,13 +557,15 @@ private static final class ServerTracer extends ServerStreamTracer { |
553 | 557 | private final Stopwatch stopwatch; |
554 | 558 | private volatile long outboundWireSize; |
555 | 559 | private volatile long inboundWireSize; |
| 560 | + private final Context otelContext; |
556 | 561 |
|
557 | 562 | ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName, |
558 | | - List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) { |
| 563 | + List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins, Context otelContext) { |
559 | 564 | this.module = checkNotNull(module, "module"); |
560 | 565 | this.fullMethodName = fullMethodName; |
561 | 566 | this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins"); |
562 | 567 | this.stopwatch = module.stopwatchSupplier.get().start(); |
| 568 | + this.otelContext = checkNotNull(otelContext, "otelContext"); |
563 | 569 | } |
564 | 570 |
|
565 | 571 | @Override |
@@ -606,7 +612,6 @@ public void inboundWireSize(long bytes) { |
606 | 612 | */ |
607 | 613 | @Override |
608 | 614 | public void streamClosed(Status status) { |
609 | | - Context otelContext = otelContextWithBaggage(); |
610 | 615 | if (streamClosedUpdater != null) { |
611 | 616 | if (streamClosedUpdater.getAndSet(this, 1) != 0) { |
612 | 617 | return; |
@@ -657,7 +662,10 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata |
657 | 662 | } |
658 | 663 | streamPlugins = Collections.unmodifiableList(streamPluginsMutable); |
659 | 664 | } |
660 | | - return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins); |
| 665 | + Context context = aggregators.getTextMapPropagator().extract( |
| 666 | + Context.current(), headers, MetadataGetter.getInstance()); |
| 667 | + return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, |
| 668 | + context); |
661 | 669 | } |
662 | 670 | } |
663 | 671 |
|
|
0 commit comments