Skip to content

Commit 7951fb3

Browse files
committed
Fix the behaviour of asynchronous counters and up-down counters.
Unlike synchronous counters which take the increment/delta value, asynchronous callbacks report the absolute value of the counter.
1 parent fc39960 commit 7951fb3

15 files changed

Lines changed: 408 additions & 60 deletions

File tree

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/OtelInstrumentType.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,21 @@
22

33
public enum OtelInstrumentType {
44
// same order as io.opentelemetry.sdk.metrics.InstrumentType
5-
COUNTER,
6-
UP_DOWN_COUNTER,
7-
HISTOGRAM,
8-
OBSERVABLE_COUNTER,
9-
OBSERVABLE_UP_DOWN_COUNTER,
10-
OBSERVABLE_GAUGE,
11-
GAUGE,
5+
COUNTER(false),
6+
UP_DOWN_COUNTER(false),
7+
HISTOGRAM(false),
8+
OBSERVABLE_COUNTER(true),
9+
OBSERVABLE_UP_DOWN_COUNTER(true),
10+
OBSERVABLE_GAUGE(true),
11+
GAUGE(false);
12+
13+
private final boolean observable;
14+
15+
OtelInstrumentType(boolean observable) {
16+
this.observable = observable;
17+
}
18+
19+
public boolean isObservable() {
20+
return observable;
21+
}
1222
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package datadog.trace.bootstrap.otel.metrics.data;
2+
3+
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
4+
import datadog.trace.bootstrap.otlp.metrics.OtlpDoublePoint;
5+
6+
/** Reports the delta value since the last reset. */
7+
final class OtelDoubleDelta extends OtelAggregator {
8+
private volatile double value;
9+
private double lastValue;
10+
11+
@Override
12+
void doRecordDouble(double value) {
13+
this.value = value;
14+
}
15+
16+
@Override
17+
OtlpDataPoint doCollect(boolean reset) {
18+
double collectedValue = value;
19+
double delta = collectedValue - lastValue;
20+
if (reset) {
21+
lastValue = collectedValue;
22+
}
23+
return new OtlpDoublePoint(delta);
24+
}
25+
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelDoubleSum.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import datadog.trace.bootstrap.otlp.metrics.OtlpDoublePoint;
55
import java.util.concurrent.atomic.DoubleAdder;
66

7+
/** Reports the sum of values since the last reset. */
78
final class OtelDoubleSum extends OtelAggregator {
89
private final DoubleAdder total = new DoubleAdder();
910

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelDoubleValue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
44
import datadog.trace.bootstrap.otlp.metrics.OtlpDoublePoint;
55

6+
/** Always reports the latest value. */
67
final class OtelDoubleValue extends OtelAggregator {
78
private volatile double value;
89

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelHistogramSketch.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.trace.bootstrap.otlp.metrics.OtlpHistogramPoint;
77
import java.util.List;
88

9+
/** Reports the histogram of values since the last reset. */
910
final class OtelHistogramSketch extends OtelAggregator {
1011
private final HistogramWithSum histogram;
1112

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package datadog.trace.bootstrap.otel.metrics.data;
2+
3+
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
4+
import datadog.trace.bootstrap.otlp.metrics.OtlpLongPoint;
5+
6+
/** Reports the delta value since the last reset. */
7+
final class OtelLongDelta extends OtelAggregator {
8+
private volatile long value;
9+
private long lastValue;
10+
11+
@Override
12+
void doRecordLong(long value) {
13+
this.value = value;
14+
}
15+
16+
@Override
17+
OtlpDataPoint doCollect(boolean reset) {
18+
long collectedValue = value;
19+
long delta = collectedValue - lastValue;
20+
if (reset) {
21+
lastValue = collectedValue;
22+
}
23+
return new OtlpLongPoint(delta);
24+
}
25+
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelLongSum.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import datadog.trace.bootstrap.otlp.metrics.OtlpLongPoint;
55
import java.util.concurrent.atomic.LongAdder;
66

7+
/** Reports the sum of values since the last reset. */
78
final class OtelLongSum extends OtelAggregator {
89
private final LongAdder total = new LongAdder();
910

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelLongValue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
44
import datadog.trace.bootstrap.otlp.metrics.OtlpLongPoint;
55

6+
/** Always reports the latest value. */
67
final class OtelLongValue extends OtelAggregator {
78
private volatile long value;
89

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelMetricStorage.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentDescriptor;
1212
import datadog.trace.bootstrap.otel.metrics.OtelInstrumentType;
1313
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
14+
import datadog.trace.bootstrap.otlp.metrics.OtlpDataPoint;
1415
import datadog.trace.bootstrap.otlp.metrics.OtlpMetricVisitor;
1516
import io.opentelemetry.api.common.Attributes;
1617
import java.util.Collections;
@@ -46,6 +47,7 @@ public final class OtelMetricStorage {
4647

4748
private final OtelInstrumentDescriptor descriptor;
4849
private final boolean resetOnCollect;
50+
private final boolean toggleRecordings;
4951
private final Function<Object, OtelAggregator> aggregatorSupplier;
5052
private volatile Recording currentRecording;
5153

@@ -56,9 +58,12 @@ private OtelMetricStorage(
5658
OtelInstrumentDescriptor descriptor, Supplier<OtelAggregator> aggregatorSupplier) {
5759
this.descriptor = descriptor;
5860
this.resetOnCollect = shouldResetOnCollect(descriptor.getType());
61+
// no need to toggle if not resetting on collect, or if it's an observable instrument
62+
// (observables are always invoked within the collect cycle, so no concurrent writers)
63+
this.toggleRecordings = resetOnCollect && !descriptor.getType().isObservable();
5964
this.aggregatorSupplier = unused -> aggregatorSupplier.get();
6065
this.currentRecording = new Recording();
61-
if (resetOnCollect) {
66+
if (toggleRecordings) {
6267
this.previousRecording = new Recording();
6368
}
6469
}
@@ -86,6 +91,10 @@ public static OtelMetricStorage newDoubleValueStorage(OtelInstrumentDescriptor d
8691
return new OtelMetricStorage(descriptor, OtelDoubleValue::new);
8792
}
8893

94+
public static OtelMetricStorage newDoubleDeltaStorage(OtelInstrumentDescriptor descriptor) {
95+
return new OtelMetricStorage(descriptor, OtelDoubleDelta::new);
96+
}
97+
8998
public static OtelMetricStorage newLongSumStorage(OtelInstrumentDescriptor descriptor) {
9099
return new OtelMetricStorage(descriptor, OtelLongSum::new);
91100
}
@@ -94,6 +103,10 @@ public static OtelMetricStorage newLongValueStorage(OtelInstrumentDescriptor des
94103
return new OtelMetricStorage(descriptor, OtelLongValue::new);
95104
}
96105

106+
public static OtelMetricStorage newLongDeltaStorage(OtelInstrumentDescriptor descriptor) {
107+
return new OtelMetricStorage(descriptor, OtelLongDelta::new);
108+
}
109+
97110
public static OtelMetricStorage newHistogramStorage(
98111
OtelInstrumentDescriptor descriptor, List<Double> bucketBoundaries) {
99112
return new OtelMetricStorage(descriptor, () -> new OtelHistogramSketch(bucketBoundaries));
@@ -108,7 +121,7 @@ public OtelInstrumentDescriptor getDescriptor() {
108121
}
109122

110123
public void recordLong(long value, Object attributes) {
111-
if (resetOnCollect) {
124+
if (toggleRecordings) {
112125
Recording recording = acquireRecordingForWrite();
113126
try {
114127
aggregator(recording.aggregators, attributes).recordLong(value);
@@ -129,7 +142,7 @@ public void recordDouble(double value, Object attributes) {
129142
attributes);
130143
return;
131144
}
132-
if (resetOnCollect) {
145+
if (toggleRecordings) {
133146
Recording recording = acquireRecordingForWrite();
134147
try {
135148
aggregator(recording.aggregators, attributes).recordDouble(value);
@@ -173,26 +186,8 @@ public static void registerAttributeReader(
173186

174187
/** Collect data for CUMULATIVE temporality, keeping aggregators for future writes. */
175188
private void doCollect(OtlpMetricVisitor visitor) {
176-
BiConsumer<Object, OtlpAttributeVisitor> attributesReader = null;
177-
ClassLoader attributesClassLoader = null;
178-
179189
// no need to hold writers back if we are not resetting metrics on collect
180-
for (Map.Entry<Object, OtelAggregator> entry : currentRecording.aggregators.entrySet()) {
181-
OtelAggregator aggregator = entry.getValue();
182-
if (!aggregator.isEmpty()) {
183-
Object attributes = entry.getKey();
184-
ClassLoader cl = attributes.getClass().getClassLoader();
185-
// avoid repeated lookups when attribute class-loader is same for all records
186-
if (attributesReader == null || cl != attributesClassLoader) {
187-
attributesReader = ATTRIBUTE_READERS.get(cl);
188-
attributesClassLoader = cl;
189-
}
190-
if (attributesReader != null) {
191-
attributesReader.accept(attributes, visitor);
192-
}
193-
visitor.visitDataPoint(aggregator.collect());
194-
}
195-
}
190+
collectDataPoints(currentRecording.aggregators, visitor, OtelAggregator::collect);
196191
}
197192

198193
/**
@@ -205,13 +200,15 @@ private void doCollectAndReset(OtlpMetricVisitor visitor) {
205200
// capture _current_ recording for collection, its aggregators will be reset at the end
206201
final Recording recording = currentRecording;
207202

208-
// publish fresh recording for new writers, using aggregators from _previous_ recording
209-
currentRecording = new Recording(previousRecording);
203+
if (toggleRecordings) {
204+
// publish fresh recording for new writers, using aggregators from _previous_ recording
205+
currentRecording = new Recording(previousRecording);
210206

211-
// notify writers that the captured recording is about to be reset
212-
ACTIVITY.addAndGet(recording, RESET_PENDING);
213-
while (recording.activity > 1) {
214-
Thread.yield(); // other threads are still writing to this recording
207+
// notify writers that the captured recording is about to be reset
208+
ACTIVITY.addAndGet(recording, RESET_PENDING);
209+
while (recording.activity > 1) {
210+
Thread.yield(); // other threads are still writing to this recording
211+
}
215212
}
216213

217214
Map<Object, OtelAggregator> aggregators = recording.aggregators;
@@ -221,6 +218,17 @@ private void doCollectAndReset(OtlpMetricVisitor visitor) {
221218
aggregators.values().removeIf(OtelAggregator::isEmpty);
222219
}
223220

221+
collectDataPoints(aggregators, visitor, OtelAggregator::collectAndReset);
222+
223+
if (toggleRecordings) {
224+
previousRecording = recording;
225+
}
226+
}
227+
228+
private void collectDataPoints(
229+
Map<Object, OtelAggregator> aggregators,
230+
OtlpMetricVisitor visitor,
231+
Function<OtelAggregator, OtlpDataPoint> collect) {
224232
BiConsumer<Object, OtlpAttributeVisitor> attributesReader = null;
225233
ClassLoader attributesClassLoader = null;
226234

@@ -237,11 +245,9 @@ private void doCollectAndReset(OtlpMetricVisitor visitor) {
237245
if (attributesReader != null) {
238246
attributesReader.accept(attributes, visitor);
239247
}
240-
visitor.visitDataPoint(aggregator.collectAndReset());
248+
visitor.visitDataPoint(collect.apply(aggregator));
241249
}
242250
}
243-
244-
previousRecording = recording;
245251
}
246252

247253
private Recording acquireRecordingForWrite() {

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleCounter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public DoubleCounter build() {
7979

8080
@Override
8181
public ObservableDoubleMeasurement buildObserver() {
82-
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage);
82+
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleDeltaStorage);
8383
}
8484

8585
@Override

0 commit comments

Comments
 (0)