Skip to content

Commit 1f2362c

Browse files
lym953 and claude committed
refactor: move sampling drop logic into process_traces, simplify send_processed_traces
Instead of partitioning raw traces and calling process_traces twice, the sampling-out filtering now happens inside ServerlessTraceProcessor::process_traces. The payloads_for_stats clone is taken before the filter, so stats always include sampled-out traces, while the backend payload has its sampled-out chunks removed in place.

send_processed_traces is simplified to a single process_traces call, with stats computed before the backend send. The upfront OwnedTracerHeaderTags conversion is also removed, since only one process_traces call is needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 9730672 commit 1f2362c

1 file changed

Lines changed: 106 additions & 94 deletions

File tree

bottlecap/src/traces/trace_processor.rs

Lines changed: 106 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ use tracing::{debug, error};
3434
use crate::traces::stats_generator::StatsGenerator;
3535
use crate::traces::trace_aggregator::{OwnedTracerHeaderTags, SendDataBuilderInfo};
3636

37-
const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";
37+
/// Sentinel value used by `collect_pb_trace_chunks` when `_sampling_priority_v1` is absent.
38+
const CHUNK_PRIORITY_NOT_SET: i32 = i8::MIN as i32;
3839

3940
#[derive(Clone)]
4041
#[allow(clippy::module_name_repetitions)]
@@ -364,6 +365,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
364365

365366
// Clone inner V07 payloads for stats generation (TracerPayload is Clone,
366367
// but TracerPayloadCollection is not).
368+
// This clone is done BEFORE filtering so stats always include sampled-out traces.
367369
let payloads_for_stats = match &payload {
368370
TracerPayloadCollection::V07(payloads) => {
369371
TracerPayloadCollection::V07(payloads.clone())
@@ -374,6 +376,22 @@ impl TraceProcessor for ServerlessTraceProcessor {
374376
}
375377
};
376378

379+
// When computing stats on extension, remove sampled-out chunks from the backend
380+
// payload. Sampled-out chunks are preserved in payloads_for_stats above so their
381+
// stats are still counted. CHUNK_PRIORITY_NOT_SET (-128) means no explicit priority
382+
// was set and the trace is kept; valid drop priorities are -1 (USER_DROP) and 0
383+
// (AUTO_DROP).
384+
if config.compute_trace_stats_on_extension
385+
&& let TracerPayloadCollection::V07(ref mut tracer_payloads) = payload
386+
{
387+
for tp in tracer_payloads.iter_mut() {
388+
tp.chunks.retain(|chunk| {
389+
chunk.priority > 0 || chunk.priority == CHUNK_PRIORITY_NOT_SET
390+
});
391+
}
392+
tracer_payloads.retain(|tp| !tp.chunks.is_empty());
393+
}
394+
377395
let owned_header_tags = OwnedTracerHeaderTags::from(header_tags.clone());
378396

379397
// Move original payload into builder (no clone needed)
@@ -426,12 +444,7 @@ impl SendingTraceProcessor {
426444
body_size: usize,
427445
span_pointers: Option<Vec<SpanPointer>>,
428446
) -> Result<(), SendError<SendDataBuilderInfo>> {
429-
// Convert to owned upfront so header_tags can be reconstructed multiple times
430-
// if compute_trace_stats_on_extension requires two calls to process_traces.
431-
let owned_header_tags = OwnedTracerHeaderTags::from(header_tags);
432-
433447
traces = if let Some(appsec) = &self.appsec {
434-
let header_tags = owned_header_tags.to_tracer_header_tags();
435448
let mut appsec = appsec.lock().await;
436449
traces.into_iter().filter_map(|mut trace| {
437450
let Some(span) = AppSecProcessor::service_entry_span_mut(&mut trace) else {
@@ -473,77 +486,31 @@ impl SendingTraceProcessor {
473486
return Ok(());
474487
}
475488

476-
// When compute_trace_stats_on_extension is true, traces with sampling priority <= 0
477-
// should be dropped (not sent to the backend) but still counted in trace stats.
478-
if config.compute_trace_stats_on_extension {
479-
let (traces_to_keep, traces_to_drop): (Vec<_>, Vec<_>) = traces
480-
.into_iter()
481-
.partition(|trace| !is_trace_sampled_out(trace));
482-
483-
// Compute stats for dropped traces without sending them to the backend.
484-
if !traces_to_drop.is_empty() {
485-
let (_, dropped_stats) = self.processor.process_traces(
486-
config.clone(),
487-
tags_provider.clone(),
488-
owned_header_tags.to_tracer_header_tags(),
489-
traces_to_drop,
490-
0,
491-
span_pointers.clone(),
492-
);
493-
// This needs to be after process_traces() because process_traces()
494-
// performs obfuscation, and we need to compute stats on the obfuscated traces.
495-
if let Err(err) = self.stats_generator.send(&dropped_stats) {
496-
error!(
497-
"TRACE_PROCESSOR | Error sending dropped trace stats to the stats concentrator: {err}"
498-
);
499-
}
500-
}
501-
502-
traces = traces_to_keep;
503-
if traces.is_empty() {
504-
debug!("TRACE_PROCESSOR | All traces were sampled out, skipping backend send.");
505-
return Ok(());
506-
}
507-
}
508-
509489
let (payload, processed_traces) = self.processor.process_traces(
510490
config.clone(),
511491
tags_provider,
512-
owned_header_tags.to_tracer_header_tags(),
492+
header_tags,
513493
traces,
514494
body_size,
515495
span_pointers,
516496
);
517-
self.trace_tx.send(payload).await?;
518497

519-
// This needs to be after process_traces() because process_traces()
520-
// performs obfuscation, and we need to compute stats on the obfuscated traces.
498+
// Compute stats for ALL traces (including sampled-out ones). process_traces()
499+
// handles obfuscation and, when compute_trace_stats_on_extension is true, filters
500+
// sampled-out chunks from the backend payload only. Stats are sent first so that a failed backend send (early return via `?`) cannot skip them.
521501
if config.compute_trace_stats_on_extension
522502
&& let Err(err) = self.stats_generator.send(&processed_traces)
523503
{
524504
// Just log the error. We don't think trace stats are critical, so we don't want to
525505
// return an error if only stats fail to send.
526506
error!("TRACE_PROCESSOR | Error sending traces to the stats concentrator: {err}");
527507
}
508+
509+
self.trace_tx.send(payload).await?;
528510
Ok(())
529511
}
530512
}
531513

532-
/// Returns true if the trace should be dropped based on its sampling priority.
533-
/// A trace is sampled out when the root span's `_sampling_priority_v1` metric is <= 0.
534-
/// If the sampling priority is absent, the trace is kept.
535-
fn is_trace_sampled_out(trace: &[pb::Span]) -> bool {
536-
let root_span = trace
537-
.iter()
538-
.find(|s| s.parent_id == 0)
539-
.or_else(|| trace.first());
540-
if let Some(span) = root_span
541-
&& let Some(&priority) = span.metrics.get(SAMPLING_PRIORITY_KEY)
542-
{
543-
return priority <= 0.0;
544-
}
545-
false
546-
}
547514

548515
#[cfg(test)]
549516
mod tests {
@@ -1106,53 +1073,98 @@ mod tests {
11061073
);
11071074
}
11081075

1076+
/// Verifies that when `compute_trace_stats_on_extension` is true, `process_traces`
1077+
/// filters sampled-out chunks from the backend payload while preserving them in the
1078+
/// stats collection.
11091079
#[test]
1110-
fn test_is_trace_sampled_out() {
1111-
let make_span = |parent_id: u64, priority: Option<f64>| -> pb::Span {
1080+
#[allow(clippy::unwrap_used)]
1081+
fn test_process_traces_filters_sampled_out_chunks() {
1082+
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
1083+
1084+
let config = Arc::new(Config {
1085+
apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
1086+
compute_trace_stats_on_extension: true,
1087+
..Config::default()
1088+
});
1089+
let tags_provider = Arc::new(Provider::new(
1090+
config.clone(),
1091+
"lambda".to_string(),
1092+
&std::collections::HashMap::from([(
1093+
"function_arn".to_string(),
1094+
"test-arn".to_string(),
1095+
)]),
1096+
));
1097+
let processor = ServerlessTraceProcessor {
1098+
obfuscation_config: Arc::new(
1099+
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
1100+
),
1101+
};
1102+
1103+
let header_tags = tracer_header_tags::TracerHeaderTags {
1104+
lang: "rust",
1105+
lang_version: "1.0",
1106+
lang_interpreter: "",
1107+
lang_vendor: "",
1108+
tracer_version: "1.0",
1109+
container_id: "",
1110+
client_computed_top_level: false,
1111+
client_computed_stats: false,
1112+
dropped_p0_traces: 0,
1113+
dropped_p0_spans: 0,
1114+
};
1115+
1116+
let make_span = |trace_id: u64, priority: Option<f64>| -> pb::Span {
11121117
let mut metrics = HashMap::new();
11131118
if let Some(p) = priority {
1114-
metrics.insert(SAMPLING_PRIORITY_KEY.to_string(), p);
1119+
metrics.insert("_sampling_priority_v1".to_string(), p);
11151120
}
11161121
pb::Span {
1117-
trace_id: 1,
1118-
span_id: 1,
1119-
parent_id,
1122+
trace_id,
1123+
span_id: trace_id,
1124+
parent_id: 0,
11201125
metrics,
1126+
service: "svc".to_string(),
1127+
name: "op".to_string(),
1128+
resource: "res".to_string(),
11211129
..Default::default()
11221130
}
11231131
};
11241132

1125-
// Root span (parent_id == 0) with priority -1 (USER_DROP) → sampled out
1126-
let trace = vec![make_span(0, Some(-1.0))];
1127-
assert!(is_trace_sampled_out(&trace));
1128-
1129-
// Root span with priority 0 (AUTO_DROP) → sampled out
1130-
let trace = vec![make_span(0, Some(0.0))];
1131-
assert!(is_trace_sampled_out(&trace));
1132-
1133-
// Root span with priority 1 (AUTO_KEEP) → kept
1134-
let trace = vec![make_span(0, Some(1.0))];
1135-
assert!(!is_trace_sampled_out(&trace));
1136-
1137-
// Root span with priority 2 (USER_KEEP) → kept
1138-
let trace = vec![make_span(0, Some(2.0))];
1139-
assert!(!is_trace_sampled_out(&trace));
1140-
1141-
// No sampling priority tag → kept (default)
1142-
let trace = vec![make_span(0, None)];
1143-
assert!(!is_trace_sampled_out(&trace));
1144-
1145-
// Empty trace → kept
1146-
assert!(!is_trace_sampled_out(&[]));
1133+
// Three traces: kept (priority 1), dropped (priority 0), dropped (priority -1)
1134+
let traces = vec![
1135+
vec![make_span(1, Some(1.0))],
1136+
vec![make_span(2, Some(0.0))],
1137+
vec![make_span(3, Some(-1.0))],
1138+
];
11471139

1148-
// No root span (no parent_id == 0); falls back to first span with priority 0 → sampled out
1149-
let trace = vec![make_span(99, Some(0.0)), make_span(99, Some(1.0))];
1150-
assert!(is_trace_sampled_out(&trace));
1140+
let (payload_info, stats_collection) =
1141+
processor.process_traces(config, tags_provider, header_tags, traces, 0, None);
11511142

1152-
// Root span is not first; priority on root span wins
1153-
let root = make_span(0, Some(-1.0));
1154-
let child = make_span(1, Some(1.0));
1155-
let trace = vec![child, root];
1156-
assert!(is_trace_sampled_out(&trace));
1143+
// Stats collection must include all three traces
1144+
let TracerPayloadCollection::V07(ref stats_payloads) = stats_collection else {
1145+
panic!("expected V07");
1146+
};
1147+
let stats_span_count: usize = stats_payloads
1148+
.iter()
1149+
.flat_map(|tp| tp.chunks.iter())
1150+
.map(|c| c.spans.len())
1151+
.sum();
1152+
assert_eq!(stats_span_count, 3, "stats must include all traces");
1153+
1154+
// Backend payload must only contain the kept trace (priority 1)
1155+
let backend_send_data = payload_info.builder.build();
1156+
let TracerPayloadCollection::V07(backend_payloads) = backend_send_data.get_payloads()
1157+
else {
1158+
panic!("expected V07");
1159+
};
1160+
let backend_span_count: usize = backend_payloads
1161+
.iter()
1162+
.flat_map(|tp| tp.chunks.iter())
1163+
.map(|c| c.spans.len())
1164+
.sum();
1165+
assert_eq!(
1166+
backend_span_count, 1,
1167+
"backend must only include kept traces"
1168+
);
11571169
}
11581170
}

0 commit comments

Comments
 (0)