Skip to content

Commit 92d221d

Browse files
psx95zeitlinger
andauthored
Add support for batching in PeriodicMetricReader (#8296)
Signed-off-by: Gregor Zeitlinger <gregor.zeitlinger@grafana.com> Co-authored-by: Gregor Zeitlinger <gregor.zeitlinger@grafana.com>
1 parent d60b1d9 commit 92d221d

7 files changed

Lines changed: 1330 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
#### Incubating
2222

23+
* Add support for configuring `setMaxExportBatchSize` in `PeriodicMetricReader` ([#8296](https://github.com/open-telemetry/opentelemetry-java/pull/8296))
24+
2325
* **BREAKING** Update `EnvironmentGetter` and `EnvironmentSetter` key normalization to reflect spec
2426
changes
2527
([#8233](https://github.com/open-telemetry/opentelemetry-java/pull/8233))
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.metrics.export;
7+
8+
import io.opentelemetry.sdk.metrics.data.DoublePointData;
9+
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData;
10+
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
11+
import io.opentelemetry.sdk.metrics.data.HistogramData;
12+
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
13+
import io.opentelemetry.sdk.metrics.data.LongPointData;
14+
import io.opentelemetry.sdk.metrics.data.MetricData;
15+
import io.opentelemetry.sdk.metrics.data.PointData;
16+
import io.opentelemetry.sdk.metrics.data.SumData;
17+
import io.opentelemetry.sdk.metrics.data.SummaryPointData;
18+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
19+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
20+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
21+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
22+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
23+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
26+
import java.util.Collections;
27+
import java.util.List;
28+
29+
/**
30+
* Batches metric data into multiple batches based on the maximum export batch size. This is used by
31+
* the {@link PeriodicMetricReader} to batch metric data before exporting it.
32+
*
33+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
34+
* at any time.
35+
*/
36+
class MetricExportBatcher {
37+
38+
private MetricExportBatcher() {}
39+
40+
private static void validateMaxExportBatchSize(int maxExportBatchSize) {
41+
if (maxExportBatchSize <= 0) {
42+
throw new IllegalArgumentException("maxExportBatchSize must be positive");
43+
}
44+
}
45+
46+
/**
47+
* Batches the given metric data into multiple batches based on the maximum export batch size.
48+
*
49+
* @param metrics The collection of metric data objects to batch based on the number of data
50+
* points they contain.
51+
* @return A collection of batches of metric data.
52+
*/
53+
static Collection<Collection<MetricData>> batchMetrics(
54+
Collection<MetricData> metrics, int maxExportBatchSize) {
55+
validateMaxExportBatchSize(maxExportBatchSize);
56+
if (metrics.isEmpty()) {
57+
return Collections.emptyList();
58+
}
59+
Collection<Collection<MetricData>> preparedBatchesForExport = new ArrayList<>();
60+
List<MetricData> currentBatch = new ArrayList<>(maxExportBatchSize);
61+
int currentPointsInBatch = 0;
62+
for (MetricData metricData : metrics) {
63+
int totalPointsInMetric = metricData.getData().getPoints().size();
64+
if (currentPointsInBatch + totalPointsInMetric <= maxExportBatchSize) {
65+
currentBatch.add(metricData);
66+
currentPointsInBatch += totalPointsInMetric;
67+
continue;
68+
}
69+
int currentIndex = 0;
70+
List<PointData> originalPointsList = new ArrayList<>(metricData.getData().getPoints());
71+
while (currentIndex < totalPointsInMetric) {
72+
if (currentPointsInBatch == maxExportBatchSize) {
73+
preparedBatchesForExport.add(currentBatch);
74+
currentBatch = new ArrayList<>(maxExportBatchSize);
75+
currentPointsInBatch = 0;
76+
}
77+
int pointsToTake =
78+
Math.min(maxExportBatchSize - currentPointsInBatch, totalPointsInMetric - currentIndex);
79+
currentBatch.add(
80+
copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake));
81+
currentPointsInBatch += pointsToTake;
82+
currentIndex += pointsToTake;
83+
}
84+
}
85+
if (!currentBatch.isEmpty()) {
86+
preparedBatchesForExport.add(currentBatch);
87+
}
88+
return Collections.unmodifiableCollection(preparedBatchesForExport);
89+
}
90+
91+
private static MetricData copyMetricData(
92+
MetricData original,
93+
List<PointData> originalPointsList,
94+
int dataPointsOffset,
95+
int dataPointsToTake) {
96+
List<PointData> points =
97+
Collections.unmodifiableList(
98+
new ArrayList<>(
99+
originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake)));
100+
return createMetricDataWithPoints(original, points);
101+
}
102+
103+
/**
104+
* Creates a new MetricData with the given points.
105+
*
106+
* @param original The original MetricData.
107+
* @param points The points to use for the new MetricData.
108+
* @return A new MetricData with the given points.
109+
*/
110+
@SuppressWarnings("unchecked")
111+
private static MetricData createMetricDataWithPoints(
112+
MetricData original, Collection<PointData> points) {
113+
switch (original.getType()) {
114+
case DOUBLE_GAUGE:
115+
return ImmutableMetricData.createDoubleGauge(
116+
original.getResource(),
117+
original.getInstrumentationScopeInfo(),
118+
original.getName(),
119+
original.getDescription(),
120+
original.getUnit(),
121+
ImmutableGaugeData.create((Collection<DoublePointData>) (Collection<?>) points));
122+
case LONG_GAUGE:
123+
return ImmutableMetricData.createLongGauge(
124+
original.getResource(),
125+
original.getInstrumentationScopeInfo(),
126+
original.getName(),
127+
original.getDescription(),
128+
original.getUnit(),
129+
ImmutableGaugeData.create((Collection<LongPointData>) (Collection<?>) points));
130+
case DOUBLE_SUM:
131+
SumData<DoublePointData> doubleSumData = original.getDoubleSumData();
132+
return ImmutableMetricData.createDoubleSum(
133+
original.getResource(),
134+
original.getInstrumentationScopeInfo(),
135+
original.getName(),
136+
original.getDescription(),
137+
original.getUnit(),
138+
ImmutableSumData.create(
139+
doubleSumData.isMonotonic(),
140+
doubleSumData.getAggregationTemporality(),
141+
(Collection<DoublePointData>) (Collection<?>) points));
142+
case LONG_SUM:
143+
SumData<LongPointData> longSumData = original.getLongSumData();
144+
return ImmutableMetricData.createLongSum(
145+
original.getResource(),
146+
original.getInstrumentationScopeInfo(),
147+
original.getName(),
148+
original.getDescription(),
149+
original.getUnit(),
150+
ImmutableSumData.create(
151+
longSumData.isMonotonic(),
152+
longSumData.getAggregationTemporality(),
153+
(Collection<LongPointData>) (Collection<?>) points));
154+
case HISTOGRAM:
155+
HistogramData histogramData = original.getHistogramData();
156+
return ImmutableMetricData.createDoubleHistogram(
157+
original.getResource(),
158+
original.getInstrumentationScopeInfo(),
159+
original.getName(),
160+
original.getDescription(),
161+
original.getUnit(),
162+
ImmutableHistogramData.create(
163+
histogramData.getAggregationTemporality(),
164+
(Collection<HistogramPointData>) (Collection<?>) points));
165+
case EXPONENTIAL_HISTOGRAM:
166+
ExponentialHistogramData expHistogramData = original.getExponentialHistogramData();
167+
return ImmutableMetricData.createExponentialHistogram(
168+
original.getResource(),
169+
original.getInstrumentationScopeInfo(),
170+
original.getName(),
171+
original.getDescription(),
172+
original.getUnit(),
173+
ImmutableExponentialHistogramData.create(
174+
expHistogramData.getAggregationTemporality(),
175+
(Collection<ExponentialHistogramPointData>) (Collection<?>) points));
176+
case SUMMARY:
177+
return ImmutableMetricData.createDoubleSummary(
178+
original.getResource(),
179+
original.getInstrumentationScopeInfo(),
180+
original.getName(),
181+
original.getDescription(),
182+
original.getUnit(),
183+
ImmutableSummaryData.create((Collection<SummaryPointData>) (Collection<?>) points));
184+
}
185+
throw new UnsupportedOperationException("Unsupported metric type: " + original.getType());
186+
}
187+
}

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
1818
import io.opentelemetry.sdk.metrics.data.MetricData;
1919
import java.util.Collection;
20+
import java.util.Iterator;
2021
import java.util.concurrent.ScheduledExecutorService;
2122
import java.util.concurrent.ScheduledFuture;
2223
import java.util.concurrent.TimeUnit;
@@ -51,6 +52,7 @@ public final class PeriodicMetricReader implements MetricReader {
5152
private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();
5253

5354
@Nullable private volatile ScheduledFuture<?> scheduledFuture;
55+
private final int maxExportBatchSize;
5456

5557
/**
5658
* Returns a new {@link PeriodicMetricReader} which exports to the {@code exporter} once every
@@ -66,10 +68,14 @@ public static PeriodicMetricReaderBuilder builder(MetricExporter exporter) {
6668
}
6769

6870
PeriodicMetricReader(
69-
MetricExporter exporter, long intervalNanos, ScheduledExecutorService scheduler) {
71+
MetricExporter exporter,
72+
long intervalNanos,
73+
ScheduledExecutorService scheduler,
74+
int maxExportBatchSize) {
7075
this.exporter = exporter;
7176
this.intervalNanos = intervalNanos;
7277
this.scheduler = scheduler;
78+
this.maxExportBatchSize = maxExportBatchSize;
7379
this.scheduled = new Scheduled();
7480
}
7581

@@ -163,6 +169,8 @@ public String toString() {
163169
+ exporter
164170
+ ", intervalNanos="
165171
+ intervalNanos
172+
+ ", maxExportBatchSize="
173+
+ maxExportBatchSize
166174
+ '}';
167175
}
168176

@@ -187,13 +195,56 @@ private final class Scheduled implements Runnable {
187195

188196
private Scheduled() {}
189197

198+
private CompletableResultCode exportMetrics(Collection<MetricData> metricData) {
199+
if (maxExportBatchSize == 0) {
200+
return exporter.export(metricData);
201+
}
202+
Collection<Collection<MetricData>> batches =
203+
MetricExportBatcher.batchMetrics(metricData, maxExportBatchSize);
204+
CompletableResultCode sequentialResult = new CompletableResultCode();
205+
AtomicBoolean anyFailed = new AtomicBoolean(false);
206+
Iterator<Collection<MetricData>> batchIterator = batches.iterator();
207+
Runnable exportNext =
208+
new Runnable() {
209+
@Override
210+
public void run() {
211+
while (batchIterator.hasNext()) {
212+
Collection<MetricData> currentBatch = batchIterator.next();
213+
CompletableResultCode currentResult = exporter.export(currentBatch);
214+
if (currentResult.isDone()) {
215+
if (!currentResult.isSuccess()) {
216+
anyFailed.set(true);
217+
}
218+
} else {
219+
currentResult.whenComplete(
220+
() -> {
221+
if (!currentResult.isSuccess()) {
222+
anyFailed.set(true);
223+
}
224+
this.run();
225+
});
226+
return;
227+
}
228+
}
229+
if (anyFailed.get()) {
230+
sequentialResult.fail();
231+
} else {
232+
sequentialResult.succeed();
233+
}
234+
}
235+
};
236+
exportNext.run();
237+
return sequentialResult;
238+
}
239+
190240
void setMeterProvider(MeterProvider meterProvider) {
191241
instrumentation = new MetricReaderInstrumentation(COMPONENT_ID, meterProvider);
192242
}
193243

194244
@Override
195245
public void run() {
196-
// Ignore the CompletableResultCode from doRun() in order to keep run() asynchronous
246+
// Ignore the CompletableResultCode from doRun() in order to keep run()
247+
// asynchronous
197248
doRun();
198249
}
199250

@@ -217,11 +268,11 @@ CompletableResultCode doRun() {
217268
exportAvailable.set(true);
218269
flushResult.succeed();
219270
} else {
220-
CompletableResultCode result = exporter.export(metricData);
271+
CompletableResultCode result = exportMetrics(metricData);
221272
result.whenComplete(
222273
() -> {
223274
if (!result.isSuccess()) {
224-
logger.log(Level.FINE, "Exporter failed");
275+
logger.log(Level.WARNING, "Exporter failed");
225276
}
226277
exportAvailable.set(true);
227278
flushResult.succeed();

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public final class PeriodicMetricReaderBuilder {
3030

3131
@Nullable private ScheduledExecutorService executor;
3232

33+
private int maxExportBatchSize;
34+
3335
PeriodicMetricReaderBuilder(MetricExporter metricExporter) {
3436
this.metricExporter = metricExporter;
3537
}
@@ -59,13 +61,27 @@ public PeriodicMetricReaderBuilder setExecutor(ScheduledExecutorService executor
5961
return this;
6062
}
6163

64+
/**
65+
* Sets the maximum number of data points to include in a single export batch. If unset, no
66+
* batching will be performed. The maximum number of data points is considered across MetricData
67+
* objects scheduled for export.
68+
*
69+
* @param maxExportBatchSize The maximum number of data points to include in a single export
70+
* batch.
71+
*/
72+
PeriodicMetricReaderBuilder setMaxExportBatchSize(int maxExportBatchSize) {
73+
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive");
74+
this.maxExportBatchSize = maxExportBatchSize;
75+
return this;
76+
}
77+
6278
/** Build a {@link PeriodicMetricReader} with the configuration of this builder. */
6379
public PeriodicMetricReader build() {
6480
ScheduledExecutorService executor = this.executor;
6581
if (executor == null) {
6682
executor =
6783
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("PeriodicMetricReader"));
6884
}
69-
return new PeriodicMetricReader(metricExporter, intervalNanos, executor);
85+
return new PeriodicMetricReader(metricExporter, intervalNanos, executor, maxExportBatchSize);
7086
}
7187
}

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ public static SdkMeterProviderBuilder addMeterConfiguratorCondition(
7676
return sdkMeterProviderBuilder;
7777
}
7878

79+
/** Reflectively set the max export batch size for the {@link SdkMeterProviderBuilder}. */
80+
public static SdkMeterProviderBuilder setMaxExportBatchSize(
81+
SdkMeterProviderBuilder sdkMeterProviderBuilder, int maxExportBatchSize) {
82+
try {
83+
Method method =
84+
SdkMeterProviderBuilder.class.getDeclaredMethod("setMaxExportBatchSize", int.class);
85+
method.setAccessible(true);
86+
method.invoke(sdkMeterProviderBuilder, maxExportBatchSize);
87+
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
88+
throw new IllegalStateException(
89+
"Error calling setMaxExportBatchSize on SdkMeterProviderBuilder", e);
90+
}
91+
return sdkMeterProviderBuilder;
92+
}
93+
7994
/**
8095
* Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends
8196
* key-values from baggage to all measurements.

0 commit comments

Comments
 (0)