Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
13 commits
Select commit Hold shift + click to select a range
e1fa59b
feat(aws_cloudwatch_metrics sink): #24655 Drops finalizers of dropped…
johannesfloriangeiger Mar 28, 2026
94f0c40
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 2, 2026
08267da
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 6, 2026
339b6a7
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 7, 2026
35628b6
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 9, 2026
955348e
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 11, 2026
c62905f
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 17, 2026
bb7de7b
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 20, 2026
088b6a2
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 21, 2026
0e56850
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 22, 2026
327db36
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 27, 2026
5db0c49
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger Apr 30, 2026
063fe64
Merge branch 'master' into 24655-drop-finalizers-for-dropped-absolute…
johannesfloriangeiger May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/24655.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Drops finalizers of absolute metrics when they are get dropped during the transformation into incremental metrics which
currently blocks the cleanup of disk buffers.

authors: johannesfloriangeiger
7 changes: 6 additions & 1 deletion src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,12 @@ impl MetricNormalize for AwsCloudwatchMetricNormalize {
fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
match metric.value() {
MetricValue::Gauge { .. } => state.make_absolute(metric),
_ => state.make_incremental(metric),
MetricValue::Counter { .. }
| MetricValue::Distribution { .. }
| MetricValue::Set { .. } => {
state.make_incremental_consume_dropped_finalizers(metric, true)
}
_ => None,
}
}
}
Expand Down
31 changes: 28 additions & 3 deletions src/sinks/util/buffer/metrics/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use lru::LruCache;
use serde_with::serde_as;
use snafu::Snafu;
use vector_common::finalization::Finalizable;
use vector_config_macros::configurable_component;
use vector_lib::{
ByteSizeOf,
Expand Down Expand Up @@ -569,9 +570,21 @@ impl MetricSet {
/// Either convert the metric to incremental if absolute, or
/// aggregate it with any previous value if already incremental.
pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
self.make_incremental_consume_dropped_finalizers(metric, false)
}

/// Either convert the metric to incremental if absolute, or
/// aggregate it with any previous value if already incremental and optionally consume the dropped finalizers.
pub fn make_incremental_consume_dropped_finalizers(
&mut self,
metric: Metric,
consume_dropped_finalizers: bool,
) -> Option<Metric> {
self.maybe_cleanup();
match metric.kind() {
MetricKind::Absolute => self.absolute_to_incremental(metric),
MetricKind::Absolute => {
self.absolute_to_incremental(metric, consume_dropped_finalizers)
}
MetricKind::Incremental => Some(metric),
}
}
Expand Down Expand Up @@ -600,8 +613,12 @@ impl MetricSet {
}

/// Convert the absolute metric into an incremental by calculating
/// the increment from the last saved absolute state.
fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option<Metric> {
/// the increment from the last saved absolute state and optionally consume the dropped finalizers.
fn absolute_to_incremental(
&mut self,
mut metric: Metric,
consume_dropped_finalizers: bool,
) -> Option<Metric> {
// NOTE: Crucially, like I did, you may wonder: why do we not always return a metric? Could
// this lead to issues where a metric isn't seen again and we, in effect, never emit it?
//
Expand Down Expand Up @@ -636,12 +653,20 @@ impl MetricSet {
self.insert_with_tracking(metric.series().clone(), new_reference);
Some(metric.into_incremental())
} else {
if consume_dropped_finalizers {
metric.take_finalizers();
}

// Metric changed type, store this and emit nothing
self.insert(metric, timestamp);
None
}
}
None => {
if consume_dropped_finalizers {
metric.take_finalizers();
}

// No reference so store this and emit nothing
self.insert(metric, timestamp);
None
Expand Down
Loading