diff --git a/changelog.d/tag_cardinality_limit_ttl.enhancement.md b/changelog.d/tag_cardinality_limit_ttl.enhancement.md new file mode 100644 index 0000000000000..7b4e6aa5a5d58 --- /dev/null +++ b/changelog.d/tag_cardinality_limit_ttl.enhancement.md @@ -0,0 +1,9 @@ +The `tag_cardinality_limit` transform gained an optional sliding-window TTL +for tracked tag values. Two new settings — `ttl_secs` and `ttl_generations` — +are available on the global block and each `per_metric_limits` entry; values +not observed within `ttl_secs` are evicted, freeing room under `value_limit`. +A new internal counter `tag_cardinality_ttl_expirations_total` reports the +number of values evicted by TTL. See the transform documentation for +configuration details and mode-specific behavior. + +authors: kaarolch diff --git a/lib/vector-common/src/internal_event/metric_name.rs b/lib/vector-common/src/internal_event/metric_name.rs index f520a3ecee316..52fb1c597d804 100644 --- a/lib/vector-common/src/internal_event/metric_name.rs +++ b/lib/vector-common/src/internal_event/metric_name.rs @@ -76,6 +76,7 @@ pub enum CounterName { StaleEventsFlushedTotal, StartedTotal, StoppedTotal, + TagCardinalityTtlExpirationsTotal, TagCardinalityUntrackedEventsTotal, TagValueLimitExceededTotal, ValueLimitReachedTotal, @@ -335,6 +336,7 @@ impl CounterName { Self::StaleEventsFlushedTotal => "stale_events_flushed_total", Self::StartedTotal => "started_total", Self::StoppedTotal => "stopped_total", + Self::TagCardinalityTtlExpirationsTotal => "tag_cardinality_ttl_expirations_total", Self::TagCardinalityUntrackedEventsTotal => "tag_cardinality_untracked_events_total", Self::TagValueLimitExceededTotal => "tag_value_limit_exceeded_total", Self::ValueLimitReachedTotal => "value_limit_reached_total", diff --git a/src/internal_events/tag_cardinality_limit.rs b/src/internal_events/tag_cardinality_limit.rs index 8b9cd25c6d64c..77b86ebe86a42 100644 --- a/src/internal_events/tag_cardinality_limit.rs +++ b/src/internal_events/tag_cardinality_limit.rs @@ -103,3 +103,27 @@ impl InternalEvent for TagCardinalityTrackedKeys { gauge!(GaugeName::TagCardinalityTrackedKeys).set(self.count as f64); } } + +/// Emitted when a TTL sweep removes tag values from a tracking bucket. +/// +/// The `count` is the number of *distinct* values evicted in that pass. +/// For the probabilistic backend this is the count drained from the oldest +/// rolling-bloom shard; for the exact backend it is the number of entries +/// whose last sighting was older than `ttl_secs`. +#[derive(NamedInternalEvent)] +pub struct TagCardinalityTtlExpired { + pub count: u64, +} + +impl InternalEvent for TagCardinalityTtlExpired { + fn emit(self) { + if self.count == 0 { + return; + } + debug!( + message = "Expired tag values from cardinality cache.", + count = self.count, + ); + counter!(CounterName::TagCardinalityTtlExpirationsTotal).increment(self.count); + } +} diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index a1471746972fa..4d83441b7218e 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -98,6 +98,35 @@ pub struct Inner { #[configurable(derived)] #[serde(default)] pub internal_metrics: InternalMetricsConfig, + + /// Expire tracked tag values after this many seconds since they were last seen. + /// + /// When unset (default) or set to `0`, values persist for the lifetime of the + /// process — the historical behavior. When set to a positive value, the + /// transform behaves like a sliding window: any tag value not observed within + /// the TTL is dropped, freeing room under `value_limit` for fresh values. + /// Useful for bounding cost on backends (e.g. Datadog custom metrics) that + /// bill on a rolling unique-series window. + /// + /// In `exact` mode every value carries a precise last-seen timestamp; in + /// `probabilistic` mode the underlying bloom filter is split into + /// `ttl_generations` rolling shards, so eviction is approximate to within + /// `ttl_secs / ttl_generations`. + #[serde(default)] + #[configurable(metadata(docs::human_name = "TTL (seconds)"))] + pub ttl_secs: Option, + + /// Number of time-slices the TTL window is split into for the + /// `probabilistic` backend. + /// + /// Higher values smooth eviction (closer to a true sliding window) at the + /// cost of `ttl_generations * cache_size_per_key` memory per (metric, + /// tag-key) pair. `1` produces a tumbling window: all tracked values are + /// dropped at once every `ttl_secs`. Ignored when `ttl_secs` is unset, or + /// when mode is `exact` (which uses precise per-value timestamps). + #[serde(default = "default_ttl_generations")] + #[configurable(metadata(docs::human_name = "TTL Generations"))] + pub ttl_generations: u8, } /// Controls the approach taken for tracking tag cardinality at the global level. @@ -169,6 +198,23 @@ pub struct OverrideInner { #[configurable(derived)] #[serde(default)] pub internal_metrics: InternalMetricsConfig, + + /// Per-metric TTL for tracked tag values. See [`Inner::ttl_secs`] for the + /// full description. + /// + /// Per-metric TTL is a **full override** of the global TTL — it does not + /// inherit. Leaving this unset means "no TTL for this metric", *not* + /// "fall back to the global `ttl_secs`". This mirrors how a per-metric + /// `value_limit` fully shadows the global one. If you want a metric to + /// share the global TTL, copy the value explicitly. + #[serde(default)] + #[configurable(metadata(docs::human_name = "TTL (seconds)"))] + pub ttl_secs: Option, + + /// Per-metric override for `ttl_generations`. See [`Inner::ttl_generations`]. + #[serde(default = "default_ttl_generations")] + #[configurable(metadata(docs::human_name = "TTL Generations"))] + pub ttl_generations: u8, } /// Controls the approach taken for tracking tag cardinality at the per-metric level. @@ -308,6 +354,13 @@ pub(crate) const fn default_cache_size() -> usize { 5 * 1024 // 5KB } +/// Default number of rolling-bloom shards. Four gives a reasonable middle ground: +/// eviction granularity of `ttl/4`, and a 4x memory multiplier on +/// `cache_size_per_key` for users who opt into TTL. +pub(crate) const fn default_ttl_generations() -> u8 { + 4 +} + // ============================================================================= // Transform plumbing // ============================================================================= @@ -320,6 +373,8 @@ impl GenerateConfig for Config { value_limit: default_value_limit(), limit_exceeded_action: default_limit_exceeded_action(), internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), }, tracking_scope: TrackingScope::default(), max_tracked_keys: None, diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 755a23c8d1006..3825893fefcfc 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -82,6 +82,32 @@ impl TagCardinalityLimit { }); } + /// Drop empty `AcceptedTagValueSet` buckets left behind by TTL eviction, + /// decrementing `tracked_keys_count` so freed slots can be reused under + /// `max_tracked_keys`. Called lazily on cap-hit paths so steady-state + /// overhead is zero. `len()` is called on every set because, for TTL + /// backends, it also drives the lazy sweep that may empty the bucket. + fn reclaim_empty_buckets(&mut self) { + let mut reclaimed = 0usize; + self.accepted_tags.retain(|_, inner| { + inner.retain(|_, set| { + if set.len() == 0 { + reclaimed += 1; + false + } else { + true + } + }); + !inner.is_empty() + }); + if reclaimed > 0 { + self.tracked_keys_count = self.tracked_keys_count.saturating_sub(reclaimed); + emit!(TagCardinalityTrackedKeys { + count: self.tracked_keys_count, + }); + } + } + /// Resolve the configuration that applies to a specific (metric, tag) pair. /// /// Per-tag entries support two modes: @@ -120,6 +146,11 @@ impl TagCardinalityLimit { let limit_exceeded_action = per_metric.config.limit_exceeded_action; let metric_value_limit = per_metric.config.value_limit; let internal_metrics = per_metric.config.internal_metrics; + // Per-metric TTL is a *full override* of the global TTL for this metric: + // unset (`None`) means "no TTL for this metric" rather than "inherit from + // global", mirroring how per-metric `value_limit` shadows the global value. + let ttl_secs = per_metric.config.ttl_secs; + let ttl_generations = per_metric.config.ttl_generations; // Per-tag entry: LimitOverride uses an explicit value_limit; Excluded opts // the tag out. All other settings are always inherited from per-metric. @@ -134,6 +165,8 @@ impl TagCardinalityLimit { limit_exceeded_action, mode: metric_mode, internal_metrics, + ttl_secs, + ttl_generations, }); } } @@ -143,6 +176,8 @@ impl TagCardinalityLimit { limit_exceeded_action, mode: metric_mode, internal_metrics, + ttl_secs, + ttl_generations, }) } @@ -205,18 +240,21 @@ impl TagCardinalityLimit { if !pair_exists { if !self.can_allocate_new_key() { - return AcceptResult::Untracked; + // TTL may have emptied buckets that still count against `max_tracked_keys`. + self.reclaim_empty_buckets(); + if !self.can_allocate_new_key() { + return AcceptResult::Untracked; + } } self.record_new_key_allocation(); } let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default(); - let tag_value_set = metric_accepted_tags - .entry_ref(key) - .or_insert_with(|| AcceptedTagValueSet::new(&config.mode)); + let tag_value_set = metric_accepted_tags.entry_ref(key).or_insert_with(|| { + AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations) + }); if tag_value_set.contains(value) { - // Tag value has already been accepted, nothing more to do. return AcceptResult::Tracked; } @@ -238,8 +276,12 @@ impl TagCardinalityLimit { /// Checks if recording a key and value corresponding to a tag on an incoming Metric would /// exceed the cardinality limit. + /// + /// Note: takes `&mut self` because TTL-enabled backends (`TtlSet`, + /// `RollingBloom`) perform lazy sweep/rotation inside `contains`/`len`. + /// The non-TTL backends are still effectively read-only here. fn tag_limit_exceeded( - &self, + &mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet, @@ -248,22 +290,34 @@ impl TagCardinalityLimit { TagSettings::Excluded => return false, TagSettings::Tracked(inner) => inner, }; - match self + + if let Some(value_set) = self .accepted_tags - .get(&metric_key.cloned()) - .and_then(|metric_accepted_tags| metric_accepted_tags.get(key)) + .get_mut(&metric_key.cloned()) + .and_then(|metric_accepted_tags| metric_accepted_tags.get_mut(key)) { - // Already accepted — never exceeds. - Some(value_set) if value_set.contains(value) => false, - // Adding this value would push us at or past the configured cap. Treat a - // missing bucket as an empty set so `value_limit: 0` correctly rejects - // the first occurrence too — but only when the (metric, tag) pair would - // actually be tracked. If `max_tracked_keys` is exhausted, `record_tag_value` - // will pass the tag through unchecked and emit `TagCardinalityLimitUntracked`, - // so we must not pre-empt that path by reporting the limit as exceeded here. - Some(value_set) => value_set.len() >= resolved.value_limit, - None => resolved.value_limit == 0 && self.can_allocate_new_key(), + // Non-refreshing: `DropEvent` may still reject this event on + // a later tag; refresh happens on the accept path via + // `record_tag_value`. + return if value_set.contains_no_refresh(value) { + false + } else { + value_set.len() >= resolved.value_limit + }; } + + // Missing bucket: only `value_limit == 0` can flag the first + // sighting as exceeded; every other limit fits an empty set. + if resolved.value_limit != 0 { + return false; + } + // Mirror `record_tag_value`'s capacity view: reclaim empty TTL + // buckets first so we don't pass-through-untracked here and let + // the record path silently admit a value that should be rejected. + if !self.can_allocate_new_key() { + self.reclaim_empty_buckets(); + } + self.can_allocate_new_key() } /// Record an accepted tag value (mutation-only, no limit check). Used by the `DropEvent` @@ -292,7 +346,11 @@ impl TagCardinalityLimit { if !pair_exists { if !self.can_allocate_new_key() { - return true; + // See `try_accept_tag` for rationale. + self.reclaim_empty_buckets(); + if !self.can_allocate_new_key() { + return true; + } } self.record_new_key_allocation(); } @@ -300,7 +358,9 @@ impl TagCardinalityLimit { let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default(); metric_accepted_tags .entry_ref(key) - .or_insert_with(|| AcceptedTagValueSet::new(&config.mode)) + .or_insert_with(|| { + AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations) + }) .insert(value.clone()); false } diff --git a/src/transforms/tag_cardinality_limit/tag_value_set.rs b/src/transforms/tag_cardinality_limit/tag_value_set.rs index 0d9bbc216db25..fda797692695d 100644 --- a/src/transforms/tag_cardinality_limit/tag_value_set.rs +++ b/src/transforms/tag_cardinality_limit/tag_value_set.rs @@ -1,8 +1,49 @@ -use std::{collections::HashSet, fmt}; +//! Storage backends for accepted tag values. +//! +//! Four variants, picked at construction time from `(Mode, ttl_secs)`: +//! +//! - `Set` — `HashSet`, no TTL. Original exact-mode behavior. +//! - `Bloom` — single `BloomFilter`, no TTL. Original probabilistic-mode behavior. +//! - `TtlSet` — `HashMap` with periodic sweep. Exact mode + TTL. +//! - `RollingBloom` — `VecDeque` of `ttl_generations` shards, lazily +//! rotated. Probabilistic mode + TTL. +//! +//! Both TTL variants use "refresh on sighting" semantics: every `contains()` hit +//! extends the value's lease, so continuously-observed values stay in the cache +//! across rotation boundaries. Eviction is lazy — driven by `insert()` and +//! `contains()` calls — so there's no background task. + +use std::{ + collections::{HashMap, HashSet, VecDeque}, + fmt, + time::{Duration, Instant}, +}; use bloomy::BloomFilter; -use crate::{event::metric::TagValueSet, transforms::tag_cardinality_limit::config::Mode}; +use crate::{ + event::metric::TagValueSet, internal_events::TagCardinalityTtlExpired, + transforms::tag_cardinality_limit::config::Mode, +}; + +/// `Instant + Duration` panics outside the platform's representable range. +/// On overflow, push the deadline `~136 years` into the future so rotation +/// schedules degrade to a stable no-op rather than panicking or churning +/// (returning `instant` here would leave `next_rotate <= now` on every call +/// and force `generations` rotations per access). +fn saturating_add(instant: Instant, duration: Duration) -> Instant { + if let Some(result) = instant.checked_add(duration) { + return result; + } + let mut fallback = Duration::from_secs(u32::MAX as u64); + while !fallback.is_zero() { + if let Some(result) = instant.checked_add(fallback) { + return result; + } + fallback = Duration::from_secs(fallback.as_secs() / 2); + } + instant +} /// Container for storing the set of accepted values for a given tag key. #[derive(Debug)] @@ -13,6 +54,8 @@ pub struct AcceptedTagValueSet { enum TagValueSetStorage { Set(HashSet), Bloom(BloomFilterStorage), + TtlSet(TtlExactStorage), + RollingBloom(RollingBloomStorage), } /// A bloom filter that tracks the number of items inserted into it. @@ -33,9 +76,12 @@ impl BloomFilterStorage { } fn insert(&mut self, value: &TagValueSet) { - // Only update the count if the value is not already in the bloom filter. - if !self.inner.contains(value) { - self.inner.insert(value); + // Write bits unconditionally so the rolling-bloom refresh path can + // not leave a value riding on another value's false-positive bits. + // Count tracks distinct first sightings only. + let was_already_present = self.inner.contains(value); + self.inner.insert(value); + if !was_already_present { self.count += 1; } } @@ -49,37 +95,275 @@ impl BloomFilterStorage { } } +/// `HashMap`-backed exact cache with per-value last-seen timestamps. +/// +/// At most once per `sweep_interval` (= `ttl / effective_generations`), +/// `retain` drops every entry whose `last_seen` is older than `ttl`. The +/// sweep runs lazily inside `insert`/`contains`/`len` — no background task. +struct TtlExactStorage { + map: HashMap, + ttl: Duration, + sweep_interval: Duration, + last_sweep: Instant, +} + +impl TtlExactStorage { + fn new(ttl: Duration, generations: u8) -> Self { + // Cap effective generations so `sweep_interval >= 1s` and + // `sweep_interval * effective == ttl`. Eviction precision is then + // `[ttl, ttl + sweep_interval)`. + let requested = generations.max(1) as u32; + let max_for_ttl = ttl.as_secs().max(1) as u32; + let effective = requested.min(max_for_ttl).max(1); + let sweep_interval = ttl / effective; + Self { + map: HashMap::new(), + ttl, + sweep_interval, + last_sweep: Instant::now(), + } + } + + fn maybe_sweep(&mut self, now: Instant) { + if now.duration_since(self.last_sweep) < self.sweep_interval { + return; + } + self.sweep(now); + } + + fn sweep(&mut self, now: Instant) { + let ttl = self.ttl; + let before = self.map.len(); + self.map + .retain(|_, last_seen| now.duration_since(*last_seen) <= ttl); + let expired = before.saturating_sub(self.map.len()) as u64; + self.last_sweep = now; + emit!(TagCardinalityTtlExpired { count: expired }); + } + + fn contains(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.maybe_sweep(now); + // Refresh lease on every sighting so continuously-seen values don't blink out. + if let Some(slot) = self.map.get_mut(value) { + *slot = now; + true + } else { + false + } + } + + /// Read-only membership check: triggers lazy sweep so the answer reflects + /// post-expiry state, but does **not** refresh the value's lease. Used in + /// `DropEvent` pre-check paths where we must not mutate cache state for + /// events that are about to be dropped. + fn contains_no_refresh(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.maybe_sweep(now); + self.map.contains_key(value) + } + + fn insert(&mut self, value: TagValueSet) { + let now = Instant::now(); + self.maybe_sweep(now); + self.map.insert(value, now); + } + + fn len(&mut self) -> usize { + let now = Instant::now(); + self.maybe_sweep(now); + self.map.len() + } +} + +/// Sliding-window bloom: `generations` shards, each a full `cache_size_per_key` +/// bloom filter. Front of the deque is the oldest shard; back is the current. +/// On rotation, the front shard is dropped and a fresh empty one is pushed at the +/// back. Membership is the OR across shards; refresh-on-sighting writes hits +/// into the current shard so hot values survive future rotations. +struct RollingBloomStorage { + shards: VecDeque, + generations: u8, + slice: Duration, + cache_size_per_key: usize, + /// Boundary at which the next rotation is due. Advances by `slice` on + /// every rotation; storing the next tick (not the last) keeps the + /// catch-up loop in `rotate_if_needed` trivial. + next_rotate: Instant, +} + +impl RollingBloomStorage { + fn new(cache_size_per_key: usize, generations: u8, ttl: Duration) -> Self { + // Cap effective generations so `slice >= 1s` and + // `slice * effective_generations == ttl`. + let requested = generations.max(1) as u32; + let max_for_ttl = ttl.as_secs().max(1) as u32; + let effective = requested.min(max_for_ttl).max(1); + let slice = ttl / effective; + let mut shards = VecDeque::with_capacity(effective as usize); + shards.push_back(BloomFilterStorage::new(cache_size_per_key)); + let now = Instant::now(); + Self { + shards, + generations: effective as u8, + slice, + cache_size_per_key, + next_rotate: saturating_add(now, slice), + } + } + + fn rotate_if_needed(&mut self, now: Instant) { + // Catch up if we've been idle for multiple slices. Capped to `generations` + // pops because every shard would have rotated out anyway. + let mut rotations = 0u8; + while now >= self.next_rotate && rotations < self.generations { + if self.shards.len() >= self.generations as usize + && let Some(dropped) = self.shards.pop_front() + { + emit!(TagCardinalityTtlExpired { + count: dropped.count() as u64, + }); + } + self.shards + .push_back(BloomFilterStorage::new(self.cache_size_per_key)); + self.next_rotate = saturating_add(self.next_rotate, self.slice); + rotations += 1; + } + // If we needed more rotations than `generations`, the whole window is + // stale — fast-forward `next_rotate` to avoid a tight catch-up the next + // call after a long idle period. + if now >= self.next_rotate { + self.next_rotate = saturating_add(now, self.slice); + } + } + + fn contains(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.rotate_if_needed(now); + // Newest -> oldest short-circuits hot values; re-seed hits into the + // newest shard so they survive the next rotation. + let found = self.shards.iter().rev().any(|s| s.contains(value)); + if found && let Some(newest) = self.shards.back_mut() { + newest.insert(value); + } + found + } + + /// Read-only membership check: triggers lazy rotation but does **not** + /// refresh the value's presence in the current shard. See + /// `TtlExactStorage::contains_no_refresh` for the rationale. + fn contains_no_refresh(&mut self, value: &TagValueSet) -> bool { + let now = Instant::now(); + self.rotate_if_needed(now); + self.shards.iter().rev().any(|s| s.contains(value)) + } + + fn insert(&mut self, value: &TagValueSet) { + let now = Instant::now(); + self.rotate_if_needed(now); + if let Some(newest) = self.shards.back_mut() { + newest.insert(value); + } + } + + /// Strict upper bound on the number of distinct values currently retained. + /// + /// Bloom shards are not enumerable, so the true union cardinality is not + /// directly computable. Summing per-shard counts never under-counts; the + /// alternative — `max` — could let distinct values spread across shards + /// silently exceed `value_limit`. See the `ttl` section of the transform + /// documentation for the over-rejection trade-off. + fn len(&mut self) -> usize { + let now = Instant::now(); + self.rotate_if_needed(now); + self.shards.iter().map(|s| s.count()).sum() + } +} + impl fmt::Debug for TagValueSetStorage { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { TagValueSetStorage::Set(set) => write!(f, "Set({set:?})"), TagValueSetStorage::Bloom(_) => write!(f, "Bloom"), + TagValueSetStorage::TtlSet(s) => { + write!(f, "TtlSet(len={}, ttl={:?})", s.map.len(), s.ttl) + } + TagValueSetStorage::RollingBloom(s) => { + write!( + f, + "RollingBloom(generations={}, slice={:?})", + s.generations, s.slice + ) + } } } } impl AcceptedTagValueSet { - pub fn new(mode: &Mode) -> Self { - let storage = match &mode { - Mode::Exact => TagValueSetStorage::Set(HashSet::new()), - Mode::Probabilistic(config) => { + /// Construct the appropriate backend from `(mode, ttl_secs, ttl_generations)`. + /// + /// When `ttl_secs` is `None` or `0`, this is identical to the pre-TTL + /// behavior — `HashSet` for exact, single `BloomFilter` for probabilistic — + /// so existing configs see zero behavioral change. + pub fn new(mode: &Mode, ttl_secs: Option, ttl_generations: u8) -> Self { + let ttl = ttl_secs.and_then(|s| (s > 0).then(|| Duration::from_secs(s))); + + let storage = match (mode, ttl) { + (Mode::Exact, None) => TagValueSetStorage::Set(HashSet::new()), + (Mode::Exact, Some(ttl)) => { + TagValueSetStorage::TtlSet(TtlExactStorage::new(ttl, ttl_generations)) + } + (Mode::Probabilistic(config), None) => { TagValueSetStorage::Bloom(BloomFilterStorage::new(config.cache_size_per_key)) } + (Mode::Probabilistic(config), Some(ttl)) => TagValueSetStorage::RollingBloom( + RollingBloomStorage::new(config.cache_size_per_key, ttl_generations, ttl), + ), }; Self { storage } } - pub fn contains(&self, value: &TagValueSet) -> bool { - match &self.storage { + /// Returns true if `value` is currently retained. + /// + /// In TTL-enabled backends this is a mutating operation: it triggers lazy + /// sweep/rotation **and refreshes the value's lease on a hit**. Use this + /// on the accept path (`try_accept_tag`, where a hit means we keep the + /// value). For read-only checks where the event might still be rejected, + /// use [`Self::contains_no_refresh`]. + pub fn contains(&mut self, value: &TagValueSet) -> bool { + match &mut self.storage { + TagValueSetStorage::Set(set) => set.contains(value), + TagValueSetStorage::Bloom(bloom) => bloom.contains(value), + TagValueSetStorage::TtlSet(s) => s.contains(value), + TagValueSetStorage::RollingBloom(s) => s.contains(value), + } + } + + /// Like [`Self::contains`] but never refreshes the value's TTL lease. + /// + /// The `DropEvent` pre-check pass uses this so that an event rejected by a + /// later tag does not silently extend the leases of earlier-checked values. + /// The semantic of TTL eviction is "what's been *accepted* in the last N + /// seconds", not "what's been *seen* in the last N seconds". + pub fn contains_no_refresh(&mut self, value: &TagValueSet) -> bool { + match &mut self.storage { TagValueSetStorage::Set(set) => set.contains(value), TagValueSetStorage::Bloom(bloom) => bloom.contains(value), + TagValueSetStorage::TtlSet(s) => s.contains_no_refresh(value), + TagValueSetStorage::RollingBloom(s) => s.contains_no_refresh(value), } } - pub fn len(&self) -> usize { - match &self.storage { + /// Number of distinct values currently retained. + /// + /// In TTL-enabled backends this also triggers lazy sweep/rotation so the + /// returned figure reflects post-expiry state. + pub fn len(&mut self) -> usize { + match &mut self.storage { TagValueSetStorage::Set(set) => set.len(), TagValueSetStorage::Bloom(bloom) => bloom.count(), + TagValueSetStorage::TtlSet(s) => s.len(), + TagValueSetStorage::RollingBloom(s) => s.len(), } } @@ -89,49 +373,340 @@ impl AcceptedTagValueSet { set.insert(value); } TagValueSetStorage::Bloom(bloom) => bloom.insert(&value), + TagValueSetStorage::TtlSet(s) => s.insert(value), + TagValueSetStorage::RollingBloom(s) => s.insert(&value), }; } + + /// Test-only accessor: true iff this set uses a TTL-enabled backend. + /// Lets tests pin backend selection without exposing the internal enum. + #[cfg(test)] + pub(crate) const fn ttl_enabled(&self) -> bool { + matches!( + self.storage, + TagValueSetStorage::TtlSet(_) | TagValueSetStorage::RollingBloom(_) + ) + } } #[cfg(test)] mod tests { use super::*; - use crate::{event::metric::TagValueSet, transforms::tag_cardinality_limit::config::Mode}; + use crate::{ + event::metric::TagValueSet, + transforms::tag_cardinality_limit::config::{BloomFilterConfig, Mode, default_cache_size}, + }; + + fn v(s: &str) -> TagValueSet { + TagValueSet::from([s.to_string()]) + } + + #[test] + fn bloom_filter_storage_count_is_idempotent_per_value() { + let mut b = BloomFilterStorage::new(default_cache_size()); + b.insert(&v("a")); + b.insert(&v("a")); + assert_eq!(b.count(), 1, "duplicate insert must not bump count"); + b.insert(&v("b")); + assert_eq!(b.count(), 2); + } + + #[test] + fn exact_no_ttl_preserves_today_behavior() { + let mut set = AcceptedTagValueSet::new(&Mode::Exact, None, 4); + assert!(!set.contains(&v("a"))); + assert_eq!(set.len(), 0); + set.insert(v("a")); + set.insert(v("b")); + assert_eq!(set.len(), 2); + assert!(set.contains(&v("a"))); + assert!(set.contains(&v("b"))); + } + + #[test] + fn bloom_no_ttl_preserves_today_behavior() { + let mode = Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }); + let mut set = AcceptedTagValueSet::new(&mode, None, 4); + set.insert(v("a")); + set.insert(v("a")); + assert_eq!(set.len(), 1, "duplicate insert must not bump count"); + set.insert(v("b")); + assert_eq!(set.len(), 2); + assert!(set.contains(&v("a"))); + assert!(set.contains(&v("b"))); + } + + // The storage types are exercised directly so we can drive + // `Instant`-typed parameters on private helpers; `AcceptedTagValueSet` + // itself calls `Instant::now()`, which can't be mocked cheaply. + + #[test] + fn ttl_exact_expires_values_past_ttl() { + let ttl = Duration::from_secs(60); + let mut s = TtlExactStorage::new(ttl, 4); + let t0 = Instant::now(); + s.map.insert(v("a"), t0); + s.last_sweep = t0; + s.sweep(t0 + Duration::from_secs(30)); + assert!(s.map.contains_key(&v("a")), "still alive within ttl"); + s.sweep(t0 + Duration::from_secs(90)); + assert!(!s.map.contains_key(&v("a")), "evicted past ttl"); + } + + #[test] + fn ttl_exact_refresh_on_contains_extends_lease() { + // Short sleep guarantees `Instant::now()` is strictly after `t_insert` + // on every platform. + let mut s = TtlExactStorage::new(Duration::from_secs(60), 4); + let t_insert = Instant::now(); + s.map.insert(v("a"), t_insert); + s.last_sweep = t_insert; + + std::thread::sleep(Duration::from_millis(2)); + + assert!(s.contains(&v("a"))); + let after = *s.map.get(&v("a")).expect("entry should still be present"); + assert!( + after > t_insert, + "contains() must refresh the stored Instant; was {t_insert:?}, still {after:?}" + ); + } + + #[test] + fn ttl_exact_caps_generations_when_ttl_lt_generations() { + // ttl=1s, generations=4 → effective=1, sweep_interval=1s. + let s = TtlExactStorage::new(Duration::from_secs(1), 4); + assert_eq!(s.sweep_interval, Duration::from_secs(1)); + // ttl=2s, generations=8 → effective=2, sweep_interval=1s. + let s = TtlExactStorage::new(Duration::from_secs(2), 8); + assert_eq!(s.sweep_interval, Duration::from_secs(1)); + // ttl >> generations is unaffected by the cap. + let s = TtlExactStorage::new(Duration::from_secs(3600), 4); + assert_eq!(s.sweep_interval, Duration::from_secs(900)); + } + + #[test] + fn ttl_exact_contains_no_refresh_does_not_extend_lease() { + let ttl = Duration::from_secs(60); + let mut s = TtlExactStorage::new(ttl, 4); + let t0 = Instant::now(); + s.map.insert(v("a"), t0); + s.last_sweep = t0; + assert!(s.contains_no_refresh(&v("a"))); + assert!(s.contains_no_refresh(&v("a"))); + assert_eq!( + s.map.get(&v("a")).copied(), + Some(t0), + "timestamp must remain at t0 after no-refresh checks" + ); + // Sanity: the refreshing variant must move the timestamp forward. + s.contains(&v("a")); + assert!(s.map.get(&v("a")).copied().unwrap() >= t0); + } #[test] - fn test_accepted_tag_value_set_exact() { - let mut accepted_tag_value_set = AcceptedTagValueSet::new(&Mode::Exact); + fn rolling_bloom_contains_no_refresh_does_not_seed_newest_shard() { + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + s.shards.back_mut().unwrap().insert(&v("a")); + // Drive a rotation so "a" sits in the (now older) front shard. + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.rotate_if_needed(t0 + Duration::from_secs(2)); + let newest_before = s.shards.back().unwrap().count(); + assert!(s.contains_no_refresh(&v("a"))); + let newest_after = s.shards.back().unwrap().count(); + assert_eq!( + newest_before, newest_after, + "contains_no_refresh must not seed the newest shard" + ); + // Sanity: the refreshing variant must seed it. + assert!(s.contains(&v("a"))); + assert!(s.shards.back().unwrap().contains(&v("a"))); + } + + #[test] + fn rolling_bloom_drops_oldest_shard_on_rotate() { + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.shards.back_mut().unwrap().insert(&v("old")); + s.rotate_if_needed(t0 + Duration::from_secs(5)); + assert_eq!(s.shards.len(), 4); + assert!( + !s.shards.iter().any(|sh| sh.contains(&v("old"))), + "'old' should have rolled out of the window" + ); + } - assert!(!accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); - assert_eq!(accepted_tag_value_set.len(), 0); + #[test] + fn rolling_bloom_refresh_on_contains_seeds_newest_shard() { + // After one rotation, `hot` lives in the front shard and the back is + // fresh-empty; `contains` must re-seed it into the newest shard. + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + s.shards.back_mut().unwrap().insert(&v("hot")); + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.rotate_if_needed(t0 + Duration::from_secs(2)); + + assert_eq!( + s.shards.back().unwrap().count(), + 0, + "back shard should be fresh-empty after rotation" + ); + + assert!(s.contains(&v("hot"))); + assert!( + s.shards.back().unwrap().contains(&v("hot")), + "contains() must re-seed found values into the newest shard" + ); + } - accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 1); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); + #[test] + fn rolling_bloom_catch_up_capped_to_generations() { + // After a long idle gap, `rotate_if_needed` must rotate at most + // `generations` times even if elapsed covers many windows. + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + let t0 = Instant::now(); + s.next_rotate = t0 + Duration::from_secs(1); + s.shards.back_mut().unwrap().insert(&v("stale")); + s.rotate_if_needed(t0 + Duration::from_secs(3600)); + assert_eq!(s.shards.len(), 4, "deque size capped at `generations`"); + assert!( + !s.shards.iter().any(|sh| sh.contains(&v("stale"))), + "stale value flushed after long idle" + ); + } + + #[test] + fn rolling_bloom_generations_clamped_to_at_least_one() { + // `generations: 0` would divide by zero / leave an empty deque. + let s = RollingBloomStorage::new(default_cache_size(), 0, Duration::from_secs(60)); + assert_eq!(s.generations, 1); + assert_eq!(s.shards.len(), 1); + } + + #[test] + fn rolling_bloom_caps_generations_when_ttl_lt_generations() { + // ttl=1s, generations=4 → effective=1, slice=1s. + let s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(1)); + assert_eq!(s.generations, 1, "effective generations capped to ttl"); + assert_eq!(s.slice, Duration::from_secs(1)); + // ttl=2s, generations=8 → effective=2, slice=1s. + let s = RollingBloomStorage::new(default_cache_size(), 8, Duration::from_secs(2)); + assert_eq!(s.generations, 2); + assert_eq!(s.slice, Duration::from_secs(1)); + } - accepted_tag_value_set.insert(TagValueSet::from(["value2".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 2); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value2".to_string()]))); + #[test] + fn rolling_bloom_window_matches_ttl_exactly() { + // `slice * effective_generations == ttl` must hold for every valid + // (ttl_secs, generations). + for (ttl_secs, generations) in [(1u64, 4u8), (2, 8), (3, 4), (60, 4), (3600, 4), (86400, 6)] + { + let s = RollingBloomStorage::new( + default_cache_size(), + generations, + Duration::from_secs(ttl_secs), + ); + assert_eq!( + s.slice * u32::from(s.generations), + Duration::from_secs(ttl_secs), + "ttl_secs={ttl_secs}, generations={generations}: window must equal ttl", + ); + assert!( + s.slice >= Duration::from_secs(1), + "ttl_secs={ttl_secs}, generations={generations}: slice must be >= 1s", + ); + } } #[test] - fn test_accepted_tag_value_set_probabilistic() { - let mut accepted_tag_value_set = AcceptedTagValueSet::new(&Mode::Exact); + fn rolling_bloom_len_sums_across_shards() { + // Distinct values spread across shards must contribute to `len()`; + // otherwise the union could silently exceed `value_limit`. + let mut s = RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(4)); + s.shards.clear(); + for name in ["a", "b", "c", "d"] { + let mut shard = BloomFilterStorage::new(default_cache_size()); + shard.insert(&v(name)); + s.shards.push_back(shard); + } + // Push the next rotation far out so `len()` doesn't lazily rotate. + s.next_rotate = Instant::now() + Duration::from_secs(3600); + assert_eq!( + s.len(), + 4, + "len() must sum per-shard counts to reflect the union upper bound" + ); + } - assert!(!accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); - assert_eq!(accepted_tag_value_set.len(), 0); + #[test] + fn rolling_bloom_oversized_ttl_doesnt_panic() { + let mut s = + RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(u64::MAX)); + // Exercises both `saturating_add` call sites (constructor and + // `rotate_if_needed`). + s.insert(&v("a")); + assert!(s.contains(&v("a"))); + assert_eq!(s.len(), 1); + } - accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 1); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); + #[test] + fn saturating_add_overflow_pushes_deadline_far_into_future() { + // The fallback must advance `instant` by a non-trivial amount — + // returning `instant` itself would leave `next_rotate <= now` on + // every access and force `generations` rotations per call. + let now = Instant::now(); + let advanced = saturating_add(now, Duration::from_secs(u64::MAX)); + let gain = advanced.duration_since(now); + assert!( + gain >= Duration::from_secs(60 * 60 * 24 * 365), + "saturating_add must push the deadline at least a year out on \ + overflow; got {gain:?}", + ); + } - // Inserting the same value again should not increase the count. - accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 1); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()]))); + #[test] + fn rolling_bloom_overflow_does_not_churn_on_repeated_access() { + // Repeated reads with an overflowing TTL must not silently rotate + // out values inserted between them; the rotation deadline has to + // sit far enough in the future that `rotate_if_needed` is a no-op. + let mut s = + RollingBloomStorage::new(default_cache_size(), 4, Duration::from_secs(u64::MAX)); + s.insert(&v("a")); + for _ in 0..16 { + assert!(s.contains(&v("a"))); + } + assert_eq!(s.len(), 1); + } - accepted_tag_value_set.insert(TagValueSet::from(["value2".to_string()])); - assert_eq!(accepted_tag_value_set.len(), 2); - assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value2".to_string()]))); + #[test] + fn rolling_bloom_len_upper_bounds_value_limit() { + // `len()` must reach `value_limit` once enough distinct values are + // admitted across the full window so `try_accept_tag` stops admitting. + let value_limit = 8usize; + let generations = 4u8; + let mut s = + RollingBloomStorage::new(default_cache_size(), generations, Duration::from_secs(4)); + s.shards.clear(); + let per_shard = value_limit / generations as usize; + let mut next = 0usize; + for _ in 0..generations { + let mut shard = BloomFilterStorage::new(default_cache_size()); + for _ in 0..per_shard { + shard.insert(&v(&format!("v{next}"))); + next += 1; + } + s.shards.push_back(shard); + } + s.next_rotate = Instant::now() + Duration::from_secs(3600); + assert!( + s.len() >= value_limit, + "len() must reach value_limit once enough distinct values are spread \ + across shards; got {} for value_limit={value_limit}", + s.len(), + ); } } diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index b8a453a3c4844..c835be970b7e2 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -18,6 +18,7 @@ use crate::{ transforms::{ tag_cardinality_limit::config::{ BloomFilterConfig, InternalMetricsConfig, Mode, default_cache_size, + default_ttl_generations, }, test::create_topology, }, @@ -46,17 +47,25 @@ fn make_metric(tags: MetricTags) -> Event { make_metric_with_name(tags, "event") } +/// Default `Inner` for tests: no TTL, default generations. Used as a base for +/// the `make_transform_*` helpers and any test that constructs `Inner` literally. +fn default_inner(value_limit: usize, action: LimitExceededAction, mode: Mode) -> Inner { + Inner { + value_limit, + limit_exceeded_action: action, + mode, + internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), + } +} + fn make_transform_hashset( value_limit: usize, limit_exceeded_action: LimitExceededAction, ) -> Config { Config { - global: Inner { - value_limit, - limit_exceeded_action, - mode: Mode::Exact, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(value_limit, limit_exceeded_action, Mode::Exact), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::new(), @@ -66,14 +75,13 @@ fn make_transform_hashset( fn make_transform_bloom(value_limit: usize, limit_exceeded_action: LimitExceededAction) -> Config { Config { - global: Inner { + global: default_inner( value_limit, limit_exceeded_action, - mode: Mode::Probabilistic(BloomFilterConfig { + Mode::Probabilistic(BloomFilterConfig { cache_size_per_key: default_cache_size(), }), - internal_metrics: InternalMetricsConfig::default(), - }, + ), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::new(), @@ -87,12 +95,7 @@ fn make_transform_hashset_with_per_metric_limits( per_metric_limits: HashMap, ) -> Config { Config { - global: Inner { - value_limit, - limit_exceeded_action, - mode: Mode::Exact, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(value_limit, limit_exceeded_action, Mode::Exact), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits, @@ -106,14 +109,13 @@ fn make_transform_bloom_with_per_metric_limits( per_metric_limits: HashMap, ) -> Config { Config { - global: Inner { + global: default_inner( value_limit, limit_exceeded_action, - mode: Mode::Probabilistic(BloomFilterConfig { + Mode::Probabilistic(BloomFilterConfig { cache_size_per_key: default_cache_size(), }), - internal_metrics: InternalMetricsConfig::default(), - }, + ), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits, @@ -128,12 +130,7 @@ fn make_transform_with_global_per_tag_limits( per_tag_limits: HashMap, ) -> Config { Config { - global: Inner { - value_limit, - limit_exceeded_action, - mode, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(value_limit, limit_exceeded_action, mode), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::new(), @@ -445,6 +442,8 @@ fn override_inner_hashset(value_limit: usize, action: LimitExceededAction) -> Ov limit_exceeded_action: action, mode: OverrideMode::Exact, internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), } } @@ -456,6 +455,8 @@ fn override_inner_bloom(value_limit: usize, action: LimitExceededAction) -> Over cache_size_per_key: default_cache_size(), }), internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), } } @@ -757,18 +758,15 @@ fn max_tracked_keys_unlimited_by_default() { } } -/// Regression: with `value_limit: 0`, `limit_exceeded_action: drop_event`, and -/// `max_tracked_keys` exhausted, the documented untracked-passthrough behavior must -/// still apply. `tag_limit_exceeded` previously rejected *any* missing-bucket lookup -/// when `value_limit == 0`, causing events to be dropped before `record_tag_value` -/// could detect the allocation cap. New (metric, tag-key) pairs that cannot be -/// allocated must instead pass through unchecked. +/// With `value_limit: 0`, `limit_exceeded_action: drop_event`, and +/// `max_tracked_keys` exhausted, new (metric, tag-key) pairs that cannot be +/// allocated must pass through unchecked rather than dropping the event. #[test] fn max_tracked_keys_passthrough_with_zero_value_limit_drop_event() { - // metric_a uses a normal per-metric override with room for a value, so the first - // event reserves the only allocation slot. metric_b inherits the global config, - // which combines `value_limit: 0` + `drop_event` — the corner case that originally - // dropped events even when the pair couldn't be tracked. + // metric_a uses a per-metric override with room for a value, so the first + // event reserves the only allocation slot. metric_b inherits the global + // config — `value_limit: 0` + `drop_event` — which exercises the + // missing-bucket passthrough path under cap exhaustion. let config = make_transform_hashset_with_per_metric_limits( 0, LimitExceededAction::DropEvent, @@ -866,6 +864,8 @@ fn make_per_metric( limit_exceeded_action: action, mode: OverrideMode::Exact, internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), }, } } @@ -879,6 +879,8 @@ fn make_per_metric_excluded(per_tag_limits: HashMap) -> Pe limit_exceeded_action: LimitExceededAction::DropTag, mode: OverrideMode::Excluded, internal_metrics: InternalMetricsConfig::default(), + ttl_secs: None, + ttl_generations: default_ttl_generations(), }, } } @@ -1388,12 +1390,7 @@ fn global_per_tag_limit_override_caps_at_explicit_value() { #[test] fn global_per_tag_overridden_by_per_metric_entry() { let config = Config { - global: Inner { - value_limit: 2, - limit_exceeded_action: LimitExceededAction::DropTag, - mode: Mode::Exact, - internal_metrics: InternalMetricsConfig::default(), - }, + global: default_inner(2, LimitExceededAction::DropTag, Mode::Exact), tracking_scope: TrackingScope::default(), max_tracked_keys: None, per_metric_limits: HashMap::from([( @@ -1439,6 +1436,236 @@ fn global_per_tag_overridden_by_per_metric_entry() { } } +// Transform-level TTL coverage focuses on the *config surface* (defaults, +// deserialization, the public `contains_no_refresh` contract). The behavioral +// TTL tests, where we need to drive `Instant`s, live in `tag_value_set.rs`. + +#[test] +fn ttl_defaults_off() { + let cfg = make_transform_hashset(2, LimitExceededAction::DropTag); + assert!( + cfg.global.ttl_secs.is_none(), + "default config must not enable TTL" + ); + assert_eq!( + cfg.global.ttl_generations, + default_ttl_generations(), + "default generations should match the documented default" + ); +} + +#[test] +fn ttl_global_yaml_deserializes() { + let yaml = r#" +value_limit: 5 +mode: exact +ttl_secs: 3600 +ttl_generations: 6 +"#; + let parsed: Config = serde_yaml::from_str(yaml).expect("yaml should deserialize"); + assert_eq!(parsed.global.ttl_secs, Some(3600)); + assert_eq!(parsed.global.ttl_generations, 6); +} + +#[test] +fn ttl_per_metric_yaml_deserializes() { + let yaml = r#" +value_limit: 5 +mode: exact +ttl_secs: 3600 +per_metric_limits: + hot_metric: + mode: probabilistic + cache_size_per_key: 1024 + value_limit: 100 + ttl_secs: 600 + ttl_generations: 3 +"#; + let parsed: Config = serde_yaml::from_str(yaml).expect("yaml should deserialize"); + assert_eq!(parsed.global.ttl_secs, Some(3600)); + let pm = parsed.per_metric_limits.get("hot_metric").unwrap(); + assert_eq!(pm.config.ttl_secs, Some(600)); + assert_eq!(pm.config.ttl_generations, 3); +} + +/// Pins the basic contract of `contains_no_refresh`: it must return `true` +/// for a value that was just inserted, across every backend variant. The +/// "no-refresh" timing semantic (the actual *DropEvent* contract) is verified +/// in `tag_value_set.rs::tests::{ttl_exact,rolling_bloom}_contains_no_refresh_*`, +/// where the `Instant`-driven storage methods can be exercised directly. +/// +/// Note: this test does NOT verify that `tag_limit_exceeded` calls +/// `contains_no_refresh` (and not `contains`) — that wiring is enforced by +/// code review of the (private) match arm in `mod.rs::tag_limit_exceeded`. +#[test] +fn contains_no_refresh_finds_inserted_values_on_all_backends() { + use super::tag_value_set::AcceptedTagValueSet; + use crate::event::metric::TagValueSet; + + let v1 = TagValueSet::from(["v1".to_string()]); + let bloom_mode = Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }); + + for (label, mut set) in [ + ( + "exact no-ttl", + AcceptedTagValueSet::new(&Mode::Exact, None, 4), + ), + ( + "bloom no-ttl", + AcceptedTagValueSet::new(&bloom_mode, None, 4), + ), + ( + "exact ttl", + AcceptedTagValueSet::new(&Mode::Exact, Some(60), 4), + ), + ( + "bloom ttl", + AcceptedTagValueSet::new(&bloom_mode, Some(60), 4), + ), + ] { + set.insert(v1.clone()); + assert!( + set.contains_no_refresh(&v1), + "{label}: should find inserted value" + ); + } +} + +/// `ttl_secs: 0` must select the **non-TTL** backend (same as `None`). If we +/// ever flipped this to "expire immediately" — i.e. a TTL backend with +/// `Duration::ZERO` — the 1-second `sweep_interval` floor would mask the bug +/// in any externally-observable behavior, but the cache would still degrade +/// silently the moment a sweep boundary was crossed. +#[test] +fn ttl_zero_disables_ttl() { + use super::tag_value_set::AcceptedTagValueSet; + + let bloom_mode = Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }); + for (label, set) in [ + ( + "exact ttl=0", + AcceptedTagValueSet::new(&Mode::Exact, Some(0), 4), + ), + ( + "bloom ttl=0", + AcceptedTagValueSet::new(&bloom_mode, Some(0), 4), + ), + ( + "exact ttl=None", + AcceptedTagValueSet::new(&Mode::Exact, None, 4), + ), + ( + "bloom ttl=None", + AcceptedTagValueSet::new(&bloom_mode, None, 4), + ), + ] { + assert!( + !set.ttl_enabled(), + "{label}: must select the non-TTL backend" + ); + } + + // Sanity: TTL with a positive value DOES select the TTL backend. + let ttl_set = AcceptedTagValueSet::new(&Mode::Exact, Some(60), 4); + assert!(ttl_set.ttl_enabled(), "ttl=Some(60) should enable TTL"); +} + +#[test] +fn ttl_existing_yaml_unchanged() { + // A pre-TTL config must continue to parse without any TTL fields and + // produce ttl_secs=None — that's the backwards-compat contract. + let yaml = r#" +value_limit: 5 +mode: probabilistic +cache_size_per_key: 2048 +"#; + let parsed: Config = serde_yaml::from_str(yaml).expect("yaml should deserialize"); + assert!(parsed.global.ttl_secs.is_none()); + assert_eq!(parsed.global.ttl_generations, default_ttl_generations()); +} + +/// Once `max_tracked_keys` is reached, buckets emptied by TTL eviction are +/// reclaimed so new `(metric, tag-key)` pairs can still be tracked. +#[test] +fn max_tracked_keys_reclaims_empty_buckets() { + use super::tag_value_set::AcceptedTagValueSet; + + let mut config = make_transform_hashset(10, LimitExceededAction::DropTag); + config.max_tracked_keys = Some(1); + let mut transform = TagCardinalityLimit::new(config); + + // Pre-seed an empty bucket so the transform sees its slot as used. + let metric_key: Option = None; + let stale_set = AcceptedTagValueSet::new(&Mode::Exact, None, default_ttl_generations()); + let mut inner = hashbrown::HashMap::new(); + inner.insert("stale_tag".to_string(), stale_set); + transform.accepted_tags.insert(metric_key.clone(), inner); + transform.tracked_keys_count = 1; + + assert!( + !transform.can_allocate_new_key(), + "should be at cap before reclaim" + ); + + let value = TagValueSet::from(["v".to_string()]); + let result = transform.try_accept_tag(metric_key.as_ref(), "fresh_tag", &value); + assert_eq!( + result, + AcceptResult::Tracked, + "fresh tag must be tracked after the stale bucket is reclaimed" + ); + let inner_after = transform.accepted_tags.get(&metric_key).unwrap(); + assert!( + inner_after.contains_key("fresh_tag"), + "fresh_tag must be present after reclaim" + ); + assert!( + !inner_after.contains_key("stale_tag"), + "stale_tag bucket must be dropped by reclaim" + ); + assert_eq!( + transform.tracked_keys_count, 1, + "tracked_keys_count must reflect the swap (1 dropped + 1 added)" + ); +} + +/// With `value_limit: 0` + `drop_event` + `max_tracked_keys` full but with a +/// reclaimable empty bucket, `tag_limit_exceeded` must reclaim before deciding +/// — otherwise the pre-check returns "not exceeded" (cap appears full → fall +/// through to untracked passthrough) while `record_tag_value` then reclaims +/// and admits a value that should have been rejected. +#[test] +fn tag_limit_exceeded_reclaims_before_value_limit_zero_check() { + use super::tag_value_set::AcceptedTagValueSet; + + let mut config = make_transform_hashset(0, LimitExceededAction::DropEvent); + config.max_tracked_keys = Some(1); + let mut transform = TagCardinalityLimit::new(config); + + let metric_key: Option = None; + let stale_set = AcceptedTagValueSet::new(&Mode::Exact, None, default_ttl_generations()); + let mut inner = hashbrown::HashMap::new(); + inner.insert("stale_tag".to_string(), stale_set); + transform.accepted_tags.insert(metric_key.clone(), inner); + transform.tracked_keys_count = 1; + + assert!( + !transform.can_allocate_new_key(), + "should be at cap before reclaim" + ); + + let value = TagValueSet::from(["v".to_string()]); + assert!( + transform.tag_limit_exceeded(metric_key.as_ref(), "fresh_tag", &value), + "value_limit: 0 with a reclaimable empty bucket must flag the new \ + tag as exceeded; otherwise record_tag_value would silently admit it" + ); +} + /// Global per-tag YAML syntax mirrors per-metric `per_tag_limits`: `mode: limit_override` /// with `value_limit`, and `mode: excluded`. #[test] diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 1e990965638c6..e29137894f8c1 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -931,6 +931,18 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + tag_cardinality_ttl_expirations_total: { + description: """ + The total number of distinct tag values expired by the `tag_cardinality_limit` + transform's TTL eviction (only emitted when `ttl_secs` is configured). In + `exact` mode this is the number of values whose last sighting was older than + `ttl_secs`; in `probabilistic` mode this is the count drained from the oldest + rolling-bloom shard at each rotation. + """ + type: "counter" + default_namespace: "vector" + tags: _component_tags + } tag_value_limit_exceeded_total: { description: """ The total number of events discarded because the tag has been rejected after diff --git a/website/cue/reference/components/transforms/tag_cardinality_limit.cue b/website/cue/reference/components/transforms/tag_cardinality_limit.cue index 91cf2962185b4..b95a7a507787d 100644 --- a/website/cue/reference/components/transforms/tag_cardinality_limit.cue +++ b/website/cue/reference/components/transforms/tag_cardinality_limit.cue @@ -165,6 +165,69 @@ components: transforms: tag_cardinality_limit: { """ } + ttl: { + title: "TTL (sliding-window cardinality)" + body: """ + By default, the cardinality cache grows monotonically — every distinct value + ever seen for a tag occupies a slot under `value_limit` until Vector restarts. + Setting `ttl_secs` turns the cache into a *sliding window*: any tag value not + observed within that many seconds is dropped, freeing room for fresh values. + + This is useful when the downstream system bills or pages on a rolling + unique-series window (e.g. Datadog computes custom-metric cardinality on a + 1-hour p95). A monotonic cache will eventually saturate at `value_limit` and + start rejecting legitimate new values long after the old ones have aged out + of the billing window. + + ```yaml + type: tag_cardinality_limit + value_limit: 500 + mode: probabilistic + cache_size_per_key: 5120 + ttl_secs: 3600 # match the Datadog billing window + ttl_generations: 4 # eviction granularity = 15 min + ``` + + **Refresh-on-sighting**: every cache hit (not just inserts) extends the + value's lease. Continuously-observed values stay in the cache indefinitely; + only values that go silent for longer than `ttl_secs` are evicted. + + **Mode interaction**: + + - `mode: exact` — every value carries a precise last-seen timestamp. + Eviction is exact to within roughly `ttl_secs / ttl_generations`. + `ttl_generations` controls only the sweep cadence in exact mode. + - `mode: probabilistic` — the underlying bloom filter is split into + `ttl_generations` rolling shards. Memory cost rises to + `ttl_generations * cache_size_per_key` per (metric, tag-key) pair. + Reduce `cache_size_per_key` if you want to keep total memory flat. + `ttl_generations: 1` produces a tumbling window (everything resets + at once every `ttl_secs`), which can be useful for matching a strict + billing-window boundary. The `value_limit` cap is enforced against an + upper-bound estimate of the union cardinality across shards. Under + refresh-on-sighting, hot continuously-seen values may be counted in + more than one shard, so the effective cap can be as low as + `value_limit / ttl_generations` for refresh-heavy workloads. If this + shows up as elevated `tag_value_limit_exceeded_total` without a + matching rise in distinct admitted values, set `ttl_generations: 1`. + + **`ttl_secs` shorter than `ttl_generations`**: the effective number of + generations is silently capped to `ttl_secs` so the configured TTL + window is honored exactly. For example `ttl_secs: 2, ttl_generations: 8` + resolves to 2 generations of 1-second slices (not 8 generations of + 1-second slices, which would stretch the window to 8 s). + + **Per-metric overrides do not inherit**: setting `ttl_secs` inside a + `per_metric_limits.` block (or leaving it unset there) fully + replaces the global TTL for that metric — it does not fall back to + the top-level `ttl_secs`. This mirrors the precedence rules for + `value_limit`. To share the global TTL on a specific metric, copy the + value explicitly. + + **Restarts still reset the cache** — see [restarts](#restarts). + """ + } + per_tag_limits: { title: "Per-tag overrides" body: """ @@ -223,7 +286,8 @@ components: transforms: tag_cardinality_limit: { } telemetry: metrics: { - tag_value_limit_exceeded_total: components.sources.internal_metrics.output.metrics.tag_value_limit_exceeded_total - value_limit_reached_total: components.sources.internal_metrics.output.metrics.value_limit_reached_total + tag_cardinality_ttl_expirations_total: components.sources.internal_metrics.output.metrics.tag_cardinality_ttl_expirations_total + tag_value_limit_exceeded_total: components.sources.internal_metrics.output.metrics.tag_value_limit_exceeded_total + value_limit_reached_total: components.sources.internal_metrics.output.metrics.value_limit_reached_total } }