Skip to content

Commit c2191f8

Browse files
committed
add shutdown channel to stats flusher
1 parent ee8cefb commit c2191f8

File tree

5 files changed

+87
-53
lines changed

5 files changed

+87
-53
lines changed

crates/datadog-serverless-compat/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ pub async fn main() {
177177

178178
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
179179
stats_concentrator: stats_concentrator_handle.clone(),
180-
force_flush_concentrator: false,
181180
});
182181
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
183182

@@ -199,8 +198,9 @@ pub async fn main() {
199198
proxy_flusher,
200199
});
201200

201+
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
202202
tokio::spawn(async move {
203-
let res = mini_agent.start_mini_agent().await;
203+
let res = mini_agent.start_mini_agent(shutdown_rx).await;
204204
if let Err(e) = res {
205205
error!("Error when starting serverless trace mini agent: {e:?}");
206206
}

crates/datadog-trace-agent/src/mini_agent.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ pub struct MiniAgent {
5050
}
5151

5252
impl MiniAgent {
53-
pub async fn start_mini_agent(&self) -> Result<(), Box<dyn std::error::Error>> {
53+
pub async fn start_mini_agent(
54+
&self,
55+
shutdown_rx: tokio::sync::oneshot::Receiver<()>,
56+
) -> Result<(), Box<dyn std::error::Error>> {
5457
let now = Instant::now();
5558

5659
// verify we are in a serverless function environment. if not, shut down the mini agent.
@@ -93,7 +96,7 @@ impl MiniAgent {
9396
let stats_config = self.config.clone();
9497
let stats_flusher_handle = tokio::spawn(async move {
9598
stats_flusher
96-
.start_stats_flusher(stats_config, stats_rx)
99+
.start_stats_flusher(stats_config, stats_rx, shutdown_rx)
97100
.await;
98101
});
99102

crates/datadog-trace-agent/src/stats_flusher.rs

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
use async_trait::async_trait;
55
use std::{sync::Arc, time};
6-
use tokio::sync::{Mutex, mpsc::Receiver};
6+
use tokio::sync::mpsc::Receiver;
7+
use tokio::sync::oneshot;
78
use tracing::{debug, error};
89

910
use libdd_trace_protobuf::pb;
@@ -46,22 +47,28 @@ async fn send_stats_payload(config: &Arc<Config>, payload: pb::StatsPayload) {
4647
#[async_trait]
4748
pub trait StatsFlusher {
4849
/// Starts a stats flusher that listens for stats payloads sent to the tokio mpsc Receiver,
49-
/// implementing flushing logic that calls flush_stats.
50+
/// implementing flushing logic that calls flush_stats. Runs until the shutdown signal fires,
51+
/// at which point it performs a final force flush and returns.
5052
async fn start_stats_flusher(
5153
&self,
5254
config: Arc<Config>,
53-
mut rx: Receiver<pb::ClientStatsPayload>,
55+
rx: Receiver<pb::ClientStatsPayload>,
56+
shutdown_rx: oneshot::Receiver<()>,
5457
);
5558
/// Flushes stats to the Datadog trace stats intake.
56-
async fn flush_stats(&self, config: Arc<Config>, client_stats: Vec<pb::ClientStatsPayload>);
59+
/// `force_flush` controls whether in-progress concentrator buckets are flushed (true on
60+
/// shutdown, false on normal interval flushes).
61+
async fn flush_stats(
62+
&self,
63+
config: Arc<Config>,
64+
client_stats: Vec<pb::ClientStatsPayload>,
65+
force_flush: bool,
66+
);
5767
}
5868

5969
#[derive(Clone)]
6070
pub struct ServerlessStatsFlusher {
6171
pub stats_concentrator: Option<StatsConcentrator>,
62-
/// When false, flushes are done on completed buckets
63-
/// When true, flushes are done on any in progress buckets, useful for integration tests
64-
pub force_flush_concentrator: bool,
6572
}
6673

6774
#[async_trait]
@@ -70,44 +77,51 @@ impl StatsFlusher for ServerlessStatsFlusher {
7077
&self,
7178
config: Arc<Config>,
7279
mut rx: Receiver<pb::ClientStatsPayload>,
80+
mut shutdown_rx: oneshot::Receiver<()>,
7381
) {
74-
let buffer: Arc<Mutex<Vec<pb::ClientStatsPayload>>> = Arc::new(Mutex::new(Vec::new()));
75-
76-
let buffer_producer = buffer.clone();
77-
let buffer_consumer = buffer.clone();
78-
79-
// Drain the stats channel continuously into the buffer
80-
tokio::spawn(async move {
81-
while let Some(stats_payload) = rx.recv().await {
82-
let mut buffer = buffer_producer.lock().await;
83-
buffer.push(stats_payload);
84-
}
85-
});
82+
let mut interval =
83+
tokio::time::interval(time::Duration::from_secs(config.stats_flush_interval_secs));
84+
let mut buffer: Vec<pb::ClientStatsPayload> = Vec::new();
8685

87-
// Flush stats from the buffer on a fixed interval
8886
loop {
89-
tokio::time::sleep(time::Duration::from_secs(config.stats_flush_interval_secs)).await;
90-
91-
let mut buffer = buffer_consumer.lock().await;
92-
// Copy the batch for this flush
93-
let channel_stats = buffer.to_vec();
94-
// Reset the buffer so the next tick only sees new stats
95-
buffer.clear();
96-
// Release the mutex before flushing stats
97-
drop(buffer);
98-
99-
let should_flush = should_flush_stats_buffer(
100-
!channel_stats.is_empty(),
101-
self.stats_concentrator.is_some(),
102-
);
103-
if should_flush {
104-
self.flush_stats(config.clone(), channel_stats).await;
87+
tokio::select! {
88+
// Receive client stats and add them to the buffer
89+
Some(stats) = rx.recv() => {
90+
buffer.push(stats);
91+
}
92+
93+
// Drain client stats in buffer and stats from concentrator on interval
94+
_ = interval.tick() => {
95+
let client_stats = std::mem::take(&mut buffer);
96+
let should_flush = should_flush_stats_buffer(
97+
!client_stats.is_empty(),
98+
self.stats_concentrator.is_some(),
99+
);
100+
if should_flush {
101+
self.flush_stats(config.clone(), client_stats, false).await;
102+
}
103+
}
104+
105+
_ = &mut shutdown_rx => {
106+
// Drain any client stats that arrived before the shutdown signal
107+
while let Ok(stats) = rx.try_recv() {
108+
buffer.push(stats);
109+
}
110+
// Force flush all in progress concentrator buckets on shutdown signal
111+
self.flush_stats(config.clone(), std::mem::take(&mut buffer), true).await;
112+
return;
113+
}
105114
}
106115
}
107116
}
108117

109118
/// Flushes client computed stats from the tracer and serverless computed stats as separate payloads
110-
async fn flush_stats(&self, config: Arc<Config>, client_stats: Vec<pb::ClientStatsPayload>) {
119+
async fn flush_stats(
120+
&self,
121+
config: Arc<Config>,
122+
client_stats: Vec<pb::ClientStatsPayload>,
123+
force_flush: bool,
124+
) {
111125
// Flush client computed stats from the tracer
112126
if !client_stats.is_empty() {
113127
let payload = stats_utils::construct_stats_payload(client_stats);
@@ -116,7 +130,7 @@ impl StatsFlusher for ServerlessStatsFlusher {
116130

117131
// Flush serverless computed stats from the concentrator
118132
if let Some(ref concentrator) = self.stats_concentrator
119-
&& let Some(agent_stats) = concentrator.flush(self.force_flush_concentrator)
133+
&& let Some(agent_stats) = concentrator.flush(force_flush)
120134
{
121135
let mut payload = stats_utils::construct_stats_payload(vec![agent_stats]);
122136
payload.client_computed = false;

crates/datadog-trace-agent/tests/common/mocks.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use libdd_trace_protobuf::pb;
1212
use libdd_trace_utils::trace_utils::{self, MiniAgentMetadata, SendData};
1313
use std::sync::Arc;
1414
use tokio::sync::mpsc::{Receiver, Sender};
15+
use tokio::sync::oneshot;
1516

1617
/// Mock trace processor that returns 200 OK for all requests
1718
#[allow(dead_code)]
@@ -86,14 +87,20 @@ impl StatsFlusher for MockStatsFlusher {
8687
&self,
8788
_config: Arc<Config>,
8889
mut stats_rx: Receiver<pb::ClientStatsPayload>,
90+
_shutdown_rx: oneshot::Receiver<()>,
8991
) {
9092
// Consume messages from the channel without processing them
9193
while let Some(_stats) = stats_rx.recv().await {
9294
// Just discard the stats - we're not testing the flusher
9395
}
9496
}
9597

96-
async fn flush_stats(&self, _config: Arc<Config>, _traces: Vec<pb::ClientStatsPayload>) {
98+
async fn flush_stats(
99+
&self,
100+
_config: Arc<Config>,
101+
_traces: Vec<pb::ClientStatsPayload>,
102+
_force_flush: bool,
103+
) {
97104
// Do nothing
98105
}
99106
}

crates/datadog-trace-agent/tests/integration_test.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ pub fn create_mini_agent_with_real_flushers(config: Arc<Config>) -> MiniAgent {
6969
stats_processor: Arc::new(ServerlessStatsProcessor {}),
7070
stats_flusher: Arc::new(ServerlessStatsFlusher {
7171
stats_concentrator: Some(stats_concentrator_handle),
72-
force_flush_concentrator: true,
7372
}),
7473
env_verifier: Arc::new(MockEnvVerifier),
7574
proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())),
@@ -178,7 +177,8 @@ async fn test_mini_agent_tcp_handles_requests() {
178177

179178
// Start the mini agent
180179
let agent_handle = tokio::spawn(async move {
181-
let _ = mini_agent.start_mini_agent().await;
180+
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
181+
let _ = mini_agent.start_mini_agent(shutdown_rx).await;
182182
});
183183

184184
// Give server time to start
@@ -277,7 +277,8 @@ async fn test_mini_agent_named_pipe_handles_requests() {
277277

278278
// Start the mini agent
279279
let agent_handle = tokio::spawn(async move {
280-
let _ = mini_agent.start_mini_agent().await;
280+
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
281+
let _ = mini_agent.start_mini_agent(shutdown_rx).await;
281282
});
282283

283284
// Give server time to create pipe
@@ -348,8 +349,9 @@ async fn test_mini_agent_tcp_with_real_flushers() {
348349

349350
let mini_agent = create_mini_agent_with_real_flushers(config);
350351

352+
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
351353
let agent_handle = tokio::spawn(async move {
352-
let _ = mini_agent.start_mini_agent().await;
354+
let _ = mini_agent.start_mini_agent(shutdown_rx).await;
353355
});
354356

355357
// Wait for server to be ready
@@ -376,10 +378,13 @@ async fn test_mini_agent_tcp_with_real_flushers() {
376378
.expect("Failed to send /v0.4/traces request");
377379
assert_eq!(trace_response.status(), StatusCode::OK);
378380

379-
// Wait for flush
381+
// Wait for trace flush
380382
tokio::time::sleep(FLUSH_WAIT_DURATION).await;
381-
382383
verify_trace_request(&mock_server);
384+
385+
// Trigger shutdown to force flush in progress concentrator buckets
386+
let _ = shutdown_tx.send(());
387+
tokio::time::sleep(FLUSH_WAIT_DURATION).await;
383388
verify_stats_request(&mock_server); // Stats generator should generate stats from trace payload
384389

385390
// Clean up
@@ -401,7 +406,8 @@ async fn test_mini_agent_tcp_with_real_flushers_and_tracer_computed_stats() {
401406
let mini_agent = create_mini_agent_with_real_flushers(config);
402407

403408
let agent_handle = tokio::spawn(async move {
404-
let _ = mini_agent.start_mini_agent().await;
409+
let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
410+
let _ = mini_agent.start_mini_agent(shutdown_rx).await;
405411
});
406412

407413
// Wait for server to be ready
@@ -459,8 +465,9 @@ async fn test_mini_agent_named_pipe_with_real_flushers() {
459465

460466
let mini_agent = create_mini_agent_with_real_flushers(config);
461467

468+
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
462469
let agent_handle = tokio::spawn(async move {
463-
let _ = mini_agent.start_mini_agent().await;
470+
let _ = mini_agent.start_mini_agent(shutdown_rx).await;
464471
});
465472

466473
// Wait for server to be ready
@@ -487,10 +494,13 @@ async fn test_mini_agent_named_pipe_with_real_flushers() {
487494
.expect("Failed to send /v0.4/traces request over named pipe");
488495
assert_eq!(trace_response.status(), StatusCode::OK);
489496

490-
// Wait for flush
497+
// Wait for trace flush
491498
tokio::time::sleep(FLUSH_WAIT_DURATION).await;
492-
493499
verify_trace_request(&mock_server);
500+
501+
// Trigger shutdown to force flush in progress concentrator buckets
502+
let _ = shutdown_tx.send(());
503+
tokio::time::sleep(FLUSH_WAIT_DURATION).await;
494504
verify_stats_request(&mock_server);
495505

496506
// Clean up

0 commit comments

Comments
 (0)