11package datadog .opentelemetry .shim .metrics .data ;
22
33import datadog .opentelemetry .shim .metrics .OtelInstrumentDescriptor ;
4+ import datadog .opentelemetry .shim .metrics .OtelInstrumentType ;
45import datadog .opentelemetry .shim .metrics .export .OtelInstrumentVisitor ;
56import datadog .trace .api .Config ;
67import datadog .trace .api .config .OtlpConfig ;
@@ -23,15 +24,16 @@ public final class OtelMetricStorage {
2324 private static final RatelimitedLogger RATELIMITED_LOGGER =
2425 new RatelimitedLogger (LOGGER , 5 , TimeUnit .MINUTES );
2526
27+ private static final OtlpConfig .Temporality TEMPORALITY_PREFERENCE =
28+ Config .get ().getOtlpMetricsTemporalityPreference ();
29+
2630 private static final int CARDINALITY_LIMIT = Config .get ().getMetricsOtelCardinalityLimit ();
2731
2832 private static final Attributes CARDINALITY_OVERFLOW =
2933 Attributes .builder ().put ("otel.metric.overflow" , true ).build ();
3034
31- private static final boolean RESET_ON_COLLECT =
32- Config .get ().getOtlpMetricsTemporalityPreference () == OtlpConfig .Temporality .DELTA ;
33-
3435 private final OtelInstrumentDescriptor descriptor ;
36+ private final boolean resetOnCollect ;
3537 private final Function <Attributes , OtelAggregator > aggregatorSupplier ;
3638 private volatile Recording currentRecording ;
3739
@@ -41,13 +43,31 @@ public final class OtelMetricStorage {
4143 private OtelMetricStorage (
4244 OtelInstrumentDescriptor descriptor , Supplier <OtelAggregator > aggregatorSupplier ) {
4345 this .descriptor = descriptor ;
46+ this .resetOnCollect = shouldResetOnCollect (descriptor .getType ());
4447 this .aggregatorSupplier = unused -> aggregatorSupplier .get ();
4548 this .currentRecording = new Recording ();
46- if (RESET_ON_COLLECT ) {
49+ if (resetOnCollect ) {
4750 this .previousRecording = new Recording ();
4851 }
4952 }
5053
54+ /** Should storage reset on collect? (Depends on instrument type and temporality preference.) */
55+ private static boolean shouldResetOnCollect (OtelInstrumentType type ) {
56+ switch (TEMPORALITY_PREFERENCE ) {
57+ case DELTA :
58+ // gauges and up/down counters stay as cumulative
59+ return type == OtelInstrumentType .HISTOGRAM
60+ || type == OtelInstrumentType .COUNTER
61+ || type == OtelInstrumentType .OBSERVABLE_COUNTER ;
62+ case LOWMEMORY :
63+ // observable counters, gauges, and up/down counters stay as cumulative
64+ return type == OtelInstrumentType .HISTOGRAM || type == OtelInstrumentType .COUNTER ;
65+ case CUMULATIVE :
66+ default :
67+ return false ;
68+ }
69+ }
70+
5171 public static OtelMetricStorage newDoubleSumStorage (OtelInstrumentDescriptor descriptor ) {
5272 return new OtelMetricStorage (descriptor , OtelDoubleSum ::new );
5373 }
@@ -78,20 +98,30 @@ public OtelInstrumentDescriptor getDescriptor() {
7898 }
7999
80100 public void recordLong (long value , Attributes attributes ) {
81- Recording recording = acquireRecordingForWrite ();
82- try {
83- aggregator (recording .aggregators , attributes ).recordLong (value );
84- } finally {
85- releaseRecordingAfterWrite (recording );
101+ if (resetOnCollect ) {
102+ Recording recording = acquireRecordingForWrite ();
103+ try {
104+ aggregator (recording .aggregators , attributes ).recordLong (value );
105+ } finally {
106+ releaseRecordingAfterWrite (recording );
107+ }
108+ } else {
109+ // no need to hold writers back if we are not resetting metrics on collect
110+ aggregator (currentRecording .aggregators , attributes ).recordLong (value );
86111 }
87112 }
88113
89114 public void recordDouble (double value , Attributes attributes ) {
90- Recording recording = acquireRecordingForWrite ();
91- try {
92- aggregator (recording .aggregators , attributes ).recordDouble (value );
93- } finally {
94- releaseRecordingAfterWrite (recording );
115+ if (resetOnCollect ) {
116+ Recording recording = acquireRecordingForWrite ();
117+ try {
118+ aggregator (recording .aggregators , attributes ).recordDouble (value );
119+ } finally {
120+ releaseRecordingAfterWrite (recording );
121+ }
122+ } else {
123+ // no need to hold writers back if we are not resetting metrics on collect
124+ aggregator (currentRecording .aggregators , attributes ).recordDouble (value );
95125 }
96126 }
97127
@@ -113,7 +143,7 @@ private OtelAggregator aggregator(
113143 }
114144
115145 public void collect (OtelInstrumentVisitor visitor ) {
116- if (RESET_ON_COLLECT ) {
146+ if (resetOnCollect ) {
117147 doCollectAndReset (visitor );
118148 } else {
119149 doCollect (visitor );
@@ -168,27 +198,21 @@ private void doCollectAndReset(OtelInstrumentVisitor visitor) {
168198 }
169199
170200 private Recording acquireRecordingForWrite () {
171- if (RESET_ON_COLLECT ) {
172- // busy loop to limit impact on caller
173- while (true ) {
174- final Recording recording = currentRecording ;
175- // atomically notify collector of write activity and check state
176- if ((ACTIVITY .addAndGet (recording , WRITER ) & RESET_PENDING ) == 0 ) {
177- return recording ;
178- } else {
179- // reset pending: rollback and check again for a fresh recording
180- ACTIVITY .addAndGet (recording , -WRITER );
181- }
201+ // busy loop to limit impact on caller
202+ while (true ) {
203+ final Recording recording = currentRecording ;
204+ // atomically notify collector of write activity and check state
205+ if ((ACTIVITY .addAndGet (recording , WRITER ) & RESET_PENDING ) == 0 ) {
206+ return recording ;
207+ } else {
208+ // reset pending: rollback and check again for a fresh recording
209+ ACTIVITY .addAndGet (recording , -WRITER );
182210 }
183- } else {
184- return currentRecording ;
185211 }
186212 }
187213
188214 private void releaseRecordingAfterWrite (Recording recording ) {
189- if (RESET_ON_COLLECT ) {
190- ACTIVITY .addAndGet (recording , -WRITER );
191- }
215+ ACTIVITY .addAndGet (recording , -WRITER );
192216 }
193217
194218 static final AtomicIntegerFieldUpdater <Recording > ACTIVITY =
0 commit comments