Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds a per-tag `cache_size_per_key` option to the `tag_cardinality_limit` transform, available in `probabilistic` mode. Previously, per-tag overrides always inherited the bloom filter cache size from the enclosing config, which could cause a higher false positive rate when the per-tag `value_limit` is higher than the global or per-metric `value_limit`. When omitted, the cache size from the enclosing config is used. The option is only valid in `probabilistic` mode; setting it in `exact` mode results in a configuration error.

Comment thread
pront marked this conversation as resolved.
authors: ArunPiduguDD
75 changes: 68 additions & 7 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use snafu::Snafu;
use vector_lib::configurable::configurable_component;

use crate::{
Expand Down Expand Up @@ -142,8 +143,9 @@ pub struct PerMetricConfig {
/// - `mode: limit_override` + `value_limit: N` — track with a per-tag cap.
/// - `mode: excluded` — opt this tag out of tracking entirely.
///
/// All other settings (tracking algorithm, `limit_exceeded_action`, etc.)
/// are inherited from the enclosing per-metric configuration.
/// All other settings (tracking algorithm, `limit_exceeded_action`, etc.)
/// are inherited from the enclosing per-metric configuration, except
/// `cache_size_per_key`, which can be overridden per tag in probabilistic mode.
/// Tags not listed here use the per-metric configuration.
#[configurable(
derived,
Expand Down Expand Up @@ -223,6 +225,10 @@ impl OverrideMode {
/// environment:
/// mode: limit_override # track with a per-tag cap
/// value_limit: 3
/// high_cardinality_tag:
/// mode: limit_override
/// value_limit: 1000
/// cache_size_per_key: 102400 # larger bloom filter for this tag
/// trace_id:
/// mode: excluded # opt out of tracking entirely
/// ```
Expand All @@ -236,19 +242,25 @@ pub struct PerTagConfig {

/// Mode applied to a specific tag key within a per-metric override.
///
/// The tracking algorithm (`exact`/`probabilistic`), `cache_size_per_key`,
/// `limit_exceeded_action`, and `internal_metrics` are always inherited from the
/// enclosing per-metric configuration.
/// The tracking algorithm (`exact`/`probabilistic`), `limit_exceeded_action`, and
/// `internal_metrics` are inherited from the enclosing per-metric (or global) configuration.
/// `cache_size_per_key` may optionally be overridden per tag when probabilistic mode is in use.
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(tag = "mode", rename_all = "snake_case", deny_unknown_fields)]
#[configurable(metadata(docs::enum_tag_description = "Controls how this tag key is handled."))]
pub enum PerTagMode {
/// Track this tag with a per-tag value limit. The enclosing per-metric tracking
/// algorithm and all other settings still apply.
/// Track this tag with a per-tag value limit and, optionally, a per-tag
/// `cache_size_per_key`. All other settings are inherited from the enclosing config.
LimitOverride {
/// Maximum number of distinct values to accept for this tag key.
value_limit: usize,

/// Override the bloom filter cache size for this specific tag key.
/// Only valid in `probabilistic` mode; setting this in `exact` mode is a configuration error.
/// Inherits from the enclosing config when unset.
#[serde(default)]
cache_size_per_key: Option<usize>,
},
/// Opt this tag out of cardinality tracking entirely. All values pass through
/// without being recorded or checked against any `value_limit`.
Expand Down Expand Up @@ -341,10 +353,59 @@ impl GenerateConfig for Config {
}
}

/// Errors produced by `Config::validate` when the transform configuration is
/// internally inconsistent.
#[derive(Debug, Snafu)]
pub enum BuildError {
/// A per-tag entry sets `cache_size_per_key` although the mode it inherits from its
/// enclosing (global or per-metric) configuration is not `probabilistic`.
/// `tag_key` is the key of the offending per-tag entry.
#[snafu(display(
"cache_size_per_key set on per-tag entry `{tag_key}` but the inherited mode is not \
`probabilistic`, where it has no effect. Remove the field or switch to `probabilistic` mode."
))]
CacheSizeRequiresProbabilistic { tag_key: String },
}

impl Config {
    /// Checks that no per-tag entry sets `cache_size_per_key` under a non-probabilistic mode.
    ///
    /// Global per-tag entries are checked against the global mode; per-tag entries nested
    /// in a per-metric override are checked against that per-metric mode. Global entries
    /// are examined first, then the per-metric ones.
    ///
    /// # Errors
    ///
    /// Returns `BuildError::CacheSizeRequiresProbabilistic`, naming the first offending
    /// tag key encountered.
    fn validate(&self) -> crate::Result<()> {
        // Global per-tag entries only need checking when the global mode is not
        // probabilistic; otherwise this contributes no candidates at all.
        let global_is_probabilistic = matches!(self.global.mode, Mode::Probabilistic(_));
        let global_entries = (!global_is_probabilistic)
            .then(|| self.per_tag_limits.iter())
            .into_iter()
            .flatten()
            .map(|(tag_key, tag_cfg)| (tag_key, tag_cfg.mode));

        // Per-tag entries of every per-metric override whose own mode is not probabilistic.
        let per_metric_entries = self
            .per_metric_limits
            .values()
            .filter(|per_metric| {
                !matches!(per_metric.config.mode, OverrideMode::Probabilistic(_))
            })
            .flat_map(|per_metric| per_metric.per_tag_limits.iter())
            .map(|(tag_key, tag_cfg)| (tag_key, tag_cfg.mode));

        // First candidate that carries an explicit bloom filter size.
        let offender = global_entries.chain(per_metric_entries).find(|(_, mode)| {
            matches!(
                mode,
                PerTagMode::LimitOverride {
                    cache_size_per_key: Some(_),
                    ..
                }
            )
        });

        if let Some((tag_key, _)) = offender {
            return Err(Box::new(BuildError::CacheSizeRequiresProbabilistic {
                tag_key: tag_key.clone(),
            }));
        }

        Ok(())
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "tag_cardinality_limit")]
impl TransformConfig for Config {
async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
self.validate()?;
Ok(Transform::event_task(TagCardinalityLimit::new(
self.clone(),
)))
Expand Down
39 changes: 28 additions & 11 deletions src/transforms/tag_cardinality_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ use crate::event::metric::TagValueSet;

type MetricId = (Option<String>, String);

/// Returns `mode` with its bloom filter size swapped for `override_size`.
///
/// The swap only happens when `mode` is `Mode::Probabilistic` and `override_size` is
/// `Some`; in every other case `mode` is returned unchanged.
const fn apply_cache_size_override(mode: Mode, override_size: Option<usize>) -> Mode {
    match override_size {
        // Nothing to override: keep whatever was inherited.
        None => mode,
        Some(size) => match mode {
            Mode::Probabilistic(_) => Mode::Probabilistic(BloomFilterConfig {
                cache_size_per_key: size,
            }),
            // Any non-probabilistic mode passes through untouched.
            _ => mode,
        },
    }
}

/// Outcome of applying tag cardinality tracking to a tag value.
#[derive(Debug, Eq, PartialEq)]
enum AcceptResult {
Expand Down Expand Up @@ -85,10 +96,10 @@ impl TagCardinalityLimit {
/// Resolve the configuration that applies to a specific (metric, tag) pair.
///
/// Per-tag entries support two modes:
/// - `mode: limit_override` — uses the per-tag `value_limit`; all other settings
/// (`mode`, `cache_size_per_key`, `limit_exceeded_action`, `internal_metrics`)
/// are inherited from the enclosing per-metric (or, for global overrides, the
/// global) config.
/// - `mode: limit_override` — uses the per-tag `value_limit` and an optional per-tag
/// `cache_size_per_key`; all other settings (`mode`, `limit_exceeded_action`,
/// `internal_metrics`) are inherited from the enclosing per-metric (or, for global
/// overrides, the global) config.
/// - `mode: excluded` — opts the tag out entirely; all values pass through.
///
/// Per-metric exclusion is blanket: `mode: excluded` on a per-metric entry opts out
Expand Down Expand Up @@ -121,18 +132,20 @@ impl TagCardinalityLimit {
let metric_value_limit = per_metric.config.value_limit;
let internal_metrics = per_metric.config.internal_metrics;

// Per-tag entry: LimitOverride uses an explicit value_limit; Excluded opts
// the tag out. All other settings are always inherited from per-metric.
// Per-tag entry: LimitOverride uses an explicit value_limit (and optional
// cache_size_per_key override); Excluded opts the tag out. All other settings
// are always inherited from per-metric.
if let Some(per_tag) = per_metric.per_tag_limits.get(tag_key) {
match per_tag.mode {
PerTagMode::Excluded => return TagSettings::Excluded,
PerTagMode::LimitOverride { value_limit } => {
// Tracking algorithm and all other settings are always inherited
// from the per-metric config.
PerTagMode::LimitOverride {
value_limit,
cache_size_per_key,
} => {
return TagSettings::Tracked(Inner {
value_limit,
limit_exceeded_action,
mode: metric_mode,
mode: apply_cache_size_override(metric_mode, cache_size_per_key),
internal_metrics,
});
}
Expand All @@ -152,8 +165,12 @@ impl TagCardinalityLimit {
let global = self.config.global;
match self.config.per_tag_limits.get(tag_key).map(|c| c.mode) {
Some(PerTagMode::Excluded) => TagSettings::Excluded,
Some(PerTagMode::LimitOverride { value_limit }) => TagSettings::Tracked(Inner {
Some(PerTagMode::LimitOverride {
value_limit,
cache_size_per_key,
}) => TagSettings::Tracked(Inner {
value_limit,
mode: apply_cache_size_override(global.mode, cache_size_per_key),
..global
}),
None => TagSettings::Tracked(global),
Expand Down
Loading
Loading