From c4e16f8ff331cb23da2d578c5015a20ee2356ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Dec 2025 15:09:29 +0100 Subject: [PATCH 01/40] feat(enrichment tables): add cuckoo filter to memory table This adds support for cuckoo filters in memory enrichment tables, to support use cases where only presence of a key needs to be checked and false positives are acceptable, greatly improving memory usage compared to regular memory tables. --- Cargo.lock | 10 + Cargo.toml | 5 +- src/enrichment_tables/memory/config.rs | 51 +- src/enrichment_tables/memory/cuckoo_table.rs | 504 ++++++++++++++++++ .../memory/internal_events.rs | 31 ++ src/enrichment_tables/memory/mod.rs | 1 + .../cue/reference/generated/configuration.cue | 96 ++++ 7 files changed, 695 insertions(+), 3 deletions(-) create mode 100644 src/enrichment_tables/memory/cuckoo_table.rs diff --git a/Cargo.lock b/Cargo.lock index 76e4c912d09ee..9aeb4373eacf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3014,6 +3014,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "cuckoo-clock" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e69956d137a478913ed8c017a31f483258e3a238282c40d89b014478e978b3" +dependencies = [ + "rand 0.9.2", +] + [[package]] name = "curl-sys" version = "0.4.84+curl-8.17.0" @@ -12285,6 +12294,7 @@ dependencies = [ "console-subscriber", "criterion", "csv", + "cuckoo-clock", "databend-client", "deadpool 0.13.0", "derivative", diff --git a/Cargo.toml b/Cargo.toml index 21c40c793a9bf..feae567685d68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,6 +151,7 @@ colored = { version = "3.1.1", default-features = false } const-str = { version = "1.1.0", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } +cuckoo-clock = { version = "0.1.0" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = 
{ version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } @@ -362,6 +363,7 @@ chrono.workspace = true chrono-tz.workspace = true colored.workspace = true csv = { version = "1.3", default-features = false } +cuckoo-clock = { workspace = true, optional = true } databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true } derivative.workspace = true dirs-next = { version = "2.0.0", default-features = false, optional = true } @@ -426,6 +428,7 @@ sqlx = { version = "0.8.6", default-features = false, features = ["derive", "pos stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.1", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } +tempfile = { workspace = true, optional = true } tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true } tokio-tungstenite = { workspace = true, features = ["connect"], optional = true } toml.workspace = true @@ -603,7 +606,7 @@ gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"] enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] enrichment-tables-geoip = ["dep:maxminddb"] enrichment-tables-mmdb = ["dep:maxminddb"] -enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] +enrichment-tables-memory = ["dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"] # Codecs codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"] diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 9b17ce29750c3..83d4cb2e666bb 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -19,6 +19,7 @@ use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, 
SourceOutput, }, + enrichment_tables::memory::cuckoo_table::{CuckooMemoryConfig, CuckooMemoryTable}, sinks::Healthcheck, sources::Source, }; @@ -69,8 +70,16 @@ pub struct MemoryConfig { #[serde(default)] pub ttl_field: OptionalValuePath, + /// Set to make the table act as a probabilistic filter instead of storing original values. This + /// will prevent reading values from the table - found keys will have empty value. + #[configurable(derived)] + #[serde(default)] + pub filter: Option, + #[serde(skip)] memory: Arc>>>, + #[serde(skip)] + cuckoo: Arc>>>, } /// Configuration for memory enrichment table source functionality. @@ -102,6 +111,17 @@ pub struct MemorySourceConfig { pub source_key: String, } +/// Configuration for memory enrichment table filter functionality. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub enum TableFilter { + /// Cuckoo filter + /// + /// Supports removal too, as well as TTL and LRU + Cuckoo(CuckooMemoryConfig), +} + impl PartialEq for MemoryConfig { fn eq(&self, other: &Self) -> bool { self.ttl == other.ttl @@ -118,11 +138,13 @@ impl Default for MemoryConfig { scan_interval: default_scan_interval(), flush_interval: None, memory: Arc::new(Mutex::new(None)), + cuckoo: Arc::new(Mutex::new(None)), max_byte_size: None, log_namespace: None, source_config: None, internal_metrics: InternalMetricsConfig::default(), ttl_field: OptionalValuePath::none(), + filter: None, } } } @@ -142,6 +164,23 @@ impl MemoryConfig { .get_or_insert_with(|| Box::new(Memory::new(self.clone()))) .clone() } + + pub(super) async fn get_or_build_cuckoo(&self) -> crate::Result { + let mut boxed_cuckoo = self.cuckoo.lock().await; + let Some(TableFilter::Cuckoo(cuckoo)) = &self.filter else { + panic!("No cuckoo"); + }; + if let Some(boxed_cuckoo) = boxed_cuckoo.as_ref() { + Ok(*boxed_cuckoo.clone()) + } else { + Ok(*boxed_cuckoo + .insert(Box::new(CuckooMemoryTable::new( + 
self.clone(), + cuckoo.clone(), + )?)) + .clone()) + } + } } impl EnrichmentTableConfig for MemoryConfig { @@ -149,7 +188,10 @@ impl EnrichmentTableConfig for MemoryConfig { &self, _globals: &crate::config::GlobalOptions, ) -> crate::Result> { - Ok(Box::new(self.get_or_build_memory().await)) + match &self.filter { + Some(TableFilter::Cuckoo(_)) => Ok(Box::new(self.get_or_build_cuckoo().await?)), + None => Ok(Box::new(self.get_or_build_memory().await)), + } } fn sink_config( @@ -177,7 +219,12 @@ impl EnrichmentTableConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl SinkConfig for MemoryConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await); + let sink = match &self.filter { + Some(TableFilter::Cuckoo(_)) => { + VectorSink::from_event_streamsink(self.get_or_build_cuckoo().await?) + } + None => VectorSink::from_event_streamsink(self.get_or_build_memory().await), + }; Ok((sink, future::ok(()).boxed())) } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs new file mode 100644 index 0000000000000..31abedf038108 --- /dev/null +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -0,0 +1,504 @@ +use std::{ + fs::File, + io::{BufReader, BufWriter, Write}, + num::NonZeroUsize, + path::PathBuf, + time::Duration, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use cuckoo_clock::{ + CuckooFilter, ExportableRandomState, InsertValues, LookupValues, + config::{CounterConfig, CuckooConfiguration, LruConfig, TtlConfig}, +}; +use futures::{StreamExt, stream::BoxStream}; +use tempfile::NamedTempFile; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; +use vector_config::configurable_component; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + enrichment::{Case, Condition, Error, IndexHandle, Table}, + event::{Event, EventStatus, Finalizable}, + 
internal_event::{ + ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, + }, + lookup::lookup_v2::OptionalValuePath, + sink::StreamSink, +}; +use vrl::value::{KeyString, ObjectMap, Value}; + +use crate::enrichment_tables::memory::{ + MemoryConfig, + internal_events::{ + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, + MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableRemoved, + MemoryEnrichmentTableTtlExpiredCount, + }, +}; + +/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a cuckoo table. +#[derive(Clone)] +pub(super) struct CuckooMemoryTable { + filter: CuckooFilter, + pub(super) config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, +} + +/// Configuration of cuckoo filter for memory table. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub struct CuckooMemoryConfig { + /// Number of bits used for fingerprint. + #[serde(default = "default_cuckoo_fingerprint_bits")] + pub fingerprint_bits: NonZeroUsize, + /// Number of slots in each bucket + #[serde(default = "default_cuckoo_bucket_size")] + pub bucket_size: NonZeroUsize, + /// Maximum number of entries that can be stored in the filter (actual capacity will usually be + /// larger) + pub max_entries: usize, + /// Max number of kicks when experiencing hash collisions. + #[serde(default = "default_cuckoo_max_kicks")] + pub max_kicks: usize, + /// Can be set to true to use LRU strategy for kicking. + #[serde(default = "crate::serde::default_false")] + pub lru_enabled: bool, + /// Can be set to true to also track TTL for entries. + #[serde(default = "crate::serde::default_true")] + pub ttl_enabled: bool, + /// Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + /// worse resolution to keep working. 
+ #[serde(default = "default_cuckoo_ttl_bits")] + pub ttl_bits: NonZeroUsize, + /// Can be set to true to track a count alongside hashes. + #[serde(default = "crate::serde::default_false")] + pub counter_enabled: bool, + /// Number of bits to use to track counter. This will limit the max value. + #[serde(default = "default_cuckoo_counter_bits")] + pub counter_bits: NonZeroUsize, + /// Field in the incoming value used as the counter override. + #[configurable(derived)] + #[serde(default)] + pub counter_field: OptionalValuePath, + /// Path to the file to export data to periodically and on exit. + /// Data will be imported from this file on startup. + #[configurable(derived)] + #[serde(default)] + pub persistence_path: Option, + /// The interval used for exporting data. + /// + /// By default, export is only done on exit. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_interval: Option, +} + +const fn default_cuckoo_fingerprint_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_bucket_size() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(4) } +} + +const fn default_cuckoo_ttl_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_counter_bits() -> NonZeroUsize { + unsafe { NonZeroUsize::new_unchecked(8) } +} + +const fn default_cuckoo_max_kicks() -> usize { + 500 +} + +impl CuckooMemoryTable { + /// Creates a new [CuckooMemoryTable] based on the provided config. + pub(super) fn new( + config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, + ) -> crate::Result { + let ttl_val = config.ttl / config.scan_interval.get(); + let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) + .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) 
+ .bucket_size(cuckoo_config.bucket_size) + .max_kicks(cuckoo_config.max_kicks); + + if cuckoo_config.lru_enabled { + builder = builder.with_lru(LruConfig::default()); + } + + if cuckoo_config.ttl_enabled { + builder = builder.with_ttl(TtlConfig { + ttl: u32::try_from(ttl_val)?.try_into()?, + ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, + }); + } + + if cuckoo_config.counter_enabled { + builder = builder.with_counter(CounterConfig { + counter_bits: cuckoo_config.counter_bits.get().try_into()?, + ..Default::default() + }); + } + + let built_config = builder.build()?; + + let filter = 'import: { + if let Some(path) = &cuckoo_config.persistence_path { + let Ok(file) = File::open(path) else { + warn!( + "Couldn't open \"{}\" for cuckoo filter state import.", + path.to_str().unwrap_or("") + ); + break 'import CuckooFilter::new_random_exportable(built_config); + }; + let mut reader = BufReader::new(file); + let filter = match CuckooFilter::import_random_exportable(&mut reader) { + Ok(filter) => filter, + Err(error) => { + warn!("Cuckoo filter state import failed: {}", error); + break 'import CuckooFilter::new_random_exportable(built_config); + } + }; + + if filter.get_configuration() != built_config { + // TODO: Should this stop the build from succeeding? The import will be lost, + // because it will be overwritter very soon. + warn!( + "Stored cuckoo filter configuration doesn't match with new configuration. 
Ignoring the import.", + ); + break 'import CuckooFilter::new_random_exportable(built_config); + } + + filter + } else { + CuckooFilter::new_random_exportable(built_config) + } + }; + + Ok(Self { + config, + filter, + cuckoo_config, + }) + } + + fn export(&self) { + if let Some(path) = &self.cuckoo_config.persistence_path { + let mut parent = path.clone(); + if parent.pop() + && let Ok(temp) = NamedTempFile::new_in(parent) + { + { + let mut writer = BufWriter::new(temp.as_file()); + if self.export_to(&mut writer).is_err() { + return; + } + } + if let Err(error) = temp.persist(path) { + warn!("Cuckoo filter export failed: {}", error); + } + } else { + warn!( + "Couldn't open temporary file for export. Trying to write directly to \"{}\"", + path.to_str().unwrap_or("") + ); + let Ok(file) = File::create(path) else { + warn!( + "Couldn't open \"{}\" for cuckoo filter state export.", + path.to_str().unwrap_or("") + ); + return; + }; + let mut writer = BufWriter::new(file); + let _ = self.export_to(&mut writer); + }; + } + } + + fn export_to(&self, mut writer: impl Write) -> Result<(), ()> { + match self.filter.exporter().write_to(&mut writer) { + Ok(()) => { + if let Err(error) = writer.flush() { + warn!("Cuckoo filter export failed: {}", error); + return Err(()); + }; + Ok(()) + } + Err(error) => { + warn!("Cuckoo filter export failed: {}", error); + Err(()) + } + } + } + + fn handle_value(&self, value: ObjectMap) { + for (k, value) in value.iter() { + if matches!(value, Value::Null) { + if self.filter.remove(k) { + emit!(MemoryEnrichmentTableRemoved { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + + continue; + }; + + if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { + let ttl = self + .config + .ttl_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| u64::try_from(v).ok()) + .map(|v| v / self.config.scan_interval.get()) + .and_then(|v| 
u32::try_from(v).ok()); + let counter = self + .cuckoo_config + .counter_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| i32::try_from(v).ok()); + let _ = self.filter.insert_if_not_present_with_update( + k, + InsertValues { ttl, counter }, + LookupValues { + ttl, + counter_diff: counter, + }, + ); + } else { + let _ = self.filter.insert_if_not_present(k); + } + emit!(MemoryEnrichmentTableInserted { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + } +} + +impl Table for CuckooMemoryTable { + fn find_table_row<'a>( + &self, + case: Case, + condition: &'a [Condition<'a>], + select: Option<&'a [String]>, + wildcard: Option<&Value>, + index: Option, + ) -> Result { + let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?; + + match rows.pop() { + Some(row) if rows.is_empty() => Ok(row), + Some(_) => Err(Error::MoreThanOneRowFound), + None => Err(Error::NoRowsFound), + } + } + + fn find_table_rows<'a>( + &self, + _case: Case, + condition: &'a [Condition<'a>], + _select: Option<&'a [String]>, + _wildcard: Option<&Value>, + _index: Option, + ) -> Result, Error> { + match condition.first() { + Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed), + Some(Condition::Equals { value, .. 
}) => { + let key = value.to_string_lossy(); + if let Some(associated_data) = self.filter.get_associated_data(&key) { + emit!(MemoryEnrichmentTableRead { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + let mut result = ObjectMap::from([ + ( + KeyString::from("key"), + Value::Bytes(Bytes::copy_from_slice(key.as_bytes())), + ), + ( + KeyString::from("fingerprint"), + Value::Bytes(Bytes::from(format!( + "{:X}", + associated_data.get_fingerprint() + ))), + ), + ]); + if let Ok(ttl) = associated_data.get_stored_ttl_value() + && let Ok(ttl) = (ttl as u64 * self.config.scan_interval.get()).try_into() + { + result.insert(KeyString::from("ttl"), Value::Integer(ttl)); + } + if let Ok(counter) = associated_data.get_counter() { + result.insert(KeyString::from("counter"), Value::Integer(counter.into())); + } + Ok(vec![result]) + } else { + emit!(MemoryEnrichmentTableReadFailed { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + Ok(Default::default()) + } + } + Some(_) => Err(Error::OnlyEqualityConditionAllowed), + None => Err(Error::MissingCondition { kind: "Key" }), + } + } + + fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result { + match fields.len() { + 0 => Err(Error::MissingRequiredField { field: "Key" }), + 1 => Ok(IndexHandle(0)), + _ => Err(Error::OnlyOneFieldAllowed), + } + } + + /// Returns a list of the field names that are in each index + fn index_fields(&self) -> Vec<(Case, Vec)> { + Vec::new() + } + + /// Doesn't need reload, data is written directly + fn needs_reload(&self) -> bool { + false + } +} + +impl std::fmt::Debug for CuckooMemoryTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CuckooMemoryTable {:?}", self.config) + } +} + +#[async_trait] +impl StreamSink for CuckooMemoryTable { + async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + let events_sent = 
register!(EventsSent::from(Output(None))); + let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); + let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( + self.config.scan_interval.into(), + ))); + let mut flush_interval = IntervalStream::new(interval( + self.config + .flush_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + let mut export_interval = IntervalStream::new(interval( + self.cuckoo_config + .export_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX), + )); + + loop { + tokio::select! { + event = input.next() => { + let mut event = if let Some(event) = event { + event + } else { + break; + }; + let event_byte_size = event.estimated_json_encoded_size_of(); + + let finalizers = event.take_finalizers(); + + // Panic: This sink only accepts Logs, so this should never panic + let log = event.into_log(); + + if let (Value::Object(map), _) = log.into_parts() { + self.handle_value(map) + }; + + finalizers.update_status(EventStatus::Delivered); + events_sent.emit(CountByteSize(1, event_byte_size)); + bytes_sent.emit(ByteSize(event_byte_size.get())); + }, + + Some(_) = flush_interval.next() => { + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: self.filter.get_item_count(), + new_byte_size: self.filter.get_memory_usage() + }); + } + + Some(_) = export_interval.next() => { + self.export(); + } + + Some(_) = scan_interval.next() => { + let expired = self.filter.scan_and_update_full(); + emit!(MemoryEnrichmentTableTtlExpiredCount { + count: expired as u64 + }); + } + } + } + + // Final export before exiting + self.export(); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig { + // let mut config = MemoryConfig::default(); + // modfn(&mut config); + // config + // } + + fn build_cuckoo_config(modfn: impl Fn(&mut CuckooMemoryConfig)) -> CuckooMemoryConfig { + let mut config = 
CuckooMemoryConfig { + fingerprint_bits: default_cuckoo_fingerprint_bits(), + bucket_size: default_cuckoo_bucket_size(), + max_entries: 1000, + max_kicks: default_cuckoo_max_kicks(), + lru_enabled: false, + ttl_enabled: false, + ttl_bits: default_cuckoo_ttl_bits(), + counter_enabled: false, + counter_bits: default_cuckoo_counter_bits(), + counter_field: OptionalValuePath::none(), + persistence_path: None, + export_interval: None, + }; + modfn(&mut config); + config + } + + #[test] + fn finds_row() { + let memory = CuckooMemoryTable::new(Default::default(), build_cuckoo_config(|_| {})) + .expect("default cuckoo memory table should build correctly"); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + let result = memory.find_table_row(Case::Sensitive, &[condition], None, None, None); + assert!(result.is_ok()); + let result = result.unwrap(); + assert_eq!(result.get("key").unwrap(), &Value::from("test_key")); + // Cuckoo fingerprint is provided too + assert!(result.contains_key("fingerprint")); + } +} diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs index 0799c6662a6af..ff805ad8ef744 100644 --- a/src/enrichment_tables/memory/internal_events.rs +++ b/src/enrichment_tables/memory/internal_events.rs @@ -56,6 +56,26 @@ impl InternalEvent for MemoryEnrichmentTableInserted<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableRemoved<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableRemoved<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + "memory_enrichment_table_removed_total", + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!("memory_enrichment_table_removed_total",).increment(1); + } + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) 
struct MemoryEnrichmentTableFlushed { pub new_objects_count: usize, @@ -90,6 +110,17 @@ impl InternalEvent for MemoryEnrichmentTableTtlExpired<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableTtlExpiredCount { + pub count: u64, +} + +impl InternalEvent for MemoryEnrichmentTableTtlExpiredCount { + fn emit(self) { + counter!("memory_enrichment_table_ttl_expirations",).increment(self.count); + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) struct MemoryEnrichmentTableReadFailed<'a> { pub key: &'a str, diff --git a/src/enrichment_tables/memory/mod.rs b/src/enrichment_tables/memory/mod.rs index 72b0986f9b9e9..61308ef48cfbf 100644 --- a/src/enrichment_tables/memory/mod.rs +++ b/src/enrichment_tables/memory/mod.rs @@ -1,6 +1,7 @@ //! Handles enrichment tables for `type = memory`. mod config; +mod cuckoo_table; mod internal_events; mod source; mod table; diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index fdb5d0cb15bd0..101da50fcebdb 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -216,6 +216,102 @@ generated: configuration: { required: false relevant_when: "type = \"file\"" } + filter: { + type: object: options: { + bucket_size: { + type: uint: default: 4 + description: "Number of slots in each bucket" + required: false + } + counter_bits: { + type: uint: default: 8 + description: "Number of bits to use to track counter. This will limit the max value." + required: false + } + counter_enabled: { + type: bool: default: false + description: "Can be set to true to track a count alongside hashes." + required: false + } + counter_field: { + type: string: default: "" + description: "Field in the incoming value used as the counter override." + required: false + } + export_interval: { + type: uint: {} + description: """ + The interval used for exporting data. 
+ + By default, export is only done on exit. + """ + required: false + } + fingerprint_bits: { + type: uint: default: 8 + description: "Number of bits used for fingerprint." + required: false + } + lru_enabled: { + type: bool: default: false + description: "Can be set to true to use LRU strategy for kicking." + required: false + } + max_entries: { + type: uint: {} + description: """ + Maximum number of entries that can be stored in the filter (actual capacity will usually be + larger) + """ + required: true + } + max_kicks: { + type: uint: default: 500 + description: "Max number of kicks when experiencing hash collisions." + required: false + } + persistence_path: { + type: string: {} + description: """ + Path to the file to export data to periodically and on exit. + Data will be imported from this file on startup. + """ + required: false + } + ttl_bits: { + type: uint: default: 8 + description: """ + Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + worse resolution to keep working. + """ + required: false + } + ttl_enabled: { + type: bool: default: true + description: "Can be set to true to also track TTL for entries." + required: false + } + type: { + type: string: enum: cuckoo: """ + Cuckoo filter + + Supports removal too, as well as TTL and LRU + """ + description: """ + Cuckoo filter + + Supports removal too, as well as TTL and LRU + """ + required: true + } + } + description: """ + Set to make the table act as a probabilistic filter instead of storing original values. This + will prevent reading values from the table - found keys will have empty value. 
+ """ + required: false + relevant_when: "type = \"memory\"" + } flush_interval: { type: uint: {} description: """ From 8993198f694efa547421853774e6c84aa7899d98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 17:41:13 +0200 Subject: [PATCH 02/40] Add changelog entry --- .../25143_enrichment_table_memory_cuckoo_filter.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md diff --git a/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md new file mode 100644 index 0000000000000..36eaeb1a2c64d --- /dev/null +++ b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md @@ -0,0 +1,3 @@ +Added cuckoo filter support for `memory` enrichment table, to provide an efficient way to store and check presence of keys with a low memory footprint at the cost of false positives. + +authors: esensar Quad9DNS From 2d295b5e143b844798148cb1045d091c1ecfa161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 17:52:58 +0200 Subject: [PATCH 03/40] Prevent ttl_val from being zero --- src/enrichment_tables/memory/cuckoo_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 31abedf038108..2c04844b50bc8 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -121,7 +121,7 @@ impl CuckooMemoryTable { config: MemoryConfig, cuckoo_config: CuckooMemoryConfig, ) -> crate::Result { - let ttl_val = config.ttl / config.scan_interval.get(); + let ttl_val = (config.ttl / config.scan_interval.get()).max(1); let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) 
.bucket_size(cuckoo_config.bucket_size) From 4fe95ea7430b6cda474c68098caa90c8cb90c5d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 8 Apr 2026 18:18:48 +0200 Subject: [PATCH 04/40] Remove commented out test code --- src/enrichment_tables/memory/cuckoo_table.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 2c04844b50bc8..52a085befa2cb 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -458,12 +458,6 @@ impl StreamSink for CuckooMemoryTable { mod tests { use super::*; - // fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig { - // let mut config = MemoryConfig::default(); - // modfn(&mut config); - // config - // } - fn build_cuckoo_config(modfn: impl Fn(&mut CuckooMemoryConfig)) -> CuckooMemoryConfig { let mut config = CuckooMemoryConfig { fingerprint_bits: default_cuckoo_fingerprint_bits(), From c33d602045d66259eed1d5d7ecbd80d51a2d2813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 9 Apr 2026 13:02:20 +0200 Subject: [PATCH 05/40] Set lower limit on ttl override as well --- src/enrichment_tables/memory/cuckoo_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 52a085befa2cb..0422f56aa985b 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -258,7 +258,7 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) - .map(|v| v / self.config.scan_interval.get()) + .map(|v| (v / self.config.scan_interval.get()).max(1)) .and_then(|v| u32::try_from(v).ok()); let counter = self .cuckoo_config From 610b97f94310e66eb2e3b9beee7d621148d26bd5 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 26 May 2026 14:39:08 +0200 Subject: [PATCH 06/40] Deny unknown fields in enrichment table memory config --- src/enrichment_tables/memory/config.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 83d4cb2e666bb..0562eb4880764 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -27,6 +27,7 @@ use crate::{ /// Configuration for the `memory` enrichment table. #[configurable_component(enrichment_table("memory"))] #[derive(Clone)] +#[serde(deny_unknown_fields)] pub struct MemoryConfig { /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache. /// When TTL expires, data behind a specific key in the cache is removed. From 1f43d0b75dfb92b9a973954bb3540b919c4f3eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 26 May 2026 15:37:00 +0200 Subject: [PATCH 07/40] Fix CounterName issues --- lib/vector-common/src/internal_event/metric_name.rs | 2 ++ src/enrichment_tables/memory/internal_events.rs | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/vector-common/src/internal_event/metric_name.rs b/lib/vector-common/src/internal_event/metric_name.rs index f520a3ecee316..62ba00237a70b 100644 --- a/lib/vector-common/src/internal_event/metric_name.rs +++ b/lib/vector-common/src/internal_event/metric_name.rs @@ -98,6 +98,7 @@ pub enum CounterName { MemoryEnrichmentTableFlushesTotal, MemoryEnrichmentTableInsertionsTotal, MemoryEnrichmentTableReadsTotal, + MemoryEnrichmentTableRemovedTotal, MemoryEnrichmentTableTtlExpirations, } @@ -363,6 +364,7 @@ impl CounterName { "memory_enrichment_table_insertions_total" } Self::MemoryEnrichmentTableReadsTotal => "memory_enrichment_table_reads_total", + Self::MemoryEnrichmentTableRemovedTotal => "memory_enrichment_table_removed_total", 
Self::MemoryEnrichmentTableTtlExpirations => "memory_enrichment_table_ttl_expirations", } } diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs index 8ab8f13f24c0c..f781416a2fdb5 100644 --- a/src/enrichment_tables/memory/internal_events.rs +++ b/src/enrichment_tables/memory/internal_events.rs @@ -68,12 +68,12 @@ impl InternalEvent for MemoryEnrichmentTableRemoved<'_> { fn emit(self) { if self.include_key_metric_tag { counter!( - "memory_enrichment_table_removed_total", + CounterName::MemoryEnrichmentTableRemovedTotal, "key" => self.key.to_owned() ) .increment(1); } else { - counter!("memory_enrichment_table_removed_total",).increment(1); + counter!(CounterName::MemoryEnrichmentTableRemovedTotal).increment(1); } } } @@ -119,7 +119,7 @@ pub(crate) struct MemoryEnrichmentTableTtlExpiredCount { impl InternalEvent for MemoryEnrichmentTableTtlExpiredCount { fn emit(self) { - counter!("memory_enrichment_table_ttl_expirations",).increment(self.count); + counter!(CounterName::MemoryEnrichmentTableTtlExpirations,).increment(self.count); } } From 4f6a752cad2dace2e2c4169cd6ea6b5aa77508a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:24:24 +0200 Subject: [PATCH 08/40] Apply default tll if ttl_field is not defined --- src/enrichment_tables/memory/cuckoo_table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 0422f56aa985b..0ad96d9c271a1 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -259,7 +259,8 @@ impl CuckooMemoryTable { .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) .map(|v| (v / self.config.scan_interval.get()).max(1)) - .and_then(|v| u32::try_from(v).ok()); + .and_then(|v| u32::try_from(v).ok()) + .or(self.config.ttl.try_into().ok()); let counter = self 
.cuckoo_config .counter_field From 92bbe4752dfd760250e6eed3660908d0a46c2c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:24:34 +0200 Subject: [PATCH 09/40] Cargo.lock update --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 19ffcf06b5dd4..b1ab04254bc7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3333,7 +3333,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55e69956d137a478913ed8c017a31f483258e3a238282c40d89b014478e978b3" dependencies = [ - "rand 0.9.2", + "rand 0.9.4", ] [[package]] From 99297103b369c3a3d010f24175d4a4de20504ad4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:33:27 +0200 Subject: [PATCH 10/40] Prevent intervals from firing off at boot --- src/enrichment_tables/memory/cuckoo_table.rs | 37 ++++++++++++-------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 0ad96d9c271a1..902fce5b4b10b 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -14,7 +14,7 @@ use cuckoo_clock::{ }; use futures::{StreamExt, stream::BoxStream}; use tempfile::NamedTempFile; -use tokio::time::interval; +use tokio::time::{Instant, interval_at}; use tokio_stream::wrappers::IntervalStream; use vector_config::configurable_component; use vector_lib::{ @@ -388,20 +388,29 @@ impl StreamSink for CuckooMemoryTable { async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { let events_sent = register!(EventsSent::from(Output(None))); let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); - let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( - self.config.scan_interval.into(), - ))); - let mut flush_interval = 
IntervalStream::new(interval( - self.config - .flush_interval - .map(Duration::from_secs) - .unwrap_or(Duration::MAX), + let now = Instant::now(); + let cuckoo_scan_interval = Duration::from_secs(self.config.scan_interval.into()); + let mut scan_interval = IntervalStream::new(interval_at( + now + cuckoo_scan_interval, + cuckoo_scan_interval, )); - let mut export_interval = IntervalStream::new(interval( - self.cuckoo_config - .export_interval - .map(Duration::from_secs) - .unwrap_or(Duration::MAX), + let cuckoo_flush_interval = self + .config + .flush_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX); + let mut flush_interval = IntervalStream::new(interval_at( + now + cuckoo_flush_interval, + cuckoo_flush_interval, + )); + let cuckoo_export_interval = self + .cuckoo_config + .export_interval + .map(Duration::from_secs) + .unwrap_or(Duration::MAX); + let mut export_interval = IntervalStream::new(interval_at( + now + cuckoo_export_interval, + cuckoo_export_interval, )); loop { From be6e123fda7187bf5c07481002c0630de36e973a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:51:41 +0200 Subject: [PATCH 11/40] Return null value to make value field present --- src/enrichment_tables/memory/cuckoo_table.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 902fce5b4b10b..b909fe7e47bc9 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -335,6 +335,7 @@ impl Table for CuckooMemoryTable { associated_data.get_fingerprint() ))), ), + (KeyString::from("value"), Value::Null), ]); if let Ok(ttl) = associated_data.get_stored_ttl_value() && let Ok(ttl) = (ttl as u64 * self.config.scan_interval.get()).try_into() From 8df4f876fd290add25a67b666f269e101038a0a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 14:54:39 +0200 Subject: 
[PATCH 12/40] Properly calculate default ttl when ttl_field is not present --- src/enrichment_tables/memory/cuckoo_table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index b909fe7e47bc9..c2df8043bb882 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -258,9 +258,9 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) + .or(Some(self.config.ttl)) .map(|v| (v / self.config.scan_interval.get()).max(1)) - .and_then(|v| u32::try_from(v).ok()) - .or(self.config.ttl.try_into().ok()); + .and_then(|v| u32::try_from(v).ok()); let counter = self .cuckoo_config .counter_field From a0b64fa1104e7de6358947d59ac97b408a06641f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 27 May 2026 15:32:19 +0200 Subject: [PATCH 13/40] Use empty streams instead of Duration::MAX for unconfigured intervals --- src/enrichment_tables/memory/cuckoo_table.rs | 36 +++++++++++--------- src/enrichment_tables/memory/table.rs | 20 +++++++---- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index c2df8043bb882..55622f2fd2c7b 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -3,6 +3,7 @@ use std::{ io::{BufReader, BufWriter, Write}, num::NonZeroUsize, path::PathBuf, + pin::Pin, time::Duration, }; @@ -12,9 +13,12 @@ use cuckoo_clock::{ CuckooFilter, ExportableRandomState, InsertValues, LookupValues, config::{CounterConfig, CuckooConfiguration, LruConfig, TtlConfig}, }; -use futures::{StreamExt, stream::BoxStream}; +use futures::{ + Stream, StreamExt, + stream::{self, BoxStream}, +}; use tempfile::NamedTempFile; -use tokio::time::{Instant, interval_at}; +use 
tokio::time::{Instant, interval, interval_at}; use tokio_stream::wrappers::IntervalStream; use vector_config::configurable_component; use vector_lib::{ @@ -390,29 +394,27 @@ impl StreamSink for CuckooMemoryTable { let events_sent = register!(EventsSent::from(Output(None))); let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); let now = Instant::now(); - let cuckoo_scan_interval = Duration::from_secs(self.config.scan_interval.into()); + let scan_interval_duration = Duration::from_secs(self.config.scan_interval.into()); let mut scan_interval = IntervalStream::new(interval_at( - now + cuckoo_scan_interval, - cuckoo_scan_interval, + now.checked_add(scan_interval_duration).unwrap_or(now), + scan_interval_duration, )); - let cuckoo_flush_interval = self + let mut flush_interval: Pin + Send>> = self .config .flush_interval .map(Duration::from_secs) - .unwrap_or(Duration::MAX); - let mut flush_interval = IntervalStream::new(interval_at( - now + cuckoo_flush_interval, - cuckoo_flush_interval, - )); - let cuckoo_export_interval = self + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); + let mut export_interval: Pin + Send>> = self .cuckoo_config .export_interval .map(Duration::from_secs) - .unwrap_or(Duration::MAX); - let mut export_interval = IntervalStream::new(interval_at( - now + cuckoo_export_interval, - cuckoo_export_interval, - )); + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); loop { tokio::select! { diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index a819b3be77912..d198ce1281d17 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -1,6 +1,7 @@ #![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly. 
use std::{ + pin::Pin, sync::{Arc, Mutex, MutexGuard}, time::{Duration, Instant}, }; @@ -12,7 +13,10 @@ use evmap::{ {self}, }; use evmap_derive::ShallowCopy; -use futures::{StreamExt, stream::BoxStream}; +use futures::{ + Stream, StreamExt, + stream::{self, BoxStream}, +}; use thread_local::ThreadLocal; use tokio::{ sync::broadcast::{Receiver, Sender}, @@ -399,12 +403,14 @@ impl StreamSink for Memory { async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { let events_sent = register!(EventsSent::from(Output(None))); let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); - let mut flush_interval = IntervalStream::new(interval( - self.config - .flush_interval - .map(Duration::from_secs) - .unwrap_or(Duration::MAX), - )); + let mut flush_interval: Pin + Send>> = self + .config + .flush_interval + .map(Duration::from_secs) + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( self.config.scan_interval.into(), ))); From 3ebe905afadddb9cb733525c277a1f6f544c39ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Jun 2026 10:00:39 +0200 Subject: [PATCH 14/40] Prevent using source functionality when filter is used --- src/enrichment_tables/memory/config.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 0562eb4880764..931c5c3b34b41 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -190,7 +190,12 @@ impl EnrichmentTableConfig for MemoryConfig { _globals: &crate::config::GlobalOptions, ) -> crate::Result> { match &self.filter { - Some(TableFilter::Cuckoo(_)) => Ok(Box::new(self.get_or_build_cuckoo().await?)), + Some(TableFilter::Cuckoo(_)) => { + if self.source_config.is_some() { + 
return Err("Source functionality is not supported for cuckoo filter".into()); + } + Ok(Box::new(self.get_or_build_cuckoo().await?)) + } None => Ok(Box::new(self.get_or_build_memory().await)), } } @@ -209,6 +214,10 @@ impl EnrichmentTableConfig for MemoryConfig { let Some(source_config) = &self.source_config else { return None; }; + // Filters can't be used as a source + if self.filter.is_some() { + return None; + } Some(( source_config.source_key.clone().into(), Box::new(self.clone()), From 3189194da3c6c2b2d552801d6cb8c5ad79911ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Jun 2026 10:26:19 +0200 Subject: [PATCH 15/40] Discard configuration if cuckoo filter can't be restored --- src/enrichment_tables/memory/cuckoo_table.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 55622f2fd2c7b..c37e4f83609c7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -170,12 +170,9 @@ impl CuckooMemoryTable { }; if filter.get_configuration() != built_config { - // TODO: Should this stop the build from succeeding? The import will be lost, - // because it will be overwritter very soon. - warn!( - "Stored cuckoo filter configuration doesn't match with new configuration. Ignoring the import.", + return Err( + format!("Stored cuckoo filter configuration doesn't match with new configuration. 
If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), ); - break 'import CuckooFilter::new_random_exportable(built_config); } filter From 9a18e49722fc1aa7c5e06540af5f17453dda7353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 10 Jun 2026 12:18:15 +0200 Subject: [PATCH 16/40] Validate ttl_bits before building the cuckoo filter --- src/enrichment_tables/memory/cuckoo_table.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index c37e4f83609c7..737b046a4aad2 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -136,8 +136,20 @@ impl CuckooMemoryTable { } if cuckoo_config.ttl_enabled { + let ttl_val: u32 = u32::try_from(ttl_val)?; + let needed_bits = ttl_val.ilog2() + 1; + if needed_bits as usize > cuckoo_config.ttl_bits.get() { + return Err( + format!( + "`ttl_bits` ({}) must be set to at least {} to support the default `ttl` value ({}) at the configured scan interval ({}).", + cuckoo_config.ttl_bits.get(), + needed_bits, + config.ttl, + config.scan_interval.get()).into(), + ); + } builder = builder.with_ttl(TtlConfig { - ttl: u32::try_from(ttl_val)?.try_into()?, + ttl: ttl_val.try_into()?, ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, }); } From ffe915fe303eae2adfe83366f6ba70b5e9ea20d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 14:06:27 +0200 Subject: [PATCH 17/40] Use div_ceil when calculating TTL ticks --- src/enrichment_tables/memory/cuckoo_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 737b046a4aad2..9793b58ac4488 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ 
-125,7 +125,7 @@ impl CuckooMemoryTable { config: MemoryConfig, cuckoo_config: CuckooMemoryConfig, ) -> crate::Result { - let ttl_val = (config.ttl / config.scan_interval.get()).max(1); + let ttl_val = (config.ttl.div_ceil(config.scan_interval.get())).max(1); let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) .bucket_size(cuckoo_config.bucket_size) From beb9e0af2671a570b4614e6dc77894627ab3b5ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 14:14:55 +0200 Subject: [PATCH 18/40] Warn when provided ttl is larger than defined ttl_bits --- src/enrichment_tables/memory/cuckoo_table.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 9793b58ac4488..04f0b0e0447d3 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -272,8 +272,20 @@ impl CuckooMemoryTable { .and_then(|v| v.as_integer()) .and_then(|v| u64::try_from(v).ok()) .or(Some(self.config.ttl)) - .map(|v| (v / self.config.scan_interval.get()).max(1)) + .map(|v| (v.div_ceil(self.config.scan_interval.get())).max(1)) .and_then(|v| u32::try_from(v).ok()); + if let Some(ttl) = ttl { + let needed_bits = ttl.ilog2() + 1; + if needed_bits as usize > self.cuckoo_config.ttl_bits.get() { + warn!( + "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", + self.cuckoo_config.ttl_bits.get(), + needed_bits, + self.config.ttl, + self.config.scan_interval.get() + ); + } + } let counter = self .cuckoo_config .counter_field From 170f5ea98bdcb9339f021f90e5236e5bf8397164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 14:34:50 +0200 Subject: [PATCH 19/40] Update table size stats on scan too --- 
src/enrichment_tables/memory/cuckoo_table.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 04f0b0e0447d3..afac14c6ef0a1 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -477,6 +477,10 @@ impl StreamSink for CuckooMemoryTable { emit!(MemoryEnrichmentTableTtlExpiredCount { count: expired as u64 }); + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: self.filter.get_item_count(), + new_byte_size: self.filter.get_memory_usage() + }); } } } From a938de7d194569dca54a48ab195dfeb3b7bd4b35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 15:39:19 +0200 Subject: [PATCH 20/40] Validate counter_bits on insert --- src/enrichment_tables/memory/cuckoo_table.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index afac14c6ef0a1..600341a0e35ac 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -281,7 +281,7 @@ impl CuckooMemoryTable { "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", self.cuckoo_config.ttl_bits.get(), needed_bits, - self.config.ttl, + ttl, self.config.scan_interval.get() ); } @@ -294,6 +294,17 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| i32::try_from(v).ok()); + if let Some(counter) = counter { + let needed_bits = counter.ilog2() + 2; + if needed_bits as usize > self.cuckoo_config.counter_bits.get() { + warn!( + "`counter_bits` ({}) must be set to at least {} to support the provided `counter` value ({}).", + self.cuckoo_config.counter_bits.get(), + needed_bits, + counter + ); + } + } let _ = self.filter.insert_if_not_present_with_update( 
k, InsertValues { ttl, counter }, From 5be7277c97cf88c302fa1c33c7cd9a97542da804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 15:56:36 +0200 Subject: [PATCH 21/40] Revert "Validate counter_bits on insert" This reverts commit a938de7d194569dca54a48ab195dfeb3b7bd4b35. --- src/enrichment_tables/memory/cuckoo_table.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 600341a0e35ac..afac14c6ef0a1 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -281,7 +281,7 @@ impl CuckooMemoryTable { "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", self.cuckoo_config.ttl_bits.get(), needed_bits, - ttl, + self.config.ttl, self.config.scan_interval.get() ); } @@ -294,17 +294,6 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| i32::try_from(v).ok()); - if let Some(counter) = counter { - let needed_bits = counter.ilog2() + 2; - if needed_bits as usize > self.cuckoo_config.counter_bits.get() { - warn!( - "`counter_bits` ({}) must be set to at least {} to support the provided `counter` value ({}).", - self.cuckoo_config.counter_bits.get(), - needed_bits, - counter - ); - } - } let _ = self.filter.insert_if_not_present_with_update( k, InsertValues { ttl, counter }, From e0f47a5c8e2659d9a46f366698066941b0b4323e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 16:07:05 +0200 Subject: [PATCH 22/40] Reject configuration on failed import too --- src/enrichment_tables/memory/cuckoo_table.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index afac14c6ef0a1..bcc86e71036a5 100644 --- 
a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -176,8 +176,9 @@ impl CuckooMemoryTable { let filter = match CuckooFilter::import_random_exportable(&mut reader) { Ok(filter) => filter, Err(error) => { - warn!("Cuckoo filter state import failed: {}", error); - break 'import CuckooFilter::new_random_exportable(built_config); + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persited state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + ); } }; From 4f7086624d1807c82d0441762b3fbc042baf79b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 16:55:38 +0200 Subject: [PATCH 23/40] Document removal for cuckoo --- src/enrichment_tables/memory/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 931c5c3b34b41..3819d40f25eab 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -119,7 +119,7 @@ pub struct MemorySourceConfig { pub enum TableFilter { /// Cuckoo filter /// - /// Supports removal too, as well as TTL and LRU + /// Supports removal by accepting null values for keys, as well as TTL and LRU. 
Cuckoo(CuckooMemoryConfig), } From 47077f8eb5ee5754bbb1d979831c55eeaa39cfb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:08:08 +0200 Subject: [PATCH 24/40] Add note about flush interval in cuckoo --- src/enrichment_tables/memory/config.rs | 3 +++ website/cue/reference/generated/configuration.cue | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 3819d40f25eab..56574a7d7a9a9 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -45,6 +45,9 @@ pub struct MemoryConfig { /// Since every TTL scan makes its changes visible, only use this value /// if it is shorter than the `scan_interval`. /// + /// NOTE: For cuckoo filter, all writes are visible immediately. Flush interval still defines + /// when metrics for cuckoo filter are made visible. + /// /// By default, all writes are made visible immediately. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] pub flush_interval: Option, diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 4c179886e5a83..a817948697994 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -295,12 +295,12 @@ generated: configuration: { type: string: enum: cuckoo: """ Cuckoo filter - Supports removal too, as well as TTL and LRU + Supports removal by accepting null values for keys, as well as TTL and LRU. """ description: """ Cuckoo filter - Supports removal too, as well as TTL and LRU + Supports removal by accepting null values for keys, as well as TTL and LRU. """ required: true } @@ -321,6 +321,9 @@ generated: configuration: { Since every TTL scan makes its changes visible, only use this value if it is shorter than the `scan_interval`. + NOTE: For cuckoo filter, all writes are visible immediately. 
Flush interval still defines + when metrics for cuckoo filter are made visible. + By default, all writes are made visible immediately. """ required: false From df7ff0cb3987d30449475fea0205b5900b5d2ba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:12:00 +0200 Subject: [PATCH 25/40] Only start with a fresh filter on NotFound error --- src/enrichment_tables/memory/cuckoo_table.rs | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index bcc86e71036a5..3c053f3d62cd7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -165,12 +165,21 @@ impl CuckooMemoryTable { let filter = 'import: { if let Some(path) = &cuckoo_config.persistence_path { - let Ok(file) = File::open(path) else { - warn!( - "Couldn't open \"{}\" for cuckoo filter state import.", - path.to_str().unwrap_or("") - ); - break 'import CuckooFilter::new_random_exportable(built_config); + let file = match File::open(path) { + Ok(file) => file, + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => { + break 'import CuckooFilter::new_random_exportable(built_config); + } + _ => { + return Err(format!( + "Couldn't open \"{}\" for cuckoo filter state import. 
{}", + path.to_str().unwrap_or(""), + err + ) + .into()); + } + }, }; let mut reader = BufReader::new(file); let filter = match CuckooFilter::import_random_exportable(&mut reader) { From a8253826b21b557a0522a9be9499b520ca4823f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:12:29 +0200 Subject: [PATCH 26/40] Run `make build-licenses` --- LICENSE-3rdparty.csv | 1 + 1 file changed, 1 insertion(+) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c04a948b51fd7..996e02c3c6067 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -204,6 +204,7 @@ crypto_secretbox,https://github.com/RustCrypto/nacl-compat/tree/master/crypto_se csv,https://github.com/BurntSushi/rust-csv,Unlicense OR MIT,Andrew Gallant csv-core,https://github.com/BurntSushi/rust-csv,Unlicense OR MIT,Andrew Gallant ctr,https://github.com/RustCrypto/block-modes,MIT OR Apache-2.0,RustCrypto Developers +cuckoo-clock,https://github.com/Quad9DNS/cuckoo-clock,MIT,"John Todd , Ensar Sarajčić " curl-sys,https://github.com/alexcrichton/curl-rust,MIT,Alex Crichton curve25519-dalek,https://github.com/dalek-cryptography/curve25519-dalek/tree/main/curve25519-dalek,BSD-3-Clause,"Isis Lovecruft , Henry de Valence " curve25519-dalek-derive,https://github.com/dalek-cryptography/curve25519-dalek,MIT OR Apache-2.0,The curve25519-dalek-derive Authors From 42a758212a19b6c439678730ec6199bc49ff1423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:14:19 +0200 Subject: [PATCH 27/40] Export only when temp file export works --- src/enrichment_tables/memory/cuckoo_table.rs | 42 ++++++++------------ 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 3c053f3d62cd7..77c7f27dc8da7 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -213,33 +213,25 @@ impl 
CuckooMemoryTable { fn export(&self) { if let Some(path) = &self.cuckoo_config.persistence_path { let mut parent = path.clone(); - if parent.pop() - && let Ok(temp) = NamedTempFile::new_in(parent) - { - { - let mut writer = BufWriter::new(temp.as_file()); - if self.export_to(&mut writer).is_err() { - return; + if parent.pop() { + match NamedTempFile::new_in(parent) { + Ok(temp) => { + { + let mut writer = BufWriter::new(temp.as_file()); + if self.export_to(&mut writer).is_err() { + return; + } + } + if let Err(error) = temp.persist(path) { + warn!("Cuckoo filter export failed: {}", error); + } } + Err(err) => warn!( + "Couldn't open temporary file for export. Aborting export. Error: {}", + err + ), } - if let Err(error) = temp.persist(path) { - warn!("Cuckoo filter export failed: {}", error); - } - } else { - warn!( - "Couldn't open temporary file for export. Trying to write directly to \"{}\"", - path.to_str().unwrap_or("") - ); - let Ok(file) = File::create(path) else { - warn!( - "Couldn't open \"{}\" for cuckoo filter state export.", - path.to_str().unwrap_or("") - ); - return; - }; - let mut writer = BufWriter::new(file); - let _ = self.export_to(&mut writer); - }; + } } } From 51702fd7579989e3bb2fb3602676f60551a3eb73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 16 Jun 2026 17:46:39 +0200 Subject: [PATCH 28/40] Validate config before importing persisted state --- Cargo.lock | 4 +-- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 26 +++++++++++++------- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b62e4afebb707..780ab1826c161 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3093,9 +3093,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55e69956d137a478913ed8c017a31f483258e3a238282c40d89b014478e978b3" +checksum = 
"747de28146841c571d1ae290df8a28ddcd6630247d9dc75d89113a17e515011c" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 56b8aa52b0065..df60db3794055 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.0" , default-features = false } +cuckoo-clock = { version = "0.1.1" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 77c7f27dc8da7..df816d7c33162 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -182,22 +182,30 @@ impl CuckooMemoryTable { }, }; let mut reader = BufReader::new(file); - let filter = match CuckooFilter::import_random_exportable(&mut reader) { - Ok(filter) => filter, - Err(error) => { - return Err( - format!("Cuckoo filter state import failed: {}. Delete the persited state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + let (hasher, persisted_config) = + match CuckooFilter::::import_config(&mut reader) { + Ok(imported) => imported, + Err(error) => { + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persisted state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), ); - } - }; + } + }; - if filter.get_configuration() != built_config { + if persisted_config != built_config { return Err( format!("Stored cuckoo filter configuration doesn't match with new configuration. 
If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), ); } - filter + match CuckooFilter::import_state(hasher, persisted_config, &mut reader) { + Ok(filter) => filter, + Err(error) => { + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persisted state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + ); + } + } } else { CuckooFilter::new_random_exportable(built_config) } From f9fedfc53651604ea4f48697380908a48114bed5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 16:48:33 +0200 Subject: [PATCH 29/40] Add a way to parallelize scan and update for cuckoo table --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 45 +++++++++++++++---- .../cue/reference/generated/configuration.cue | 19 ++++++++ 4 files changed, 58 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b442d02b6ef36..1f747b311a53a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3132,9 +3132,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747de28146841c571d1ae290df8a28ddcd6630247d9dc75d89113a17e515011c" +checksum = "b590a6b01bf6bb079a8e64ec6e7697421085aed63392db55f938cadfe38f5935" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 53fd5b6cee6be..1ea3b7311c0c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.1" , default-features = false } +cuckoo-clock = { version = "0.1.2" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", 
default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index df816d7c33162..8c65ef241a5a3 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -18,7 +18,10 @@ use futures::{ stream::{self, BoxStream}, }; use tempfile::NamedTempFile; -use tokio::time::{Instant, interval, interval_at}; +use tokio::{ + task::JoinSet, + time::{Instant, interval, interval_at}, +}; use tokio_stream::wrappers::IntervalStream; use vector_config::configurable_component; use vector_lib::{ @@ -97,6 +100,17 @@ pub struct CuckooMemoryConfig { /// By default, export is only done on exit. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] pub export_interval: Option, + /// Number of threads to use for scanning and updating LRU/TTL. + /// + /// By default, scanning is single threaded. + #[serde(default)] + pub scanning_threads: Option, + /// If set to true scanning will not block insertions. + /// This may affect behavior since blocking scans would free up space before insertions. + /// + /// By default, scanning is blocking. 
+ #[serde(default = "crate::serde::default_false")] + pub concurrent_scanning: bool, } const fn default_cuckoo_fingerprint_bits() -> NonZeroUsize { @@ -483,14 +497,25 @@ impl StreamSink for CuckooMemoryTable { } Some(_) = scan_interval.next() => { - let expired = self.filter.scan_and_update_full(); - emit!(MemoryEnrichmentTableTtlExpiredCount { - count: expired as u64 - }); - emit!(MemoryEnrichmentTableFlushed { - new_objects_count: self.filter.get_item_count(), - new_byte_size: self.filter.get_memory_usage() - }); + let mut handles = JoinSet::new(); + let filter = self.filter.clone(); + let count = self.cuckoo_config.scanning_threads.unwrap_or(NonZeroUsize::new(1).unwrap()); + for i in 0..count.get() { + let filter = filter.clone(); + handles.spawn(async move { + let expired = filter.scan_and_update_full_partition(count, i); + emit!(MemoryEnrichmentTableTtlExpiredCount { + count: expired as u64 + }); + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: filter.get_item_count(), + new_byte_size: filter.get_memory_usage() + }); + }); + } + if !self.cuckoo_config.concurrent_scanning { + let _ = handles.join_all().await; + } } } } @@ -520,6 +545,8 @@ mod tests { counter_field: OptionalValuePath::none(), persistence_path: None, export_interval: None, + scanning_threads: None, + concurrent_scanning: false, }; modfn(&mut config); config diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 83db287fa1683..d11bcf1b5f169 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -223,6 +223,16 @@ generated: configuration: { description: "Number of slots in each bucket" required: false } + concurrent_scanning: { + type: bool: default: false + description: """ + If set to true scanning will not block insertions. + This may affect behavior since blocking scans would free up space before insertions. + + By default, scanning is blocking. 
+ """ + required: false + } counter_bits: { type: uint: default: 8 description: "Number of bits to use to track counter. This will limit the max value." @@ -278,6 +288,15 @@ generated: configuration: { """ required: false } + scanning_threads: { + type: uint: {} + description: """ + Number of threads to use for scanning and updating LRU/TTL. + + By default, scanning is single threaded. + """ + required: false + } ttl_bits: { type: uint: default: 8 description: """ From a3ab4a75d4ef61454a5ea214d2ccfc44217830e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 17:30:37 +0200 Subject: [PATCH 30/40] Bump cuckoo-clock to fix partitioned scanning --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f747b311a53a..92fef7200538d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3132,9 +3132,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b590a6b01bf6bb079a8e64ec6e7697421085aed63392db55f938cadfe38f5935" +checksum = "3955ec500ca96631dfc03dfaab4c4fbc57332ff7d8953fe4397392cb48f9c77c" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 1ea3b7311c0c4..11120ef568618 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.2" , default-features = false } +cuckoo-clock = { version = "0.1.3" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } From d15eca925874a36e1b0f4a223c15ac03b7ce7051 Mon Sep 17 
00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 17:36:00 +0200 Subject: [PATCH 31/40] Prevent dropping scanning tasks when using concurrent_scanning --- src/enrichment_tables/memory/cuckoo_table.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 8c65ef241a5a3..e333d414b8622 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -502,7 +502,7 @@ impl StreamSink for CuckooMemoryTable { let count = self.cuckoo_config.scanning_threads.unwrap_or(NonZeroUsize::new(1).unwrap()); for i in 0..count.get() { let filter = filter.clone(); - handles.spawn(async move { + let task = async move { let expired = filter.scan_and_update_full_partition(count, i); emit!(MemoryEnrichmentTableTtlExpiredCount { count: expired as u64 @@ -511,7 +511,12 @@ impl StreamSink for CuckooMemoryTable { new_objects_count: filter.get_item_count(), new_byte_size: filter.get_memory_usage() }); - }); + }; + if !self.cuckoo_config.concurrent_scanning { + handles.spawn(task); + } else { + tokio::spawn(task); + } } if !self.cuckoo_config.concurrent_scanning { let _ = handles.join_all().await; From 9f6c1e764894052a2d8bc4914f695789fa19588f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 18:10:30 +0200 Subject: [PATCH 32/40] Bump cuckoo-clock to fix partition indices --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d47588c417e3a..740a973099f4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3955ec500ca96631dfc03dfaab4c4fbc57332ff7d8953fe4397392cb48f9c77c" +checksum = 
"2ec0d006c9c09cbae8d3b5bcd617229d1e104a5177e5ebed760f21f9f804030d" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index 595443a7489c2..62e308f646490 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,7 +156,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.3" , default-features = false } +cuckoo-clock = { version = "0.1.4" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } From 110c3200e643e2a9ec0621dbc5830dd7cdbb15c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 18:12:04 +0200 Subject: [PATCH 33/40] Remove nested serde typetag --- src/enrichment_tables/memory/cuckoo_table.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index e333d414b8622..e1d68c63da3ff 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -56,7 +56,6 @@ pub(super) struct CuckooMemoryTable { /// Configuration of cuckoo filter for memory table. #[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] -#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] pub struct CuckooMemoryConfig { /// Number of bits used for fingerprint. 
#[serde(default = "default_cuckoo_fingerprint_bits")] From 120e3343b8169d8a6c22b13f3558edac12693c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 18:13:34 +0200 Subject: [PATCH 34/40] Track failed insertions --- src/enrichment_tables/memory/cuckoo_table.rs | 30 +++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index e1d68c63da3ff..cd998bf5e4edf 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -39,9 +39,9 @@ use vrl::value::{KeyString, ObjectMap, Value}; use crate::enrichment_tables::memory::{ MemoryConfig, internal_events::{ - MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, - MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableRemoved, - MemoryEnrichmentTableTtlExpiredCount, + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed, + MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, MemoryEnrichmentTableReadFailed, + MemoryEnrichmentTableRemoved, MemoryEnrichmentTableTtlExpiredCount, }, }; @@ -285,7 +285,7 @@ impl CuckooMemoryTable { continue; }; - if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { + let res = if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { let ttl = self .config .ttl_field @@ -317,21 +317,29 @@ impl CuckooMemoryTable { .and_then(|p| value.get(p)) .and_then(|v| v.as_integer()) .and_then(|v| i32::try_from(v).ok()); - let _ = self.filter.insert_if_not_present_with_update( + self.filter.insert_if_not_present_with_update( k, InsertValues { ttl, counter }, LookupValues { ttl, counter_diff: counter, }, - ); + ) } else { - let _ = self.filter.insert_if_not_present(k); + self.filter.insert_if_not_present(k) + }; + + if res.is_some_and(|r| r.matches_key(k, &self.filter)) { + emit!(MemoryEnrichmentTableInsertFailed { + 
key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } else { + emit!(MemoryEnrichmentTableInserted { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); } - emit!(MemoryEnrichmentTableInserted { - key: k, - include_key_metric_tag: self.config.internal_metrics.include_key_tag - }); } } } From a4dfda81daa38139246413226757c2bfbc9a2580 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:05:05 +0200 Subject: [PATCH 35/40] Add deny_unknown_fields for cuckoo_table --- src/enrichment_tables/memory/cuckoo_table.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index cd998bf5e4edf..aa1a6629bbfd2 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -56,6 +56,7 @@ pub(super) struct CuckooMemoryTable { /// Configuration of cuckoo filter for memory table. #[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields)] pub struct CuckooMemoryConfig { /// Number of bits used for fingerprint. 
#[serde(default = "default_cuckoo_fingerprint_bits")] From 90df75f2359945e4cb803ab535aeda625e9f910a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:19:05 +0200 Subject: [PATCH 36/40] Track number of scans in progress to prevent piling up of scan tasks --- src/enrichment_tables/memory/cuckoo_table.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index aa1a6629bbfd2..4c3927ac4de27 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -4,6 +4,10 @@ use std::{ num::NonZeroUsize, path::PathBuf, pin::Pin, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, time::Duration, }; @@ -469,6 +473,8 @@ impl StreamSink for CuckooMemoryTable { }) .unwrap_or(Box::pin(stream::empty())); + let scans_in_progress = Arc::new(AtomicUsize::new(0)); + loop { tokio::select! { event = input.next() => { @@ -505,11 +511,17 @@ impl StreamSink for CuckooMemoryTable { } Some(_) = scan_interval.next() => { + if scans_in_progress.load(Ordering::Acquire) > 0 { + warn!("Previous scan still in progress for cuckoo enrichment table. New scan will be skipped until previous one is complete. 
Consider increasing scan interval."); + continue; + } let mut handles = JoinSet::new(); let filter = self.filter.clone(); let count = self.cuckoo_config.scanning_threads.unwrap_or(NonZeroUsize::new(1).unwrap()); + scans_in_progress.fetch_add(count.get(), Ordering::AcqRel); for i in 0..count.get() { let filter = filter.clone(); + let scans_in_progress = Arc::clone(&scans_in_progress); let task = async move { let expired = filter.scan_and_update_full_partition(count, i); emit!(MemoryEnrichmentTableTtlExpiredCount { @@ -519,6 +531,7 @@ impl StreamSink for CuckooMemoryTable { new_objects_count: filter.get_item_count(), new_byte_size: filter.get_memory_usage() }); + scans_in_progress.fetch_sub(1, Ordering::AcqRel); }; if !self.cuckoo_config.concurrent_scanning { handles.spawn(task); From 8daa72c3f9ee7938591e25f250a70519fb04dd16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:21:36 +0200 Subject: [PATCH 37/40] Prevent 0 export interval for cuckoo_table --- src/enrichment_tables/memory/cuckoo_table.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 4c3927ac4de27..bb20e9163fd92 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -1,7 +1,7 @@ use std::{ fs::File, io::{BufReader, BufWriter, Write}, - num::NonZeroUsize, + num::{NonZeroU64, NonZeroUsize}, path::PathBuf, pin::Pin, sync::{ @@ -103,7 +103,7 @@ pub struct CuckooMemoryConfig { /// /// By default, export is only done on exit. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] - pub export_interval: Option, + pub export_interval: Option, /// Number of threads to use for scanning and updating LRU/TTL. /// /// By default, scanning is single threaded. 
@@ -467,6 +467,7 @@ impl StreamSink for CuckooMemoryTable { let mut export_interval: Pin + Send>> = self .cuckoo_config .export_interval + .map(NonZeroU64::get) .map(Duration::from_secs) .map:: + Send>>, _>(|d| { Box::pin(IntervalStream::new(interval(d))) From ff66dad61f6fbaddee2cc4b87676fae164e4b440 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:32:13 +0200 Subject: [PATCH 38/40] Reject cuckoo configurations that produce filters higher than defined `max_byte_size` --- src/enrichment_tables/memory/cuckoo_table.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index bb20e9163fd92..646aa3084de85 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -229,6 +229,13 @@ impl CuckooMemoryTable { } }; + let filter_size = filter.get_memory_usage(); + if let Some(max_byte_size) = config.max_byte_size + && filter.get_memory_usage() as u64 > max_byte_size + { + return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). 
Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); + } + Ok(Self { config, filter, From 463ac9ab96285fa72a3974c7372895fb6849ca36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 15:35:16 +0200 Subject: [PATCH 39/40] Calculate cuckoo filter size before building it to prevent needless allocation --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/enrichment_tables/memory/cuckoo_table.rs | 14 +++++++------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7235912b85fb1..23966d53a49b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3171,9 +3171,9 @@ dependencies = [ [[package]] name = "cuckoo-clock" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ec0d006c9c09cbae8d3b5bcd617229d1e104a5177e5ebed760f21f9f804030d" +checksum = "3420f89fadc21611faa8f16dda185fcb2c35818963000466756f0698df1b5a3a" dependencies = [ "rand 0.9.4", ] diff --git a/Cargo.toml b/Cargo.toml index ad93d4bfe8a63..2e7dbe1f4fe68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,7 +157,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } -cuckoo-clock = { version = "0.1.4" , default-features = false } +cuckoo-clock = { version = "0.1.5" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index 646aa3084de85..a7a6175d16bcf 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -181,6 +181,13 @@ impl 
CuckooMemoryTable { let built_config = builder.build()?; + let filter_size = built_config.get_configured_memory_usage(); + if let Some(max_byte_size) = config.max_byte_size + && filter_size as u64 > max_byte_size + { + return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); + } + let filter = 'import: { if let Some(path) = &cuckoo_config.persistence_path { let file = match File::open(path) { @@ -229,13 +236,6 @@ impl CuckooMemoryTable { } }; - let filter_size = filter.get_memory_usage(); - if let Some(max_byte_size) = config.max_byte_size - && filter.get_memory_usage() as u64 > max_byte_size - { - return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); - } - Ok(Self { config, filter, From 0ac29fb0641bde9648e19cb039b0d3709b518468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 16:10:46 +0200 Subject: [PATCH 40/40] Use `in_current_span` for scanning tasks --- src/enrichment_tables/memory/cuckoo_table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs index a7a6175d16bcf..e1d2daba4d11e 100644 --- a/src/enrichment_tables/memory/cuckoo_table.rs +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -27,6 +27,7 @@ use tokio::{ time::{Instant, interval, interval_at}, }; use tokio_stream::wrappers::IntervalStream; +use tracing::Instrument; use vector_config::configurable_component; use vector_lib::{ EstimatedJsonEncodedSizeOf, @@ -540,7 +541,7 @@ impl StreamSink for CuckooMemoryTable { new_byte_size: filter.get_memory_usage() }); scans_in_progress.fetch_sub(1, Ordering::AcqRel); - }; + }.in_current_span(); if 
!self.cuckoo_config.concurrent_scanning { handles.spawn(task); } else {