Skip to content

Commit 3dc15ea

Browse files
committed
fix: dedup before msgpack span encoding
1 parent 5e3aa4f commit 3dc15ea

7 files changed

Lines changed: 49 additions & 30 deletions

File tree

libdd-data-pipeline/src/trace_buffer/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::trace_exporter::{
2222

2323
/// Trait for types stored in a [`TraceBuffer`] that can report their approximate byte size.
2424
pub trait BufferSize {
25-
fn byte_size(&self) -> usize;
25+
fn byte_size(&mut self) -> usize;
2626
}
2727

2828
impl<T> BufferSize for libdd_trace_utils::span::v04::Span<T>
@@ -31,9 +31,11 @@ where
3131
T::Text: AsRef<str>,
3232
T::Bytes: AsRef<[u8]>,
3333
{
34-
fn byte_size(&self) -> usize {
34+
fn byte_size(&mut self) -> usize {
3535
use libdd_trace_utils::span::v04::AttributeAnyValue;
3636

37+
self.dedup();
38+
3739
// trace_id(16) + span_id(8) + parent_id(8) + start(8) + duration(8) + error(4)
3840
let mut size: usize = 52;
3941

@@ -235,7 +237,7 @@ impl<T> Batch<T> {
235237
/// batch. So the batch can be over the maximum size after this call.
236238
/// This is because we don't want to always drop traces that contain more bytes than the maximum
237239
/// size.
238-
fn add_trace_chunk(&mut self, chunk: Vec<T>) -> Result<(), BatchFullError>
240+
fn add_trace_chunk(&mut self, mut chunk: Vec<T>) -> Result<(), BatchFullError>
239241
where
240242
T: BufferSize,
241243
{
@@ -248,7 +250,7 @@ impl<T> Batch<T> {
248250
return Ok(());
249251
}
250252

251-
self.byte_count += chunk.iter().map(|s| s.byte_size()).sum::<usize>();
253+
self.byte_count += chunk.iter_mut().map(|s| s.byte_size()).sum::<usize>();
252254
self.chunks.push(chunk);
253255
Ok(())
254256
}
@@ -788,7 +790,7 @@ mod tests {
788790

789791
// Used for tests, 1 byte per item so size computations are easier
790792
impl BufferSize for () {
791-
fn byte_size(&self) -> usize {
793+
fn byte_size(&mut self) -> usize {
792794
1
793795
}
794796
}

libdd-data-pipeline/src/trace_exporter/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,12 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
584584
self.client_computed_top_level,
585585
);
586586

587+
for chunk in &mut traces {
588+
for span in chunk.iter_mut() {
589+
span.dedup();
590+
}
591+
}
592+
587593
// OTLP path: send sampled traces via OTLP when an OTLP endpoint is configured.
588594
// Unlike the agent path, there is no downstream agent to drop unsampled traces,
589595
// so drop_chunks is always called here regardless of whether stats are enabled.

libdd-trace-utils/src/msgpack_encoder/v04/span.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -288,17 +288,19 @@ pub fn encode_span<W: RmpWrite, T: TraceData>(
288288

289289
if !span.meta.is_empty() {
290290
write_const_msg_pack_str!(writer, "meta")?;
291-
rmp::encode::write_map_len(writer, span.meta.len() as u32)?;
292-
for (k, v) in span.meta.iter() {
291+
let meta_dd = span.meta.defensive_dedup();
292+
rmp::encode::write_map_len(writer, meta_dd.len() as u32)?;
293+
for (k, v) in meta_dd.iter() {
293294
write_str(writer, k.borrow())?;
294295
write_str(writer, v.borrow())?;
295296
}
296297
}
297298

298299
if !span.metrics.is_empty() {
299300
write_const_msg_pack_str!(writer, "metrics")?;
300-
rmp::encode::write_map_len(writer, span.metrics.len() as u32)?;
301-
for (k, v) in span.metrics.iter() {
301+
let metrics_dd = span.metrics.defensive_dedup();
302+
rmp::encode::write_map_len(writer, metrics_dd.len() as u32)?;
303+
for (k, v) in metrics_dd.iter() {
302304
write_str(writer, k.borrow())?;
303305
write_f64(writer, *v)?;
304306
}
@@ -311,8 +313,9 @@ pub fn encode_span<W: RmpWrite, T: TraceData>(
311313

312314
if !span.meta_struct.is_empty() {
313315
write_const_msg_pack_str!(writer, "meta_struct")?;
314-
rmp::encode::write_map_len(writer, span.meta_struct.len() as u32)?;
315-
for (k, v) in span.meta_struct.iter() {
316+
let meta_struct_dd = span.meta_struct.defensive_dedup();
317+
rmp::encode::write_map_len(writer, meta_struct_dd.len() as u32)?;
318+
for (k, v) in meta_struct_dd.iter() {
316319
write_str(writer, k.borrow())?;
317320
write_bin(writer, v.borrow())?;
318321
}

libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,14 @@ pub fn encode_span<W: RmpWrite, T: TraceData>(
255255
"env" | "version" | "component" | "span.kind" | "_dd.p.tid"
256256
)
257257
};
258-
let non_promoted_meta = span.meta.iter().filter(|(k, _)| !is_promoted(k)).count() as u32;
259-
let attr_count = non_promoted_meta + span.metrics.len() as u32 + span.meta_struct.len() as u32;
258+
let meta_dd = span.meta.defensive_dedup();
259+
let metrics_dd = span.metrics.defensive_dedup();
260+
let meta_struct_dd = span.meta_struct.defensive_dedup();
261+
262+
let non_promoted_meta = meta_dd.iter().filter(|(k, _)| !is_promoted(k)).count() as u32;
263+
let metrics_len = metrics_dd.len() as u32;
264+
let meta_struct_len = meta_struct_dd.len() as u32;
265+
let attr_count = non_promoted_meta + metrics_len + meta_struct_len;
260266
let has_attributes = attr_count > 0;
261267

262268
let env = span.meta.get("env").map(|v| v.borrow());
@@ -344,25 +350,25 @@ pub fn encode_span<W: RmpWrite, T: TraceData>(
344350
write_uint8(writer, SpanKey::Attributes as u8)?;
345351
rmp::encode::write_array_len(writer, attr_count * 3)?;
346352

347-
for (k, v) in span.meta.iter() {
353+
for (k, v) in meta_dd.iter() {
348354
if is_promoted(k) {
349355
continue;
350356
}
351-
table.write_interned(writer, k.borrow())?;
357+
table.write_interned(writer, (*k).borrow())?;
352358
write_uint8(writer, AnyValueKey::String as u8)?;
353-
table.write_interned(writer, v.borrow())?;
359+
table.write_interned(writer, (*v).borrow())?;
354360
}
355361

356-
for (k, v) in span.metrics.iter() {
357-
table.write_interned(writer, k.borrow())?;
362+
for (k, v) in metrics_dd.iter() {
363+
table.write_interned(writer, (*k).borrow())?;
358364
write_uint8(writer, AnyValueKey::Double as u8)?;
359365
write_f64(writer, *v)?;
360366
}
361367

362-
for (k, v) in span.meta_struct.iter() {
363-
table.write_interned(writer, k.borrow())?;
368+
for (k, v) in meta_struct_dd.iter() {
369+
table.write_interned(writer, (*k).borrow())?;
364370
write_uint8(writer, AnyValueKey::Bytes as u8)?;
365-
write_bin(writer, v.borrow())?;
371+
write_bin(writer, (*v).borrow())?;
366372
}
367373
}
368374

libdd-trace-utils/src/span/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use std::{fmt, ptr};
2121
/// Trait representing the requirements for a type to be used as a Span "string" type.
2222
/// Note: Borrow<str> is not required by the derived traits, but allows to access HashMap elements
2323
/// from a static str and check if the string is empty.
24-
pub trait SpanText: Debug + Eq + Hash + Borrow<str> + Serialize + Default + From<String> {
24+
pub trait SpanText:
25+
Debug + Eq + Hash + Clone + Borrow<str> + Serialize + Default + From<String>
26+
{
2527
fn from_static_str(value: &'static str) -> Self;
2628
}
2729

libdd-trace-utils/src/span/v04/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,14 @@ impl<T: TraceData> From<&AttributeArrayValue<T>> for u8 {
310310
}
311311
}
312312

313+
impl<T: TraceData> Span<T> {
314+
pub fn dedup(&mut self) {
315+
self.meta.dedup();
316+
self.metrics.dedup();
317+
self.meta_struct.dedup();
318+
}
319+
}
320+
313321
fn is_default<T: Default + PartialEq>(t: &T) -> bool {
314322
t == &T::default()
315323
}

libdd-trace-utils/src/span/vec_map.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,6 @@ impl<K: Eq + Hash, V> VecMap<K, V> {
194194
if self.deduped {
195195
DedupedVecMap::Borrowed(self)
196196
} else {
197-
static WARNED: AtomicBool = AtomicBool::new(false);
198-
if !WARNED.swap(true, Ordering::Relaxed) {
199-
tracing::warn!(
200-
"VecMap not deduped before encoding. Performing defensive on-the-fly dedup"
201-
);
202-
}
203-
204197
DedupedVecMap::Owned(
205198
self.data
206199
.iter()
@@ -215,7 +208,6 @@ impl<K: Eq + Hash, V> VecMap<K, V> {
215208
/// This is a convenience wrapper around [Self::as_deduped_map] used in the msgpack encoder,
216209
/// where we expect the map to be deduped, but call `as_deduped_map` as a defensive measure. If
217210
/// the latter had to deduplicate and allocate a new vec, we log a warning (at most once).
218-
#[allow(unused)]
219211
pub(crate) fn defensive_dedup(&self) -> DedupedVecMap<'_, K, V> {
220212
if !self.is_deduped() {
221213
static WARNED: AtomicBool = AtomicBool::new(false);

0 commit comments

Comments
 (0)