From c4e16f8ff331cb23da2d578c5015a20ee2356ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Dec 2025 15:09:29 +0100 Subject: [PATCH 1/6] feat(enrichment tables): add cuckoo filter to memory table This adds support for cuckoo filters in memory enrichment tables, to support use cases where only presence of a key needs to be checked and false positives are acceptable, greatly improving memory usage compared to regular memory tables. --- Cargo.lock | 10 + Cargo.toml | 5 +- src/enrichment_tables/memory/config.rs | 51 +- src/enrichment_tables/memory/cuckoo_table.rs | 504 ++++++++++++++++++ .../memory/internal_events.rs | 31 ++ src/enrichment_tables/memory/mod.rs | 1 + .../cue/reference/generated/configuration.cue | 96 ++++ 7 files changed, 695 insertions(+), 3 deletions(-) create mode 100644 src/enrichment_tables/memory/cuckoo_table.rs diff --git a/Cargo.lock b/Cargo.lock index 76e4c912d09ee..9aeb4373eacf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3014,6 +3014,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "cuckoo-clock" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e69956d137a478913ed8c017a31f483258e3a238282c40d89b014478e978b3" +dependencies = [ + "rand 0.9.2", +] + [[package]] name = "curl-sys" version = "0.4.84+curl-8.17.0" @@ -12285,6 +12294,7 @@ dependencies = [ "console-subscriber", "criterion", "csv", + "cuckoo-clock", "databend-client", "deadpool 0.13.0", "derivative", diff --git a/Cargo.toml b/Cargo.toml index 21c40c793a9bf..feae567685d68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,6 +151,7 @@ colored = { version = "3.1.1", default-features = false } const-str = { version = "1.1.0", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } +cuckoo-clock = { version = "0.1.0" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { 
version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } @@ -362,6 +363,7 @@ chrono.workspace = true chrono-tz.workspace = true colored.workspace = true csv = { version = "1.3", default-features = false } +cuckoo-clock = { workspace = true, optional = true } databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true } derivative.workspace = true dirs-next = { version = "2.0.0", default-features = false, optional = true } @@ -426,6 +428,7 @@ sqlx = { version = "0.8.6", default-features = false, features = ["derive", "pos stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.1", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } +tempfile = { workspace = true, optional = true } tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true } tokio-tungstenite = { workspace = true, features = ["connect"], optional = true } toml.workspace = true @@ -603,7 +606,7 @@ gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"] enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] enrichment-tables-geoip = ["dep:maxminddb"] enrichment-tables-mmdb = ["dep:maxminddb"] -enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] +enrichment-tables-memory = ["dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"] # Codecs codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"] diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 9b17ce29750c3..83d4cb2e666bb 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -19,6 +19,7 @@ use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, 
SourceOutput, }, + enrichment_tables::memory::cuckoo_table::{CuckooMemoryConfig, CuckooMemoryTable}, sinks::Healthcheck, sources::Source, }; @@ -69,8 +70,16 @@ pub struct MemoryConfig { #[serde(default)] pub ttl_field: OptionalValuePath, + /// Set to make the table act as a probabilistic filter instead of storing original values. This + /// will prevent reading values from the table - found keys will have empty value. + #[configurable(derived)] + #[serde(default)] + pub filter: Option, + #[serde(skip)] memory: Arc>>>, + #[serde(skip)] + cuckoo: Arc>>>, } /// Configuration for memory enrichment table source functionality. @@ -102,6 +111,17 @@ pub struct MemorySourceConfig { pub source_key: String, } +/// Configuration for memory enrichment table filter functionality. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub enum TableFilter { + /// Cuckoo filter + /// + /// Supports removal too, as well as TTL and LRU + Cuckoo(CuckooMemoryConfig), +} + impl PartialEq for MemoryConfig { fn eq(&self, other: &Self) -> bool { self.ttl == other.ttl @@ -118,11 +138,13 @@ impl Default for MemoryConfig { scan_interval: default_scan_interval(), flush_interval: None, memory: Arc::new(Mutex::new(None)), + cuckoo: Arc::new(Mutex::new(None)), max_byte_size: None, log_namespace: None, source_config: None, internal_metrics: InternalMetricsConfig::default(), ttl_field: OptionalValuePath::none(), + filter: None, } } } @@ -142,6 +164,23 @@ impl MemoryConfig { .get_or_insert_with(|| Box::new(Memory::new(self.clone()))) .clone() } + + pub(super) async fn get_or_build_cuckoo(&self) -> crate::Result { + let mut boxed_cuckoo = self.cuckoo.lock().await; + let Some(TableFilter::Cuckoo(cuckoo)) = &self.filter else { + panic!("No cuckoo"); + }; + if let Some(boxed_cuckoo) = boxed_cuckoo.as_ref() { + Ok(*boxed_cuckoo.clone()) + } else { + Ok(*boxed_cuckoo + .insert(Box::new(CuckooMemoryTable::new( + 
self.clone(), + cuckoo.clone(), + )?)) + .clone()) + } + } } impl EnrichmentTableConfig for MemoryConfig { @@ -149,7 +188,10 @@ impl EnrichmentTableConfig for MemoryConfig { &self, _globals: &crate::config::GlobalOptions, ) -> crate::Result> { - Ok(Box::new(self.get_or_build_memory().await)) + match &self.filter { + Some(TableFilter::Cuckoo(_)) => Ok(Box::new(self.get_or_build_cuckoo().await?)), + None => Ok(Box::new(self.get_or_build_memory().await)), + } } fn sink_config( @@ -177,7 +219,12 @@ impl EnrichmentTableConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl SinkConfig for MemoryConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await); + let sink = match &self.filter { + Some(TableFilter::Cuckoo(_)) => { + VectorSink::from_event_streamsink(self.get_or_build_cuckoo().await?) + } + None => VectorSink::from_event_streamsink(self.get_or_build_memory().await), + }; Ok((sink, future::ok(()).boxed())) } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs new file mode 100644 index 0000000000000..31abedf038108 --- /dev/null +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -0,0 +1,504 @@ +use std::{ + fs::File, + io::{BufReader, BufWriter, Write}, + num::NonZeroUsize, + path::PathBuf, + time::Duration, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use cuckoo_clock::{ + CuckooFilter, ExportableRandomState, InsertValues, LookupValues, + config::{CounterConfig, CuckooConfiguration, LruConfig, TtlConfig}, +}; +use futures::{StreamExt, stream::BoxStream}; +use tempfile::NamedTempFile; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; +use vector_config::configurable_component; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + enrichment::{Case, Condition, Error, IndexHandle, Table}, + event::{Event, EventStatus, Finalizable}, + 
internal_event::{ + ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, + }, + lookup::lookup_v2::OptionalValuePath, + sink::StreamSink, +}; +use vrl::value::{KeyString, ObjectMap, Value}; + +use crate::enrichment_tables::memory::{ + MemoryConfig, + internal_events::{ + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, + MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableRemoved, + MemoryEnrichmentTableTtlExpiredCount, + }, +}; + +/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a cuckoo table. +#[derive(Clone)] +pub(super) struct CuckooMemoryTable { + filter: CuckooFilter, + pub(super) config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, +} + +/// Configuration of cuckoo filter for memory table. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub struct CuckooMemoryConfig { + /// Number of bits used for fingerprint. + #[serde(default = "default_cuckoo_fingerprint_bits")] + pub fingerprint_bits: NonZeroUsize, + /// Number of slots in each bucket + #[serde(default = "default_cuckoo_bucket_size")] + pub bucket_size: NonZeroUsize, + /// Maximum number of entries that can be stored in the filter (actual capacity will usually be + /// larger) + pub max_entries: usize, + /// Max number of kicks when experiencing hash collisions. + #[serde(default = "default_cuckoo_max_kicks")] + pub max_kicks: usize, + /// Can be set to true to use LRU strategy for kicking. + #[serde(default = "crate::serde::default_false")] + pub lru_enabled: bool, + /// Can be set to true to also track TTL for entries. + #[serde(default = "crate::serde::default_true")] + pub ttl_enabled: bool, + /// Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + /// worse resolution to keep working. 
+ #[serde(default = "default_cuckoo_ttl_bits")] + pub ttl_bits: NonZeroUsize, + /// Can be set to true to track a count alongside hashes. + #[serde(default = "crate::serde::default_false")] + pub counter_enabled: bool, + /// Number of bits to use to track counter. This will limit the max value. + #[serde(default = "default_cuckoo_counter_bits")] + pub counter_bits: NonZeroUsize, + /// Field in the incoming value used as the counter override. + #[configurable(derived)] + #[serde(default)] + pub counter_field: OptionalValuePath, + /// Path to the file to export data to periodically and on exit. + /// Data will be imported from this file on startup. + #[configurable(derived)] + #[serde(default)] + pub persistence_path: Option, + /// The interval used for exporting data. + /// + /// By default, export is only done on exit. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_interval: Option, +} + +const fn default_cuckoo_fingerprint_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_bucket_size() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(4) } +} + +const fn default_cuckoo_ttl_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_counter_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_max_kicks() -> usize { + 500 +} + +impl CuckooMemoryTable { + /// Creates a new [CuckooMemoryTable] based on the provided config. + pub(super) fn new( + config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, + ) -> crate::Result { + let ttl_val = config.ttl / config.scan_interval.get(); + let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) + .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) 
+ .bucket_size(cuckoo_config.bucket_size) + .max_kicks(cuckoo_config.max_kicks); + + if cuckoo_config.lru_enabled { + builder = builder.with_lru(LruConfig::default()); + } + + if cuckoo_config.ttl_enabled { + builder = builder.with_ttl(TtlConfig { + ttl: u32::try_from(ttl_val)?.try_into()?, + ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, + }); + } + + if cuckoo_config.counter_enabled { + builder = builder.with_counter(CounterConfig { + counter_bits: cuckoo_config.counter_bits.get().try_into()?, + ..Default::default() + }); + } + + let built_config = builder.build()?; + + let filter = 'import: { + if let Some(path) = &cuckoo_config.persistence_path { + let Ok(file) = File::open(path) else { + warn!( + "Couldn't open \"{}\" for cuckoo filter state import.", + path.to_str().unwrap_or("") + ); + break 'import CuckooFilter::new_random_exportable(built_config); + }; + let mut reader = BufReader::new(file); + let filter = match CuckooFilter::import_random_exportable(&mut reader) { + Ok(filter) => filter, + Err(error) => { + warn!("Cuckoo filter state import failed: {}", error); + break 'import CuckooFilter::new_random_exportable(built_config); + } + }; + + if filter.get_configuration() != built_config { + // TODO: Should this stop the build from succeeding? The import will be lost, + // because it will be overwritten very soon. + warn!( + "Stored cuckoo filter configuration doesn't match with new configuration. 
Ignoring the import.", + ); + break 'import CuckooFilter::new_random_exportable(built_config); + } + + filter + } else { + CuckooFilter::new_random_exportable(built_config) + } + }; + + Ok(Self { + config, + filter, + cuckoo_config, + }) + } + + fn export(&self) { + if let Some(path) = &self.cuckoo_config.persistence_path { + let mut parent = path.clone(); + if parent.pop() + && let Ok(temp) = NamedTempFile::new_in(parent) + { + { + let mut writer = BufWriter::new(temp.as_file()); + if self.export_to(&mut writer).is_err() { + return; + } + } + if let Err(error) = temp.persist(path) { + warn!("Cuckoo filter export failed: {}", error); + } + } else { + warn!( + "Couldn't open temporary file for export. Trying to write directly to \"{}\"", + path.to_str().unwrap_or("") + ); + let Ok(file) = File::create(path) else { + warn!( + "Couldn't open \"{}\" for cuckoo filter state export.", + path.to_str().unwrap_or("") + ); + return; + }; + let mut writer = BufWriter::new(file); + let _ = self.export_to(&mut writer); + }; + } + } + + fn export_to(&self, mut writer: impl Write) -> Result<(), ()> { + match self.filter.exporter().write_to(&mut writer) { + Ok(()) => { + if let Err(error) = writer.flush() { + warn!("Cuckoo filter export failed: {}", error); + return Err(()); + }; + Ok(()) + } + Err(error) => { + warn!("Cuckoo filter export failed: {}", error); + Err(()) + } + } + } + + fn handle_value(&self, value: ObjectMap) { + for (k, value) in value.iter() { + if matches!(value, Value::Null) { + if self.filter.remove(k) { + emit!(MemoryEnrichmentTableRemoved { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + + continue; + }; + + if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { + let ttl = self + .config + .ttl_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| u64::try_from(v).ok()) + .map(|v| v / self.config.scan_interval.get()) + .and_then(|v| 
u32::try_from(v).ok()); + let counter = self + .cuckoo_config + .counter_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| i32::try_from(v).ok()); + let _ = self.filter.insert_if_not_present_with_update( + k, + InsertValues { ttl, counter }, + LookupValues { + ttl, + counter_diff: counter, + }, + ); + } else { + let _ = self.filter.insert_if_not_present(k); + } + emit!(MemoryEnrichmentTableInserted { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + } +} + +impl Table for CuckooMemoryTable { + fn find_table_row<'a>( + &self, + case: Case, + condition: &'a [Condition<'a>], + select: Option<&'a [String]>, + wildcard: Option<&Value>, + index: Option, + ) -> Result { + let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?; + + match rows.pop() { + Some(row) if rows.is_empty() => Ok(row), + Some(_) => Err(Error::MoreThanOneRowFound), + None => Err(Error::NoRowsFound), + } + } + + fn find_table_rows<'a>( + &self, + _case: Case, + condition: &'a [Condition<'a>], + _select: Option<&'a [String]>, + _wildcard: Option<&Value>, + _index: Option, + ) -> Result, Error> { + match condition.first() { + Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed), + Some(Condition::Equals { value, .. 
}) => { + let key = value.to_string_lossy(); + if let Some(associated_data) = self.filter.get_associated_data(&key) { + emit!(MemoryEnrichmentTableRead { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + let mut result = ObjectMap::from([ + ( + KeyString::from("key"), + Value::Bytes(Bytes::copy_from_slice(key.as_bytes())), + ), + ( + KeyString::from("fingerprint"), + Value::Bytes(Bytes::from(format!( + "{:X}", + associated_data.get_fingerprint() + ))), + ), + ]); + if let Ok(ttl) = associated_data.get_stored_ttl_value() + && let Ok(ttl) = (ttl as u64 * self.config.scan_interval.get()).try_into() + { + result.insert(KeyString::from("ttl"), Value::Integer(ttl)); + } + if let Ok(counter) = associated_data.get_counter() { + result.insert(KeyString::from("counter"), Value::Integer(counter.into())); + } + Ok(vec![result]) + } else { + emit!(MemoryEnrichmentTableReadFailed { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + Ok(Default::default()) + } + } + Some(_) => Err(Error::OnlyEqualityConditionAllowed), + None => Err(Error::MissingCondition { kind: "Key" }), + } + } + + fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result { + match fields.len() { + 0 => Err(Error::MissingRequiredField { field: "Key" }), + 1 => Ok(IndexHandle(0)), + _ => Err(Error::OnlyOneFieldAllowed), + } + } + + /// Returns a list of the field names that are in each index + fn index_fields(&self) -> Vec<(Case, Vec)> { + Vec::new() + } + + /// Doesn't need reload, data is written directly + fn needs_reload(&self) -> bool { + false + } +} + +impl std::fmt::Debug for CuckooMemoryTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CuckooMemoryTable {:?}", self.config) + } +} + +#[async_trait] +impl StreamSink for CuckooMemoryTable { + async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + let events_sent = 
register!(EventsSent::from(Output(None))); + let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); + let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( + self.config.scan_interval.into(), + ))); + let mut flush_interval = IntervalStream::new(interval( + self.config + .flush_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + let mut export_interval = IntervalStream::new(interval( + self.cuckoo_config + .export_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + + loop { + tokio::select! { + event = input.next() => { + let mut event = if let Some(event) = event { + event + } else { + break; + }; + let event_byte_size = event.estimated_json_encoded_size_of(); + + let finalizers = event.take_finalizers(); + + // Panic: This sink only accepts Logs, so this should never panic + let log = event.into_log(); + + if let (Value::Object(map), _) = log.into_parts() { + self.handle_value(map) + }; + + finalizers.update_status(EventStatus::Delivered); + events_sent.emit(CountByteSize(1, event_byte_size)); + bytes_sent.emit(ByteSize(event_byte_size.get())); + }, + + Some(_) = flush_interval.next() => { + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: self.filter.get_item_count(), + new_byte_size: self.filter.get_memory_usage() + }); + } + + Some(_) = export_interval.next() => { + self.export(); + } + + Some(_) = scan_interval.next() => { + let expired = self.filter.scan_and_update_full(); + emit!(MemoryEnrichmentTableTtlExpiredCount { + count: expired as u64 + }); + } + } + } + + // Final export before exiting + self.export(); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig { + // let mut config = MemoryConfig::default(); + // modfn(&mut config); + // config + // } + + fn build_cuckoo_config(modfn: impl Fn(&mut CuckooMemoryConfig)) -> CuckooMemoryConfig { + let mut config = 
CuckooMemoryConfig { + fingerprint_bits: default_cuckoo_fingerprint_bits(), + bucket_size: default_cuckoo_bucket_size(), + max_entries: 1000, + max_kicks: default_cuckoo_max_kicks(), + lru_enabled: false, + ttl_enabled: false, + ttl_bits: default_cuckoo_ttl_bits(), + counter_enabled: false, + counter_bits: default_cuckoo_counter_bits(), + counter_field: OptionalValuePath::none(), + persistence_path: None, + export_interval: None, + }; + modfn(&mut config); + config + } + + #[test] + fn finds_row() { + let memory = CuckooMemoryTable::new(Default::default(), build_cuckoo_config(|_| {})) + .expect("default cuckoo memory table should build correctly"); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + let result = memory.find_table_row(Case::Sensitive, &[condition], None, None, None); + assert!(result.is_ok()); + let result = result.unwrap(); + assert_eq!(result.get("key").unwrap(), &Value::from("test_key")); + // Cuckoo fingerprint is provided too + assert!(result.contains_key("fingerprint")); + } +} diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs index 0799c6662a6af..ff805ad8ef744 100644 --- a/src/enrichment_tables/memory/internal_events.rs +++ b/src/enrichment_tables/memory/internal_events.rs @@ -56,6 +56,26 @@ impl InternalEvent for MemoryEnrichmentTableInserted<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableRemoved<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableRemoved<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_removed_total", + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_removed_total",).increment(1); + } + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) 
struct MemoryEnrichmentTableFlushed { pub new_objects_count: usize, @@ -90,6 +110,17 @@ impl InternalEvent for MemoryEnrichmentTableTtlExpired<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableTtlExpiredCount { + pub count: u64, +} + +impl InternalEvent for MemoryEnrichmentTableTtlExpiredCount { + fn emit(self) { + counter!("memory_enrichment_table_ttl_expirations",).increment(self.count); + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) struct MemoryEnrichmentTableReadFailed<'a> { pub key: &'a str, diff --git a/src/enrichment_tables/memory/mod.rs b/src/enrichment_tables/memory/mod.rs index 72b0986f9b9e9..61308ef48cfbf 100644 --- a/src/enrichment_tables/memory/mod.rs +++ b/src/enrichment_tables/memory/mod.rs @@ -1,6 +1,7 @@ //! Handles enrichment tables for `type = memory`. mod config; +mod cuckoo_table; mod internal_events; mod source; mod table; diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index fdb5d0cb15bd0..101da50fcebdb 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -216,6 +216,102 @@ generated: configuration: { required: false relevant_when: "type = \"file\"" } + filter: { + type: object: options: { + bucket_size: { + type: uint: default: 4 + description: "Number of slots in each bucket" + required: false + } + counter_bits: { + type: uint: default: 8 + description: "Number of bits to use to track counter. This will limit the max value." + required: false + } + counter_enabled: { + type: bool: default: false + description: "Can be set to true to track a count alongside hashes." + required: false + } + counter_field: { + type: string: default: "" + description: "Field in the incoming value used as the counter override." + required: false + } + export_interval: { + type: uint: {} + description: """ + The interval used for exporting data. 
+ + By default, export is only done on exit. + """ + required: false + } + fingerprint_bits: { + type: uint: default: 8 + description: "Number of bits used for fingerprint." + required: false + } + lru_enabled: { + type: bool: default: false + description: "Can be set to true to use LRU strategy for kicking." + required: false + } + max_entries: { + type: uint: {} + description: """ + Maximum number of entries that can be stored in the filter (actual capacity will usually be + larger) + """ + required: true + } + max_kicks: { + type: uint: default: 500 + description: "Max number of kicks when experiencing hash collisions." + required: false + } + persistence_path: { + type: string: {} + description: """ + Path to the file to export data to periodically and on exit. + Data will be imported from this file on startup. + """ + required: false + } + ttl_bits: { + type: uint: default: 8 + description: """ + Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + worse resolution to keep working. + """ + required: false + } + ttl_enabled: { + type: bool: default: true + description: "Can be set to true to also track TTL for entries." + required: false + } + type: { + type: string: enum: cuckoo: """ + Cuckoo filter + + Supports removal too, as well as TTL and LRU + """ + description: """ + Cuckoo filter + + Supports removal too, as well as TTL and LRU + """ + required: true + } + } + description: """ + Set to make the table act as a probabilistic filter instead of storing original values. This + will prevent reading values from the table - found keys will have empty value. 
+ """ + required: false + relevant_when: "type = \"memory\"" + } flush_interval: { type: uint: {} description: """ From 8993198f694efa547421853774e6c84aa7899d98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 17:41:13 +0200 Subject: [PATCH 2/6] Add changelog entry --- .../25143_enrichment_table_memory_cuckoo_filter.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md diff --git a/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md new file mode 100644 index 0000000000000..36eaeb1a2c64d --- /dev/null +++ b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md @@ -0,0 +1,3 @@ +Added cuckoo filter support for `memory` enrichment table, to provide an efficient way to store and check presence of keys with a low memory footprint at the cost of false positives. + +authors: esensar Quad9DNS From 2d295b5e143b844798148cb1045d091c1ecfa161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 17:52:58 +0200 Subject: [PATCH 3/6] Prevent ttl_val from being zero --- src/enrichment_tables/memory/cuckoo_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 31abedf038108..2c04844b50bc8 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -121,7 +121,7 @@ impl CuckooMemoryTable { config: MemoryConfig, cuckoo_config: CuckooMemoryConfig, ) -> crate::Result { - let ttl_val = config.ttl / config.scan_interval.get(); + let ttl_val = (config.ttl / config.scan_interval.get()).max(1); let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) 
+ .bucket_size(cuckoo_config.bucket_size) From 4fe95ea7430b6cda474c68098caa90c8cb90c5d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 18:18:48 +0200 Subject: [PATCH 4/6] Remove commented out test code --- src/enrichment_tables/memory/cuckoo_table.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 2c04844b50bc8..52a085befa2cb 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -458,12 +458,6 @@ impl StreamSink for CuckooMemoryTable { mod tests { use super::*; - // fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig { - // let mut config = MemoryConfig::default(); - // modfn(&mut config); - // config - // } - fn build_cuckoo_config(modfn: impl Fn(&mut CuckooMemoryConfig)) -> CuckooMemoryConfig { let mut config = CuckooMemoryConfig { fingerprint_bits: default_cuckoo_fingerprint_bits(), From a0afaf48347baf228b5f870a22c927d053a2a9ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 9 Apr 2026 12:44:24 +0200 Subject: [PATCH 5/6] feat(enrichment tables): add bloom filter to memory table This adds support for bloom filters in memory enrichment tables, similar to cuckoo filter, but simpler and supporting fewer features (no TTL, no deletion). 
--- Cargo.toml | 2 +- src/enrichment_tables/memory/bloom_table.rs | 227 ++++++++++++++++++ src/enrichment_tables/memory/config.rs | 34 ++- src/enrichment_tables/memory/mod.rs | 1 + .../cue/reference/generated/configuration.cue | 64 +++-- 5 files changed, 301 insertions(+), 27 deletions(-) create mode 100644 src/enrichment_tables/memory/bloom_table.rs diff --git a/Cargo.toml b/Cargo.toml index feae567685d68..b3998e682298a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -606,7 +606,7 @@ gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"] enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] enrichment-tables-geoip = ["dep:maxminddb"] enrichment-tables-mmdb = ["dep:maxminddb"] -enrichment-tables-memory = ["dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"] +enrichment-tables-memory = ["dep:bloomy", "dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"] # Codecs codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"] diff --git a/src/enrichment_tables/memory/bloom_table.rs b/src/enrichment_tables/memory/bloom_table.rs new file mode 100644 index 0000000000000..ded5e9639be96 --- /dev/null +++ b/src/enrichment_tables/memory/bloom_table.rs @@ -0,0 +1,227 @@ +use std::{ + sync::{Arc, RwLock}, + time::Duration, +}; + +use async_trait::async_trait; +use bloomy::BloomFilter; +use bytes::Bytes; +use futures::{StreamExt, stream::BoxStream}; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; +use vector_config::configurable_component; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + enrichment::{Case, Condition, Error, IndexHandle, Table}, + event::{Event, EventStatus, Finalizable}, + internal_event::{ + ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, + }, + sink::StreamSink, +}; +use vrl::value::{KeyString, ObjectMap, Value}; + +use crate::enrichment_tables::memory::{ + MemoryConfig, + 
internal_events::{ + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, + MemoryEnrichmentTableReadFailed, + }, +}; + +/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a bloom table. +#[derive(Clone)] +pub(super) struct BloomMemoryTable { + filter: Arc>>, + pub(super) config: MemoryConfig, +} + +/// Configuration of bloom filter for memory table. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub struct BloomMemoryConfig { + /// Maximum number of entries that can be stored in the filter + pub max_entries: usize, +} + +impl BloomMemoryTable { + /// Creates a new [BloomMemoryTable] based on the provided config. + pub(super) fn new( + config: MemoryConfig, + bloom_config: BloomMemoryConfig, + ) -> crate::Result { + let filter = Arc::new(RwLock::new(BloomFilter::new(bloom_config.max_entries))); + + Ok(Self { config, filter }) + } + + fn handle_value(&self, value: ObjectMap) { + for (k, _) in value.iter() { + self.filter + .write() + .expect("rwlock poisoned") + .insert(&k.to_string()); + emit!(MemoryEnrichmentTableInserted { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + } +} + +impl Table for BloomMemoryTable { + fn find_table_row<'a>( + &self, + case: Case, + condition: &'a [Condition<'a>], + select: Option<&'a [String]>, + wildcard: Option<&Value>, + index: Option, + ) -> Result { + let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?; + + match rows.pop() { + Some(row) if rows.is_empty() => Ok(row), + Some(_) => Err(Error::MoreThanOneRowFound), + None => Err(Error::NoRowsFound), + } + } + + fn find_table_rows<'a>( + &self, + _case: Case, + condition: &'a [Condition<'a>], + _select: Option<&'a [String]>, + _wildcard: Option<&Value>, + _index: Option, + ) -> Result, Error> { + match condition.first() { + Some(_) if 
condition.len() > 1 => Err(Error::OnlyOneConditionAllowed), + Some(Condition::Equals { value, .. }) => { + let key = value.to_string_lossy().to_string(); + if self.filter.read().expect("rwlock poisoned").contains(&key) { + emit!(MemoryEnrichmentTableRead { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + let result = ObjectMap::from([( + KeyString::from("key"), + Value::Bytes(Bytes::copy_from_slice(key.as_bytes())), + )]); + Ok(vec![result]) + } else { + emit!(MemoryEnrichmentTableReadFailed { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + Ok(Default::default()) + } + } + Some(_) => Err(Error::OnlyEqualityConditionAllowed), + None => Err(Error::MissingCondition { kind: "Key" }), + } + } + + fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result { + match fields.len() { + 0 => Err(Error::MissingRequiredField { field: "Key" }), + 1 => Ok(IndexHandle(0)), + _ => Err(Error::OnlyOneFieldAllowed), + } + } + + /// Returns a list of the field names that are in each index + fn index_fields(&self) -> Vec<(Case, Vec)> { + Vec::new() + } + + /// Doesn't need reload, data is written directly + fn needs_reload(&self) -> bool { + false + } +} + +impl std::fmt::Debug for BloomMemoryTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BloomMemoryTable {:?}", self.config) + } +} + +#[async_trait] +impl StreamSink for BloomMemoryTable { + async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + let events_sent = register!(EventsSent::from(Output(None))); + let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); + let mut flush_interval = IntervalStream::new(interval( + self.config + .flush_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + + loop { + tokio::select! 
{ + event = input.next() => { + let mut event = if let Some(event) = event { + event + } else { + break; + }; + let event_byte_size = event.estimated_json_encoded_size_of(); + + let finalizers = event.take_finalizers(); + + // Panic: This sink only accepts Logs, so this should never panic + let log = event.into_log(); + + if let (Value::Object(map), _) = log.into_parts() { + self.handle_value(map) + }; + + finalizers.update_status(EventStatus::Delivered); + events_sent.emit(CountByteSize(1, event_byte_size)); + bytes_sent.emit(ByteSize(event_byte_size.get())); + }, + + Some(_) = flush_interval.next() => { + let filter = self.filter.read().expect("rwlock poisoned"); + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: filter.count(), + new_byte_size: filter.bits() / 8 + }); + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn build_bloom_config(modfn: impl Fn(&mut BloomMemoryConfig)) -> BloomMemoryConfig { + let mut config = BloomMemoryConfig { max_entries: 1000 }; + modfn(&mut config); + config + } + + #[test] + fn finds_row() { + let memory = BloomMemoryTable::new(Default::default(), build_bloom_config(|_| {})) + .expect("default bloom memory table should build correctly"); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + let result = memory.find_table_row(Case::Sensitive, &[condition], None, None, None); + assert!(result.is_ok()); + let result = result.unwrap(); + assert_eq!(result.get("key").unwrap(), &Value::from("test_key")); + } +} diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 83d4cb2e666bb..cc5ea113b8e95 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -19,7 +19,10 @@ use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput, }, - 
enrichment_tables::memory::cuckoo_table::{CuckooMemoryConfig, CuckooMemoryTable}, + enrichment_tables::memory::{ + bloom_table::{BloomMemoryConfig, BloomMemoryTable}, + cuckoo_table::{CuckooMemoryConfig, CuckooMemoryTable}, + }, sinks::Healthcheck, sources::Source, }; @@ -80,6 +83,8 @@ pub struct MemoryConfig { memory: Arc>>>, #[serde(skip)] cuckoo: Arc>>>, + #[serde(skip)] + bloom: Arc>>>, } /// Configuration for memory enrichment table source functionality. @@ -115,11 +120,16 @@ pub struct MemorySourceConfig { #[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] #[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +#[configurable(metadata(docs::enum_tag_description = "The probabilistic filter to use."))] pub enum TableFilter { /// Cuckoo filter /// /// Supports removal too, as well as TTL and LRU Cuckoo(CuckooMemoryConfig), + /// Bloom filter + /// + /// Only supports insertion and presence check, no TTL + Bloom(BloomMemoryConfig), } impl PartialEq for MemoryConfig { @@ -139,6 +149,7 @@ impl Default for MemoryConfig { flush_interval: None, memory: Arc::new(Mutex::new(None)), cuckoo: Arc::new(Mutex::new(None)), + bloom: Arc::new(Mutex::new(None)), max_byte_size: None, log_namespace: None, source_config: None, @@ -181,6 +192,23 @@ impl MemoryConfig { .clone()) } } + + pub(super) async fn get_or_build_bloom(&self) -> crate::Result { + let mut boxed_bloom = self.bloom.lock().await; + let Some(TableFilter::Bloom(bloom)) = &self.filter else { + panic!("No bloom"); + }; + if let Some(boxed_bloom) = boxed_bloom.as_ref() { + Ok(*boxed_bloom.clone()) + } else { + Ok(*boxed_bloom + .insert(Box::new(BloomMemoryTable::new( + self.clone(), + bloom.clone(), + )?)) + .clone()) + } + } } impl EnrichmentTableConfig for MemoryConfig { @@ -190,6 +218,7 @@ impl EnrichmentTableConfig for MemoryConfig { ) -> crate::Result> { match &self.filter { Some(TableFilter::Cuckoo(_)) => Ok(Box::new(self.get_or_build_cuckoo().await?)), + Some(TableFilter::Bloom(_)) 
=> Ok(Box::new(self.get_or_build_bloom().await?)), None => Ok(Box::new(self.get_or_build_memory().await)), } } @@ -223,6 +252,9 @@ impl SinkConfig for MemoryConfig { Some(TableFilter::Cuckoo(_)) => { VectorSink::from_event_streamsink(self.get_or_build_cuckoo().await?) } + Some(TableFilter::Bloom(_)) => { + VectorSink::from_event_streamsink(self.get_or_build_bloom().await?) + } None => VectorSink::from_event_streamsink(self.get_or_build_memory().await), }; diff --git a/src/enrichment_tables/memory/mod.rs b/src/enrichment_tables/memory/mod.rs index 61308ef48cfbf..d37e0e99f7a2e 100644 --- a/src/enrichment_tables/memory/mod.rs +++ b/src/enrichment_tables/memory/mod.rs @@ -1,5 +1,6 @@ //! Handles enrichment tables for `type = memory`. +mod bloom_table; mod config; mod cuckoo_table; mod internal_events; diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 101da50fcebdb..9550b4f3ba837 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -220,23 +220,27 @@ generated: configuration: { type: object: options: { bucket_size: { type: uint: default: 4 - description: "Number of slots in each bucket" - required: false + description: "Number of slots in each bucket" + required: false + relevant_when: "type = \"cuckoo\"" } counter_bits: { type: uint: default: 8 - description: "Number of bits to use to track counter. This will limit the max value." - required: false + description: "Number of bits to use to track counter. This will limit the max value." + required: false + relevant_when: "type = \"cuckoo\"" } counter_enabled: { type: bool: default: false - description: "Can be set to true to track a count alongside hashes." - required: false + description: "Can be set to true to track a count alongside hashes." 
+ required: false + relevant_when: "type = \"cuckoo\"" } counter_field: { type: string: default: "" - description: "Field in the incoming value used as the counter override." - required: false + description: "Field in the incoming value used as the counter override." + required: false + relevant_when: "type = \"cuckoo\"" } export_interval: { type: uint: {} @@ -245,17 +249,20 @@ generated: configuration: { By default, export is only done on exit. """ - required: false + required: false + relevant_when: "type = \"cuckoo\"" } fingerprint_bits: { type: uint: default: 8 - description: "Number of bits used for fingerprint." - required: false + description: "Number of bits used for fingerprint." + required: false + relevant_when: "type = \"cuckoo\"" } lru_enabled: { type: bool: default: false - description: "Can be set to true to use LRU strategy for kicking." - required: false + description: "Can be set to true to use LRU strategy for kicking." + required: false + relevant_when: "type = \"cuckoo\"" } max_entries: { type: uint: {} @@ -267,8 +274,9 @@ generated: configuration: { } max_kicks: { type: uint: default: 500 - description: "Max number of kicks when experiencing hash collisions." - required: false + description: "Max number of kicks when experiencing hash collisions." + required: false + relevant_when: "type = \"cuckoo\"" } persistence_path: { type: string: {} @@ -276,7 +284,8 @@ generated: configuration: { Path to the file to export data to periodically and on exit. Data will be imported from this file on startup. """ - required: false + required: false + relevant_when: "type = \"cuckoo\"" } ttl_bits: { type: uint: default: 8 @@ -284,25 +293,30 @@ generated: configuration: { Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a worse resolution to keep working. 
""" - required: false + required: false + relevant_when: "type = \"cuckoo\"" } ttl_enabled: { type: bool: default: true - description: "Can be set to true to also track TTL for entries." - required: false + description: "Can be set to true to also track TTL for entries." + required: false + relevant_when: "type = \"cuckoo\"" } type: { - type: string: enum: cuckoo: """ + required: true + type: string: enum: { + cuckoo: """ Cuckoo filter Supports removal too, as well as TTL and LRU """ - description: """ - Cuckoo filter + bloom: """ + Bloom filter - Supports removal too, as well as TTL and LRU - """ - required: true + Only supports insertion and presence check, no TTL + """ + } + description: "The probabilistic filter to use." } } description: """ From b4ddbb42182e3d450e605a53642e7c1db35aa830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 9 Apr 2026 12:49:13 +0200 Subject: [PATCH 6/6] Add changelog entry --- .../25154_enrichment_table_memory_bloom_filter.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/25154_enrichment_table_memory_bloom_filter.feature.md diff --git a/changelog.d/25154_enrichment_table_memory_bloom_filter.feature.md b/changelog.d/25154_enrichment_table_memory_bloom_filter.feature.md new file mode 100644 index 0000000000000..462beefdf3621 --- /dev/null +++ b/changelog.d/25154_enrichment_table_memory_bloom_filter.feature.md @@ -0,0 +1,3 @@ +Added bloom filter support for the `memory` enrichment table, similar to the cuckoo filter, providing a simple and efficient way to store and check the presence of keys with a low memory footprint at the cost of false positives, but with fewer features than the cuckoo filter. + +authors: esensar Quad9DNS