Skip to content

Commit b3e4db7

Browse files
committed
Simplify the batchMetrics function
1 parent 2e9efb7 commit b3e4db7

2 files changed

Lines changed: 26 additions & 108 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
#### Incubating
1313

14+
* Add support for configuring `setMaxExportBatchSize` in `PeriodicMetricReader` ([#8296](https://github.com/open-telemetry/opentelemetry-java/pull/8296))
15+
1416
* **BREAKING** Update `EnvironmentGetter` and `EnvironmentSetter` key normalization to reflect spec
1517
changes
1618
([#8233](https://github.com/open-telemetry/opentelemetry-java/pull/8233))

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java

Lines changed: 24 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -66,66 +66,35 @@ Collection<Collection<MetricData>> batchMetrics(Collection<MetricData> metrics)
6666
return Collections.emptyList();
6767
}
6868
Collection<Collection<MetricData>> preparedBatchesForExport = new ArrayList<>();
69-
BatchState currentBatch = new BatchState(new ArrayList<>(maxExportBatchSize), 0);
70-
71-
// Fill active batch and split overlapping metric points if needed
69+
List<MetricData> currentBatch = new ArrayList<>(maxExportBatchSize);
70+
int currentPointsInBatch = 0;
7271
for (MetricData metricData : metrics) {
73-
MetricDataSplitOperationResult splitResult = prepareExportBatches(metricData, currentBatch);
74-
preparedBatchesForExport.addAll(splitResult.getPreparedBatches());
75-
currentBatch = splitResult.getLastInProgressBatch();
76-
}
77-
78-
// Push trailing capacity block
79-
if (!currentBatch.metrics.isEmpty()) {
80-
preparedBatchesForExport.add(currentBatch.metrics);
81-
}
82-
return Collections.unmodifiableCollection(preparedBatchesForExport);
83-
}
84-
85-
/**
86-
* Prepares export batches from a single metric data object. This function only operates on a
87-
* single metric data object, fills up the current batch with as many points as possible from the
88-
* metric data object, and then creates new metric data objects for the remaining points.
89-
*
90-
* @param metricData The metric data object to split.
91-
* @param currentBatch The current batch of metric data objects.
92-
* @return A result containing the prepared batches and the last in-progress batch.
93-
*/
94-
private MetricDataSplitOperationResult prepareExportBatches(
95-
MetricData metricData, BatchState currentBatch) {
96-
int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatch.points;
97-
int totalPointsInMetricData = metricData.getData().getPoints().size();
98-
99-
if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) {
100-
currentBatch.metrics.add(metricData);
101-
currentBatch.points += totalPointsInMetricData;
102-
return new MetricDataSplitOperationResult(Collections.emptyList(), currentBatch);
103-
} else {
104-
// Remaining capacity can't hold all points, partition existing metric data object
105-
List<PointData> originalPointsList = new ArrayList<>(metricData.getData().getPoints());
106-
Collection<Collection<MetricData>> preparedBatches = new ArrayList<>();
72+
int totalPointsInMetric = metricData.getData().getPoints().size();
73+
if (currentPointsInBatch + totalPointsInMetric <= maxExportBatchSize) {
74+
currentBatch.add(metricData);
75+
currentPointsInBatch += totalPointsInMetric;
76+
continue;
77+
}
10778
int currentIndex = 0;
108-
109-
while (currentIndex < totalPointsInMetricData) {
110-
int pointsToTake =
111-
Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch);
112-
113-
if (pointsToTake > 0) {
114-
currentBatch.metrics.add(
115-
copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake));
116-
currentBatch.points += pointsToTake;
117-
currentIndex += pointsToTake;
118-
remainingCapacityInCurrentBatch -= pointsToTake;
119-
}
120-
121-
if (remainingCapacityInCurrentBatch == 0) {
122-
preparedBatches.add(currentBatch.metrics);
123-
currentBatch = new BatchState(new ArrayList<>(maxExportBatchSize), 0);
124-
remainingCapacityInCurrentBatch = maxExportBatchSize;
79+
List<PointData> originalPointsList = new ArrayList<>(metricData.getData().getPoints());
80+
while (currentIndex < totalPointsInMetric) {
81+
if (currentPointsInBatch == maxExportBatchSize) {
82+
preparedBatchesForExport.add(currentBatch);
83+
currentBatch = new ArrayList<>(maxExportBatchSize);
84+
currentPointsInBatch = 0;
12585
}
86+
int pointsToTake =
87+
Math.min(maxExportBatchSize - currentPointsInBatch, totalPointsInMetric - currentIndex);
88+
currentBatch.add(
89+
copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake));
90+
currentPointsInBatch += pointsToTake;
91+
currentIndex += pointsToTake;
12692
}
127-
return new MetricDataSplitOperationResult(preparedBatches, currentBatch);
12893
}
94+
if (!currentBatch.isEmpty()) {
95+
preparedBatchesForExport.add(currentBatch);
96+
}
97+
return Collections.unmodifiableCollection(preparedBatchesForExport);
12998
}
13099

131100
private static MetricData copyMetricData(
@@ -224,57 +193,4 @@ private static MetricData createMetricDataWithPoints(
224193
}
225194
throw new UnsupportedOperationException("Unsupported metric type: " + original.getType());
226195
}
227-
228-
/**
229-
* A data class to store the result of a split operation performed on a single {@link MetricData}
230-
* object.
231-
*/
232-
private static class MetricDataSplitOperationResult {
233-
private final Collection<Collection<MetricData>> preparedBatches;
234-
private final BatchState lastInProgressBatch;
235-
236-
/**
237-
* Creates a new MetricDataSplitOperationResult.
238-
*
239-
* @param preparedBatches The collection of prepared batches of metric data for export. Each
240-
* batch of {@link MetricData} objects is guaranteed to have at most {@link
241-
* #maxExportBatchSize} points.
242-
* @param lastInProgressBatch The last batch that is still in progress. This batch may have less
243-
* than {@link #maxExportBatchSize} points.
244-
*/
245-
MetricDataSplitOperationResult(
246-
Collection<Collection<MetricData>> preparedBatches, BatchState lastInProgressBatch) {
247-
this.preparedBatches = preparedBatches;
248-
this.lastInProgressBatch = lastInProgressBatch;
249-
}
250-
251-
Collection<Collection<MetricData>> getPreparedBatches() {
252-
return preparedBatches;
253-
}
254-
255-
BatchState getLastInProgressBatch() {
256-
return lastInProgressBatch;
257-
}
258-
}
259-
260-
/**
261-
* Tracks the active batch while batching stays linear: {@code metrics} is the current export
262-
* payload being assembled and {@code points} is its running point count, so callers do not need
263-
* to rescan the batch on every append.
264-
*/
265-
private static final class BatchState {
266-
private final Collection<MetricData> metrics;
267-
private int points;
268-
269-
/**
270-
* Creates the mutable state for the current in-progress batch.
271-
*
272-
* @param metrics metric entries collected into the current export batch
273-
* @param points running total of data points across {@code metrics}
274-
*/
275-
private BatchState(Collection<MetricData> metrics, int points) {
276-
this.metrics = metrics;
277-
this.points = points;
278-
}
279-
}
280196
}

0 commit comments

Comments
 (0)