Skip to content

Commit 10c9441

Browse files
committed
use bounded channel for stats concentrator
1 parent ac8d049 commit 10c9441

File tree

3 files changed

+17
-10
lines changed

3 files changed

+17
-10
lines changed

crates/datadog-trace-agent/src/stats_concentrator_service.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ use tracing::error;
1212
const S_TO_NS: u64 = 1_000_000_000;
1313
const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds
1414

15+
/// A bounded channel applies backpressure on the trace request path when the concentrator
16+
/// cannot keep up, instead of growing without limit as an unbounded channel would.
17+
const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 8192;
18+
1519
#[derive(Debug, thiserror::Error)]
1620
pub enum StatsError {
1721
#[error("Failed to send command to concentrator: {0}")]
@@ -39,7 +43,7 @@ pub enum ConcentratorCommand {
3943
}
4044

4145
pub struct StatsConcentratorHandle {
42-
tx: mpsc::UnboundedSender<ConcentratorCommand>,
46+
tx: mpsc::Sender<ConcentratorCommand>,
4347
is_tracer_metadata_set: AtomicBool,
4448
}
4549

@@ -58,14 +62,14 @@ impl Clone for StatsConcentratorHandle {
5862

5963
impl StatsConcentratorHandle {
6064
#[must_use]
61-
pub fn new(tx: mpsc::UnboundedSender<ConcentratorCommand>) -> Self {
65+
pub fn new(tx: mpsc::Sender<ConcentratorCommand>) -> Self {
6266
Self {
6367
tx,
6468
is_tracer_metadata_set: AtomicBool::new(false),
6569
}
6670
}
6771

68-
pub fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> {
72+
pub async fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> {
6973
// Set tracer metadata only once for the first trace because
7074
// it is the same for all traces.
7175
if !self.is_tracer_metadata_set.load(Ordering::Acquire) {
@@ -78,14 +82,16 @@ impl StatsConcentratorHandle {
7882
};
7983
self.tx
8084
.send(ConcentratorCommand::SetTracerMetadata(tracer_metadata))
85+
.await
8186
.map_err(StatsError::SendError)?;
8287
}
8388
Ok(())
8489
}
8590

86-
pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> {
91+
pub async fn add(&self, span: &pb::Span) -> Result<(), StatsError> {
8792
self.tx
8893
.send(ConcentratorCommand::Add(Box::new(span.clone())))
94+
.await
8995
.map_err(StatsError::SendError)?;
9096
Ok(())
9197
}
@@ -94,14 +100,15 @@ impl StatsConcentratorHandle {
94100
let (response_tx, response_rx) = oneshot::channel();
95101
self.tx
96102
.send(ConcentratorCommand::Flush(force_flush, response_tx))
103+
.await
97104
.map_err(StatsError::SendError)?;
98105
response_rx.await.map_err(StatsError::RecvError)
99106
}
100107
}
101108

102109
pub struct StatsConcentratorService {
103110
concentrator: SpanConcentrator,
104-
rx: mpsc::UnboundedReceiver<ConcentratorCommand>,
111+
rx: mpsc::Receiver<ConcentratorCommand>,
105112
tracer_metadata: TracerMetadata,
106113
config: Arc<Config>,
107114
}
@@ -111,7 +118,7 @@ pub struct StatsConcentratorService {
111118
impl StatsConcentratorService {
112119
#[must_use]
113120
pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
114-
let (tx, rx) = mpsc::unbounded_channel();
121+
let (tx, rx) = mpsc::channel(CONCENTRATOR_COMMAND_CHANNEL_CAPACITY);
115122
let handle = StatsConcentratorHandle::new(tx);
116123
// TODO: set span_kinds_stats_computed and peer_tag_keys
117124
let concentrator = SpanConcentrator::new(

crates/datadog-trace-agent/src/stats_generator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,19 @@ impl StatsGenerator {
2121
Self { stats_concentrator }
2222
}
2323

24-
pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> {
24+
pub async fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> {
2525
if let TracerPayloadCollection::V07(traces) = traces {
2626
for trace in traces {
2727
// Set tracer metadata
28-
if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace) {
28+
if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace).await {
2929
error!("Failed to set tracer metadata: {err}");
3030
return Err(StatsGeneratorError::ConcentratorCommandError(err));
3131
}
3232

3333
// Generate stats for each span in the trace
3434
for chunk in &trace.chunks {
3535
for span in &chunk.spans {
36-
if let Err(err) = self.stats_concentrator.add(span) {
36+
if let Err(err) = self.stats_concentrator.add(span).await {
3737
error!("Failed to send trace stats: {err}");
3838
return Err(StatsGeneratorError::ConcentratorCommandError(err));
3939
}

crates/datadog-trace-agent/src/trace_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
145145

146146
if let Some(stats_generator) = self.stats_generator.as_ref()
147147
&& !tracer_header_tags.client_computed_stats
148-
&& let Err(e) = stats_generator.send(&payload)
148+
&& let Err(e) = stats_generator.send(&payload).await
149149
{
150150
error!("Stats generator error: {e}");
151151
}

0 commit comments

Comments
 (0)