Skip to content

Commit 48a6076

Browse files
committed
Provide optimized writers for OpenTelemetry's "logs.proto" wire protocol
1 parent 4eb1384 commit 48a6076

21 files changed

Lines changed: 1569 additions & 59 deletions

File tree

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/logs/data/OtelLogRecordProcessor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.bootstrap.otel.logs.data;
22

3+
import datadog.trace.api.Config;
34
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
45
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
56
import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord;
@@ -18,15 +19,18 @@
1819

1920
/** Processes log records, grouping them by instrumentation scope. */
2021
public final class OtelLogRecordProcessor {
21-
public static final OtelLogRecordProcessor INSTANCE = new OtelLogRecordProcessor();
22+
private static final int MAX_QUEUE_SIZE = Config.get().getLogsOtelQueueSize();
23+
private static final int MAX_BATCH_SIZE = Config.get().getLogsOtelBatchSize();
2224

2325
private static final Comparator<OtlpLogRecord> BY_SCOPE =
2426
Comparator.comparing(o -> o.instrumentationScope);
2527

2628
private static final Map<ClassLoader, BiConsumer<Map<?, ?>, OtlpAttributeVisitor>>
2729
ATTRIBUTE_READERS = Collections.synchronizedMap(new WeakHashMap<>());
2830

29-
private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(2048);
31+
public static final OtelLogRecordProcessor INSTANCE = new OtelLogRecordProcessor();
32+
33+
private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
3034

3135
public void addLog(OtlpLogRecord logRecord) {
3236
queue.offer(logRecord);
@@ -70,7 +74,7 @@ public static void registerAttributeReader(
7074

7175
private List<OtlpLogRecord> batchByScope() {
7276
// capture expected batch size; records emitted after here go into next batch
73-
int batchSize = queue.size();
77+
int batchSize = Math.min(queue.size(), MAX_BATCH_SIZE);
7478
List<OtlpLogRecord> batch = new ArrayList<>(batchSize);
7579
for (int i = 0; i < batchSize; i++) {
7680
OtlpLogRecord logRecord = queue.poll();

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otlp/logs/OtlpLogRecord.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,46 +7,35 @@
77

88
public final class OtlpLogRecord {
99

10-
public static final int STRING_BODY = 0; // ValueType.STRING
11-
public static final int BOOLEAN_BODY = 1; // ValueType.BOOLEAN
12-
public static final int LONG_BODY = 2; // ValueType.LONG
13-
public static final int DOUBLE_BODY = 3; // ValueType.DOUBLE
14-
public static final int ARRAY_BODY = 4; // ValueType.ARRAY
15-
public static final int KEY_VALUE_LIST_BODY = 5; // ValueType.KEY_VALUE_LIST
16-
public static final int BYTES_BODY = 6; // ValueType.BYTES
17-
1810
public final OtelInstrumentationScope instrumentationScope;
1911

2012
public final long timestampNanos;
2113
public final long observedNanos;
2214
public final int severityNumber;
2315
@Nullable public final String severityText;
24-
public final int bodyType;
25-
@Nullable public final Object bodyValue;
26-
@Nullable public final String eventName;
16+
public final String body;
2717
public final Map<?, ?> attributes;
2818
@Nullable public final AgentSpanContext spanContext;
19+
@Nullable public final String eventName;
2920

3021
public OtlpLogRecord(
3122
OtelInstrumentationScope instrumentationScope,
3223
long timestampNanos,
3324
long observedNanos,
3425
int severityNumber,
3526
@Nullable String severityText,
36-
int bodyType,
37-
@Nullable Object bodyValue,
38-
@Nullable String eventName,
27+
final String body,
3928
Map<?, ?> attributes,
40-
@Nullable AgentSpanContext spanContext) {
29+
@Nullable AgentSpanContext spanContext,
30+
@Nullable String eventName) {
4131
this.instrumentationScope = instrumentationScope;
4232
this.timestampNanos = timestampNanos;
4333
this.observedNanos = observedNanos;
4434
this.severityNumber = severityNumber;
4535
this.severityText = severityText;
46-
this.bodyType = bodyType;
47-
this.bodyValue = bodyValue;
48-
this.eventName = eventName;
36+
this.body = body;
4937
this.attributes = attributes;
5038
this.spanContext = spanContext;
39+
this.eventName = eventName;
5140
}
5241
}

dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/logs/OtelLogRecordBuilder.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package datadog.opentelemetry.shim.logs;
22

33
import static datadog.opentelemetry.shim.trace.OtelExtractedContext.extract;
4-
import static datadog.trace.bootstrap.otlp.logs.OtlpLogRecord.STRING_BODY;
54
import static io.opentelemetry.api.common.AttributeKey.stringKey;
65

76
import datadog.trace.api.time.SystemTimeSource;
@@ -17,6 +16,7 @@
1716
import java.util.Collections;
1817
import java.util.HashMap;
1918
import java.util.Map;
19+
import java.util.Objects;
2020
import java.util.concurrent.TimeUnit;
2121
import javax.annotation.Nullable;
2222
import javax.annotation.ParametersAreNonnullByDefault;
@@ -35,11 +35,10 @@ final class OtelLogRecordBuilder implements LogRecordBuilder {
3535
private long observedNanos;
3636
private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
3737
@Nullable private String severityText;
38-
private int bodyType;
39-
@Nullable private Object bodyValue;
40-
@Nullable private String eventName;
38+
@Nullable private String body;
4139
@Nullable private Map<AttributeKey<?>, Object> attributes;
4240
@Nullable private Context context;
41+
@Nullable private String eventName;
4342

4443
private boolean attributesEmitted;
4544

@@ -73,7 +72,7 @@ public LogRecordBuilder setObservedTimestamp(Instant instant) {
7372

7473
@Override
7574
public LogRecordBuilder setSeverity(Severity severity) {
76-
this.severity = severity;
75+
this.severity = Objects.requireNonNull(severity);
7776
return this;
7877
}
7978

@@ -84,16 +83,14 @@ public LogRecordBuilder setSeverityText(String severityText) {
8483
}
8584

8685
@Override
87-
public LogRecordBuilder setBody(String value) {
88-
this.bodyType = STRING_BODY;
89-
this.bodyValue = value;
86+
public LogRecordBuilder setBody(String body) {
87+
this.body = body;
9088
return this;
9189
}
9290

9391
@Override
9492
public LogRecordBuilder setBody(Value<?> body) {
95-
this.bodyType = body.getType().ordinal();
96-
this.bodyValue = body.getValue();
93+
this.body = body.asString();
9794
return this;
9895
}
9996

@@ -146,7 +143,9 @@ private void setExceptionAttribute(AttributeKey<String> key, @Nullable String va
146143

147144
@Override
148145
public void emit() {
149-
attributesEmitted = true;
146+
if (body == null || body.isEmpty()) {
147+
return; // drop log records without a body
148+
}
150149
Context context = this.context != null ? this.context : Context.current();
151150
if (logger.isEnabled(severity, context)) {
152151
OtelLogRecordProcessor.INSTANCE.addLog(
@@ -156,11 +155,12 @@ public void emit() {
156155
observedNanos != 0 ? observedNanos : TIME_SOURCE.getCurrentTimeNanos(),
157156
severity.getSeverityNumber(),
158157
severityText,
159-
bodyType,
160-
bodyValue,
161-
eventName,
158+
body,
162159
attributes != null ? attributes : Collections.emptyMap(),
163-
extract(context)));
160+
extract(context),
161+
eventName));
162+
163+
attributesEmitted = true;
164164
}
165165
}
166166
}

dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.27/src/test/java/opentelemetry127/logs/OpenTelemetryLogsTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void testSeverity(Severity severity) {
5454
CapturedLog log = logsReader.logs.get(0);
5555
assertEquals("test-severity", log.scopeName);
5656
assertEquals(severity.getSeverityNumber(), log.severityNumber);
57-
assertEquals("test message", log.bodyValue);
57+
assertEquals("test message", log.body);
5858
}
5959

6060
@Test
@@ -94,7 +94,7 @@ void testAttributes() {
9494
assertEquals(1, logsReader.logs.size());
9595
CapturedLog log = logsReader.logs.get(0);
9696
assertEquals("test-attributes", log.scopeName);
97-
assertEquals("attributed message", log.bodyValue);
97+
assertEquals("attributed message", log.body);
9898
assertEquals("str-value", log.attributes.get("str.key"));
9999
assertEquals(42L, log.attributes.get("long.key"));
100100
assertEquals(true, log.attributes.get("bool.key"));
@@ -125,30 +125,30 @@ void testMultipleScopes() {
125125
.collect(Collectors.toList());
126126

127127
assertEquals(2, scopeALogs.size());
128-
assertEquals("a-1", scopeALogs.get(0).bodyValue);
129-
assertEquals("a-2", scopeALogs.get(1).bodyValue);
128+
assertEquals("a-1", scopeALogs.get(0).body);
129+
assertEquals("a-2", scopeALogs.get(1).body);
130130

131131
assertEquals(1, scopeBLogs.size());
132-
assertEquals("b-1", scopeBLogs.get(0).bodyValue);
132+
assertEquals("b-1", scopeBLogs.get(0).body);
133133
}
134134

135135
static class CapturedLog {
136136
final String scopeName;
137137
final int severityNumber;
138138
final String severityText;
139-
final Object bodyValue;
139+
final String body;
140140
final Map<String, Object> attributes;
141141

142142
CapturedLog(
143143
String scopeName,
144144
int severityNumber,
145145
String severityText,
146-
Object bodyValue,
146+
String body,
147147
Map<String, Object> attributes) {
148148
this.scopeName = scopeName;
149149
this.severityNumber = severityNumber;
150150
this.severityText = severityText;
151-
this.bodyValue = bodyValue;
151+
this.body = body;
152152
this.attributes = attributes;
153153
}
154154
}
@@ -176,7 +176,7 @@ public void visitLogRecord(OtlpLogRecord logRecord) {
176176
currentScopeName,
177177
logRecord.severityNumber,
178178
logRecord.severityText,
179-
logRecord.bodyValue,
179+
logRecord.body,
180180
new HashMap<>(currentAttributes)));
181181
currentAttributes.clear();
182182
}

dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ public final class ConfigDefaults {
102102
static final int DEFAULT_JMX_FETCH_MULTIPLE_RUNTIME_SERVICES_LIMIT = 10;
103103

104104
public static final boolean DEFAULT_LOGS_OTEL_ENABLED = false;
105-
static final int DEFAULT_OTLP_LOGS_TIMEOUT = 10_000; // ms
105+
static final int DEFAULT_LOGS_OTEL_INTERVAL = 1_000; // ms
106+
static final int DEFAULT_LOGS_OTEL_TIMEOUT = 30_000; // ms
107+
static final int DEFAULT_LOGS_OTEL_QUEUE_SIZE = 2048;
108+
static final int DEFAULT_LOGS_OTEL_BATCH_SIZE = 512;
106109

107110
public static final boolean DEFAULT_METRICS_OTEL_ENABLED = false;
108111
// Default recommended by Datadog; it differs from Otel’s default of 60000 (60s)

dd-trace-api/src/main/java/datadog/trace/api/config/OtlpConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ public final class OtlpConfig {
44

55
public static final String LOGS_OTEL_ENABLED = "logs.otel.enabled";
66
public static final String LOGS_OTEL_EXPORTER = "logs.otel.exporter";
7+
public static final String LOGS_OTEL_INTERVAL = "logs.otel.interval";
8+
public static final String LOGS_OTEL_TIMEOUT = "logs.otel.timeout";
9+
public static final String LOGS_OTEL_QUEUE_SIZE = "logs.otel.queue.size";
10+
public static final String LOGS_OTEL_BATCH_SIZE = "logs.otel.batch.size";
711

812
public static final String OTLP_LOGS_ENDPOINT = "otlp.logs.endpoint";
913
public static final String OTLP_LOGS_HEADERS = "otlp.logs.headers";
@@ -13,7 +17,6 @@ public final class OtlpConfig {
1317

1418
public static final String METRICS_OTEL_ENABLED = "metrics.otel.enabled";
1519
public static final String METRICS_OTEL_EXPORTER = "metrics.otel.exporter";
16-
1720
public static final String METRICS_OTEL_INTERVAL = "metrics.otel.interval";
1821
public static final String METRICS_OTEL_TIMEOUT = "metrics.otel.timeout";
1922
public static final String METRICS_OTEL_CARDINALITY_LIMIT = "metrics.otel.cardinality.limit";

dd-trace-api/src/main/java/datadog/trace/api/internal/InternalTracer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public interface InternalTracer {
2020

2121
void flushMetrics();
2222

23+
void flushLogs();
24+
2325
Profiling getProfilingContext();
2426

2527
TraceSegment getTraceSegment();

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import datadog.trace.core.datastreams.DisabledDataStreamsMonitoring;
9595
import datadog.trace.core.monitor.HealthMetrics;
9696
import datadog.trace.core.monitor.TracerHealthMetrics;
97+
import datadog.trace.core.otlp.logs.OtlpLogsService;
9798
import datadog.trace.core.otlp.metrics.OtlpMetricsService;
9899
import datadog.trace.core.propagation.ExtractedContext;
99100
import datadog.trace.core.propagation.HttpCodec;
@@ -802,10 +803,16 @@ private CoreTracer(
802803
// allowed
803804
metricsAggregator = NoOpMetricsAggregator.INSTANCE;
804805
final SharedCommunicationObjects sco = sharedCommunicationObjects;
805-
// asynchronously create the aggregator to avoid triggering expensive classloading during the
806-
// tracer initialisation.
806+
// asynchronously create these aggregator/export components to avoid triggering
807+
// expensive classloading during the tracer initialisation.
807808
sharedCommunicationObjects.whenReady(
808-
() -> AgentTaskScheduler.get().execute(() -> startMetricsAggregation(config, sco)));
809+
() ->
810+
AgentTaskScheduler.get()
811+
.execute(
812+
() -> {
813+
startMetricsAggregation(config, sco);
814+
maybeStartLogsExport(config);
815+
}));
809816

810817
if (dataStreamsMonitoring == null) {
811818
// Avoid DSM in bazel hermetic mode
@@ -929,6 +936,12 @@ private void startMetricsAggregation(Config config, SharedCommunicationObjects s
929936
}
930937
}
931938

939+
private void maybeStartLogsExport(Config config) {
940+
if (config.isLogsOtlpExporterEnabled()) {
941+
OtlpLogsService.INSTANCE.start();
942+
}
943+
}
944+
932945
/** Used by AgentTestRunner to inject configuration into the test tracer. */
933946
public void rebuildTraceConfig(Config config) {
934947
dynamicConfig
@@ -1455,6 +1468,9 @@ public void close() {
14551468
if (initialConfig.isMetricsOtlpExporterEnabled()) {
14561469
OtlpMetricsService.INSTANCE.shutdown();
14571470
}
1471+
if (initialConfig.isLogsOtlpExporterEnabled()) {
1472+
OtlpLogsService.INSTANCE.shutdown();
1473+
}
14581474
dataStreamsMonitoring.close();
14591475
externalAgentLauncher.close();
14601476
healthMetrics.close();
@@ -1496,6 +1512,13 @@ public void flushMetrics() {
14961512
}
14971513
}
14981514

1515+
@Override
1516+
public void flushLogs() {
1517+
if (initialConfig.isLogsOtlpExporterEnabled()) {
1518+
OtlpLogsService.INSTANCE.flush();
1519+
}
1520+
}
1521+
14991522
@Override
15001523
public ProfilingContextIntegration getProfilingContext() {
15011524
return profilingContextIntegration;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package datadog.trace.core.otlp.logs;
2+
3+
import datadog.trace.core.otlp.common.OtlpPayload;
4+
5+
/** Collects logs ready for export. */
6+
public interface OtlpLogsCollector {
7+
OtlpLogsCollector NOOP_COLLECTOR = () -> OtlpPayload.EMPTY;
8+
9+
OtlpPayload collectLogs();
10+
}

0 commit comments

Comments
 (0)