Skip to content

Commit be0f519

Browse files
mccullsdevflow.devflow-routing-intake
andauthored
Fix OtelLogRecordProcessor schedule (#11315)
Fix OtelLogRecordProcessor schedule: * if we have enough logs for a batch, keep sending batched logs * otherwise wait for the interval to elapse, then send what we have Review feedback Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent 1f9c136 commit be0f519

11 files changed

Lines changed: 110 additions & 92 deletions

File tree

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/logs/data/OtelLogRecordProcessor.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
import java.util.Queue;
1515
import java.util.WeakHashMap;
1616
import java.util.concurrent.ArrayBlockingQueue;
17+
import java.util.concurrent.BlockingQueue;
18+
import java.util.concurrent.TimeUnit;
1719
import java.util.function.BiConsumer;
1820

1921
/** Processes log records, grouping them by instrumentation scope. */
2022
public final class OtelLogRecordProcessor {
21-
private static final int MAX_QUEUE_SIZE = Config.get().getLogsOtelQueueSize();
22-
private static final int MAX_BATCH_SIZE = Config.get().getLogsOtelBatchSize();
2323

2424
private static final Comparator<OtlpLogRecord> BY_SCOPE =
2525
Comparator.comparing(o -> o.instrumentationScope);
@@ -29,18 +29,67 @@ public final class OtelLogRecordProcessor {
2929

3030
public static final OtelLogRecordProcessor INSTANCE = new OtelLogRecordProcessor();
3131

32-
private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
32+
private final int maxQueueSize = Config.get().getLogsOtelQueueSize();
33+
private final int maxBatchSize = Config.get().getLogsOtelBatchSize();
34+
35+
private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(maxQueueSize);
36+
37+
private final BlockingQueue<Boolean> logsReady = new ArrayBlockingQueue<>(1);
38+
private volatile int logsNeeded = Integer.MAX_VALUE;
3339

3440
public void addLog(OtlpLogRecord logRecord) {
35-
queue.offer(logRecord);
41+
if (queue.offer(logRecord)) {
42+
// report when we have enough logs for the collector's needs
43+
if (queue.size() >= logsNeeded) {
44+
logsReady.offer(true);
45+
}
46+
}
3647
}
3748

38-
public void collectLogs(OtlpLogsVisitor visitor) {
49+
public void waitForLogs(OtlpLogsVisitor visitor, int intervalMillis) {
50+
long nextExportNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
51+
List<OtlpLogRecord> batch = new ArrayList<>(maxBatchSize);
52+
int batchSize = 0;
53+
54+
while (true) {
55+
56+
// attempt to collect enough logs to complete the batch
57+
OtlpLogRecord record;
58+
while (batchSize < maxBatchSize && (record = queue.poll()) != null) {
59+
batch.add(record);
60+
batchSize++;
61+
}
62+
63+
// bail out if we have enough logs, or the interval has expired
64+
long waitNanos;
65+
if (batchSize >= maxBatchSize || (waitNanos = nextExportNanos - System.nanoTime()) <= 0) {
66+
break;
67+
}
68+
69+
logsNeeded = maxBatchSize - batchSize; // declare what we need and wait
70+
try {
71+
if (queue.isEmpty()) {
72+
logsReady.poll(waitNanos, TimeUnit.NANOSECONDS);
73+
}
74+
} catch (InterruptedException ignore) {
75+
// don't set interrupt flag as we might then busy-loop, just return batch as-is
76+
break;
77+
} finally {
78+
logsNeeded = Integer.MAX_VALUE;
79+
}
80+
}
81+
82+
visitBatch(visitor, batch); // send what we have for this interval
83+
}
84+
85+
private static void visitBatch(OtlpLogsVisitor visitor, List<OtlpLogRecord> batch) {
86+
batch.sort(BY_SCOPE);
87+
3988
OtlpScopedLogsVisitor scopedVisitor = null;
4089
OtelInstrumentationScope currentScope = null;
4190
BiConsumer<Map<?, ?>, OtlpAttributeVisitor> attributesReader = null;
4291
ClassLoader attributesClassLoader = null;
43-
for (OtlpLogRecord logRecord : batchByScope()) {
92+
for (OtlpLogRecord logRecord : batch) {
4493
if (logRecord.instrumentationScope != currentScope) {
4594
currentScope = logRecord.instrumentationScope;
4695
scopedVisitor = visitor.visitScopedLogs(currentScope);
@@ -70,20 +119,4 @@ public static void registerAttributeReader(
70119
ClassLoader cl, BiConsumer<Map<?, ?>, OtlpAttributeVisitor> reader) {
71120
ATTRIBUTE_READERS.put(cl, reader);
72121
}
73-
74-
private List<OtlpLogRecord> batchByScope() {
75-
// capture expected batch size; records emitted after here go into next batch
76-
int batchSize = Math.min(queue.size(), MAX_BATCH_SIZE);
77-
List<OtlpLogRecord> batch = new ArrayList<>(batchSize);
78-
for (int i = 0; i < batchSize; i++) {
79-
OtlpLogRecord logRecord = queue.poll();
80-
if (logRecord != null) {
81-
batch.add(logRecord);
82-
} else {
83-
break; // should not happen unless another thread is also batching records
84-
}
85-
}
86-
batch.sort(BY_SCOPE);
87-
return batch;
88-
}
89122
}

dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.27/src/test/java/opentelemetry127/logs/OpenTelemetryLogsTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class OpenTelemetryLogsTest extends AbstractInstrumentationTest {
3535
@BeforeEach
3636
void drainQueue() {
3737
// drain any stale log records from the shared processor queue before each test
38-
OtelLogRecordProcessor.INSTANCE.collectLogs(LogsDrainer.INSTANCE);
38+
OtelLogRecordProcessor.INSTANCE.waitForLogs(LogsDrainer.INSTANCE, 0);
3939
}
4040

4141
@ParameterizedTest
@@ -48,7 +48,7 @@ void testSeverity(Severity severity) {
4848

4949
logger.logRecordBuilder().setBody("test message").setSeverity(severity).emit();
5050

51-
OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
51+
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);
5252

5353
assertEquals(1, logsReader.logs.size());
5454
CapturedLog log = logsReader.logs.get(0);
@@ -68,7 +68,7 @@ void testSeverityText() {
6868
.setSeverityText("custom-level")
6969
.emit();
7070

71-
OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
71+
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);
7272

7373
assertEquals(1, logsReader.logs.size());
7474
CapturedLog log = logsReader.logs.get(0);
@@ -89,7 +89,7 @@ void testAttributes() {
8989
.setAttribute(doubleKey("double.key"), 1.5)
9090
.emit();
9191

92-
OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
92+
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);
9393

9494
assertEquals(1, logsReader.logs.size());
9595
CapturedLog log = logsReader.logs.get(0);
@@ -110,7 +110,7 @@ void testMultipleScopes() {
110110
loggerB.logRecordBuilder().setBody("b-1").setSeverity(Severity.WARN).emit();
111111
loggerA.logRecordBuilder().setBody("a-2").setSeverity(Severity.DEBUG).emit();
112112

113-
OtelLogRecordProcessor.INSTANCE.collectLogs(logsReader);
113+
OtelLogRecordProcessor.INSTANCE.waitForLogs(logsReader, 0);
114114

115115
// logs are sorted by scope name, so all scope-a logs come before scope-b logs
116116
assertEquals(3, logsReader.logs.size());

dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/NoopOtlpLogsCollector.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsCollector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,7 @@
44

55
/** Collects logs ready for export. */
66
public abstract class OtlpLogsCollector {
7-
public abstract OtlpPayload collectLogs();
7+
8+
/** Waits for logs to be batched within the given interval. */
9+
public abstract OtlpPayload waitForLogs(int intervalMillis);
810
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/logs/OtlpLogsProtoCollector.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.Deque;
2222
import java.util.List;
23-
import java.util.function.Consumer;
23+
import java.util.function.ObjIntConsumer;
2424

2525
/**
2626
* Collects OpenTelemetry logs and marshals them into a chunked 'logs.proto' payload.
@@ -61,14 +61,14 @@ private OtlpLogsProtoCollector() {}
6161
* <p>This payload is only valid for the calling thread until the next collection.
6262
*/
6363
@Override
64-
public OtlpPayload collectLogs() {
65-
return collectLogs(OtelLogRecordProcessor.INSTANCE::collectLogs);
64+
public OtlpPayload waitForLogs(int intervalMillis) {
65+
return collectLogs(OtelLogRecordProcessor.INSTANCE::waitForLogs, intervalMillis);
6666
}
6767

68-
OtlpPayload collectLogs(Consumer<OtlpLogsVisitor> processor) {
68+
OtlpPayload collectLogs(ObjIntConsumer<OtlpLogsVisitor> processor, int intervalMillis) {
6969
start();
7070
try {
71-
processor.accept(this);
71+
processor.accept(this, intervalMillis);
7272
return completePayload();
7373
} finally {
7474
stop();
Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package datadog.trace.core.otlp.logs;
22

33
import static datadog.trace.util.AgentThreadFactory.AgentThread.OTLP_LOGS_EXPORTER;
4+
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
45

56
import datadog.trace.api.Config;
67
import datadog.trace.core.otlp.common.OtlpGrpcSender;
78
import datadog.trace.core.otlp.common.OtlpHttpSender;
89
import datadog.trace.core.otlp.common.OtlpPayload;
910
import datadog.trace.core.otlp.common.OtlpSender;
10-
import datadog.trace.util.AgentTaskScheduler;
11-
import java.util.concurrent.ThreadLocalRandom;
12-
import java.util.concurrent.TimeUnit;
1311
import org.slf4j.Logger;
1412
import org.slf4j.LoggerFactory;
1513

@@ -19,17 +17,14 @@ public final class OtlpLogsService {
1917

2018
public static final OtlpLogsService INSTANCE = new OtlpLogsService(Config.get());
2119

22-
private final AgentTaskScheduler scheduler;
20+
private final int intervalMillis;
2321
private final OtlpLogsCollector collector;
2422
private final OtlpSender sender;
2523

26-
private final int intervalMillis;
27-
28-
private AgentTaskScheduler.Scheduled<?> scheduledTask = null;
24+
private volatile Thread exporterThread;
2925

3026
private OtlpLogsService(Config config) {
31-
this.scheduler = new AgentTaskScheduler(OTLP_LOGS_EXPORTER);
32-
27+
intervalMillis = config.getLogsOtelInterval();
3328
switch (config.getOtlpLogsProtocol()) {
3429
case GRPC:
3530
this.collector = OtlpLogsProtoCollector.INSTANCE;
@@ -53,51 +48,53 @@ private OtlpLogsService(Config config) {
5348
break;
5449
default:
5550
LOGGER.debug("Unsupported OTLP logs protocol: {}", config.getOtlpLogsProtocol());
56-
this.collector = NoopOtlpLogsCollector.INSTANCE;
51+
this.collector = null;
5752
this.sender = null;
5853
}
59-
60-
this.intervalMillis = config.getLogsOtelInterval();
6154
}
6255

6356
public void start() {
6457
if (sender == null) {
6558
return;
6659
}
6760

68-
// add random jitter of up to 5 seconds to initial delay; avoids a fleet
69-
// of apps starting at the same time from exporting OTLP logs in sync
70-
long initialMillis =
71-
intervalMillis
72-
+ Math.min(
73-
(long)
74-
(500d
75-
* Math.log(ThreadLocalRandom.current().nextDouble())
76-
/ Math.log(1 - 0.25)),
77-
5_000);
78-
79-
scheduledTask =
80-
scheduler.scheduleAtFixedRate(
81-
this::export, initialMillis, intervalMillis, TimeUnit.MILLISECONDS);
61+
exporterThread = newAgentThread(OTLP_LOGS_EXPORTER, this::export);
62+
exporterThread.start();
8263
}
8364

8465
public void flush() {
85-
scheduler.execute(this::export);
66+
Thread thread = exporterThread;
67+
if (thread != null) {
68+
thread.interrupt();
69+
}
8670
}
8771

8872
public void shutdown() {
89-
if (scheduledTask != null) {
90-
scheduledTask.cancel();
73+
Thread thread = exporterThread;
74+
if (thread != null) {
75+
exporterThread = null;
76+
thread.interrupt();
77+
try {
78+
thread.join(1_000);
79+
} catch (InterruptedException ignore) {
80+
// don't set interrupt flag as we're mid-shutdown
81+
}
9182
}
9283
if (sender != null) {
9384
sender.shutdown();
9485
}
9586
}
9687

9788
private void export() {
98-
OtlpPayload payload = collector.collectLogs();
99-
if (payload != OtlpPayload.EMPTY) {
100-
sender.send(payload);
89+
while (Thread.currentThread() == exporterThread) {
90+
try {
91+
OtlpPayload payload = collector.waitForLogs(intervalMillis);
92+
if (payload != OtlpPayload.EMPTY) {
93+
sender.send(payload);
94+
}
95+
} catch (RuntimeException e) {
96+
LOGGER.debug("Uncaught exception exporting logs", e);
97+
}
10198
}
10299
}
103100
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/NoopOtlpMetricsCollector.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsCollector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,7 @@
44

55
/** Collects metrics ready for export. */
66
public abstract class OtlpMetricsCollector {
7+
8+
/** Collects all metrics recorded since the last collection. */
79
public abstract OtlpPayload collectMetrics();
810
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private OtlpMetricsService(Config config) {
5353
break;
5454
default:
5555
LOGGER.debug("Unsupported OTLP metrics protocol: {}", config.getOtlpMetricsProtocol());
56-
this.collector = NoopOtlpMetricsCollector.INSTANCE;
56+
this.collector = null;
5757
this.sender = null;
5858
}
5959

@@ -82,7 +82,9 @@ public void start() {
8282
}
8383

8484
public void flush() {
85-
scheduler.execute(this::export);
85+
if (sender != null) {
86+
scheduler.execute(this::export);
87+
}
8688
}
8789

8890
public void shutdown() {

dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceCollector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
/** Collects traces ready for export. */
88
public abstract class OtlpTraceCollector {
99

10+
/** Adds spans from the given trace to the collector. */
1011
public abstract void addTrace(List<? extends CoreSpan<?>> spans);
1112

13+
/** Collects all spans added since the last collection. */
1214
public abstract OtlpPayload collectTraces();
1315
}

0 commit comments

Comments
 (0)