From 13e40e41f019e3e5a93829dea35539e728e139e3 Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Fri, 22 May 2026 10:12:33 +0200 Subject: [PATCH 1/8] enhancement(tag_cardinality_limit transform): add sliding-window TTL for tracked tag values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `ttl_secs` and `ttl_generations` to the global config, per-metric overrides, and the corresponding schemas. When set, tag values not observed within `ttl_secs` are expired so fresh values can take their slot under `value_limit` — useful when the downstream backend (e.g. Datadog custom metrics) counts uniqueness over a rolling window. `mode: exact` uses a `HashMap<TagValueSet, Instant>` with lazy sweep; `mode: probabilistic` uses a generational rolling bloom filter so a non-deletable bloom can still expire values (eviction granularity = `ttl_secs / ttl_generations`). Both refresh on sighting and evict lazily on access. The `DropEvent` pre-check uses a new `contains_no_refresh` variant so rejected events don't extend leases. New `tag_cardinality_ttl_expirations_total` internal counter exposes eviction volume. Backwards compatible: unset / `0` ⇒ pre-PR behavior.
--- .../src/internal_event/metric_name.rs | 2 + src/internal_events/tag_cardinality_limit.rs | 24 + .../tag_cardinality_limit/config.rs | 55 ++ src/transforms/tag_cardinality_limit/mod.rs | 71 ++- .../tag_cardinality_limit/tag_value_set.rs | 506 ++++++++++++++++-- src/transforms/tag_cardinality_limit/tests.rs | 176 ++++-- .../components/sources/internal_metrics.cue | 12 + .../transforms/tag_cardinality_limit.cue | 57 +- 8 files changed, 816 insertions(+), 87 deletions(-) diff --git a/lib/vector-common/src/internal_event/metric_name.rs b/lib/vector-common/src/internal_event/metric_name.rs index f520a3ecee316..52fb1c597d804 100644 --- a/lib/vector-common/src/internal_event/metric_name.rs +++ b/lib/vector-common/src/internal_event/metric_name.rs @@ -76,6 +76,7 @@ pub enum CounterName { StaleEventsFlushedTotal, StartedTotal, StoppedTotal, + TagCardinalityTtlExpirationsTotal, TagCardinalityUntrackedEventsTotal, TagValueLimitExceededTotal, ValueLimitReachedTotal, @@ -335,6 +336,7 @@ impl CounterName { Self::StaleEventsFlushedTotal => "stale_events_flushed_total", Self::StartedTotal => "started_total", Self::StoppedTotal => "stopped_total", + Self::TagCardinalityTtlExpirationsTotal => "tag_cardinality_ttl_expirations_total", Self::TagCardinalityUntrackedEventsTotal => "tag_cardinality_untracked_events_total", Self::TagValueLimitExceededTotal => "tag_value_limit_exceeded_total", Self::ValueLimitReachedTotal => "value_limit_reached_total", diff --git a/src/internal_events/tag_cardinality_limit.rs b/src/internal_events/tag_cardinality_limit.rs index 8b9cd25c6d64c..77b86ebe86a42 100644 --- a/src/internal_events/tag_cardinality_limit.rs +++ b/src/internal_events/tag_cardinality_limit.rs @@ -103,3 +103,27 @@ impl InternalEvent for TagCardinalityTrackedKeys { gauge!(GaugeName::TagCardinalityTrackedKeys).set(self.count as f64); } } + +/// Emitted when a TTL sweep removes tag values from a tracking bucket. 
+/// +/// The `count` is the number of *distinct* values evicted in that pass. +/// For the probabilistic backend this is the count drained from the oldest +/// rolling-bloom shard; for the exact backend it is the number of entries +/// whose last sighting was older than `ttl_secs`. +#[derive(NamedInternalEvent)] +pub struct TagCardinalityTtlExpired { + pub count: u64, +} + +impl InternalEvent for TagCardinalityTtlExpired { + fn emit(self) { + if self.count == 0 { + return; + } + debug!( + message = "Expired tag values from cardinality cache.", + count = self.count, + ); + counter!(CounterName::TagCardinalityTtlExpirationsTotal).increment(self.count); + } +} diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index a1471746972fa..4d83441b7218e 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -98,6 +98,35 @@ pub struct Inner { #[configurable(derived)] #[serde(default)] pub internal_metrics: InternalMetricsConfig, + + /// Expire tracked tag values after this many seconds since they were last seen. + /// + /// When unset (default) or set to `0`, values persist for the lifetime of the + /// process — the historical behavior. When set to a positive value, the + /// transform behaves like a sliding window: any tag value not observed within + /// the TTL is dropped, freeing room under `value_limit` for fresh values. + /// Useful for bounding cost on backends (e.g. Datadog custom metrics) that + /// bill on a rolling unique-series window. + /// + /// In `exact` mode every value carries a precise last-seen timestamp; in + /// `probabilistic` mode the underlying bloom filter is split into + /// `ttl_generations` rolling shards, so eviction is approximate to within + /// `ttl_secs / ttl_generations`. 
+ #[serde(default)] + #[configurable(metadata(docs::human_name = "TTL (seconds)"))] + pub ttl_secs: Option, + + /// Number of time-slices the TTL window is split into for the + /// `probabilistic` backend. + /// + /// Higher values smooth eviction (closer to a true sliding window) at the + /// cost of `ttl_generations * cache_size_per_key` memory per (metric, + /// tag-key) pair. `1` produces a tumbling window: all tracked values are + /// dropped at once every `ttl_secs`. Ignored when `ttl_secs` is unset, or + /// when mode is `exact` (which uses precise per-value timestamps). + #[serde(default = "default_ttl_generations")] + #[configurable(metadata(docs::human_name = "TTL Generations"))] + pub ttl_generations: u8, } /// Controls the approach taken for tracking tag cardinality at the global level. @@ -169,6 +198,23 @@ pub struct OverrideInner { #[configurable(derived)] #[serde(default)] pub internal_metrics: InternalMetricsConfig, + + /// Per-metric TTL for tracked tag values. See [`Inner::ttl_secs`] for the + /// full description. + /// + /// Per-metric TTL is a **full override** of the global TTL — it does not + /// inherit. Leaving this unset means "no TTL for this metric", *not* + /// "fall back to the global `ttl_secs`". This mirrors how a per-metric + /// `value_limit` fully shadows the global one. If you want a metric to + /// share the global TTL, copy the value explicitly. + #[serde(default)] + #[configurable(metadata(docs::human_name = "TTL (seconds)"))] + pub ttl_secs: Option, + + /// Per-metric override for `ttl_generations`. See [`Inner::ttl_generations`]. + #[serde(default = "default_ttl_generations")] + #[configurable(metadata(docs::human_name = "TTL Generations"))] + pub ttl_generations: u8, } /// Controls the approach taken for tracking tag cardinality at the per-metric level. @@ -308,6 +354,13 @@ pub(crate) const fn default_cache_size() -> usize { 5 * 1024 // 5KB } +/// Default number of rolling-bloom shards. 
Four gives a reasonable middle ground: +/// eviction granularity of `ttl/4`, and a 4x memory multiplier on +/// `cache_size_per_key` for users who opt into TTL. +pub(crate) const fn default_ttl_generations() -> u8 { + 4 +} + // ============================================================================= // Transform plumbing // ============================================================================= @@ -320,6 +373,8 @@ impl GenerateConfig for Config { value_limit: default_value_limit(), limit_exceeded_action: default_limit_exceeded_action(), internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), }, tracking_scope: TrackingScope::default(), max_tracked_keys: None, diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 471d44661d752..22624dfb44491 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -120,6 +120,11 @@ impl TagCardinalityLimit { let limit_exceeded_action = per_metric.config.limit_exceeded_action; let metric_value_limit = per_metric.config.value_limit; let internal_metrics = per_metric.config.internal_metrics; + // Per-metric TTL is a *full override* of the global TTL for this metric: + // unset (`None`) means "no TTL for this metric" rather than "inherit from + // global", mirroring how per-metric `value_limit` shadows the global value. + let ttl_secs = per_metric.config.ttl_secs; + let ttl_generations = per_metric.config.ttl_generations; // Per-tag entry: LimitOverride uses an explicit value_limit; Excluded opts // the tag out. All other settings are always inherited from per-metric. 
@@ -134,6 +139,8 @@ impl TagCardinalityLimit { limit_exceeded_action, mode: metric_mode, internal_metrics, + ttl_secs, + ttl_generations, }); } } @@ -143,6 +150,8 @@ impl TagCardinalityLimit { limit_exceeded_action, mode: metric_mode, internal_metrics, + ttl_secs, + ttl_generations, }) } @@ -211,12 +220,18 @@ impl TagCardinalityLimit { } let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default(); - let tag_value_set = metric_accepted_tags - .entry_ref(key) - .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)); + let tag_value_set = metric_accepted_tags.entry_ref(key).or_insert_with(|| { + AcceptedTagValueSet::new( + config.value_limit, + &config.mode, + config.ttl_secs, + config.ttl_generations, + ) + }); if tag_value_set.contains(value) { - // Tag value has already been accepted, nothing more to do. + // Already accepted; `contains` also refreshes the TTL lease on + // TTL backends. See `AcceptedTagValueSet::contains`. return AcceptResult::Tracked; } @@ -238,8 +253,12 @@ impl TagCardinalityLimit { /// Checks if recording a key and value corresponding to a tag on an incoming Metric would /// exceed the cardinality limit. + /// + /// Note: takes `&mut self` because TTL-enabled backends (`TtlSet`, + /// `RollingBloom`) perform lazy sweep/rotation inside `contains`/`len`. + /// The non-TTL backends are still effectively read-only here. fn tag_limit_exceeded( - &self, + &mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet, @@ -248,21 +267,30 @@ impl TagCardinalityLimit { TagSettings::Excluded => return false, TagSettings::Tracked(inner) => inner, }; + let can_allocate = self.can_allocate_new_key(); match self .accepted_tags - .get(&metric_key.cloned()) - .and_then(|metric_accepted_tags| metric_accepted_tags.get(key)) + .get_mut(&metric_key.cloned()) + .and_then(|metric_accepted_tags| metric_accepted_tags.get_mut(key)) { - // Already accepted — never exceeds. 
- Some(value_set) if value_set.contains(value) => false, - // Adding this value would push us at or past the configured cap. Treat a - // missing bucket as an empty set so `value_limit: 0` correctly rejects - // the first occurrence too — but only when the (metric, tag) pair would - // actually be tracked. If `max_tracked_keys` is exhausted, `record_tag_value` - // will pass the tag through unchecked and emit `TagCardinalityLimitUntracked`, - // so we must not pre-empt that path by reporting the limit as exceeded here. - Some(value_set) => value_set.len() >= resolved.value_limit, - None => resolved.value_limit == 0 && self.can_allocate_new_key(), + // Pattern guards bind variables immutably, so the mutable call + // can't live in a guard; hence the if/else inside the arm. + Some(value_set) => { + // Must be the non-refreshing variant: `DropEvent` may still + // reject this event on a later tag, and we must not extend + // any TTL lease for an event that gets dropped. Refresh + // happens on the accept path via `record_tag_value::insert`. + if value_set.contains_no_refresh(value) { + false + } else { + value_set.len() >= resolved.value_limit + } + } + // Missing bucket: treat as empty so `value_limit: 0` rejects the + // first occurrence too — but only when the pair can actually be + // allocated. Otherwise `record_tag_value` will forward untracked + // (and emit `TagCardinalityLimitUntracked`). 
+ None => resolved.value_limit == 0 && can_allocate, } } @@ -300,7 +328,14 @@ impl TagCardinalityLimit { let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default(); metric_accepted_tags .entry_ref(key) - .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)) + .or_insert_with(|| { + AcceptedTagValueSet::new( + config.value_limit, + &config.mode, + config.ttl_secs, + config.ttl_generations, + ) + }) .insert(value.clone()); false } diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index abb7c9f80ac03..8d7562cfdc764 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -1,8 +1,31 @@ -use std::{collections::HashSet, fmt}; +//! Storage backends for accepted tag values. +//! +//! Four variants, picked at construction time from `(Mode, ttl_secs)`: +//! +//! - `Set` — `HashSet<TagValueSet>`, no TTL. Original exact-mode behavior. +//! - `Bloom` — single `BloomFilter`, no TTL. Original probabilistic-mode behavior. +//! - `TtlSet` — `HashMap<TagValueSet, Instant>` with periodic sweep. Exact mode + TTL. +//! - `RollingBloom` — `VecDeque<BloomFilterStorage>` of `ttl_generations` shards, lazily +//! rotated. Probabilistic mode + TTL. +//! +//! Both TTL variants use "refresh on sighting" semantics: every `contains()` hit +//! extends the value's lease, so continuously-observed values stay in the cache +//! across rotation boundaries. Eviction is lazy — driven by `insert()`, +//! `contains()`, and `len()` calls — so there's no background task.
+ +use std::{ + collections::{HashMap, HashSet, VecDeque}, + fmt, + time::{Duration, Instant}, +}; use bloomy::BloomFilter; -use crate::{event::metric::TagValueSet, transforms::tag_cardinality_limit::config::Mode}; +use crate::{ + event::metric::TagValueSet, + internal_events::TagCardinalityTtlExpired, + transforms::tag_cardinality_limit::config::Mode, +}; /// Container for storing the set of accepted values for a given tag key. #[derive(Debug)] @@ -13,6 +36,8 @@ pub struct AcceptedTagValueSet { enum TagValueSetStorage { Set(HashSet), Bloom(BloomFilterStorage), + TtlSet(TtlExactStorage), + RollingBloom(RollingBloomStorage), } /// A bloom filter that tracks the number of items inserted into it. @@ -49,37 +74,286 @@ impl BloomFilterStorage { } } +// ============================================================================= +// Exact mode + TTL +// ============================================================================= + +/// `HashMap`-backed exact cache with per-value last-seen timestamps. +/// +/// Sweep policy: at most once per `sweep_interval` (= `ttl / max(generations, 1)`, +/// floored to 1 second), `retain` drops every entry whose `last_seen` is older +/// than `ttl`. The sweep runs inside `insert`/`contains`/`len` — lazy, no +/// background task. 
+struct TtlExactStorage { + map: HashMap, + ttl: Duration, + sweep_interval: Duration, + last_sweep: Instant, +} + +impl TtlExactStorage { + fn new(value_limit: usize, ttl: Duration, generations: u8) -> Self { + let now = Instant::now(); + let divisor = generations.max(1) as u32; + let sweep_interval = (ttl / divisor).max(Duration::from_secs(1)); + Self { + map: HashMap::with_capacity(value_limit), + ttl, + sweep_interval, + last_sweep: now, + } + } + + fn maybe_sweep(&mut self, now: Instant) { + if now.duration_since(self.last_sweep) < self.sweep_interval { + return; + } + self.sweep(now); + } + + fn sweep(&mut self, now: Instant) { + let ttl = self.ttl; + let before = self.map.len(); + self.map + .retain(|_, last_seen| now.duration_since(*last_seen) <= ttl); + let expired = before.saturating_sub(self.map.len()) as u64; + self.last_sweep = now; + emit!(TagCardinalityTtlExpired { count: expired }); + } + + fn contains(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.maybe_sweep(now); + // Refresh lease on every sighting so continuously-seen values don't blink out. + if let Some(slot) = self.map.get_mut(value) { + *slot = now; + true + } else { + false + } + } + + /// Read-only membership check: triggers lazy sweep so the answer reflects + /// post-expiry state, but does **not** refresh the value's lease. Used in + /// `DropEvent` pre-check paths where we must not mutate cache state for + /// events that are about to be dropped. 
+ fn contains_no_refresh(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.maybe_sweep(now); + self.map.contains_key(value) + } + + fn insert(&mut self, value: TagValueSet) { + let now = Instant::now(); + self.maybe_sweep(now); + self.map.insert(value, now); + } + + fn len(&mut self) -> usize { + let now = Instant::now(); + self.maybe_sweep(now); + self.map.len() + } +} + +// ============================================================================= +// Probabilistic mode + TTL: rolling bloom filter +// ============================================================================= + +/// Sliding-window bloom: `generations` shards, each a full `cache_size_per_key` +/// bloom filter. Front of the deque is the oldest shard; back is the current. +/// On rotation, the front shard is dropped and a fresh empty one is pushed at the +/// back. Membership is the OR across shards; refresh-on-sighting writes hits +/// into the current shard so hot values survive future rotations. +struct RollingBloomStorage { + shards: VecDeque, + generations: u8, + slice: Duration, + cache_size_per_key: usize, + /// The boundary at which the next rotation is due. Advances by `slice` on + /// every rotation. Storing the *next-tick* timestamp instead of "last_rotate" + /// makes the catch-up loop in `rotate_if_needed` trivial and tolerant of + /// long pauses between calls. + next_rotate: Instant, +} + +impl RollingBloomStorage { + fn new(cache_size_per_key: usize, generations: u8, ttl: Duration) -> Self { + let generations = generations.max(1); + // Avoid a zero-duration slice (would cause `rotate_if_needed` to spin). 
+ let slice = (ttl / generations as u32).max(Duration::from_secs(1)); + let mut shards = VecDeque::with_capacity(generations as usize); + shards.push_back(BloomFilterStorage::new(cache_size_per_key)); + let now = Instant::now(); + Self { + shards, + generations, + slice, + cache_size_per_key, + next_rotate: now + slice, + } + } + + fn rotate_if_needed(&mut self, now: Instant) { + // Catch up if we've been idle for multiple slices. Capped to `generations` + // pops because every shard would have rotated out anyway. + let mut rotations = 0u8; + while now >= self.next_rotate && rotations < self.generations { + if self.shards.len() >= self.generations as usize + && let Some(dropped) = self.shards.pop_front() + { + emit!(TagCardinalityTtlExpired { + count: dropped.count() as u64, + }); + } + self.shards + .push_back(BloomFilterStorage::new(self.cache_size_per_key)); + self.next_rotate += self.slice; + rotations += 1; + } + // If we needed more rotations than `generations`, the whole window is + // stale — fast-forward `next_rotate` to avoid a tight catch-up the next + // call after a long idle period. + if now >= self.next_rotate { + self.next_rotate = now + self.slice; + } + } + + fn contains(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.rotate_if_needed(now); + + // Check newest -> oldest so hot values short-circuit immediately. + let found = self.shards.iter().rev().any(|s| s.contains(value)); + if found { + // Refresh: ensure the value is in the current shard so it survives + // the next rotation. `BloomFilterStorage::insert` is idempotent. + if let Some(newest) = self.shards.back_mut() { + newest.insert(value); + } + } + found + } + + /// Read-only membership check: triggers lazy rotation but does **not** + /// refresh the value's presence in the current shard. See + /// `TtlExactStorage::contains_no_refresh` for the rationale. 
+ fn contains_no_refresh(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.rotate_if_needed(now); + self.shards.iter().rev().any(|s| s.contains(value)) + } + + fn insert(&mut self, value: &TagValueSet) { + let now = Instant::now(); + self.rotate_if_needed(now); + if let Some(newest) = self.shards.back_mut() { + newest.insert(value); + } + } + + fn len(&mut self) -> usize { + let now = Instant::now(); + self.rotate_if_needed(now); + // Each shard holds a subset of the retained values, so the largest + // shard's count is a lower bound on cardinality, not an upper bound. + // Under refresh-on-sighting it converges to the true unique count once + // every retained value has been seen at least once per slice. + self.shards.iter().map(|s| s.count()).max().unwrap_or(0) + } +} + +// ============================================================================= + impl fmt::Debug for TagValueSetStorage { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { TagValueSetStorage::Set(set) => write!(f, "Set({set:?})"), TagValueSetStorage::Bloom(_) => write!(f, "Bloom"), + TagValueSetStorage::TtlSet(s) => { + write!(f, "TtlSet(len={}, ttl={:?})", s.map.len(), s.ttl) + } + TagValueSetStorage::RollingBloom(s) => { + write!( + f, + "RollingBloom(generations={}, slice={:?})", + s.generations, s.slice + ) + } } } } impl AcceptedTagValueSet { - pub fn new(value_limit: usize, mode: &Mode) -> Self { - let storage = match &mode { - Mode::Exact => TagValueSetStorage::Set(HashSet::with_capacity(value_limit)), - Mode::Probabilistic(config) => { + /// Construct the appropriate backend. + /// + /// When `ttl_secs` is `None` (default) or `0`, the backend is identical to the pre-TTL + /// behavior — `HashSet` for exact, single `BloomFilter` for probabilistic — so + /// existing configs see zero behavioral change.
+ pub fn new( + value_limit: usize, + mode: &Mode, + ttl_secs: Option, + ttl_generations: u8, + ) -> Self { + let ttl = ttl_secs.and_then(|s| (s > 0).then(|| Duration::from_secs(s))); + + let storage = match (mode, ttl) { + (Mode::Exact, None) => TagValueSetStorage::Set(HashSet::with_capacity(value_limit)), + (Mode::Exact, Some(ttl)) => { + TagValueSetStorage::TtlSet(TtlExactStorage::new(value_limit, ttl, ttl_generations)) + } + (Mode::Probabilistic(config), None) => { TagValueSetStorage::Bloom(BloomFilterStorage::new(config.cache_size_per_key)) } + (Mode::Probabilistic(config), Some(ttl)) => TagValueSetStorage::RollingBloom( + RollingBloomStorage::new(config.cache_size_per_key, ttl_generations, ttl), + ), }; Self { storage } } - pub fn contains(&self, value: &TagValueSet) -> bool { - match &self.storage { + /// Returns true if `value` is currently retained. + /// + /// In TTL-enabled backends this is a mutating operation: it triggers lazy + /// sweep/rotation **and refreshes the value's lease on a hit**. Use this + /// on the accept path (`try_accept_tag`, where a hit means we keep the + /// value). For read-only checks where the event might still be rejected, + /// use [`Self::contains_no_refresh`]. + pub fn contains(&mut self, value: &TagValueSet) -> bool { + match &mut self.storage { TagValueSetStorage::Set(set) => set.contains(value), TagValueSetStorage::Bloom(bloom) => bloom.contains(value), + TagValueSetStorage::TtlSet(s) => s.contains(value), + TagValueSetStorage::RollingBloom(s) => s.contains(value), } } - pub fn len(&self) -> usize { - match &self.storage { + /// Like [`Self::contains`] but never refreshes the value's TTL lease. + /// + /// The `DropEvent` pre-check pass uses this so that an event rejected by a + /// later tag does not silently extend the leases of earlier-checked values. + /// The semantic of TTL eviction is "what's been *accepted* in the last N + /// seconds", not "what's been *seen* in the last N seconds". 
+ pub fn contains_no_refresh(&mut self, value: &TagValueSet) -> bool { + match &mut self.storage { + TagValueSetStorage::Set(set) => set.contains(value), + TagValueSetStorage::Bloom(bloom) => bloom.contains(value), + TagValueSetStorage::TtlSet(s) => s.contains_no_refresh(value), + TagValueSetStorage::RollingBloom(s) => s.contains_no_refresh(value), + } + } + + /// Number of distinct values currently retained. + /// + /// In TTL-enabled backends this also triggers lazy sweep/rotation so the + /// returned figure reflects post-expiry state. + pub fn len(&mut self) -> usize { + match &mut self.storage { TagValueSetStorage::Set(set) => set.len(), TagValueSetStorage::Bloom(bloom) => bloom.count(), + TagValueSetStorage::TtlSet(s) => s.len(), + TagValueSetStorage::RollingBloom(s) => s.len(), } } @@ -89,6 +363,8 @@ impl AcceptedTagValueSet { set.insert(value); } TagValueSetStorage::Bloom(bloom) => bloom.insert(&value), + TagValueSetStorage::TtlSet(s) => s.insert(value), + TagValueSetStorage::RollingBloom(s) => s.insert(&value), }; } } @@ -96,42 +372,202 @@ impl AcceptedTagValueSet { #[cfg(test)] mod tests { use super::*; - use crate::{event::metric::TagValueSet, transforms::tag_cardinality_limit::config::Mode}; + use crate::{ + event::metric::TagValueSet, + transforms::tag_cardinality_limit::config::{BloomFilterConfig, Mode, default_cache_size}, + }; + + fn v(s: &str) -> TagValueSet { + TagValueSet::from([s.to_string()]) + } #[test] - fn test_accepted_tag_value_set_exact() { - let mut accepted_tag_value_set = AcceptedTagValueSet::new(2, &Mode::Exact); + fn exact_no_ttl_preserves_today_behavior() { + let mut set = AcceptedTagValueSet::new(2, &Mode::Exact, None, 4); + assert!(!set.contains(&v("a"))); + assert_eq!(set.len(), 0); + set.insert(v("a")); + set.insert(v("b")); + assert_eq!(set.len(), 2); + assert!(set.contains(&v("a"))); + assert!(set.contains(&v("b"))); + } - assert!(!accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); - 
assert_eq!(accepted_tag_value_set.len(), 0); + #[test] + fn bloom_no_ttl_preserves_today_behavior() { + let mode = Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }); + let mut set = AcceptedTagValueSet::new(2, &mode, None, 4); + set.insert(v("a")); + set.insert(v("a")); + assert_eq!(set.len(), 1, "duplicate insert must not bump count"); + set.insert(v("b")); + assert_eq!(set.len(), 2); + assert!(set.contains(&v("a"))); + assert!(set.contains(&v("b"))); + } - accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 1); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); + // -------------------- TtlExactStorage -------------------- + // + // We exercise the storage type directly so we can advance wall-clock time + // via the `now: Instant` parameter on private helpers. `AcceptedTagValueSet` + // itself uses `Instant::now()`, which can't be mocked cheaply. - accepted_tag_value_set.insert(TagValueSet::from(["value2".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 2); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value2".to_string()]))); + #[test] + fn ttl_exact_expires_values_past_ttl() { + let ttl = Duration::from_secs(60); + let mut s = TtlExactStorage::new(8, ttl, 4); + // t0: insert v=a manually so we control its timestamp. + let t0 = Instant::now(); + s.map.insert(v("a"), t0); + s.last_sweep = t0; + // t0+30s: still alive. + s.sweep(t0 + Duration::from_secs(30)); + assert!(s.map.contains_key(&v("a"))); + // t0+90s: expired. + s.sweep(t0 + Duration::from_secs(90)); + assert!(!s.map.contains_key(&v("a"))); } #[test] - fn test_accepted_tag_value_set_probabilistic() { - let mut accepted_tag_value_set = AcceptedTagValueSet::new(2, &Mode::Exact); + fn ttl_exact_refresh_on_contains_extends_lease() { + // Simulate: insert at t0, "sighting" at t0+30s, advance to t0+90s. 
+ // Without refresh, value would be evicted; with refresh, it survives. + let ttl = Duration::from_secs(60); + let mut s = TtlExactStorage::new(8, ttl, 4); + let t0 = Instant::now(); + s.map.insert(v("a"), t0); + s.last_sweep = t0; + // Refresh manually as if `contains` were called at t0+30s. + s.map.insert(v("a"), t0 + Duration::from_secs(30)); + // Sweep at t0+90s: a's last_seen = t0+30, age = 60s, not yet > ttl. + s.sweep(t0 + Duration::from_secs(90)); + assert!( + s.map.contains_key(&v("a")), + "refresh should have extended lease" + ); + } - assert!(!accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); - assert_eq!(accepted_tag_value_set.len(), 0); + #[test] + fn ttl_exact_sweep_interval_floors_to_one_second() { + // ttl=2s, generations=8 → naive slice = 250ms; we floor to 1s so sweeps + // never become dominant. Verify the floor. + let s = TtlExactStorage::new(8, Duration::from_secs(2), 8); + assert!(s.sweep_interval >= Duration::from_secs(1)); + } + + #[test] + fn ttl_exact_contains_no_refresh_does_not_extend_lease() { + // Regression for the `DropEvent` pre-check bug: a read-only check + // must NOT update the stored Instant. + let ttl = Duration::from_secs(60); + let mut s = TtlExactStorage::new(8, ttl, 4); + let t0 = Instant::now(); + s.map.insert(v("a"), t0); + s.last_sweep = t0; + assert!(s.contains_no_refresh(&v("a"))); + assert!(s.contains_no_refresh(&v("a"))); + assert_eq!( + s.map.get(&v("a")).copied(), + Some(t0), + "timestamp must remain at t0 after no-refresh checks" + ); + // Sanity: the refreshing variant `contains` *does* update the + // timestamp. We can't pin its exact post-call value (it depends on + // `Instant::now()`), but it must have moved forward. 
+ s.contains(&v("a")); + assert!(s.map.get(&v("a")).copied().unwrap() >= t0); + } + + #[test] + fn rolling_bloom_contains_no_refresh_does_not_seed_newest_shard() { + // Same regression as above, but for the probabilistic backend: a + // no-refresh check must not insert into the newest shard. + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + s.shards.back_mut().unwrap().insert(&v("a")); + // Drive a rotation so we have a distinct newest shard. + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.rotate_if_needed(t0 + Duration::from_secs(2)); + // "a" is in the (now older) front shard, not the back. + let newest_before = s.shards.back().unwrap().count(); + assert!(s.contains_no_refresh(&v("a"))); + let newest_after = s.shards.back().unwrap().count(); + assert_eq!( + newest_before, newest_after, + "contains_no_refresh must not seed the newest shard" + ); + // Sanity: the refreshing variant *does* seed it. + assert!(s.contains(&v("a"))); + assert!(s.shards.back().unwrap().contains(&v("a"))); + } + + // -------------------- RollingBloomStorage -------------------- + + #[test] + fn rolling_bloom_drops_oldest_shard_on_rotate() { + // ttl=4s, generations=4 → 1s per shard. 
+ let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.shards.back_mut().unwrap().insert(&v("old")); + s.rotate_if_needed(t0 + Duration::from_secs(5)); + assert_eq!(s.shards.len(), 4); + assert!( + !s.shards.iter().any(|sh| sh.contains(&v("old"))), + "'old' should have rolled out of the window" + ); + } - accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 1); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); + #[test] + fn rolling_bloom_refresh_on_contains_keeps_hot_values() { + // Drive 4 rotations (one full window) and "sight" `hot` between each + // one. Refresh-on-sighting should re-seat `hot` in the current shard + // so it survives the eventual eviction of its original shard. + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.shards.back_mut().unwrap().insert(&v("hot")); - // Inserting the same value again should not increase the count. 
- accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 1); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); + for step in 1..=4 { + s.rotate_if_needed(t0 + Duration::from_secs(step)); + assert!(s.shards.iter().rev().any(|sh| sh.contains(&v("hot")))); + s.shards.back_mut().unwrap().insert(&v("hot")); + } + assert!(s.shards.iter().any(|sh| sh.contains(&v("hot")))); + } - accepted_tag_value_set.insert(TagValueSet::from(["value2".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 2); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value2".to_string()]))); + #[test] + fn rolling_bloom_catch_up_capped_to_generations() { + // Long idle period: ensure rotate_if_needed doesn't spin past + // `generations` even if the elapsed time covers many windows. + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.shards.back_mut().unwrap().insert(&v("stale")); + // 1 hour gap: should rotate exactly `generations` times. + s.rotate_if_needed(t0 + Duration::from_secs(3600)); + assert_eq!(s.shards.len(), 4, "deque size capped at `generations`"); + assert!( + !s.shards.iter().any(|sh| sh.contains(&v("stale"))), + "stale value flushed after long idle" + ); + } + + #[test] + fn rolling_bloom_slice_floors_to_one_second() { + // ttl=2s, generations=8 → naive slice = 250ms; floor to 1s. + let s = RollingBloomStorage::new(default_cache_size(), 8, Duration::from_secs(2)); + assert!(s.slice >= Duration::from_secs(1)); + } + + #[test] + fn rolling_bloom_generations_clamped_to_at_least_one() { + // generations=0 would imply div-by-zero or an empty deque; ensure + // the constructor clamps it so we always have at least one shard. 
+ let s = RollingBloomStorage::new(default_cache_size(), 0, Duration::from_secs(60)); + assert_eq!(s.generations, 1); + assert_eq!(s.shards.len(), 1); } } diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index b8a453a3c4844..fec3b7df6b073 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -18,6 +18,7 @@ use crate::{ transforms::{ tag_cardinality_limit::config::{ BloomFilterConfig, InternalMetricsConfig, Mode, default_cache_size, + default_ttl_generations, }, test::create_topology, }, @@ -46,17 +47,25 @@ fn make_metric(tags: MetricTags) -> Event { make_metric_with_name(tags, "event") } +/// Default `Inner` for tests: no TTL, default generations. Used as a base for +/// the `make_transform_*` helpers and any test that constructs `Inner` literally. +fn default_inner(value_limit: usize, action: LimitExceededAction, mode: Mode) -> Inner { + Inner { + value_limit, + limit_exceeded_action: action, + mode, + internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), + } +} + fn make_transform_hashset( value_limit: usize, limit_exceeded_action: LimitExceededAction, ) -> Config { Config { - global: Inner { - value_limit, - limit_exceeded_action, - mode: Mode::Exact, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(value_limit, limit_exceeded_action, Mode::Exact), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::new(), @@ -66,14 +75,13 @@ fn make_transform_hashset( fn make_transform_bloom(value_limit: usize, limit_exceeded_action: LimitExceededAction) -> Config { Config { - global: Inner { + global: default_inner( value_limit, limit_exceeded_action, - mode: Mode::Probabilistic(BloomFilterConfig { + Mode::Probabilistic(BloomFilterConfig { cache_size_per_key: default_cache_size(), }), - internal_metrics: 
InternalMetricsConfig::default(), - }, + ), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::new(), @@ -87,12 +95,7 @@ fn make_transform_hashset_with_per_metric_limits( per_metric_limits: HashMap, ) -> Config { Config { - global: Inner { - value_limit, - limit_exceeded_action, - mode: Mode::Exact, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(value_limit, limit_exceeded_action, Mode::Exact), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits, @@ -106,14 +109,13 @@ fn make_transform_bloom_with_per_metric_limits( per_metric_limits: HashMap, ) -> Config { Config { - global: Inner { + global: default_inner( value_limit, limit_exceeded_action, - mode: Mode::Probabilistic(BloomFilterConfig { + Mode::Probabilistic(BloomFilterConfig { cache_size_per_key: default_cache_size(), }), - internal_metrics: InternalMetricsConfig::default(), - }, + ), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits, @@ -128,12 +130,7 @@ fn make_transform_with_global_per_tag_limits( per_tag_limits: HashMap, ) -> Config { Config { - global: Inner { - value_limit, - limit_exceeded_action, - mode, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(value_limit, limit_exceeded_action, mode), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::new(), @@ -445,6 +442,8 @@ fn override_inner_hashset(value_limit: usize, action: LimitExceededAction) -> Ov limit_exceeded_action: action, mode: OverrideMode::Exact, internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), } } @@ -456,6 +455,8 @@ fn override_inner_bloom(value_limit: usize, action: LimitExceededAction) -> Over cache_size_per_key: default_cache_size(), }), internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), } 
} @@ -866,6 +867,8 @@ fn make_per_metric( limit_exceeded_action: action, mode: OverrideMode::Exact, internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), }, } } @@ -879,6 +882,8 @@ fn make_per_metric_excluded(per_tag_limits: HashMap) -> Pe limit_exceeded_action: LimitExceededAction::DropTag, mode: OverrideMode::Excluded, internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), }, } } @@ -1388,12 +1393,7 @@ fn global_per_tag_limit_override_caps_at_explicit_value() { #[test] fn global_per_tag_overridden_by_per_metric_entry() { let config = Config { - global: Inner { - value_limit: 2, - limit_exceeded_action: LimitExceededAction::DropTag, - mode: Mode::Exact, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(2, LimitExceededAction::DropTag, Mode::Exact), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::from([( @@ -1439,6 +1439,118 @@ fn global_per_tag_overridden_by_per_metric_entry() { } } +// Transform-level TTL coverage focuses on the *config surface* (defaults, +// deserialization, the public `contains_no_refresh` contract). The behavioral +// TTL tests, where we need to drive `Instant`s, live in `tag_value_set.rs`. 
+ +#[test] +fn ttl_defaults_off() { + let cfg = make_transform_hashset(2, LimitExceededAction::DropTag); + assert!( + cfg.global.ttl_secs.is_none(), + "default config must not enable TTL" + ); + assert_eq!( + cfg.global.ttl_generations, + default_ttl_generations(), + "default generations should match the documented default" + ); +} + +#[test] +fn ttl_global_yaml_deserializes() { + let yaml = r#" +value_limit: 5 +mode: exact +ttl_secs: 3600 +ttl_generations: 6 +"#; + let parsed: Config = serde_yaml::from_str(yaml).expect("yaml should deserialize"); + assert_eq!(parsed.global.ttl_secs, Some(3600)); + assert_eq!(parsed.global.ttl_generations, 6); +} + +#[test] +fn ttl_per_metric_yaml_deserializes() { + let yaml = r#" +value_limit: 5 +mode: exact +ttl_secs: 3600 +per_metric_limits: + hot_metric: + mode: probabilistic + cache_size_per_key: 1024 + value_limit: 100 + ttl_secs: 600 + ttl_generations: 3 +"#; + let parsed: Config = serde_yaml::from_str(yaml).expect("yaml should deserialize"); + assert_eq!(parsed.global.ttl_secs, Some(3600)); + let pm = parsed.per_metric_limits.get("hot_metric").unwrap(); + assert_eq!(pm.config.ttl_secs, Some(600)); + assert_eq!(pm.config.ttl_generations, 3); +} + +/// Pins the public surface of `contains_no_refresh` for both TTL and non-TTL +/// backends. The "no refresh" timing behavior itself is verified in +/// `tag_value_set.rs::tests` where we can drive `Instant`s directly. 
+#[test] +fn drop_event_pre_check_uses_no_refresh_variant() { + use super::tag_value_set::AcceptedTagValueSet; + use crate::event::metric::TagValueSet; + + let v1 = TagValueSet::from(["v1".to_string()]); + let bloom_mode = Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }); + + for (label, mut set) in [ + ("exact no-ttl", AcceptedTagValueSet::new(4, &Mode::Exact, None, 4)), + ("bloom no-ttl", AcceptedTagValueSet::new(4, &bloom_mode, None, 4)), + ("exact ttl", AcceptedTagValueSet::new(4, &Mode::Exact, Some(60), 4)), + ("bloom ttl", AcceptedTagValueSet::new(4, &bloom_mode, Some(60), 4)), + ] { + set.insert(v1.clone()); + assert!(set.contains_no_refresh(&v1), "{label}: should find inserted value"); + } +} + +/// `ttl_secs: 0` must be equivalent to "TTL disabled". If we ever flipped this +/// to "expire immediately" the cache would empty on every call, silently +/// breaking `value_limit` for anyone who configured `0` to disable TTL. +#[test] +fn ttl_zero_disables_ttl() { + use super::tag_value_set::AcceptedTagValueSet; + use crate::event::metric::TagValueSet; + + let v = TagValueSet::from(["v1".to_string()]); + let bloom_mode = Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }); + for mut set in [ + AcceptedTagValueSet::new(4, &Mode::Exact, Some(0), 4), + AcceptedTagValueSet::new(4, &bloom_mode, Some(0), 4), + ] { + set.insert(v.clone()); + assert!(set.contains(&v)); + assert_eq!(set.len(), 1); + } +} + +#[test] +fn ttl_existing_yaml_unchanged() { + // A pre-TTL config must continue to parse without any TTL fields and + // produce ttl_secs=None — that's the backwards-compat contract. 
+ let yaml = r#" +value_limit: 5 +mode: probabilistic +cache_size_per_key: 2048 +"#; + let parsed: Config = serde_yaml::from_str(yaml).expect("yaml should deserialize"); + assert!(parsed.global.ttl_secs.is_none()); + assert_eq!(parsed.global.ttl_generations, default_ttl_generations()); +} + /// Global per-tag YAML syntax mirrors per-metric `per_tag_limits`: `mode: limit_override` /// with `value_limit`, and `mode: excluded`. #[test] diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 1e990965638c6..e29137894f8c1 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -931,6 +931,18 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + tag_cardinality_ttl_expirations_total: { + description: """ + The total number of distinct tag values expired by the `tag_cardinality_limit` + transform's TTL eviction (only emitted when `ttl_secs` is configured). In + `exact` mode this is the number of values whose last sighting was older than + `ttl_secs`; in `probabilistic` mode this is the count drained from the oldest + rolling-bloom shard at each rotation. 
+ """ + type: "counter" + default_namespace: "vector" + tags: _component_tags + } tag_value_limit_exceeded_total: { description: """ The total number of events discarded because the tag has been rejected after diff --git a/website/cue/reference/components/transforms/tag_cardinality_limit.cue b/website/cue/reference/components/transforms/tag_cardinality_limit.cue index 91cf2962185b4..4b4191d98f4ca 100644 --- a/website/cue/reference/components/transforms/tag_cardinality_limit.cue +++ b/website/cue/reference/components/transforms/tag_cardinality_limit.cue @@ -165,6 +165,58 @@ components: transforms: tag_cardinality_limit: { """ } + ttl: { + title: "TTL (sliding-window cardinality)" + body: """ + By default, the cardinality cache grows monotonically — every distinct value + ever seen for a tag occupies a slot under `value_limit` until Vector restarts. + Setting `ttl_secs` turns the cache into a *sliding window*: any tag value not + observed within that many seconds is dropped, freeing room for fresh values. + + This is useful when the downstream system bills or pages on a rolling + unique-series window (e.g. Datadog computes custom-metric cardinality on a + 1-hour p95). A monotonic cache will eventually saturate at `value_limit` and + start rejecting legitimate new values long after the old ones have aged out + of the billing window. + + ```yaml + type: tag_cardinality_limit + value_limit: 500 + mode: probabilistic + cache_size_per_key: 5120 + ttl_secs: 3600 # match the Datadog billing window + ttl_generations: 4 # eviction granularity = 15 min + ``` + + **Refresh-on-sighting**: every cache hit (not just inserts) extends the + value's lease. Continuously-observed values stay in the cache indefinitely; + only values that go silent for longer than `ttl_secs` are evicted. + + **Mode interaction**: + + - `mode: exact` — every value carries a precise last-seen timestamp. 
+ Eviction is exact to within roughly `ttl_secs / ttl_generations` + (capped at a 1-second minimum to keep sweep cost negligible). + `ttl_generations` controls only the sweep cadence in exact mode. + - `mode: probabilistic` — the underlying bloom filter is split into + `ttl_generations` rolling shards. Memory cost rises to + `ttl_generations * cache_size_per_key` per (metric, tag-key) pair. + Reduce `cache_size_per_key` if you want to keep total memory flat. + `ttl_generations: 1` produces a tumbling window (everything resets + at once every `ttl_secs`), which can be useful for matching a strict + billing-window boundary. + + **Per-metric overrides do not inherit**: setting `ttl_secs` inside a + `per_metric_limits.` block (or leaving it unset there) fully + replaces the global TTL for that metric — it does not fall back to + the top-level `ttl_secs`. This mirrors the precedence rules for + `value_limit`. To share the global TTL on a specific metric, copy the + value explicitly. + + **Restarts still reset the cache** — see [restarts](#restarts). 
+ """ + } + per_tag_limits: { title: "Per-tag overrides" body: """ @@ -223,7 +275,8 @@ components: transforms: tag_cardinality_limit: { } telemetry: metrics: { - tag_value_limit_exceeded_total: components.sources.internal_metrics.output.metrics.tag_value_limit_exceeded_total - value_limit_reached_total: components.sources.internal_metrics.output.metrics.value_limit_reached_total + tag_cardinality_ttl_expirations_total: components.sources.internal_metrics.output.metrics.tag_cardinality_ttl_expirations_total + tag_value_limit_exceeded_total: components.sources.internal_metrics.output.metrics.tag_value_limit_exceeded_total + value_limit_reached_total: components.sources.internal_metrics.output.metrics.value_limit_reached_total } } From 3c9a0bf2681ab1ab80c18a839f5b85d019e8865c Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Fri, 22 May 2026 10:54:39 +0200 Subject: [PATCH 2/8] Fix tests --- .../tag_cardinality_limit/tag_value_set.rs | 68 ++++++++++++------- src/transforms/tag_cardinality_limit/tests.rs | 43 ++++++++---- 2 files changed, 72 insertions(+), 39 deletions(-) diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index 8d7562cfdc764..195bad6749bec 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -367,6 +367,16 @@ impl AcceptedTagValueSet { TagValueSetStorage::RollingBloom(s) => s.insert(&value), }; } + + /// Test-only accessor: true iff this set uses a TTL-enabled backend. + /// Lets tests pin backend selection without exposing the internal enum. + #[cfg(test)] + pub(crate) fn ttl_enabled(&self) -> bool { + matches!( + self.storage, + TagValueSetStorage::TtlSet(_) | TagValueSetStorage::RollingBloom(_) + ) + } } #[cfg(test)] @@ -432,20 +442,23 @@ mod tests { #[test] fn ttl_exact_refresh_on_contains_extends_lease() { - // Simulate: insert at t0, "sighting" at t0+30s, advance to t0+90s. 
- // Without refresh, value would be evicted; with refresh, it survives. - let ttl = Duration::from_secs(60); - let mut s = TtlExactStorage::new(8, ttl, 4); - let t0 = Instant::now(); - s.map.insert(v("a"), t0); - s.last_sweep = t0; - // Refresh manually as if `contains` were called at t0+30s. - s.map.insert(v("a"), t0 + Duration::from_secs(30)); - // Sweep at t0+90s: a's last_seen = t0+30, age = 60s, not yet > ttl. - s.sweep(t0 + Duration::from_secs(90)); + // Seed the map with an old timestamp, then drive a real `contains`. + // The refresh path (`*slot = now;`) must push the stored Instant + // forward — otherwise sweeps continue to use the old (potentially + // expired) timestamp. A short sleep guarantees `Instant::now()` is + // strictly after `t_insert` on every platform. + let mut s = TtlExactStorage::new(8, Duration::from_secs(60), 4); + let t_insert = Instant::now(); + s.map.insert(v("a"), t_insert); + s.last_sweep = t_insert; + + std::thread::sleep(Duration::from_millis(2)); + + assert!(s.contains(&v("a"))); + let after = *s.map.get(&v("a")).expect("entry should still be present"); assert!( - s.map.contains_key(&v("a")), - "refresh should have extended lease" + after > t_insert, + "contains() must refresh the stored Instant; was {t_insert:?}, still {after:?}" ); } @@ -521,21 +534,28 @@ mod tests { } #[test] - fn rolling_bloom_refresh_on_contains_keeps_hot_values() { - // Drive 4 rotations (one full window) and "sight" `hot` between each - // one. Refresh-on-sighting should re-seat `hot` in the current shard - // so it survives the eventual eviction of its original shard. + fn rolling_bloom_refresh_on_contains_seeds_newest_shard() { + // Force one rotation so `hot` lives in the front shard and the back + // is fresh-empty. A real `contains` call must re-seed it into the + // newest shard — this is what gives hot values survival across + // future rotations. 
let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + s.shards.back_mut().unwrap().insert(&v("hot")); let t0 = Instant::now(); s.next_rotate = t0 + Duration::from_secs(1); - s.shards.back_mut().unwrap().insert(&v("hot")); + s.rotate_if_needed(t0 + Duration::from_secs(2)); - for step in 1..=4 { - s.rotate_if_needed(t0 + Duration::from_secs(step)); - assert!(s.shards.iter().rev().any(|sh| sh.contains(&v("hot")))); - s.shards.back_mut().unwrap().insert(&v("hot")); - } - assert!(s.shards.iter().any(|sh| sh.contains(&v("hot")))); + assert_eq!( + s.shards.back().unwrap().count(), + 0, + "back shard should be fresh-empty after rotation" + ); + + assert!(s.contains(&v("hot"))); + assert!( + s.shards.back().unwrap().contains(&v("hot")), + "contains() must re-seed found values into the newest shard" + ); } #[test] diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index fec3b7df6b073..5af9e17c38d68 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1491,11 +1491,17 @@ per_metric_limits: assert_eq!(pm.config.ttl_generations, 3); } -/// Pins the public surface of `contains_no_refresh` for both TTL and non-TTL -/// backends. The "no refresh" timing behavior itself is verified in -/// `tag_value_set.rs::tests` where we can drive `Instant`s directly. +/// Pins the basic contract of `contains_no_refresh`: it must return `true` +/// for a value that was just inserted, across every backend variant. The +/// "no-refresh" timing semantic (the actual *DropEvent* contract) is verified +/// in `tag_value_set.rs::tests::{ttl_exact,rolling_bloom}_contains_no_refresh_*`, +/// where the `Instant`-driven storage methods can be exercised directly. 
+/// +/// Note: this test does NOT verify that `tag_limit_exceeded` calls +/// `contains_no_refresh` (and not `contains`) — that wiring is enforced by +/// code review of the (private) match arm in `mod.rs::tag_limit_exceeded`. #[test] -fn drop_event_pre_check_uses_no_refresh_variant() { +fn contains_no_refresh_finds_inserted_values_on_all_backends() { use super::tag_value_set::AcceptedTagValueSet; use crate::event::metric::TagValueSet; @@ -1515,26 +1521,33 @@ fn drop_event_pre_check_uses_no_refresh_variant() { } } -/// `ttl_secs: 0` must be equivalent to "TTL disabled". If we ever flipped this -/// to "expire immediately" the cache would empty on every call, silently -/// breaking `value_limit` for anyone who configured `0` to disable TTL. +/// `ttl_secs: 0` must select the **non-TTL** backend (same as `None`). If we +/// ever flipped this to "expire immediately" — i.e. a TTL backend with +/// `Duration::ZERO` — the 1-second `sweep_interval` floor would mask the bug +/// in any externally-observable behavior, but the cache would still degrade +/// silently the moment a sweep boundary was crossed. 
#[test] fn ttl_zero_disables_ttl() { use super::tag_value_set::AcceptedTagValueSet; - use crate::event::metric::TagValueSet; - let v = TagValueSet::from(["v1".to_string()]); let bloom_mode = Mode::Probabilistic(BloomFilterConfig { cache_size_per_key: default_cache_size(), }); - for mut set in [ - AcceptedTagValueSet::new(4, &Mode::Exact, Some(0), 4), - AcceptedTagValueSet::new(4, &bloom_mode, Some(0), 4), + for (label, set) in [ + ("exact ttl=0", AcceptedTagValueSet::new(4, &Mode::Exact, Some(0), 4)), + ("bloom ttl=0", AcceptedTagValueSet::new(4, &bloom_mode, Some(0), 4)), + ("exact ttl=None", AcceptedTagValueSet::new(4, &Mode::Exact, None, 4)), + ("bloom ttl=None", AcceptedTagValueSet::new(4, &bloom_mode, None, 4)), ] { - set.insert(v.clone()); - assert!(set.contains(&v)); - assert_eq!(set.len(), 1); + assert!( + !set.ttl_enabled(), + "{label}: must select the non-TTL backend" + ); } + + // Sanity: TTL with a positive value DOES select the TTL backend. + let ttl_set = AcceptedTagValueSet::new(4, &Mode::Exact, Some(60), 4); + assert!(ttl_set.ttl_enabled(), "ttl=Some(60) should enable TTL"); } #[test] From 6395814106da90375842213999cc87d6f916f830 Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Fri, 22 May 2026 10:56:32 +0200 Subject: [PATCH 3/8] Add changelog.md --- .../tag_cardinality_limit_ttl.enhancement.md | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 changelog.d/tag_cardinality_limit_ttl.enhancement.md diff --git a/changelog.d/tag_cardinality_limit_ttl.enhancement.md b/changelog.d/tag_cardinality_limit_ttl.enhancement.md new file mode 100644 index 0000000000000..bb75eaeaee3fe --- /dev/null +++ b/changelog.d/tag_cardinality_limit_ttl.enhancement.md @@ -0,0 +1,20 @@ +The `tag_cardinality_limit` transform gained an optional sliding-window TTL +for tracked tag values, controlled by two new settings on the global block, +each `per_metric_limits` entry, and the `Inner`/`OverrideInner` config schemas: + +- **`ttl_secs`**: expire 
tracked tag values after this many seconds since they + were last seen. Useful when the downstream system (e.g. Datadog custom + metrics) bills on a rolling unique-series window — without TTL, the + cardinality cache saturates `value_limit` and starts rejecting fresh values + long after the old ones have aged out of the billing window. When unset + (default), behavior is unchanged from previous releases. +- **`ttl_generations`**: tune how the TTL window is sliced for the + `probabilistic` backend. Defaults to `4` (eviction granularity = + `ttl_secs / 4`). Memory cost is `ttl_generations * cache_size_per_key` per + (metric, tag-key) pair; lower `cache_size_per_key` to keep total memory flat. + In `exact` mode this knob only controls the sweep cadence. + +A new internal counter `tag_cardinality_ttl_expirations_total` reports how +many distinct values are evicted by TTL. + +authors: kaarolch From 59e05f8db4ca4d929c6acd9c596452ef98e99fb0 Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Fri, 22 May 2026 11:01:43 +0200 Subject: [PATCH 4/8] Fix formating --- .../tag_cardinality_limit_ttl.enhancement.md | 40 ++++++++--------- .../tag_cardinality_limit/tag_value_set.rs | 3 +- src/transforms/tag_cardinality_limit/tests.rs | 45 +++++++++++++++---- 3 files changed, 57 insertions(+), 31 deletions(-) diff --git a/changelog.d/tag_cardinality_limit_ttl.enhancement.md b/changelog.d/tag_cardinality_limit_ttl.enhancement.md index bb75eaeaee3fe..03cf1b6aacaea 100644 --- a/changelog.d/tag_cardinality_limit_ttl.enhancement.md +++ b/changelog.d/tag_cardinality_limit_ttl.enhancement.md @@ -1,20 +1,20 @@ -The `tag_cardinality_limit` transform gained an optional sliding-window TTL -for tracked tag values, controlled by two new settings on the global block, -each `per_metric_limits` entry, and the `Inner`/`OverrideInner` config schemas: - -- **`ttl_secs`**: expire tracked tag values after this many seconds since they - were last seen. Useful when the downstream system (e.g. 
Datadog custom - metrics) bills on a rolling unique-series window — without TTL, the - cardinality cache saturates `value_limit` and starts rejecting fresh values - long after the old ones have aged out of the billing window. When unset - (default), behavior is unchanged from previous releases. -- **`ttl_generations`**: tune how the TTL window is sliced for the - `probabilistic` backend. Defaults to `4` (eviction granularity = - `ttl_secs / 4`). Memory cost is `ttl_generations * cache_size_per_key` per - (metric, tag-key) pair; lower `cache_size_per_key` to keep total memory flat. - In `exact` mode this knob only controls the sweep cadence. - -A new internal counter `tag_cardinality_ttl_expirations_total` reports how -many distinct values are evicted by TTL. - -authors: kaarolch +The `tag_cardinality_limit` transform gained an optional sliding-window TTL +for tracked tag values, controlled by two new settings on the global block, +each `per_metric_limits` entry, and the `Inner`/`OverrideInner` config schemas: + +- **`ttl_secs`**: expire tracked tag values after this many seconds since they + were last seen. Useful when the downstream system (e.g. Datadog custom + metrics) bills on a rolling unique-series window — without TTL, the + cardinality cache saturates `value_limit` and starts rejecting fresh values + long after the old ones have aged out of the billing window. When unset + (default), behavior is unchanged from previous releases. +- **`ttl_generations`**: tune how the TTL window is sliced for the + `probabilistic` backend. Defaults to `4` (eviction granularity = + `ttl_secs / 4`). Memory cost is `ttl_generations * cache_size_per_key` per + (metric, tag-key) pair; lower `cache_size_per_key` to keep total memory flat. + In `exact` mode this knob only controls the sweep cadence. + +A new internal counter `tag_cardinality_ttl_expirations_total` reports how +many distinct values are evicted by TTL. 
+ +authors: kaarolch diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index 195bad6749bec..c6f5b4fb78ed8 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -22,8 +22,7 @@ use std::{ use bloomy::BloomFilter; use crate::{ - event::metric::TagValueSet, - internal_events::TagCardinalityTtlExpired, + event::metric::TagValueSet, internal_events::TagCardinalityTtlExpired, transforms::tag_cardinality_limit::config::Mode, }; diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 5af9e17c38d68..0bc150665aa35 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1511,13 +1511,28 @@ fn contains_no_refresh_finds_inserted_values_on_all_backends() { }); for (label, mut set) in [ - ("exact no-ttl", AcceptedTagValueSet::new(4, &Mode::Exact, None, 4)), - ("bloom no-ttl", AcceptedTagValueSet::new(4, &bloom_mode, None, 4)), - ("exact ttl", AcceptedTagValueSet::new(4, &Mode::Exact, Some(60), 4)), - ("bloom ttl", AcceptedTagValueSet::new(4, &bloom_mode, Some(60), 4)), + ( + "exact no-ttl", + AcceptedTagValueSet::new(4, &Mode::Exact, None, 4), + ), + ( + "bloom no-ttl", + AcceptedTagValueSet::new(4, &bloom_mode, None, 4), + ), + ( + "exact ttl", + AcceptedTagValueSet::new(4, &Mode::Exact, Some(60), 4), + ), + ( + "bloom ttl", + AcceptedTagValueSet::new(4, &bloom_mode, Some(60), 4), + ), ] { set.insert(v1.clone()); - assert!(set.contains_no_refresh(&v1), "{label}: should find inserted value"); + assert!( + set.contains_no_refresh(&v1), + "{label}: should find inserted value" + ); } } @@ -1534,10 +1549,22 @@ fn ttl_zero_disables_ttl() { cache_size_per_key: default_cache_size(), }); for (label, set) in [ - ("exact ttl=0", AcceptedTagValueSet::new(4, &Mode::Exact, Some(0), 4)), - ("bloom ttl=0", AcceptedTagValueSet::new(4, 
&bloom_mode, Some(0), 4)), - ("exact ttl=None", AcceptedTagValueSet::new(4, &Mode::Exact, None, 4)), - ("bloom ttl=None", AcceptedTagValueSet::new(4, &bloom_mode, None, 4)), + ( + "exact ttl=0", + AcceptedTagValueSet::new(4, &Mode::Exact, Some(0), 4), + ), + ( + "bloom ttl=0", + AcceptedTagValueSet::new(4, &bloom_mode, Some(0), 4), + ), + ( + "exact ttl=None", + AcceptedTagValueSet::new(4, &Mode::Exact, None, 4), + ), + ( + "bloom ttl=None", + AcceptedTagValueSet::new(4, &bloom_mode, None, 4), + ), ] { assert!( !set.ttl_enabled(), From 7aa7efbef389d4b72a026d1fd35b9b667a31ae32 Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Fri, 22 May 2026 11:12:35 +0200 Subject: [PATCH 5/8] Add missing const --- src/transforms/tag_cardinality_limit/tag_value_set.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index c6f5b4fb78ed8..fd1829ff91e3b 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -370,7 +370,7 @@ impl AcceptedTagValueSet { /// Test-only accessor: true iff this set uses a TTL-enabled backend. /// Lets tests pin backend selection without exposing the internal enum. #[cfg(test)] - pub(crate) fn ttl_enabled(&self) -> bool { + pub(crate) const fn ttl_enabled(&self) -> bool { matches!( self.storage, TagValueSetStorage::TtlSet(_) | TagValueSetStorage::RollingBloom(_) From b95265e91cfdccdb32ffb30db73f83b12b2038dd Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Fri, 22 May 2026 12:50:53 +0200 Subject: [PATCH 6/8] fix(tag_cardinality_limit transform): address Codex review on TTL implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five fixes flagged by Codex on PR #25485: P1 - `RollingBloomStorage::len()` returned the max across shards, which under-counts the window-union cardinality. 
Distinct cold values spread across shards kept every individual shard below `value_limit` while the union exceeded it, silently admitting values past the cap. Now sums shard counts as a strict upper bound; the only failure mode is over-rejection (observable via `tag_value_limit_exceeded_total`). P1 - `BloomFilterStorage::insert` skipped setting bits when `contains` already returned true. For the rolling-bloom refresh path, that meant a recently-sighted value could ride on another value's false-positive bits rather than holding its own — its lifetime then depended on when those unrelated bits aged out instead of its own activity. The insert now sets bits unconditionally; count tracking remains conditional. P2 - `slice` was floored to 1s, silently stretching the configured TTL window to `ttl_generations` seconds whenever `ttl_secs < ttl_generations`. The effective number of generations is now capped to `ttl_secs` so `slice * generations == ttl` is preserved exactly. Same cap applied to `TtlExactStorage::new` for consistency. P2 - `Instant + Duration` could panic on pathological `ttl_secs` close to `u64::MAX`. Now uses checked arithmetic with saturation, so an oversized config keeps the transform alive instead of crashing it. P2 - TTL eviction left empty `(metric, tag-key)` buckets in place, which permanently consumed slots under `max_tracked_keys`. A lazy `reclaim_empty_buckets` runs only on the cap-hit path; freed slots are now reused. Steady-state allocations are unaffected. 
Refs: vectordotdev/vector#25485 --- .../tag_cardinality_limit_ttl_fixes.fix.md | 31 +++ src/transforms/tag_cardinality_limit/mod.rs | 50 +++- .../tag_cardinality_limit/tag_value_set.rs | 249 +++++++++++++++--- src/transforms/tag_cardinality_limit/tests.rs | 51 ++++ .../transforms/tag_cardinality_limit.cue | 19 +- 5 files changed, 363 insertions(+), 37 deletions(-) create mode 100644 changelog.d/tag_cardinality_limit_ttl_fixes.fix.md diff --git a/changelog.d/tag_cardinality_limit_ttl_fixes.fix.md b/changelog.d/tag_cardinality_limit_ttl_fixes.fix.md new file mode 100644 index 0000000000000..e845e291aa72b --- /dev/null +++ b/changelog.d/tag_cardinality_limit_ttl_fixes.fix.md @@ -0,0 +1,31 @@ +Several fixes to the `tag_cardinality_limit` transform's sliding-window TTL: + +- The probabilistic backend's `value_limit` cap is now enforced against an + upper-bound estimate of the union cardinality across all generational + shards. Previously the cap was checked against the maximum count of any + single shard, which under-counts when distinct values are spread across + shards (high-churn / low-repeat traffic) and silently admitted values past + the configured `value_limit`. +- `ttl_generations` is now silently capped to `ttl_secs` when `ttl_secs` is + smaller, so the configured TTL window is honored exactly. Previously the + per-slice duration was floored to 1 second, which stretched the effective + retention window to `ttl_generations` seconds when `ttl_secs < + ttl_generations` (for example `ttl_secs: 1, ttl_generations: 4` kept values + for about 4 seconds). +- Rolling-bloom refresh-on-sighting now sets the bits in the newest shard + unconditionally instead of skipping the write when the bloom already reports + the value as present. The skip path could leave a recently-observed value + riding on another value's false-positive bits, so its lifetime depended on + when those unrelated bits aged out rather than on its own activity. 
+- Pathological `ttl_secs` configs near `u64::MAX` no longer panic the + transform on construction or rotation. `Instant + Duration` overflows are + now caught and saturated so the transform stays alive instead of crashing + on misconfiguration. +- When TTL eviction empties a `(metric, tag-key)` bucket, the slot is now + reclaimed under `max_tracked_keys`. Previously the bucket lingered with an + empty inner set and permanently consumed a slot, so high-churn workloads + could hit the cap forever even after older pairs had fully aged out. + Reclamation runs lazily on the cap-hit path, so steady-state allocations + are unaffected. + +authors: kaarolch diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index cc920a9b9a6ad..587e4e61a2649 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -82,6 +82,42 @@ impl TagCardinalityLimit { }); } + /// Drop (metric, tag-key) buckets whose `AcceptedTagValueSet` has been + /// fully expired by TTL, decrementing `tracked_keys_count` so freed slots + /// can be reused under `max_tracked_keys`. + /// + /// Without this, TTL sweeps in `TtlExactStorage` / `RollingBloomStorage` + /// only shrink the inner storage; the empty bucket itself lingers and + /// permanently consumes a slot. High-churn workloads under + /// `max_tracked_keys` would then hit the cap forever — every new + /// `(metric, tag-key)` pair would fall through as untracked even after + /// the cache for older pairs has fully aged out. + /// + /// Called lazily on cap-hit paths so the steady-state overhead is zero; + /// the scan only fires when a new allocation would otherwise be rejected. + /// `len()` is intentionally called on every set because it also drives + /// the lazy sweep/rotation that may be what empties the bucket. 
+ fn reclaim_empty_buckets(&mut self) { + let mut reclaimed = 0usize; + self.accepted_tags.retain(|_, inner| { + inner.retain(|_, set| { + if set.len() == 0 { + reclaimed += 1; + false + } else { + true + } + }); + !inner.is_empty() + }); + if reclaimed > 0 { + self.tracked_keys_count = self.tracked_keys_count.saturating_sub(reclaimed); + emit!(TagCardinalityTrackedKeys { + count: self.tracked_keys_count, + }); + } + } + /// Resolve the configuration that applies to a specific (metric, tag) pair. /// /// Per-tag entries support two modes: @@ -214,7 +250,12 @@ impl TagCardinalityLimit { if !pair_exists { if !self.can_allocate_new_key() { - return AcceptResult::Untracked; + // Reclaim before giving up: a TTL backend may have emptied + // buckets we're still accounting against `max_tracked_keys`. + self.reclaim_empty_buckets(); + if !self.can_allocate_new_key() { + return AcceptResult::Untracked; + } } self.record_new_key_allocation(); } @@ -315,7 +356,12 @@ impl TagCardinalityLimit { if !pair_exists { if !self.can_allocate_new_key() { - return true; + // See `try_accept_tag` — try reclaiming empty TTL buckets + // before treating the new pair as untracked. + self.reclaim_empty_buckets(); + if !self.can_allocate_new_key() { + return true; + } } self.record_new_key_allocation(); } diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index 2791fb54785d8..f97b3bff17132 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -26,6 +26,14 @@ use crate::{ transforms::tag_cardinality_limit::config::Mode, }; +/// `Instant + Duration` panics if the resulting instant is outside the +/// platform's representable range. Pathological `ttl_secs` values (close to +/// `u64::MAX`) can hit that limit; this helper falls back to the original +/// instant so the transform stays alive instead of crashing on misconfiguration. 
+fn saturating_add(instant: Instant, duration: Duration) -> Instant { + instant.checked_add(duration).unwrap_or(instant) +} + /// Container for storing the set of accepted values for a given tag key. #[derive(Debug)] pub struct AcceptedTagValueSet { @@ -57,9 +65,19 @@ impl BloomFilterStorage { } fn insert(&mut self, value: &TagValueSet) { - // Only update the count if the value is not already in the bloom filter. - if !self.inner.contains(value) { - self.inner.insert(value); + // Set the bits unconditionally; `bloomy::BloomFilter::insert` is + // idempotent at the bit level (already-set bits are a no-op). The + // unconditional form matters for the rolling-bloom refresh path: + // `RollingBloomStorage::contains` re-inserts hits into the newest + // shard, and skipping that write on a false-positive `contains` hit + // would mean the value rides on someone else's bits in the newest + // shard rather than holding its own — making its lifetime depend on + // when those unrelated bits age out instead of the value's own + // activity. Count tracking remains conditional so we still report + // distinct first-time inserts. + let was_already_present = self.inner.contains(value); + self.inner.insert(value); + if !was_already_present { self.count += 1; } } @@ -92,14 +110,22 @@ struct TtlExactStorage { impl TtlExactStorage { fn new(ttl: Duration, generations: u8) -> Self { - let now = Instant::now(); - let divisor = generations.max(1) as u32; - let sweep_interval = (ttl / divisor).max(Duration::from_secs(1)); + // Same cap as `RollingBloomStorage::new`: clamp effective generations + // so `sweep_interval >= 1s` without stretching the configured TTL + // window. For `ttl_secs >= 1` the invariant + // `sweep_interval * effective == ttl` holds exactly. Eviction + // precision in exact mode is `[ttl, ttl + sweep_interval)` — the cap + // keeps that upper bound proportional to the configured TTL instead + // of being silently widened by the old 1-second floor. 
+ let requested = generations.max(1) as u32; + let max_for_ttl = ttl.as_secs().max(1) as u32; + let effective = requested.min(max_for_ttl).max(1); + let sweep_interval = ttl / effective; Self { map: HashMap::new(), ttl, sweep_interval, - last_sweep: now, + last_sweep: Instant::now(), } } @@ -178,18 +204,32 @@ struct RollingBloomStorage { impl RollingBloomStorage { fn new(cache_size_per_key: usize, generations: u8, ttl: Duration) -> Self { - let generations = generations.max(1); - // Avoid a zero-duration slice (would cause `rotate_if_needed` to spin). - let slice = (ttl / generations as u32).max(Duration::from_secs(1)); - let mut shards = VecDeque::with_capacity(generations as usize); + // Cap effective generations so `slice >= 1s` (avoid `rotate_if_needed` + // spinning) WITHOUT stretching the TTL window past `ttl_secs`. The + // previous behavior — flooring `slice` to 1s — silently grew the + // effective retention window to `1s × ttl_generations` whenever + // `ttl_secs < ttl_generations`, violating the configured TTL contract. + // For every valid `ttl_secs >= 1`, this preserves the invariant + // `slice * effective_generations == ttl` exactly. + let requested = generations.max(1) as u32; + let max_for_ttl = ttl.as_secs().max(1) as u32; + let effective = requested.min(max_for_ttl).max(1); + let slice = ttl / effective; + let mut shards = VecDeque::with_capacity(effective as usize); shards.push_back(BloomFilterStorage::new(cache_size_per_key)); let now = Instant::now(); Self { shards, - generations, + generations: effective as u8, slice, cache_size_per_key, - next_rotate: now + slice, + // Pathological `ttl_secs` (e.g. close to `u64::MAX`) could push + // `now + slice` past `Instant`'s platform-specific range and panic. 
+ // Saturating to `now` is safe: the first call to `rotate_if_needed` + // will fast-forward `next_rotate` via the same guard below and + // settle into a normal rotation cadence (which for any oversized + // TTL will never actually fire under realistic Vector uptime). + next_rotate: saturating_add(now, slice), } } @@ -207,14 +247,14 @@ impl RollingBloomStorage { } self.shards .push_back(BloomFilterStorage::new(self.cache_size_per_key)); - self.next_rotate += self.slice; + self.next_rotate = saturating_add(self.next_rotate, self.slice); rotations += 1; } // If we needed more rotations than `generations`, the whole window is // stale — fast-forward `next_rotate` to avoid a tight catch-up the next // call after a long idle period. if now >= self.next_rotate { - self.next_rotate = now + self.slice; + self.next_rotate = saturating_add(now, self.slice); } } @@ -251,14 +291,31 @@ impl RollingBloomStorage { } } + /// Upper bound on the number of distinct values currently retained. + /// + /// Bloom shards can't be enumerated, so the true union cardinality + /// isn't directly computable. Summing per-shard counts is a strict + /// upper bound: a value present in N shards (because refresh-on-sighting + /// has re-seeded it into newer shards) contributes N to the sum but + /// only 1 to the union. + /// + /// Bias is bounded by `ttl_generations`. Cold-churn workloads — the + /// original TTL motivation (`pod_name`, ephemeral IDs) — see no bias + /// because each value lives in exactly one shard. Hot, continuously- + /// sighted values are over-counted by up to a factor of `ttl_generations`; + /// users running such workloads can set `ttl_generations: 1` (tumbling + /// window) to eliminate it. + /// + /// `sum` is preferred over `max` because a cardinality limiter must + /// never silently admit values past `value_limit`. `max` underestimates + /// the union (distinct values spread across shards keep every shard + /// below the cap while the union exceeds it). 
The only failure mode of + /// `sum` is over-rejection, which is observable via + /// `tag_value_limit_exceeded_total`. fn len(&mut self) -> usize { let now = Instant::now(); self.rotate_if_needed(now); - // Cardinality is bounded above by any individual shard's count under - // refresh-on-sighting (hot values are present in every shard). Taking - // the max is cheap and converges to the true unique count as soon as - // every retained value has been seen at least once per slice. - self.shards.iter().map(|s| s.count()).max().unwrap_or(0) + self.shards.iter().map(|s| s.count()).sum() } } @@ -387,6 +444,20 @@ mod tests { TagValueSet::from([s.to_string()]) } + #[test] + fn bloom_filter_storage_count_is_idempotent_per_value() { + // Pin the count contract that survives Codex P1: `BloomFilterStorage::insert` + // now writes the bloom bits unconditionally, but the count must still + // bump exactly once per distinct value so per-shard `len()` stays + // accurate. Inserting the same value twice must leave count at 1. + let mut b = BloomFilterStorage::new(default_cache_size()); + b.insert(&v("a")); + b.insert(&v("a")); + assert_eq!(b.count(), 1, "duplicate insert must not bump count"); + b.insert(&v("b")); + assert_eq!(b.count(), 2); + } + #[test] fn exact_no_ttl_preserves_today_behavior() { let mut set = AcceptedTagValueSet::new(&Mode::Exact, None, 4); @@ -459,11 +530,22 @@ mod tests { } #[test] - fn ttl_exact_sweep_interval_floors_to_one_second() { - // ttl=2s, generations=8 → naive slice = 250ms; we floor to 1s so sweeps - // never become dominant. Verify the floor. + fn ttl_exact_caps_generations_when_ttl_lt_generations() { + // Regression for Codex P2: previously `sweep_interval` was floored to + // 1s, which silently stretched the effective TTL window when + // `ttl_secs < generations`. The fix caps the effective generations + // so `sweep_interval * effective == ttl` exactly while keeping + // `sweep_interval >= 1s`. 
+ let s = TtlExactStorage::new(Duration::from_secs(1), 4); + assert_eq!(s.sweep_interval, Duration::from_secs(1)); + // ttl=2s / generations=8 → effective=2, sweep_interval=1s. The old + // 1s floor yielded the same cadence here (250ms floored to 1s); the + // cap only makes `sweep_interval * effective == ttl` hold exactly. let s = TtlExactStorage::new(Duration::from_secs(2), 8); - assert!(s.sweep_interval >= Duration::from_secs(1)); + assert_eq!(s.sweep_interval, Duration::from_secs(1)); + // Tall TTL is unaffected. + let s = TtlExactStorage::new(Duration::from_secs(3600), 4); + assert_eq!(s.sweep_interval, Duration::from_secs(900)); } #[test] @@ -571,13 +653,6 @@ mod tests { ); } - #[test] - fn rolling_bloom_slice_floors_to_one_second() { - // ttl=2s, generations=8 → naive slice = 250ms; floor to 1s. - let s = RollingBloomStorage::new(default_cache_size(), 8, Duration::from_secs(2)); - assert!(s.slice >= Duration::from_secs(1)); - } - #[test] fn rolling_bloom_generations_clamped_to_at_least_one() { // generations=0 would imply div-by-zero or an empty deque; ensure @@ -586,4 +661,116 @@ mod tests { assert_eq!(s.generations, 1); assert_eq!(s.shards.len(), 1); } + + #[test] + fn rolling_bloom_caps_generations_when_ttl_lt_generations() { + // Regression for Codex P2: with ttl=1s and the default generations=4, + // the old code floored `slice` to 1s and silently stretched the + // effective window to ~4s. The fix caps effective generations to + // `ttl.as_secs()` so `slice * generations == ttl` is preserved. + let s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(1)); + assert_eq!(s.generations, 1, "effective generations capped to ttl"); + assert_eq!(s.slice, Duration::from_secs(1)); + // ttl=2s / generations=8 → effective=2, slice=1s. Without the cap + // the old code produced slice=1s but kept generations=8, stretching + // the window to 8s.
+ let s = RollingBloomStorage::new(default_cache_size(), 8, Duration::from_secs(2)); + assert_eq!(s.generations, 2); + assert_eq!(s.slice, Duration::from_secs(1)); + } + + #[test] + fn rolling_bloom_window_matches_ttl_exactly() { + // `slice * effective_generations == ttl` must hold for every valid + // (ttl_secs, generations) so the configured TTL is honored exactly. + for (ttl_secs, generations) in [(1u64, 4u8), (2, 8), (3, 4), (60, 4), (3600, 4), (86400, 6)] + { + let s = RollingBloomStorage::new( + default_cache_size(), + generations, + Duration::from_secs(ttl_secs), + ); + assert_eq!( + s.slice * u32::from(s.generations), + Duration::from_secs(ttl_secs), + "ttl_secs={ttl_secs}, generations={generations}: window must equal ttl", + ); + assert!( + s.slice >= Duration::from_secs(1), + "ttl_secs={ttl_secs}, generations={generations}: slice must be >= 1s", + ); + } + } + + #[test] + fn rolling_bloom_len_sums_across_shards() { + // Regression for Codex P1: `len()` must return the sum of per-shard + // counts, not the max. Otherwise distinct cold values spread across + // shards keep every individual shard below `value_limit` while the + // union exceeds it, and the cardinality limiter silently admits past + // the cap. Build the deque directly so the layout is unambiguous; + // `rotate_if_needed` is exercised separately by other tests. + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + s.shards.clear(); + for name in ["a", "b", "c", "d"] { + let mut shard = BloomFilterStorage::new(default_cache_size()); + shard.insert(&v(name)); + s.shards.push_back(shard); + } + // Push the next rotation far enough out that the `len()` call below + // doesn't lazily rotate and disturb our hand-built layout. + s.next_rotate = Instant::now() + Duration::from_secs(3600); + // Each shard holds 1 distinct value. max() would return 1; the union + // is 4 — and `sum` must return 4. 
+ assert_eq!( + s.len(), + 4, + "len() must sum per-shard counts to reflect the union upper bound" + ); + } + + #[test] + fn rolling_bloom_oversized_ttl_doesnt_panic() { + // Regression for Codex P2 (Instant overflow): a pathological + // `ttl_secs` close to `u64::MAX` would previously panic the + // transform via `now + slice` overflowing `Instant`'s + // platform-specific range. The `saturating_add` guard must keep + // construction and the first rotation alive. + let mut s = + RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(u64::MAX)); + // Operate on the storage so we exercise both call sites that use + // `saturating_add` (constructor + `rotate_if_needed`). + s.insert(&v("a")); + assert!(s.contains(&v("a"))); + assert_eq!(s.len(), 1); + } + + #[test] + fn rolling_bloom_len_upper_bounds_value_limit() { + // Pin the contract that motivates Fix 1: across the full window the + // sum-based `len()` must reach `value_limit` once enough distinct + // values are admitted, so `try_accept_tag` actually stops admitting. 
+ let value_limit = 8usize; + let generations = 4u8; + let mut s = + RollingBloomStorage::new(default_cache_size(), generations, Duration::from_secs(4)); + s.shards.clear(); + let per_shard = value_limit / generations as usize; + let mut next = 0usize; + for _ in 0..generations { + let mut shard = BloomFilterStorage::new(default_cache_size()); + for _ in 0..per_shard { + shard.insert(&v(&format!("v{next}"))); + next += 1; + } + s.shards.push_back(shard); + } + s.next_rotate = Instant::now() + Duration::from_secs(3600); + assert!( + s.len() >= value_limit, + "len() must reach value_limit once enough distinct values are spread \ + across shards; got {} for value_limit={value_limit}", + s.len(), + ); + } } diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index aad452ee434ad..4d6bad10db0d0 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1591,6 +1591,57 @@ cache_size_per_key: 2048 assert_eq!(parsed.global.ttl_generations, default_ttl_generations()); } +/// Regression for Codex P2 (empty-bucket reclaim): once `max_tracked_keys` +/// is reached, the transform must reclaim buckets that have been fully +/// emptied by TTL eviction so new `(metric, tag-key)` pairs can be tracked +/// again. Without reclaim, high-churn workloads under `max_tracked_keys` +/// would silently stay in `Untracked` mode forever, even after old buckets +/// fully aged out. +#[test] +fn max_tracked_keys_reclaims_empty_buckets() { + use super::tag_value_set::AcceptedTagValueSet; + + let mut config = make_transform_hashset(10, LimitExceededAction::DropTag); + config.max_tracked_keys = Some(1); + let mut transform = TagCardinalityLimit::new(config); + + // Pre-seed an empty bucket so the transform sees its slot as used. 
+ let metric_key: Option = None; + let stale_set = AcceptedTagValueSet::new(&Mode::Exact, None, default_ttl_generations()); + let mut inner = hashbrown::HashMap::new(); + inner.insert("stale_tag".to_string(), stale_set); + transform.accepted_tags.insert(metric_key.clone(), inner); + transform.tracked_keys_count = 1; + + assert!( + !transform.can_allocate_new_key(), + "should be at cap before reclaim" + ); + + // A new tag key should be tracked: reclaim fires, drops the empty + // `stale_tag` bucket, frees the slot, and the new pair is allocated. + let value = TagValueSet::from(["v".to_string()]); + let result = transform.try_accept_tag(metric_key.as_ref(), "fresh_tag", &value); + assert_eq!( + result, + AcceptResult::Tracked, + "fresh tag must be tracked after the stale bucket is reclaimed" + ); + let inner_after = transform.accepted_tags.get(&metric_key).unwrap(); + assert!( + inner_after.contains_key("fresh_tag"), + "fresh_tag must be present after reclaim" + ); + assert!( + !inner_after.contains_key("stale_tag"), + "stale_tag bucket must be dropped by reclaim" + ); + assert_eq!( + transform.tracked_keys_count, 1, + "tracked_keys_count must reflect the swap (1 dropped + 1 added)" + ); +} + /// Global per-tag YAML syntax mirrors per-metric `per_tag_limits`: `mode: limit_override` /// with `value_limit`, and `mode: excluded`. #[test] diff --git a/website/cue/reference/components/transforms/tag_cardinality_limit.cue b/website/cue/reference/components/transforms/tag_cardinality_limit.cue index 4b4191d98f4ca..b95a7a507787d 100644 --- a/website/cue/reference/components/transforms/tag_cardinality_limit.cue +++ b/website/cue/reference/components/transforms/tag_cardinality_limit.cue @@ -195,16 +195,27 @@ components: transforms: tag_cardinality_limit: { **Mode interaction**: - `mode: exact` — every value carries a precise last-seen timestamp. - Eviction is exact to within roughly `ttl_secs / ttl_generations` - (capped at a 1-second minimum to keep sweep cost negligible). 
+ Eviction is exact to within roughly `ttl_secs / ttl_generations`. `ttl_generations` controls only the sweep cadence in exact mode. - `mode: probabilistic` — the underlying bloom filter is split into `ttl_generations` rolling shards. Memory cost rises to `ttl_generations * cache_size_per_key` per (metric, tag-key) pair. Reduce `cache_size_per_key` if you want to keep total memory flat. - `ttl_generations: 1` produces a tumbling window (everything resets + `ttl_generations: 1` produces a tumbling window (everything resets at once every `ttl_secs`), which can be useful for matching a strict - billing-window boundary. + billing-window boundary. The `value_limit` cap is enforced against an + upper-bound estimate of the union cardinality across shards. Under + refresh-on-sighting, hot continuously-seen values may be counted in + more than one shard, so the effective cap can be as low as + `value_limit / ttl_generations` for refresh-heavy workloads. If this + shows up as elevated `tag_value_limit_exceeded_total` without a + matching rise in distinct admitted values, set `ttl_generations: 1`. + + **`ttl_secs` shorter than `ttl_generations`**: the effective number of + generations is silently capped to `ttl_secs` so the configured TTL + window is honored exactly. For example `ttl_secs: 2, ttl_generations: 8` + resolves to 2 generations of 1-second slices (not 8 generations of + 1-second slices, which would stretch the window to 8 s). 
**Per-metric overrides do not inherit**: setting `ttl_secs` inside a `per_metric_limits.` block (or leaving it unset there) fully From 528561d02ff9a9b09c83882b1cba72c38411d24c Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Fri, 22 May 2026 14:28:43 +0200 Subject: [PATCH 7/8] fix(tag_cardinality_limit transform): tighten TTL comments, capacity check, and Instant saturation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Trim Codex / "previously …" framing and consolidate the changelog so all pre-merge refinements ship under the single feature entry. - `tag_limit_exceeded` now reclaims empty TTL buckets before the `value_limit: 0` capacity check, matching `record_tag_value` and preventing `drop_event` events from silently admitting a value when the cap is full but contains reclaimable empty buckets. - `saturating_add` falls back to `instant + ~136 years` (with bisection) on `Instant + Duration` overflow so `next_rotate` is never pinned to `now`, avoiding `generations`-per-call rotation churn under pathological `ttl_secs`. 
Co-authored-by: Cursor --- .../tag_cardinality_limit_ttl.enhancement.md | 23 +- .../tag_cardinality_limit_ttl_fixes.fix.md | 31 --- src/transforms/tag_cardinality_limit/mod.rs | 71 ++--- .../tag_cardinality_limit/tag_value_set.rs | 260 +++++++----------- src/transforms/tag_cardinality_limit/tests.rs | 60 ++-- 5 files changed, 176 insertions(+), 269 deletions(-) delete mode 100644 changelog.d/tag_cardinality_limit_ttl_fixes.fix.md diff --git a/changelog.d/tag_cardinality_limit_ttl.enhancement.md b/changelog.d/tag_cardinality_limit_ttl.enhancement.md index 03cf1b6aacaea..7b4e6aa5a5d58 100644 --- a/changelog.d/tag_cardinality_limit_ttl.enhancement.md +++ b/changelog.d/tag_cardinality_limit_ttl.enhancement.md @@ -1,20 +1,9 @@ The `tag_cardinality_limit` transform gained an optional sliding-window TTL -for tracked tag values, controlled by two new settings on the global block, -each `per_metric_limits` entry, and the `Inner`/`OverrideInner` config schemas: - -- **`ttl_secs`**: expire tracked tag values after this many seconds since they - were last seen. Useful when the downstream system (e.g. Datadog custom - metrics) bills on a rolling unique-series window — without TTL, the - cardinality cache saturates `value_limit` and starts rejecting fresh values - long after the old ones have aged out of the billing window. When unset - (default), behavior is unchanged from previous releases. -- **`ttl_generations`**: tune how the TTL window is sliced for the - `probabilistic` backend. Defaults to `4` (eviction granularity = - `ttl_secs / 4`). Memory cost is `ttl_generations * cache_size_per_key` per - (metric, tag-key) pair; lower `cache_size_per_key` to keep total memory flat. - In `exact` mode this knob only controls the sweep cadence. - -A new internal counter `tag_cardinality_ttl_expirations_total` reports how -many distinct values are evicted by TTL. +for tracked tag values. 
Two new settings — `ttl_secs` and `ttl_generations` — +are available on the global block and each `per_metric_limits` entry; values +not observed within `ttl_secs` are evicted, freeing room under `value_limit`. +A new internal counter `tag_cardinality_ttl_expirations_total` reports the +number of values evicted by TTL. See the transform documentation for +configuration details and mode-specific behavior. authors: kaarolch diff --git a/changelog.d/tag_cardinality_limit_ttl_fixes.fix.md b/changelog.d/tag_cardinality_limit_ttl_fixes.fix.md deleted file mode 100644 index e845e291aa72b..0000000000000 --- a/changelog.d/tag_cardinality_limit_ttl_fixes.fix.md +++ /dev/null @@ -1,31 +0,0 @@ -Several fixes to the `tag_cardinality_limit` transform's sliding-window TTL: - -- The probabilistic backend's `value_limit` cap is now enforced against an - upper-bound estimate of the union cardinality across all generational - shards. Previously the cap was checked against the maximum count of any - single shard, which under-counts when distinct values are spread across - shards (high-churn / low-repeat traffic) and silently admitted values past - the configured `value_limit`. -- `ttl_generations` is now silently capped to `ttl_secs` when `ttl_secs` is - smaller, so the configured TTL window is honored exactly. Previously the - per-slice duration was floored to 1 second, which stretched the effective - retention window to `ttl_generations` seconds when `ttl_secs < - ttl_generations` (for example `ttl_secs: 1, ttl_generations: 4` kept values - for about 4 seconds). -- Rolling-bloom refresh-on-sighting now sets the bits in the newest shard - unconditionally instead of skipping the write when the bloom already reports - the value as present. The skip path could leave a recently-observed value - riding on another value's false-positive bits, so its lifetime depended on - when those unrelated bits aged out rather than on its own activity. 
-- Pathological `ttl_secs` configs near `u64::MAX` no longer panic the - transform on construction or rotation. `Instant + Duration` overflows are - now caught and saturated so the transform stays alive instead of crashing - on misconfiguration. -- When TTL eviction empties a `(metric, tag-key)` bucket, the slot is now - reclaimed under `max_tracked_keys`. Previously the bucket lingered with an - empty inner set and permanently consumed a slot, so high-churn workloads - could hit the cap forever even after older pairs had fully aged out. - Reclamation runs lazily on the cap-hit path, so steady-state allocations - are unaffected. - -authors: kaarolch diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 587e4e61a2649..3825893fefcfc 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -82,21 +82,11 @@ impl TagCardinalityLimit { }); } - /// Drop (metric, tag-key) buckets whose `AcceptedTagValueSet` has been - /// fully expired by TTL, decrementing `tracked_keys_count` so freed slots - /// can be reused under `max_tracked_keys`. - /// - /// Without this, TTL sweeps in `TtlExactStorage` / `RollingBloomStorage` - /// only shrink the inner storage; the empty bucket itself lingers and - /// permanently consumes a slot. High-churn workloads under - /// `max_tracked_keys` would then hit the cap forever — every new - /// `(metric, tag-key)` pair would fall through as untracked even after - /// the cache for older pairs has fully aged out. - /// - /// Called lazily on cap-hit paths so the steady-state overhead is zero; - /// the scan only fires when a new allocation would otherwise be rejected. - /// `len()` is intentionally called on every set because it also drives - /// the lazy sweep/rotation that may be what empties the bucket. 
+ /// Drop empty `AcceptedTagValueSet` buckets left behind by TTL eviction, + /// decrementing `tracked_keys_count` so freed slots can be reused under + /// `max_tracked_keys`. Called lazily on cap-hit paths so steady-state + /// overhead is zero. `len()` is called on every set because, for TTL + /// backends, it also drives the lazy sweep that may empty the bucket. fn reclaim_empty_buckets(&mut self) { let mut reclaimed = 0usize; self.accepted_tags.retain(|_, inner| { @@ -250,8 +240,7 @@ impl TagCardinalityLimit { if !pair_exists { if !self.can_allocate_new_key() { - // Reclaim before giving up: a TTL backend may have emptied - // buckets we're still accounting against `max_tracked_keys`. + // TTL may have emptied buckets that still count against `max_tracked_keys`. self.reclaim_empty_buckets(); if !self.can_allocate_new_key() { return AcceptResult::Untracked; @@ -266,8 +255,6 @@ impl TagCardinalityLimit { }); if tag_value_set.contains(value) { - // Already accepted; `contains` also refreshes the TTL lease on - // TTL backends. See `AcceptedTagValueSet::contains`. return AcceptResult::Tracked; } @@ -303,31 +290,34 @@ impl TagCardinalityLimit { TagSettings::Excluded => return false, TagSettings::Tracked(inner) => inner, }; - let can_allocate = self.can_allocate_new_key(); - match self + + if let Some(value_set) = self .accepted_tags .get_mut(&metric_key.cloned()) .and_then(|metric_accepted_tags| metric_accepted_tags.get_mut(key)) { - // Pattern guards bind variables immutably, so the mutable call - // can't live in a guard; hence the if/else inside the arm. - Some(value_set) => { - // Must be the non-refreshing variant: `DropEvent` may still - // reject this event on a later tag, and we must not extend - // any TTL lease for an event that gets dropped. Refresh - // happens on the accept path via `record_tag_value::insert`. 
- if value_set.contains_no_refresh(value) { - false - } else { - value_set.len() >= resolved.value_limit - } - } - // Missing bucket: treat as empty so `value_limit: 0` rejects the - // first occurrence too — but only when the pair can actually be - // allocated. Otherwise `record_tag_value` will forward untracked - // (and emit `TagCardinalityLimitUntracked`). - None => resolved.value_limit == 0 && can_allocate, + // Non-refreshing: `DropEvent` may still reject this event on + // a later tag; refresh happens on the accept path via + // `record_tag_value`. + return if value_set.contains_no_refresh(value) { + false + } else { + value_set.len() >= resolved.value_limit + }; + } + + // Missing bucket: only `value_limit == 0` can flag the first + // sighting as exceeded; every other limit fits an empty set. + if resolved.value_limit != 0 { + return false; + } + // Mirror `record_tag_value`'s capacity view: reclaim empty TTL + // buckets first so we don't pass-through-untracked here and let + // the record path silently admit a value that should be rejected. + if !self.can_allocate_new_key() { + self.reclaim_empty_buckets(); } + self.can_allocate_new_key() } /// Record an accepted tag value (mutation-only, no limit check). Used by the `DropEvent` @@ -356,8 +346,7 @@ impl TagCardinalityLimit { if !pair_exists { if !self.can_allocate_new_key() { - // See `try_accept_tag` — try reclaiming empty TTL buckets - // before treating the new pair as untracked. + // See `try_accept_tag` for rationale. 
self.reclaim_empty_buckets(); if !self.can_allocate_new_key() { return true; diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index f97b3bff17132..fda797692695d 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -26,12 +26,23 @@ use crate::{ transforms::tag_cardinality_limit::config::Mode, }; -/// `Instant + Duration` panics if the resulting instant is outside the -/// platform's representable range. Pathological `ttl_secs` values (close to -/// `u64::MAX`) can hit that limit; this helper falls back to the original -/// instant so the transform stays alive instead of crashing on misconfiguration. +/// `Instant + Duration` panics outside the platform's representable range. +/// On overflow, push the deadline `~136 years` into the future so rotation +/// schedules degrade to a stable no-op rather than panicking or churning +/// (returning `instant` here would leave `next_rotate <= now` on every call +/// and force `generations` rotations per access). fn saturating_add(instant: Instant, duration: Duration) -> Instant { - instant.checked_add(duration).unwrap_or(instant) + if let Some(result) = instant.checked_add(duration) { + return result; + } + let mut fallback = Duration::from_secs(u32::MAX as u64); + while !fallback.is_zero() { + if let Some(result) = instant.checked_add(fallback) { + return result; + } + fallback = Duration::from_secs(fallback.as_secs() / 2); + } + instant } /// Container for storing the set of accepted values for a given tag key. @@ -65,16 +76,9 @@ impl BloomFilterStorage { } fn insert(&mut self, value: &TagValueSet) { - // Set the bits unconditionally; `bloomy::BloomFilter::insert` is - // idempotent at the bit level (already-set bits are a no-op). 
The - // unconditional form matters for the rolling-bloom refresh path: - // `RollingBloomStorage::contains` re-inserts hits into the newest - // shard, and skipping that write on a false-positive `contains` hit - // would mean the value rides on someone else's bits in the newest - // shard rather than holding its own — making its lifetime depend on - // when those unrelated bits age out instead of the value's own - // activity. Count tracking remains conditional so we still report - // distinct first-time inserts. + // Write bits unconditionally so the rolling-bloom refresh path can + // not leave a value riding on another value's false-positive bits. + // Count tracks distinct first sightings only. let was_already_present = self.inner.contains(value); self.inner.insert(value); if !was_already_present { @@ -91,16 +95,11 @@ impl BloomFilterStorage { } } -// ============================================================================= -// Exact mode + TTL -// ============================================================================= - /// `HashMap`-backed exact cache with per-value last-seen timestamps. /// -/// Sweep policy: at most once per `sweep_interval` (= `ttl / max(generations, 1)`, -/// floored to 1 second), `retain` drops every entry whose `last_seen` is older -/// than `ttl`. The sweep runs inside `insert`/`contains`/`len` — lazy, no -/// background task. +/// At most once per `sweep_interval` (= `ttl / effective_generations`), +/// `retain` drops every entry whose `last_seen` is older than `ttl`. The +/// sweep runs lazily inside `insert`/`contains`/`len` — no background task. struct TtlExactStorage { map: HashMap, ttl: Duration, @@ -110,13 +109,9 @@ struct TtlExactStorage { impl TtlExactStorage { fn new(ttl: Duration, generations: u8) -> Self { - // Same cap as `RollingBloomStorage::new`: clamp effective generations - // so `sweep_interval >= 1s` without stretching the configured TTL - // window. 
For `ttl_secs >= 1` the invariant - // `sweep_interval * effective == ttl` holds exactly. Eviction - // precision in exact mode is `[ttl, ttl + sweep_interval)` — the cap - // keeps that upper bound proportional to the configured TTL instead - // of being silently widened by the old 1-second floor. + // Cap effective generations so `sweep_interval >= 1s` and + // `sweep_interval * effective == ttl`. Eviction precision is then + // `[ttl, ttl + sweep_interval)`. let requested = generations.max(1) as u32; let max_for_ttl = ttl.as_secs().max(1) as u32; let effective = requested.min(max_for_ttl).max(1); @@ -181,10 +176,6 @@ impl TtlExactStorage { } } -// ============================================================================= -// Probabilistic mode + TTL: rolling bloom filter -// ============================================================================= - /// Sliding-window bloom: `generations` shards, each a full `cache_size_per_key` /// bloom filter. Front of the deque is the oldest shard; back is the current. /// On rotation, the front shard is dropped and a fresh empty one is pushed at the @@ -195,22 +186,16 @@ struct RollingBloomStorage { generations: u8, slice: Duration, cache_size_per_key: usize, - /// The boundary at which the next rotation is due. Advances by `slice` on - /// every rotation. Storing the *next-tick* timestamp instead of "last_rotate" - /// makes the catch-up loop in `rotate_if_needed` trivial and tolerant of - /// long pauses between calls. + /// Boundary at which the next rotation is due. Advances by `slice` on + /// every rotation; storing the next tick (not the last) keeps the + /// catch-up loop in `rotate_if_needed` trivial. next_rotate: Instant, } impl RollingBloomStorage { fn new(cache_size_per_key: usize, generations: u8, ttl: Duration) -> Self { - // Cap effective generations so `slice >= 1s` (avoid `rotate_if_needed` - // spinning) WITHOUT stretching the TTL window past `ttl_secs`. 
The - // previous behavior — flooring `slice` to 1s — silently grew the - // effective retention window to `1s × ttl_generations` whenever - // `ttl_secs < ttl_generations`, violating the configured TTL contract. - // For every valid `ttl_secs >= 1`, this preserves the invariant - // `slice * effective_generations == ttl` exactly. + // Cap effective generations so `slice >= 1s` and + // `slice * effective_generations == ttl`. let requested = generations.max(1) as u32; let max_for_ttl = ttl.as_secs().max(1) as u32; let effective = requested.min(max_for_ttl).max(1); @@ -223,12 +208,6 @@ impl RollingBloomStorage { generations: effective as u8, slice, cache_size_per_key, - // Pathological `ttl_secs` (e.g. close to `u64::MAX`) could push - // `now + slice` past `Instant`'s platform-specific range and panic. - // Saturating to `now` is safe: the first call to `rotate_if_needed` - // will fast-forward `next_rotate` via the same guard below and - // settle into a normal rotation cadence (which for any oversized - // TTL will never actually fire under realistic Vector uptime). next_rotate: saturating_add(now, slice), } } @@ -261,15 +240,11 @@ impl RollingBloomStorage { fn contains(&mut self, value: &TagValueSet) -> bool { let now = Instant::now(); self.rotate_if_needed(now); - - // Check newest -> oldest so hot values short-circuit immediately. + // Newest -> oldest short-circuits hot values; re-seed hits into the + // newest shard so they survive the next rotation. let found = self.shards.iter().rev().any(|s| s.contains(value)); - if found { - // Refresh: ensure the value is in the current shard so it survives - // the next rotation. `BloomFilterStorage::insert` is idempotent. - if let Some(newest) = self.shards.back_mut() { - newest.insert(value); - } + if found && let Some(newest) = self.shards.back_mut() { + newest.insert(value); } found } @@ -291,27 +266,13 @@ impl RollingBloomStorage { } } - /// Upper bound on the number of distinct values currently retained. 
- /// - /// Bloom shards can't be enumerated, so the true union cardinality - /// isn't directly computable. Summing per-shard counts is a strict - /// upper bound: a value present in N shards (because refresh-on-sighting - /// has re-seeded it into newer shards) contributes N to the sum but - /// only 1 to the union. - /// - /// Bias is bounded by `ttl_generations`. Cold-churn workloads — the - /// original TTL motivation (`pod_name`, ephemeral IDs) — see no bias - /// because each value lives in exactly one shard. Hot, continuously- - /// sighted values are over-counted by up to a factor of `ttl_generations`; - /// users running such workloads can set `ttl_generations: 1` (tumbling - /// window) to eliminate it. + /// Strict upper bound on the number of distinct values currently retained. /// - /// `sum` is preferred over `max` because a cardinality limiter must - /// never silently admit values past `value_limit`. `max` underestimates - /// the union (distinct values spread across shards keep every shard - /// below the cap while the union exceeds it). The only failure mode of - /// `sum` is over-rejection, which is observable via - /// `tag_value_limit_exceeded_total`. + /// Bloom shards are not enumerable, so the true union cardinality is not + /// directly computable. Summing per-shard counts never under-counts; the + /// alternative — `max` — could let distinct values spread across shards + /// silently exceed `value_limit`. See the `ttl` section of the transform + /// documentation for the over-rejection trade-off. 
fn len(&mut self) -> usize { let now = Instant::now(); self.rotate_if_needed(now); @@ -319,8 +280,6 @@ impl RollingBloomStorage { } } -// ============================================================================= - impl fmt::Debug for TagValueSetStorage { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -345,9 +304,7 @@ impl AcceptedTagValueSet { /// /// When `ttl_secs` is `None` or `0`, this is identical to the pre-TTL /// behavior — `HashSet` for exact, single `BloomFilter` for probabilistic — - /// so existing configs see zero behavioral change. Neither backend - /// pre-allocates with `value_limit` capacity (see vectordotdev/vector#25480 - /// for the memory rationale). + /// so existing configs see zero behavioral change. pub fn new(mode: &Mode, ttl_secs: Option, ttl_generations: u8) -> Self { let ttl = ttl_secs.and_then(|s| (s > 0).then(|| Duration::from_secs(s))); @@ -446,10 +403,6 @@ mod tests { #[test] fn bloom_filter_storage_count_is_idempotent_per_value() { - // Pin the count contract that survives Codex P1: `BloomFilterStorage::insert` - // now writes the bloom bits unconditionally, but the count must still - // bump exactly once per distinct value so per-shard `len()` stays - // accurate. Inserting the same value twice must leave count at 1. let mut b = BloomFilterStorage::new(default_cache_size()); b.insert(&v("a")); b.insert(&v("a")); @@ -485,35 +438,27 @@ mod tests { assert!(set.contains(&v("b"))); } - // -------------------- TtlExactStorage -------------------- - // - // We exercise the storage type directly so we can advance wall-clock time - // via the `now: Instant` parameter on private helpers. `AcceptedTagValueSet` - // itself uses `Instant::now()`, which can't be mocked cheaply. + // The storage types are exercised directly so we can drive + // `Instant`-typed parameters on private helpers; `AcceptedTagValueSet` + // itself calls `Instant::now()`, which can't be mocked cheaply. 
#[test] fn ttl_exact_expires_values_past_ttl() { let ttl = Duration::from_secs(60); let mut s = TtlExactStorage::new(ttl, 4); - // t0: insert v=a manually so we control its timestamp. let t0 = Instant::now(); s.map.insert(v("a"), t0); s.last_sweep = t0; - // t0+30s: still alive. s.sweep(t0 + Duration::from_secs(30)); - assert!(s.map.contains_key(&v("a"))); - // t0+90s: expired. + assert!(s.map.contains_key(&v("a")), "still alive within ttl"); s.sweep(t0 + Duration::from_secs(90)); - assert!(!s.map.contains_key(&v("a"))); + assert!(!s.map.contains_key(&v("a")), "evicted past ttl"); } #[test] fn ttl_exact_refresh_on_contains_extends_lease() { - // Seed the map with an old timestamp, then drive a real `contains`. - // The refresh path (`*slot = now;`) must push the stored Instant - // forward — otherwise sweeps continue to use the old (potentially - // expired) timestamp. A short sleep guarantees `Instant::now()` is - // strictly after `t_insert` on every platform. + // Short sleep guarantees `Instant::now()` is strictly after `t_insert` + // on every platform. let mut s = TtlExactStorage::new(Duration::from_secs(60), 4); let t_insert = Instant::now(); s.map.insert(v("a"), t_insert); @@ -531,27 +476,19 @@ mod tests { #[test] fn ttl_exact_caps_generations_when_ttl_lt_generations() { - // Regression for Codex P2: previously `sweep_interval` was floored to - // 1s, which silently stretched the effective TTL window when - // `ttl_secs < generations`. The fix caps the effective generations - // so `sweep_interval * effective == ttl` exactly while keeping - // `sweep_interval >= 1s`. + // ttl=1s, generations=4 → effective=1, sweep_interval=1s. let s = TtlExactStorage::new(Duration::from_secs(1), 4); assert_eq!(s.sweep_interval, Duration::from_secs(1)); - // ttl=2s / generations=8 → effective=2, sweep_interval=1s (not floored - // from 250ms with the old code, which would have stretched the - // window to 8s). + // ttl=2s, generations=8 → effective=2, sweep_interval=1s. 
let s = TtlExactStorage::new(Duration::from_secs(2), 8); assert_eq!(s.sweep_interval, Duration::from_secs(1)); - // Tall TTL is unaffected. + // ttl >> generations is unaffected by the cap. let s = TtlExactStorage::new(Duration::from_secs(3600), 4); assert_eq!(s.sweep_interval, Duration::from_secs(900)); } #[test] fn ttl_exact_contains_no_refresh_does_not_extend_lease() { - // Regression for the `DropEvent` pre-check bug: a read-only check - // must NOT update the stored Instant. let ttl = Duration::from_secs(60); let mut s = TtlExactStorage::new(ttl, 4); let t0 = Instant::now(); @@ -564,24 +501,19 @@ mod tests { Some(t0), "timestamp must remain at t0 after no-refresh checks" ); - // Sanity: the refreshing variant `contains` *does* update the - // timestamp. We can't pin its exact post-call value (it depends on - // `Instant::now()`), but it must have moved forward. + // Sanity: the refreshing variant must move the timestamp forward. s.contains(&v("a")); assert!(s.map.get(&v("a")).copied().unwrap() >= t0); } #[test] fn rolling_bloom_contains_no_refresh_does_not_seed_newest_shard() { - // Same regression as above, but for the probabilistic backend: a - // no-refresh check must not insert into the newest shard. let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); s.shards.back_mut().unwrap().insert(&v("a")); - // Drive a rotation so we have a distinct newest shard. + // Drive a rotation so "a" sits in the (now older) front shard. let t0 = Instant::now(); s.next_rotate = t0 + Duration::from_secs(1); s.rotate_if_needed(t0 + Duration::from_secs(2)); - // "a" is in the (now older) front shard, not the back. let newest_before = s.shards.back().unwrap().count(); assert!(s.contains_no_refresh(&v("a"))); let newest_after = s.shards.back().unwrap().count(); @@ -589,16 +521,13 @@ mod tests { newest_before, newest_after, "contains_no_refresh must not seed the newest shard" ); - // Sanity: the refreshing variant *does* seed it. 
+ // Sanity: the refreshing variant must seed it. assert!(s.contains(&v("a"))); assert!(s.shards.back().unwrap().contains(&v("a"))); } - // -------------------- RollingBloomStorage -------------------- - #[test] fn rolling_bloom_drops_oldest_shard_on_rotate() { - // ttl=4s, generations=4 → 1s per shard. let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); let t0 = Instant::now(); s.next_rotate = t0 + Duration::from_secs(1); @@ -613,10 +542,8 @@ mod tests { #[test] fn rolling_bloom_refresh_on_contains_seeds_newest_shard() { - // Force one rotation so `hot` lives in the front shard and the back - // is fresh-empty. A real `contains` call must re-seed it into the - // newest shard — this is what gives hot values survival across - // future rotations. + // After one rotation, `hot` lives in the front shard and the back is + // fresh-empty; `contains` must re-seed it into the newest shard. let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); s.shards.back_mut().unwrap().insert(&v("hot")); let t0 = Instant::now(); @@ -638,13 +565,12 @@ mod tests { #[test] fn rolling_bloom_catch_up_capped_to_generations() { - // Long idle period: ensure rotate_if_needed doesn't spin past - // `generations` even if the elapsed time covers many windows. + // After a long idle gap, `rotate_if_needed` must rotate at most + // `generations` times even if elapsed covers many windows. let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); let t0 = Instant::now(); s.next_rotate = t0 + Duration::from_secs(1); s.shards.back_mut().unwrap().insert(&v("stale")); - // 1 hour gap: should rotate exactly `generations` times. 
s.rotate_if_needed(t0 + Duration::from_secs(3600)); assert_eq!(s.shards.len(), 4, "deque size capped at `generations`"); assert!( @@ -655,8 +581,7 @@ mod tests { #[test] fn rolling_bloom_generations_clamped_to_at_least_one() { - // generations=0 would imply div-by-zero or an empty deque; ensure - // the constructor clamps it so we always have at least one shard. + // `generations: 0` would divide by zero / leave an empty deque. let s = RollingBloomStorage::new(default_cache_size(), 0, Duration::from_secs(60)); assert_eq!(s.generations, 1); assert_eq!(s.shards.len(), 1); @@ -664,16 +589,11 @@ mod tests { #[test] fn rolling_bloom_caps_generations_when_ttl_lt_generations() { - // Regression for Codex P2: with ttl=1s and the default generations=4, - // the old code floored `slice` to 1s and silently stretched the - // effective window to ~4s. The fix caps effective generations to - // `ttl.as_secs()` so `slice * generations == ttl` is preserved. + // ttl=1s, generations=4 → effective=1, slice=1s. let s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(1)); assert_eq!(s.generations, 1, "effective generations capped to ttl"); assert_eq!(s.slice, Duration::from_secs(1)); - // ttl=2s / generations=8 → effective=2, slice=1s. Without the cap - // the old code produced slice=1s but kept generations=8, stretching - // the window to 8s. + // ttl=2s, generations=8 → effective=2, slice=1s. let s = RollingBloomStorage::new(default_cache_size(), 8, Duration::from_secs(2)); assert_eq!(s.generations, 2); assert_eq!(s.slice, Duration::from_secs(1)); @@ -682,7 +602,7 @@ mod tests { #[test] fn rolling_bloom_window_matches_ttl_exactly() { // `slice * effective_generations == ttl` must hold for every valid - // (ttl_secs, generations) so the configured TTL is honored exactly. + // (ttl_secs, generations). 
for (ttl_secs, generations) in [(1u64, 4u8), (2, 8), (3, 4), (60, 4), (3600, 4), (86400, 6)] { let s = RollingBloomStorage::new( @@ -704,12 +624,8 @@ mod tests { #[test] fn rolling_bloom_len_sums_across_shards() { - // Regression for Codex P1: `len()` must return the sum of per-shard - // counts, not the max. Otherwise distinct cold values spread across - // shards keep every individual shard below `value_limit` while the - // union exceeds it, and the cardinality limiter silently admits past - // the cap. Build the deque directly so the layout is unambiguous; - // `rotate_if_needed` is exercised separately by other tests. + // Distinct values spread across shards must contribute to `len()`; + // otherwise the union could silently exceed `value_limit`. let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); s.shards.clear(); for name in ["a", "b", "c", "d"] { @@ -717,11 +633,8 @@ mod tests { shard.insert(&v(name)); s.shards.push_back(shard); } - // Push the next rotation far enough out that the `len()` call below - // doesn't lazily rotate and disturb our hand-built layout. + // Push the next rotation far out so `len()` doesn't lazily rotate. s.next_rotate = Instant::now() + Duration::from_secs(3600); - // Each shard holds 1 distinct value. max() would return 1; the union - // is 4 — and `sum` must return 4. assert_eq!( s.len(), 4, @@ -731,25 +644,48 @@ mod tests { #[test] fn rolling_bloom_oversized_ttl_doesnt_panic() { - // Regression for Codex P2 (Instant overflow): a pathological - // `ttl_secs` close to `u64::MAX` would previously panic the - // transform via `now + slice` overflowing `Instant`'s - // platform-specific range. The `saturating_add` guard must keep - // construction and the first rotation alive. let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(u64::MAX)); - // Operate on the storage so we exercise both call sites that use - // `saturating_add` (constructor + `rotate_if_needed`). 
+ // Exercises both `saturating_add` call sites (constructor and + // `rotate_if_needed`). s.insert(&v("a")); assert!(s.contains(&v("a"))); assert_eq!(s.len(), 1); } + #[test] + fn saturating_add_overflow_pushes_deadline_far_into_future() { + // The fallback must advance `instant` by a non-trivial amount — + // returning `instant` itself would leave `next_rotate <= now` on + // every access and force `generations` rotations per call. + let now = Instant::now(); + let advanced = saturating_add(now, Duration::from_secs(u64::MAX)); + let gain = advanced.duration_since(now); + assert!( + gain >= Duration::from_secs(60 * 60 * 24 * 365), + "saturating_add must push the deadline at least a year out on \ + overflow; got {gain:?}", + ); + } + + #[test] + fn rolling_bloom_overflow_does_not_churn_on_repeated_access() { + // Repeated reads with an overflowing TTL must not silently rotate + // out values inserted between them; the rotation deadline has to + // sit far enough in the future that `rotate_if_needed` is a no-op. + let mut s = + RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(u64::MAX)); + s.insert(&v("a")); + for _ in 0..16 { + assert!(s.contains(&v("a"))); + } + assert_eq!(s.len(), 1); + } + #[test] fn rolling_bloom_len_upper_bounds_value_limit() { - // Pin the contract that motivates Fix 1: across the full window the - // sum-based `len()` must reach `value_limit` once enough distinct - // values are admitted, so `try_accept_tag` actually stops admitting. + // `len()` must reach `value_limit` once enough distinct values are + // admitted across the full window so `try_accept_tag` stops admitting. 
let value_limit = 8usize; let generations = 4u8; let mut s = diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index 4d6bad10db0d0..c835be970b7e2 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -758,18 +758,15 @@ fn max_tracked_keys_unlimited_by_default() { } } -/// Regression: with `value_limit: 0`, `limit_exceeded_action: drop_event`, and -/// `max_tracked_keys` exhausted, the documented untracked-passthrough behavior must -/// still apply. `tag_limit_exceeded` previously rejected *any* missing-bucket lookup -/// when `value_limit == 0`, causing events to be dropped before `record_tag_value` -/// could detect the allocation cap. New (metric, tag-key) pairs that cannot be -/// allocated must instead pass through unchecked. +/// With `value_limit: 0`, `limit_exceeded_action: drop_event`, and +/// `max_tracked_keys` exhausted, new (metric, tag-key) pairs that cannot be +/// allocated must pass through unchecked rather than dropping the event. #[test] fn max_tracked_keys_passthrough_with_zero_value_limit_drop_event() { - // metric_a uses a normal per-metric override with room for a value, so the first - // event reserves the only allocation slot. metric_b inherits the global config, - // which combines `value_limit: 0` + `drop_event` — the corner case that originally - // dropped events even when the pair couldn't be tracked. + // metric_a uses a per-metric override with room for a value, so the first + // event reserves the only allocation slot. metric_b inherits the global + // config — `value_limit: 0` + `drop_event` — which exercises the + // missing-bucket passthrough path under cap exhaustion. 
let config = make_transform_hashset_with_per_metric_limits( 0, LimitExceededAction::DropEvent, @@ -1591,12 +1588,8 @@ cache_size_per_key: 2048 assert_eq!(parsed.global.ttl_generations, default_ttl_generations()); } -/// Regression for Codex P2 (empty-bucket reclaim): once `max_tracked_keys` -/// is reached, the transform must reclaim buckets that have been fully -/// emptied by TTL eviction so new `(metric, tag-key)` pairs can be tracked -/// again. Without reclaim, high-churn workloads under `max_tracked_keys` -/// would silently stay in `Untracked` mode forever, even after old buckets -/// fully aged out. +/// Once `max_tracked_keys` is reached, buckets emptied by TTL eviction are +/// reclaimed so new `(metric, tag-key)` pairs can still be tracked. #[test] fn max_tracked_keys_reclaims_empty_buckets() { use super::tag_value_set::AcceptedTagValueSet; @@ -1618,8 +1611,6 @@ fn max_tracked_keys_reclaims_empty_buckets() { "should be at cap before reclaim" ); - // A new tag key should be tracked: reclaim fires, drops the empty - // `stale_tag` bucket, frees the slot, and the new pair is allocated. let value = TagValueSet::from(["v".to_string()]); let result = transform.try_accept_tag(metric_key.as_ref(), "fresh_tag", &value); assert_eq!( @@ -1642,6 +1633,39 @@ fn max_tracked_keys_reclaims_empty_buckets() { ); } +/// With `value_limit: 0` + `drop_event` + `max_tracked_keys` full but with a +/// reclaimable empty bucket, `tag_limit_exceeded` must reclaim before deciding +/// — otherwise the pre-check returns "not exceeded" (cap appears full → fall +/// through to untracked passthrough) while `record_tag_value` then reclaims +/// and admits a value that should have been rejected. 
+#[test] +fn tag_limit_exceeded_reclaims_before_value_limit_zero_check() { + use super::tag_value_set::AcceptedTagValueSet; + + let mut config = make_transform_hashset(0, LimitExceededAction::DropEvent); + config.max_tracked_keys = Some(1); + let mut transform = TagCardinalityLimit::new(config); + + let metric_key: Option = None; + let stale_set = AcceptedTagValueSet::new(&Mode::Exact, None, default_ttl_generations()); + let mut inner = hashbrown::HashMap::new(); + inner.insert("stale_tag".to_string(), stale_set); + transform.accepted_tags.insert(metric_key.clone(), inner); + transform.tracked_keys_count = 1; + + assert!( + !transform.can_allocate_new_key(), + "should be at cap before reclaim" + ); + + let value = TagValueSet::from(["v".to_string()]); + assert!( + transform.tag_limit_exceeded(metric_key.as_ref(), "fresh_tag", &value), + "value_limit: 0 with a reclaimable empty bucket must flag the new \ + tag as exceeded; otherwise record_tag_value would silently admit it" + ); +} + /// Global per-tag YAML syntax mirrors per-metric `per_tag_limits`: `mode: limit_override` /// with `value_limit`, and `mode: excluded`. #[test] From 44d9f079e4e1c66e625ae588aff5cad789ba37d0 Mon Sep 17 00:00:00 2001 From: Karol Chrapek Date: Wed, 3 Jun 2026 17:17:14 +0200 Subject: [PATCH 8/8] fix(tag_cardinality_limit transform): fix TTL window math, expired refresh, and zero-limit reclaim --- src/transforms/tag_cardinality_limit/mod.rs | 45 +++-- .../tag_cardinality_limit/tag_value_set.rs | 160 +++++++++++++++--- src/transforms/tag_cardinality_limit/tests.rs | 43 +++-- 3 files changed, 201 insertions(+), 47 deletions(-) diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 3825893fefcfc..8d637c9cd2519 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -87,19 +87,42 @@ impl TagCardinalityLimit { /// `max_tracked_keys`. 
Called lazily on cap-hit paths so steady-state /// overhead is zero. `len()` is called on every set because, for TTL /// backends, it also drives the lazy sweep that may empty the bucket. + /// + /// Intentionally empty `value_limit: 0` buckets are kept: they enforce that + /// every value for that tag is rejected without storing state. fn reclaim_empty_buckets(&mut self) { let mut reclaimed = 0usize; - self.accepted_tags.retain(|_, inner| { - inner.retain(|_, set| { - if set.len() == 0 { - reclaimed += 1; - false - } else { - true - } - }); - !inner.is_empty() - }); + let empty_buckets: Vec<(Option, String)> = self + .accepted_tags + .iter_mut() + .flat_map(|(metric_key, inner)| { + inner.iter_mut().filter_map(|(tag_key, set)| { + if set.len() != 0 { + return None; + } + Some((metric_key.clone(), tag_key.clone())) + }) + }) + .collect(); + + let keys_to_drop: Vec<_> = empty_buckets + .into_iter() + .filter(|(metric_key, tag_key)| { + !matches!( + self.get_config_for_metric_tag(metric_key.as_ref(), tag_key), + TagSettings::Tracked(cfg) if cfg.value_limit == 0 + ) + }) + .collect(); + + for (metric_key, tag_key) in keys_to_drop { + if let Some(inner) = self.accepted_tags.get_mut(&metric_key) + && inner.remove(&tag_key).is_some() + { + reclaimed += 1; + } + } + self.accepted_tags.retain(|_, inner| !inner.is_empty()); if reclaimed > 0 { self.tracked_keys_count = self.tracked_keys_count.saturating_sub(reclaimed); emit!(TagCardinalityTrackedKeys { diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index fda797692695d..4a244b8130705 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -45,6 +45,23 @@ fn saturating_add(instant: Instant, duration: Duration) -> Instant { instant } +/// Compute time-slice count and duration so `slice * generations == ttl` exactly. 
+/// +/// Caps `generations` to at most one slice per second of TTL (so each slice is +/// `>= 1s` when `ttl >= 1s`) and to the caller's `ttl_generations` request. +/// Uses `Duration` division for `slice` so non-divisible TTLs (e.g. 10s / 4) +/// still sum to the configured window. `ttl.as_secs()` is kept in `u64` when +/// deriving the per-second cap so values `>= 2^32` seconds are not truncated +/// to zero by a premature `u32` cast. +fn compute_ttl_slices(ttl: Duration, ttl_generations: u8) -> (u32, Duration) { + let requested = u32::from(ttl_generations.max(1)); + let ttl_secs = ttl.as_secs().max(1); + let max_for_ttl = ttl_secs.min(u32::MAX as u64) as u32; + let generations = requested.min(max_for_ttl).max(1); + let slice = ttl / generations; + (generations, slice) +} + /// Container for storing the set of accepted values for a given tag key. #[derive(Debug)] pub struct AcceptedTagValueSet { @@ -97,7 +114,7 @@ impl BloomFilterStorage { /// `HashMap`-backed exact cache with per-value last-seen timestamps. /// -/// At most once per `sweep_interval` (= `ttl / effective_generations`), +/// At most once per `sweep_interval` (= `ttl / generations`, after capping), /// `retain` drops every entry whose `last_seen` is older than `ttl`. The /// sweep runs lazily inside `insert`/`contains`/`len` — no background task. struct TtlExactStorage { @@ -109,13 +126,10 @@ struct TtlExactStorage { impl TtlExactStorage { fn new(ttl: Duration, generations: u8) -> Self { - // Cap effective generations so `sweep_interval >= 1s` and - // `sweep_interval * effective == ttl`. Eviction precision is then + // Cap generations so `sweep_interval >= 1s` and + // `sweep_interval * generations == ttl`. Eviction precision is then // `[ttl, ttl + sweep_interval)`. 
- let requested = generations.max(1) as u32; - let max_for_ttl = ttl.as_secs().max(1) as u32; - let effective = requested.min(max_for_ttl).max(1); - let sweep_interval = ttl / effective; + let (_generations, sweep_interval) = compute_ttl_slices(ttl, generations); Self { map: HashMap::new(), ttl, @@ -131,6 +145,14 @@ impl TtlExactStorage { self.sweep(now); } + /// Drop entries whose own `last_seen` is outside the TTL window. Runs even + /// when `maybe_sweep` is not yet due so a lazy sweep interval cannot leave + /// expired values visible to `contains` / `len`. + fn purge_expired_entries(&mut self, now: Instant) { + self.map + .retain(|_, last_seen| now.duration_since(*last_seen) <= self.ttl); + } + fn sweep(&mut self, now: Instant) { let ttl = self.ttl; let before = self.map.len(); @@ -142,8 +164,12 @@ impl TtlExactStorage { } fn contains(&mut self, value: &TagValueSet) -> bool { - let now = Instant::now(); + self.contains_with_now(value, Instant::now()) + } + + fn contains_with_now(&mut self, value: &TagValueSet, now: Instant) -> bool { self.maybe_sweep(now); + self.purge_expired_entries(now); // Refresh lease on every sighting so continuously-seen values don't blink out. if let Some(slot) = self.map.get_mut(value) { *slot = now; @@ -158,20 +184,26 @@ impl TtlExactStorage { /// `DropEvent` pre-check paths where we must not mutate cache state for /// events that are about to be dropped. 
fn contains_no_refresh(&mut self, value: &TagValueSet) -> bool { - let now = Instant::now(); + self.contains_no_refresh_with_now(value, Instant::now()) + } + + fn contains_no_refresh_with_now(&mut self, value: &TagValueSet, now: Instant) -> bool { self.maybe_sweep(now); + self.purge_expired_entries(now); self.map.contains_key(value) } fn insert(&mut self, value: TagValueSet) { let now = Instant::now(); self.maybe_sweep(now); + self.purge_expired_entries(now); self.map.insert(value, now); } fn len(&mut self) -> usize { let now = Instant::now(); self.maybe_sweep(now); + self.purge_expired_entries(now); self.map.len() } } @@ -194,18 +226,14 @@ struct RollingBloomStorage { impl RollingBloomStorage { fn new(cache_size_per_key: usize, generations: u8, ttl: Duration) -> Self { - // Cap effective generations so `slice >= 1s` and - // `slice * effective_generations == ttl`. - let requested = generations.max(1) as u32; - let max_for_ttl = ttl.as_secs().max(1) as u32; - let effective = requested.min(max_for_ttl).max(1); - let slice = ttl / effective; - let mut shards = VecDeque::with_capacity(effective as usize); + // Cap generations so `slice >= 1s` and `slice * generations == ttl`. + let (generations, slice) = compute_ttl_slices(ttl, generations); + let mut shards = VecDeque::with_capacity(generations as usize); shards.push_back(BloomFilterStorage::new(cache_size_per_key)); let now = Instant::now(); Self { shards, - generations: effective as u8, + generations: generations as u8, slice, cache_size_per_key, next_rotate: saturating_add(now, slice), @@ -455,6 +483,29 @@ mod tests { assert!(!s.map.contains_key(&v("a")), "evicted past ttl"); } + #[test] + fn ttl_exact_contains_does_not_refresh_expired_entry() { + // Between sweeps, `contains` must not extend a value whose own + // `last_seen + ttl` has passed, even if `maybe_sweep` has not run yet. 
+ let ttl = Duration::from_secs(60); + let mut s = TtlExactStorage::new(ttl, 4); + let t0 = Instant::now(); + s.map.insert(v("a"), t0); + s.last_sweep = t0; + let t50 = t0 + Duration::from_secs(50); + s.sweep(t50); + assert!(s.map.contains_key(&v("a")), "still within ttl at t=50"); + let t61 = t0 + Duration::from_secs(61); + assert!( + !s.contains_with_now(&v("a"), t61), + "expired value must not be refreshed into the window" + ); + assert!( + !s.map.contains_key(&v("a")), + "expired entry must be purged on access" + ); + } + #[test] fn ttl_exact_refresh_on_contains_extends_lease() { // Short sleep guarantees `Instant::now()` is strictly after `t_insert` @@ -601,10 +652,18 @@ mod tests { #[test] fn rolling_bloom_window_matches_ttl_exactly() { - // `slice * effective_generations == ttl` must hold for every valid - // (ttl_secs, generations). - for (ttl_secs, generations) in [(1u64, 4u8), (2, 8), (3, 4), (60, 4), (3600, 4), (86400, 6)] - { + // `slice * generations == ttl` must hold for every valid + // (ttl_secs, ttl_generations), including non-divisible TTLs (e.g. 10 / 4). + for (ttl_secs, generations) in [ + (1u64, 4u8), + (2, 8), + (3, 4), + (10, 4), + (60, 4), + (3600, 4), + (86400, 6), + (4294967296, 4), + ] { let s = RollingBloomStorage::new( default_cache_size(), generations, @@ -622,6 +681,65 @@ mod tests { } } + #[test] + fn compute_ttl_slices_non_divisible_ttl_uses_fractional_slices() { + // Integer seconds division would yield 10/4=2s slices and an ~8s window; + // Duration division must preserve the full 10s contract. + let (_, slice) = compute_ttl_slices(Duration::from_secs(10), 4); + assert_eq!(slice, Duration::from_millis(2500)); + assert_eq!(slice * 4, Duration::from_secs(10)); + } + + #[test] + fn compute_ttl_slices_large_ttl_secs_avoids_u32_truncation() { + // `ttl.as_secs() as u32` wraps at 2^32, collapsing the generation cap + // to 1 and changing rotation cadence. 
+ let ttl = Duration::from_secs(4294967296); + let (generations, slice) = compute_ttl_slices(ttl, 4); + assert_eq!(generations, 4, "requested generations must not collapse to 1"); + assert_eq!(slice * generations, ttl); + + let rolling = RollingBloomStorage::new(default_cache_size(), 4, ttl); + assert_eq!(rolling.generations, 4); + assert_eq!(rolling.slice * u32::from(rolling.generations), ttl); + + let exact = TtlExactStorage::new(ttl, 4); + assert_eq!(exact.sweep_interval * 4, ttl); + } + + #[test] + fn rolling_bloom_hot_value_survives_rotation_after_contains_refresh() { + // If refresh skipped insert when the newest shard already matched via + // false positive, the value could vanish once the older shard rotated + // out. Pollute the newest shard heavily so FP hits are likely, then + // verify a front-shard-only value survives rotation after `contains()` + // refresh. + const TINY: usize = 128; + let mut s = RollingBloomStorage::new(TINY, 2, Duration::from_secs(120)); + let hot = v("hot-value-survivor"); + s.shards.back_mut().unwrap().insert(&hot); + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.rotate_if_needed(t0 + Duration::from_secs(2)); + assert!( + s.shards.front().unwrap().contains(&hot), + "after one rotation hot must live in the front shard" + ); + for i in 0..512 { + s.shards.back_mut().unwrap().insert(&v(&format!("pollute-{i}"))); + } + assert!(s.contains(&hot), "must hit the front shard and refresh"); + assert!( + s.shards.back().unwrap().contains(&hot), + "refresh must unconditionally seed the newest shard" + ); + s.rotate_if_needed(t0 + Duration::from_secs(62)); + assert!( + s.contains_no_refresh(&hot), + "hot must survive after the front shard carrying the original copy rotates out" + ); + } + #[test] fn rolling_bloom_len_sums_across_shards() { // Distinct values spread across shards must contribute to `len()`; diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs 
index c835be970b7e2..73785d5840e4d 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1633,13 +1633,11 @@ fn max_tracked_keys_reclaims_empty_buckets() { ); } -/// With `value_limit: 0` + `drop_event` + `max_tracked_keys` full but with a -/// reclaimable empty bucket, `tag_limit_exceeded` must reclaim before deciding -/// — otherwise the pre-check returns "not exceeded" (cap appears full → fall -/// through to untracked passthrough) while `record_tag_value` then reclaims -/// and admits a value that should have been rejected. +/// With `value_limit: 0` + `max_tracked_keys` at cap, an intentionally empty +/// enforcement bucket must not be reclaimed, so additional tag keys pass through +/// unchecked rather than stealing the slot. #[test] -fn tag_limit_exceeded_reclaims_before_value_limit_zero_check() { +fn max_tracked_keys_preserves_zero_limit_enforcement_bucket() { use super::tag_value_set::AcceptedTagValueSet; let mut config = make_transform_hashset(0, LimitExceededAction::DropEvent); @@ -1647,22 +1645,37 @@ fn tag_limit_exceeded_reclaims_before_value_limit_zero_check() { let mut transform = TagCardinalityLimit::new(config); let metric_key: Option = None; - let stale_set = AcceptedTagValueSet::new(&Mode::Exact, None, default_ttl_generations()); + let zero_limit_bucket = AcceptedTagValueSet::new(&Mode::Exact, None, default_ttl_generations()); let mut inner = hashbrown::HashMap::new(); - inner.insert("stale_tag".to_string(), stale_set); + inner.insert("enforced_tag".to_string(), zero_limit_bucket); transform.accepted_tags.insert(metric_key.clone(), inner); transform.tracked_keys_count = 1; + let value = TagValueSet::from(["v".to_string()]); assert!( - !transform.can_allocate_new_key(), - "should be at cap before reclaim" + !transform.tag_limit_exceeded(metric_key.as_ref(), "fresh_tag", &value), + "cap-full zero-limit bucket must not be reclaimed; new key is untracked" + ); + let result = 
transform.try_accept_tag(metric_key.as_ref(), "fresh_tag", &value); + assert_eq!( + result, + AcceptResult::Untracked, + "fresh tag must pass through unchecked when the zero-limit slot is preserved" + ); + assert!( + transform + .accepted_tags + .get(&metric_key) + .unwrap() + .contains_key("enforced_tag"), + "zero-limit enforcement bucket must remain allocated" ); - - let value = TagValueSet::from(["v".to_string()]); assert!( - transform.tag_limit_exceeded(metric_key.as_ref(), "fresh_tag", &value), - "value_limit: 0 with a reclaimable empty bucket must flag the new \ - tag as exceeded; otherwise record_tag_value would silently admit it" + !transform + .accepted_tags + .get(&metric_key) + .unwrap() + .contains_key("fresh_tag"), ); }