Skip to content

Commit 62181a8

Browse files
committed
implement span derived primary tags
1 parent 102231d commit 62181a8

File tree

7 files changed

+275
-15
lines changed

7 files changed

+275
-15
lines changed

libdd-data-pipeline/src/stats_exporter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ mod tests {
232232
SystemTime::now() - BUCKETS_DURATION * 3,
233233
vec![],
234234
vec![],
235+
vec![],
235236
);
236237
let mut trace = vec![];
237238

libdd-data-pipeline/src/trace_exporter/stats.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub(crate) fn start_stats_computation(
7272
std::time::SystemTime::now(),
7373
span_kinds,
7474
peer_tags,
75+
vec![], // TODO: Add span-derived primary tags
7576
)));
7677
let cancellation_token = CancellationToken::new();
7778
create_and_start_stats_worker(

libdd-trace-stats/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ let mut concentrator = SpanConcentrator::new(
5353
SystemTime::now(),
5454
vec!["client".to_string(), "server".to_string()], // eligible span kinds
5555
vec!["peer.service".to_string()], // peer tag keys
56+
vec!["example.key".to_string()], // span derived primary tag keys
5657
);
5758

5859
// Add spans

libdd-trace-stats/benches/span_concentrator_bench.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
4747
now,
4848
vec![],
4949
vec!["db_name".into(), "bucket_s3".into()],
50+
vec![],
5051
);
5152
let mut spans = vec![];
5253
for trace_id in 1..100 {

libdd-trace-stats/src/span_concentrator/aggregation.rs

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub(super) struct BorrowedAggregationKey<'a> {
3939
http_endpoint: &'a str,
4040
grpc_status_code: Option<u8>,
4141
service_source: &'a str,
42+
span_derived_primary_tags: Vec<(&'a str, &'a str)>,
4243
}
4344

4445
impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
@@ -59,6 +60,7 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
5960
http_endpoint,
6061
grpc_status_code,
6162
service_source,
63+
span_derived_primary_tags,
6264
}: &OwnedAggregationKey,
6365
) -> bool {
6466
self.resource_name == resource_name
@@ -79,6 +81,12 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
7981
&& self.http_endpoint == http_endpoint
8082
&& self.grpc_status_code == *grpc_status_code
8183
&& self.service_source == service_source
84+
&& self.span_derived_primary_tags.len() == span_derived_primary_tags.len()
85+
&& self
86+
.span_derived_primary_tags
87+
.iter()
88+
.zip(span_derived_primary_tags.iter())
89+
.all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2)
8290
}
8391
}
8492

@@ -104,6 +112,7 @@ pub(super) struct OwnedAggregationKey {
104112
http_endpoint: String,
105113
grpc_status_code: Option<u8>,
106114
service_source: String,
115+
span_derived_primary_tags: Vec<(String, String)>,
107116
}
108117

109118
impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey {
@@ -126,6 +135,11 @@ impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey {
126135
http_endpoint: value.http_endpoint.to_owned(),
127136
grpc_status_code: value.grpc_status_code,
128137
service_source: value.service_source.to_owned(),
138+
span_derived_primary_tags: value
139+
.span_derived_primary_tags
140+
.iter()
141+
.map(|(k, v)| (k.to_string(), v.to_string()))
142+
.collect(),
129143
}
130144
}
131145
}
@@ -208,9 +222,16 @@ fn grpc_status_str_to_int_value(v: &str) -> Option<u8> {
208222
impl<'a> BorrowedAggregationKey<'a> {
209223
/// Return an AggregationKey matching the given span.
210224
///
211-
/// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the
225+
/// If `peer_tag_keys` is not empty then the peer tags of the span will be included in the
212226
/// key.
213-
pub(super) fn from_span<T: StatSpan<'a>>(span: &'a T, peer_tag_keys: &'a [String]) -> Self {
227+
///
228+
/// If `span_derived_primary_tag_keys` is not empty then matching span tags keys are included
229+
/// in the key.
230+
pub(super) fn from_span<T: StatSpan<'a>>(
231+
span: &'a T,
232+
peer_tag_keys: &'a [String],
233+
span_derived_primary_tag_keys: &'a [String],
234+
) -> Self {
214235
let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default();
215236
let peer_tags = if should_track_peer_tags(span_kind) {
216237
// Parse the meta tags of the span and return a list of the peer tags based on the list
@@ -245,6 +266,11 @@ impl<'a> BorrowedAggregationKey<'a> {
245266

246267
let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default();
247268

269+
let span_derived_primary_tags: Vec<(&'a str, &'a str)> = span_derived_primary_tag_keys
270+
.iter()
271+
.filter_map(|key| Some(((key.as_str()), (span.get_meta(key.as_str())?))))
272+
.collect();
273+
248274
Self {
249275
resource_name: span.resource(),
250276
service_name: span.service(),
@@ -261,6 +287,7 @@ impl<'a> BorrowedAggregationKey<'a> {
261287
http_endpoint,
262288
grpc_status_code,
263289
service_source,
290+
span_derived_primary_tags,
264291
}
265292
}
266293
}
@@ -288,6 +315,14 @@ impl From<pb::ClientGroupedStats> for OwnedAggregationKey {
288315
http_endpoint: value.http_endpoint,
289316
grpc_status_code: value.grpc_status_code.parse().ok(),
290317
service_source: value.service_source,
318+
span_derived_primary_tags: value
319+
.span_derived_primary_tags
320+
.into_iter()
321+
.filter_map(|t| {
322+
let (key, value) = t.split_once(':')?;
323+
Some((key.to_string(), value.to_string()))
324+
})
325+
.collect(),
291326
}
292327
}
293328
}
@@ -418,7 +453,11 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl
418453
.map(|c| c.to_string())
419454
.unwrap_or_default(),
420455
service_source: key.service_source,
421-
span_derived_primary_tags: vec![], // Todo
456+
span_derived_primary_tags: key
457+
.span_derived_primary_tags
458+
.into_iter()
459+
.map(|(k, v)| format!("{k}:{v}"))
460+
.collect(),
422461
}
423462
}
424463

@@ -820,6 +859,51 @@ mod tests {
820859
),
821860
];
822861

862+
let test_primary_tag_keys = vec!["region".to_string(), "env".to_string()];
863+
let test_cases_primary_tags: Vec<(SpanSlice, OwnedAggregationKey)> = vec![
864+
(
865+
SpanSlice {
866+
service: "service",
867+
name: "op",
868+
resource: "res",
869+
span_id: 1,
870+
parent_id: 0,
871+
meta: HashMap::from([("region", "us1"), ("env", "prod")]),
872+
..Default::default()
873+
},
874+
OwnedAggregationKey {
875+
service_name: "service".into(),
876+
operation_name: "op".into(),
877+
resource_name: "res".into(),
878+
is_trace_root: true,
879+
span_derived_primary_tags: vec![
880+
("region".into(), "us1".into()),
881+
("env".into(), "prod".into()),
882+
],
883+
..Default::default()
884+
},
885+
),
886+
(
887+
SpanSlice {
888+
service: "service",
889+
name: "op",
890+
resource: "res",
891+
span_id: 1,
892+
parent_id: 0,
893+
meta: HashMap::from([("region", "us1")]),
894+
..Default::default()
895+
},
896+
OwnedAggregationKey {
897+
service_name: "service".into(),
898+
operation_name: "op".into(),
899+
resource_name: "res".into(),
900+
is_trace_root: true,
901+
span_derived_primary_tags: vec![("region".into(), "us1".into())],
902+
..Default::default()
903+
},
904+
),
905+
];
906+
823907
let test_peer_tags = vec![
824908
"aws.s3.bucket".to_string(),
825909
"db.instance".to_string(),
@@ -907,7 +991,7 @@ mod tests {
907991
];
908992

909993
for (span, expected_key) in test_cases {
910-
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]);
994+
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[], &[]);
911995
assert_eq!(
912996
OwnedAggregationKey::from(&borrowed_key),
913997
expected_key,
@@ -919,8 +1003,19 @@ mod tests {
9191003
);
9201004
}
9211005

1006+
for (span, expected_key) in test_cases_primary_tags {
1007+
let borrowed_key =
1008+
BorrowedAggregationKey::from_span(&span, &[], test_primary_tag_keys.as_slice());
1009+
assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key);
1010+
assert_eq!(
1011+
get_hash(&borrowed_key),
1012+
get_hash(&OwnedAggregationKey::from(&borrowed_key))
1013+
);
1014+
}
1015+
9221016
for (span, expected_key) in test_cases_with_peer_tags {
923-
let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice());
1017+
let borrowed_key =
1018+
BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice(), &[]);
9241019
assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key);
9251020
assert_eq!(
9261021
get_hash(&borrowed_key),

libdd-trace-stats/src/span_concentrator/mod.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,24 @@ pub struct SpanConcentrator {
6666
span_kinds_stats_computed: Vec<String>,
6767
/// keys for supplementary tags that describe peer.service entities
6868
peer_tag_keys: Vec<String>,
69+
/// keys for second primary tags on trace stats
70+
span_derived_primary_tag_keys: Vec<String>,
6971
}
7072

7173
impl SpanConcentrator {
7274
/// Return a new concentrator with the given parameters
7375
/// - `bucket_size` is the size of the time buckets
7476
/// - `now` the current system time, used to define the oldest bucket
7577
/// - `span_kinds_stats_computed` list of span kinds eligible for stats computation
76-
/// - `peer_tags_keys` list of keys considered as peer tags for aggregation
78+
/// - `peer_tag_keys` list of keys considered as peer tags for aggregation
79+
/// - `span_derived_primary_tag_keys` list of keys considered as second primary tags for
80+
/// aggregation
7781
pub fn new(
7882
bucket_size: Duration,
7983
now: SystemTime,
8084
span_kinds_stats_computed: Vec<String>,
8185
peer_tag_keys: Vec<String>,
86+
span_derived_primary_tag_keys: Vec<String>,
8287
) -> SpanConcentrator {
8388
SpanConcentrator {
8489
bucket_size: bucket_size.as_nanos() as u64,
@@ -90,6 +95,7 @@ impl SpanConcentrator {
9095
buffer_len: 2,
9196
span_kinds_stats_computed,
9297
peer_tag_keys,
98+
span_derived_primary_tag_keys,
9399
}
94100
}
95101

@@ -103,6 +109,11 @@ impl SpanConcentrator {
103109
self.peer_tag_keys = peer_tags;
104110
}
105111

112+
/// Set the list of keys considered as span_derived_primary_tag_keys for aggregation
113+
pub fn set_span_derived_primary_tags(&mut self, tag_keys: Vec<String>) {
114+
self.span_derived_primary_tag_keys = tag_keys;
115+
}
116+
106117
/// Return the bucket size used for aggregation
107118
pub fn get_bucket_size(&self) -> Duration {
108119
Duration::from_nanos(self.bucket_size)
@@ -124,7 +135,11 @@ impl SpanConcentrator {
124135
bucket_timestamp = self.oldest_timestamp;
125136
}
126137

127-
let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice());
138+
let agg_key = BorrowedAggregationKey::from_span(
139+
span,
140+
self.peer_tag_keys.as_slice(),
141+
self.span_derived_primary_tag_keys.as_slice(),
142+
);
128143

129144
self.buckets
130145
.entry(bucket_timestamp)

0 commit comments

Comments
 (0)