Commit f766c46
fix(traces): use channel-based aggregator (#909)
Use a channel-based aggregator for traces to avoid locking on aggregation. Adds an aggregator service that receives commands on a channel; commands can insert payloads, get batches, and shut the service down. Follows a similar pattern to #879. Tested with self-monitoring and verified that dual shipping still works.
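The core idea: a cloneable handle owns the sending half of an unbounded channel, and a single background task owns the aggregator state and processes commands one at a time, so producers and the flusher never contend on a mutex; request/response commands carry a oneshot sender so callers can await the reply. A minimal, self-contained sketch of that handle/service split follows (the names and the String item type are illustrative only, not bottlecap's actual API; see the new trace_aggregator_service.rs in the diff below for the real implementation):

// Illustrative sketch only (assumes the tokio crate); not bottlecap's actual types.
use tokio::sync::{mpsc, oneshot};

enum Command {
    Insert(String),                           // buffer one item
    GetBatches(oneshot::Sender<Vec<String>>), // reply with everything buffered so far
    Shutdown,
}

#[derive(Clone)]
struct Handle {
    tx: mpsc::UnboundedSender<Command>,
}

struct Service {
    buffer: Vec<String>,
    rx: mpsc::UnboundedReceiver<Command>,
}

impl Service {
    fn new() -> (Self, Handle) {
        let (tx, rx) = mpsc::unbounded_channel();
        (Self { buffer: Vec::new(), rx }, Handle { tx })
    }

    // The service task is the sole owner of `buffer`, so no Mutex is needed:
    // all mutation happens sequentially inside this loop.
    async fn run(mut self) {
        while let Some(cmd) = self.rx.recv().await {
            match cmd {
                Command::Insert(item) => self.buffer.push(item),
                Command::GetBatches(reply) => {
                    let _ = reply.send(std::mem::take(&mut self.buffer));
                }
                Command::Shutdown => break,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (service, handle) = Service::new();
    let task = tokio::spawn(service.run());

    handle.tx.send(Command::Insert("payload".into())).unwrap();

    // Commands are processed in order, so the reply sees the inserted item.
    let (reply_tx, reply_rx) = oneshot::channel();
    handle.tx.send(Command::GetBatches(reply_tx)).unwrap();
    println!("{:?}", reply_rx.await.unwrap()); // ["payload"]

    handle.tx.send(Command::Shutdown).unwrap();
    task.await.unwrap();
}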
1 parent 7aeb121 commit f766c46

5 files changed

Lines changed: 220 additions & 42 deletions

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 40 additions & 14 deletions
@@ -45,7 +45,10 @@ use bottlecap::{
     },
     logger,
     logs::{
-        agent::LogsAgent, aggregator_service::AggregatorService as LogsAggregatorService,
+        agent::LogsAgent,
+        aggregator_service::{
+            AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
+        },
         flusher::LogsFlusher,
     },
     otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
@@ -64,7 +67,10 @@ use bottlecap::{
         stats_flusher::{self, StatsFlusher},
         stats_generator::StatsGenerator,
         stats_processor, trace_agent,
-        trace_aggregator::{self, SendDataBuilderInfo},
+        trace_aggregator::SendDataBuilderInfo,
+        trace_aggregator_service::{
+            AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
+        },
         trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher},
         trace_processor::{self, SendingTraceProcessor},
     },
@@ -402,12 +408,13 @@ async fn extension_loop_active(
         .to_string();
     let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);

-    let (logs_agent_channel, logs_flusher, logs_agent_cancel_token) = start_logs_agent(
-        config,
-        Arc::clone(&api_key_factory),
-        &tags_provider,
-        event_bus_tx.clone(),
-    );
+    let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
+        start_logs_agent(
+            config,
+            Arc::clone(&api_key_factory),
+            &tags_provider,
+            event_bus_tx.clone(),
+        );

     let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
         start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await;
@@ -446,6 +453,7 @@ async fn extension_loop_active(
         proxy_flusher,
         trace_agent_shutdown_token,
         stats_concentrator,
+        trace_aggregator_handle,
     ) = start_trace_agent(
         config,
         &api_key_factory,
@@ -745,6 +753,15 @@ async fn extension_loop_active(
                     true, // force_flush_trace_stats
                 )
                 .await;
+
+            // Shutdown aggregator services
+            if let Err(e) = logs_aggregator_handle.shutdown() {
+                error!("Failed to shutdown logs aggregator: {e}");
+            }
+            if let Err(e) = trace_aggregator_handle.shutdown() {
+                error!("Failed to shutdown trace aggregator: {e}");
+            }
+
             return Ok(());
         }
     }
@@ -959,7 +976,12 @@ fn start_logs_agent(
     api_key_factory: Arc<ApiKeyFactory>,
     tags_provider: &Arc<TagProvider>,
     event_bus: Sender<Event>,
-) -> (Sender<TelemetryEvent>, LogsFlusher, CancellationToken) {
+) -> (
+    Sender<TelemetryEvent>,
+    LogsFlusher,
+    CancellationToken,
+    LogsAggregatorHandle,
+) {
     let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
     // Start service in background
     tokio::spawn(async move {
@@ -981,8 +1003,8 @@
         drop(agent);
     });

-    let flusher = LogsFlusher::new(api_key_factory, aggregator_handle, config.clone());
-    (tx, flusher, cancel_token)
+    let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone());
+    (tx, flusher, cancel_token, aggregator_handle)
 }

 #[allow(clippy::type_complexity)]
@@ -1000,6 +1022,7 @@ fn start_trace_agent(
     Arc<ProxyFlusher>,
     tokio_util::sync::CancellationToken,
     StatsConcentratorHandle,
+    TraceAggregatorHandle,
 ) {
     // Stats
     let (stats_concentrator_service, stats_concentrator_handle) =
@@ -1017,9 +1040,11 @@
     let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

     // Traces
-    let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
+    let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
+    tokio::spawn(trace_aggregator_service.run());
+
     let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
-        trace_aggregator.clone(),
+        trace_aggregator_handle.clone(),
         config.clone(),
         api_key_factory.clone(),
     ));
@@ -1048,7 +1073,7 @@

     let trace_agent = trace_agent::TraceAgent::new(
         Arc::clone(config),
-        trace_aggregator,
+        trace_aggregator_handle.clone(),
         trace_processor.clone(),
         stats_aggregator,
         stats_processor,
@@ -1075,6 +1100,7 @@
         proxy_flusher,
         shutdown_token,
         stats_concentrator_handle,
+        trace_aggregator_handle,
     )
 }

bottlecap/src/traces/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ pub mod stats_generator;
 pub mod stats_processor;
 pub mod trace_agent;
 pub mod trace_aggregator;
+pub mod trace_aggregator_service;
 pub mod trace_flusher;
 pub mod trace_processor;

bottlecap/src/traces/trace_agent.rs

Lines changed: 6 additions & 4 deletions
@@ -36,7 +36,8 @@ use crate::{
         stats_aggregator,
         stats_generator::StatsGenerator,
         stats_processor,
-        trace_aggregator::{self, SendDataBuilderInfo},
+        trace_aggregator::SendDataBuilderInfo,
+        trace_aggregator_service::AggregatorHandle,
         trace_processor,
     },
 };
@@ -123,7 +124,7 @@ impl TraceAgent {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         config: Arc<config::Config>,
-        trace_aggregator: Arc<Mutex<trace_aggregator::TraceAggregator>>,
+        aggregator_handle: AggregatorHandle,
         trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
         stats_aggregator: Arc<Mutex<stats_aggregator::StatsAggregator>>,
         stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
@@ -142,8 +143,9 @@ impl TraceAgent {
         // Start the trace aggregator, which receives and buffers trace payloads to be consumed by the trace flusher.
         tokio::spawn(async move {
             while let Some(tracer_payload_info) = trace_rx.recv().await {
-                let mut aggregator = trace_aggregator.lock().await;
-                aggregator.add(tracer_payload_info);
+                if let Err(e) = aggregator_handle.insert_payload(tracer_payload_info) {
+                    error!("TRACE_AGENT | Failed to insert payload into aggregator: {e}");
+                }
             }
         });

bottlecap/src/traces/trace_aggregator_service.rs

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+use datadog_trace_utils::send_data::SendDataBuilder;
+use tokio::sync::{mpsc, oneshot};
+use tracing::{debug, error};
+
+use crate::traces::trace_aggregator::{
+    MAX_CONTENT_SIZE_BYTES, SendDataBuilderInfo, TraceAggregator,
+};
+
+pub enum AggregatorCommand {
+    InsertPayload(SendDataBuilderInfo),
+    GetBatches(oneshot::Sender<Vec<Vec<SendDataBuilder>>>),
+    Clear,
+    Shutdown,
+}
+
+#[derive(Clone, Debug)]
+pub struct AggregatorHandle {
+    tx: mpsc::UnboundedSender<AggregatorCommand>,
+}
+
+impl AggregatorHandle {
+    pub fn insert_payload(
+        &self,
+        payload_info: SendDataBuilderInfo,
+    ) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
+        self.tx.send(AggregatorCommand::InsertPayload(payload_info))
+    }
+
+    pub async fn get_batches(&self) -> Result<Vec<Vec<SendDataBuilder>>, String> {
+        let (response_tx, response_rx) = oneshot::channel();
+        self.tx
+            .send(AggregatorCommand::GetBatches(response_tx))
+            .map_err(|e| format!("Failed to send flush command: {e}"))?;
+
+        response_rx
+            .await
+            .map_err(|e| format!("Failed to receive flush response: {e}"))
+    }
+
+    pub fn clear(&self) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
+        self.tx.send(AggregatorCommand::Clear)
+    }
+
+    pub fn shutdown(&self) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
+        self.tx.send(AggregatorCommand::Shutdown)
+    }
+}
+
+pub struct AggregatorService {
+    aggregator: TraceAggregator,
+    rx: mpsc::UnboundedReceiver<AggregatorCommand>,
+}
+
+impl AggregatorService {
+    #[must_use]
+    #[allow(clippy::should_implement_trait)]
+    pub fn default() -> (Self, AggregatorHandle) {
+        Self::new(MAX_CONTENT_SIZE_BYTES)
+    }
+
+    #[must_use]
+    pub fn new(max_content_size_bytes: usize) -> (Self, AggregatorHandle) {
+        let (tx, rx) = mpsc::unbounded_channel();
+        let aggregator = TraceAggregator::new(max_content_size_bytes);
+
+        let service = Self { aggregator, rx };
+        let handle = AggregatorHandle { tx };
+
+        (service, handle)
+    }
+
+    pub async fn run(mut self) {
+        debug!("Trace aggregator service started");
+
+        while let Some(command) = self.rx.recv().await {
+            match command {
+                AggregatorCommand::InsertPayload(payload_info) => {
+                    self.aggregator.add(payload_info);
+                }
+                AggregatorCommand::GetBatches(response_tx) => {
+                    let mut batches = Vec::new();
+                    let mut current_batch = self.aggregator.get_batch();
+                    while !current_batch.is_empty() {
+                        batches.push(current_batch);
+                        current_batch = self.aggregator.get_batch();
+                    }
+                    if response_tx.send(batches).is_err() {
+                        error!("Failed to send trace flush response - receiver dropped");
+                    }
+                }
+                AggregatorCommand::Clear => {
+                    self.aggregator.clear();
+                }
+                AggregatorCommand::Shutdown => {
+                    debug!("Trace aggregator service shutting down");
+                    break;
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+#[allow(clippy::unwrap_used)]
+mod tests {
+    use super::*;
+    use datadog_trace_utils::{
+        trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection,
+    };
+    use ddcommon::Endpoint;
+
+    #[tokio::test]
+    async fn test_aggregator_service_insert_and_flush() {
+        let (service, handle) = AggregatorService::default();
+
+        let service_handle = tokio::spawn(async move {
+            service.run().await;
+        });
+
+        let tracer_header_tags = TracerHeaderTags {
+            lang: "lang",
+            lang_version: "lang_version",
+            lang_interpreter: "lang_interpreter",
+            lang_vendor: "lang_vendor",
+            tracer_version: "tracer_version",
+            container_id: "container_id",
+            client_computed_top_level: true,
+            client_computed_stats: true,
+            dropped_p0_traces: 0,
+            dropped_p0_spans: 0,
+        };
+        let size = 1;
+        let payload = SendDataBuilder::new(
+            size,
+            TracerPayloadCollection::V07(Vec::new()),
+            tracer_header_tags,
+            &Endpoint::from_slice("localhost"),
+        );
+
+        handle
+            .insert_payload(SendDataBuilderInfo::new(payload, size))
+            .unwrap();
+
+        let batches = handle.get_batches().await.unwrap();
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].len(), 1);
+
+        handle
+            .shutdown()
+            .expect("Failed to shutdown aggregator service");
+        service_handle
+            .await
+            .expect("Aggregator service task failed");
+    }
+}
