Skip to content

Commit 1602f06

Browse files
mccullsdevflow.devflow-routing-intake
andauthored
Implement OpenTelemetry Logs API (#11224)
Implement OpenTelemetry Logs API Review feedback Move defensive attributes copy to if builder is used after emit. Also make attributes non-null in OtlpLogRecord. Add 'opentelemetry-1' as an alias for OTel logs instrumentation - to match OTel traces instrumentation Cleanup test package/class names Review feedback Add gradle.lockfile Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent 52c2ac6 commit 1602f06

31 files changed

Lines changed: 1335 additions & 69 deletions

File tree

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/OtelInstrumentationScope.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import javax.annotation.Nullable;
66

77
/** Instrumentation scopes have a mandatory name, optional version, and optional schema URL. */
8-
public final class OtelInstrumentationScope {
8+
public final class OtelInstrumentationScope implements Comparable<OtelInstrumentationScope> {
99

1010
private final UTF8BytesString scopeName;
1111
@Nullable private final UTF8BytesString scopeVersion;
@@ -32,6 +32,34 @@ public UTF8BytesString getSchemaUrl() {
3232
return schemaUrl;
3333
}
3434

35+
@Override
36+
public int compareTo(OtelInstrumentationScope that) {
37+
int cmp = scopeName.toString().compareTo(that.scopeName.toString());
38+
if (cmp != 0) {
39+
return cmp;
40+
}
41+
if (scopeVersion != that.scopeVersion) {
42+
if (scopeVersion == null) {
43+
return -1;
44+
} else if (that.scopeVersion == null) {
45+
return 1;
46+
}
47+
cmp = scopeVersion.toString().compareTo(that.scopeVersion.toString());
48+
if (cmp != 0) {
49+
return cmp;
50+
}
51+
}
52+
if (schemaUrl != that.schemaUrl) {
53+
if (schemaUrl == null) {
54+
return -1;
55+
} else if (that.schemaUrl == null) {
56+
return 1;
57+
}
58+
return schemaUrl.toString().compareTo(that.schemaUrl.toString());
59+
}
60+
return 0;
61+
}
62+
3563
@Override
3664
public boolean equals(Object o) {
3765
if (!(o instanceof OtelInstrumentationScope)) {
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package datadog.trace.bootstrap.otel.logs.data;
2+
3+
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
4+
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
5+
import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord;
6+
import datadog.trace.bootstrap.otlp.logs.OtlpLogsVisitor;
7+
import datadog.trace.bootstrap.otlp.logs.OtlpScopedLogsVisitor;
8+
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.Comparator;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Objects;
14+
import java.util.Queue;
15+
import java.util.WeakHashMap;
16+
import java.util.concurrent.ArrayBlockingQueue;
17+
import java.util.function.BiConsumer;
18+
19+
/** Processes log records, grouping them by instrumentation scope. */
20+
public final class OtelLogRecordProcessor {
21+
public static final OtelLogRecordProcessor INSTANCE = new OtelLogRecordProcessor();
22+
23+
private static final Comparator<OtlpLogRecord> BY_SCOPE =
24+
Comparator.comparing(o -> o.instrumentationScope);
25+
26+
private static final Map<ClassLoader, BiConsumer<Map<?, ?>, OtlpAttributeVisitor>>
27+
ATTRIBUTE_READERS = Collections.synchronizedMap(new WeakHashMap<>());
28+
29+
private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(2048);
30+
31+
public void addLog(OtlpLogRecord logRecord) {
32+
queue.offer(logRecord);
33+
}
34+
35+
public void collectLogs(OtlpLogsVisitor visitor) {
36+
OtlpScopedLogsVisitor scopedVisitor = null;
37+
OtelInstrumentationScope currentScope = null;
38+
BiConsumer<Map<?, ?>, OtlpAttributeVisitor> attributesReader = null;
39+
ClassLoader attributesClassLoader = null;
40+
for (OtlpLogRecord logRecord : batchByScope()) {
41+
if (logRecord.instrumentationScope != currentScope) {
42+
currentScope = logRecord.instrumentationScope;
43+
scopedVisitor = visitor.visitScopedLogs(currentScope);
44+
}
45+
Map<?, ?> attributes = logRecord.attributes;
46+
if (!attributes.isEmpty()) {
47+
ClassLoader cl = getAttributesClassLoader(attributes);
48+
// avoid repeated lookups when attribute class-loader is same for all records
49+
if (attributesReader == null || !Objects.equals(cl, attributesClassLoader)) {
50+
attributesReader = ATTRIBUTE_READERS.get(cl);
51+
attributesClassLoader = cl;
52+
}
53+
if (attributesReader != null) {
54+
attributesReader.accept(attributes, scopedVisitor);
55+
}
56+
}
57+
scopedVisitor.visitLogRecord(logRecord);
58+
}
59+
}
60+
61+
private static ClassLoader getAttributesClassLoader(Map<?, ?> attributes) {
62+
// need to peek at the first key, as the map will be a JDK collection type
63+
return attributes.keySet().iterator().next().getClass().getClassLoader();
64+
}
65+
66+
public static void registerAttributeReader(
67+
ClassLoader cl, BiConsumer<Map<?, ?>, OtlpAttributeVisitor> reader) {
68+
ATTRIBUTE_READERS.put(cl, reader);
69+
}
70+
71+
private List<OtlpLogRecord> batchByScope() {
72+
// capture expected batch size; records emitted after here go into next batch
73+
int batchSize = queue.size();
74+
List<OtlpLogRecord> batch = new ArrayList<>(batchSize);
75+
for (int i = 0; i < batchSize; i++) {
76+
OtlpLogRecord logRecord = queue.poll();
77+
if (logRecord != null) {
78+
batch.add(logRecord);
79+
} else {
80+
break; // should not happen unless another thread is also batching records
81+
}
82+
}
83+
batch.sort(BY_SCOPE);
84+
return batch;
85+
}
86+
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otlp/common/OtlpAttributeVisitor.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
/** A visitor to visit OpenTelemetry attributes. */
44
public interface OtlpAttributeVisitor {
55

6-
int STRING = 0; // AttributeType.STRING
7-
int BOOLEAN = 1; // AttributeType.BOOLEAN
8-
int LONG = 2; // AttributeType.LONG
9-
int DOUBLE = 3; // AttributeType.DOUBLE
10-
int STRING_ARRAY = 4; // AttributeType.STRING_ARRAY
11-
int BOOLEAN_ARRAY = 5; // AttributeType.BOOLEAN_ARRAY
12-
int LONG_ARRAY = 6; // AttributeType.LONG_ARRAY
13-
int DOUBLE_ARRAY = 7; // AttributeType.DOUBLE_ARRAY
6+
int STRING_ATTRIBUTE = 0; // AttributeType.STRING
7+
int BOOLEAN_ATTRIBUTE = 1; // AttributeType.BOOLEAN
8+
int LONG_ATTRIBUTE = 2; // AttributeType.LONG
9+
int DOUBLE_ATTRIBUTE = 3; // AttributeType.DOUBLE
10+
int STRING_ARRAY_ATTRIBUTE = 4; // AttributeType.STRING_ARRAY
11+
int BOOLEAN_ARRAY_ATTRIBUTE = 5; // AttributeType.BOOLEAN_ARRAY
12+
int LONG_ARRAY_ATTRIBUTE = 6; // AttributeType.LONG_ARRAY
13+
int DOUBLE_ARRAY_ATTRIBUTE = 7; // AttributeType.DOUBLE_ARRAY
1414

1515
/**
1616
* Visits an attribute.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package datadog.trace.bootstrap.otlp.logs;
2+
3+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
4+
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
5+
import java.util.Map;
6+
import javax.annotation.Nullable;
7+
8+
public final class OtlpLogRecord {
9+
10+
public static final int STRING_BODY = 0; // ValueType.STRING
11+
public static final int BOOLEAN_BODY = 1; // ValueType.BOOLEAN
12+
public static final int LONG_BODY = 2; // ValueType.LONG
13+
public static final int DOUBLE_BODY = 3; // ValueType.DOUBLE
14+
public static final int ARRAY_BODY = 4; // ValueType.ARRAY
15+
public static final int KEY_VALUE_LIST_BODY = 5; // ValueType.KEY_VALUE_LIST
16+
public static final int BYTES_BODY = 6; // ValueType.BYTES
17+
18+
public final OtelInstrumentationScope instrumentationScope;
19+
20+
public final long timestampNanos;
21+
public final long observedNanos;
22+
public final int severityNumber;
23+
@Nullable public final String severityText;
24+
public final int bodyType;
25+
@Nullable public final Object bodyValue;
26+
@Nullable public final String eventName;
27+
public final Map<?, ?> attributes;
28+
@Nullable public final AgentSpanContext spanContext;
29+
30+
public OtlpLogRecord(
31+
OtelInstrumentationScope instrumentationScope,
32+
long timestampNanos,
33+
long observedNanos,
34+
int severityNumber,
35+
@Nullable String severityText,
36+
int bodyType,
37+
@Nullable Object bodyValue,
38+
@Nullable String eventName,
39+
Map<?, ?> attributes,
40+
@Nullable AgentSpanContext spanContext) {
41+
this.instrumentationScope = instrumentationScope;
42+
this.timestampNanos = timestampNanos;
43+
this.observedNanos = observedNanos;
44+
this.severityNumber = severityNumber;
45+
this.severityText = severityText;
46+
this.bodyType = bodyType;
47+
this.bodyValue = bodyValue;
48+
this.eventName = eventName;
49+
this.attributes = attributes;
50+
this.spanContext = spanContext;
51+
}
52+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package datadog.trace.bootstrap.otlp.logs;
2+
3+
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
4+
5+
/** A visitor to visit OpenTelemetry logs. */
6+
public interface OtlpLogsVisitor {
7+
/** Visits logs produced by an instrumentation scope. */
8+
OtlpScopedLogsVisitor visitScopedLogs(OtelInstrumentationScope scope);
9+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package datadog.trace.bootstrap.otlp.logs;
2+
3+
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
4+
5+
/** A visitor to visit log records produced by an instrumentation scope. */
6+
public interface OtlpScopedLogsVisitor extends OtlpAttributeVisitor {
7+
8+
/** Visits an attribute of the upcoming log record. */
9+
void visitAttribute(int type, String key, Object value);
10+
11+
/** Visits a log record. */
12+
void visitLogRecord(OtlpLogRecord logRecord);
13+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package datadog.opentelemetry.shim.logs;
2+
3+
import static datadog.opentelemetry.shim.trace.OtelExtractedContext.extract;
4+
import static datadog.trace.bootstrap.otlp.logs.OtlpLogRecord.STRING_BODY;
5+
import static io.opentelemetry.api.common.AttributeKey.stringKey;
6+
7+
import datadog.trace.api.time.SystemTimeSource;
8+
import datadog.trace.api.time.TimeSource;
9+
import datadog.trace.bootstrap.otel.logs.data.OtelLogRecordProcessor;
10+
import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord;
11+
import io.opentelemetry.api.common.AttributeKey;
12+
import io.opentelemetry.api.common.Value;
13+
import io.opentelemetry.api.logs.LogRecordBuilder;
14+
import io.opentelemetry.api.logs.Severity;
15+
import io.opentelemetry.context.Context;
16+
import java.time.Instant;
17+
import java.util.Collections;
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
import java.util.concurrent.TimeUnit;
21+
import javax.annotation.Nullable;
22+
import javax.annotation.ParametersAreNonnullByDefault;
23+
24+
@ParametersAreNonnullByDefault
25+
final class OtelLogRecordBuilder implements LogRecordBuilder {
26+
// package-visible for testing
27+
static TimeSource TIME_SOURCE = SystemTimeSource.INSTANCE;
28+
29+
private static final AttributeKey<String> EXCEPTION_TYPE_KEY = stringKey("exception.type");
30+
private static final AttributeKey<String> EXCEPTION_MESSAGE_KEY = stringKey("exception.message");
31+
32+
private final OtelLogger logger;
33+
34+
private long timestampNanos;
35+
private long observedNanos;
36+
private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
37+
@Nullable private String severityText;
38+
private int bodyType;
39+
@Nullable private Object bodyValue;
40+
@Nullable private String eventName;
41+
@Nullable private Map<AttributeKey<?>, Object> attributes;
42+
@Nullable private Context context;
43+
44+
private boolean attributesEmitted;
45+
46+
OtelLogRecordBuilder(OtelLogger logger) {
47+
this.logger = logger;
48+
}
49+
50+
@Override
51+
public LogRecordBuilder setTimestamp(long timestamp, TimeUnit unit) {
52+
this.timestampNanos = unit.toNanos(timestamp);
53+
return this;
54+
}
55+
56+
@Override
57+
public LogRecordBuilder setTimestamp(Instant instant) {
58+
this.timestampNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
59+
return this;
60+
}
61+
62+
@Override
63+
public LogRecordBuilder setObservedTimestamp(long timestamp, TimeUnit unit) {
64+
this.observedNanos = unit.toNanos(timestamp);
65+
return this;
66+
}
67+
68+
@Override
69+
public LogRecordBuilder setObservedTimestamp(Instant instant) {
70+
this.observedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
71+
return this;
72+
}
73+
74+
@Override
75+
public LogRecordBuilder setSeverity(Severity severity) {
76+
this.severity = severity;
77+
return this;
78+
}
79+
80+
@Override
81+
public LogRecordBuilder setSeverityText(String severityText) {
82+
this.severityText = severityText;
83+
return this;
84+
}
85+
86+
@Override
87+
public LogRecordBuilder setBody(String value) {
88+
this.bodyType = STRING_BODY;
89+
this.bodyValue = value;
90+
return this;
91+
}
92+
93+
@Override
94+
public LogRecordBuilder setBody(Value<?> body) {
95+
this.bodyType = body.getType().ordinal();
96+
this.bodyValue = body.getValue();
97+
return this;
98+
}
99+
100+
@Override
101+
public <T> LogRecordBuilder setAttribute(@Nullable AttributeKey<T> key, @Nullable T value) {
102+
if (key == null || key.getKey().isEmpty()) {
103+
return this;
104+
}
105+
if (attributesEmitted && attributes != null) {
106+
// defensive copy if builder used after emit
107+
attributes = new HashMap<>(attributes);
108+
attributesEmitted = false;
109+
}
110+
if (value != null) {
111+
if (attributes == null) {
112+
attributes = new HashMap<>();
113+
}
114+
attributes.put(key, value);
115+
} else if (attributes != null) {
116+
attributes.remove(key);
117+
}
118+
return this;
119+
}
120+
121+
@Override
122+
public LogRecordBuilder setContext(Context context) {
123+
this.context = context;
124+
return this;
125+
}
126+
127+
public LogRecordBuilder setEventName(String eventName) {
128+
this.eventName = eventName;
129+
return this;
130+
}
131+
132+
public LogRecordBuilder setException(@Nullable Throwable throwable) {
133+
if (throwable != null) {
134+
setExceptionAttribute(EXCEPTION_TYPE_KEY, throwable.getClass().getName());
135+
setExceptionAttribute(EXCEPTION_MESSAGE_KEY, throwable.getMessage());
136+
}
137+
return this;
138+
}
139+
140+
private void setExceptionAttribute(AttributeKey<String> key, @Nullable String value) {
141+
// avoid overwriting/removing existing exception details
142+
if (value != null && (attributes == null || !attributes.containsKey(key))) {
143+
setAttribute(key, value);
144+
}
145+
}
146+
147+
@Override
148+
public void emit() {
149+
attributesEmitted = true;
150+
Context context = this.context != null ? this.context : Context.current();
151+
if (logger.isEnabled(severity, context)) {
152+
OtelLogRecordProcessor.INSTANCE.addLog(
153+
new OtlpLogRecord(
154+
logger.instrumentationScope,
155+
timestampNanos,
156+
observedNanos != 0 ? observedNanos : TIME_SOURCE.getCurrentTimeNanos(),
157+
severity.getSeverityNumber(),
158+
severityText,
159+
bodyType,
160+
bodyValue,
161+
eventName,
162+
attributes != null ? attributes : Collections.emptyMap(),
163+
extract(context)));
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)