Skip to content

Commit 43d46d6

Browse files
lym953claude
andcommitted
fix(traces): use protobuf encoded_len for body_size after sampling-priority filtering
After filtering sampled-out TraceChunks from the payload, recompute body_size using prost::Message::encoded_len() on the remaining TracerPayloads instead of passing through the original (unfiltered) request body size. This ensures the TraceAggregator's 3.2 MB batch limit is evaluated against only the data that will actually be sent to the backend. Since libdd-trace-protobuf uses prost 0.14 while bottlecap depends on prost 0.13, add prost014 as an aliased direct dependency so the prost 0.14 Message trait is accessible for TracerPayload. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent dda9b26 commit 43d46d6

3 files changed

Lines changed: 120 additions & 2 deletions

File tree

bottlecap/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ rustls-pki-types = { version = "1.0", default-features = false }
4545
hyper-rustls = { version = "0.27.7", default-features = false }
4646
rand = { version = "0.8", default-features = false }
4747
prost = { version = "0.13", default-features = false }
48+
# libdd-trace-protobuf uses prost 0.14, so we alias it here to call encoded_len() on TracerPayload
49+
prost014 = { package = "prost", version = "0.14", default-features = false }
4850
tonic-types = { version = "0.13", default-features = false }
4951
zstd = { version = "0.13.3", default-features = false }
5052
futures = { version = "0.3.31", default-features = false }

bottlecap/src/traces/trace_processor.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use tracing::{debug, error};
3434
use crate::traces::stats_generator::StatsGenerator;
3535
use crate::traces::trace_aggregator::{OwnedTracerHeaderTags, SendDataBuilderInfo};
3636
use libdd_trace_normalization::normalizer::SamplerPriority;
37+
use prost014::Message as _;
3738

3839
#[derive(Clone)]
3940
#[allow(clippy::module_name_repetitions)]
@@ -379,7 +380,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
379380
// stats are still counted. SamplerPriority::None (-128) means no explicit priority
380381
// was set and the trace is kept; drop priorities are SamplerPriority::AutoDrop (0)
381382
// and UserDrop (-1, not represented in SamplerPriority).
382-
if config.compute_trace_stats_on_extension
383+
let body_size = if config.compute_trace_stats_on_extension
383384
&& let TracerPayloadCollection::V07(ref mut tracer_payloads) = payload
384385
{
385386
for tp in tracer_payloads.iter_mut() {
@@ -391,7 +392,14 @@ impl TraceProcessor for ServerlessTraceProcessor {
391392
if tracer_payloads.is_empty() {
392393
return (None, payloads_for_stats);
393394
}
394-
}
395+
396+
// Use the protobuf-encoded size of the filtered payload so the
397+
// TraceAggregator's 3.2 MB batch limit reflects only the data that
398+
// will actually be sent to the backend.
399+
tracer_payloads.iter().map(|tp| tp.encoded_len()).sum()
400+
} else {
401+
body_size
402+
};
395403

396404
let owned_header_tags = OwnedTracerHeaderTags::from(header_tags.clone());
397405

@@ -1251,4 +1259,111 @@ mod tests {
12511259
"stats must include all traces even when all are sampled out"
12521260
);
12531261
}
1262+
1263+
/// Verifies that body_size in the returned SendDataBuilderInfo reflects the
1264+
/// protobuf-encoded size of the filtered payload, not the original request body.
1265+
#[test]
1266+
fn test_process_traces_body_size_reflects_filtered_payload() {
1267+
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
1268+
use prost014::Message as _;
1269+
1270+
let config = Arc::new(Config {
1271+
apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
1272+
compute_trace_stats_on_extension: true,
1273+
..Config::default()
1274+
});
1275+
let tags_provider = Arc::new(Provider::new(
1276+
config.clone(),
1277+
"lambda".to_string(),
1278+
&std::collections::HashMap::from([(
1279+
"function_arn".to_string(),
1280+
"test-arn".to_string(),
1281+
)]),
1282+
));
1283+
let processor = ServerlessTraceProcessor {
1284+
obfuscation_config: Arc::new(
1285+
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
1286+
),
1287+
};
1288+
let header_tags = tracer_header_tags::TracerHeaderTags {
1289+
lang: "rust",
1290+
lang_version: "1.0",
1291+
lang_interpreter: "",
1292+
lang_vendor: "",
1293+
tracer_version: "1.0",
1294+
container_id: "",
1295+
client_computed_top_level: false,
1296+
client_computed_stats: false,
1297+
dropped_p0_traces: 0,
1298+
dropped_p0_spans: 0,
1299+
};
1300+
1301+
let make_span = |trace_id: u64, priority: f64| -> pb::Span {
1302+
let mut metrics = HashMap::new();
1303+
metrics.insert("_sampling_priority_v1".to_string(), priority);
1304+
pb::Span {
1305+
trace_id,
1306+
span_id: trace_id,
1307+
parent_id: 0,
1308+
metrics,
1309+
service: "svc".to_string(),
1310+
name: "op".to_string(),
1311+
resource: "res".to_string(),
1312+
..Default::default()
1313+
}
1314+
};
1315+
1316+
// 1 kept trace, 3 dropped traces; original body_size is intentionally large
1317+
let traces = vec![
1318+
vec![make_span(1, 1.0)],
1319+
vec![make_span(2, -1.0)],
1320+
vec![make_span(3, -1.0)],
1321+
vec![make_span(4, -1.0)],
1322+
];
1323+
1324+
let (payload_info, stats_collection) =
1325+
processor.process_traces(config, tags_provider, header_tags, traces, 999_999, None);
1326+
1327+
let info = payload_info.expect("expected Some payload");
1328+
1329+
// The reported size must equal the sum of encoded_len() of the kept TracerPayloads.
1330+
// stats_collection has all 4 traces. Reconstruct the filtered payload (only trace_id=1
1331+
// was kept with priority=1) and compute its encoded_len.
1332+
let TracerPayloadCollection::V07(ref all_payloads) = stats_collection else {
1333+
panic!("expected V07");
1334+
};
1335+
let expected_size: usize = all_payloads
1336+
.iter()
1337+
.filter_map(|tp| {
1338+
let kept_chunks: Vec<pb::TraceChunk> = tp
1339+
.chunks
1340+
.iter()
1341+
.filter(|c| c.spans.iter().any(|s| s.trace_id == 1))
1342+
.cloned()
1343+
.collect();
1344+
if kept_chunks.is_empty() {
1345+
None
1346+
} else {
1347+
Some(pb::TracerPayload {
1348+
chunks: kept_chunks,
1349+
..tp.clone()
1350+
})
1351+
}
1352+
})
1353+
.map(|tp| tp.encoded_len())
1354+
.sum();
1355+
1356+
assert!(
1357+
expected_size > 0,
1358+
"expected_size must be non-zero for a non-empty payload"
1359+
);
1360+
assert_eq!(
1361+
info.size, expected_size,
1362+
"body_size must equal protobuf encoded_len of the filtered payload"
1363+
);
1364+
assert!(
1365+
info.size < 999_999,
1366+
"body_size must be smaller than the original unfiltered request size"
1367+
);
1368+
}
12541369
}

0 commit comments

Comments
 (0)