diff --git a/Cargo.lock b/Cargo.lock index f4041c9a1f47e..23966d53a49b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3169,6 +3169,15 @@ dependencies = [ "cmov", ] +[[package]] +name = "cuckoo-clock" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3420f89fadc21611faa8f16dda185fcb2c35818963000466756f0698df1b5a3a" +dependencies = [ + "rand 0.9.4", +] + [[package]] name = "curl-sys" version = "0.4.84+curl-8.17.0" @@ -12883,6 +12892,7 @@ dependencies = [ "console-subscriber", "criterion", "csv", + "cuckoo-clock", "databend-client", "databricks-zerobus-ingest-sdk", "deadpool 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 324843447172d..2e7dbe1f4fe68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,6 +157,7 @@ const-str = { version = "1.1.0", default-features = false } convert_case = { version = "0.8", default-features = false } criterion = "0.8" crossbeam-utils = { version = "0.8.21", default-features = false } +cuckoo-clock = { version = "0.1.5" , default-features = false } darling = { version = "0.20.11", default-features = false, features = ["suggestions"] } dashmap = { version = "6.1.0", default-features = false } derivative = { version = "2.2.0", default-features = false } @@ -383,6 +384,7 @@ chrono.workspace = true chrono-tz.workspace = true colored.workspace = true csv = { version = "1.3", default-features = false } +cuckoo-clock = { workspace = true, optional = true } databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true } derivative.workspace = true dirs-next = { version = "2.0.0", default-features = false, optional = true } @@ -448,6 +450,7 @@ sqlx = { version = "0.8.6", default-features = false, features = ["derive", "pos stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.1", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } +tempfile = { workspace = true, 
optional = true } tokio-postgres = { version = "0.7.18", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true } tokio-tungstenite = { workspace = true, features = ["connect"], optional = true } toml.workspace = true @@ -659,7 +662,7 @@ gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"] enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"] enrichment-tables-geoip = ["dep:maxminddb"] enrichment-tables-mmdb = ["dep:maxminddb"] -enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] +enrichment-tables-memory = ["dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"] # Codecs codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 809e71411a172..4813ad74d499b 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -209,6 +209,7 @@ csv,https://github.com/BurntSushi/rust-csv,Unlicense OR MIT,Andrew Gallant ctr,https://github.com/RustCrypto/block-modes,MIT OR Apache-2.0,RustCrypto Developers ctutils,https://github.com/RustCrypto/utils,Apache-2.0 OR MIT,RustCrypto Developers +cuckoo-clock,https://github.com/Quad9DNS/cuckoo-clock,MIT,"John Todd , Ensar Sarajčić " curl-sys,https://github.com/alexcrichton/curl-rust,MIT,Alex Crichton curve25519-dalek,https://github.com/dalek-cryptography/curve25519-dalek/tree/main/curve25519-dalek,BSD-3-Clause,"Isis Lovecruft , Henry de Valence " curve25519-dalek-derive,https://github.com/dalek-cryptography/curve25519-dalek,MIT OR Apache-2.0,The curve25519-dalek-derive Authors diff --git a/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md new file mode 100644 index 0000000000000..36eaeb1a2c64d --- /dev/null +++ b/changelog.d/25143_enrichment_table_memory_cuckoo_filter.feature.md @@ -0,0 +1,3 @@ +Added cuckoo filter support for `memory` 
enrichment table, to provide an efficient way to store and check presence of keys with a low memory footprint at the cost of false positives. + +authors: esensar Quad9DNS diff --git a/lib/vector-common/src/internal_event/metric_name.rs b/lib/vector-common/src/internal_event/metric_name.rs index 6212d245cef40..ecd12164d8ecc 100644 --- a/lib/vector-common/src/internal_event/metric_name.rs +++ b/lib/vector-common/src/internal_event/metric_name.rs @@ -98,6 +98,7 @@ pub enum CounterName { MemoryEnrichmentTableFlushesTotal, MemoryEnrichmentTableInsertionsTotal, MemoryEnrichmentTableReadsTotal, + MemoryEnrichmentTableRemovedTotal, MemoryEnrichmentTableTtlExpirations, ComponentCpuUsageNsTotal, } @@ -364,6 +365,7 @@ impl CounterName { "memory_enrichment_table_insertions_total" } Self::MemoryEnrichmentTableReadsTotal => "memory_enrichment_table_reads_total", + Self::MemoryEnrichmentTableRemovedTotal => "memory_enrichment_table_removed_total", Self::MemoryEnrichmentTableTtlExpirations => "memory_enrichment_table_ttl_expirations", Self::ComponentCpuUsageNsTotal => "component_cpu_usage_ns_total", } diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 9b17ce29750c3..56574a7d7a9a9 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -19,6 +19,7 @@ use crate::{ config::{ EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput, }, + enrichment_tables::memory::cuckoo_table::{CuckooMemoryConfig, CuckooMemoryTable}, sinks::Healthcheck, sources::Source, }; @@ -26,6 +27,7 @@ use crate::{ /// Configuration for the `memory` enrichment table. #[configurable_component(enrichment_table("memory"))] #[derive(Clone)] +#[serde(deny_unknown_fields)] pub struct MemoryConfig { /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache. /// When TTL expires, data behind a specific key in the cache is removed. 
@@ -43,6 +45,9 @@ pub struct MemoryConfig { /// Since every TTL scan makes its changes visible, only use this value /// if it is shorter than the `scan_interval`. /// + /// NOTE: For cuckoo filter, all writes are visible immediately. Flush interval still defines + /// when metrics for cuckoo filter are made visible. + /// /// By default, all writes are made visible immediately. #[serde(skip_serializing_if = "vector_lib::serde::is_default")] pub flush_interval: Option, @@ -69,8 +74,16 @@ pub struct MemoryConfig { #[serde(default)] pub ttl_field: OptionalValuePath, + /// Set to make the table act as a probabilistic filter instead of storing original values. This + /// will prevent reading values from the table - found keys will have empty value. + #[configurable(derived)] + #[serde(default)] + pub filter: Option, + #[serde(skip)] memory: Arc>>>, + #[serde(skip)] + cuckoo: Arc>>>, } /// Configuration for memory enrichment table source functionality. @@ -102,6 +115,17 @@ pub struct MemorySourceConfig { pub source_key: String, } +/// Configuration for memory enrichment table filter functionality. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")] +pub enum TableFilter { + /// Cuckoo filter + /// + /// Supports removal by accepting null values for keys, as well as TTL and LRU. 
+ Cuckoo(CuckooMemoryConfig), +} + impl PartialEq for MemoryConfig { fn eq(&self, other: &Self) -> bool { self.ttl == other.ttl @@ -118,11 +142,13 @@ impl Default for MemoryConfig { scan_interval: default_scan_interval(), flush_interval: None, memory: Arc::new(Mutex::new(None)), + cuckoo: Arc::new(Mutex::new(None)), max_byte_size: None, log_namespace: None, source_config: None, internal_metrics: InternalMetricsConfig::default(), ttl_field: OptionalValuePath::none(), + filter: None, } } } @@ -142,6 +168,23 @@ impl MemoryConfig { .get_or_insert_with(|| Box::new(Memory::new(self.clone()))) .clone() } + + pub(super) async fn get_or_build_cuckoo(&self) -> crate::Result { + let mut boxed_cuckoo = self.cuckoo.lock().await; + let Some(TableFilter::Cuckoo(cuckoo)) = &self.filter else { + return Err("Cuckoo filter table requested, but `filter` is not configured as `cuckoo`".into()); // report misconfiguration as an error instead of panicking + }; + if let Some(boxed_cuckoo) = boxed_cuckoo.as_ref() { + Ok(*boxed_cuckoo.clone()) + } else { + Ok(*boxed_cuckoo + .insert(Box::new(CuckooMemoryTable::new( + self.clone(), + cuckoo.clone(), + )?)) + .clone()) + } + } } impl EnrichmentTableConfig for MemoryConfig { @@ -149,7 +192,15 @@ impl EnrichmentTableConfig for MemoryConfig { &self, _globals: &crate::config::GlobalOptions, ) -> crate::Result> { - Ok(Box::new(self.get_or_build_memory().await)) + match &self.filter { + Some(TableFilter::Cuckoo(_)) => { + if self.source_config.is_some() { + return Err("Source functionality is not supported for cuckoo filter".into()); + } + Ok(Box::new(self.get_or_build_cuckoo().await?)) + } + None => Ok(Box::new(self.get_or_build_memory().await)), + } } fn sink_config( @@ -166,6 +217,10 @@ impl EnrichmentTableConfig for MemoryConfig { let Some(source_config) = &self.source_config else { return None; }; + // Filters can't be used as a source + if self.filter.is_some() { + return None; + } Some(( source_config.source_key.clone().into(), Box::new(self.clone()), @@ -177,7 +232,12 @@ impl EnrichmentTableConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl 
SinkConfig for MemoryConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await); + let sink = match &self.filter { + Some(TableFilter::Cuckoo(_)) => { + VectorSink::from_event_streamsink(self.get_or_build_cuckoo().await?) + } + None => VectorSink::from_event_streamsink(self.get_or_build_memory().await), + }; Ok((sink, future::ok(()).boxed())) } diff --git a/src/enrichment_tables/memory/cuckoo_table.rs b/src/enrichment_tables/memory/cuckoo_table.rs new file mode 100644 index 0000000000000..e1d2daba4d11e --- /dev/null +++ b/src/enrichment_tables/memory/cuckoo_table.rs @@ -0,0 +1,608 @@ +use std::{ + fs::File, + io::{BufReader, BufWriter, Write}, + num::{NonZeroU64, NonZeroUsize}, + path::PathBuf, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use cuckoo_clock::{ + CuckooFilter, ExportableRandomState, InsertValues, LookupValues, + config::{CounterConfig, CuckooConfiguration, LruConfig, TtlConfig}, +}; +use futures::{ + Stream, StreamExt, + stream::{self, BoxStream}, +}; +use tempfile::NamedTempFile; +use tokio::{ + task::JoinSet, + time::{Instant, interval, interval_at}, +}; +use tokio_stream::wrappers::IntervalStream; +use tracing::Instrument; +use vector_config::configurable_component; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + enrichment::{Case, Condition, Error, IndexHandle, Table}, + event::{Event, EventStatus, Finalizable}, + internal_event::{ + ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, + }, + lookup::lookup_v2::OptionalValuePath, + sink::StreamSink, +}; +use vrl::value::{KeyString, ObjectMap, Value}; + +use crate::enrichment_tables::memory::{ + MemoryConfig, + internal_events::{ + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed, + MemoryEnrichmentTableInserted, 
MemoryEnrichmentTableRead, MemoryEnrichmentTableReadFailed, + MemoryEnrichmentTableRemoved, MemoryEnrichmentTableTtlExpiredCount, + }, +}; + +/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a cuckoo table. +#[derive(Clone)] +pub(super) struct CuckooMemoryTable { + filter: CuckooFilter, + pub(super) config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, +} + +/// Configuration of cuckoo filter for memory table. +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct CuckooMemoryConfig { + /// Number of bits used for fingerprint. + #[serde(default = "default_cuckoo_fingerprint_bits")] + pub fingerprint_bits: NonZeroUsize, + /// Number of slots in each bucket + #[serde(default = "default_cuckoo_bucket_size")] + pub bucket_size: NonZeroUsize, + /// Maximum number of entries that can be stored in the filter (actual capacity will usually be + /// larger) + pub max_entries: usize, + /// Max number of kicks when experiencing hash collisions. + #[serde(default = "default_cuckoo_max_kicks")] + pub max_kicks: usize, + /// Can be set to true to use LRU strategy for kicking. + #[serde(default = "crate::serde::default_false")] + pub lru_enabled: bool, + /// Can be set to true to also track TTL for entries. + #[serde(default = "crate::serde::default_true")] + pub ttl_enabled: bool, + /// Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + /// worse resolution to keep working. + #[serde(default = "default_cuckoo_ttl_bits")] + pub ttl_bits: NonZeroUsize, + /// Can be set to true to track a count alongside hashes. + #[serde(default = "crate::serde::default_false")] + pub counter_enabled: bool, + /// Number of bits to use to track counter. This will limit the max value. + #[serde(default = "default_cuckoo_counter_bits")] + pub counter_bits: NonZeroUsize, + /// Field in the incoming value used as the counter override. 
+ #[configurable(derived)] + #[serde(default)] + pub counter_field: OptionalValuePath, + /// Path to the file to export data to periodically and on exit. + /// Data will be imported from this file on startup. + #[configurable(derived)] + #[serde(default)] + pub persistence_path: Option, + /// The interval used for exporting data. + /// + /// By default, export is only done on exit. + #[serde(skip_serializing_if = "vector_lib::serde::is_default")] + pub export_interval: Option, + /// Number of threads to use for scanning and updating LRU/TTL. + /// + /// By default, scanning is single threaded. + #[serde(default)] + pub scanning_threads: Option, + /// If set to true scanning will not block insertions. + /// This may affect behavior since blocking scans would free up space before insertions. + /// + /// By default, scanning is blocking. + #[serde(default = "crate::serde::default_false")] + pub concurrent_scanning: bool, +} + +const fn default_cuckoo_fingerprint_bits() -> NonZeroUsize { + NonZeroUsize::new(8).expect("8 is non-zero") // checked constructor; no `unsafe` needed for a literal +} + +const fn default_cuckoo_bucket_size() -> NonZeroUsize { + NonZeroUsize::new(4).expect("4 is non-zero") +} + +const fn default_cuckoo_ttl_bits() -> NonZeroUsize { + NonZeroUsize::new(8).expect("8 is non-zero") +} + +const fn default_cuckoo_counter_bits() -> NonZeroUsize { + NonZeroUsize::new(8).expect("8 is non-zero") +} + +const fn default_cuckoo_max_kicks() -> usize { + 500 +} + +impl CuckooMemoryTable { + /// Creates a new [CuckooMemoryTable] based on the provided config. + pub(super) fn new( + config: MemoryConfig, + cuckoo_config: CuckooMemoryConfig, + ) -> crate::Result { + let ttl_val = (config.ttl.div_ceil(config.scan_interval.get())).max(1); + let mut builder = CuckooConfiguration::builder(cuckoo_config.max_entries) + .fingerprint_bits(cuckoo_config.fingerprint_bits.get().try_into()?) 
+ .bucket_size(cuckoo_config.bucket_size) + .max_kicks(cuckoo_config.max_kicks); + + if cuckoo_config.lru_enabled { + builder = builder.with_lru(LruConfig::default()); + } + + if cuckoo_config.ttl_enabled { + let ttl_val: u32 = u32::try_from(ttl_val)?; + let needed_bits = ttl_val.ilog2() + 1; + if needed_bits as usize > cuckoo_config.ttl_bits.get() { + return Err( + format!( + "`ttl_bits` ({}) must be set to at least {} to support the default `ttl` value ({}) at the configured scan interval ({}).", + cuckoo_config.ttl_bits.get(), + needed_bits, + config.ttl, + config.scan_interval.get()).into(), + ); + } + builder = builder.with_ttl(TtlConfig { + ttl: ttl_val.try_into()?, + ttl_bits: cuckoo_config.ttl_bits.get().try_into()?, + }); + } + + if cuckoo_config.counter_enabled { + builder = builder.with_counter(CounterConfig { + counter_bits: cuckoo_config.counter_bits.get().try_into()?, + ..Default::default() + }); + } + + let built_config = builder.build()?; + + let filter_size = built_config.get_configured_memory_usage(); + if let Some(max_byte_size) = config.max_byte_size + && filter_size as u64 > max_byte_size + { + return Err(format!("Configured cuckoo filter is larger ({}) than defined `max_byte_size` ({}). Reduce the size of cuckoo filter or increase or remove `max_byte_size`.", filter_size, max_byte_size).into()); + } + + let filter = 'import: { + if let Some(path) = &cuckoo_config.persistence_path { + let file = match File::open(path) { + Ok(file) => file, + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => { + break 'import CuckooFilter::new_random_exportable(built_config); + } + _ => { + return Err(format!( + "Couldn't open \"{}\" for cuckoo filter state import. 
{}", + path.to_str().unwrap_or(""), + err + ) + .into()); + } + }, + }; + let mut reader = BufReader::new(file); + let (hasher, persisted_config) = + match CuckooFilter::::import_config(&mut reader) { + Ok(imported) => imported, + Err(error) => { + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persisted state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + ); + } + }; + + if persisted_config != built_config { + return Err( + format!("Stored cuckoo filter configuration doesn't match with new configuration. If this is intended, remove the persisted state file ({}).", path.to_str().unwrap_or("")).into(), + ); + } + + match CuckooFilter::import_state(hasher, persisted_config, &mut reader) { + Ok(filter) => filter, + Err(error) => { + return Err( + format!("Cuckoo filter state import failed: {}. Delete the persisted state file ({}) to proceed.", error, path.to_str().unwrap_or("")).into(), + ); + } + } + } else { + CuckooFilter::new_random_exportable(built_config) + } + }; + + Ok(Self { + config, + filter, + cuckoo_config, + }) + } + + fn export(&self) { + if let Some(path) = &self.cuckoo_config.persistence_path { + let mut parent = path.clone(); + if parent.pop() { + match NamedTempFile::new_in(parent) { + Ok(temp) => { + { + let mut writer = BufWriter::new(temp.as_file()); + if self.export_to(&mut writer).is_err() { + return; + } + } + if let Err(error) = temp.persist(path) { + warn!("Cuckoo filter export failed: {}", error); + } + } + Err(err) => warn!( + "Couldn't open temporary file for export. Aborting export. 
Error: {}", + err + ), + } + } + } + } + + fn export_to(&self, mut writer: impl Write) -> Result<(), ()> { + match self.filter.exporter().write_to(&mut writer) { + Ok(()) => { + if let Err(error) = writer.flush() { + warn!("Cuckoo filter export failed: {}", error); + return Err(()); + }; + Ok(()) + } + Err(error) => { + warn!("Cuckoo filter export failed: {}", error); + Err(()) + } + } + } + + fn handle_value(&self, value: ObjectMap) { + for (k, value) in value.iter() { + if matches!(value, Value::Null) { + if self.filter.remove(k) { + emit!(MemoryEnrichmentTableRemoved { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + + continue; + }; + + let res = if self.cuckoo_config.ttl_enabled || self.cuckoo_config.counter_enabled { + let ttl = self + .config + .ttl_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| u64::try_from(v).ok()) + .or(Some(self.config.ttl)) + .map(|v| (v.div_ceil(self.config.scan_interval.get())).max(1)) + .and_then(|v| u32::try_from(v).ok()); + if let Some(ttl) = ttl { + let needed_bits = ttl.ilog2() + 1; + if needed_bits as usize > self.cuckoo_config.ttl_bits.get() { + warn!( + "`ttl_bits` ({}) must be set to at least {} to support the provided `ttl` value ({}) at the configured scan interval ({}).", + self.cuckoo_config.ttl_bits.get(), + needed_bits, + self.config.ttl, + self.config.scan_interval.get() + ); + } + } + let counter = self + .cuckoo_config + .counter_field + .path + .as_ref() + .and_then(|p| value.get(p)) + .and_then(|v| v.as_integer()) + .and_then(|v| i32::try_from(v).ok()); + self.filter.insert_if_not_present_with_update( + k, + InsertValues { ttl, counter }, + LookupValues { + ttl, + counter_diff: counter, + }, + ) + } else { + self.filter.insert_if_not_present(k) + }; + + if res.is_some_and(|r| r.matches_key(k, &self.filter)) { + emit!(MemoryEnrichmentTableInsertFailed { + key: k, + include_key_metric_tag: 
self.config.internal_metrics.include_key_tag + }); + } else { + emit!(MemoryEnrichmentTableInserted { + key: k, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + } + } + } +} + +impl Table for CuckooMemoryTable { + fn find_table_row<'a>( + &self, + case: Case, + condition: &'a [Condition<'a>], + select: Option<&'a [String]>, + wildcard: Option<&Value>, + index: Option, + ) -> Result { + let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?; + + match rows.pop() { + Some(row) if rows.is_empty() => Ok(row), + Some(_) => Err(Error::MoreThanOneRowFound), + None => Err(Error::NoRowsFound), + } + } + + fn find_table_rows<'a>( + &self, + _case: Case, + condition: &'a [Condition<'a>], + _select: Option<&'a [String]>, + _wildcard: Option<&Value>, + _index: Option, + ) -> Result, Error> { + match condition.first() { + Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed), + Some(Condition::Equals { value, .. }) => { + let key = value.to_string_lossy(); + if let Some(associated_data) = self.filter.get_associated_data(&key) { + emit!(MemoryEnrichmentTableRead { + key: &key, + include_key_metric_tag: self.config.internal_metrics.include_key_tag + }); + let mut result = ObjectMap::from([ + ( + KeyString::from("key"), + Value::Bytes(Bytes::copy_from_slice(key.as_bytes())), + ), + ( + KeyString::from("fingerprint"), + Value::Bytes(Bytes::from(format!( + "{:X}", + associated_data.get_fingerprint() + ))), + ), + (KeyString::from("value"), Value::Null), + ]); + if let Ok(ttl) = associated_data.get_stored_ttl_value() + && let Ok(ttl) = (ttl as u64 * self.config.scan_interval.get()).try_into() + { + result.insert(KeyString::from("ttl"), Value::Integer(ttl)); + } + if let Ok(counter) = associated_data.get_counter() { + result.insert(KeyString::from("counter"), Value::Integer(counter.into())); + } + Ok(vec![result]) + } else { + emit!(MemoryEnrichmentTableReadFailed { + key: &key, + include_key_metric_tag: 
self.config.internal_metrics.include_key_tag + }); + Ok(Default::default()) + } + } + Some(_) => Err(Error::OnlyEqualityConditionAllowed), + None => Err(Error::MissingCondition { kind: "Key" }), + } + } + + fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result { + match fields.len() { + 0 => Err(Error::MissingRequiredField { field: "Key" }), + 1 => Ok(IndexHandle(0)), + _ => Err(Error::OnlyOneFieldAllowed), + } + } + + /// Returns a list of the field names that are in each index + fn index_fields(&self) -> Vec<(Case, Vec)> { + Vec::new() + } + + /// Doesn't need reload, data is written directly + fn needs_reload(&self) -> bool { + false + } +} + +impl std::fmt::Debug for CuckooMemoryTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CuckooMemoryTable {:?}", self.config) + } +} + +#[async_trait] +impl StreamSink for CuckooMemoryTable { + async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + let events_sent = register!(EventsSent::from(Output(None))); + let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); + let now = Instant::now(); + let scan_interval_duration = Duration::from_secs(self.config.scan_interval.into()); + let mut scan_interval = IntervalStream::new(interval_at( + now.checked_add(scan_interval_duration).unwrap_or(now), + scan_interval_duration, + )); + let mut flush_interval: Pin + Send>> = self + .config + .flush_interval + .map(Duration::from_secs) + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); + let mut export_interval: Pin + Send>> = self + .cuckoo_config + .export_interval + .map(NonZeroU64::get) + .map(Duration::from_secs) + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); + + let scans_in_progress = Arc::new(AtomicUsize::new(0)); + + loop { + tokio::select! 
{ + event = input.next() => { + let mut event = if let Some(event) = event { + event + } else { + break; + }; + let event_byte_size = event.estimated_json_encoded_size_of(); + + let finalizers = event.take_finalizers(); + + // Panic: This sink only accepts Logs, so this should never panic + let log = event.into_log(); + + if let (Value::Object(map), _) = log.into_parts() { + self.handle_value(map) + }; + + finalizers.update_status(EventStatus::Delivered); + events_sent.emit(CountByteSize(1, event_byte_size)); + bytes_sent.emit(ByteSize(event_byte_size.get())); + }, + + Some(_) = flush_interval.next() => { + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: self.filter.get_item_count(), + new_byte_size: self.filter.get_memory_usage() + }); + } + + Some(_) = export_interval.next() => { + self.export(); + } + + Some(_) = scan_interval.next() => { + if scans_in_progress.load(Ordering::Acquire) > 0 { + warn!("Previous scan still in progress for cuckoo enrichment table. New scan will be skipped until previous one is complete. 
Consider increasing scan interval."); + continue; + } + let mut handles = JoinSet::new(); + let filter = self.filter.clone(); + let count = self.cuckoo_config.scanning_threads.unwrap_or(NonZeroUsize::new(1).unwrap()); + scans_in_progress.fetch_add(count.get(), Ordering::AcqRel); + for i in 0..count.get() { + let filter = filter.clone(); + let scans_in_progress = Arc::clone(&scans_in_progress); + let task = async move { + let expired = filter.scan_and_update_full_partition(count, i); + emit!(MemoryEnrichmentTableTtlExpiredCount { + count: expired as u64 + }); + emit!(MemoryEnrichmentTableFlushed { + new_objects_count: filter.get_item_count(), + new_byte_size: filter.get_memory_usage() + }); + scans_in_progress.fetch_sub(1, Ordering::AcqRel); + }.in_current_span(); + if !self.cuckoo_config.concurrent_scanning { + handles.spawn(task); + } else { + tokio::spawn(task); + } + } + if !self.cuckoo_config.concurrent_scanning { + let _ = handles.join_all().await; + } + } + } + } + + // Final export before exiting + self.export(); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn build_cuckoo_config(modfn: impl Fn(&mut CuckooMemoryConfig)) -> CuckooMemoryConfig { + let mut config = CuckooMemoryConfig { + fingerprint_bits: default_cuckoo_fingerprint_bits(), + bucket_size: default_cuckoo_bucket_size(), + max_entries: 1000, + max_kicks: default_cuckoo_max_kicks(), + lru_enabled: false, + ttl_enabled: false, + ttl_bits: default_cuckoo_ttl_bits(), + counter_enabled: false, + counter_bits: default_cuckoo_counter_bits(), + counter_field: OptionalValuePath::none(), + persistence_path: None, + export_interval: None, + scanning_threads: None, + concurrent_scanning: false, + }; + modfn(&mut config); + config + } + + #[test] + fn finds_row() { + let memory = CuckooMemoryTable::new(Default::default(), build_cuckoo_config(|_| {})) + .expect("default cuckoo memory table should build correctly"); + memory.handle_value(ObjectMap::from([("test_key".into(), 
Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + let result = memory.find_table_row(Case::Sensitive, &[condition], None, None, None); + assert!(result.is_ok()); + let result = result.unwrap(); + assert_eq!(result.get("key").unwrap(), &Value::from("test_key")); + // Cuckoo fingerprint is provided too + assert!(result.contains_key("fingerprint")); + } +} diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs index dfd7b8d6f4c78..f781416a2fdb5 100644 --- a/src/enrichment_tables/memory/internal_events.rs +++ b/src/enrichment_tables/memory/internal_events.rs @@ -58,6 +58,26 @@ impl InternalEvent for MemoryEnrichmentTableInserted<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableRemoved<'a> { + pub key: &'a str, + pub include_key_metric_tag: bool, +} + +impl InternalEvent for MemoryEnrichmentTableRemoved<'_> { + fn emit(self) { + if self.include_key_metric_tag { + counter!( + CounterName::MemoryEnrichmentTableRemovedTotal, + "key" => self.key.to_owned() + ) + .increment(1); + } else { + counter!(CounterName::MemoryEnrichmentTableRemovedTotal).increment(1); + } + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) struct MemoryEnrichmentTableFlushed { pub new_objects_count: usize, @@ -92,6 +112,17 @@ impl InternalEvent for MemoryEnrichmentTableTtlExpired<'_> { } } +#[derive(Debug, NamedInternalEvent)] +pub(crate) struct MemoryEnrichmentTableTtlExpiredCount { + pub count: u64, +} + +impl InternalEvent for MemoryEnrichmentTableTtlExpiredCount { + fn emit(self) { + counter!(CounterName::MemoryEnrichmentTableTtlExpirations,).increment(self.count); + } +} + #[derive(Debug, NamedInternalEvent)] pub(crate) struct MemoryEnrichmentTableReadFailed<'a> { pub key: &'a str, diff --git a/src/enrichment_tables/memory/mod.rs b/src/enrichment_tables/memory/mod.rs index 72b0986f9b9e9..61308ef48cfbf 100644 --- 
a/src/enrichment_tables/memory/mod.rs +++ b/src/enrichment_tables/memory/mod.rs @@ -1,6 +1,7 @@ //! Handles enrichment tables for `type = memory`. mod config; +mod cuckoo_table; mod internal_events; mod source; mod table; diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index a819b3be77912..d198ce1281d17 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -1,6 +1,7 @@ #![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly. use std::{ + pin::Pin, sync::{Arc, Mutex, MutexGuard}, time::{Duration, Instant}, }; @@ -12,7 +13,10 @@ use evmap::{ {self}, }; use evmap_derive::ShallowCopy; -use futures::{StreamExt, stream::BoxStream}; +use futures::{ + Stream, StreamExt, + stream::{self, BoxStream}, +}; use thread_local::ThreadLocal; use tokio::{ sync::broadcast::{Receiver, Sender}, @@ -399,12 +403,14 @@ impl StreamSink for Memory { async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { let events_sent = register!(EventsSent::from(Output(None))); let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),))); - let mut flush_interval = IntervalStream::new(interval( - self.config - .flush_interval - .map(Duration::from_secs) - .unwrap_or(Duration::MAX), - )); + let mut flush_interval: Pin + Send>> = self + .config + .flush_interval + .map(Duration::from_secs) + .map:: + Send>>, _>(|d| { + Box::pin(IntervalStream::new(interval(d))) + }) + .unwrap_or(Box::pin(stream::empty())); let mut scan_interval = IntervalStream::new(interval(Duration::from_secs( self.config.scan_interval.into(), ))); diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 20792a94ee322..d11bcf1b5f169 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -216,6 +216,121 @@ generated: configuration: { 
required: false relevant_when: "type = \"file\"" } + filter: { + type: object: options: { + bucket_size: { + type: uint: default: 4 + description: "Number of slots in each bucket" + required: false + } + concurrent_scanning: { + type: bool: default: false + description: """ + If set to true scanning will not block insertions. + This may affect behavior since blocking scans would free up space before insertions. + + By default, scanning is blocking. + """ + required: false + } + counter_bits: { + type: uint: default: 8 + description: "Number of bits to use to track counter. This will limit the max value." + required: false + } + counter_enabled: { + type: bool: default: false + description: "Can be set to true to track a count alongside hashes." + required: false + } + counter_field: { + type: string: default: "" + description: "Field in the incoming value used as the counter override." + required: false + } + export_interval: { + type: uint: {} + description: """ + The interval used for exporting data. + + By default, export is only done on exit. + """ + required: false + } + fingerprint_bits: { + type: uint: default: 8 + description: "Number of bits used for fingerprint." + required: false + } + lru_enabled: { + type: bool: default: false + description: "Can be set to true to use LRU strategy for kicking." + required: false + } + max_entries: { + type: uint: {} + description: """ + Maximum number of entries that can be stored in the filter (actual capacity will usually be + larger) + """ + required: true + } + max_kicks: { + type: uint: default: 500 + description: "Max number of kicks when experiencing hash collisions." + required: false + } + persistence_path: { + type: string: {} + description: """ + Path to the file to export data to periodically and on exit. + Data will be imported from this file on startup. + """ + required: false + } + scanning_threads: { + type: uint: {} + description: """ + Number of threads to use for scanning and updating LRU/TTL. 
+ + By default, scanning is single threaded. + """ + required: false + } + ttl_bits: { + type: uint: default: 8 + description: """ + Number of bits to use to track TTL. Low bit count will reduce maximum TTL and also require a + worse resolution to keep working. + """ + required: false + } + ttl_enabled: { + type: bool: default: true + description: "Can be set to true to also track TTL for entries." + required: false + } + type: { + type: string: enum: cuckoo: """ + Cuckoo filter + + Supports removal by accepting null values for keys, as well as TTL and LRU. + """ + description: """ + Cuckoo filter + + Supports removal by accepting null values for keys, as well as TTL and LRU. + """ + required: true + } + } + description: """ + Set to make the table act as a probabilistic filter instead of storing original values. This + will prevent reading values from the table - found keys will have empty value. + """ + required: false + relevant_when: "type = \"memory\"" + } flush_interval: { type: uint: {} description: """ @@ -225,6 +340,9 @@ generated: configuration: { Since every TTL scan makes its changes visible, only use this value if it is shorter than the `scan_interval`. + NOTE: For cuckoo filter, all writes are visible immediately. Flush interval still defines + when metrics for cuckoo filter are made visible. + By default, all writes are made visible immediately. """ required: false