Skip to content

Commit a202a21

Browse files
committed
swap handle async
1 parent e419b5d commit a202a21

1 file changed

Lines changed: 34 additions & 14 deletions

File tree

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
5353
private final RegisteredReader registeredReader;
5454
private final MetricDescriptor metricDescriptor;
5555
private final AggregationTemporality aggregationTemporality;
56-
private final Aggregator<T, U> aggregator;
5756
private final AttributesProcessor attributesProcessor;
5857
private final MemoryMode memoryMode;
5958

@@ -71,16 +70,18 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
7170

7271
// Only populated if memoryMode == REUSABLE_DATA
7372
private final ObjectPool<T> reusablePointsPool;
74-
private final ObjectPool<AggregatorHandle<T, U>> reusableHandlesPool;
7573
private final Function<Attributes, AggregatorHandle<T, U>> handleBuilder;
76-
private final BiConsumer<Attributes, AggregatorHandle<T, U>> handleReleaser;
7774
private final BiConsumer<Attributes, T> pointReleaser;
7875

7976
private final List<T> reusablePointsList = new ArrayList<>();
8077
// If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every
8178
// collection
8279
private Map<Attributes, T> reusablePointsMap = new PooledHashMap<>();
8380

81+
// deliberately not volatile because of performance concerns
82+
// - which means its eventually consistent
83+
private AggregatorHolder aggregatorHolder;
84+
8485
// Time information relative to recording of data in aggregatorHandles, set while calling
8586
// callbacks
8687
private long startEpochNanos;
@@ -99,15 +100,14 @@ private AsynchronousMetricStorage(
99100
.getReader()
100101
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
101102
this.memoryMode = registeredReader.getReader().getMemoryMode();
102-
this.aggregator = aggregator;
103103
this.attributesProcessor = attributesProcessor;
104104
this.maxCardinality = maxCardinality - 1;
105105
this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint);
106-
this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle);
107-
this.handleBuilder = ignored -> reusableHandlesPool.borrowObject();
108-
this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle);
109106
this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point);
110107

108+
this.aggregatorHolder = new AggregatorHolder(aggregator);
109+
this.handleBuilder = ignored -> aggregatorHolder.reusableHandlesPool.borrowObject();
110+
111111
if (memoryMode == REUSABLE_DATA) {
112112
this.lastPoints = new PooledHashMap<>();
113113
this.aggregatorHandles = new PooledHashMap<>();
@@ -143,6 +143,10 @@ AsynchronousMetricStorage<T, U> create(
143143
registeredView.getCardinalityLimit());
144144
}
145145

146+
void swapAggregator(Aggregator<T, U> aggregator) {
147+
this.aggregatorHolder = new AggregatorHolder(aggregator);
148+
}
149+
146150
/** Record callback measurement from {@link ObservableLongMeasurement}. */
147151
void record(Attributes attributes, long value) {
148152
attributes = validateAndProcessAttributes(attributes);
@@ -198,20 +202,24 @@ public MetricData collect(
198202
InstrumentationScopeInfo instrumentationScopeInfo,
199203
long startEpochNanos,
200204
long epochNanos) {
205+
AggregatorHolder localAggregatorHolder = aggregatorHolder;
206+
201207
Collection<T> result =
202208
aggregationTemporality == AggregationTemporality.DELTA
203209
? collectWithDeltaAggregationTemporality()
204210
: collectWithCumulativeAggregationTemporality();
205211

206212
// collectWith*AggregationTemporality() methods are responsible for resetting the handle
207-
aggregatorHandles.forEach(handleReleaser);
213+
aggregatorHandles.forEach(localAggregatorHolder.handleReleaser);
208214
aggregatorHandles.clear();
209215

210-
return aggregator.toMetricData(
216+
return localAggregatorHolder.aggregator.toMetricData(
211217
resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality);
212218
}
213219

214220
private Collection<T> collectWithDeltaAggregationTemporality() {
221+
AggregatorHolder localAggregatorHolder = aggregatorHolder;
222+
215223
Map<Attributes, T> currentPoints;
216224
if (memoryMode == REUSABLE_DATA) {
217225
// deltaPoints computed in the previous collection can be released
@@ -234,7 +242,7 @@ private Collection<T> collectWithDeltaAggregationTemporality() {
234242
// AggregatorHandle is going to modify the point eventually, but we must persist its
235243
// value to used it at the next collection (within lastPoints). Thus, we make a copy.
236244
pointForCurrentPoints = reusablePointsPool.borrowObject();
237-
aggregator.copyPoint(point, pointForCurrentPoints);
245+
localAggregatorHolder.aggregator.copyPoint(point, pointForCurrentPoints);
238246
} else {
239247
pointForCurrentPoints = point;
240248
}
@@ -253,16 +261,16 @@ private Collection<T> collectWithDeltaAggregationTemporality() {
253261
// to make sure currentPoint can still be used within lastPoints during the next
254262
// collection.
255263
deltaPoint = reusablePointsPool.borrowObject();
256-
aggregator.copyPoint(currentPoint, deltaPoint);
264+
localAggregatorHolder.aggregator.copyPoint(currentPoint, deltaPoint);
257265
} else {
258266
deltaPoint = currentPoint;
259267
}
260268
} else {
261269
if (memoryMode == REUSABLE_DATA) {
262-
aggregator.diffInPlace(lastPoint, currentPoint);
270+
localAggregatorHolder.aggregator.diffInPlace(lastPoint, currentPoint);
263271
deltaPoint = lastPoint;
264272
} else {
265-
deltaPoint = aggregator.diff(lastPoint, currentPoint);
273+
deltaPoint = localAggregatorHolder.aggregator.diff(lastPoint, currentPoint);
266274
}
267275
}
268276
deltaPoints.add(deltaPoint);
@@ -311,6 +319,18 @@ private Collection<T> collectWithCumulativeAggregationTemporality() {
311319

312320
@Override
313321
public boolean isEmpty() {
314-
return aggregator == Aggregator.drop();
322+
return aggregatorHolder.aggregator == Aggregator.drop();
323+
}
324+
325+
private final class AggregatorHolder {
326+
private final Aggregator<T, U> aggregator;
327+
private final ObjectPool<AggregatorHandle<T, U>> reusableHandlesPool;
328+
private final BiConsumer<Attributes, AggregatorHandle<T, U>> handleReleaser;
329+
330+
private AggregatorHolder(Aggregator<T, U> aggregator) {
331+
this.aggregator = aggregator;
332+
this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle);
333+
this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle);
334+
}
315335
}
316336
}

0 commit comments

Comments
 (0)