Skip to content

Commit 08b6975

Browse files
fix(metrics): restore IndexMap for unbounded MetricSet to fix CPU regression (#25493)
* perf(metrics): restore IndexMap for unbounded MetricSet to fix CPU regression * refactor(metrics): remove pop_lru from MetricSetInner, check Bounded in enforce_capacity_policy * chore(changelog): add context on when the perf regression is noticeable * refactor(metrics): add debug_assert for impossible Bounded state in enforce_capacity_policy Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b7aae73 commit 08b6975

2 files changed

Lines changed: 180 additions & 21 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Fixed a CPU regression introduced in 0.50.0 affecting all sinks that use metric normalization such as `prometheus_remote_write`, `aws_cloudwatch_metrics`, `statsd` and others.
2+
3+
The only exception is the `incremental_to_absolute` transform when `max_bytes` or `max_events` are configured, where the overhead is expected and necessary for eviction to work correctly.
4+
5+
authors: thomasqueirozb

src/sinks/util/buffer/metrics/normalize.rs

Lines changed: 175 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
time::{Duration, Instant},
44
};
55

6+
use indexmap::IndexMap;
67
use lru::LruCache;
78
use serde_with::serde_as;
89
use snafu::Snafu;
@@ -375,14 +376,96 @@ pub struct MetricSetSettings {
375376
pub time_to_live: Option<u64>,
376377
}
377378

378-
/// Dual-limit cache using standard LRU with optional capacity and TTL policies.
379+
/// Inner storage for `MetricSet`.
379380
///
380-
/// This implementation uses the standard LRU crate with optional enforcement of both
381-
/// memory and entry count limits via CapacityPolicy, plus optional TTL via TtlPolicy.
381+
/// Uses `IndexMap` when no capacity eviction policy is configured — avoiding the
382+
/// per-access LRU bookkeeping (pointer chasing in a doubly-linked list) that
383+
/// `LruCache::get_mut` performs unconditionally. `LruCache` is used only when a
384+
/// capacity policy is set, so that LRU eviction order is maintained correctly.
385+
#[derive(Clone, Debug)]
386+
enum MetricSetInner {
387+
/// Unbounded storage with no eviction. Hash-map lookup only, no LRU overhead.
388+
Unbounded(IndexMap<MetricSeries, MetricEntry>),
389+
/// Bounded storage with LRU eviction semantics.
390+
Bounded(LruCache<MetricSeries, MetricEntry>),
391+
}
392+
393+
impl MetricSetInner {
394+
fn len(&self) -> usize {
395+
match self {
396+
Self::Unbounded(m) => m.len(),
397+
Self::Bounded(m) => m.len(),
398+
}
399+
}
400+
401+
fn is_empty(&self) -> bool {
402+
match self {
403+
Self::Unbounded(m) => m.is_empty(),
404+
Self::Bounded(m) => m.is_empty(),
405+
}
406+
}
407+
408+
/// Returns a mutable reference to the entry.
409+
///
410+
/// For `Unbounded` this is a plain hash-map lookup.
411+
/// For `Bounded` this also promotes the entry to the MRU end of the LRU list.
412+
fn get_mut(&mut self, key: &MetricSeries) -> Option<&mut MetricEntry> {
413+
match self {
414+
Self::Unbounded(m) => m.get_mut(key),
415+
Self::Bounded(m) => m.get_mut(key),
416+
}
417+
}
418+
419+
/// Inserts or replaces an entry, returning the previous value if any.
420+
fn put(&mut self, key: MetricSeries, value: MetricEntry) -> Option<MetricEntry> {
421+
match self {
422+
Self::Unbounded(m) => m.insert(key, value),
423+
Self::Bounded(m) => m.put(key, value),
424+
}
425+
}
426+
427+
/// Removes an entry by key, returning it if present.
428+
fn pop(&mut self, key: &MetricSeries) -> Option<MetricEntry> {
429+
match self {
430+
// swap_remove is O(1) vs shift_remove's O(n); insertion order is not required here.
431+
Self::Unbounded(m) => m.swap_remove(key),
432+
Self::Bounded(m) => m.pop(key),
433+
}
434+
}
435+
436+
fn iter(&self) -> MetricSetIter<'_> {
437+
match self {
438+
Self::Unbounded(m) => MetricSetIter::Unbounded(m.iter()),
439+
Self::Bounded(m) => MetricSetIter::Bounded(m.iter()),
440+
}
441+
}
442+
}
443+
444+
enum MetricSetIter<'a> {
445+
Unbounded(indexmap::map::Iter<'a, MetricSeries, MetricEntry>),
446+
Bounded(lru::Iter<'a, MetricSeries, MetricEntry>),
447+
}
448+
449+
impl<'a> Iterator for MetricSetIter<'a> {
450+
type Item = (&'a MetricSeries, &'a MetricEntry);
451+
452+
fn next(&mut self) -> Option<Self::Item> {
453+
match self {
454+
Self::Unbounded(it) => it.next(),
455+
Self::Bounded(it) => it.next(),
456+
}
457+
}
458+
}
459+
460+
/// Dual-limit cache for metric normalization with optional capacity and TTL policies.
461+
///
462+
/// Uses `IndexMap` internally when no capacity eviction policy is configured, avoiding
463+
/// the per-access LRU pointer-manipulation overhead of `LruCache`. Switches to
464+
/// `LruCache` only when a `max_bytes` or `max_events` capacity policy is set, so that
465+
/// LRU eviction ordering is preserved for those cases.
382466
#[derive(Clone, Debug)]
383467
pub struct MetricSet {
384-
/// LRU cache for storing metric entries
385-
inner: LruCache<MetricSeries, MetricEntry>,
468+
inner: MetricSetInner,
386469
/// Optional capacity policy for memory and/or entry count limits
387470
capacity_policy: Option<CapacityPolicy>,
388471
/// Optional TTL policy for time-based expiration
@@ -411,10 +494,15 @@ impl MetricSet {
411494
capacity_policy: Option<CapacityPolicy>,
412495
ttl_policy: Option<TtlPolicy>,
413496
) -> Self {
414-
// Always use an unbounded cache since we manually track limits
415-
// This ensures our capacity policy can properly track memory for all evicted entries
497+
// Use LruCache only when a capacity policy requires LRU eviction ordering.
498+
// Without a capacity policy, IndexMap avoids the per-access LRU overhead.
499+
let inner = if capacity_policy.is_some() {
500+
MetricSetInner::Bounded(LruCache::unbounded())
501+
} else {
502+
MetricSetInner::Unbounded(IndexMap::default())
503+
};
416504
Self {
417-
inner: LruCache::unbounded(),
505+
inner,
418506
capacity_policy,
419507
ttl_policy,
420508
}
@@ -463,9 +551,15 @@ impl MetricSet {
463551
return; // No capacity limits configured
464552
};
465553

554+
// A capacity policy is only set when inner is Bounded; this should always be true.
555+
let MetricSetInner::Bounded(ref mut lru) = self.inner else {
556+
debug_assert!(false, "capacity policy set but inner is not Bounded");
557+
return;
558+
};
559+
466560
// Keep evicting until we're within limits
467-
while capacity_policy.needs_eviction(self.inner.len()) {
468-
if let Some((series, entry)) = self.inner.pop_lru() {
561+
while capacity_policy.needs_eviction(lru.len()) {
562+
if let Some((series, entry)) = lru.pop_lru() {
469563
capacity_policy.free_item(&series, &entry);
470564
} else {
471565
break; // No more entries to evict
@@ -497,14 +591,13 @@ impl MetricSet {
497591
return; // No TTL policy, nothing to do
498592
};
499593

500-
let mut expired_keys = Vec::new();
501-
502594
// Collect expired keys using the provided timestamp
503-
for (series, entry) in self.inner.iter() {
504-
if entry.is_expired(ttl, now) {
505-
expired_keys.push(series.clone());
506-
}
507-
}
595+
let expired_keys: Vec<MetricSeries> = self
596+
.inner
597+
.iter()
598+
.filter(|(_, e)| e.is_expired(ttl, now))
599+
.map(|(s, _)| s.clone())
600+
.collect();
508601

509602
// Remove expired entries and update memory tracking (if max_bytes is set)
510603
for series in expired_keys {
@@ -549,11 +642,19 @@ impl MetricSet {
549642
pub fn into_metrics(mut self) -> Vec<Metric> {
550643
// Clean up expired entries first (using current time)
551644
self.cleanup_expired(Instant::now());
552-
let mut metrics = Vec::new();
553-
while let Some((series, entry)) = self.inner.pop_lru() {
554-
metrics.push(entry.into_metric(series));
645+
match self.inner {
646+
MetricSetInner::Unbounded(m) => m
647+
.into_iter()
648+
.map(|(series, entry)| entry.into_metric(series))
649+
.collect(),
650+
MetricSetInner::Bounded(mut m) => {
651+
let mut metrics = Vec::with_capacity(m.len());
652+
while let Some((series, entry)) = m.pop_lru() {
653+
metrics.push(entry.into_metric(series));
654+
}
655+
metrics
656+
}
555657
}
556-
metrics
557658
}
558659

559660
/// Either pass the metric through as-is if absolute, or convert it
@@ -705,3 +806,56 @@ impl Default for MetricSet {
705806
Self::new(MetricSetSettings::default())
706807
}
707808
}
809+
810+
#[cfg(test)]
811+
mod tests {
812+
use vector_lib::event::metric::{MetricKind, MetricValue};
813+
814+
use super::*;
815+
816+
fn counter(name: &str, value: f64, kind: MetricKind) -> Metric {
817+
Metric::new(name, kind, MetricValue::Counter { value })
818+
}
819+
820+
// Verifies that the default (no capacity policy) path uses IndexMap and that
821+
// make_absolute / into_metrics behave correctly across multiple updates.
822+
#[test]
823+
fn unbounded_incremental_to_absolute_accumulates() {
824+
let mut set = MetricSet::default();
825+
assert!(matches!(set.inner, MetricSetInner::Unbounded(_)));
826+
827+
// First incremental: stored as reference, emitted as absolute 1.0
828+
let out = set.make_absolute(counter("hits", 1.0, MetricKind::Incremental));
829+
assert_eq!(out.unwrap().value(), &MetricValue::Counter { value: 1.0 });
830+
831+
// Second incremental: accumulated with previous, emitted as absolute 3.0
832+
let out = set.make_absolute(counter("hits", 2.0, MetricKind::Incremental));
833+
assert_eq!(out.unwrap().value(), &MetricValue::Counter { value: 3.0 });
834+
835+
// into_metrics drains the set and returns all tracked series
836+
let metrics = set.into_metrics();
837+
assert_eq!(metrics.len(), 1);
838+
assert_eq!(metrics[0].name(), "hits");
839+
}
840+
841+
#[test]
842+
fn unbounded_absolute_passes_through() {
843+
let mut set = MetricSet::default();
844+
845+
let out = set.make_absolute(counter("rps", 42.0, MetricKind::Absolute));
846+
assert_eq!(out.unwrap().value(), &MetricValue::Counter { value: 42.0 });
847+
848+
// Absolute metrics are not stored in the set
849+
assert!(set.is_empty());
850+
}
851+
852+
// Verifies that capacity policy switches to the LruCache (Bounded) path.
853+
#[test]
854+
fn bounded_path_selected_when_capacity_policy_set() {
855+
let set = MetricSet::new(MetricSetSettings {
856+
max_events: Some(10),
857+
..Default::default()
858+
});
859+
assert!(matches!(set.inner, MetricSetInner::Bounded(_)));
860+
}
861+
}

0 commit comments

Comments
 (0)