Skip to content

Commit d60314e

Browse files
committed
remove locking on metrics
1 parent 7590bd2 commit d60314e

5 files changed

Lines changed: 55 additions & 79 deletions

File tree

bottlecap/Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -71,8 +71,8 @@ libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "158b
7171
libdd-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
7272
libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
7373
libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" }
74-
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
75-
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "18b49baba8bfef97060d7edd8b830584d0da3373", default-features = false }
74+
dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1", default-features = false }
75+
datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "e4f0341c84bf57d7af3784f4bf9e7f33e4c7ecd1", default-features = false }
7676
libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] }
7777

7878
[dev-dependencies]

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 12 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -450,10 +450,8 @@ async fn extension_loop_active(
450450
// Wait for any pending flushes
451451
flushing_service.await_handles().await;
452452
// Final flush to capture any data that accumulated since the last
453-
// spawn_non_blocking(). We pass force_stats=true since this is our
454-
// last opportunity to send data before shutdown.
455-
let mut locked_metrics = flushing_service.metrics_flushers().lock().await;
456-
flushing_service.flush_blocking(true, &mut locked_metrics).await;
453+
// spawn_non_blocking(). This is our last opportunity to send data.
454+
flushing_service.flush_blocking_final().await;
457455
break;
458456
}
459457
}
@@ -635,19 +633,13 @@ async fn extension_loop_active(
635633
}
636634
}
637635
_ = race_flush_interval.tick() => {
638-
let mut locked_metrics = metrics_flushers.lock().await;
639-
flushing_service
640-
.flush_blocking(false, &mut locked_metrics)
641-
.await;
636+
flushing_service.flush_blocking().await;
642637
race_flush_interval.reset();
643638
}
644639
}
645640
}
646641
// flush
647-
let mut locked_metrics = metrics_flushers.lock().await;
648-
flushing_service
649-
.flush_blocking(false, &mut locked_metrics)
650-
.await;
642+
flushing_service.flush_blocking().await;
651643
race_flush_interval.reset();
652644
let next_response =
653645
extension::next_event(client, &aws_config.runtime_api, &r.extension_id).await;
@@ -664,10 +656,7 @@ async fn extension_loop_active(
664656
}
665657
}
666658
FlushDecision::Periodic => {
667-
let mut locked_metrics = metrics_flushers.lock().await;
668-
flushing_service
669-
.flush_blocking(false, &mut locked_metrics)
670-
.await;
659+
flushing_service.flush_blocking().await;
671660
race_flush_interval.reset();
672661
}
673662
_ => {
@@ -695,10 +684,7 @@ async fn extension_loop_active(
695684
}
696685
_ = race_flush_interval.tick() => {
697686
if flush_control.flush_strategy == FlushStrategy::Default {
698-
let mut locked_metrics = metrics_flushers.lock().await;
699-
flushing_service
700-
.flush_blocking(false, &mut locked_metrics)
701-
.await;
687+
flushing_service.flush_blocking().await;
702688
race_flush_interval.reset();
703689
}
704690
}
@@ -744,11 +730,8 @@ async fn extension_loop_active(
744730
&lifecycle_listener_shutdown_token,
745731
);
746732

747-
// Final flush with force_stats=true since this is our last opportunity
748-
let mut locked_metrics = metrics_flushers.lock().await;
749-
flushing_service
750-
.flush_blocking(true, &mut locked_metrics)
751-
.await;
733+
// Final flush - this is our last opportunity to send data before shutdown
734+
flushing_service.flush_blocking_final().await;
752735

753736
// Even though we're shutting down, we need to reset the flush interval to prevent any future flushes
754737
race_flush_interval.reset();
@@ -1178,7 +1161,7 @@ async fn start_dogstatsd(
11781161
api_key_factory: Arc<ApiKeyFactory>,
11791162
config: &Arc<Config>,
11801163
) -> (
1181-
Arc<TokioMutex<Vec<MetricsFlusher>>>,
1164+
Arc<Vec<MetricsFlusher>>,
11821165
MetricsAggregatorHandle,
11831166
CancellationToken,
11841167
) {
@@ -1200,17 +1183,18 @@ async fn start_dogstatsd(
12001183
});
12011184

12021185
// Get flushers with aggregator handle
1203-
let flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
1186+
let flushers = Arc::new(start_metrics_flushers(
12041187
Arc::clone(&api_key_factory),
12051188
&aggregator_handle,
12061189
config,
1207-
)));
1190+
));
12081191

12091192
// Create Dogstatsd server
12101193
let dogstatsd_config = DogStatsDConfig {
12111194
host: EXTENSION_HOST.to_string(),
12121195
port: DOGSTATSD_PORT,
12131196
metric_namespace: config.statsd_metric_namespace.clone(),
1197+
windows_pipe_name: None,
12141198
};
12151199
let cancel_token = tokio_util::sync::CancellationToken::new();
12161200
let dogstatsd_agent = DogStatsD::new(

bottlecap/src/flushing/service.rs

Lines changed: 38 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@
22
33
use std::sync::Arc;
44

5-
use tokio::sync::Mutex as TokioMutex;
65
use tracing::{debug, error};
76

87
use dogstatsd::{
@@ -29,7 +28,7 @@ pub struct FlushingService {
2928
trace_flusher: Arc<TraceFlusher>,
3029
stats_flusher: Arc<StatsFlusher>,
3130
proxy_flusher: Arc<ProxyFlusher>,
32-
metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
31+
metrics_flushers: Arc<Vec<MetricsFlusher>>,
3332

3433
// Metrics aggregator handle for getting data to flush
3534
metrics_aggr_handle: MetricsAggregatorHandle,
@@ -46,7 +45,7 @@ impl FlushingService {
4645
trace_flusher: Arc<TraceFlusher>,
4746
stats_flusher: Arc<StatsFlusher>,
4847
proxy_flusher: Arc<ProxyFlusher>,
49-
metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
48+
metrics_flushers: Arc<Vec<MetricsFlusher>>,
5049
metrics_aggr_handle: MetricsAggregatorHandle,
5150
) -> Self {
5251
Self {
@@ -90,22 +89,17 @@ impl FlushingService {
9089

9190
// Spawn metrics flush
9291
// First get the data from aggregator, then spawn flush tasks for each flusher
93-
let (metrics_flushers_copy, series, sketches) = {
94-
let locked_metrics = self.metrics_flushers.lock().await;
95-
let flush_response = self
96-
.metrics_aggr_handle
97-
.clone()
98-
.flush()
99-
.await
100-
.expect("can't flush metrics handle");
101-
(
102-
locked_metrics.clone(),
103-
flush_response.series,
104-
flush_response.distributions,
105-
)
106-
};
92+
let flush_response = self
93+
.metrics_aggr_handle
94+
.clone()
95+
.flush()
96+
.await
97+
.expect("can't flush metrics handle");
98+
let series = flush_response.series;
99+
let sketches = flush_response.distributions;
107100

108-
for (idx, mut flusher) in metrics_flushers_copy.into_iter().enumerate() {
101+
for (idx, flusher) in self.metrics_flushers.iter().enumerate() {
102+
let flusher = flusher.clone();
109103
let series_clone = series.clone();
110104
let sketches_clone = sketches.clone();
111105
let handle = tokio::spawn(async move {
@@ -240,8 +234,7 @@ impl FlushingService {
240234
retry_batch.sketches.len()
241235
);
242236
joinset.spawn(async move {
243-
let mut locked_flushers = mf.lock().await;
244-
if let Some(flusher) = locked_flushers.get_mut(retry_batch.flusher_id) {
237+
if let Some(flusher) = mf.get(retry_batch.flusher_id) {
245238
flusher
246239
.flush_metrics(retry_batch.series, retry_batch.sketches)
247240
.await;
@@ -288,34 +281,42 @@ impl FlushingService {
288281
flush_error
289282
}
290283

291-
/// Performs a blocking flush of all data.
284+
/// Performs a blocking flush of all telemetry data.
292285
///
293-
/// This method flushes all data synchronously using `tokio::join!` for parallelism.
294-
/// Unlike `spawn_non_blocking`, this waits for all flushes to complete before returning.
286+
/// Flushes logs, metrics (series and distributions), traces, stats, and APM proxy
287+
/// data in parallel using `tokio::join!`. Unlike `spawn_non_blocking`, this waits
288+
/// for all flushes to complete before returning.
295289
///
296-
/// # Arguments
290+
/// The stats flusher respects its normal timing constraints (time-based bucketing),
291+
/// which may result in some stats being held back until the next flush cycle.
292+
pub async fn flush_blocking(&self) {
293+
self.flush_blocking_inner(false).await;
294+
}
295+
296+
/// Performs a final blocking flush of all telemetry data before shutdown.
297297
///
298-
/// * `force_stats` - If `true`, forces the stats flusher to flush immediately
299-
/// regardless of timing constraints.
300-
/// * `metrics_flushers` - Mutable slice of metrics flushers. The caller must acquire
301-
/// the lock before calling this method.
298+
/// Flushes logs, metrics (series and distributions), traces, stats, and APM proxy
299+
/// data in parallel. Unlike `flush_blocking`, this forces the stats flusher to
300+
/// flush immediately regardless of its normal timing constraints.
302301
///
303-
/// # Note
302+
/// Use this during shutdown when this is the last opportunity to send data.
303+
pub async fn flush_blocking_final(&self) {
304+
self.flush_blocking_inner(true).await;
305+
}
306+
307+
/// Internal implementation for blocking flush operations.
304308
///
305-
/// TODO: The caller must acquire the lock on `metrics_flushers` and pass a mutable slice
306-
/// because `MetricsFlusher::flush_metrics` requires `&mut self`. This creates awkward
307-
/// ergonomics. Consider modifying the `dogstatsd` crate to use interior mutability
308-
/// (e.g., `Arc<Mutex<...>>` internally) so `flush_metrics` can take `&self`, allowing
309-
/// this method to handle locking internally.
310-
pub async fn flush_blocking(&self, force_stats: bool, metrics_flushers: &mut [MetricsFlusher]) {
309+
/// Fetches metrics from the aggregator and flushes all data types in parallel.
310+
async fn flush_blocking_inner(&self, force_stats: bool) {
311311
let flush_response = self
312312
.metrics_aggr_handle
313313
.flush()
314314
.await
315315
.expect("can't flush metrics aggr handle");
316316

317-
let metrics_futures: Vec<_> = metrics_flushers
318-
.iter_mut()
317+
let metrics_futures: Vec<_> = self
318+
.metrics_flushers
319+
.iter()
319320
.map(|f| {
320321
f.flush_metrics(
321322
flush_response.series.clone(),
@@ -332,15 +333,6 @@ impl FlushingService {
332333
self.proxy_flusher.flush(None),
333334
);
334335
}
335-
336-
/// Returns a reference to the metrics flushers mutex for external locking.
337-
///
338-
/// This is useful when you need to lock the metrics flushers and pass them
339-
/// to `flush_blocking` or `flush_blocking_with_interval`.
340-
#[must_use]
341-
pub fn metrics_flushers(&self) -> &Arc<TokioMutex<Vec<MetricsFlusher>>> {
342-
&self.metrics_flushers
343-
}
344336
}
345337

346338
impl std::fmt::Debug for FlushingService {

bottlecap/tests/metrics_integration_test.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ async fn test_enhanced_metrics() {
5757
retry_strategy: dogstatsd::datadog::RetryStrategy::Immediate(1),
5858
compression_level: 6,
5959
};
60-
let mut metrics_flusher = MetricsFlusher::new(flusher_config);
60+
let metrics_flusher = MetricsFlusher::new(flusher_config);
6161
let lambda_enhanced_metrics =
6262
enhanced_metrics::new(metrics_aggr_handle.clone(), Arc::clone(&arc_config));
6363

0 commit comments

Comments (0)