diff --git a/changelog.d/24655.fix.md b/changelog.d/24655.fix.md new file mode 100644 index 0000000000000..b263b1cb00668 --- /dev/null +++ b/changelog.d/24655.fix.md @@ -0,0 +1,4 @@ +Drops finalizers of absolute metrics when they are get dropped during the transformation into incremental metrics which +currently blocks the cleanup of disk buffers. + +authors: johannesfloriangeiger \ No newline at end of file diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index 39b3ac17bd217..aa1ff8945c1ae 100644 --- a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -204,7 +204,12 @@ impl MetricNormalize for AwsCloudwatchMetricNormalize { fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option { match metric.value() { MetricValue::Gauge { .. } => state.make_absolute(metric), - _ => state.make_incremental(metric), + MetricValue::Counter { .. } + | MetricValue::Distribution { .. } + | MetricValue::Set { .. } => { + state.make_incremental_consume_dropped_finalizers(metric, true) + } + _ => None, } } } diff --git a/src/sinks/util/buffer/metrics/normalize.rs b/src/sinks/util/buffer/metrics/normalize.rs index c244506508f18..3ff6add956eb6 100644 --- a/src/sinks/util/buffer/metrics/normalize.rs +++ b/src/sinks/util/buffer/metrics/normalize.rs @@ -6,6 +6,7 @@ use std::{ use lru::LruCache; use serde_with::serde_as; use snafu::Snafu; +use vector_common::finalization::Finalizable; use vector_config_macros::configurable_component; use vector_lib::{ ByteSizeOf, @@ -569,9 +570,21 @@ impl MetricSet { /// Either convert the metric to incremental if absolute, or /// aggregate it with any previous value if already incremental. pub fn make_incremental(&mut self, metric: Metric) -> Option { + self.make_incremental_consume_dropped_finalizers(metric, false) + } + + /// Either convert the metric to incremental if absolute, or + /// aggregate it with any previous value if already incremental and optionally consume the dropped finalizers. + pub fn make_incremental_consume_dropped_finalizers( + &mut self, + metric: Metric, + consume_dropped_finalizers: bool, + ) -> Option { self.maybe_cleanup(); match metric.kind() { - MetricKind::Absolute => self.absolute_to_incremental(metric), + MetricKind::Absolute => { + self.absolute_to_incremental(metric, consume_dropped_finalizers) + } MetricKind::Incremental => Some(metric), } } @@ -600,8 +613,12 @@ impl MetricSet { } /// Convert the absolute metric into an incremental by calculating - /// the increment from the last saved absolute state. - fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option { + /// the increment from the last saved absolute state and optionally consume the dropped finalizers. + fn absolute_to_incremental( + &mut self, + mut metric: Metric, + consume_dropped_finalizers: bool, + ) -> Option { // NOTE: Crucially, like I did, you may wonder: why do we not always return a metric? Could // this lead to issues where a metric isn't seen again and we, in effect, never emit it? // @@ -636,12 +653,20 @@ impl MetricSet { self.insert_with_tracking(metric.series().clone(), new_reference); Some(metric.into_incremental()) } else { + if consume_dropped_finalizers { + metric.take_finalizers(); + } + // Metric changed type, store this and emit nothing self.insert(metric, timestamp); None } } None => { + if consume_dropped_finalizers { + metric.take_finalizers(); + } + // No reference so store this and emit nothing self.insert(metric, timestamp); None