Skip to content

Commit 5418feb

Browse files
authored
Implement OpenTelemetry Observable Metrics API (#10631)
Implement Observable Metrics API review feedback Simplify buildWithCallback approach Merge remote-tracking branch 'origin/master' into mcculls/otel-metrics-async-storage Co-authored-by: stuart.mcculloch <stuart.mcculloch@datadoghq.com>
1 parent fa2f591 commit 5418feb

17 files changed

Lines changed: 713 additions & 75 deletions

File tree

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleCounter.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles;
44
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.COUNTER;
5-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
6-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;
75

86
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
97
import datadog.trace.relocate.api.RatelimitedLogger;
@@ -74,23 +72,18 @@ public DoubleCounterBuilder setUnit(String unit) {
7472
@Override
7573
public DoubleCounter build() {
7674
return new OtelDoubleCounter(
77-
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleSumStorage));
75+
meter.registerStorage(builder, OtelMetricStorage::newDoubleSumStorage));
7876
}
7977

8078
@Override
81-
public ObservableDoubleCounter buildWithCallback(
82-
Consumer<ObservableDoubleMeasurement> callback) {
83-
// FIXME: implement callback
84-
return NOOP_METER
85-
.counterBuilder(NOOP_INSTRUMENT_NAME)
86-
.ofDoubles()
87-
.buildWithCallback(callback);
79+
public ObservableDoubleMeasurement buildObserver() {
80+
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage);
8881
}
8982

9083
@Override
91-
public ObservableDoubleMeasurement buildObserver() {
92-
// FIXME: implement observer
93-
return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).ofDoubles().buildObserver();
84+
public ObservableDoubleCounter buildWithCallback(
85+
Consumer<ObservableDoubleMeasurement> callback) {
86+
return meter.registerObservableCallback(callback, buildObserver());
9487
}
9588
}
9689
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleGauge.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles;
44
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.GAUGE;
5-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
6-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;
75

86
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
97
import io.opentelemetry.api.common.Attributes;
@@ -66,19 +64,17 @@ public LongGaugeBuilder ofLongs() {
6664
@Override
6765
public DoubleGauge build() {
6866
return new OtelDoubleGauge(
69-
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleValueStorage));
67+
meter.registerStorage(builder, OtelMetricStorage::newDoubleValueStorage));
7068
}
7169

7270
@Override
73-
public ObservableDoubleGauge buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
74-
// FIXME: implement callback
75-
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback);
71+
public ObservableDoubleMeasurement buildObserver() {
72+
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleValueStorage);
7673
}
7774

7875
@Override
79-
public ObservableDoubleMeasurement buildObserver() {
80-
// FIXME: implement observer
81-
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).buildObserver();
76+
public ObservableDoubleGauge buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
77+
return meter.registerObservableCallback(callback, buildObserver());
8278
}
8379
}
8480
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleHistogram.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ public LongHistogramBuilder ofLongs() {
102102
public DoubleHistogram build() {
103103
return new OtelDoubleHistogram(
104104
meter.registerStorage(
105-
builder.descriptor(),
106-
descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
105+
builder, descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
107106
}
108107

109108
static List<Double> validateBoundaries(List<Double> boundaries) {

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleUpDownCounter.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles;
44
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.UP_DOWN_COUNTER;
5-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
6-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;
75

86
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
97
import io.opentelemetry.api.common.Attributes;
@@ -60,23 +58,18 @@ public DoubleUpDownCounterBuilder setUnit(String unit) {
6058
@Override
6159
public DoubleUpDownCounter build() {
6260
return new OtelDoubleUpDownCounter(
63-
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleSumStorage));
61+
meter.registerStorage(builder, OtelMetricStorage::newDoubleSumStorage));
6462
}
6563

6664
@Override
67-
public ObservableDoubleUpDownCounter buildWithCallback(
68-
Consumer<ObservableDoubleMeasurement> callback) {
69-
// FIXME: implement callback
70-
return NOOP_METER
71-
.upDownCounterBuilder(NOOP_INSTRUMENT_NAME)
72-
.ofDoubles()
73-
.buildWithCallback(callback);
65+
public ObservableDoubleMeasurement buildObserver() {
66+
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage);
7467
}
7568

7669
@Override
77-
public ObservableDoubleMeasurement buildObserver() {
78-
// FIXME: implement observer
79-
return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).ofDoubles().buildObserver();
70+
public ObservableDoubleUpDownCounter buildWithCallback(
71+
Consumer<ObservableDoubleMeasurement> callback) {
72+
return meter.registerObservableCallback(callback, buildObserver());
8073
}
8174
}
8275
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelInstrumentBuilder.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,27 @@ OtelInstrumentDescriptor descriptor() {
7575
return new OtelInstrumentDescriptor(
7676
instrumentName, instrumentType, longValues, description, unit);
7777
}
78+
79+
OtelInstrumentDescriptor observableDescriptor() {
80+
return new OtelInstrumentDescriptor(
81+
instrumentName, observableType(instrumentType), longValues, description, unit);
82+
}
83+
84+
/**
85+
* Maps the given {@link OtelInstrumentType} to its observable equivalent.
86+
*
87+
* @throws IllegalArgumentException if the type has no observable equivalent
88+
*/
89+
private OtelInstrumentType observableType(OtelInstrumentType instrumentType) {
90+
switch (instrumentType) {
91+
case COUNTER:
92+
return OtelInstrumentType.OBSERVABLE_COUNTER;
93+
case UP_DOWN_COUNTER:
94+
return OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;
95+
case GAUGE:
96+
return OtelInstrumentType.OBSERVABLE_GAUGE;
97+
default:
98+
throw new IllegalArgumentException(instrumentType + " has no observable equivalent");
99+
}
100+
}
78101
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongCounter.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs;
44
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.COUNTER;
5-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
6-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;
75

86
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
97
import datadog.trace.relocate.api.RatelimitedLogger;
@@ -80,19 +78,17 @@ public DoubleCounterBuilder ofDoubles() {
8078
@Override
8179
public LongCounter build() {
8280
return new OtelLongCounter(
83-
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongSumStorage));
81+
meter.registerStorage(builder, OtelMetricStorage::newLongSumStorage));
8482
}
8583

8684
@Override
87-
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
88-
// FIXME: implement callback
89-
return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback);
85+
public ObservableLongMeasurement buildObserver() {
86+
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage);
9087
}
9188

9289
@Override
93-
public ObservableLongMeasurement buildObserver() {
94-
// FIXME: implement observer
95-
return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).buildObserver();
90+
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
91+
return meter.registerObservableCallback(callback, buildObserver());
9692
}
9793
}
9894
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongGauge.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs;
44
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.GAUGE;
5-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
6-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;
75

86
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
97
import io.opentelemetry.api.common.Attributes;
@@ -60,19 +58,17 @@ public LongGaugeBuilder setUnit(String unit) {
6058
@Override
6159
public LongGauge build() {
6260
return new OtelLongGauge(
63-
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongValueStorage));
61+
meter.registerStorage(builder, OtelMetricStorage::newLongValueStorage));
6462
}
6563

6664
@Override
67-
public ObservableLongGauge buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
68-
// FIXME: implement callback
69-
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).ofLongs().buildWithCallback(callback);
65+
public ObservableLongMeasurement buildObserver() {
66+
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongValueStorage);
7067
}
7168

7269
@Override
73-
public ObservableLongMeasurement buildObserver() {
74-
// FIXME: implement observer
75-
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).ofLongs().buildObserver();
70+
public ObservableLongGauge buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
71+
return meter.registerObservableCallback(callback, buildObserver());
7672
}
7773
}
7874
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongHistogram.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ public LongHistogramBuilder setExplicitBucketBoundariesAdvice(List<Long> bucketB
9191
public LongHistogram build() {
9292
return new OtelLongHistogram(
9393
meter.registerStorage(
94-
builder.descriptor(),
95-
descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
94+
builder, descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
9695
}
9796
}
9897
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongUpDownCounter.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs;
44
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.UP_DOWN_COUNTER;
5-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
6-
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;
75

86
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
97
import io.opentelemetry.api.common.Attributes;
@@ -66,20 +64,18 @@ public DoubleUpDownCounterBuilder ofDoubles() {
6664
@Override
6765
public LongUpDownCounter build() {
6866
return new OtelLongUpDownCounter(
69-
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongSumStorage));
67+
meter.registerStorage(builder, OtelMetricStorage::newLongSumStorage));
7068
}
7169

7270
@Override
73-
public ObservableLongUpDownCounter buildWithCallback(
74-
Consumer<ObservableLongMeasurement> callback) {
75-
// FIXME: implement callback
76-
return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback);
71+
public ObservableLongMeasurement buildObserver() {
72+
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage);
7773
}
7874

7975
@Override
80-
public ObservableLongMeasurement buildObserver() {
81-
// FIXME: implement observer
82-
return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).buildObserver();
76+
public ObservableLongUpDownCounter buildWithCallback(
77+
Consumer<ObservableLongMeasurement> callback) {
78+
return meter.registerObservableCallback(callback, buildObserver());
8379
}
8480
}
8581
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeter.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package datadog.opentelemetry.shim.metrics;
22

3+
import static java.util.Collections.singletonList;
4+
import static java.util.stream.Collectors.toList;
5+
import static java.util.stream.Stream.concat;
6+
37
import datadog.opentelemetry.shim.OtelInstrumentationScope;
48
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
59
import datadog.opentelemetry.shim.metrics.export.OtelMeterVisitor;
@@ -11,10 +15,14 @@
1115
import io.opentelemetry.api.metrics.Meter;
1216
import io.opentelemetry.api.metrics.MeterProvider;
1317
import io.opentelemetry.api.metrics.ObservableMeasurement;
18+
import java.util.ArrayList;
19+
import java.util.List;
1420
import java.util.Map;
1521
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.function.Consumer;
1623
import java.util.function.Function;
1724
import java.util.regex.Pattern;
25+
import java.util.stream.Stream;
1826
import javax.annotation.Nullable;
1927
import javax.annotation.ParametersAreNonnullByDefault;
2028
import org.slf4j.Logger;
@@ -35,6 +43,8 @@ final class OtelMeter implements Meter {
3543
private final Map<OtelInstrumentDescriptor, OtelMetricStorage> storage =
3644
new ConcurrentHashMap<>();
3745

46+
private final List<OtelObservableCallback> observables = new ArrayList<>();
47+
3848
OtelMeter(OtelInstrumentationScope instrumentationScope) {
3949
this.instrumentationScope = instrumentationScope;
4050
}
@@ -76,8 +86,12 @@ public BatchCallback batchCallback(
7686
Runnable callback,
7787
ObservableMeasurement observableMeasurement,
7888
ObservableMeasurement... additionalMeasurements) {
79-
// FIXME: implement callback
80-
return NOOP_METER.batchCallback(callback, observableMeasurement, additionalMeasurements);
89+
return registerObservableCallback(
90+
callback,
91+
concat(Stream.of(observableMeasurement), Stream.of(additionalMeasurements))
92+
.filter(OtelObservableMeasurement.class::isInstance)
93+
.map(OtelObservableMeasurement.class::cast)
94+
.collect(toList()));
8195
}
8296

8397
@Override
@@ -86,12 +100,44 @@ public String toString() {
86100
}
87101

88102
OtelMetricStorage registerStorage(
89-
OtelInstrumentDescriptor descriptor,
103+
OtelInstrumentBuilder builder,
90104
Function<OtelInstrumentDescriptor, OtelMetricStorage> storageFactory) {
91-
return storage.computeIfAbsent(descriptor, storageFactory);
105+
return storage.computeIfAbsent(builder.descriptor(), storageFactory);
106+
}
107+
108+
OtelObservableMeasurement registerObservableStorage(
109+
OtelInstrumentBuilder builder,
110+
Function<OtelInstrumentDescriptor, OtelMetricStorage> storageFactory) {
111+
return new OtelObservableMeasurement(
112+
storage.computeIfAbsent(builder.observableDescriptor(), storageFactory));
113+
}
114+
115+
<M> OtelObservableCallback registerObservableCallback(Consumer<M> callback, M measurement) {
116+
return registerObservableCallback(
117+
() -> callback.accept(measurement), singletonList((OtelObservableMeasurement) measurement));
118+
}
119+
120+
OtelObservableCallback registerObservableCallback(
121+
Runnable callback, List<OtelObservableMeasurement> measurements) {
122+
OtelObservableCallback observable = new OtelObservableCallback(this, callback, measurements);
123+
synchronized (observables) {
124+
observables.add(observable);
125+
}
126+
return observable;
127+
}
128+
129+
boolean unregisterObservableCallback(OtelObservableCallback observable) {
130+
synchronized (observables) {
131+
return observables.remove(observable);
132+
}
92133
}
93134

94135
void collect(OtelMeterVisitor visitor) {
136+
List<OtelObservableCallback> observablesCopy;
137+
synchronized (observables) {
138+
observablesCopy = new ArrayList<>(observables);
139+
}
140+
observablesCopy.forEach(OtelObservableCallback::observeMeasurements);
95141
storage.forEach((descriptor, storage) -> storage.collect(visitor.visitInstrument(descriptor)));
96142
}
97143

0 commit comments

Comments
 (0)