Skip to content

Commit bbaa2d2

Browse files
authored
Provide optimized writers for OpenTelemetry's "common.proto" wire protocol (#10947)
Refactor OTLP data types to start with Otlp... Use communication module in otel-bootstrap Introduce OtelAttributeVisitor, preserving the attribute type id Provide optimized writers for OpenTelemetry's "common.proto" wire protocol Provide proto writer for InstrumentationScope review feedback Extend test coverage Prefix OTLP related types/helpers with Otlp Rename visitPoint to visitDataPoint Co-authored-by: stuart.mcculloch <stuart.mcculloch@datadoghq.com>
1 parent 1e19001 commit bbaa2d2

File tree

23 files changed

+952
-78
lines changed

23 files changed

+952
-78
lines changed

dd-java-agent/agent-otel/otel-bootstrap/build.gradle

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ dependencies {
7272
// selected bootstrap types shared across multiple OpenTelemetry instrumentations
7373
embeddedClasspath group: 'io.opentelemetry.javaagent.instrumentation', name: 'opentelemetry-javaagent-servlet-common-bootstrap', version: "$otelInstrumentationApiVersion-alpha"
7474

75-
compileOnly project(':dd-java-agent:agent-bootstrap')
76-
75+
implementation project(':dd-java-agent:agent-bootstrap')
7776
implementation project(':utils:logging-utils')
77+
implementation project(':communication')
78+
79+
testImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
7880
}
7981

8082
tasks.named("shadowJar", ShadowJar) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package datadog.trace.bootstrap.otel.common.export;
2+
3+
/** A visitor to visit OpenTelemetry attributes. */
4+
public interface OtlpAttributeVisitor {
5+
6+
int STRING = 0; // AttributeType.STRING
7+
int BOOLEAN = 1; // AttributeType.BOOLEAN
8+
int LONG = 2; // AttributeType.LONG
9+
int DOUBLE = 3; // AttributeType.DOUBLE
10+
int STRING_ARRAY = 4; // AttributeType.STRING_ARRAY
11+
int BOOLEAN_ARRAY = 5; // AttributeType.BOOLEAN_ARRAY
12+
int LONG_ARRAY = 6; // AttributeType.LONG_ARRAY
13+
int DOUBLE_ARRAY = 7; // AttributeType.DOUBLE_ARRAY
14+
15+
/**
16+
* Visits an attribute.
17+
*
18+
* @param type the attribute type
19+
* @param key the attribute key
20+
* @param value the attribute value
21+
*/
22+
void visitAttribute(int type, String key, Object value);
23+
}
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
package datadog.trace.bootstrap.otel.common.export;
2+
3+
import static java.nio.charset.StandardCharsets.UTF_8;
4+
5+
import datadog.communication.serialization.GenerationalUtf8Cache;
6+
import datadog.communication.serialization.SimpleUtf8Cache;
7+
import datadog.communication.serialization.StreamingBuffer;
8+
import datadog.trace.api.Config;
9+
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
10+
import java.util.List;
11+
12+
/**
13+
* Provides optimized writers for OpenTelemetry's "common.proto" wire protocol.
14+
*
15+
* <p>Embedded message sizes are precomputed to avoid the need for temporary buffers.
16+
*/
17+
public final class OtlpCommonProto {
18+
private OtlpCommonProto() {}
19+
20+
// wire types supported in protobuf v3
21+
public static final int VARINT_WIRE_TYPE = 0;
22+
public static final int I64_WIRE_TYPE = 1;
23+
public static final int LEN_WIRE_TYPE = 2;
24+
public static final int I32_WIRE_TYPE = 5;
25+
26+
// use same cache approach for attribute keys as TraceMapperV0_4
27+
private static final SimpleUtf8Cache KEY_CACHE =
28+
Config.get().getTagNameUtf8CacheSize() > 0
29+
? new SimpleUtf8Cache(Config.get().getTagNameUtf8CacheSize())
30+
: null;
31+
32+
// use same cache approach for attribute values as TraceMapperV0_4
33+
private static final GenerationalUtf8Cache VALUE_CACHE =
34+
Config.get().getTagValueUtf8CacheSize() > 0
35+
? new GenerationalUtf8Cache(Config.get().getTagValueUtf8CacheSize())
36+
: null;
37+
38+
public static int sizeVarInt(int value) {
39+
return 1 + (31 - Integer.numberOfLeadingZeros(value)) / 7;
40+
}
41+
42+
public static int sizeVarInt(long value) {
43+
return 1 + (63 - Long.numberOfLeadingZeros(value)) / 7;
44+
}
45+
46+
public static void writeVarInt(StreamingBuffer buf, int value) {
47+
for (int i = 1, len = sizeVarInt(value); i < len; i++) {
48+
buf.put((byte) ((value & 0x7f) | 0x80));
49+
value >>>= 7;
50+
}
51+
buf.put((byte) value);
52+
}
53+
54+
public static void writeVarInt(StreamingBuffer buf, long value) {
55+
for (int i = 1, len = sizeVarInt(value); i < len; i++) {
56+
buf.put((byte) ((value & 0x7f) | 0x80));
57+
value >>>= 7;
58+
}
59+
buf.put((byte) value);
60+
}
61+
62+
public static void writeI32(StreamingBuffer buf, int value) {
63+
buf.putInt( // convert to little-endian
64+
(value & 0xff000000) >>> 24
65+
| (value & 0x00ff0000) >>> 8
66+
| (value & 0x0000ff00) << 8
67+
| (value & 0x000000ff) << 24);
68+
}
69+
70+
public static void writeI32(StreamingBuffer buf, float value) {
71+
writeI32(buf, Float.floatToRawIntBits(value));
72+
}
73+
74+
public static void writeI64(StreamingBuffer buf, long value) {
75+
buf.putLong( // convert to little-endian
76+
(value & 0xff00000000000000L) >>> 56
77+
| (value & 0x00ff000000000000L) >>> 40
78+
| (value & 0x0000ff0000000000L) >>> 24
79+
| (value & 0x000000ff00000000L) >>> 8
80+
| (value & 0x00000000ff000000L) << 8
81+
| (value & 0x0000000000ff0000L) << 24
82+
| (value & 0x000000000000ff00L) << 40
83+
| (value & 0x00000000000000ffL) << 56);
84+
}
85+
86+
public static void writeI64(StreamingBuffer buf, double value) {
87+
writeI64(buf, Double.doubleToRawLongBits(value));
88+
}
89+
90+
public static void writeString(StreamingBuffer buf, byte[] utf8) {
91+
writeVarInt(buf, utf8.length);
92+
buf.put(utf8);
93+
}
94+
95+
public static void writeString(StreamingBuffer buf, String value) {
96+
writeString(buf, value.getBytes(UTF_8));
97+
}
98+
99+
public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) {
100+
writeVarInt(buf, fieldNum << 3 | wireType);
101+
}
102+
103+
public static void writeInstrumentationScope(
104+
StreamingBuffer buf, OtelInstrumentationScope scope) {
105+
byte[] scopeNameUtf8 = scope.getName().getUtf8Bytes();
106+
int scopeSize = 1 + sizeVarInt(scopeNameUtf8.length) + scopeNameUtf8.length;
107+
byte[] scopeVersionUtf8 = null;
108+
if (scope.getVersion() != null) {
109+
scopeVersionUtf8 = scope.getVersion().getUtf8Bytes();
110+
scopeSize += 1 + sizeVarInt(scopeVersionUtf8.length) + scopeVersionUtf8.length;
111+
}
112+
writeVarInt(buf, scopeSize);
113+
writeTag(buf, 1, LEN_WIRE_TYPE);
114+
writeString(buf, scopeNameUtf8);
115+
if (scopeVersionUtf8 != null) {
116+
writeTag(buf, 2, LEN_WIRE_TYPE);
117+
writeString(buf, scopeVersionUtf8);
118+
}
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
public static void writeAttribute(StreamingBuffer buf, int type, String key, Object value) {
123+
byte[] keyUtf8 = keyUtf8(key);
124+
switch (type) {
125+
case OtlpAttributeVisitor.STRING:
126+
writeStringAttribute(buf, keyUtf8, valueUtf8((String) value));
127+
break;
128+
case OtlpAttributeVisitor.BOOLEAN:
129+
writeBooleanAttribute(buf, keyUtf8, (boolean) value);
130+
break;
131+
case OtlpAttributeVisitor.LONG:
132+
writeLongAttribute(buf, keyUtf8, (long) value);
133+
break;
134+
case OtlpAttributeVisitor.DOUBLE:
135+
writeDoubleAttribute(buf, keyUtf8, (double) value);
136+
break;
137+
case OtlpAttributeVisitor.STRING_ARRAY:
138+
byte[][] valueUtf8s =
139+
((List<String>) value).stream().map(OtlpCommonProto::valueUtf8).toArray(byte[][]::new);
140+
writeStringArrayAttribute(buf, keyUtf8, valueUtf8s);
141+
break;
142+
case OtlpAttributeVisitor.BOOLEAN_ARRAY:
143+
writeBooleanArrayAttribute(buf, keyUtf8, (List<Boolean>) value);
144+
break;
145+
case OtlpAttributeVisitor.LONG_ARRAY:
146+
writeLongArrayAttribute(buf, keyUtf8, (List<Long>) value);
147+
break;
148+
case OtlpAttributeVisitor.DOUBLE_ARRAY:
149+
writeDoubleArrayAttribute(buf, keyUtf8, (List<Double>) value);
150+
break;
151+
default:
152+
throw new IllegalArgumentException("Unknown attribute type: " + type);
153+
}
154+
}
155+
156+
private static byte[] keyUtf8(String key) {
157+
return KEY_CACHE != null ? KEY_CACHE.getUtf8(key) : key.getBytes(UTF_8);
158+
}
159+
160+
private static byte[] valueUtf8(String value) {
161+
return VALUE_CACHE != null ? VALUE_CACHE.getUtf8(value) : value.getBytes(UTF_8);
162+
}
163+
164+
private static void writeStringAttribute(StreamingBuffer buf, byte[] keyUtf8, byte[] valueUtf8) {
165+
int valueSize = 1 + sizeVarInt(valueUtf8.length) + valueUtf8.length;
166+
int keyValueSize =
167+
1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 1 + sizeVarInt(valueSize) + valueSize;
168+
writeVarInt(buf, keyValueSize);
169+
writeTag(buf, 1, LEN_WIRE_TYPE);
170+
writeVarInt(buf, keyUtf8.length);
171+
buf.put(keyUtf8);
172+
writeTag(buf, 2, LEN_WIRE_TYPE);
173+
writeVarInt(buf, valueSize);
174+
writeTag(buf, 1, LEN_WIRE_TYPE);
175+
writeVarInt(buf, valueUtf8.length);
176+
buf.put(valueUtf8);
177+
}
178+
179+
private static void writeBooleanAttribute(StreamingBuffer buf, byte[] keyUtf8, boolean value) {
180+
int keyValueSize = 1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 4;
181+
writeVarInt(buf, keyValueSize);
182+
writeTag(buf, 1, LEN_WIRE_TYPE);
183+
writeVarInt(buf, keyUtf8.length);
184+
buf.put(keyUtf8);
185+
writeTag(buf, 2, LEN_WIRE_TYPE);
186+
buf.put((byte) 2);
187+
writeTag(buf, 2, VARINT_WIRE_TYPE);
188+
buf.put((byte) (value ? 1 : 0));
189+
}
190+
191+
private static void writeLongAttribute(StreamingBuffer buf, byte[] keyUtf8, long value) {
192+
int valueSize = 1 + sizeVarInt(value);
193+
int keyValueSize =
194+
1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 1 + sizeVarInt(valueSize) + valueSize;
195+
writeVarInt(buf, keyValueSize);
196+
writeTag(buf, 1, LEN_WIRE_TYPE);
197+
writeVarInt(buf, keyUtf8.length);
198+
buf.put(keyUtf8);
199+
writeTag(buf, 2, LEN_WIRE_TYPE);
200+
writeVarInt(buf, valueSize);
201+
writeTag(buf, 3, VARINT_WIRE_TYPE);
202+
writeVarInt(buf, value);
203+
}
204+
205+
private static void writeDoubleAttribute(StreamingBuffer buf, byte[] keyUtf8, double value) {
206+
int keyValueSize = 1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 11;
207+
writeVarInt(buf, keyValueSize);
208+
writeTag(buf, 1, LEN_WIRE_TYPE);
209+
writeVarInt(buf, keyUtf8.length);
210+
buf.put(keyUtf8);
211+
writeTag(buf, 2, LEN_WIRE_TYPE);
212+
buf.put((byte) 9);
213+
writeTag(buf, 4, I64_WIRE_TYPE);
214+
writeI64(buf, value);
215+
}
216+
217+
private static void writeStringArrayAttribute(
218+
StreamingBuffer buf, byte[] keyUtf8, byte[][] valueUtf8s) {
219+
int[] elementSizes = new int[valueUtf8s.length];
220+
for (int i = 0; i < valueUtf8s.length; i++) {
221+
elementSizes[i] = 1 + sizeVarInt(valueUtf8s[i].length) + valueUtf8s[i].length;
222+
}
223+
int arraySize = 0;
224+
for (int elementSize : elementSizes) {
225+
arraySize += 1 + sizeVarInt(elementSize) + elementSize;
226+
}
227+
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
228+
int keyValueSize =
229+
1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 1 + sizeVarInt(valueSize) + valueSize;
230+
writeVarInt(buf, keyValueSize);
231+
writeTag(buf, 1, LEN_WIRE_TYPE);
232+
writeVarInt(buf, keyUtf8.length);
233+
buf.put(keyUtf8);
234+
writeTag(buf, 2, LEN_WIRE_TYPE);
235+
writeVarInt(buf, valueSize);
236+
writeTag(buf, 5, LEN_WIRE_TYPE);
237+
writeVarInt(buf, arraySize);
238+
for (int i = 0; i < elementSizes.length; i++) {
239+
writeTag(buf, 1, LEN_WIRE_TYPE);
240+
writeVarInt(buf, elementSizes[i]);
241+
writeTag(buf, 1, LEN_WIRE_TYPE);
242+
writeVarInt(buf, valueUtf8s[i].length);
243+
buf.put(valueUtf8s[i]);
244+
}
245+
}
246+
247+
private static void writeBooleanArrayAttribute(
248+
StreamingBuffer buf, byte[] keyUtf8, List<Boolean> values) {
249+
int arraySize = 4 * values.size();
250+
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
251+
int keyValueSize =
252+
1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 1 + sizeVarInt(valueSize) + valueSize;
253+
writeVarInt(buf, keyValueSize);
254+
writeTag(buf, 1, LEN_WIRE_TYPE);
255+
writeVarInt(buf, keyUtf8.length);
256+
buf.put(keyUtf8);
257+
writeTag(buf, 2, LEN_WIRE_TYPE);
258+
writeVarInt(buf, valueSize);
259+
writeTag(buf, 5, LEN_WIRE_TYPE);
260+
writeVarInt(buf, arraySize);
261+
for (boolean value : values) {
262+
writeTag(buf, 1, LEN_WIRE_TYPE);
263+
buf.put((byte) 2);
264+
writeTag(buf, 2, VARINT_WIRE_TYPE);
265+
buf.put((byte) (value ? 1 : 0));
266+
}
267+
}
268+
269+
private static void writeLongArrayAttribute(
270+
StreamingBuffer buf, byte[] keyUtf8, List<Long> values) {
271+
int[] elementSizes = new int[values.size()];
272+
for (int i = 0; i < values.size(); i++) {
273+
elementSizes[i] = 1 + sizeVarInt(values.get(i));
274+
}
275+
int arraySize = 0;
276+
for (int elementSize : elementSizes) {
277+
arraySize += 1 + sizeVarInt(elementSize) + elementSize;
278+
}
279+
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
280+
int keyValueSize =
281+
1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 1 + sizeVarInt(valueSize) + valueSize;
282+
writeVarInt(buf, keyValueSize);
283+
writeTag(buf, 1, LEN_WIRE_TYPE);
284+
writeVarInt(buf, keyUtf8.length);
285+
buf.put(keyUtf8);
286+
writeTag(buf, 2, LEN_WIRE_TYPE);
287+
writeVarInt(buf, valueSize);
288+
writeTag(buf, 5, LEN_WIRE_TYPE);
289+
writeVarInt(buf, arraySize);
290+
for (int i = 0; i < elementSizes.length; i++) {
291+
writeTag(buf, 1, LEN_WIRE_TYPE);
292+
writeVarInt(buf, elementSizes[i]);
293+
writeTag(buf, 3, VARINT_WIRE_TYPE);
294+
writeVarInt(buf, values.get(i));
295+
}
296+
}
297+
298+
private static void writeDoubleArrayAttribute(
299+
StreamingBuffer buf, byte[] keyUtf8, List<Double> values) {
300+
int arraySize = 11 * values.size();
301+
int valueSize = 1 + sizeVarInt(arraySize) + arraySize;
302+
int keyValueSize =
303+
1 + sizeVarInt(keyUtf8.length) + keyUtf8.length + 1 + sizeVarInt(valueSize) + valueSize;
304+
writeVarInt(buf, keyValueSize);
305+
writeTag(buf, 1, LEN_WIRE_TYPE);
306+
writeVarInt(buf, keyUtf8.length);
307+
buf.put(keyUtf8);
308+
writeTag(buf, 2, LEN_WIRE_TYPE);
309+
writeVarInt(buf, valueSize);
310+
writeTag(buf, 5, LEN_WIRE_TYPE);
311+
writeVarInt(buf, arraySize);
312+
for (double value : values) {
313+
writeTag(buf, 1, LEN_WIRE_TYPE);
314+
buf.put((byte) 9);
315+
writeTag(buf, 4, I64_WIRE_TYPE);
316+
writeI64(buf, value);
317+
}
318+
}
319+
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelAggregator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ final void recordLong(long value) {
1818
empty = false;
1919
}
2020

21-
final OtelPoint collect() {
21+
final OtlpDataPoint collect() {
2222
return doCollect(false);
2323
}
2424

25-
final OtelPoint collectAndReset() {
26-
OtelPoint point = doCollect(true);
25+
final OtlpDataPoint collectAndReset() {
26+
OtlpDataPoint point = doCollect(true);
2727
empty = true;
2828
return point;
2929
}
@@ -36,5 +36,5 @@ void doRecordLong(long value) {
3636
throw new UnsupportedOperationException();
3737
}
3838

39-
abstract OtelPoint doCollect(boolean reset);
39+
abstract OtlpDataPoint doCollect(boolean reset);
4040
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelDoubleSum.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ void doRecordDouble(double value) {
1111
}
1212

1313
@Override
14-
OtelPoint doCollect(boolean reset) {
15-
return new OtelDoublePoint(reset ? total.sumThenReset() : total.sum());
14+
OtlpDataPoint doCollect(boolean reset) {
15+
return new OtlpDoublePoint(reset ? total.sumThenReset() : total.sum());
1616
}
1717
}

0 commit comments

Comments
 (0)