Skip to content

Commit 7899800

Browse files
author
truongnd2
committed
feat(sdk-logs): add maxConcurrentExports to enable concurrent batch exports
1 parent 207c861 commit 7899800

3 files changed

Lines changed: 71 additions & 3 deletions

File tree

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Queue;
2121
import java.util.concurrent.ArrayBlockingQueue;
2222
import java.util.concurrent.BlockingQueue;
23-
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525
import java.util.concurrent.atomic.AtomicInteger;
2626
import java.util.concurrent.atomic.AtomicReference;
@@ -68,6 +68,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
6868
long scheduleDelayNanos,
6969
int maxQueueSize,
7070
int maxExportBatchSize,
71+
int maxConcurrentExports,
7172
long exporterTimeoutNanos) {
7273
this.worker =
7374
new Worker(
@@ -76,6 +77,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
7677
telemetryVersion,
7778
scheduleDelayNanos,
7879
maxExportBatchSize,
80+
maxConcurrentExports,
7981
exporterTimeoutNanos,
8082
new ArrayBlockingQueue<>(maxQueueSize),
8183
maxQueueSize); // TODO: use JcTools.newFixedSizeQueue(..)
@@ -161,13 +163,15 @@ private static final class Worker implements Runnable {
161163
private volatile boolean continueWork = true;
162164
private final ArrayList<LogRecordData> batch;
163165
private final long maxQueueSize;
166+
private final Semaphore concurrencyLimiter;
164167

165168
private Worker(
166169
LogRecordExporter logRecordExporter,
167170
Supplier<MeterProvider> meterProvider,
168171
InternalTelemetryVersion telemetryVersion,
169172
long scheduleDelayNanos,
170173
int maxExportBatchSize,
174+
int maxConcurrentExports,
171175
long exporterTimeoutNanos,
172176
Queue<ReadWriteLogRecord> queue,
173177
long maxQueueSize) {
@@ -180,6 +184,7 @@ private Worker(
180184
logProcessorInstrumentation =
181185
LogRecordProcessorInstrumentation.get(telemetryVersion, COMPONENT_ID, meterProvider);
182186
this.maxQueueSize = maxQueueSize;
187+
this.concurrencyLimiter = new Semaphore(maxConcurrentExports - 1, true);
183188

184189
this.batch = new ArrayList<>(this.maxExportBatchSize);
185190
}
@@ -234,10 +239,10 @@ private void flush() {
234239
batch.add(logRecord.toLogRecordData());
235240
logsToFlush--;
236241
if (batch.size() >= maxExportBatchSize) {
237-
exportCurrentBatch();
242+
exportCurrentBatchSync();
238243
}
239244
}
240-
exportCurrentBatch();
245+
exportCurrentBatchSync();
241246
CompletableResultCode flushResult = flushRequested.get();
242247
if (flushResult != null) {
243248
flushResult.succeed();
@@ -284,6 +289,15 @@ private CompletableResultCode forceFlush() {
284289
}
285290

286291
private void exportCurrentBatch() {
292+
if (!concurrencyLimiter.tryAcquire()) {
293+
exportCurrentBatchSync();
294+
return;
295+
}
296+
297+
exportCurrentBatchAsync();
298+
}
299+
300+
private void exportCurrentBatchSync() {
287301
if (batch.isEmpty()) {
288302
return;
289303
}
@@ -309,5 +323,29 @@ private void exportCurrentBatch() {
309323
batch.clear();
310324
}
311325
}
326+
327+
private void exportCurrentBatchAsync() {
328+
if (batch.isEmpty()) {
329+
return;
330+
}
331+
332+
List<LogRecordData> batchCopy = new ArrayList<>(batch);
333+
batch.clear();
334+
335+
CompletableResultCode result = logRecordExporter.export(Collections.unmodifiableList(batchCopy));
336+
result.whenComplete(() -> {
337+
String error = null;
338+
if (!result.isSuccess()) {
339+
logger.log(Level.FINE, "Exporter failed");
340+
if (result.getFailureThrowable() != null) {
341+
error = result.getFailureThrowable().getClass().getName();
342+
} else {
343+
error = "export_failed";
344+
}
345+
}
346+
logProcessorInstrumentation.finishLogs(batchCopy.size(), error);
347+
concurrencyLimiter.release();
348+
});
349+
}
312350
}
313351
}

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,15 @@ public final class BatchLogRecordProcessorBuilder {
3232
// Visible for testing
3333
static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
3434
// Visible for testing
35+
static final int DEFAULT_MAX_CONCURRENT_EXPORTS = 1;
36+
// Visible for testing
3537
static final int DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000;
3638

3739
private final LogRecordExporter logRecordExporter;
3840
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
3941
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
4042
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
43+
private int maxConcurrentExports = DEFAULT_MAX_CONCURRENT_EXPORTS;
4144
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
4245
private Supplier<MeterProvider> meterProvider = MeterProvider::noop;
4346
private InternalTelemetryVersion telemetryVersion = InternalTelemetryVersion.LEGACY;
@@ -135,6 +138,21 @@ public BatchLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSi
135138
return this;
136139
}
137140

141+
/**
142+
* Sets the maximum number of concurrent exports.
143+
*
144+
* <p>Default value is {@code 1}.
145+
*
146+
* @param maxConcurrentExports the maximum number of concurrent exports.
147+
* @return this.
148+
* @see BatchLogRecordProcessorBuilder#DEFAULT_MAX_CONCURRENT_EXPORTS
149+
*/
150+
public BatchLogRecordProcessorBuilder setMaxConcurrentExports(int maxConcurrentExports) {
151+
checkArgument(maxConcurrentExports > 0, "maxConcurrentExports must be positive.");
152+
this.maxConcurrentExports = maxConcurrentExports;
153+
return this;
154+
}
155+
138156
/**
139157
* Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set,
140158
* metrics will not be collected.
@@ -174,6 +192,11 @@ int getMaxExportBatchSize() {
174192
return maxExportBatchSize;
175193
}
176194

195+
// Visible for testing
196+
int getMaxConcurrentExports() {
197+
return maxConcurrentExports;
198+
}
199+
177200
/**
178201
* Returns a new {@link BatchLogRecordProcessor} that batches, then forwards them to the given
179202
* {@code logRecordExporter}.
@@ -195,6 +218,7 @@ public BatchLogRecordProcessor build() {
195218
scheduleDelayNanos,
196219
maxQueueSize,
197220
maxExportBatchSize,
221+
maxConcurrentExports,
198222
exporterTimeoutNanos);
199223
}
200224
}

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ void builderDefaults() {
7878
.isEqualTo(BatchLogRecordProcessorBuilder.DEFAULT_MAX_QUEUE_SIZE);
7979
assertThat(builder.getMaxExportBatchSize())
8080
.isEqualTo(BatchLogRecordProcessorBuilder.DEFAULT_MAX_EXPORT_BATCH_SIZE);
81+
assertThat(builder.getMaxConcurrentExports())
82+
.isEqualTo(BatchLogRecordProcessorBuilder.DEFAULT_MAX_CONCURRENT_EXPORTS);
8183
assertThat(builder.getExporterTimeoutNanos())
8284
.isEqualTo(
8385
TimeUnit.MILLISECONDS.toNanos(
@@ -119,6 +121,10 @@ void builderInvalidConfig() {
119121
() -> BatchLogRecordProcessor.builder(mockLogRecordExporter).setMaxQueueSize(0))
120122
.isInstanceOf(IllegalArgumentException.class)
121123
.hasMessage("maxQueueSize must be positive.");
124+
assertThatThrownBy(
125+
() -> BatchLogRecordProcessor.builder(mockLogRecordExporter).setMaxConcurrentExports(0))
126+
.isInstanceOf(IllegalArgumentException.class)
127+
.hasMessage("maxConcurrentExports must be positive.");
122128
}
123129

124130
@Test

0 commit comments

Comments
 (0)