Skip to content

Commit f4a4093

Browse files
committed
atomicref
1 parent c83804d commit f4a4093

1 file changed

Lines changed: 34 additions & 31 deletions

File tree

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ConcurrentLinkedQueue;
3434
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.atomic.AtomicReference;
3536
import java.util.logging.Level;
3637
import java.util.logging.Logger;
3738

@@ -51,8 +52,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
5152
private final RegisteredReader registeredReader;
5253
private final MetricDescriptor metricDescriptor;
5354
private final AggregationTemporality aggregationTemporality;
54-
private final Aggregator<T, U> aggregator;
55-
private volatile AggregatorHolder<T, U> aggregatorHolder = new AggregatorHolder<>();
55+
private final AtomicReference<AggregatorHolder<T, U>> aggregatorHolder;
5656
private final AttributesProcessor attributesProcessor;
5757

5858
private final MemoryMode memoryMode;
@@ -82,16 +82,20 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
8282
int maxCardinality) {
8383
this.registeredReader = registeredReader;
8484
this.metricDescriptor = metricDescriptor;
85+
this.aggregatorHolder = new AtomicReference<>(new AggregatorHolder<>(aggregator));
8586
this.aggregationTemporality =
8687
registeredReader
8788
.getReader()
8889
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
89-
this.aggregator = aggregator;
9090
this.attributesProcessor = attributesProcessor;
9191
this.maxCardinality = maxCardinality - 1;
9292
this.memoryMode = registeredReader.getReader().getMemoryMode();
9393
}
9494

95+
void swapAggregator(Aggregator<T, U> aggregator) {
96+
this.aggregatorHolder.set(new AggregatorHolder<>(aggregator));
97+
}
98+
9599
// Visible for testing
96100
Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {
97101
return aggregatorHandlePool;
@@ -101,8 +105,7 @@ Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {
101105
public void recordLong(long value, Attributes attributes, Context context) {
102106
AggregatorHolder<T, U> aggregatorHolder = getHolderForRecord();
103107
try {
104-
AggregatorHandle<T, U> handle =
105-
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
108+
AggregatorHandle<T, U> handle = getAggregatorHandle(aggregatorHolder, attributes, context);
106109
handle.recordLong(value, attributes, context);
107110
} finally {
108111
releaseHolderForRecord(aggregatorHolder);
@@ -123,8 +126,7 @@ public void recordDouble(double value, Attributes attributes, Context context) {
123126
}
124127
AggregatorHolder<T, U> aggregatorHolder = getHolderForRecord();
125128
try {
126-
AggregatorHandle<T, U> handle =
127-
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
129+
AggregatorHandle<T, U> handle = getAggregatorHandle(aggregatorHolder, attributes, context);
128130
handle.recordDouble(value, attributes, context);
129131
} finally {
130132
releaseHolderForRecord(aggregatorHolder);
@@ -146,14 +148,14 @@ public boolean isEnabled() {
146148
*/
147149
private AggregatorHolder<T, U> getHolderForRecord() {
148150
do {
149-
AggregatorHolder<T, U> aggregatorHolder = this.aggregatorHolder;
150-
int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2);
151+
AggregatorHolder<T, U> localAggregatorHolder = this.aggregatorHolder.get();
152+
int recordsInProgress = localAggregatorHolder.activeRecordingThreads.addAndGet(2);
151153
if (recordsInProgress % 2 == 0) {
152-
return aggregatorHolder;
154+
return localAggregatorHolder;
153155
} else {
154156
// Collect is in progress, decrement recordsInProgress to allow collect to proceed and
155157
// re-read aggregatorHolder
156-
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
158+
localAggregatorHolder.activeRecordingThreads.addAndGet(-2);
157159
}
158160
} while (true);
159161
}
@@ -162,21 +164,19 @@ private AggregatorHolder<T, U> getHolderForRecord() {
162164
* Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate
163165
* that recording is complete, and it is safe to collect.
164166
*/
165-
private void releaseHolderForRecord(AggregatorHolder<T, U> aggregatorHolder) {
166-
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
167+
private void releaseHolderForRecord(AggregatorHolder<T, U> localAggregatorHolder) {
168+
localAggregatorHolder.activeRecordingThreads.addAndGet(-2);
167169
}
168170

169171
private AggregatorHandle<T, U> getAggregatorHandle(
170-
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles,
171-
Attributes attributes,
172-
Context context) {
172+
AggregatorHolder<T, U> localAggregatorHolder, Attributes attributes, Context context) {
173173
Objects.requireNonNull(attributes, "attributes");
174174
attributes = attributesProcessor.process(attributes, context);
175-
AggregatorHandle<T, U> handle = aggregatorHandles.get(attributes);
175+
AggregatorHandle<T, U> handle = localAggregatorHolder.aggregatorHandles.get(attributes);
176176
if (handle != null) {
177177
return handle;
178178
}
179-
if (aggregatorHandles.size() >= maxCardinality) {
179+
if (localAggregatorHolder.aggregatorHandles.size() >= maxCardinality) {
180180
logger.log(
181181
Level.WARNING,
182182
"Instrument "
@@ -186,17 +186,17 @@ private AggregatorHandle<T, U> getAggregatorHandle(
186186
+ ").");
187187
// Return handle for overflow series, first checking if a handle already exists for it
188188
attributes = MetricStorage.CARDINALITY_OVERFLOW;
189-
handle = aggregatorHandles.get(attributes);
189+
handle = localAggregatorHolder.aggregatorHandles.get(attributes);
190190
if (handle != null) {
191191
return handle;
192192
}
193193
}
194194
// Get handle from pool if available, else create a new one.
195195
AggregatorHandle<T, U> newHandle = aggregatorHandlePool.poll();
196196
if (newHandle == null) {
197-
newHandle = aggregator.createHandle();
197+
newHandle = localAggregatorHolder.aggregator.createHandle();
198198
}
199-
handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
199+
handle = localAggregatorHolder.aggregatorHandles.putIfAbsent(attributes, newHandle);
200200
return handle != null ? handle : newHandle;
201201
}
202202

@@ -211,14 +211,16 @@ public MetricData collect(
211211
aggregationTemporality == DELTA
212212
? registeredReader.getLastCollectEpochNanos()
213213
: startEpochNanos;
214+
AggregatorHolder<T, U> holder = this.aggregatorHolder.get();
214215

215216
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
216217
if (reset) {
217-
AggregatorHolder<T, U> holder = this.aggregatorHolder;
218-
this.aggregatorHolder =
218+
AggregatorHolder<T, U> newHolder =
219219
(memoryMode == REUSABLE_DATA)
220-
? new AggregatorHolder<>(previousCollectionAggregatorHandles)
221-
: new AggregatorHolder<>();
220+
? new AggregatorHolder<>(holder.aggregator, previousCollectionAggregatorHandles)
221+
: new AggregatorHolder<>(holder.aggregator);
222+
// Otherwise, swapAggregator was called and the update should be ignored
223+
aggregatorHolder.compareAndSet(holder, newHolder);
222224

223225
// Increment recordsInProgress by 1, which produces an odd number acting as a signal that
224226
// record operations should re-read the volatile this.aggregatorHolder.
@@ -228,10 +230,8 @@ public MetricData collect(
228230
while (recordsInProgress > 1) {
229231
recordsInProgress = holder.activeRecordingThreads.get();
230232
}
231-
aggregatorHandles = holder.aggregatorHandles;
232-
} else {
233-
aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
234233
}
234+
aggregatorHandles = holder.aggregatorHandles;
235235

236236
List<T> points;
237237
if (memoryMode == REUSABLE_DATA) {
@@ -303,7 +303,7 @@ public MetricData collect(
303303
return EmptyMetricData.getInstance();
304304
}
305305

306-
return aggregator.toMetricData(
306+
return holder.aggregator.toMetricData(
307307
resource, instrumentationScopeInfo, metricDescriptor, points, aggregationTemporality);
308308
}
309309

@@ -313,6 +313,7 @@ public MetricDescriptor getMetricDescriptor() {
313313
}
314314

315315
private static class AggregatorHolder<T extends PointData, U extends ExemplarData> {
316+
private final Aggregator<T, U> aggregator;
316317
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
317318
// Recording threads grab the current interval (AggregatorHolder) and atomically increment
318319
// this by 2 before recording against it (and then decrement by two when done).
@@ -330,12 +331,14 @@ private static class AggregatorHolder<T extends PointData, U extends ExemplarDat
330331
// and then grab and record against the new current interval (AggregatorHolder).
331332
private final AtomicInteger activeRecordingThreads = new AtomicInteger(0);
332333

333-
private AggregatorHolder() {
334-
aggregatorHandles = new ConcurrentHashMap<>();
334+
private AggregatorHolder(Aggregator<T, U> aggregator) {
335+
this(aggregator, new ConcurrentHashMap<>());
335336
}
336337

337338
private AggregatorHolder(
339+
Aggregator<T, U> aggregator,
338340
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles) {
341+
this.aggregator = aggregator;
339342
this.aggregatorHandles = aggregatorHandles;
340343
}
341344
}

0 commit comments

Comments
 (0)