Skip to content

Commit dda9b26

Browse files
lym953 and Claude authored
feat: [SVLS-8390] drop sampled-out traces from backend when compute_trace_stats_on_extension is enabled (#1060)
## Summary - When `compute_trace_stats_on_extension` is `true`, traces with sampling priority `<= 0` (`AUTO_DROP` / `USER_DROP`) are now dropped and **not** flushed to the Datadog backend. - When `compute_trace_stats_on_extension` is `false` (the current default), traces are not dropped because all traces need to be sent to Datadog so trace stats can be computed. - Trace stats computation is unaffected: dropped traces are still processed through `process_traces` and their stats are forwarded to the stats concentrator. ## Test plan - Passed the added unit test ### Manual test #### Steps - Add logging for trace payload flushed, and for traces being dropped - Build a layer and install it on a Lambda function - Invoke the function 6 times #### Result Logs show that - some traces were dropped. - the `aws.lambda` span for some of the 6 invocations were not flushed to Datadog. 🤖 Partially generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent d03d545 commit dda9b26

File tree

2 files changed

+218
-14
lines changed

2 files changed

+218
-14
lines changed

bottlecap/src/otlp/agent.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,16 +183,18 @@ impl Agent {
183183
None,
184184
);
185185

186-
match trace_tx.send(send_data_builder).await {
187-
Ok(()) => {
188-
debug!("OTLP | Successfully buffered traces to be aggregated.");
189-
}
190-
Err(err) => {
191-
error!("OTLP | Error sending traces to the trace aggregator: {err}");
192-
return Self::otlp_error_response(
193-
StatusCode::INTERNAL_SERVER_ERROR,
194-
format!("Error sending traces to the trace aggregator: {err}"),
195-
);
186+
if let Some(send_data_builder) = send_data_builder {
187+
match trace_tx.send(send_data_builder).await {
188+
Ok(()) => {
189+
debug!("OTLP | Successfully buffered traces to be aggregated.");
190+
}
191+
Err(err) => {
192+
error!("OTLP | Error sending traces to the trace aggregator: {err}");
193+
return Self::otlp_error_response(
194+
StatusCode::INTERNAL_SERVER_ERROR,
195+
format!("Error sending traces to the trace aggregator: {err}"),
196+
);
197+
}
196198
}
197199
}
198200

bottlecap/src/traces/trace_processor.rs

Lines changed: 206 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use tracing::{debug, error};
3333

3434
use crate::traces::stats_generator::StatsGenerator;
3535
use crate::traces::trace_aggregator::{OwnedTracerHeaderTags, SendDataBuilderInfo};
36+
use libdd_trace_normalization::normalizer::SamplerPriority;
3637

3738
#[derive(Clone)]
3839
#[allow(clippy::module_name_repetitions)]
@@ -314,7 +315,7 @@ pub trait TraceProcessor {
314315
traces: Vec<Vec<pb::Span>>,
315316
body_size: usize,
316317
span_pointers: Option<Vec<SpanPointer>>,
317-
) -> (SendDataBuilderInfo, TracerPayloadCollection);
318+
) -> (Option<SendDataBuilderInfo>, TracerPayloadCollection);
318319
}
319320

320321
#[async_trait]
@@ -327,7 +328,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
327328
traces: Vec<Vec<pb::Span>>,
328329
body_size: usize,
329330
span_pointers: Option<Vec<SpanPointer>>,
330-
) -> (SendDataBuilderInfo, TracerPayloadCollection) {
331+
) -> (Option<SendDataBuilderInfo>, TracerPayloadCollection) {
331332
let mut payload = trace_utils::collect_pb_trace_chunks(
332333
traces,
333334
&header_tags,
@@ -362,6 +363,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
362363

363364
// Clone inner V07 payloads for stats generation (TracerPayload is Clone,
364365
// but TracerPayloadCollection is not).
366+
// This clone is done BEFORE filtering so stats always include sampled-out traces.
365367
let payloads_for_stats = match &payload {
366368
TracerPayloadCollection::V07(payloads) => {
367369
TracerPayloadCollection::V07(payloads.clone())
@@ -372,6 +374,25 @@ impl TraceProcessor for ServerlessTraceProcessor {
372374
}
373375
};
374376

377+
// Remove sampled-out chunks so they won't be sent to Datadog.
378+
// Sampled-out chunks are preserved in payloads_for_stats above so their
379+
// stats are still counted. SamplerPriority::None (-128) means no explicit priority
380+
// was set and the trace is kept; drop priorities are SamplerPriority::AutoDrop (0)
381+
// and UserDrop (-1, not represented in SamplerPriority).
382+
if config.compute_trace_stats_on_extension
383+
&& let TracerPayloadCollection::V07(ref mut tracer_payloads) = payload
384+
{
385+
for tp in tracer_payloads.iter_mut() {
386+
tp.chunks.retain(|chunk| {
387+
chunk.priority > 0 || chunk.priority == SamplerPriority::None as i32
388+
});
389+
}
390+
tracer_payloads.retain(|tp| !tp.chunks.is_empty());
391+
if tracer_payloads.is_empty() {
392+
return (None, payloads_for_stats);
393+
}
394+
}
395+
375396
let owned_header_tags = OwnedTracerHeaderTags::from(header_tags.clone());
376397

377398
// Move original payload into builder (no clone needed)
@@ -385,7 +406,11 @@ impl TraceProcessor for ServerlessTraceProcessor {
385406
));
386407

387408
(
388-
SendDataBuilderInfo::new(builder, body_size, owned_header_tags),
409+
Some(SendDataBuilderInfo::new(
410+
builder,
411+
body_size,
412+
owned_header_tags,
413+
)),
389414
payloads_for_stats,
390415
)
391416
}
@@ -474,7 +499,6 @@ impl SendingTraceProcessor {
474499
body_size,
475500
span_pointers,
476501
);
477-
self.trace_tx.send(payload).await?;
478502

479503
// This needs to be after process_traces() because process_traces()
480504
// performs obfuscation, and we need to compute stats on the obfuscated traces.
@@ -485,6 +509,10 @@ impl SendingTraceProcessor {
485509
// return an error if only stats fail to send.
486510
error!("TRACE_PROCESSOR | Error sending traces to the stats concentrator: {err}");
487511
}
512+
513+
if let Some(payload) = payload {
514+
self.trace_tx.send(payload).await?;
515+
}
488516
Ok(())
489517
}
490518
}
@@ -616,6 +644,7 @@ mod tests {
616644
100,
617645
None,
618646
);
647+
let tracer_payload = tracer_payload.expect("expected Some payload");
619648

620649
let expected_tracer_payload = pb::TracerPayload {
621650
container_id: "33".to_string(),
@@ -1049,4 +1078,177 @@ mod tests {
10491078
"Trace should be kept when no filter tags are set."
10501079
);
10511080
}
1081+
1082+
/// Verifies that when `compute_trace_stats_on_extension` is true, `process_traces`
1083+
/// filters sampled-out chunks from the backend payload while preserving them in the
1084+
/// stats collection.
1085+
#[test]
1086+
#[allow(clippy::unwrap_used)]
1087+
fn test_process_traces_filters_sampled_out_chunks() {
1088+
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
1089+
1090+
let config = Arc::new(Config {
1091+
apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
1092+
compute_trace_stats_on_extension: true,
1093+
..Config::default()
1094+
});
1095+
let tags_provider = Arc::new(Provider::new(
1096+
config.clone(),
1097+
"lambda".to_string(),
1098+
&std::collections::HashMap::from([(
1099+
"function_arn".to_string(),
1100+
"test-arn".to_string(),
1101+
)]),
1102+
));
1103+
let processor = ServerlessTraceProcessor {
1104+
obfuscation_config: Arc::new(
1105+
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
1106+
),
1107+
};
1108+
1109+
let header_tags = tracer_header_tags::TracerHeaderTags {
1110+
lang: "rust",
1111+
lang_version: "1.0",
1112+
lang_interpreter: "",
1113+
lang_vendor: "",
1114+
tracer_version: "1.0",
1115+
container_id: "",
1116+
client_computed_top_level: false,
1117+
client_computed_stats: false,
1118+
dropped_p0_traces: 0,
1119+
dropped_p0_spans: 0,
1120+
};
1121+
1122+
let make_span = |trace_id: u64, priority: Option<f64>| -> pb::Span {
1123+
let mut metrics = HashMap::new();
1124+
if let Some(p) = priority {
1125+
metrics.insert("_sampling_priority_v1".to_string(), p);
1126+
}
1127+
pb::Span {
1128+
trace_id,
1129+
span_id: trace_id,
1130+
parent_id: 0,
1131+
metrics,
1132+
service: "svc".to_string(),
1133+
name: "op".to_string(),
1134+
resource: "res".to_string(),
1135+
..Default::default()
1136+
}
1137+
};
1138+
1139+
// Three traces: kept (priority 1), dropped (priority 0), dropped (priority -1)
1140+
let traces = vec![
1141+
vec![make_span(1, Some(1.0))],
1142+
vec![make_span(2, Some(0.0))],
1143+
vec![make_span(3, Some(-1.0))],
1144+
];
1145+
1146+
let (payload_info, stats_collection) =
1147+
processor.process_traces(config, tags_provider, header_tags, traces, 0, None);
1148+
let payload_info = payload_info.expect("expected Some payload");
1149+
1150+
// Stats collection must include all three traces
1151+
let TracerPayloadCollection::V07(ref stats_payloads) = stats_collection else {
1152+
panic!("expected V07");
1153+
};
1154+
let stats_span_count: usize = stats_payloads
1155+
.iter()
1156+
.flat_map(|tp| tp.chunks.iter())
1157+
.map(|c| c.spans.len())
1158+
.sum();
1159+
assert_eq!(stats_span_count, 3, "stats must include all traces");
1160+
1161+
// Backend payload must only contain the kept trace (priority 1)
1162+
let backend_send_data = payload_info.builder.build();
1163+
let TracerPayloadCollection::V07(backend_payloads) = backend_send_data.get_payloads()
1164+
else {
1165+
panic!("expected V07");
1166+
};
1167+
let backend_span_count: usize = backend_payloads
1168+
.iter()
1169+
.flat_map(|tp| tp.chunks.iter())
1170+
.map(|c| c.spans.len())
1171+
.sum();
1172+
assert_eq!(
1173+
backend_span_count, 1,
1174+
"backend must only include kept traces"
1175+
);
1176+
}
1177+
1178+
/// Verifies that `process_traces` returns `None` for the backend payload when all
1179+
/// traces are sampled out and `compute_trace_stats_on_extension` is true.
1180+
#[test]
1181+
fn test_process_traces_returns_none_when_all_sampled_out() {
1182+
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
1183+
1184+
let config = Arc::new(Config {
1185+
apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
1186+
compute_trace_stats_on_extension: true,
1187+
..Config::default()
1188+
});
1189+
let tags_provider = Arc::new(Provider::new(
1190+
config.clone(),
1191+
"lambda".to_string(),
1192+
&std::collections::HashMap::from([(
1193+
"function_arn".to_string(),
1194+
"test-arn".to_string(),
1195+
)]),
1196+
));
1197+
let processor = ServerlessTraceProcessor {
1198+
obfuscation_config: Arc::new(
1199+
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
1200+
),
1201+
};
1202+
let header_tags = tracer_header_tags::TracerHeaderTags {
1203+
lang: "rust",
1204+
lang_version: "1.0",
1205+
lang_interpreter: "",
1206+
lang_vendor: "",
1207+
tracer_version: "1.0",
1208+
container_id: "",
1209+
client_computed_top_level: false,
1210+
client_computed_stats: false,
1211+
dropped_p0_traces: 0,
1212+
dropped_p0_spans: 0,
1213+
};
1214+
1215+
let make_dropped_span = |trace_id: u64| -> pb::Span {
1216+
let mut metrics = HashMap::new();
1217+
metrics.insert("_sampling_priority_v1".to_string(), -1.0_f64);
1218+
pb::Span {
1219+
trace_id,
1220+
span_id: trace_id,
1221+
parent_id: 0,
1222+
metrics,
1223+
service: "svc".to_string(),
1224+
name: "op".to_string(),
1225+
resource: "res".to_string(),
1226+
..Default::default()
1227+
}
1228+
};
1229+
1230+
let traces = vec![vec![make_dropped_span(1)], vec![make_dropped_span(2)]];
1231+
1232+
let (payload, stats_collection) =
1233+
processor.process_traces(config, tags_provider, header_tags, traces, 0, None);
1234+
1235+
assert!(
1236+
payload.is_none(),
1237+
"backend payload must be None when all traces are sampled out"
1238+
);
1239+
1240+
// Stats collection must still include both traces
1241+
let TracerPayloadCollection::V07(ref stats_payloads) = stats_collection else {
1242+
panic!("expected V07");
1243+
};
1244+
let stats_span_count: usize = stats_payloads
1245+
.iter()
1246+
.flat_map(|tp| tp.chunks.iter())
1247+
.map(|c| c.spans.len())
1248+
.sum();
1249+
assert_eq!(
1250+
stats_span_count, 2,
1251+
"stats must include all traces even when all are sampled out"
1252+
);
1253+
}
10521254
}

0 commit comments

Comments (0)