Skip to content

Commit 6bf8474

Browse files
committed
composite filter init
1 parent 9d5842b commit 6bf8474

File tree

49 files changed

+5157
-2154
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+5157
-2154
lines changed

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,6 +1115,7 @@ public void start(Listener<RespT> observer, Metadata headers) {
11151115
callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
11161116
}
11171117
if (interceptor != null) {
1118+
// todo: AgraVator this is where the interceptors are applied
11181119
delegate = interceptor.interceptCall(method, callOptions, channel);
11191120
} else {
11201121
delegate = channel.newCall(method, callOptions);

opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) {
101101
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
102102
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(
103103
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins,
104-
builder.targetFilter);
104+
openTelemetrySdk.getPropagators(), builder.targetFilter);
105105
this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk);
106106
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
107107
}

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.opentelemetry.api.baggage.Baggage;
5050
import io.opentelemetry.api.common.AttributesBuilder;
5151
import io.opentelemetry.context.Context;
52+
import io.opentelemetry.context.propagation.ContextPropagators;
5253
import java.util.ArrayList;
5354
import java.util.Collection;
5455
import java.util.Collections;
@@ -100,24 +101,28 @@ final class OpenTelemetryMetricsModule {
100101
private final boolean localityEnabled;
101102
private final boolean backendServiceEnabled;
102103
private final ImmutableList<OpenTelemetryPlugin> plugins;
104+
private final ContextPropagators contextPropagators;
103105
@Nullable
104106
private final TargetFilter targetAttributeFilter;
105107

106108
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
107109
OpenTelemetryMetricsResource resource,
108-
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
109-
this(stopwatchSupplier, resource, optionalLabels, plugins, null);
110+
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
111+
ContextPropagators contextPropagators) {
112+
this(stopwatchSupplier, resource, optionalLabels, plugins, contextPropagators, null);
110113
}
111114

112115
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
113116
OpenTelemetryMetricsResource resource,
114117
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
115-
@Nullable TargetFilter targetAttributeFilter) {
118+
ContextPropagators contextPropagators,
119+
@Nullable TargetFilter targetAttributeFilter) {
116120
this.resource = checkNotNull(resource, "resource");
117121
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
118122
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
119123
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
120124
this.plugins = ImmutableList.copyOf(plugins);
125+
this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators");
121126
this.targetAttributeFilter = targetAttributeFilter;
122127
}
123128

@@ -159,10 +164,13 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
159164
return isGeneratedMethod ? fullMethodName : "other";
160165
}
161166

162-
private static Context otelContextWithBaggage() {
163-
Baggage baggage = BAGGAGE_KEY.get();
167+
private static Context otelContextWithBaggage(@Nullable Baggage baggage) {
164168
if (baggage == null) {
165-
return Context.current();
169+
Baggage currentBaggage = BAGGAGE_KEY.get();
170+
if (currentBaggage == null) {
171+
return Context.current();
172+
}
173+
return Context.current().with(currentBaggage);
166174
}
167175
return Context.current().with(baggage);
168176
}
@@ -282,7 +290,7 @@ public void streamClosed(Status status) {
282290
}
283291

284292
void recordFinishedAttempt() {
285-
Context otelContext = otelContextWithBaggage();
293+
Context otelContext = otelContextWithBaggage(null);
286294
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
287295
.put(METHOD_KEY, fullMethodName)
288296
.put(TARGET_KEY, target)
@@ -448,7 +456,7 @@ void callEnded(Status status) {
448456
}
449457

450458
void recordFinishedCall() {
451-
Context otelContext = otelContextWithBaggage();
459+
Context otelContext = otelContextWithBaggage(null);
452460
if (attemptsPerCall.get() == 0) {
453461
ClientTracer tracer = newClientTracer(null);
454462
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
@@ -553,13 +561,15 @@ private static final class ServerTracer extends ServerStreamTracer {
553561
private final Stopwatch stopwatch;
554562
private volatile long outboundWireSize;
555563
private volatile long inboundWireSize;
564+
private final Baggage baggage;
556565

557566
ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
558-
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
567+
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins, @Nullable Baggage baggage) {
559568
this.module = checkNotNull(module, "module");
560569
this.fullMethodName = fullMethodName;
561570
this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
562571
this.stopwatch = module.stopwatchSupplier.get().start();
572+
this.baggage = baggage;
563573
}
564574

565575
@Override
@@ -606,7 +616,7 @@ public void inboundWireSize(long bytes) {
606616
*/
607617
@Override
608618
public void streamClosed(Status status) {
609-
Context otelContext = otelContextWithBaggage();
619+
Context otelContext = otelContextWithBaggage(baggage);
610620
if (streamClosedUpdater != null) {
611621
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
612622
return;
@@ -657,7 +667,10 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
657667
}
658668
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
659669
}
660-
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
670+
Context context = contextPropagators.getTextMapPropagator().extract(
671+
Context.current(), headers, MetadataGetter.getInstance());
672+
Baggage baggage = Baggage.fromContext(context);
673+
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, baggage);
661674
}
662675
}
663676

0 commit comments

Comments
 (0)