Skip to content

Commit 4ae8ebe

Browse files
feat: use ip quantization when aggregating peer tags for trace stats (#1944)
# What does this PR do? Quantize IP addresses for peer tags when aggregating trace stats. # Motivation Needed for agent side stats computation in the Serverless Compatibility Layer (used as the agent in Azure Functions) in DataDog/serverless-components#124. # Additional Notes Anything else we should know when reviewing? # How to test the change? Describe here in detail how the change can be validated. Co-authored-by: Eldolfin <oscar.ledauphin@datadoghq.com>
1 parent 58b86d5 commit 4ae8ebe

4 files changed

Lines changed: 195 additions & 5 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libdd-trace-stats/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ libdd-common = { version = "4.0.0", path = "../libdd-common", default-features =
1616
libdd-ddsketch = { version = "1.0.1", path = "../libdd-ddsketch" }
1717
libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime" }
1818
libdd-trace-protobuf = { version = "3.0.1", path = "../libdd-trace-protobuf" }
19+
libdd-trace-obfuscation = { version = "2.0.0", path = "../libdd-trace-obfuscation", default-features = false }
1920
libdd-trace-utils = { version = "3.0.1", path = "../libdd-trace-utils", default-features = false }
2021
hashbrown = { version = "0.15" }
2122
http = "1.1"

libdd-trace-stats/src/span_concentrator/aggregation.rs

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
//! span.
77
88
use hashbrown::HashMap;
9+
use libdd_trace_obfuscation::ip_address::quantize_peer_ip_addresses;
910
use libdd_trace_protobuf::pb;
1011
use libdd_trace_utils::span::SpanText;
11-
use std::borrow::Borrow;
12+
use std::borrow::{Borrow, Cow};
1213

1314
use crate::span_concentrator::StatSpan;
1415

@@ -79,7 +80,7 @@ impl<T> FixedAggregationKey<T> {
7980
/// Represent a stats aggregation key borrowed from span data
8081
pub(super) struct BorrowedAggregationKey<'a> {
8182
fixed: FixedAggregationKey<&'a str>,
82-
peer_tags: Vec<(&'a str, &'a str)>,
83+
peer_tags: Vec<(&'a str, Cow<'a, str>)>,
8384
}
8485

8586
impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
@@ -205,14 +206,17 @@ impl<'a> BorrowedAggregationKey<'a> {
205206
let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default();
206207
let peer_tags = if should_track_peer_tags(span_kind) {
207208
// Parse the meta tags of the span and return a list of the peer tags based on the list
208-
// of `peer_tag_keys`
209+
// of `peer_tag_keys`. IP address values are quantized to reduce cardinality.
209210
peer_tag_keys
210211
.iter()
211-
.filter_map(|key| Some(((key.as_str()), (span.get_meta(key.as_str())?))))
212+
.filter_map(|key| {
213+
let value = span.get_meta(key.as_str())?;
214+
Some((key.as_str(), quantize_peer_ip_addresses(value)))
215+
})
212216
.collect()
213217
} else if let Some(base_service) = span.get_meta("_dd.base_service") {
214218
// Internal spans with a base service override use only _dd.base_service as peer tag
215-
vec![("_dd.base_service", base_service)]
219+
vec![("_dd.base_service", Cow::Borrowed(base_service))]
216220
} else {
217221
vec![]
218222
};
@@ -961,6 +965,80 @@ mod tests {
961965
}
962966
}
963967

968+
#[test]
969+
fn test_peer_tag_ip_quantization_in_aggregation_key() {
970+
let peer_tag_keys = vec!["peer.hostname".to_string(), "db.instance".to_string()];
971+
972+
// IPv4 address peer tag gets replaced with blocked-ip-address
973+
let span_ipv4 = SpanSlice {
974+
service: "service",
975+
name: "op",
976+
resource: "res",
977+
span_id: 1,
978+
parent_id: 0,
979+
meta: HashMap::from([
980+
("span.kind", "client"),
981+
("peer.hostname", "10.1.2.3"),
982+
("db.instance", "my-db"),
983+
]),
984+
..Default::default()
985+
};
986+
let key = BorrowedAggregationKey::from_span(&span_ipv4, &peer_tag_keys);
987+
let owned = OwnedAggregationKey::from(&key);
988+
assert_eq!(
989+
owned.peer_tags,
990+
vec![
991+
(
992+
"peer.hostname".to_string(),
993+
"blocked-ip-address".to_string()
994+
),
995+
("db.instance".to_string(), "my-db".to_string()),
996+
]
997+
);
998+
999+
// IPv6 address peer tag gets replaced with blocked-ip-address
1000+
let span_ipv6 = SpanSlice {
1001+
service: "service",
1002+
name: "op",
1003+
resource: "res",
1004+
span_id: 1,
1005+
parent_id: 0,
1006+
meta: HashMap::from([
1007+
("span.kind", "client"),
1008+
("peer.hostname", "2001:db8:3333:4444:CCCC:DDDD:EEEE:FFFF"),
1009+
]),
1010+
..Default::default()
1011+
};
1012+
let ipv6_keys = vec!["peer.hostname".to_string()];
1013+
let key = BorrowedAggregationKey::from_span(&span_ipv6, &ipv6_keys);
1014+
let owned = OwnedAggregationKey::from(&key);
1015+
assert_eq!(
1016+
owned.peer_tags,
1017+
vec![(
1018+
"peer.hostname".to_string(),
1019+
"blocked-ip-address".to_string()
1020+
)]
1021+
);
1022+
1023+
// Non-IP peer tags pass through unchanged
1024+
let span_non_ip = SpanSlice {
1025+
service: "service",
1026+
name: "op",
1027+
resource: "res",
1028+
span_id: 1,
1029+
parent_id: 0,
1030+
meta: HashMap::from([("span.kind", "client"), ("db.instance", "dynamo.test.us1")]),
1031+
..Default::default()
1032+
};
1033+
let non_ip_keys = vec!["db.instance".to_string()];
1034+
let key = BorrowedAggregationKey::from_span(&span_non_ip, &non_ip_keys);
1035+
let owned = OwnedAggregationKey::from(&key);
1036+
assert_eq!(
1037+
owned.peer_tags,
1038+
vec![("db.instance".to_string(), "dynamo.test.us1".to_string())]
1039+
);
1040+
}
1041+
9641042
#[test]
9651043
fn test_grpc_status_str_to_int_value() {
9661044
// Numeric strings parse directly

libdd-trace-stats/src/span_concentrator/tests.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,116 @@ fn test_peer_tags_aggregation() {
877877
);
878878
}
879879

880+
/// Test that spans differing only by peer-tag IPs aggregate after IP quantization
881+
#[test]
882+
fn test_peer_tags_quantization_aggregation() {
883+
let now = SystemTime::now();
884+
let mut spans = vec![
885+
get_test_span_with_meta(
886+
now,
887+
2,
888+
1,
889+
75,
890+
5,
891+
"A1",
892+
"SELECT user_id from users WHERE user_name = ?",
893+
0,
894+
&[
895+
("span.kind", "client"),
896+
("db.instance", "i-1234"),
897+
("db.system", "postgres"),
898+
("region", "us1"),
899+
("peer.hostname", "10.1.2.3"),
900+
],
901+
&[("_dd.measured", 1.0)],
902+
),
903+
get_test_span_with_meta(
904+
now,
905+
3,
906+
1,
907+
75,
908+
5,
909+
"A1",
910+
"SELECT user_id from users WHERE user_name = ?",
911+
0,
912+
&[
913+
("span.kind", "client"),
914+
("db.instance", "i-1234"),
915+
("db.system", "postgres"),
916+
("region", "us1"),
917+
("peer.hostname", "10.1.2.4"),
918+
],
919+
&[("_dd.measured", 1.0)],
920+
),
921+
get_test_span_with_meta(
922+
now,
923+
4,
924+
1,
925+
75,
926+
5,
927+
"A1",
928+
"SELECT user_id from users WHERE user_name = ?",
929+
0,
930+
&[
931+
("span.kind", "client"),
932+
("db.instance", "i-1234"),
933+
("db.system", "postgres"),
934+
("region", "us1"),
935+
("peer.hostname", "2001:db8:3333:4444:CCCC:DDDD:EEEE:FFFF"),
936+
],
937+
&[("_dd.measured", 1.0)],
938+
),
939+
];
940+
compute_top_level_span(spans.as_mut_slice());
941+
let mut concentrator_with_peer_tags = SpanConcentrator::new(
942+
Duration::from_nanos(BUCKET_SIZE),
943+
now,
944+
get_span_kinds(),
945+
vec![
946+
"db.instance".to_string(),
947+
"db.system".to_string(),
948+
"peer.hostname".to_string(),
949+
],
950+
);
951+
for span in &spans {
952+
concentrator_with_peer_tags.add_span(span);
953+
}
954+
955+
let flushtime = now
956+
+ Duration::from_nanos(
957+
concentrator_with_peer_tags.bucket_size * concentrator_with_peer_tags.buffer_len as u64,
958+
);
959+
960+
let expected_with_peer_tags = vec![pb::ClientGroupedStats {
961+
service: "A1".to_string(),
962+
resource: "SELECT user_id from users WHERE user_name = ?".to_string(),
963+
r#type: "db".to_string(),
964+
name: "query".to_string(),
965+
duration: 225,
966+
hits: 3,
967+
top_level_hits: 3,
968+
errors: 0,
969+
is_trace_root: pb::Trilean::False.into(),
970+
span_kind: "client".to_string(),
971+
peer_tags: vec![
972+
"db.instance:i-1234".to_string(),
973+
"db.system:postgres".to_string(),
974+
"peer.hostname:blocked-ip-address".to_string(),
975+
],
976+
..Default::default()
977+
}];
978+
979+
let stats_with_peer_tags = concentrator_with_peer_tags.flush(flushtime, false);
980+
assert_counts_equal(
981+
expected_with_peer_tags,
982+
stats_with_peer_tags
983+
.first()
984+
.expect("There should be at least one time bucket")
985+
.stats
986+
.clone(),
987+
);
988+
}
989+
880990
/// Test that internal spans with _dd.base_service use it as their sole peer tag
881991
#[test]
882992
fn test_base_service_peer_tag() {

0 commit comments

Comments
 (0)