Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ colored = { version = "3.1.1", default-features = false }
const-str = { version = "1.1.0", default-features = false }
criterion = "0.8"
crossbeam-utils = { version = "0.8.21", default-features = false }
cuckoo-clock = { version = "0.1.0" , default-features = false }
darling = { version = "0.20.11", default-features = false, features = ["suggestions"] }
dashmap = { version = "6.1.0", default-features = false }
derivative = { version = "2.2.0", default-features = false }
Expand Down Expand Up @@ -362,6 +363,7 @@ chrono.workspace = true
chrono-tz.workspace = true
colored.workspace = true
csv = { version = "1.3", default-features = false }
cuckoo-clock = { workspace = true, optional = true }
databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true }
derivative.workspace = true
dirs-next = { version = "2.0.0", default-features = false, optional = true }
Expand Down Expand Up @@ -426,6 +428,7 @@ sqlx = { version = "0.8.6", default-features = false, features = ["derive", "pos
stream-cancel = { version = "0.8.2", default-features = false }
strip-ansi-escapes = { version = "0.2.1", default-features = false }
syslog = { version = "6.1.1", default-features = false, optional = true }
tempfile = { workspace = true, optional = true }
tokio-postgres = { version = "0.7.13", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true }
tokio-tungstenite = { workspace = true, features = ["connect"], optional = true }
toml.workspace = true
Expand Down Expand Up @@ -603,7 +606,7 @@ gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"]
enrichment-tables = ["enrichment-tables-geoip", "enrichment-tables-mmdb", "enrichment-tables-memory"]
enrichment-tables-geoip = ["dep:maxminddb"]
enrichment-tables-mmdb = ["dep:maxminddb"]
enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]
enrichment-tables-memory = ["dep:bloomy", "dep:cuckoo-clock", "dep:evmap", "dep:evmap-derive", "dep:thread_local", "dep:tempfile"]

# Codecs
codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added cuckoo filter support for the `memory` enrichment table, to provide an efficient way to store and check presence of keys with a low memory footprint at the cost of false positives.

authors: esensar Quad9DNS
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added bloom filter support for the `memory` enrichment table, similar to the cuckoo filter, providing a simple and efficient way to store and check presence of keys with a low memory footprint at the cost of false positives, but with fewer features than the cuckoo filter.

Check failure on line 1 in changelog.d/25154_enrichment_table_memory_bloom_filter.feature.md

View workflow job for this annotation

GitHub Actions / Check Spelling

`imple` is not a recognized word (unrecognized-spelling)

authors: esensar Quad9DNS
227 changes: 227 additions & 0 deletions src/enrichment_tables/memory/bloom_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
use std::{
sync::{Arc, RwLock},
time::Duration,
};

use async_trait::async_trait;
use bloomy::BloomFilter;
use bytes::Bytes;
use futures::{StreamExt, stream::BoxStream};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use vector_config::configurable_component;
use vector_lib::{
EstimatedJsonEncodedSizeOf,
enrichment::{Case, Condition, Error, IndexHandle, Table},
event::{Event, EventStatus, Finalizable},
internal_event::{
ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
},
sink::StreamSink,
};
use vrl::value::{KeyString, ObjectMap, Value};

use crate::enrichment_tables::memory::{
MemoryConfig,
internal_events::{
MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
MemoryEnrichmentTableReadFailed,
},
};

/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a bloom table.
///
/// Only keys are recorded: a lookup can report whether a key was (probably)
/// inserted, but no associated value is stored.
#[derive(Clone)]
pub(super) struct BloomMemoryTable {
// The filter sits behind `Arc<RwLock<..>>`, so clones produced by
// `#[derive(Clone)]` share one underlying filter.
filter: Arc<RwLock<BloomFilter<String>>>,
// General memory-table settings; this file reads `flush_interval` and
// `internal_metrics.include_key_tag` from it.
pub(super) config: MemoryConfig,
}

/// Configuration of bloom filter for memory table.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
pub struct BloomMemoryConfig {
// Passed unchanged to `BloomFilter::new` when the table is constructed.
/// Maximum number of entries that can be stored in the filter.
pub max_entries: usize,
}

impl BloomMemoryTable {
    /// Creates a new [BloomMemoryTable] based on the provided config.
    ///
    /// The bloom filter is built from `bloom_config.max_entries`; `config` is
    /// stored as-is for later use by the sink and lookup paths.
    pub(super) fn new(
        config: MemoryConfig,
        bloom_config: BloomMemoryConfig,
    ) -> crate::Result<Self> {
        let filter = Arc::new(RwLock::new(BloomFilter::new(bloom_config.max_entries)));

        Ok(Self { config, filter })
    }

    /// Records every top-level key of `value` in the filter and emits one
    /// `MemoryEnrichmentTableInserted` event per key. The values of the
    /// object are ignored.
    ///
    /// # Panics
    ///
    /// Panics if the filter lock has been poisoned.
    fn handle_value(&self, value: ObjectMap) {
        // Nothing to insert: avoid taking the write lock at all.
        if value.is_empty() {
            return;
        }

        {
            // Take the write lock once for the whole object instead of once
            // per key, and release it before emitting metrics so readers are
            // not blocked on instrumentation work.
            let mut filter = self.filter.write().expect("rwlock poisoned");
            for key in value.keys() {
                filter.insert(&key.to_string());
            }
        }

        for key in value.keys() {
            emit!(MemoryEnrichmentTableInserted {
                key,
                include_key_metric_tag: self.config.internal_metrics.include_key_tag
            });
        }
    }
}

impl Table for BloomMemoryTable {
fn find_table_row<'a>(
&self,
case: Case,
condition: &'a [Condition<'a>],
select: Option<&'a [String]>,
wildcard: Option<&Value>,
index: Option<IndexHandle>,
) -> Result<ObjectMap, Error> {
let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;

match rows.pop() {
Some(row) if rows.is_empty() => Ok(row),
Some(_) => Err(Error::MoreThanOneRowFound),
None => Err(Error::NoRowsFound),
}
}

fn find_table_rows<'a>(
&self,
_case: Case,
condition: &'a [Condition<'a>],
_select: Option<&'a [String]>,
_wildcard: Option<&Value>,
_index: Option<IndexHandle>,
) -> Result<Vec<ObjectMap>, Error> {
match condition.first() {
Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed),
Some(Condition::Equals { value, .. }) => {
let key = value.to_string_lossy().to_string();
if self.filter.read().expect("rwlock poisoned").contains(&key) {
emit!(MemoryEnrichmentTableRead {
key: &key,
include_key_metric_tag: self.config.internal_metrics.include_key_tag
});
let result = ObjectMap::from([(
KeyString::from("key"),
Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
)]);
Ok(vec![result])
} else {
emit!(MemoryEnrichmentTableReadFailed {
key: &key,
include_key_metric_tag: self.config.internal_metrics.include_key_tag
});
Ok(Default::default())
}
}
Some(_) => Err(Error::OnlyEqualityConditionAllowed),
None => Err(Error::MissingCondition { kind: "Key" }),
}
}

fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
match fields.len() {
0 => Err(Error::MissingRequiredField { field: "Key" }),
1 => Ok(IndexHandle(0)),
_ => Err(Error::OnlyOneFieldAllowed),
}
}

/// Returns a list of the field names that are in each index
fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
Vec::new()
}

/// Doesn't need reload, data is written directly
fn needs_reload(&self) -> bool {
false
}
}

/// Debug output shows only the table configuration; the filter contents are omitted.
impl std::fmt::Debug for BloomMemoryTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let config = &self.config;
        f.write_fmt(format_args!("BloomMemoryTable {:?}", config))
    }
}

#[async_trait]
impl StreamSink<Event> for BloomMemoryTable {
    /// Drives the sink: every incoming log event has its top-level keys added
    /// to the filter, and each flush tick reports the filter's count and size.
    ///
    /// The loop ends, returning `Ok(())`, once the input stream is exhausted.
    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
        let events_sent = register!(EventsSent::from(Output(None)));
        let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));

        // Without a configured interval, fall back to the largest possible period.
        let flush_period = self
            .config
            .flush_interval
            .map_or(Duration::MAX, Duration::from_secs);
        let mut flush_interval = IntervalStream::new(interval(flush_period));

        loop {
            tokio::select! {
                next_event = input.next() => {
                    let Some(mut event) = next_event else {
                        break;
                    };
                    let event_byte_size = event.estimated_json_encoded_size_of();

                    let finalizers = event.take_finalizers();

                    // Panic: This sink only accepts Logs, so this should never panic
                    let (value, _) = event.into_log().into_parts();

                    // Only object-shaped logs carry keys to record.
                    if let Value::Object(map) = value {
                        self.handle_value(map);
                    }

                    finalizers.update_status(EventStatus::Delivered);
                    events_sent.emit(CountByteSize(1, event_byte_size));
                    bytes_sent.emit(ByteSize(event_byte_size.get()));
                },

                Some(_) = flush_interval.next() => {
                    let guard = self.filter.read().expect("rwlock poisoned");
                    emit!(MemoryEnrichmentTableFlushed {
                        new_objects_count: guard.count(),
                        new_byte_size: guard.bits() / 8
                    });
                }
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a bloom config with 1000 entries, then lets the caller tweak it.
    fn build_bloom_config(modfn: impl Fn(&mut BloomMemoryConfig)) -> BloomMemoryConfig {
        let mut bloom_config = BloomMemoryConfig { max_entries: 1000 };
        modfn(&mut bloom_config);
        bloom_config
    }

    /// A key that was inserted must be reported back as a single row.
    #[test]
    fn finds_row() {
        let table = BloomMemoryTable::new(Default::default(), build_bloom_config(|_| {}))
            .expect("default bloom memory table should build correctly");
        table.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));

        let conditions = [Condition::Equals {
            field: "key",
            value: Value::from("test_key"),
        }];

        let lookup = table.find_table_row(Case::Sensitive, &conditions, None, None, None);
        assert!(lookup.is_ok());

        let row = lookup.unwrap();
        assert_eq!(row.get("key"), Some(&Value::from("test_key")));
    }
}
Loading
Loading