Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog.d/tag_cardinality_limit_ttl.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
The `tag_cardinality_limit` transform gained an optional sliding-window TTL
for tracked tag values. Two new settings — `ttl_secs` and `ttl_generations` —
are available on the global block and each `per_metric_limits` entry; values
not observed within `ttl_secs` are evicted, freeing room under `value_limit`.
A new internal counter `tag_cardinality_ttl_expirations_total` reports the
number of values evicted by TTL. See the transform documentation for
configuration details and mode-specific behavior.

authors: kaarolch
2 changes: 2 additions & 0 deletions lib/vector-common/src/internal_event/metric_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum CounterName {
StaleEventsFlushedTotal,
StartedTotal,
StoppedTotal,
TagCardinalityTtlExpirationsTotal,
TagCardinalityUntrackedEventsTotal,
TagValueLimitExceededTotal,
ValueLimitReachedTotal,
Expand Down Expand Up @@ -335,6 +336,7 @@ impl CounterName {
Self::StaleEventsFlushedTotal => "stale_events_flushed_total",
Self::StartedTotal => "started_total",
Self::StoppedTotal => "stopped_total",
Self::TagCardinalityTtlExpirationsTotal => "tag_cardinality_ttl_expirations_total",
Self::TagCardinalityUntrackedEventsTotal => "tag_cardinality_untracked_events_total",
Self::TagValueLimitExceededTotal => "tag_value_limit_exceeded_total",
Self::ValueLimitReachedTotal => "value_limit_reached_total",
Expand Down
24 changes: 24 additions & 0 deletions src/internal_events/tag_cardinality_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,27 @@ impl InternalEvent for TagCardinalityTrackedKeys {
gauge!(GaugeName::TagCardinalityTrackedKeys).set(self.count as f64);
}
}

/// Emitted when a TTL sweep removes tag values from a tracking bucket.
///
/// The `count` is the number of *distinct* values evicted in that pass.
/// For the probabilistic backend this is the count drained from the oldest
/// rolling-bloom shard; for the exact backend it is the number of entries
/// whose last sighting was older than `ttl_secs`.
#[derive(NamedInternalEvent)]
pub struct TagCardinalityTtlExpired {
pub count: u64,
}

impl InternalEvent for TagCardinalityTtlExpired {
fn emit(self) {
if self.count == 0 {
return;
}
debug!(
message = "Expired tag values from cardinality cache.",
count = self.count,
);
counter!(CounterName::TagCardinalityTtlExpirationsTotal).increment(self.count);
}
}
55 changes: 55 additions & 0 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,35 @@ pub struct Inner {
#[configurable(derived)]
#[serde(default)]
pub internal_metrics: InternalMetricsConfig,

/// Expire tracked tag values after this many seconds since they were last seen.
///
/// When unset (default) or set to `0`, values persist for the lifetime of the
/// process — the historical behavior. When set to a positive value, the
/// transform behaves like a sliding window: any tag value not observed within
/// the TTL is dropped, freeing room under `value_limit` for fresh values.
/// Useful for bounding cost on backends (e.g. Datadog custom metrics) that
/// bill on a rolling unique-series window.
///
/// In `exact` mode every value carries a precise last-seen timestamp; in
/// `probabilistic` mode the underlying bloom filter is split into
/// `ttl_generations` rolling shards, so eviction is approximate to within
/// `ttl_secs / ttl_generations`.
#[serde(default)]
#[configurable(metadata(docs::human_name = "TTL (seconds)"))]
pub ttl_secs: Option<u64>,

/// Number of time-slices the TTL window is split into for the
/// `probabilistic` backend.
///
/// Higher values smooth eviction (closer to a true sliding window) at the
/// cost of `ttl_generations * cache_size_per_key` memory per (metric,
/// tag-key) pair. `1` produces a tumbling window: all tracked values are
/// dropped at once every `ttl_secs`. Ignored when `ttl_secs` is unset, or
/// when mode is `exact` (which uses precise per-value timestamps).
#[serde(default = "default_ttl_generations")]
#[configurable(metadata(docs::human_name = "TTL Generations"))]
pub ttl_generations: u8,
}

/// Controls the approach taken for tracking tag cardinality at the global level.
Expand Down Expand Up @@ -169,6 +198,23 @@ pub struct OverrideInner {
#[configurable(derived)]
#[serde(default)]
pub internal_metrics: InternalMetricsConfig,

/// Per-metric TTL for tracked tag values. See [`Inner::ttl_secs`] for the
/// full description.
///
/// Per-metric TTL is a **full override** of the global TTL — it does not
/// inherit. Leaving this unset means "no TTL for this metric", *not*
/// "fall back to the global `ttl_secs`". This mirrors how a per-metric
/// `value_limit` fully shadows the global one. If you want a metric to
/// share the global TTL, copy the value explicitly.
#[serde(default)]
#[configurable(metadata(docs::human_name = "TTL (seconds)"))]
pub ttl_secs: Option<u64>,

/// Per-metric override for `ttl_generations`. See [`Inner::ttl_generations`].
#[serde(default = "default_ttl_generations")]
#[configurable(metadata(docs::human_name = "TTL Generations"))]
pub ttl_generations: u8,
}

/// Controls the approach taken for tracking tag cardinality at the per-metric level.
Expand Down Expand Up @@ -308,6 +354,13 @@ pub(crate) const fn default_cache_size() -> usize {
5 * 1024 // 5KB
}

/// Default number of rolling-bloom shards. Four gives a reasonable middle ground:
/// eviction granularity of `ttl/4`, and a 4x memory multiplier on
/// `cache_size_per_key` for users who opt into TTL.
pub(crate) const fn default_ttl_generations() -> u8 {
4
}

// =============================================================================
// Transform plumbing
// =============================================================================
Expand All @@ -320,6 +373,8 @@ impl GenerateConfig for Config {
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
internal_metrics: InternalMetricsConfig::default(),
ttl_secs: None,
ttl_generations: default_ttl_generations(),
},
tracking_scope: TrackingScope::default(),
max_tracked_keys: None,
Expand Down
102 changes: 81 additions & 21 deletions src/transforms/tag_cardinality_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,32 @@ impl TagCardinalityLimit {
});
}

/// Drop empty `AcceptedTagValueSet` buckets left behind by TTL eviction,
/// decrementing `tracked_keys_count` so freed slots can be reused under
/// `max_tracked_keys`. Called lazily on cap-hit paths so steady-state
/// overhead is zero. `len()` is called on every set because, for TTL
/// backends, it also drives the lazy sweep that may empty the bucket.
fn reclaim_empty_buckets(&mut self) {
let mut reclaimed = 0usize;
self.accepted_tags.retain(|_, inner| {
inner.retain(|_, set| {
if set.len() == 0 {
reclaimed += 1;
false
} else {
true
}
});
!inner.is_empty()
});
if reclaimed > 0 {
self.tracked_keys_count = self.tracked_keys_count.saturating_sub(reclaimed);
emit!(TagCardinalityTrackedKeys {
count: self.tracked_keys_count,
});
}
}

/// Resolve the configuration that applies to a specific (metric, tag) pair.
///
/// Per-tag entries support two modes:
Expand Down Expand Up @@ -120,6 +146,11 @@ impl TagCardinalityLimit {
let limit_exceeded_action = per_metric.config.limit_exceeded_action;
let metric_value_limit = per_metric.config.value_limit;
let internal_metrics = per_metric.config.internal_metrics;
// Per-metric TTL is a *full override* of the global TTL for this metric:
// unset (`None`) means "no TTL for this metric" rather than "inherit from
// global", mirroring how per-metric `value_limit` shadows the global value.
let ttl_secs = per_metric.config.ttl_secs;
let ttl_generations = per_metric.config.ttl_generations;

// Per-tag entry: LimitOverride uses an explicit value_limit; Excluded opts
// the tag out. All other settings are always inherited from per-metric.
Expand All @@ -134,6 +165,8 @@ impl TagCardinalityLimit {
limit_exceeded_action,
mode: metric_mode,
internal_metrics,
ttl_secs,
ttl_generations,
});
}
}
Expand All @@ -143,6 +176,8 @@ impl TagCardinalityLimit {
limit_exceeded_action,
mode: metric_mode,
internal_metrics,
ttl_secs,
ttl_generations,
})
}

Expand Down Expand Up @@ -205,18 +240,21 @@ impl TagCardinalityLimit {

if !pair_exists {
if !self.can_allocate_new_key() {
return AcceptResult::Untracked;
// TTL may have emptied buckets that still count against `max_tracked_keys`.
self.reclaim_empty_buckets();
if !self.can_allocate_new_key() {
return AcceptResult::Untracked;
}
}
self.record_new_key_allocation();
}

let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default();
let tag_value_set = metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(&config.mode));
let tag_value_set = metric_accepted_tags.entry_ref(key).or_insert_with(|| {
AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations)
});

if tag_value_set.contains(value) {
// Tag value has already been accepted, nothing more to do.
return AcceptResult::Tracked;
}

Expand All @@ -238,8 +276,12 @@ impl TagCardinalityLimit {

/// Checks if recording a key and value corresponding to a tag on an incoming Metric would
/// exceed the cardinality limit.
///
/// Note: takes `&mut self` because TTL-enabled backends (`TtlSet`,
/// `RollingBloom`) perform lazy sweep/rotation inside `contains`/`len`.
/// The non-TTL backends are still effectively read-only here.
fn tag_limit_exceeded(
&self,
&mut self,
metric_key: Option<&MetricId>,
key: &str,
value: &TagValueSet,
Expand All @@ -248,22 +290,34 @@ impl TagCardinalityLimit {
TagSettings::Excluded => return false,
TagSettings::Tracked(inner) => inner,
};
match self

if let Some(value_set) = self
.accepted_tags
.get(&metric_key.cloned())
.and_then(|metric_accepted_tags| metric_accepted_tags.get(key))
.get_mut(&metric_key.cloned())
.and_then(|metric_accepted_tags| metric_accepted_tags.get_mut(key))
{
// Already accepted — never exceeds.
Some(value_set) if value_set.contains(value) => false,
// Adding this value would push us at or past the configured cap. Treat a
// missing bucket as an empty set so `value_limit: 0` correctly rejects
// the first occurrence too — but only when the (metric, tag) pair would
// actually be tracked. If `max_tracked_keys` is exhausted, `record_tag_value`
// will pass the tag through unchecked and emit `TagCardinalityLimitUntracked`,
// so we must not pre-empt that path by reporting the limit as exceeded here.
Some(value_set) => value_set.len() >= resolved.value_limit,
None => resolved.value_limit == 0 && self.can_allocate_new_key(),
// Non-refreshing: `DropEvent` may still reject this event on
// a later tag; refresh happens on the accept path via
// `record_tag_value`.
return if value_set.contains_no_refresh(value) {
false
} else {
value_set.len() >= resolved.value_limit
};
}

// Missing bucket: only `value_limit == 0` can flag the first
// sighting as exceeded; every other limit fits an empty set.
if resolved.value_limit != 0 {
return false;
}
// Mirror `record_tag_value`'s capacity view: reclaim empty TTL
// buckets first so we don't pass-through-untracked here and let
// the record path silently admit a value that should be rejected.
if !self.can_allocate_new_key() {
self.reclaim_empty_buckets();
}
self.can_allocate_new_key()
}

/// Record an accepted tag value (mutation-only, no limit check). Used by the `DropEvent`
Expand Down Expand Up @@ -292,15 +346,21 @@ impl TagCardinalityLimit {

if !pair_exists {
if !self.can_allocate_new_key() {
return true;
// See `try_accept_tag` for rationale.
self.reclaim_empty_buckets();
if !self.can_allocate_new_key() {
return true;
}
}
self.record_new_key_allocation();
}

let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default();
metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(&config.mode))
.or_insert_with(|| {
AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations)
})
.insert(value.clone());
false
}
Expand Down
Loading
Loading