Skip to content

Commit 4196b17

Browse files
committed
cc
1 parent 21d36d5 commit 4196b17

1 file changed

Lines changed: 31 additions & 36 deletions

File tree

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
5555
private final AggregationTemporality aggregationTemporality;
5656
private final AttributesProcessor attributesProcessor;
5757
private final MemoryMode memoryMode;
58+
private final Aggregator<T, U> originalAggregator;
59+
60+
private Aggregator<T, U> aggregator;
5861

5962
/**
6063
* This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
@@ -70,19 +73,16 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
7073

7174
// Only populated if memoryMode == REUSABLE_DATA
7275
private final ObjectPool<T> reusablePointsPool;
76+
private final ObjectPool<AggregatorHandle<T, U>> reusableHandlesPool;
7377
private final Function<Attributes, AggregatorHandle<T, U>> handleBuilder;
78+
private final BiConsumer<Attributes, AggregatorHandle<T, U>> handleReleaser;
7479
private final BiConsumer<Attributes, T> pointReleaser;
7580

7681
private final List<T> reusablePointsList = new ArrayList<>();
7782
// If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every
7883
// collection
7984
private Map<Attributes, T> reusablePointsMap = new PooledHashMap<>();
8085

81-
// deliberately not volatile because of performance concerns
82-
// - which means its eventually consistent
83-
private AggregatorHolder<T, U> aggregatorHolder;
84-
private final Aggregator<T, U> originalAggregator;
85-
8686
// Time information relative to recording of data in aggregatorHandles, set while calling
8787
// callbacks
8888
private long startEpochNanos;
@@ -101,15 +101,16 @@ private AsynchronousMetricStorage(
101101
.getReader()
102102
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
103103
this.memoryMode = registeredReader.getReader().getMemoryMode();
104+
this.aggregator = aggregator;
105+
this.originalAggregator = aggregator;
104106
this.attributesProcessor = attributesProcessor;
105107
this.maxCardinality = maxCardinality - 1;
106108
this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint);
109+
this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle);
110+
this.handleBuilder = ignored -> reusableHandlesPool.borrowObject();
111+
this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle);
107112
this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point);
108113

109-
this.aggregatorHolder = new AggregatorHolder<>(aggregator);
110-
this.originalAggregator = aggregator;
111-
this.handleBuilder = ignored -> aggregatorHolder.reusableHandlesPool.borrowObject();
112-
113114
if (memoryMode == REUSABLE_DATA) {
114115
this.lastPoints = new PooledHashMap<>();
115116
this.aggregatorHandles = new PooledHashMap<>();
@@ -200,24 +201,20 @@ public MetricData collect(
200201
InstrumentationScopeInfo instrumentationScopeInfo,
201202
long startEpochNanos,
202203
long epochNanos) {
203-
AggregatorHolder<T, U> localAggregatorHolder = aggregatorHolder;
204-
205204
Collection<T> result =
206205
aggregationTemporality == AggregationTemporality.DELTA
207206
? collectWithDeltaAggregationTemporality()
208207
: collectWithCumulativeAggregationTemporality();
209208

210209
// collectWith*AggregationTemporality() methods are responsible for resetting the handle
211-
aggregatorHandles.forEach(localAggregatorHolder.handleReleaser);
210+
aggregatorHandles.forEach(handleReleaser);
212211
aggregatorHandles.clear();
213212

214-
return localAggregatorHolder.aggregator.toMetricData(
213+
return aggregator.toMetricData(
215214
resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality);
216215
}
217216

218217
private Collection<T> collectWithDeltaAggregationTemporality() {
219-
AggregatorHolder<T, U> localAggregatorHolder = aggregatorHolder;
220-
221218
Map<Attributes, T> currentPoints;
222219
if (memoryMode == REUSABLE_DATA) {
223220
// deltaPoints computed in the previous collection can be released
@@ -240,7 +237,7 @@ private Collection<T> collectWithDeltaAggregationTemporality() {
240237
// AggregatorHandle is going to modify the point eventually, but we must persist its
241238
// value to used it at the next collection (within lastPoints). Thus, we make a copy.
242239
pointForCurrentPoints = reusablePointsPool.borrowObject();
243-
localAggregatorHolder.aggregator.copyPoint(point, pointForCurrentPoints);
240+
aggregator.copyPoint(point, pointForCurrentPoints);
244241
} else {
245242
pointForCurrentPoints = point;
246243
}
@@ -259,16 +256,16 @@ private Collection<T> collectWithDeltaAggregationTemporality() {
259256
// to make sure currentPoint can still be used within lastPoints during the next
260257
// collection.
261258
deltaPoint = reusablePointsPool.borrowObject();
262-
localAggregatorHolder.aggregator.copyPoint(currentPoint, deltaPoint);
259+
aggregator.copyPoint(currentPoint, deltaPoint);
263260
} else {
264261
deltaPoint = currentPoint;
265262
}
266263
} else {
267264
if (memoryMode == REUSABLE_DATA) {
268-
localAggregatorHolder.aggregator.diffInPlace(lastPoint, currentPoint);
265+
aggregator.diffInPlace(lastPoint, currentPoint);
269266
deltaPoint = lastPoint;
270267
} else {
271-
deltaPoint = localAggregatorHolder.aggregator.diff(lastPoint, currentPoint);
268+
deltaPoint = aggregator.diff(lastPoint, currentPoint);
272269
}
273270
}
274271
deltaPoints.add(deltaPoint);
@@ -318,29 +315,27 @@ private Collection<T> collectWithCumulativeAggregationTemporality() {
318315
@Override
319316
public void setEnabled(boolean enabled) {
320317
if (enabled) {
321-
if (aggregatorHolder.aggregator == Aggregator.drop()) {
322-
aggregatorHolder = new AggregatorHolder<>(originalAggregator);
318+
if (aggregator == Aggregator.drop()) {
319+
aggregator = originalAggregator;
323320
}
324321
} else {
325-
aggregatorHolder = new AggregatorHolder<>(Aggregator.drop());
322+
aggregator = Aggregator.drop();
323+
324+
if (memoryMode == REUSABLE_DATA) {
325+
aggregatorHandles.forEach(
326+
(attributes, handle) -> {
327+
handle.aggregateThenMaybeReset(0, 0, Attributes.empty(), /* reset= */ true);
328+
reusableHandlesPool.returnObject(handle);
329+
});
330+
lastPoints.forEach(pointReleaser);
331+
}
332+
aggregatorHandles.clear();
333+
lastPoints.clear();
326334
}
327335
}
328336

329337
@Override
330338
public boolean isEmpty() {
331-
return aggregatorHolder.aggregator == Aggregator.drop();
332-
}
333-
334-
private static final class AggregatorHolder<T extends PointData, U extends ExemplarData> {
335-
336-
private final Aggregator<T, U> aggregator;
337-
private final ObjectPool<AggregatorHandle<T, U>> reusableHandlesPool;
338-
private final BiConsumer<Attributes, AggregatorHandle<T, U>> handleReleaser;
339-
340-
private AggregatorHolder(Aggregator<T, U> aggregator) {
341-
this.aggregator = aggregator;
342-
this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle);
343-
this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle);
344-
}
339+
return aggregator == Aggregator.drop();
345340
}
346341
}

0 commit comments

Comments
 (0)