Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand All @@ -33,6 +35,9 @@ public final class OtelMetricStorage {
private static final Attributes CARDINALITY_OVERFLOW =
Attributes.builder().put("otel.metric.overflow", true).build();

private static final Map<ClassLoader, BiConsumer<Object, BiConsumer<String, Object>>>
ATTRIBUTE_READERS = new WeakHashMap<>();

private final OtelInstrumentDescriptor descriptor;
private final boolean resetOnCollect;
private final Function<Object, OtelAggregator> aggregatorSupplier;
Expand Down Expand Up @@ -157,13 +162,27 @@ public void collectMetric(OtelMetricVisitor visitor) {
}
}

public static void registerAttributeReader(
ClassLoader cl, BiConsumer<Object, BiConsumer<String, Object>> reader) {
ATTRIBUTE_READERS.put(cl, reader);
}

private static void visitAttributes(Object attributes, OtelMetricVisitor visitor) {
ClassLoader cl = attributes.getClass().getClassLoader();
BiConsumer<Object, BiConsumer<String, Object>> reader = ATTRIBUTE_READERS.get(cl);
if (reader != null) {
reader.accept(attributes, visitor::visitAttribute);
}
}

/** Collect data for CUMULATIVE temporality, keeping aggregators for future writes. */
private void doCollect(OtelMetricVisitor visitor) {
// no need to hold writers back if we are not resetting metrics on collect
currentRecording.aggregators.forEach(
(attributes, aggregator) -> {
if (!aggregator.isEmpty()) {
visitor.visitPoint(attributes, aggregator.collect());
visitAttributes(attributes, visitor);
visitor.visitPoint(aggregator.collect());
}
});
}
Expand Down Expand Up @@ -197,7 +216,8 @@ private void doCollectAndReset(OtelMetricVisitor visitor) {
aggregators.forEach(
(attributes, aggregator) -> {
if (!aggregator.isEmpty()) {
visitor.visitPoint(attributes, aggregator.collectAndReset());
visitAttributes(attributes, visitor);
visitor.visitPoint(aggregator.collectAndReset());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@

import datadog.trace.bootstrap.otel.metrics.data.OtelPoint;

/** A visitor to visit a metric in an instrumentation scope. */
/**
* A visitor to visit a metric in an instrumentation scope.
*
* <p>Methods must be called in the following order: ( visitAttribute* visitPoint )*
*/
public interface OtelMetricVisitor {
/** Visits an attribute of the upcoming data point. */
void visitAttribute(String key, Object value);

/** Visits a data point in the metric. */
void visitPoint(Object attributes, OtelPoint point);
void visitPoint(OtelPoint point);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package datadog.opentelemetry.shim.metrics;

import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
import datadog.trace.bootstrap.otel.metrics.data.OtelMetricStorage;
import datadog.trace.util.Strings;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterBuilder;
import io.opentelemetry.api.metrics.MeterProvider;
Expand All @@ -22,6 +24,15 @@ public final class OtelMeterProvider implements MeterProvider {
/** Meter shims, indexed by instrumentation scope. */
private final Map<OtelInstrumentationScope, OtelMeter> meters = new ConcurrentHashMap<>();

private OtelMeterProvider() {
// register attribute reader for class-loader where this provider is being used/injected
OtelMetricStorage.registerAttributeReader(
Attributes.class.getClassLoader(),
(attributes, consumer) ->
((Attributes) attributes)
.forEach((attribute, value) -> consumer.accept(attribute.getKey(), value)));
}

@Override
public Meter get(String instrumentationScopeName) {
return getMeterShim(instrumentationScopeName, null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ class MetricsTest extends InstrumentationSpecification {
class MeterReader implements OtelMetricsVisitor, OtelScopedMetricsVisitor, OtelMetricVisitor {
def scopeName
def instrumentName
def attributes = [:]

@Override
OtelScopedMetricsVisitor visitScopedMetrics(OtelInstrumentationScope scope) {
Expand All @@ -443,10 +444,16 @@ class MetricsTest extends InstrumentationSpecification {
}

@Override
void visitPoint(Object attributes, OtelPoint point) {
void visitAttribute(String key, Object value) {
attributes.put(key, value)
}

@Override
void visitPoint(OtelPoint point) {
def key = scopeName + ':' + instrumentName
if (!attributes.isEmpty()) {
key = key + '@' + attributes.asMap()
key = key + '@' + attributes
attributes.clear()
}
switch (point.class) {
case OtelLongPoint:
Expand Down
Loading