Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
c4e16f8
feat(enrichment tables): add cuckoo filter to memory table
esensar Dec 10, 2025
8993198
Add changelog entry
esensar Apr 8, 2026
2d295b5
Prevent ttl_val from being zero
esensar Apr 8, 2026
4fe95ea
Remove commented out test code
esensar Apr 8, 2026
c33d602
Set lower limit on ttl override as well
esensar Apr 9, 2026
90ff070
Merge branch 'master' into feature/memory-enrichment-table-cuckoo-filter
esensar May 26, 2026
610b97f
Deny unknown fields in enrichment table memory config
esensar May 26, 2026
1f43d0b
Fix CounterName issues
esensar May 26, 2026
4f6a752
Apply default ttl if ttl_field is not defined
esensar May 27, 2026
92bbe47
Cargo.lock update
esensar May 27, 2026
9929710
Prevent intervals from firing off at boot
esensar May 27, 2026
be6e123
Return null value to make value field present
esensar May 27, 2026
8df4f87
Properly calculate default ttl when ttl_field is not present
esensar May 27, 2026
a0b64fa
Use empty streams instead of Duration::MAX for unconfigured intervals
esensar May 27, 2026
3ebe905
Prevent using source functionality when filter is used
esensar Jun 10, 2026
3189194
Discard configuration if cuckoo filter can't be restored
esensar Jun 10, 2026
171ede7
Merge branch 'master' into feature/memory-enrichment-table-cuckoo-filter
esensar Jun 10, 2026
9a18e49
Validate ttl_bits before building the cuckoo filter
esensar Jun 10, 2026
ffe915f
Use div_ceil when calculating TTL ticks
esensar Jun 16, 2026
beb9e0a
Warn when provided ttl is larger than defined ttl_bits
esensar Jun 16, 2026
170f5ea
Update table size stats on scan too
esensar Jun 16, 2026
a938de7
Validate counter_bits on insert
esensar Jun 16, 2026
5be7277
Revert "Validate counter_bits on insert"
esensar Jun 16, 2026
e0f47a5
Reject configuration on failed import too
esensar Jun 16, 2026
4f70866
Document removal for cuckoo
esensar Jun 16, 2026
47077f8
Add note about flush interval in cuckoo
esensar Jun 16, 2026
df7ff0c
Only start with a fresh filter on NotFound error
esensar Jun 16, 2026
a825382
Run `make build-licenses`
esensar Jun 16, 2026
42a7582
Export only when temp file export works
esensar Jun 16, 2026
51702fd
Validate config before importing persisted state
esensar Jun 16, 2026
9b4a2d2
Merge branch 'master' into feature/memory-enrichment-table-cuckoo-filter
esensar Jun 16, 2026
f9fedfc
Add a way to parallelize scan and update for cuckoo table
esensar Jun 17, 2026
a3ab4a7
Bump cuckoo-clock to fix partitioned scanning
esensar Jun 17, 2026
d15eca9
Prevent dropping scanning tasks when using concurrent_scanning
esensar Jun 17, 2026
5ae9694
Merge branch 'master' into feature/memory-enrichment-table-cuckoo-filter
esensar Jun 17, 2026
9f6c1e7
Bump cuckoo-clock to fix partition indices
esensar Jun 17, 2026
110c320
Remove nested serde typetag
esensar Jun 17, 2026
120e334
Track failed insertions
esensar Jun 17, 2026
a4dfda8
Add deny_unknown_fields for cuckoo_table
esensar Jun 18, 2026
90df75f
Track number of scans in progress to prevent piling up of scan tasks
esensar Jun 18, 2026
8daa72c
Prevent 0 export interval for cuckoo_table
esensar Jun 18, 2026
ff66dad
Reject cuckoo configurations that produce filters higher than defined…
esensar Jun 18, 2026
79d6cb8
Merge branch 'master' into feature/memory-enrichment-table-cuckoo-filter
esensar Jun 18, 2026
463ac9a
Calculate cuckoo filter size before building it to prevent needless a…
esensar Jun 18, 2026
0ac29fb
Use `in_current_span` for scanning tasks
esensar Jun 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ const-str = { version = "1.1.0", default-features = false }
convert_case = { version = "0.8", default-features = false }
criterion = "0.8"
crossbeam-utils = { version = "0.8.21", default-features = false }
cuckoo-clock = { version = "0.1.5" , default-features = false }
darling = { version = "0.20.11", default-features = false, features = ["suggestions"] }
dashmap = { version = "6.1.0", default-features = false }
derivative = { version = "2.2.0", default-features = false }
Expand Down Expand Up @@ -383,6 +384,7 @@ chrono.workspace = true
chrono-tz.workspace = true
colored.workspace = true
csv = { version = "1.3", default-features = false }
cuckoo-clock = { workspace = true, optional = true }
databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true }
derivative.workspace = true
dirs-next = { version = "2.0.0", default-features = false, optional = true }
Expand Down Expand Up @@ -448,6 +450,7 @@ sqlx = { version = "0.8.6", default-features = false, features = ["derive", "pos
stream-cancel = { version = "0.8.2", default-features = false }
strip-ansi-escapes = { version = "0.2.1", default-features = false }
syslog = { version = "6.1.1", default-features = false, optional = true }
tempfile = { workspace = true, optional = true }
tokio-postgres = { version = "0.7.18", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true }
tokio-tungstenite = { workspace = true, features = ["connect"], optional = true }
toml.workspace = true
Expand Down Expand Up @@ -659,7 +662,7 @@ gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"]
enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
enrichment-tables-geoip = ["dep:maxminddb"]
enrichment-tables-mmdb = ["dep:maxminddb"]
enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]
enrichment-tables-memory = ["dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"]

# Codecs
codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"]
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ csv,https://github.com/BurntSushi/rust-csv,Unlicense OR MIT,Andrew Gallant <jams
csv-core,https://github.com/BurntSushi/rust-csv,Unlicense OR MIT,Andrew Gallant <jamslam@gmail.com>
ctr,https://github.com/RustCrypto/block-modes,MIT OR Apache-2.0,RustCrypto Developers
ctutils,https://github.com/RustCrypto/utils,Apache-2.0 OR MIT,RustCrypto Developers
cuckoo-clock,https://github.com/Quad9DNS/cuckoo-clock,MIT,"John Todd <jtodd@quad9.net>, Ensar Sarajčić <dev@ensarsarajcic.com>"
curl-sys,https://github.com/alexcrichton/curl-rust,MIT,Alex Crichton <alex@alexcrichton.com>
curve25519-dalek,https://github.com/dalek-cryptography/curve25519-dalek/tree/main/curve25519-dalek,BSD-3-Clause,"Isis Lovecruft <isis@patternsinthevoid.net>, Henry de Valence <hdevalence@hdevalence.ca>"
curve25519-dalek-derive,https://github.com/dalek-cryptography/curve25519-dalek,MIT OR Apache-2.0,The curve25519-dalek-derive Authors
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added cuckoo filter support for the `memory` enrichment table, providing an efficient way to store keys and check for their presence with a low memory footprint, at the cost of occasional false positives.

authors: esensar Quad9DNS
2 changes: 2 additions & 0 deletions lib/vector-common/src/internal_event/metric_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub enum CounterName {
MemoryEnrichmentTableFlushesTotal,
MemoryEnrichmentTableInsertionsTotal,
MemoryEnrichmentTableReadsTotal,
MemoryEnrichmentTableRemovedTotal,
MemoryEnrichmentTableTtlExpirations,
ComponentCpuUsageNsTotal,
}
Expand Down Expand Up @@ -364,6 +365,7 @@ impl CounterName {
"memory_enrichment_table_insertions_total"
}
Self::MemoryEnrichmentTableReadsTotal => "memory_enrichment_table_reads_total",
Self::MemoryEnrichmentTableRemovedTotal => "memory_enrichment_table_removed_total",
Self::MemoryEnrichmentTableTtlExpirations => "memory_enrichment_table_ttl_expirations",
Self::ComponentCpuUsageNsTotal => "component_cpu_usage_ns_total",
}
Expand Down
64 changes: 62 additions & 2 deletions src/enrichment_tables/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ use crate::{
config::{
EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
},
enrichment_tables::memory::cuckoo_table::{CuckooMemoryConfig, CuckooMemoryTable},
sinks::Healthcheck,
sources::Source,
};

/// Configuration for the `memory` enrichment table.
#[configurable_component(enrichment_table("memory"))]
#[derive(Clone)]
#[serde(deny_unknown_fields)]
pub struct MemoryConfig {
/// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache.
/// When TTL expires, data behind a specific key in the cache is removed.
Expand All @@ -43,6 +45,9 @@ pub struct MemoryConfig {
/// Since every TTL scan makes its changes visible, only use this value
/// if it is shorter than the `scan_interval`.
///
/// NOTE: For cuckoo filter, all writes are visible immediately. Flush interval still defines
/// when metrics for cuckoo filter are made visible.
///
/// By default, all writes are made visible immediately.
#[serde(skip_serializing_if = "vector_lib::serde::is_default")]
pub flush_interval: Option<u64>,
Expand All @@ -69,8 +74,16 @@ pub struct MemoryConfig {
#[serde(default)]
pub ttl_field: OptionalValuePath,

/// Set to make the table act as a probabilistic filter instead of storing original values. This
/// will prevent reading values from the table - found keys will have empty value.
#[configurable(derived)]
#[serde(default)]
pub filter: Option<TableFilter>,

#[serde(skip)]
memory: Arc<Mutex<Option<Box<Memory>>>>,
#[serde(skip)]
cuckoo: Arc<Mutex<Option<Box<CuckooMemoryTable>>>>,
}

/// Configuration for memory enrichment table source functionality.
Expand Down Expand Up @@ -102,6 +115,17 @@ pub struct MemorySourceConfig {
pub source_key: String,
}

/// Configuration for memory enrichment table filter functionality.
// Serde representation: internally tagged on the `type` field, with variant names rendered in
// snake_case (so the `Cuckoo` variant is selected by `type` set to `cuckoo`). The variant's own
// settings come from `CuckooMemoryConfig`, and `deny_unknown_fields` is requested for this enum.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
pub enum TableFilter {
/// Cuckoo filter
///
/// Supports removal by accepting null values for keys, as well as TTL and LRU.
Cuckoo(CuckooMemoryConfig),
}

impl PartialEq for MemoryConfig {
fn eq(&self, other: &Self) -> bool {
self.ttl == other.ttl
Expand All @@ -118,11 +142,13 @@ impl Default for MemoryConfig {
scan_interval: default_scan_interval(),
flush_interval: None,
memory: Arc::new(Mutex::new(None)),
cuckoo: Arc::new(Mutex::new(None)),
max_byte_size: None,
log_namespace: None,
source_config: None,
internal_metrics: InternalMetricsConfig::default(),
ttl_field: OptionalValuePath::none(),
filter: None,
}
}
}
Expand All @@ -142,14 +168,39 @@ impl MemoryConfig {
.get_or_insert_with(|| Box::new(Memory::new(self.clone())))
.clone()
}

/// Returns the cuckoo filter table for this configuration, building it on first use.
///
/// The built table is cached in `self.cuckoo`; every call hands back a clone of the cached
/// table, so the table is only constructed once per cache.
///
/// # Errors
///
/// Returns an error if `filter` is not set to the cuckoo variant, or if
/// `CuckooMemoryTable::new` fails while building the table.
pub(super) async fn get_or_build_cuckoo(&self) -> crate::Result<CuckooMemoryTable> {
    // Validate the configuration before taking the lock, and report a misuse as an error
    // instead of panicking: this function is already fallible.
    let Some(TableFilter::Cuckoo(cuckoo)) = &self.filter else {
        return Err("Cuckoo filter is not configured for this memory enrichment table".into());
    };

    let mut cached = self.cuckoo.lock().await;
    if let Some(table) = cached.as_ref() {
        return Ok(*table.clone());
    }

    // Build outside of `insert` so that a construction failure leaves the cache empty.
    let table = CuckooMemoryTable::new(self.clone(), cuckoo.clone())?;
    Ok(*cached.insert(Box::new(table)).clone())
}
}

impl EnrichmentTableConfig for MemoryConfig {
async fn build(
&self,
_globals: &crate::config::GlobalOptions,
) -> crate::Result<Box<dyn Table + Send + Sync>> {
Ok(Box::new(self.get_or_build_memory().await))
match &self.filter {
Some(TableFilter::Cuckoo(_)) => {
Comment thread
esensar marked this conversation as resolved.
if self.source_config.is_some() {
return Err("Source functionality is not supported for cuckoo filter".into());
}
Ok(Box::new(self.get_or_build_cuckoo().await?))
}
None => Ok(Box::new(self.get_or_build_memory().await)),
Comment thread
esensar marked this conversation as resolved.
}
}

fn sink_config(
Expand All @@ -166,6 +217,10 @@ impl EnrichmentTableConfig for MemoryConfig {
let Some(source_config) = &self.source_config else {
return None;
};
// Filters can't be used as a source
if self.filter.is_some() {
return None;
}
Some((
source_config.source_key.clone().into(),
Box::new(self.clone()),
Expand All @@ -177,7 +232,12 @@ impl EnrichmentTableConfig for MemoryConfig {
#[typetag::serde(name = "memory_enrichment_table")]
impl SinkConfig for MemoryConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
let sink = match &self.filter {
Some(TableFilter::Cuckoo(_)) => {
VectorSink::from_event_streamsink(self.get_or_build_cuckoo().await?)
}
None => VectorSink::from_event_streamsink(self.get_or_build_memory().await),
Comment thread
esensar marked this conversation as resolved.
};

Ok((sink, future::ok(()).boxed()))
}
Expand Down
Loading
Loading