Skip to content

Commit 674ae04

Browse files
committed
add integration tests
1 parent e107a3a commit 674ae04

5 files changed

Lines changed: 268 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/datadog-trace-agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ reqwest = { version = "0.12.23", features = [
4343
bytes = "1.10.1"
4444

4545
[dev-dependencies]
46+
flate2 = "1"
4647
rmp-serde = "1.1.1"
4748
serial_test = "2.0.0"
4849
duplicate = "2.0.1"

crates/datadog-trace-agent/src/trace_processor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ mod tests {
224224

225225
use crate::{
226226
config::{Config, Tags},
227+
peer_tags::peer_tag_keys,
227228
trace_processor::{self, TRACER_PAYLOAD_FUNCTION_TAGS_TAG_KEY, TraceProcessor},
228229
};
229230
use libdd_common::{Endpoint, http_common};
@@ -276,7 +277,7 @@ mod tests {
276277
},
277278
tags: Tags::from_env_string("env:test,service:my-service"),
278279
env: "test-env".to_string(),
279-
peer_tags: vec![],
280+
peer_tags: peer_tag_keys().unwrap(),
280281
agent_stats_computation_enabled: false,
281282
}
282283
}

crates/datadog-trace-agent/tests/common/helpers.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33

44
//! Helper functions for integration tests
55
6+
use flate2::read::GzDecoder;
67
use hyper::{Request, Response};
78
use hyper_util::rt::TokioIo;
89
use libdd_common::http_common;
10+
use libdd_trace_protobuf::pb;
911
use libdd_trace_utils::test_utils::create_test_json_span;
12+
use serde_json::json;
13+
use std::io::Read;
1014
use std::time::{Duration, UNIX_EPOCH};
1115
use tokio::time::timeout;
1216

@@ -17,6 +21,44 @@ pub fn create_test_trace_payload() -> Vec<u8> {
1721
rmp_serde::to_vec(&vec![vec![json_span]]).expect("Failed to serialize test trace")
1822
}
1923

24+
/// Create a trace payload with a root span and two non-top-level, non-measured child spans:
25+
/// - a `"server"` child, which is eligible for stats only via `span_kinds_stats_computed`
26+
/// - an `"internal"` child, which is never eligible regardless of configuration
27+
pub fn create_trace_with_span_kind_children_payload() -> Vec<u8> {
28+
let start = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64;
29+
let root = create_test_json_span(300, 301, 0, start, false);
30+
let mut server_child = create_test_json_span(300, 302, 301, start, false);
31+
server_child["name"] = json!("server_op");
32+
server_child["meta"]["span.kind"] = json!("server");
33+
let mut internal_child = create_test_json_span(300, 303, 301, start, false);
34+
internal_child["name"] = json!("internal_op");
35+
internal_child["meta"]["span.kind"] = json!("internal");
36+
let payload = rmp_serde::to_vec(&vec![vec![root, server_child, internal_child]])
37+
.expect("Failed to serialize span kind children trace");
38+
payload
39+
}
40+
41+
/// Create a trace payload with a single client span that carries a `peer.service` peer tag.
42+
/// The span is a trace root (parent_id == 0) so it is both top-level and eligible due to span.kind.
43+
pub fn create_client_span_with_peer_tag_payload() -> Vec<u8> {
44+
let start = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64;
45+
let mut span = create_test_json_span(400, 401, 0, start, false);
46+
span["meta"]["span.kind"] = json!("client");
47+
span["meta"]["peer.service"] = json!("my-db");
48+
rmp_serde::to_vec(&vec![vec![span]]).expect("Failed to serialize client peer tag trace")
49+
}
50+
51+
/// Decompress a gzip+msgpack stats payload and deserialize it into a `StatsPayload`.
52+
/// The stats flusher encodes payloads as `gzip(rmp_serde::to_vec_named(StatsPayload))`.
53+
pub fn decode_stats_payload(body: &[u8]) -> pb::StatsPayload {
54+
let mut decoder = GzDecoder::new(body);
55+
let mut decompressed = Vec::new();
56+
decoder
57+
.read_to_end(&mut decompressed)
58+
.expect("Failed to decompress stats payload");
59+
rmp_serde::from_slice(&decompressed).expect("Failed to deserialize stats payload")
60+
}
61+
2062
/// Send an HTTP request over TCP and return the response
2163
pub async fn send_tcp_request(
2264
port: u16,

crates/datadog-trace-agent/tests/integration_test.rs

Lines changed: 222 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33

44
mod common;
55

6-
use common::helpers::{create_test_trace_payload, send_tcp_request};
6+
use common::helpers::{
7+
create_client_span_with_peer_tag_payload, create_test_trace_payload,
8+
create_trace_with_span_kind_children_payload, decode_stats_payload, send_tcp_request,
9+
};
710
use common::mock_server::MockServer;
811
use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher};
912
use datadog_trace_agent::{
1013
config::{Config, test_helpers::create_tcp_test_config},
1114
mini_agent::MiniAgent,
15+
peer_tags::peer_tag_keys,
1216
proxy_flusher::ProxyFlusher,
17+
stats_concentrator_service::SPAN_KINDS_STATS_COMPUTED,
1318
trace_flusher::TraceFlusher,
1419
trace_processor::ServerlessTraceProcessor,
1520
};
@@ -224,6 +229,21 @@ async fn test_mini_agent_tcp_handles_requests() {
224229
"Expected client_drop_p0s to be true"
225230
);
226231

232+
// Check span_kinds_stats_computed
233+
assert_eq!(
234+
json["span_kinds_stats_computed"],
235+
serde_json::json!(SPAN_KINDS_STATS_COMPUTED),
236+
"Expected span_kinds_stats_computed to match SPAN_KINDS_STATS_COMPUTED"
237+
);
238+
239+
// Check peer_tags
240+
let expected_peer_tags = peer_tag_keys().unwrap();
241+
assert_eq!(
242+
json["peer_tags"],
243+
serde_json::json!(expected_peer_tags),
244+
"Expected peer_tags to match peer_tag_keys()"
245+
);
246+
227247
// Check config object
228248
let config = &json["config"];
229249
assert_eq!(
@@ -306,6 +326,39 @@ async fn test_mini_agent_named_pipe_handles_requests() {
306326
let json: Value =
307327
serde_json::from_slice(&body).expect("Failed to parse /info response as JSON");
308328

329+
// Check endpoints array
330+
assert_eq!(
331+
json["endpoints"],
332+
serde_json::json!([
333+
"/v0.4/traces",
334+
"/v0.6/stats",
335+
"/info",
336+
"/profiling/v1/input"
337+
]),
338+
"Expected endpoints array"
339+
);
340+
341+
// Check client_drop_p0s flag
342+
assert_eq!(
343+
json["client_drop_p0s"], true,
344+
"Expected client_drop_p0s to be true"
345+
);
346+
347+
// Check span_kinds_stats_computed
348+
assert_eq!(
349+
json["span_kinds_stats_computed"],
350+
serde_json::json!(SPAN_KINDS_STATS_COMPUTED),
351+
"Expected span_kinds_stats_computed to match SPAN_KINDS_STATS_COMPUTED"
352+
);
353+
354+
// Check peer_tags
355+
let expected_peer_tags = peer_tag_keys().unwrap();
356+
assert_eq!(
357+
json["peer_tags"],
358+
serde_json::json!(expected_peer_tags),
359+
"Expected peer_tags to match peer_tag_keys()"
360+
);
361+
309362
// Check config object specific to named pipe
310363
let config_value = &json["config"];
311364
assert_eq!(
@@ -506,6 +559,174 @@ async fn test_mini_agent_tcp_with_real_flushers_and_tracer_computed_stats() {
506559
agent_handle.abort();
507560
}
508561

562+
/// Verify that `span_kinds_stats_computed` controls which non-top-level, non-measured child spans
563+
/// produce stats.
564+
#[cfg(test)]
565+
#[tokio::test]
566+
#[serial]
567+
async fn test_internal_span_kind_does_not_produce_stats() {
568+
let mock_server = MockServer::start().await;
569+
tokio::time::sleep(Duration::from_millis(50)).await;
570+
571+
let mut config = create_tcp_test_config(8130);
572+
configure_mock_endpoints(&mut config, &mock_server.url());
573+
config.agent_stats_computation_enabled = true;
574+
let config = Arc::new(config);
575+
let test_port = config.dd_apm_receiver_port;
576+
577+
let (mini_agent, stats_concentrator_service_handle) =
578+
create_mini_agent_with_real_flushers(config);
579+
580+
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
581+
let agent_handle = tokio::spawn(async move {
582+
let _ = mini_agent
583+
.start_mini_agent(shutdown_rx, Some(stats_concentrator_service_handle))
584+
.await;
585+
});
586+
587+
let mut server_ready = false;
588+
for _ in 0..20 {
589+
tokio::time::sleep(Duration::from_millis(50)).await;
590+
if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None, &[]).await
591+
&& response.status().is_success()
592+
{
593+
server_ready = true;
594+
break;
595+
}
596+
}
597+
assert!(
598+
server_ready,
599+
"Mini agent server failed to start within timeout"
600+
);
601+
602+
let trace_response = send_tcp_request(
603+
test_port,
604+
"/v0.4/traces",
605+
"POST",
606+
Some(create_trace_with_span_kind_children_payload()),
607+
&[],
608+
)
609+
.await
610+
.expect("Failed to send /v0.4/traces request");
611+
assert_eq!(trace_response.status(), StatusCode::OK);
612+
613+
tokio::time::sleep(FLUSH_WAIT_DURATION).await;
614+
615+
let _ = shutdown_tx.send(());
616+
let _ = agent_handle.await;
617+
618+
let stats_reqs = mock_server.get_requests_for_path("/api/v0.2/stats");
619+
assert!(
620+
!stats_reqs.is_empty(),
621+
"Expected a stats request from the root span"
622+
);
623+
624+
let all_groups: Vec<_> = stats_reqs
625+
.iter()
626+
.map(|req| decode_stats_payload(&req.body))
627+
.flat_map(|payload| {
628+
payload
629+
.stats
630+
.into_iter()
631+
.flat_map(|csp| csp.stats.into_iter())
632+
.flat_map(|bucket| bucket.stats.into_iter())
633+
.collect::<Vec<_>>()
634+
})
635+
.collect();
636+
637+
// server_op is only eligible via span_kinds_stats_computed — fails if span kinds are not
638+
// passed to the concentrator
639+
assert!(
640+
all_groups.iter().any(|g| g.name == "server_op"),
641+
"Expected server_op to appear in stats (eligible via span_kinds_stats_computed), got: {:?}",
642+
all_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
643+
);
644+
645+
// internal_op is never eligible regardless of configuration
646+
assert!(
647+
!all_groups.iter().any(|g| g.name == "internal_op"),
648+
"internal_op should not appear in stats, but got: {:?}",
649+
all_groups.iter().map(|g| &g.name).collect::<Vec<_>>()
650+
);
651+
}
652+
653+
/// Verify that peer tags are present in the flushed stats payload when a client span carries a
654+
/// peer tag (`peer.service`).
655+
#[cfg(test)]
656+
#[tokio::test]
657+
#[serial]
658+
async fn test_peer_tags_in_flushed_stats() {
659+
let mock_server = MockServer::start().await;
660+
tokio::time::sleep(Duration::from_millis(50)).await;
661+
662+
let mut config = create_tcp_test_config(8131);
663+
configure_mock_endpoints(&mut config, &mock_server.url());
664+
config.agent_stats_computation_enabled = true;
665+
let config = Arc::new(config);
666+
let test_port = config.dd_apm_receiver_port;
667+
668+
let (mini_agent, stats_concentrator_service_handle) =
669+
create_mini_agent_with_real_flushers(config);
670+
671+
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
672+
let agent_handle = tokio::spawn(async move {
673+
let _ = mini_agent
674+
.start_mini_agent(shutdown_rx, Some(stats_concentrator_service_handle))
675+
.await;
676+
});
677+
678+
let mut server_ready = false;
679+
for _ in 0..20 {
680+
tokio::time::sleep(Duration::from_millis(50)).await;
681+
if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None, &[]).await
682+
&& response.status().is_success()
683+
{
684+
server_ready = true;
685+
break;
686+
}
687+
}
688+
assert!(
689+
server_ready,
690+
"Mini agent server failed to start within timeout"
691+
);
692+
693+
let trace_response = send_tcp_request(
694+
test_port,
695+
"/v0.4/traces",
696+
"POST",
697+
Some(create_client_span_with_peer_tag_payload()),
698+
&[],
699+
)
700+
.await
701+
.expect("Failed to send /v0.4/traces request");
702+
assert_eq!(trace_response.status(), StatusCode::OK);
703+
704+
tokio::time::sleep(FLUSH_WAIT_DURATION).await;
705+
706+
let _ = shutdown_tx.send(());
707+
let _ = agent_handle.await;
708+
709+
let stats_reqs = mock_server.get_requests_for_path("/api/v0.2/stats");
710+
assert!(
711+
!stats_reqs.is_empty(),
712+
"Expected at least one stats request"
713+
);
714+
715+
let payload = decode_stats_payload(&stats_reqs[0].body);
716+
let all_peer_tags: Vec<&str> = payload
717+
.stats
718+
.iter()
719+
.flat_map(|csp| &csp.stats)
720+
.flat_map(|bucket| &bucket.stats)
721+
.flat_map(|group| group.peer_tags.iter().map(String::as_str))
722+
.collect();
723+
724+
assert!(
725+
all_peer_tags.contains(&"peer.service:my-db"),
726+
"Expected peer.service:my-db in stats peer_tags, got: {all_peer_tags:?}"
727+
);
728+
}
729+
509730
#[cfg(all(test, windows, feature = "windows-pipes"))]
510731
#[tokio::test]
511732
#[serial]

0 commit comments

Comments
 (0)