Skip to content

Commit e2fb886

Browse files
feat!: add encoder from v04 to v1 (#1896)
# What does this PR do? Implements the v0.4 to V1 encoder for the Trace Exporter. Key additions: - StringTable for streaming string interning: first occurrence writes the string, subsequent ones write an integer ID - Chunk-level attribute extraction (trace_id 128-bit, sampling_priority, origin) promoted from the root span - Integer keys for all msgpack fields # Motivation [APMSP-2808](https://datadoghq.atlassian.net/browse/APMSP-2808) # Additional Notes - V04 and V05 formats are untouched, no existing behavior changes - TraceExporterOutputFormat::V1 is not exposed in the C FFI yet so there is no production impact - The string interning is scoped per payload (table reset on each to_vec call) # How to test the change? Added unit tests. [APMSP-2808]: https://datadoghq.atlassian.net/browse/APMSP-2808?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ Co-authored-by: jules.wiriath <jules.wiriath@datadoghq.com>
1 parent cea1e44 commit e2fb886

15 files changed

Lines changed: 1525 additions & 99 deletions

File tree

libdd-data-pipeline/src/trace_exporter/builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,9 @@ impl TraceExporterBuilder {
505505
match input {
506506
TraceExporterInputFormat::V04 => matches!(
507507
output,
508-
TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V05
508+
TraceExporterOutputFormat::V04
509+
| TraceExporterOutputFormat::V05
510+
| TraceExporterOutputFormat::V1
509511
),
510512
TraceExporterInputFormat::V05 => matches!(output, TraceExporterOutputFormat::V05),
511513
}

libdd-data-pipeline/src/trace_exporter/mod.rs

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub enum TraceExporterOutputFormat {
8383
#[default]
8484
V04,
8585
V05,
86+
V1,
8687
}
8788

8889
impl TraceExporterOutputFormat {
@@ -93,6 +94,7 @@ impl TraceExporterOutputFormat {
9394
match self {
9495
TraceExporterOutputFormat::V04 => "/v0.4/traces",
9596
TraceExporterOutputFormat::V05 => "/v0.5/traces",
97+
TraceExporterOutputFormat::V1 => "/v1.0/traces",
9698
},
9799
)
98100
}
@@ -127,44 +129,7 @@ fn add_path(url: &Uri, path: &str) -> Uri {
127129
Uri::from_parts(parts).unwrap()
128130
}
129131

130-
#[derive(Clone, Default, Debug)]
131-
pub struct TracerMetadata {
132-
pub hostname: String,
133-
pub env: String,
134-
pub app_version: String,
135-
pub runtime_id: String,
136-
pub service: String,
137-
pub tracer_version: String,
138-
pub language: String,
139-
pub language_version: String,
140-
pub language_interpreter: String,
141-
pub language_interpreter_vendor: String,
142-
pub git_commit_sha: String,
143-
pub process_tags: String,
144-
pub client_computed_stats: bool,
145-
pub client_computed_top_level: bool,
146-
}
147-
148-
impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> {
149-
fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> {
150-
TracerHeaderTags::<'_> {
151-
lang: &tags.language,
152-
lang_version: &tags.language_version,
153-
tracer_version: &tags.tracer_version,
154-
lang_interpreter: &tags.language_interpreter,
155-
lang_vendor: &tags.language_interpreter_vendor,
156-
client_computed_stats: tags.client_computed_stats,
157-
client_computed_top_level: tags.client_computed_top_level,
158-
..Default::default()
159-
}
160-
}
161-
}
162-
163-
impl<'a> From<&'a TracerMetadata> for HeaderMap {
164-
fn from(tags: &'a TracerMetadata) -> HeaderMap {
165-
TracerHeaderTags::from(tags).into()
166-
}
167-
}
132+
pub use libdd_trace_utils::tracer_metadata::TracerMetadata;
168133

169134
/// Handles for the background workers owned by a [`TraceExporter`].
170135
#[cfg(not(target_arch = "wasm32"))]
@@ -633,6 +598,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
633598
let prepared = match self.serializer.prepare_traces_payload(
634599
traces,
635600
header_tags,
601+
&self.metadata,
636602
self.agent_payload_response_version.as_ref(),
637603
) {
638604
Ok(p) => p,

libdd-data-pipeline/src/trace_exporter/stats.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -339,26 +339,6 @@ pub(crate) fn is_stats_worker_active(client_side_stats: &ArcSwap<StatsComputatio
339339
)
340340
}
341341

342-
#[cfg(not(target_arch = "wasm32"))]
343-
impl From<TracerMetadata> for StatsMetadata {
344-
fn from(m: TracerMetadata) -> StatsMetadata {
345-
StatsMetadata {
346-
hostname: m.hostname,
347-
env: m.env,
348-
app_version: m.app_version,
349-
runtime_id: m.runtime_id,
350-
language: m.language,
351-
lang_version: m.language_version,
352-
lang_interpreter: m.language_interpreter,
353-
lang_vendor: m.language_interpreter_vendor,
354-
tracer_version: m.tracer_version,
355-
git_commit_sha: m.git_commit_sha,
356-
process_tags: m.process_tags,
357-
service: m.service,
358-
}
359-
}
360-
}
361-
362342
#[cfg(test)]
363343
mod tests {
364344
#[cfg(feature = "stats-obfuscation")]

libdd-data-pipeline/src/trace_exporter/trace_serializer.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use libdd_trace_utils::msgpack_decoder::decode::error::DecodeError;
1616
use libdd_trace_utils::msgpack_encoder;
1717
use libdd_trace_utils::span::{v04::Span, TraceData};
1818
use libdd_trace_utils::trace_utils::{self, TracerHeaderTags};
19+
use libdd_trace_utils::tracer_metadata::TracerMetadata;
1920
use libdd_trace_utils::tracer_payload;
2021

2122
/// Minimal capacity of fresh buffers allocated to encode traces, in bytes.
@@ -52,13 +53,14 @@ impl TraceSerializer {
5253
&self,
5354
traces: Vec<Vec<Span<T>>>,
5455
header_tags: TracerHeaderTags,
56+
metadata: &TracerMetadata,
5557
agent_payload_response_version: Option<&AgentResponsePayloadVersion>,
5658
) -> Result<PreparedTracesPayload, TraceExporterError> {
5759
let payload = self.collect_and_process_traces(traces)?;
5860
let chunks = payload.size();
5961
let headers =
6062
self.build_traces_headers(header_tags, chunks, agent_payload_response_version);
61-
let mp_payload = self.serialize_payload(&payload)?;
63+
let mp_payload = self.serialize_payload(&payload, metadata)?;
6264

6365
Ok(PreparedTracesPayload {
6466
data: mp_payload,
@@ -72,13 +74,15 @@ impl TraceSerializer {
7274
&self,
7375
traces: Vec<Vec<Span<T>>>,
7476
) -> Result<tracer_payload::TraceChunks<T>, TraceExporterError> {
75-
let use_v05_format = match self.output_format {
76-
TraceExporterOutputFormat::V05 => true,
77-
TraceExporterOutputFormat::V04 => false,
78-
};
79-
trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| {
80-
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
81-
})
77+
match self.output_format {
78+
TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)),
79+
format => {
80+
let use_v05_format = matches!(format, TraceExporterOutputFormat::V05);
81+
trace_utils::collect_trace_chunks(traces, use_v05_format).map_err(|e| {
82+
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
83+
})
84+
}
85+
}
8286
}
8387

8488
/// Build HTTP headers for traces request
@@ -105,6 +109,7 @@ impl TraceSerializer {
105109
fn serialize_payload<T: TraceData>(
106110
&self,
107111
payload: &tracer_payload::TraceChunks<T>,
112+
metadata: &TracerMetadata,
108113
) -> Result<Vec<u8>, TraceExporterError> {
109114
let capacity = self
110115
.previous_serialised_len
@@ -120,6 +125,9 @@ impl TraceSerializer {
120125
.map_err(TraceExporterError::Serialization)?;
121126
buff
122127
}
128+
tracer_payload::TraceChunks::V1(p) => {
129+
msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata)
130+
}
123131
};
124132
self.previous_serialised_len
125133
.store(buff.len(), Ordering::Relaxed);
@@ -275,7 +283,7 @@ mod tests {
275283
.collect_and_process_traces(original_traces.clone())
276284
.unwrap();
277285

278-
let result = serializer.serialize_payload(&payload);
286+
let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
279287
assert!(result.is_ok());
280288

281289
let serialized = result.unwrap();
@@ -310,7 +318,7 @@ mod tests {
310318
.collect_and_process_traces(original_traces.clone())
311319
.unwrap();
312320

313-
let result = serializer.serialize_payload(&payload);
321+
let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
314322
assert!(result.is_ok());
315323

316324
let serialized = result.unwrap();
@@ -346,7 +354,12 @@ mod tests {
346354
];
347355
let header_tags = create_test_header_tags();
348356

349-
let result = serializer.prepare_traces_payload(traces, header_tags, None);
357+
let result = serializer.prepare_traces_payload(
358+
traces,
359+
header_tags,
360+
&TracerMetadata::default(),
361+
None,
362+
);
350363
assert!(result.is_ok());
351364

352365
let prepared = result.unwrap();
@@ -365,7 +378,12 @@ mod tests {
365378
let traces = vec![vec![create_test_span()]];
366379
let header_tags = create_test_header_tags();
367380

368-
let result = serializer.prepare_traces_payload(traces, header_tags, None);
381+
let result = serializer.prepare_traces_payload(
382+
traces,
383+
header_tags,
384+
&TracerMetadata::default(),
385+
None,
386+
);
369387
assert!(result.is_ok());
370388

371389
let prepared = result.unwrap();
@@ -381,7 +399,12 @@ mod tests {
381399
let traces = vec![vec![create_test_span()]];
382400
let header_tags = create_test_header_tags();
383401

384-
let result = serializer.prepare_traces_payload(traces, header_tags, Some(&agent_version));
402+
let result = serializer.prepare_traces_payload(
403+
traces,
404+
header_tags,
405+
&TracerMetadata::default(),
406+
Some(&agent_version),
407+
);
385408
assert!(result.is_ok());
386409

387410
let prepared = result.unwrap();
@@ -395,7 +418,12 @@ mod tests {
395418
let traces: Vec<Vec<SpanBytes>> = vec![];
396419
let header_tags = create_test_header_tags();
397420

398-
let result = serializer.prepare_traces_payload(traces, header_tags, None);
421+
let result = serializer.prepare_traces_payload(
422+
traces,
423+
header_tags,
424+
&TracerMetadata::default(),
425+
None,
426+
);
399427
assert!(result.is_ok());
400428

401429
let prepared = result.unwrap();
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
[[
2+
{
3+
"name": "test_exporter_v04_v1_snapshot_root",
4+
"service": "test-service",
5+
"resource": "test-resource",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "web",
10+
"meta": {
11+
"_dd.hostname": "my-host",
12+
"_dd.origin": "lambda",
13+
"_dd.p.dm": "-4",
14+
"_dd.p.tid": "0x0",
15+
"component": "http",
16+
"env": "test-env",
17+
"runtime-id": "test-runtime-id-value",
18+
"service": "test-service",
19+
"span.kind": "server",
20+
"version": "1.0.0"
21+
},
22+
"metrics": {
23+
"_dd.top_level": 1.0,
24+
"_sampling_priority_v1": 1
25+
},
26+
"duration": 5,
27+
"start": 0
28+
},
29+
{
30+
"name": "test_exporter_v04_v1_snapshot_child",
31+
"service": "test-service",
32+
"resource": "test-resource",
33+
"trace_id": 0,
34+
"span_id": 2,
35+
"parent_id": 1,
36+
"meta": {
37+
"_dd.origin": "lambda",
38+
"_dd.p.dm": "-4",
39+
"_dd.p.tid": "0x0",
40+
"env": "test-env",
41+
"runtime-id": "test-runtime-id-value",
42+
"service": "test-service",
43+
"span.kind": "internal"
44+
},
45+
"metrics": {
46+
"_dd_metric1": 1.0,
47+
"_dd_metric2": 2.0,
48+
"_sampling_priority_v1": 1
49+
},
50+
"duration": 5,
51+
"start": 1
52+
}]]

libdd-data-pipeline/tests/test_trace_exporter.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,80 @@ mod tracing_integration_tests {
250250
test_agent.assert_snapshot(snapshot_name).await;
251251
}
252252

253+
fn get_v04_to_v1_trace_snapshot_test_payload(name_prefix: &str) -> Vec<u8> {
254+
// Root span: exercises chunk-level attrs (sampling priority, origin, mechanism)
255+
// and span-level promoted fields (env, version, component, span.kind).
256+
let mut root_span = create_test_json_span(1234, 12341, 0, 0, true);
257+
root_span["name"] = json!(format!("{name_prefix}_root"));
258+
root_span["type"] = json!("web");
259+
root_span["meta"] = json!({
260+
"env": "test-env",
261+
"version": "1.0.0",
262+
"component": "http",
263+
"span.kind": "server",
264+
"_dd.hostname": "my-host",
265+
"_dd.origin": "lambda",
266+
"_dd.p.dm": "-4",
267+
"runtime-id": "test-runtime-id-value",
268+
"service": "test-service",
269+
});
270+
root_span["metrics"] = json!({
271+
"_sampling_priority_v1": 1.0,
272+
"_dd.top_level": 1.0,
273+
});
274+
275+
// Child span: exercises metrics and meta attributes without promoted fields.
276+
let mut child_span = create_test_json_span(1234, 12342, 12341, 1, false);
277+
child_span["name"] = json!(format!("{name_prefix}_child"));
278+
child_span["metrics"] = json!({
279+
"_dd_metric1": 1.0,
280+
"_dd_metric2": 2.0,
281+
});
282+
283+
rmp_serde::to_vec_named(&vec![vec![root_span, child_span]]).unwrap()
284+
}
285+
286+
#[cfg_attr(miri, ignore)]
287+
#[tokio::test]
288+
async fn compare_v04_to_v1_trace_snapshot_test() {
289+
let relative_snapshot_path = "libdd-data-pipeline/tests/snapshots/";
290+
let snapshot_name = "compare_exporter_v04_to_v1_trace_snapshot_test";
291+
let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await;
292+
let url = test_agent.get_base_uri().await;
293+
294+
test_agent.start_session(snapshot_name, None).await;
295+
296+
let task_result = task::spawn_blocking(move || {
297+
let mut builder = TraceExporter::<NativeCapabilities>::builder();
298+
builder
299+
.set_url(url.to_string().as_ref())
300+
.set_language("test-lang")
301+
.set_language_version("2.0")
302+
.set_language_interpreter_vendor("vendor")
303+
.set_language_interpreter("interpreter")
304+
.set_tracer_version("1.0")
305+
.set_env("test_env")
306+
.set_service("test")
307+
.set_test_session_token(snapshot_name)
308+
.set_input_format(TraceExporterInputFormat::V04)
309+
.set_output_format(TraceExporterOutputFormat::V1);
310+
311+
let trace_exporter = builder
312+
.build::<NativeCapabilities>()
313+
.expect("Unable to build TraceExporter");
314+
315+
let data = get_v04_to_v1_trace_snapshot_test_payload("test_exporter_v04_v1_snapshot");
316+
317+
let response = trace_exporter.send(data.as_ref());
318+
assert!(response.is_ok(), "send failed: {:?}", response.err());
319+
})
320+
.await;
321+
322+
assert!(task_result.is_ok());
323+
324+
test_agent.assert_snapshot(snapshot_name).await;
325+
}
326+
253327
#[cfg_attr(miri, ignore)]
254328
#[cfg(target_os = "linux")]
255329
#[tokio::test]

0 commit comments

Comments
 (0)