diff --git a/libdd-data-pipeline/src/stats_exporter.rs b/libdd-data-pipeline/src/stats_exporter.rs index b1b235c995..db7cd0cadc 100644 --- a/libdd-data-pipeline/src/stats_exporter.rs +++ b/libdd-data-pipeline/src/stats_exporter.rs @@ -159,22 +159,26 @@ fn encode_stats_payload( ) -> pb::ClientStatsPayload { pb::ClientStatsPayload { hostname: meta.hostname.clone(), - env: meta.env.clone(), - lang: meta.language.clone(), + env: if meta.env.is_empty() { + "unknown-env".to_string() + } else { + meta.env.clone() + }, version: meta.app_version.clone(), runtime_id: meta.runtime_id.clone(), - tracer_version: meta.tracer_version.clone(), sequence, + service: meta.service.clone(), stats: buckets, git_commit_sha: meta.git_commit_sha.clone(), process_tags: meta.process_tags.clone(), - // These fields are unused or will be set by the Agent - service: String::new(), + // These fields will be set by the Agent container_id: String::new(), tags: Vec::new(), agent_aggregation: String::new(), image_tag: String::new(), process_tags_hash: 0, + lang: String::new(), + tracer_version: String::new(), } } @@ -387,4 +391,28 @@ mod tests { "Expected max retry attempts" ); } + + #[test] + fn test_encode_stats_payload_defaults_empty_env() { + // Test that empty env defaults to "unknown-env" + let mut meta_with_empty_env = get_test_metadata(); + meta_with_empty_env.env = "".to_string(); + + let buckets = vec![]; + let payload = encode_stats_payload(&meta_with_empty_env, 1, buckets.clone()); + + assert_eq!( + payload.env, "unknown-env", + "Empty env should default to 'unknown-env'" + ); + + // Test that non-empty env is preserved + let meta_with_env = get_test_metadata(); + let payload_with_env = encode_stats_payload(&meta_with_env, 2, buckets); + + assert_eq!( + payload_with_env.env, "test", + "Non-empty env should be preserved" + ); + } } diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 443e9eeb6a..693473e0b8 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -206,13 +206,16 @@ impl<'a> BorrowedAggregationKey<'a> { /// key. pub(super) fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default(); - let peer_tags = if client_or_producer(span_kind) { + let peer_tags = if should_track_peer_tags(span_kind) { // Parse the meta tags of the span and return a list of the peer tags based on the list // of `peer_tag_keys` peer_tag_keys .iter() .filter_map(|key| Some(((key.as_str()), (span.get_meta(key.as_str())?)))) .collect() + } else if let Some(base_service) = span.get_meta("_dd.base_service") { + // Internal spans with a base service override use only _dd.base_service as peer tag + vec![("_dd.base_service", base_service)] } else { vec![] }; @@ -279,14 +282,14 @@ impl From for OwnedAggregationKey { } } -/// Return true if the span kind is "client" or "producer" -fn client_or_producer(span_kind: T) -> bool +/// Return true if we care about peer tags on the span +fn should_track_peer_tags(span_kind: T) -> bool where T: SpanText, { matches!( span_kind.borrow().to_lowercase().as_str(), - "client" | "producer" + "client" | "producer" | "consumer" ) } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index bd5883d32c..0a45bb3151 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -877,6 +877,173 @@ fn test_peer_tags_aggregation() { ); } +/// Test that internal spans with _dd.base_service use it as their sole peer tag +#[test] +fn test_base_service_peer_tag() { + let now = SystemTime::now(); + let mut spans = vec![ + // Regular internal span without base_service (no peer tags) + get_test_span_with_meta( + now, + 1, + 0, + 100, + 5, + "A1", + "internal.operation", + 0, + &[], + &[("_dd.measured", 1.0)], + ), + // Internal span with _dd.base_service (should have base_service as peer tag) + get_test_span_with_meta( + now, + 2, + 0, + 75, + 5, + "A1", + "internal.with.base.service", + 0, + &[("_dd.base_service", "original-service")], + &[("_dd.measured", 1.0)], + ), + // Another internal span with same _dd.base_service (should aggregate together) + get_test_span_with_meta( + now, + 3, + 0, + 50, + 5, + "A1", + "internal.with.base.service", + 0, + &[("_dd.base_service", "original-service")], + &[("_dd.measured", 1.0)], + ), + // Internal span with different _dd.base_service (should be separate group) + get_test_span_with_meta( + now, + 4, + 0, + 60, + 5, + "A1", + "internal.with.base.service", + 0, + &[("_dd.base_service", "other-service")], + &[("_dd.measured", 1.0)], + ), + // Client span with _dd.base_service and other peer tags enabled + // (should use configured peer tags, not base_service) + get_test_span_with_meta( + now, + 5, + 0, + 80, + 5, + "A1", + "SELECT * FROM users", + 0, + &[ + ("span.kind", "client"), + ("_dd.base_service", "ignored-for-client"), + ("db.instance", "i-1234"), + ("db.system", "postgres"), + ], + &[("_dd.measured", 1.0)], + ), + ]; + compute_top_level_span(spans.as_mut_slice()); + + let mut concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec!["db.instance".to_string(), "db.system".to_string()], + ); + + for span in &spans { + concentrator.add_span(span); + } + + let flushtime = + now + Duration::from_nanos(concentrator.bucket_size * concentrator.buffer_len as u64); + + let expected = vec![ + // Internal span without base_service - no peer tags + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "internal.operation".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + duration: 100, + hits: 1, + top_level_hits: 1, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }, + // Internal spans with _dd.base_service="original-service" - aggregated with base_service + // peer tag + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "internal.with.base.service".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + peer_tags: vec!["_dd.base_service:original-service".to_string()], + duration: 125, + hits: 2, + top_level_hits: 2, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }, + // Internal span with _dd.base_service="other-service" - separate group + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "internal.with.base.service".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + peer_tags: vec!["_dd.base_service:other-service".to_string()], + duration: 60, + hits: 1, + top_level_hits: 1, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }, + // Client span - uses configured peer tags, not base_service + pb::ClientGroupedStats { + service: "A1".to_string(), + resource: "SELECT * FROM users".to_string(), + r#type: "db".to_string(), + name: "query".to_string(), + span_kind: "client".to_string(), + peer_tags: vec![ + "db.instance:i-1234".to_string(), + "db.system:postgres".to_string(), + ], + duration: 80, + hits: 1, + top_level_hits: 1, + errors: 0, + is_trace_root: pb::Trilean::True.into(), + ..Default::default() + }, + ]; + + let stats = concentrator.flush(flushtime, false); + assert_counts_equal( + expected, + stats + .first() + .expect("There should be at least one time bucket") + .stats + .clone(), + ); +} + #[test] fn test_compute_stats_for_span_kind() { let test_cases: Vec<(SpanSlice, bool)> = vec![