Skip to content

Commit ef956ee

Browse files
committed
remove finalizer from cache
1 parent ce65175 commit ef956ee

2 files changed

Lines changed: 8 additions & 2 deletions

File tree

src/sinks/util/buffer/metrics/normalize.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,13 @@ impl MetricSet {
649649
}
650650
}
651651

652-
fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
652+
fn insert(&mut self, mut metric: Metric, timestamp: Option<Instant>) {
653+
// Strip finalizers before caching. The cache is only used for
654+
// normalization state (tracking running totals) and must not hold
655+
// Arc<EventFinalizer> references, as that prevents the disk buffer
656+
// from acknowledging events — leading to a deadlock once the buffer
657+
// fills up and no new events can replace cache entries.
658+
metric.metadata_mut().take_finalizers();
653659
let (series, entry) = MetricEntry::from_metric(metric, timestamp);
654660
self.insert_with_tracking(series, entry);
655661
}

src/sinks/util/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ pub trait SinkBuilderExt: Stream {
231231
Self: Stream<Item = Metric> + Unpin + Sized,
232232
N: MetricNormalize + Default,
233233
{
234-
match maybe_ttl_secs {
234+
match maybe_ttl_secs.filter(|&ttl| ttl > 0.0) {
235235
None => Normalizer::new(self, N::default()),
236236
Some(ttl) => {
237237
Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64))

0 commit comments

Comments
 (0)