Skip to content

Commit f007c41

Browse files
authored
Provide canned resource message for OpenTelemetry's "resource.proto" wire protocol (#10957)
Provide canned resource message for OpenTelemetry's "resource.proto" wire protocol review feedback Co-authored-by: stuart.mcculloch <stuart.mcculloch@datadoghq.com>
1 parent c0ce9c5 commit f007c41

5 files changed

Lines changed: 360 additions & 0 deletions

File tree

communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,18 @@ public GrowableBuffer(int initialCapacity) {
1717
this.buffer = ByteBuffer.allocate(initialCapacity);
1818
}
1919

20+
/** Flips the buffer and returns a new slice which shares the buffered content. */
2021
public ByteBuffer slice() {
2122
buffer.flip();
2223
return buffer.slice();
2324
}
2425

26+
/** Flips the buffer and returns the buffered content. */
27+
public ByteBuffer flip() {
28+
buffer.flip();
29+
return buffer;
30+
}
31+
2532
public int messageCount() {
2633
return messageCount;
2734
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
import static java.nio.charset.StandardCharsets.UTF_8;
44

55
import datadog.communication.serialization.GenerationalUtf8Cache;
6+
import datadog.communication.serialization.GrowableBuffer;
67
import datadog.communication.serialization.SimpleUtf8Cache;
78
import datadog.communication.serialization.StreamingBuffer;
89
import datadog.trace.api.Config;
910
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
11+
import java.nio.ByteBuffer;
1012
import java.util.List;
1113

1214
/**
@@ -43,6 +45,14 @@ public static int sizeVarInt(long value) {
4345
return 1 + (63 - Long.numberOfLeadingZeros(value)) / 7;
4446
}
4547

48+
public static void writeVarInt(ByteBuffer buf, int value) {
49+
for (int i = 1, len = sizeVarInt(value); i < len; i++) {
50+
buf.put((byte) ((value & 0x7f) | 0x80));
51+
value >>>= 7;
52+
}
53+
buf.put((byte) value);
54+
}
55+
4656
public static void writeVarInt(StreamingBuffer buf, int value) {
4757
for (int i = 1, len = sizeVarInt(value); i < len; i++) {
4858
buf.put((byte) ((value & 0x7f) | 0x80));
@@ -96,10 +106,32 @@ public static void writeString(StreamingBuffer buf, String value) {
96106
writeString(buf, value.getBytes(UTF_8));
97107
}
98108

109+
public static int sizeTag(int fieldNum) {
110+
return sizeVarInt(fieldNum << 3);
111+
}
112+
113+
public static void writeTag(ByteBuffer buf, int fieldNum, int wireType) {
114+
writeVarInt(buf, fieldNum << 3 | wireType);
115+
}
116+
99117
public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) {
100118
writeVarInt(buf, fieldNum << 3 | wireType);
101119
}
102120

121+
public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) {
122+
try {
123+
ByteBuffer data = buf.flip();
124+
int dataSize = data.remaining();
125+
ByteBuffer message = ByteBuffer.allocate(sizeTag(fieldNum) + sizeVarInt(dataSize) + dataSize);
126+
writeTag(message, fieldNum, LEN_WIRE_TYPE);
127+
writeVarInt(message, dataSize);
128+
message.put(data);
129+
return message.array();
130+
} finally {
131+
buf.reset();
132+
}
133+
}
134+
103135
public static void writeInstrumentationScope(
104136
StreamingBuffer buf, OtelInstrumentationScope scope) {
105137
byte[] scopeNameUtf8 = scope.getName().getUtf8Bytes();
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package datadog.trace.bootstrap.otel.common.export;
2+
3+
import static datadog.trace.bootstrap.otel.common.export.OtlpAttributeVisitor.STRING;
4+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.LEN_WIRE_TYPE;
5+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.recordMessage;
6+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeAttribute;
7+
import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeTag;
8+
9+
import datadog.communication.serialization.GrowableBuffer;
10+
import datadog.communication.serialization.StreamingBuffer;
11+
import datadog.trace.api.Config;
12+
import java.util.Arrays;
13+
import java.util.HashSet;
14+
import java.util.Locale;
15+
import java.util.Set;
16+
17+
/** Provides a canned message for OpenTelemetry's "resource.proto" wire protocol. */
18+
public final class OtlpResourceProto {
19+
private OtlpResourceProto() {}
20+
21+
private static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());
22+
23+
private static final Set<String> IGNORED_GLOBAL_TAGS =
24+
new HashSet<>(
25+
Arrays.asList(
26+
"service",
27+
"env",
28+
"version",
29+
"service.name",
30+
"deployment.environment.name",
31+
"service.version"));
32+
33+
/** Writes the resource message in protobuf format to the given buffer. */
34+
public static void writeResourceMessage(StreamingBuffer buf) {
35+
buf.put(RESOURCE_MESSAGE);
36+
}
37+
38+
static byte[] buildResourceMessage(Config config) {
39+
GrowableBuffer buf = new GrowableBuffer(512);
40+
41+
String serviceName = config.getServiceName();
42+
String env = config.getEnv();
43+
String version = config.getVersion();
44+
45+
writeResourceAttribute(buf, "service.name", serviceName);
46+
if (!env.isEmpty()) {
47+
writeResourceAttribute(buf, "deployment.environment.name", env);
48+
}
49+
if (!version.isEmpty()) {
50+
writeResourceAttribute(buf, "service.version", version);
51+
}
52+
53+
config
54+
.getGlobalTags()
55+
.forEach(
56+
(key, value) -> {
57+
// ignore datadog tags and their otel equivalents that we map above
58+
if (!IGNORED_GLOBAL_TAGS.contains(key.toLowerCase(Locale.ROOT))) {
59+
writeResourceAttribute(buf, key, value);
60+
}
61+
});
62+
63+
return recordMessage(buf, 1);
64+
}
65+
66+
private static void writeResourceAttribute(StreamingBuffer buf, String key, String value) {
67+
writeTag(buf, 1, LEN_WIRE_TYPE);
68+
writeAttribute(buf, STRING, key, value);
69+
}
70+
}
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package datadog.trace.bootstrap.otel.common.export;
2+
3+
import static datadog.trace.api.config.GeneralConfig.ENV;
4+
import static datadog.trace.api.config.GeneralConfig.SERVICE_NAME;
5+
import static datadog.trace.api.config.GeneralConfig.TAGS;
6+
import static datadog.trace.api.config.GeneralConfig.VERSION;
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
9+
10+
import com.google.protobuf.CodedInputStream;
11+
import com.google.protobuf.WireFormat;
12+
import datadog.trace.api.Config;
13+
import java.io.IOException;
14+
import java.util.LinkedHashMap;
15+
import java.util.Map;
16+
import java.util.Properties;
17+
import java.util.stream.Stream;
18+
import org.junit.jupiter.params.ParameterizedTest;
19+
import org.junit.jupiter.params.provider.Arguments;
20+
import org.junit.jupiter.params.provider.MethodSource;
21+
22+
/**
23+
* Tests for {@link OtlpResourceProto#buildResourceMessage}.
24+
*
25+
* <p>Each test creates a {@link Config} from a {@link Properties} instance, calls {@link
26+
* OtlpResourceProto#buildResourceMessage}, then extracts the byte array and verifies its content
27+
* against the OpenTelemetry protobuf encoding defined in {@code
28+
* opentelemetry/proto/resource/v1/resource.proto}.
29+
*
30+
* <p>Relevant proto field numbers:
31+
*
32+
* <pre>
33+
* Resource { repeated KeyValue attributes = 1; }
34+
* KeyValue { string key = 1; AnyValue value = 2; }
35+
* AnyValue { string string_value = 1; }
36+
* </pre>
37+
*/
38+
class OtlpResourceProtoTest {
39+
40+
// ── test data ─────────────────────────────────────────────────────────────
41+
42+
private static Properties props(String... keyValues) {
43+
Properties props = new Properties();
44+
for (int i = 0; i < keyValues.length; i += 2) {
45+
props.setProperty(keyValues[i], keyValues[i + 1]);
46+
}
47+
return props;
48+
}
49+
50+
private static Map<String, String> attrs(String... keyValues) {
51+
Map<String, String> map = new LinkedHashMap<>();
52+
for (int i = 0; i < keyValues.length; i += 2) {
53+
map.put(keyValues[i], keyValues[i + 1]);
54+
}
55+
return map;
56+
}
57+
58+
static Stream<Arguments> resourceMessageCases() {
59+
return Stream.of(
60+
// service not set: should use the auto-detected name
61+
Arguments.of(
62+
"service not set, no env, no version, no tags",
63+
props(),
64+
attrs("service.name", Config.get().getServiceName())),
65+
// custom service name
66+
Arguments.of(
67+
"custom service name, no env, no version, no tags",
68+
props(SERVICE_NAME, "my-service"),
69+
attrs("service.name", "my-service")),
70+
// env set to empty string: no deployment.environment.name written;
71+
Arguments.of(
72+
"env set to empty string",
73+
props(SERVICE_NAME, "my-service", ENV, ""),
74+
attrs("service.name", "my-service")),
75+
// env set to non-empty value: deployment.environment.name written;
76+
Arguments.of(
77+
"env set to non-empty value",
78+
props(SERVICE_NAME, "my-service", ENV, "prod"),
79+
attrs("service.name", "my-service", "deployment.environment.name", "prod")),
80+
// version set to empty string: no service.version written;
81+
Arguments.of(
82+
"version set to empty string",
83+
props(SERVICE_NAME, "my-service", VERSION, ""),
84+
attrs("service.name", "my-service")),
85+
// version set to non-empty value: service.version written;
86+
Arguments.of(
87+
"version set to non-empty value",
88+
props(SERVICE_NAME, "my-service", VERSION, "1.0.0"),
89+
attrs("service.name", "my-service", "service.version", "1.0.0")),
90+
// tags as comma-separated key:value pairs (no env or version)
91+
Arguments.of(
92+
"tags as comma-separated key:value pairs",
93+
props(SERVICE_NAME, "my-service", TAGS, "region:us-east,team:platform"),
94+
attrs(
95+
"service.name", "my-service",
96+
"region", "us-east",
97+
"team", "platform")),
98+
// all config values set together
99+
Arguments.of(
100+
"service, env, version, and tags all set",
101+
props(
102+
SERVICE_NAME,
103+
"my-service",
104+
ENV,
105+
"staging",
106+
VERSION,
107+
"2.0.0",
108+
TAGS,
109+
"region:eu-west,"
110+
+ "service:ignored-service,"
111+
+ "env:ignored-env,"
112+
+ "version:ignored-version,"
113+
+ "SERVICE:ignored-service,"
114+
+ "ENV:ignored-env,"
115+
+ "VERSION:ignored-version,"
116+
+ "service.name:ignored-service,"
117+
+ "deployment.environment.name:ignored-env,"
118+
+ "service.version:ignored-version,"
119+
+ "SERVICE.NAME:ignored-service,"
120+
+ "DEPLOYMENT.ENVIRONMENT.NAME:ignored-env,"
121+
+ "SERVICE.VERSION:ignored-version"),
122+
attrs(
123+
"service.name", "my-service",
124+
"deployment.environment.name", "staging",
125+
"service.version", "2.0.0",
126+
"region", "eu-west")));
127+
}
128+
129+
// ── test ─────────────────────────────────────────────────────────────────
130+
131+
@ParameterizedTest(name = "{0}")
132+
@MethodSource("resourceMessageCases")
133+
void testBuildResourceMessage(
134+
String caseName, Properties properties, Map<String, String> expectedAttributes)
135+
throws IOException {
136+
Config config = Config.get(properties);
137+
byte[] bytes = OtlpResourceProto.buildResourceMessage(config);
138+
139+
Map<String, String> actualAttributes = parseResourceAttributes(bytes);
140+
assertEquals(expectedAttributes, actualAttributes, "For case: " + caseName);
141+
}
142+
143+
// ── parsing helpers ───────────────────────────────────────────────────────
144+
145+
/**
146+
* Parses the resource message bytes into an attribute map while validating the protobuf wire
147+
* format (field numbers and wire types) of every field read.
148+
*
149+
* <p>{@code buildResourceMessage} returns a length-prefixed message with an outer tag (field 1,
150+
* LEN wire type) followed by the Resource body size and body. Read the outer tag, then iterate
151+
* over all {@code Resource.attributes} (field 1, LEN wire type). Each attribute is a {@code
152+
* KeyValue} whose {@code value} is an {@code AnyValue} containing a {@code string_value}.
153+
*/
154+
private static Map<String, String> parseResourceAttributes(byte[] bytes) throws IOException {
155+
// Read the outer tag (field 1, LEN wire type) that wraps the Resource body
156+
CodedInputStream outer = CodedInputStream.newInstance(bytes);
157+
int outerTag = outer.readTag();
158+
assertEquals(1, WireFormat.getTagFieldNumber(outerTag), "outer field is Resource (field 1)");
159+
assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(outerTag));
160+
CodedInputStream resource = outer.readBytes().newCodedInput();
161+
162+
Map<String, String> attributes = new LinkedHashMap<>();
163+
while (!resource.isAtEnd()) {
164+
// Each attribute is Resource.attributes (field 1, LEN wire type)
165+
int tag = resource.readTag();
166+
assertEquals(1, WireFormat.getTagFieldNumber(tag), "Resource.attributes is field 1");
167+
assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag));
168+
169+
// Read the full KeyValue body
170+
CodedInputStream kv = resource.readBytes().newCodedInput();
171+
172+
String key = readKeyField(kv);
173+
CodedInputStream av = readAnyValueField(kv);
174+
175+
// Read AnyValue.string_value (field 1, LEN)
176+
int avTag = av.readTag();
177+
assertEquals(1, WireFormat.getTagFieldNumber(avTag), "AnyValue.string_value is field 1");
178+
assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(avTag));
179+
String value = av.readString();
180+
assertTrue(av.isAtEnd(), "no extra fields in AnyValue");
181+
assertTrue(kv.isAtEnd(), "no extra fields in KeyValue");
182+
183+
attributes.put(key, value);
184+
}
185+
return attributes;
186+
}
187+
188+
/** Reads the {@code KeyValue.key} field (field 1, LEN) and returns the string value. */
189+
private static String readKeyField(CodedInputStream kv) throws IOException {
190+
int tag = kv.readTag();
191+
assertEquals(1, WireFormat.getTagFieldNumber(tag), "KeyValue.key is field 1");
192+
assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag));
193+
return kv.readString();
194+
}
195+
196+
/**
197+
* Reads the {@code KeyValue.value} field (field 2, LEN) and returns a stream over the {@code
198+
* AnyValue} body.
199+
*/
200+
private static CodedInputStream readAnyValueField(CodedInputStream kv) throws IOException {
201+
int tag = kv.readTag();
202+
assertEquals(2, WireFormat.getTagFieldNumber(tag), "KeyValue.value is field 2");
203+
assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag));
204+
return kv.readBytes().newCodedInput();
205+
}
206+
}

0 commit comments

Comments
 (0)