Skip to content

Commit 555a22e

Browse files
feat(data-pipeline): port dd-trace-rs trace buffer implementation (#1826)
# Motivation As we implement native spans in ruby in python, we have opportuinities to replace more components of the span pipeline with native code. One of them i the span buffer which we is before the trace exporter. # Changes Add a TraceBuffer implementation. The TraceBuffer is split in two part, a Sender which allows putting trace chunks in the queue and a background task, spawned on the shared runtime which periodically pulls data from the chunk and submit them using the TraceExporter.
1 parent 989e11a commit 555a22e

6 files changed

Lines changed: 1045 additions & 2 deletions

File tree

libdd-data-pipeline/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ sha2 = "0.10"
2626
either = "1.13.0"
2727
tokio = { version = "1.23", features = [
2828
"rt",
29+
"sync",
30+
"time",
2931
], default-features = false }
3032
uuid = { version = "1.10.0", features = ["v4"] }
3133
tokio-util = "0.7.11"
@@ -53,9 +55,14 @@ uuid = { version = "1", features = ["js"] }
5355
[lib]
5456
bench = false
5557

58+
[[bench]]
59+
name = "trace_buffer"
60+
harness = false
61+
5662
[dev-dependencies]
5763
libdd-capabilities-impl = { version = "0.1.0", path = "../libdd-capabilities-impl" }
5864
libdd-log = { path = "../libdd-log" }
65+
libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime" }
5966
clap = { version = "4.0", features = ["derive"] }
6067
criterion = "0.5.1"
6168
libdd-trace-utils = { path = "../libdd-trace-utils", features = [
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::pin::Pin;
5+
use std::sync::Arc;
6+
use std::time::Duration;
7+
8+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
9+
use libdd_data_pipeline::trace_buffer::{Export, TraceBuffer, TraceBufferConfig, TraceChunk};
10+
use libdd_data_pipeline::trace_exporter::{
11+
agent_response::AgentResponse, error::TraceExporterError,
12+
};
13+
use libdd_shared_runtime::SharedRuntime;
14+
15+
type Span = [u8; 100];
16+
17+
// Number of chunks each sender thread sends per benchmark iteration.
18+
const CHUNKS_PER_SENDER: usize = 900;
19+
20+
// Simulates async IO by sleeping 2ms per export batch.
21+
#[derive(Debug)]
22+
struct SleepExport;
23+
24+
impl Export<Span> for SleepExport {
25+
fn export_trace_chunks(
26+
&mut self,
27+
_trace_chunks: Vec<TraceChunk<Span>>,
28+
) -> Pin<
29+
Box<
30+
dyn std::future::Future<Output = Result<AgentResponse, TraceExporterError>> + Send + '_,
31+
>,
32+
> {
33+
Box::pin(async {
34+
tokio::time::sleep(Duration::from_millis(2)).await;
35+
Ok(AgentResponse::Unchanged)
36+
})
37+
}
38+
}
39+
40+
fn setup_buffer() -> (Arc<SharedRuntime>, Arc<TraceBuffer<Span>>) {
41+
let rt = Arc::new(SharedRuntime::new().expect("SharedRuntime::new"));
42+
let cfg = TraceBufferConfig::new()
43+
.max_buffered_spans(1_000)
44+
.span_flush_threshold(500)
45+
.max_flush_interval(Duration::from_secs(2));
46+
let (buf, worker) = TraceBuffer::new(cfg, Box::new(|_| {}), Box::new(SleepExport));
47+
rt.spawn_worker(worker, true).expect("spawn_worker");
48+
(rt, Arc::new(buf))
49+
}
50+
51+
fn bench_trace_buffer(c: &mut Criterion) {
52+
let mut group = c.benchmark_group("trace_buffer");
53+
54+
// (label, inter-send delay)
55+
let workloads: &[(&str, Option<Duration>)] = &[
56+
("no_delay", None),
57+
("1us_delay", Some(Duration::from_micros(1))),
58+
("10us_delay", Some(Duration::from_micros(10))),
59+
];
60+
61+
for &(delay_label, delay) in workloads {
62+
for num_senders in [1_usize, 2, 4, 8] {
63+
let (rt, sender) = setup_buffer();
64+
65+
group.throughput(Throughput::Elements(
66+
(num_senders * CHUNKS_PER_SENDER) as u64,
67+
));
68+
69+
group.bench_function(
70+
BenchmarkId::new(format!("{}_senders", num_senders), delay_label),
71+
|b| {
72+
b.iter(|| {
73+
std::thread::scope(|s| {
74+
for _ in 0..num_senders {
75+
let sender = sender.clone();
76+
s.spawn(move || {
77+
for _ in 0..CHUNKS_PER_SENDER {
78+
// BatchFull errors are expected under high load.
79+
let _ = sender.send_chunk(vec![[0u8; 100]]);
80+
if let Some(d) = delay {
81+
std::thread::sleep(d);
82+
}
83+
}
84+
});
85+
}
86+
});
87+
});
88+
},
89+
);
90+
91+
rt.shutdown(None).expect("runtime shutdown");
92+
}
93+
}
94+
95+
group.finish();
96+
}
97+
98+
criterion_group!(benches, bench_trace_buffer);
99+
criterion_main!(benches);

libdd-data-pipeline/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@ mod health_metrics;
1515
pub(crate) mod otlp;
1616
#[cfg(feature = "telemetry")]
1717
pub(crate) mod telemetry;
18+
#[cfg(not(target_arch = "wasm32"))]
19+
pub mod trace_buffer;
1820
#[allow(missing_docs)]
1921
pub mod trace_exporter;

0 commit comments

Comments
 (0)