Skip to content

Commit 9aedff2

Browse files
Efficient Trace Payload Protocol (v1 protocol) (#10801)
# What Does This Do

- Implements support for the Datadog Agent Efficient Trace Payload Protocol `v1.0` (`v1.0/traces` endpoint) in the Java tracer.
- Introduces `TraceMapperV1`, a new serializer that encodes traces in the `v1 msgpack` format with a flat attribute model, string table deduplication, and typed attribute values.
- Adds a `ProtocolVersion` enum to cleanly represent the `v0.4`, `v0.5`, and `v1.0` protocols, including their fallback endpoint chains.
- Updates `DDAgentFeaturesDiscovery` to probe endpoints based on the configured protocol version.

# Motivation

Protocol v1 is a more efficient wire format: it deduplicates repeated strings via a string table and uses typed attribute values (string/bool/float/int/bytes/array). This reduces payload size and provides a cleaner foundation for future span model evolution. The implementation falls back gracefully to `v0.4` when the agent does not advertise `v1` support.

# Additional Notes

Protocol v1 [spec](https://docs.google.com/document/d/1hNS6anKYutOYW-nmR759UlKXUdT6H0mRwVt7_L70ESc/edit?pli=1&tab=t.0#heading=h.sf675dts2e4d)

System tests passed on both GitHub and GitLab.

Co-authored-by: alexey.kuznetsov <alexey.kuznetsov@datadoghq.com>
1 parent d4d2069 commit 9aedff2

51 files changed

Lines changed: 4068 additions & 292 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static datadog.communication.http.OkHttpUtils.msgpackRequestBodyOf;
55
import static datadog.communication.http.OkHttpUtils.prepareRequest;
66
import static datadog.communication.serialization.msgpack.MsgPackWriter.FIXARRAY;
7+
import static datadog.trace.api.ProtocolVersion.V0_4;
78
import static java.util.Collections.emptyMap;
89
import static java.util.Collections.emptySet;
910
import static java.util.Collections.singletonList;
@@ -17,6 +18,7 @@
1718
import datadog.metrics.api.Recording;
1819
import datadog.metrics.impl.statsd.DDAgentStatsDClientManager;
1920
import datadog.trace.api.BaseHash;
21+
import datadog.trace.api.ProtocolVersion;
2022
import datadog.trace.api.telemetry.LogCollector;
2123
import datadog.trace.util.Strings;
2224
import java.nio.ByteBuffer;
@@ -50,6 +52,7 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
5052
public static final String V03_ENDPOINT = "v0.3/traces";
5153
public static final String V04_ENDPOINT = "v0.4/traces";
5254
public static final String V05_ENDPOINT = "v0.5/traces";
55+
public static final String V1_ENDPOINT = "v1.0/traces";
5356

5457
public static final String V06_METRICS_ENDPOINT = "v0.6/stats";
5558
public static final String V07_CONFIG_ENDPOINT = "v0.7/config";
@@ -72,7 +75,7 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
7275
private final OkHttpClient client;
7376
private final HttpUrl agentBaseUrl;
7477
private final Recording discoveryTimer;
75-
private final String[] traceEndpoints;
78+
private final ProtocolVersion protocolVersion;
7679
private final String[] metricsEndpoints = {V06_METRICS_ENDPOINT};
7780
private final String[] configEndpoints = {V07_CONFIG_ENDPOINT};
7881
private final boolean metricsEnabled;
@@ -107,15 +110,12 @@ public DDAgentFeaturesDiscovery(
107110
OkHttpClient client,
108111
Monitoring monitoring,
109112
HttpUrl agentUrl,
110-
boolean enableV05Traces,
113+
ProtocolVersion protocolVersion,
111114
boolean metricsEnabled) {
112115
this.client = client;
113116
this.agentBaseUrl = agentUrl;
114117
this.metricsEnabled = metricsEnabled;
115-
this.traceEndpoints =
116-
enableV05Traces
117-
? new String[] {V05_ENDPOINT, V04_ENDPOINT, V03_ENDPOINT}
118-
: new String[] {V04_ENDPOINT, V03_ENDPOINT};
118+
this.protocolVersion = protocolVersion != null ? protocolVersion : V0_4;
119119
this.discoveryTimer = monitoring.newTimer("trace.agent.discovery.time");
120120
this.discoveryState = new State();
121121
}
@@ -173,10 +173,10 @@ private void doDiscovery(State newState) {
173173

174174
// don't want to rewire the traces pipeline
175175
if (null == newState.traceEndpoint) {
176-
newState.traceEndpoint = probeTracesEndpoint(newState, traceEndpoints);
176+
newState.traceEndpoint = probeTracesEndpoint(newState, protocolVersion.endpointsToProbe());
177177
} else if (newState.state == null || newState.state.isEmpty()) {
178178
// Still need to probe so that state is correctly assigned
179-
probeTracesEndpoint(newState, new String[] {newState.traceEndpoint});
179+
probeTracesEndpoint(newState, singletonList(newState.traceEndpoint));
180180
}
181181
}
182182

@@ -194,7 +194,7 @@ private void doDiscovery(State newState) {
194194
}
195195
}
196196

197-
private String probeTracesEndpoint(State newState, String[] endpoints) {
197+
private String probeTracesEndpoint(State newState, List<String> endpoints) {
198198
for (String candidate : endpoints) {
199199
try (Response response =
200200
client
@@ -253,7 +253,7 @@ private boolean processInfoResponse(State newState, String response) {
253253
// This is done outside of the loop to set metricsEndpoint to null if not found
254254
newState.metricsEndpoint = foundMetricsEndpoint;
255255

256-
for (String endpoint : traceEndpoints) {
256+
for (String endpoint : protocolVersion.endpointsToProbe()) {
257257
if (containsEndpoint(endpoints, endpoint)) {
258258
newState.traceEndpoint = endpoint;
259259
break;

communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
173173
agentHttpClient,
174174
monitoring,
175175
agentUrl,
176-
config.isTraceAgentV05Enabled(),
176+
config.getProtocolVersion(),
177177
config.isTracerMetricsEnabled());
178178

179179
if (paused) {

communication/src/main/java/datadog/communication/serialization/Writable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ public interface Writable {
2727

2828
void writeBinary(byte[] binary, int offset, int length);
2929

30+
/**
31+
* Encodes 128 bits as binary.
32+
*
33+
* @param hi the high-order 64 bits.
34+
* @param lo the low-order 64 bits.
35+
*/
36+
void writeBinary(long hi, long lo);
37+
3038
/**
3139
* Start a part of the message containing key-value pairs
3240
*

communication/src/main/java/datadog/communication/serialization/msgpack/MsgPackWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,13 @@ public void writeBinary(byte[] binary, int offset, int length) {
213213
buffer.put(binary, offset, length);
214214
}
215215

216+
@Override
217+
public void writeBinary(long hi, long lo) {
218+
writeBinaryHeader(16);
219+
buffer.putLong(hi);
220+
buffer.putLong(lo);
221+
}
222+
216223
@Override
217224
public void writeBinary(ByteBuffer binary) {
218225
ByteBuffer slice = binary.slice();

communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V04_ENDPOIN
55
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V05_ENDPOINT
66
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT
77
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V07_CONFIG_ENDPOINT
8+
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V1_ENDPOINT
89
import static datadog.communication.http.OkHttpUtils.DATADOG_CONTAINER_ID
910
import static datadog.communication.http.OkHttpUtils.DATADOG_CONTAINER_TAGS_HASH
11+
import static datadog.trace.api.ProtocolVersion.V0_4
12+
import static datadog.trace.api.ProtocolVersion.V0_5
13+
import static datadog.trace.api.ProtocolVersion.V1_0
1014

1115
import datadog.common.container.ContainerInfo
1216
import datadog.metrics.api.Monitoring
@@ -51,7 +55,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
5155
def "test parse /info response"() {
5256
setup:
5357
OkHttpClient client = Mock(OkHttpClient)
54-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, v05Enabled, true)
58+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, protocol, true)
5559

5660
when: "/info available"
5761
features.discover()
@@ -77,15 +81,33 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
7781
0 * _
7882

7983
where:
80-
v05Enabled | expectedTraceEndpoint
81-
false | V04_ENDPOINT
82-
true | V05_ENDPOINT
84+
protocol | expectedTraceEndpoint
85+
V0_4 | V04_ENDPOINT
86+
V0_5 | V05_ENDPOINT
87+
V1_0 | V1_ENDPOINT
88+
}
89+
90+
def "null protocol version falls back to v0.4 trace endpoints"() {
91+
setup:
92+
OkHttpClient client = Mock(OkHttpClient)
93+
DDAgentFeaturesDiscovery features =
94+
new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, null, true)
95+
96+
when:
97+
features.discover()
98+
99+
then:
100+
1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/info" }) >> { Request request -> infoResponse(request, "{}") }
101+
0 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.5/traces" }) >> { Request request -> success(request) }
102+
1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.4/traces" }) >> { Request request -> success(request) }
103+
features.getTraceEndpoint() == V04_ENDPOINT
104+
0 * _
83105
}
84106

85107
def "Should change discovery state atomically after discovery happened"() {
86108
setup:
87109
OkHttpClient client = Mock(OkHttpClient)
88-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
110+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
89111

90112
when: "/info available"
91113
features.discover()
@@ -111,7 +133,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
111133
def "test parse /info response with discoverIfOutdated"() {
112134
setup:
113135
OkHttpClient client = Mock(OkHttpClient)
114-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
136+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
115137

116138
when: "/info available"
117139
features.discoverIfOutdated()
@@ -139,7 +161,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
139161
def "test parse /info response with client dropping"() {
140162
setup:
141163
OkHttpClient client = Mock(OkHttpClient)
142-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
164+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
143165

144166
when: "/info available"
145167
features.discover()
@@ -157,7 +179,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
157179
def "test parse /info response with data streams unavailable"() {
158180
setup:
159181
OkHttpClient client = Mock(OkHttpClient)
160-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
182+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
161183

162184
when: "/info available"
163185
features.discover()
@@ -176,7 +198,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
176198
def "test parse /info response with long running spans available"() {
177199
setup:
178200
OkHttpClient client = Mock(OkHttpClient)
179-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
201+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
180202

181203
when: "/info available"
182204
features.discover()
@@ -190,7 +212,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
190212
def "test fallback when /info empty"() {
191213
setup:
192214
OkHttpClient client = Mock(OkHttpClient)
193-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, false, true)
215+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_4, true)
194216

195217
when: "/info is empty"
196218
features.discover()
@@ -212,7 +234,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
212234
def "test fallback when /info not found"() {
213235
setup:
214236
OkHttpClient client = Mock(OkHttpClient)
215-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
237+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
216238

217239
when: "/info unavailable"
218240
features.discover()
@@ -234,7 +256,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
234256
def "test fallback when /info not found and agent returns ok"() {
235257
setup:
236258
OkHttpClient client = Mock(OkHttpClient)
237-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
259+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
238260

239261
when: "/info unavailable"
240262
features.discover()
@@ -254,7 +276,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
254276
def "test fallback when /info not found and v0.5 disabled"() {
255277
setup:
256278
OkHttpClient client = Mock(OkHttpClient)
257-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, false, true)
279+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_4, true)
258280

259281
when: "/info unavailable"
260282
features.discover()
@@ -275,7 +297,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
275297
def "test fallback when /info not found and v0.5 unavailable agent side"() {
276298
setup:
277299
OkHttpClient client = Mock(OkHttpClient)
278-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
300+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
279301

280302
when: "/info unavailable"
281303
features.discover()
@@ -296,7 +318,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
296318
def "test fallback on very old agent"() {
297319
setup:
298320
OkHttpClient client = Mock(OkHttpClient)
299-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
321+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
300322

301323
when: "/info unavailable"
302324
features.discover()
@@ -318,7 +340,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
318340
def "disabling metrics disables metrics and dropping"() {
319341
setup:
320342
OkHttpClient client = Mock(OkHttpClient)
321-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, false)
343+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, false)
322344

323345
when: "/info unavailable"
324346
features.discover()
@@ -354,7 +376,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
354376
def "discovery of metrics endpoint after agent upgrade enables dropping and metrics"() {
355377
setup:
356378
OkHttpClient client = Mock(OkHttpClient)
357-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, false, true)
379+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_4, true)
358380

359381
when: "/info unavailable"
360382
features.discover()
@@ -382,7 +404,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
382404
def "disappearance of info endpoint after agent downgrade disables metrics and dropping"() {
383405
setup:
384406
OkHttpClient client = Mock(OkHttpClient)
385-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, false, true)
407+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_4, true)
386408

387409
when: "/info available"
388410
features.discover()
@@ -411,7 +433,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
411433
def "disappearance of metrics endpoint after agent downgrade disables metrics and dropping"() {
412434
setup:
413435
OkHttpClient client = Mock(OkHttpClient)
414-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, false, true)
436+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_4, true)
415437

416438
when: "/info available"
417439
features.discover()
@@ -441,7 +463,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
441463
def "test parse /info response with telemetry proxy"() {
442464
setup:
443465
OkHttpClient client = Mock(OkHttpClient)
444-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
466+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
445467

446468
when: "/info available"
447469
features.discover()
@@ -458,7 +480,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
458480
def "test parse /info response with old EVP proxy"() {
459481
setup:
460482
OkHttpClient client = Mock(OkHttpClient)
461-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
483+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
462484

463485
when: "/info available"
464486
features.discover()
@@ -477,7 +499,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
477499
def "test parse /info response with peer tag back propagation"() {
478500
setup:
479501
OkHttpClient client = Mock(OkHttpClient)
480-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
502+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
481503

482504
when: "/info available"
483505
features.discover()
@@ -510,7 +532,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
510532
def "test metrics disabled for agent version below 7.65"() {
511533
setup:
512534
OkHttpClient client = Mock(OkHttpClient)
513-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
535+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
514536

515537
when: "agent version is below 7.65"
516538
features.discover()
@@ -544,7 +566,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
544566
def "test metrics disabled for agent with unparseable version"() {
545567
setup:
546568
OkHttpClient client = Mock(OkHttpClient)
547-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
569+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
548570

549571
when: "agent version is unparseable"
550572
features.discover()
@@ -570,7 +592,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
570592
def "should send container id as header on the info request and parse the hash in the response"() {
571593
setup:
572594
OkHttpClient client = Mock(OkHttpClient)
573-
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)
595+
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true)
574596
def oldContainerId = ContainerInfo.get().getContainerId()
575597
def oldContainerTagsHash = ContainerInfo.get().getContainerTagsHash()
576598
ContainerInfo.get().setContainerId("test")

communication/src/test/groovy/datadog/communication/ddagent/SharedCommunicationsObjectsSpecification.groovy

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package datadog.communication.ddagent
22

3+
import static datadog.trace.api.ProtocolVersion.V0_4
4+
import static datadog.trace.api.config.TracerConfig.AGENT_HOST
5+
36
import datadog.metrics.api.Monitoring
47
import datadog.trace.api.Config
58
import datadog.trace.test.util.DDSpecification
69
import okhttp3.HttpUrl
710
import okhttp3.OkHttpClient
811

9-
import static datadog.trace.api.config.TracerConfig.AGENT_HOST
10-
1112
class SharedCommunicationsObjectsSpecification extends DDSpecification {
1213
SharedCommunicationObjects sco = new SharedCommunicationObjects()
1314

@@ -31,7 +32,7 @@ class SharedCommunicationsObjectsSpecification extends DDSpecification {
3132
sco.featuresDiscovery(config)
3233

3334
then:
34-
1 * config.traceAgentV05Enabled >> false
35+
1 * config.protocolVersion >> V0_4
3536
1 * config.tracerMetricsEnabled >> false
3637
sco.featuresDiscovery != null
3738

0 commit comments

Comments
 (0)