Skip to content

Commit 15a7a08

Browse files
lym953claude
andcommitted
feat: return None from process_traces when all chunks are sampled out
When compute_trace_stats_on_extension is true and all trace chunks are filtered out due to sampling priority, process_traces now returns Option::None for the SendDataBuilderInfo instead of constructing an empty payload. Callers skip the backend send when None is received. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 1f2362c commit 15a7a08

2 files changed

Lines changed: 98 additions & 14 deletions

File tree

bottlecap/src/otlp/agent.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,16 +183,18 @@ impl Agent {
183183
None,
184184
);
185185

186-
match trace_tx.send(send_data_builder).await {
187-
Ok(()) => {
188-
debug!("OTLP | Successfully buffered traces to be aggregated.");
189-
}
190-
Err(err) => {
191-
error!("OTLP | Error sending traces to the trace aggregator: {err}");
192-
return Self::otlp_error_response(
193-
StatusCode::INTERNAL_SERVER_ERROR,
194-
format!("Error sending traces to the trace aggregator: {err}"),
195-
);
186+
if let Some(send_data_builder) = send_data_builder {
187+
match trace_tx.send(send_data_builder).await {
188+
Ok(()) => {
189+
debug!("OTLP | Successfully buffered traces to be aggregated.");
190+
}
191+
Err(err) => {
192+
error!("OTLP | Error sending traces to the trace aggregator: {err}");
193+
return Self::otlp_error_response(
194+
StatusCode::INTERNAL_SERVER_ERROR,
195+
format!("Error sending traces to the trace aggregator: {err}"),
196+
);
197+
}
196198
}
197199
}
198200

bottlecap/src/traces/trace_processor.rs

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ pub trait TraceProcessor {
317317
traces: Vec<Vec<pb::Span>>,
318318
body_size: usize,
319319
span_pointers: Option<Vec<SpanPointer>>,
320-
) -> (SendDataBuilderInfo, TracerPayloadCollection);
320+
) -> (Option<SendDataBuilderInfo>, TracerPayloadCollection);
321321
}
322322

323323
#[async_trait]
@@ -330,7 +330,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
330330
traces: Vec<Vec<pb::Span>>,
331331
body_size: usize,
332332
span_pointers: Option<Vec<SpanPointer>>,
333-
) -> (SendDataBuilderInfo, TracerPayloadCollection) {
333+
) -> (Option<SendDataBuilderInfo>, TracerPayloadCollection) {
334334
let mut payload = trace_utils::collect_pb_trace_chunks(
335335
traces,
336336
&header_tags,
@@ -390,6 +390,10 @@ impl TraceProcessor for ServerlessTraceProcessor {
390390
});
391391
}
392392
tracer_payloads.retain(|tp| !tp.chunks.is_empty());
393+
if tracer_payloads.is_empty() {
394+
debug!("TRACE_PROCESSOR | All traces were sampled out, skipping backend send.");
395+
return (None, payloads_for_stats);
396+
}
393397
}
394398

395399
let owned_header_tags = OwnedTracerHeaderTags::from(header_tags.clone());
@@ -405,7 +409,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
405409
));
406410

407411
(
408-
SendDataBuilderInfo::new(builder, body_size, owned_header_tags),
412+
Some(SendDataBuilderInfo::new(builder, body_size, owned_header_tags)),
409413
payloads_for_stats,
410414
)
411415
}
@@ -506,7 +510,9 @@ impl SendingTraceProcessor {
506510
error!("TRACE_PROCESSOR | Error sending traces to the stats concentrator: {err}");
507511
}
508512

509-
self.trace_tx.send(payload).await?;
513+
if let Some(payload) = payload {
514+
self.trace_tx.send(payload).await?;
515+
}
510516
Ok(())
511517
}
512518
}
@@ -639,6 +645,7 @@ mod tests {
639645
100,
640646
None,
641647
);
648+
let tracer_payload = tracer_payload.expect("expected Some payload");
642649

643650
let expected_tracer_payload = pb::TracerPayload {
644651
container_id: "33".to_string(),
@@ -1139,6 +1146,7 @@ mod tests {
11391146

11401147
let (payload_info, stats_collection) =
11411148
processor.process_traces(config, tags_provider, header_tags, traces, 0, None);
1149+
let payload_info = payload_info.expect("expected Some payload");
11421150

11431151
// Stats collection must include all three traces
11441152
let TracerPayloadCollection::V07(ref stats_payloads) = stats_collection else {
@@ -1167,4 +1175,78 @@ mod tests {
11671175
"backend must only include kept traces"
11681176
);
11691177
}
1178+
1179+
/// Verifies that `process_traces` returns `None` for the backend payload when all
1180+
/// traces are sampled out and `compute_trace_stats_on_extension` is true.
1181+
#[test]
1182+
fn test_process_traces_returns_none_when_all_sampled_out() {
1183+
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
1184+
1185+
let config = Arc::new(Config {
1186+
apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
1187+
compute_trace_stats_on_extension: true,
1188+
..Config::default()
1189+
});
1190+
let tags_provider = Arc::new(Provider::new(
1191+
config.clone(),
1192+
"lambda".to_string(),
1193+
&std::collections::HashMap::from([(
1194+
"function_arn".to_string(),
1195+
"test-arn".to_string(),
1196+
)]),
1197+
));
1198+
let processor = ServerlessTraceProcessor {
1199+
obfuscation_config: Arc::new(
1200+
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
1201+
),
1202+
};
1203+
let header_tags = tracer_header_tags::TracerHeaderTags {
1204+
lang: "rust",
1205+
lang_version: "1.0",
1206+
lang_interpreter: "",
1207+
lang_vendor: "",
1208+
tracer_version: "1.0",
1209+
container_id: "",
1210+
client_computed_top_level: false,
1211+
client_computed_stats: false,
1212+
dropped_p0_traces: 0,
1213+
dropped_p0_spans: 0,
1214+
};
1215+
1216+
let make_dropped_span = |trace_id: u64| -> pb::Span {
1217+
let mut metrics = HashMap::new();
1218+
metrics.insert("_sampling_priority_v1".to_string(), -1.0_f64);
1219+
pb::Span {
1220+
trace_id,
1221+
span_id: trace_id,
1222+
parent_id: 0,
1223+
metrics,
1224+
service: "svc".to_string(),
1225+
name: "op".to_string(),
1226+
resource: "res".to_string(),
1227+
..Default::default()
1228+
}
1229+
};
1230+
1231+
let traces = vec![
1232+
vec![make_dropped_span(1)],
1233+
vec![make_dropped_span(2)],
1234+
];
1235+
1236+
let (payload, stats_collection) =
1237+
processor.process_traces(config, tags_provider, header_tags, traces, 0, None);
1238+
1239+
assert!(payload.is_none(), "backend payload must be None when all traces are sampled out");
1240+
1241+
// Stats collection must still include both traces
1242+
let TracerPayloadCollection::V07(ref stats_payloads) = stats_collection else {
1243+
panic!("expected V07");
1244+
};
1245+
let stats_span_count: usize = stats_payloads
1246+
.iter()
1247+
.flat_map(|tp| tp.chunks.iter())
1248+
.map(|c| c.spans.len())
1249+
.sum();
1250+
assert_eq!(stats_span_count, 2, "stats must include all traces even when all are sampled out");
1251+
}
11701252
}

0 commit comments

Comments
 (0)