From 6583fd70e4957a5ff42f1c17c93f732232248499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 1 Jun 2026 14:36:37 +0200 Subject: [PATCH 01/19] fix(enrichment): preserve memory enrichment table state on reload Related: #25143 Related: https://github.com/vectordotdev/vector/pull/25143#discussion_r3310823020 --- lib/vector-vrl/enrichment/src/lib.rs | 20 +++++++++++++++ lib/vector-vrl/enrichment/src/tables.rs | 19 ++++++++++++++ src/enrichment_tables/file.rs | 19 ++++++++++++++ src/enrichment_tables/geoip.rs | 19 ++++++++++++++ src/enrichment_tables/memory/table.rs | 33 +++++++++++++++++++++++-- src/enrichment_tables/mmdb.rs | 19 ++++++++++++++ src/topology/builder.rs | 23 +++++++++++++++-- src/topology/running.rs | 20 ++++++++++++++- 8 files changed, 167 insertions(+), 5 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/lib.rs b/lib/vector-vrl/enrichment/src/lib.rs index 293a5db343d61..c275880e4b77c 100644 --- a/lib/vector-vrl/enrichment/src/lib.rs +++ b/lib/vector-vrl/enrichment/src/lib.rs @@ -8,6 +8,8 @@ pub mod tables; mod test_util; mod vrl_util; +use std::any::Any; + use dyn_clone::DynClone; use indoc::indoc; use snafu::Snafu; @@ -77,6 +79,10 @@ pub enum Error { Internal { source: InternalError }, #[snafu(display("Table {table} not loaded"))] TableNotLoaded { table: String }, + #[snafu(display("Table configuration is not compatible for reload"))] + IncompatibleTableConfig, + #[snafu(display("Table type changed with configuration change, state lost"))] + TableTypeMismatch, } #[derive(Clone, Debug, PartialEq, Eq, Snafu)] @@ -141,6 +147,20 @@ pub trait Table: DynClone { /// Returns true if the underlying data has changed and the table needs reloading. fn needs_reload(&self) -> bool; + + /// Returns true if this table holds state that needs to be moved in case of reload. + fn stateful(&self) -> bool; + + /// Moves state from other table into this table and return back the other table if the move has + /// failed. 
+ fn take_state( + &mut self, + other: Box, + ) -> Result<(), (Box, Error)>; + + fn into_any(self: Box) -> Box; + + fn as_any(&self) -> &dyn Any; } dyn_clone::clone_trait_object!(Table); diff --git a/lib/vector-vrl/enrichment/src/tables.rs b/lib/vector-vrl/enrichment/src/tables.rs index 93542caf03e43..21f9fa4f8784c 100644 --- a/lib/vector-vrl/enrichment/src/tables.rs +++ b/lib/vector-vrl/enrichment/src/tables.rs @@ -196,6 +196,25 @@ impl TableRegistry { None => true, } } + + /// Checks if the table is stateful. + /// If in doubt (the table isn't in our list) we return false. + pub fn is_stateful(&self, table: &str) -> bool { + match &**self.tables.load() { + Some(tables) => tables + .get(table) + .map(|table| table.stateful()) + .unwrap_or(false), + None => false, + } + } + + pub fn get(&self, table: &str) -> Option> { + match &**self.tables.load() { + Some(tables) => tables.get(table).cloned(), + None => None, + } + } } impl std::fmt::Debug for TableRegistry { diff --git a/src/enrichment_tables/file.rs b/src/enrichment_tables/file.rs index bc6e3213d7a1b..20c3aa42d8507 100644 --- a/src/enrichment_tables/file.rs +++ b/src/enrichment_tables/file.rs @@ -674,6 +674,25 @@ impl Table for File { .and_then(|metadata| metadata.modified()), Ok(modified) if modified > self.last_modified) } + + fn stateful(&self) -> bool { + false + } + + fn take_state( + &mut self, + _other: Box, + ) -> Result<(), (Box, Error)> { + panic!("File table is not stateful, can't use take_state") + } + + fn into_any(self: Box) -> Box { + self + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } impl std::fmt::Debug for File { diff --git a/src/enrichment_tables/geoip.rs b/src/enrichment_tables/geoip.rs index 6716b4eaf2216..19c519aaa589e 100644 --- a/src/enrichment_tables/geoip.rs +++ b/src/enrichment_tables/geoip.rs @@ -345,6 +345,25 @@ impl Table for Geoip { .and_then(|metadata| metadata.modified()), Ok(modified) if modified > self.last_modified) } + + fn stateful(&self) -> bool { + false + 
} + + fn take_state( + &mut self, + _other: Box, + ) -> Result<(), (Box, Error)> { + panic!("MMDB table is not stateful, can't use take_state") + } + + fn into_any(self: Box) -> Box { + self + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } impl std::fmt::Debug for Geoip { diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index a819b3be77912..331633518adfd 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -382,9 +382,38 @@ impl Table for Memory { Vec::new() } - /// Doesn't need reload, data is written directly + /// Has to be reloaded always, because a new component is created to insert data into it fn needs_reload(&self) -> bool { - false + true + } + + fn stateful(&self) -> bool { + true + } + + fn take_state( + &mut self, + other: Box, + ) -> Result<(), (Box, Error)> { + if !other.as_any().is::() { + return Err((other, Error::TableTypeMismatch)); + } + // Type checked already + let other_memory = other.into_any().downcast::().unwrap(); + self.write_handle = other_memory.write_handle; + self.read_handle = other_memory.read_handle; + self.read_handle_factory = other_memory.read_handle_factory; + self.expired_items_sender = other_memory.expired_items_sender; + self.expired_items_receiver = other_memory.expired_items_receiver; + Ok(()) + } + + fn into_any(self: Box) -> Box { + self + } + + fn as_any(&self) -> &dyn std::any::Any { + self } } diff --git a/src/enrichment_tables/mmdb.rs b/src/enrichment_tables/mmdb.rs index 1977d1e0efad6..8c36a704bd28a 100644 --- a/src/enrichment_tables/mmdb.rs +++ b/src/enrichment_tables/mmdb.rs @@ -164,6 +164,25 @@ impl Table for Mmdb { .and_then(|metadata| metadata.modified()), Ok(modified) if modified > self.last_modified) } + + fn stateful(&self) -> bool { + false + } + + fn take_state( + &mut self, + _other: Box, + ) -> Result<(), (Box, Error)> { + panic!("MMDB table is not stateful, can't use take_state") + } + + fn into_any(self: 
Box) -> Box { + self + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } impl std::fmt::Debug for Mmdb { diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8be9a0ce73d7a..3d4a05181e588 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -27,6 +27,7 @@ use vector_lib::{ }, }, }, + enrichment::Table, internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered}, latency::LatencyRecorder, schema::Definition, @@ -177,7 +178,7 @@ impl<'a> Builder<'a> { /// Loads, or reloads the enrichment tables. /// The tables are stored in the `ENRICHMENT_TABLES` global variable. async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry { - let mut enrichment_tables = HashMap::new(); + let mut enrichment_tables: HashMap> = HashMap::new(); // Build enrichment tables 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() { @@ -219,6 +220,21 @@ impl<'a> Builder<'a> { } } + if !self.diff.enrichment_tables.is_added(name) + && let Some(existing_table) = ENRICHMENT_TABLES.get(&table_name) + && existing_table.stateful() + && table.stateful() + { + match table.take_state(existing_table) { + Ok(()) => (), + Err((existing, err)) => { + error!(message = "Unable to move the state to the new table.", table = ?name.to_string(), %err); + enrichment_tables.insert(table_name, existing); + continue 'tables; + } + } + } + enrichment_tables.insert(table_name, table); } } @@ -961,12 +977,15 @@ async fn run_source_output_pump( Ok(TaskOutput::Source) } +/// Reloads file based enrichment tables - not stateful ones pub async fn reload_enrichment_tables(config: &Config) { let mut enrichment_tables = HashMap::new(); // Build enrichment tables 'tables: for (name, table_outer) in config.enrichment_tables.iter() { let table_name = name.to_string(); - if ENRICHMENT_TABLES.needs_reload(&table_name) { + if ENRICHMENT_TABLES.needs_reload(&table_name) + && 
!ENRICHMENT_TABLES.is_stateful(&table_name) + { let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name)); let mut table = match table_outer.inner.build(&config.global).await { diff --git a/src/topology/running.rs b/src/topology/running.rs index 97e309ec0d0b8..e810bec4db39e 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -803,6 +803,21 @@ impl RunningTopology { } } + for key in diff + .enrichment_tables + .changed_and_added() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_sink(key).map(|(key, _)| key)) + }) + { + if let Some(task) = new_pieces.tasks.get(&key) { + self.component_type_names + .insert(key.clone(), task.typetag().to_string()); + } + } + for (key, input) in &new_pieces.inputs { self.inputs_tap_metadata .insert(key.clone(), input.1.clone()); @@ -965,7 +980,10 @@ impl RunningTopology { for input in inputs { let output = self.outputs.get_mut(&input).expect("unknown output"); - if diff.contains(&input.component) || inputs_to_add.contains(&input) { + if diff.contains(&input.component) + || diff.is_changed(key) + || inputs_to_add.contains(&input) + { // If the input we're connecting to is changing, that means its outputs will have been // recreated, so instead of replacing a paused sink, we have to add it to this new // output for the first time, since there's nothing to actually replace at this point. 
From 58b908edb383c514e5b0ebff929702d7bf8be20b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 1 Jun 2026 14:52:36 +0200 Subject: [PATCH 02/19] Add changelog entry --- changelog.d/25547_memory_table_state_loss.fix.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/25547_memory_table_state_loss.fix.md diff --git a/changelog.d/25547_memory_table_state_loss.fix.md b/changelog.d/25547_memory_table_state_loss.fix.md new file mode 100644 index 0000000000000..da32c70606de5 --- /dev/null +++ b/changelog.d/25547_memory_table_state_loss.fix.md @@ -0,0 +1,3 @@ +Fixed an issue where memory enrichment table state could be lost or detached from the sink component on configuration reload. + +authors: esensar Quad9DNS From 04110a373c5471952e289a5f81344c284432c4d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 2 Jun 2026 13:19:24 +0200 Subject: [PATCH 03/19] Restore old state when building the new table --- lib/vector-vrl/enrichment/src/test_util.rs | 19 +++++++++++++ lib/vector-vrl/tests/src/test_enrichment.rs | 19 +++++++++++++ src/config/enrichment_table.rs | 1 + src/enrichment_tables/file.rs | 1 + src/enrichment_tables/geoip.rs | 1 + src/enrichment_tables/memory/config.rs | 22 +++++++++++---- src/enrichment_tables/mmdb.rs | 1 + src/topology/builder.rs | 31 ++++++++++----------- 8 files changed, 73 insertions(+), 22 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/test_util.rs b/lib/vector-vrl/enrichment/src/test_util.rs index 38724f36e2dd8..87195fe45a389 100644 --- a/lib/vector-vrl/enrichment/src/test_util.rs +++ b/lib/vector-vrl/enrichment/src/test_util.rs @@ -69,6 +69,25 @@ impl Table for DummyEnrichmentTable { fn needs_reload(&self) -> bool { false } + + fn stateful(&self) -> bool { + false + } + + fn take_state( + &mut self, + other: Box, + ) -> Result<(), (Box, Error)> { + panic!("Unsupported") + } + + fn into_any(self: Box) -> Box { + self + } + + fn as_any(&self) -> &dyn 
std::any::Any { + self + } } /// Create a table registry with dummy data diff --git a/lib/vector-vrl/tests/src/test_enrichment.rs b/lib/vector-vrl/tests/src/test_enrichment.rs index a60e8b4b06553..9e33e1f94d7d1 100644 --- a/lib/vector-vrl/tests/src/test_enrichment.rs +++ b/lib/vector-vrl/tests/src/test_enrichment.rs @@ -59,6 +59,25 @@ impl enrichment::Table for TestEnrichmentTable { fn needs_reload(&self) -> bool { false } + + fn stateful(&self) -> bool { + false + } + + fn take_state( + &mut self, + other: Box, + ) -> Result<(), (Box, enrichment::Error)> { + panic!("Unsupported") + } + + fn into_any(self: Box) -> Box { + self + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } pub(crate) fn test_enrichment_table() -> enrichment::TableRegistry { diff --git a/src/config/enrichment_table.rs b/src/config/enrichment_table.rs index 540a91875b463..1f836e7ce7320 100644 --- a/src/config/enrichment_table.rs +++ b/src/config/enrichment_table.rs @@ -124,6 +124,7 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync async fn build( &self, globals: &GlobalOptions, + prev_table: Option>, ) -> crate::Result>; fn sink_config( diff --git a/src/enrichment_tables/file.rs b/src/enrichment_tables/file.rs index 20c3aa42d8507..24444ac56557c 100644 --- a/src/enrichment_tables/file.rs +++ b/src/enrichment_tables/file.rs @@ -239,6 +239,7 @@ impl EnrichmentTableConfig for FileConfig { async fn build( &self, globals: &crate::config::GlobalOptions, + _prev_table: Option>, ) -> crate::Result> { Ok(Box::new(File::new( self.clone(), diff --git a/src/enrichment_tables/geoip.rs b/src/enrichment_tables/geoip.rs index 19c519aaa589e..ff577a3a33652 100644 --- a/src/enrichment_tables/geoip.rs +++ b/src/enrichment_tables/geoip.rs @@ -101,6 +101,7 @@ impl EnrichmentTableConfig for GeoipConfig { async fn build( &self, _: &crate::config::GlobalOptions, + _: Option>, ) -> crate::Result> { Ok(Box::new(Geoip::new(self.clone())?)) } diff --git 
a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 9b17ce29750c3..7c94428bf1b6b 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -136,10 +136,21 @@ const fn default_scan_interval() -> NonZeroU64 { } impl MemoryConfig { - pub(super) async fn get_or_build_memory(&self) -> Memory { + pub(super) async fn get_or_build_memory( + &self, + prev_table: Option>, + ) -> Memory { let mut boxed_memory = self.memory.lock().await; *boxed_memory - .get_or_insert_with(|| Box::new(Memory::new(self.clone()))) + .get_or_insert_with(|| { + let mut memory = Memory::new(self.clone()); + if let Some(prev) = prev_table { + if let Err((_, err)) = memory.take_state(prev) { + error!(message = "Unable to move the state to the new table.", %err); + } + } + Box::new(memory) + }) .clone() } } @@ -148,8 +159,9 @@ impl EnrichmentTableConfig for MemoryConfig { async fn build( &self, _globals: &crate::config::GlobalOptions, + prev_table: Option>, ) -> crate::Result> { - Ok(Box::new(self.get_or_build_memory().await)) + Ok(Box::new(self.get_or_build_memory(prev_table).await)) } fn sink_config( @@ -177,7 +189,7 @@ impl EnrichmentTableConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl SinkConfig for MemoryConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await); + let sink = VectorSink::from_event_streamsink(self.get_or_build_memory(None).await); Ok((sink, future::ok(()).boxed())) } @@ -195,7 +207,7 @@ impl SinkConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl SourceConfig for MemoryConfig { async fn build(&self, cx: SourceContext) -> crate::Result { - let memory = self.get_or_build_memory().await; + let memory = self.get_or_build_memory(None).await; let log_namespace = cx.log_namespace(self.log_namespace); diff --git 
a/src/enrichment_tables/mmdb.rs b/src/enrichment_tables/mmdb.rs index 8c36a704bd28a..db056fb04ee45 100644 --- a/src/enrichment_tables/mmdb.rs +++ b/src/enrichment_tables/mmdb.rs @@ -36,6 +36,7 @@ impl EnrichmentTableConfig for MmdbConfig { async fn build( &self, _: &crate::config::GlobalOptions, + _: Option>, ) -> crate::Result> { Ok(Box::new(Mmdb::new(self.clone())?)) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 3d4a05181e588..9d4b011f81ac4 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -192,7 +192,19 @@ impl<'a> Builder<'a> { None }; - let mut table = match table_outer.inner.build(&self.config.global).await { + let mut prev_table = None; + if !self.diff.enrichment_tables.is_added(name) + && let Some(existing_table) = ENRICHMENT_TABLES.get(&table_name) + && existing_table.stateful() + { + prev_table = Some(existing_table) + } + + let mut table = match table_outer + .inner + .build(&self.config.global, prev_table) + .await + { Ok(table) => table, Err(error) => { self.errors @@ -220,21 +232,6 @@ impl<'a> Builder<'a> { } } - if !self.diff.enrichment_tables.is_added(name) - && let Some(existing_table) = ENRICHMENT_TABLES.get(&table_name) - && existing_table.stateful() - && table.stateful() - { - match table.take_state(existing_table) { - Ok(()) => (), - Err((existing, err)) => { - error!(message = "Unable to move the state to the new table.", table = ?name.to_string(), %err); - enrichment_tables.insert(table_name, existing); - continue 'tables; - } - } - } - enrichment_tables.insert(table_name, table); } } @@ -988,7 +985,7 @@ pub async fn reload_enrichment_tables(config: &Config) { { let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name)); - let mut table = match table_outer.inner.build(&config.global).await { + let mut table = match table_outer.inner.build(&config.global, None).await { Ok(table) => table, Err(error) => { error!("Enrichment table \"{name}\" reload failed: {error}"); From 
0e0952c6f117a802ea7e585b35b141623c0618e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 2 Jun 2026 15:57:59 +0200 Subject: [PATCH 04/19] Fix reload handling for enrichment tables --- src/config/diff.rs | 9 ++++++--- src/topology/running.rs | 22 +++++++++++++++++----- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/config/diff.rs b/src/config/diff.rs index ebedf3c73d5c3..e824603b0440b 100644 --- a/src/config/diff.rs +++ b/src/config/diff.rs @@ -29,6 +29,7 @@ impl ConfigDiff { enrichment_tables: Difference::from_enrichment_tables( &old.enrichment_tables, &new.enrichment_tables, + &components_to_reload, ), components_to_reload, } @@ -56,7 +57,7 @@ impl ConfigDiff { self.sources.is_changed(key) || self.transforms.is_changed(key) || self.sinks.is_changed(key) - || self.enrichment_tables.contains(key) + || self.enrichment_tables.is_changed(key) } /// Checks whether the given component is removed. @@ -64,7 +65,7 @@ impl ConfigDiff { self.sources.is_removed(key) || self.transforms.is_removed(key) || self.sinks.is_removed(key) - || self.enrichment_tables.contains(key) + || self.enrichment_tables.is_removed(key) } } @@ -116,6 +117,7 @@ impl Difference { fn from_enrichment_tables( old: &IndexMap>, new: &IndexMap>, + need_change: &HashSet, ) -> Self { let old_table_keys = extract_table_component_keys(old); let new_table_keys = extract_table_component_keys(new); @@ -131,7 +133,7 @@ impl Difference { // which can iterate in varied orders. 
let old_value = serde_json::to_value(&old[*table_key]).unwrap(); let new_value = serde_json::to_value(&new[*table_key]).unwrap(); - old_value != new_value + old_value != new_value || need_change.contains(*table_key) }) .cloned() .map(|(_table_key, derived_component_key)| derived_component_key) @@ -305,6 +307,7 @@ mod tests { let diff = Difference::from_enrichment_tables( &old_config.enrichment_tables, &new_config.enrichment_tables, + &Default::default(), ); assert_eq!(diff.to_add, HashSet::from_iter(["memory_table_new".into()])); diff --git a/src/topology/running.rs b/src/topology/running.rs index e810bec4db39e..84545aab49b29 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -563,6 +563,12 @@ impl RunningTopology { .sinks .to_change .iter() + .chain(diff.enrichment_tables.to_change.iter().filter(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_sink(key)) + .is_some() + })) .filter(|&key| { if diff.components_to_reload.contains(key) { return false; @@ -592,10 +598,15 @@ impl RunningTopology { .iter() .filter(|key| { !reuse_buffers.contains(*key) - && self + && (self .config .sink(key) .is_some_and(|s| s.buffer.has_disk_stage()) + || self + .config + .enrichment_table(key) + .and_then(|t| t.as_sink(key)) + .is_some_and(|(_, s)| s.buffer.has_disk_stage())) }) .cloned() .collect::>(); @@ -751,6 +762,7 @@ impl RunningTopology { for key in removed_sinks { // Sinks only have inputs self.inputs_tap_metadata.remove(key); + self.component_type_names.remove(key); } let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| { @@ -761,6 +773,7 @@ impl RunningTopology { for key in removed_sources { // Sources only have outputs self.outputs_tap_metadata.remove(&key); + self.component_type_names.remove(&key); } for key in diff.sources.changed_and_added() { @@ -784,6 +797,8 @@ impl RunningTopology { if let Some(task) = new_pieces.tasks.get(&key) { self.outputs_tap_metadata .insert(key.clone(), ("source", 
task.typetag().to_string())); + self.component_type_names + .insert(key.clone(), task.typetag().to_string()); } } @@ -980,10 +995,7 @@ impl RunningTopology { for input in inputs { let output = self.outputs.get_mut(&input).expect("unknown output"); - if diff.contains(&input.component) - || diff.is_changed(key) - || inputs_to_add.contains(&input) - { + if diff.contains(&input.component) || inputs_to_add.contains(&input) { // If the input we're connecting to is changing, that means its outputs will have been // recreated, so instead of replacing a paused sink, we have to add it to this new // output for the first time, since there's nothing to actually replace at this point. From a696cb734eaac2feb834fbe84c701f86e742b21a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 3 Jun 2026 06:01:20 +0200 Subject: [PATCH 05/19] Add additional types for enrichment tables which act as more component types --- src/app.rs | 10 +++++++++- src/config/mod.rs | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/app.rs b/src/app.rs index 580c06642a6e3..d17bdb8aba4ea 100644 --- a/src/app.rs +++ b/src/app.rs @@ -611,11 +611,19 @@ pub async fn load_configs( for (name, table) in config.enrichment_tables() { let files = table.inner.files_to_watch(); let component_config = ComponentConfig::new( - files.into_iter().cloned().collect(), + files.clone().into_iter().cloned().collect(), name.clone(), ComponentType::EnrichmentTable, ); watched_component_paths.push(component_config); + if table.as_sink(name).is_some() { + let sink_component_config = ComponentConfig::new( + files.into_iter().cloned().collect(), + name.clone(), + ComponentType::Sink, + ); + watched_component_paths.push(sink_component_config); + } } info!( diff --git a/src/config/mod.rs b/src/config/mod.rs index 4a24954926c13..abe8f81e9c1b5 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -77,7 +77,7 @@ pub use vector_lib::{ }; #[derive(Debug, Clone, Ord, PartialOrd, Eq, 
PartialEq)] -// // This is not a comprehensive set; variants are added as needed. +// This is not a comprehensive set; variants are added as needed. pub enum ComponentType { Transform, Sink, From 664d2ae9cd26846f7673dd33e4828851ad98f14e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 3 Jun 2026 06:23:14 +0200 Subject: [PATCH 06/19] Change stateful flag check with sink check --- lib/vector-vrl/enrichment/src/tables.rs | 12 ------------ src/topology/builder.rs | 3 ++- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/tables.rs b/lib/vector-vrl/enrichment/src/tables.rs index 21f9fa4f8784c..bc64137fffb47 100644 --- a/lib/vector-vrl/enrichment/src/tables.rs +++ b/lib/vector-vrl/enrichment/src/tables.rs @@ -197,18 +197,6 @@ impl TableRegistry { } } - /// Checks if the table is stateful. - /// If in doubt (the table isn't in our list) we return false. - pub fn is_stateful(&self, table: &str) -> bool { - match &**self.tables.load() { - Some(tables) => tables - .get(table) - .map(|table| table.stateful()) - .unwrap_or(false), - None => false, - } - } - pub fn get(&self, table: &str) -> Option> { match &**self.tables.load() { Some(tables) => tables.get(table).cloned(), diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 5188c4368aacf..014ef5c25ee71 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -981,7 +981,8 @@ pub async fn reload_enrichment_tables(config: &Config) { 'tables: for (name, table_outer) in config.enrichment_tables.iter() { let table_name = name.to_string(); if ENRICHMENT_TABLES.needs_reload(&table_name) - && !ENRICHMENT_TABLES.is_stateful(&table_name) + // Tables that can act as sinks are reloaded through topology + && table_outer.as_sink(name).is_none() { let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name)); From d36e7434cb720509f0e52163cc0f1459f418c47e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 5 
Jun 2026 11:28:49 +0200 Subject: [PATCH 07/19] Remove unused enrichment error --- lib/vector-vrl/enrichment/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/lib.rs b/lib/vector-vrl/enrichment/src/lib.rs index c275880e4b77c..bd198a2cb2c2f 100644 --- a/lib/vector-vrl/enrichment/src/lib.rs +++ b/lib/vector-vrl/enrichment/src/lib.rs @@ -79,8 +79,6 @@ pub enum Error { Internal { source: InternalError }, #[snafu(display("Table {table} not loaded"))] TableNotLoaded { table: String }, - #[snafu(display("Table configuration is not compatible for reload"))] - IncompatibleTableConfig, #[snafu(display("Table type changed with configuration change, state lost"))] TableTypeMismatch, } From ed83146c85b28f8b18bbecd731a4ffebae968cb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 5 Jun 2026 12:17:58 +0200 Subject: [PATCH 08/19] Simplify state extraction from tables --- lib/vector-vrl/enrichment/src/lib.rs | 19 +++------- src/config/enrichment_table.rs | 4 +-- src/enrichment_tables/file.rs | 21 +---------- src/enrichment_tables/geoip.rs | 21 +---------- src/enrichment_tables/memory/config.rs | 16 ++++----- src/enrichment_tables/memory/table.rs | 50 ++++++++++++-------------- src/enrichment_tables/mmdb.rs | 21 +---------- src/topology/builder.rs | 7 ++-- 8 files changed, 41 insertions(+), 118 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/lib.rs b/lib/vector-vrl/enrichment/src/lib.rs index bd198a2cb2c2f..0f2fef3572b16 100644 --- a/lib/vector-vrl/enrichment/src/lib.rs +++ b/lib/vector-vrl/enrichment/src/lib.rs @@ -8,8 +8,6 @@ pub mod tables; mod test_util; mod vrl_util; -use std::any::Any; - use dyn_clone::DynClone; use indoc::indoc; use snafu::Snafu; @@ -146,19 +144,10 @@ pub trait Table: DynClone { /// Returns true if the underlying data has changed and the table needs reloading. fn needs_reload(&self) -> bool; - /// Returns true if this table holds state that needs to be moved in case of reload. 
- fn stateful(&self) -> bool; - - /// Moves state from other table into this table and return back the other table if the move has - /// failed. - fn take_state( - &mut self, - other: Box, - ) -> Result<(), (Box, Error)>; - - fn into_any(self: Box) -> Box; - - fn as_any(&self) -> &dyn Any; + /// Extracts state from this table + fn extract_state(&self) -> Option> { + None + } } dyn_clone::clone_trait_object!(Table); diff --git a/src/config/enrichment_table.rs b/src/config/enrichment_table.rs index 1f836e7ce7320..baf09124da40d 100644 --- a/src/config/enrichment_table.rs +++ b/src/config/enrichment_table.rs @@ -112,7 +112,7 @@ where /// Generalized interface for describing and building enrichment table components. #[enum_dispatch] pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync { - /// Builds the enrichment table with the given globals. + /// Builds the enrichment table with the given globals and previous table state. /// /// If the enrichment table is built successfully, `Ok(...)` is returned containing the /// enrichment table. 
@@ -124,7 +124,7 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync async fn build( &self, globals: &GlobalOptions, - prev_table: Option>, + prev_state: Option>, ) -> crate::Result>; fn sink_config( diff --git a/src/enrichment_tables/file.rs b/src/enrichment_tables/file.rs index 24444ac56557c..c2c82c41e8a71 100644 --- a/src/enrichment_tables/file.rs +++ b/src/enrichment_tables/file.rs @@ -239,7 +239,7 @@ impl EnrichmentTableConfig for FileConfig { async fn build( &self, globals: &crate::config::GlobalOptions, - _prev_table: Option>, + _prev_state: Option>, ) -> crate::Result> { Ok(Box::new(File::new( self.clone(), @@ -675,25 +675,6 @@ impl Table for File { .and_then(|metadata| metadata.modified()), Ok(modified) if modified > self.last_modified) } - - fn stateful(&self) -> bool { - false - } - - fn take_state( - &mut self, - _other: Box, - ) -> Result<(), (Box, Error)> { - panic!("File table is not stateful, can't use take_state") - } - - fn into_any(self: Box) -> Box { - self - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } } impl std::fmt::Debug for File { diff --git a/src/enrichment_tables/geoip.rs b/src/enrichment_tables/geoip.rs index ff577a3a33652..e04e2db176541 100644 --- a/src/enrichment_tables/geoip.rs +++ b/src/enrichment_tables/geoip.rs @@ -101,7 +101,7 @@ impl EnrichmentTableConfig for GeoipConfig { async fn build( &self, _: &crate::config::GlobalOptions, - _: Option>, + _: Option>, ) -> crate::Result> { Ok(Box::new(Geoip::new(self.clone())?)) } @@ -346,25 +346,6 @@ impl Table for Geoip { .and_then(|metadata| metadata.modified()), Ok(modified) if modified > self.last_modified) } - - fn stateful(&self) -> bool { - false - } - - fn take_state( - &mut self, - _other: Box, - ) -> Result<(), (Box, Error)> { - panic!("MMDB table is not stateful, can't use take_state") - } - - fn into_any(self: Box) -> Box { - self - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } } impl std::fmt::Debug for Geoip { diff 
--git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 7c94428bf1b6b..415f53ebd6657 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -138,18 +138,16 @@ const fn default_scan_interval() -> NonZeroU64 { impl MemoryConfig { pub(super) async fn get_or_build_memory( &self, - prev_table: Option>, + prev_state: Option>, ) -> Memory { let mut boxed_memory = self.memory.lock().await; *boxed_memory .get_or_insert_with(|| { - let mut memory = Memory::new(self.clone()); - if let Some(prev) = prev_table { - if let Err((_, err)) = memory.take_state(prev) { - error!(message = "Unable to move the state to the new table.", %err); - } + if let Some(prev) = prev_state { + Box::new(Memory::from_previous_state(self.clone(), prev)) + } else { + Box::new(Memory::new(self.clone())) } - Box::new(memory) }) .clone() } @@ -159,9 +157,9 @@ impl EnrichmentTableConfig for MemoryConfig { async fn build( &self, _globals: &crate::config::GlobalOptions, - prev_table: Option>, + prev_state: Option>, ) -> crate::Result> { - Ok(Box::new(self.get_or_build_memory(prev_table).await)) + Ok(Box::new(self.get_or_build_memory(prev_state).await)) } fn sink_config( diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 331633518adfd..9307cfdf22e53 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -142,6 +142,25 @@ impl Memory { } } + /// Creates a new [Memory] based on the provided config and previous state. 
+ pub fn from_previous_state( + config: MemoryConfig, + prev_state: Box, + ) -> Self { + if let Ok(prev_memory) = prev_state.downcast::() { + Self { + config, + read_handle_factory: prev_memory.read_handle_factory, + read_handle: prev_memory.read_handle, + write_handle: prev_memory.write_handle, + expired_items_sender: prev_memory.expired_items_sender, + expired_items_receiver: prev_memory.expired_items_receiver, + } + } else { + Self::new(config) + } + } + pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle { self.read_handle .get_or(|| self.read_handle_factory.handle()) @@ -387,33 +406,8 @@ impl Table for Memory { true } - fn stateful(&self) -> bool { - true - } - - fn take_state( - &mut self, - other: Box, - ) -> Result<(), (Box, Error)> { - if !other.as_any().is::() { - return Err((other, Error::TableTypeMismatch)); - } - // Type checked already - let other_memory = other.into_any().downcast::().unwrap(); - self.write_handle = other_memory.write_handle; - self.read_handle = other_memory.read_handle; - self.read_handle_factory = other_memory.read_handle_factory; - self.expired_items_sender = other_memory.expired_items_sender; - self.expired_items_receiver = other_memory.expired_items_receiver; - Ok(()) - } - - fn into_any(self: Box) -> Box { - self - } - - fn as_any(&self) -> &dyn std::any::Any { - self + fn extract_state(&self) -> Option> { + Some(Box::new(self.clone())) } } @@ -1052,7 +1046,7 @@ mod tests { export_expired_items: false, source_key: "test".to_string(), }); - let memory = memory_config.get_or_build_memory().await; + let memory = memory_config.get_or_build_memory(None).await; memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); let mut events: Vec = run_and_assert_source_compliance( diff --git a/src/enrichment_tables/mmdb.rs b/src/enrichment_tables/mmdb.rs index db056fb04ee45..d01b3fe320c0a 100644 --- a/src/enrichment_tables/mmdb.rs +++ b/src/enrichment_tables/mmdb.rs @@ -36,7 +36,7 @@ impl EnrichmentTableConfig for 
MmdbConfig { async fn build( &self, _: &crate::config::GlobalOptions, - _: Option>, + _: Option>, ) -> crate::Result> { Ok(Box::new(Mmdb::new(self.clone())?)) } @@ -165,25 +165,6 @@ impl Table for Mmdb { .and_then(|metadata| metadata.modified()), Ok(modified) if modified > self.last_modified) } - - fn stateful(&self) -> bool { - false - } - - fn take_state( - &mut self, - _other: Box, - ) -> Result<(), (Box, Error)> { - panic!("MMDB table is not stateful, can't use take_state") - } - - fn into_any(self: Box) -> Box { - self - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } } impl std::fmt::Debug for Mmdb { diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 59032d9a2bdcd..efa7bdf6465f7 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -192,17 +192,16 @@ impl<'a> Builder<'a> { None }; - let mut prev_table = None; + let mut prev_state = None; if !self.diff.enrichment_tables.is_added(name) && let Some(existing_table) = ENRICHMENT_TABLES.get(&table_name) - && existing_table.stateful() { - prev_table = Some(existing_table) + prev_state = existing_table.extract_state(); } let mut table = match table_outer .inner - .build(&self.config.global, prev_table) + .build(&self.config.global, prev_state) .await { Ok(table) => table, From 1ed0732c6689caf24e8cbc34b663ea90b98ec58a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 5 Jun 2026 12:19:00 +0200 Subject: [PATCH 09/19] Remove another unused error from enrichment --- lib/vector-vrl/enrichment/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/lib.rs b/lib/vector-vrl/enrichment/src/lib.rs index 0f2fef3572b16..a5c64ea474411 100644 --- a/lib/vector-vrl/enrichment/src/lib.rs +++ b/lib/vector-vrl/enrichment/src/lib.rs @@ -77,8 +77,6 @@ pub enum Error { Internal { source: InternalError }, #[snafu(display("Table {table} not loaded"))] TableNotLoaded { table: String }, - #[snafu(display("Table type changed with 
configuration change, state lost"))] - TableTypeMismatch, } #[derive(Clone, Debug, PartialEq, Eq, Snafu)] From 4ab2ce31c766e1029bf3297c5de6e4522f5d7104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 5 Jun 2026 12:26:24 +0200 Subject: [PATCH 10/19] Add a test case for memory table state preservation --- src/enrichment_tables/memory/table.rs | 38 +++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 9307cfdf22e53..ad32e12351a20 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -488,6 +488,7 @@ mod tests { use super::*; use crate::{ + config::EnrichmentTableConfig, enrichment_tables::memory::{ config::MemorySourceConfig, internal_events::InternalMetricsConfig, }, @@ -524,6 +525,43 @@ mod tests { ); } + #[tokio::test] + async fn extract_state_preserves_data() { + let memory = Memory::new(Default::default()); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + let expected = ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(memory.config.ttl)), + ("value".into(), Value::from(5)), + ]); + assert_eq!( + Ok(expected.clone()), + memory.find_table_row( + Case::Sensitive, + std::slice::from_ref(&condition), + None, + None, + None + ) + ); + + // Now build a new table using old state + let new_memory = MemoryConfig::default() + .build(&Default::default(), memory.extract_state()) + .await + .unwrap(); + assert_eq!( + Ok(expected), + new_memory.find_table_row(Case::Sensitive, &[condition], None, None, None) + ); + } + #[test] fn calculates_ttl() { let ttl = 100; From f4cb301f9e25c3a5230be25d72d42b6ae1732d6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 5 Jun 2026 13:37:53 +0200 Subject: [PATCH 
11/19] Remove obsolete trait methods from enrichment tests --- lib/vector-vrl/enrichment/src/test_util.rs | 19 ------------------- lib/vector-vrl/tests/src/test_enrichment.rs | 19 ------------------- 2 files changed, 38 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/test_util.rs b/lib/vector-vrl/enrichment/src/test_util.rs index 87195fe45a389..38724f36e2dd8 100644 --- a/lib/vector-vrl/enrichment/src/test_util.rs +++ b/lib/vector-vrl/enrichment/src/test_util.rs @@ -69,25 +69,6 @@ impl Table for DummyEnrichmentTable { fn needs_reload(&self) -> bool { false } - - fn stateful(&self) -> bool { - false - } - - fn take_state( - &mut self, - other: Box, - ) -> Result<(), (Box, Error)> { - panic!("Unsupported") - } - - fn into_any(self: Box) -> Box { - self - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } } /// Create a table registry with dummy data diff --git a/lib/vector-vrl/tests/src/test_enrichment.rs b/lib/vector-vrl/tests/src/test_enrichment.rs index 9e33e1f94d7d1..a60e8b4b06553 100644 --- a/lib/vector-vrl/tests/src/test_enrichment.rs +++ b/lib/vector-vrl/tests/src/test_enrichment.rs @@ -59,25 +59,6 @@ impl enrichment::Table for TestEnrichmentTable { fn needs_reload(&self) -> bool { false } - - fn stateful(&self) -> bool { - false - } - - fn take_state( - &mut self, - other: Box, - ) -> Result<(), (Box, enrichment::Error)> { - panic!("Unsupported") - } - - fn into_any(self: Box) -> Box { - self - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } } pub(crate) fn test_enrichment_table() -> enrichment::TableRegistry { From e9f9738d8654693dc1a02541c980cc92c3796876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 8 Jun 2026 15:16:37 +0200 Subject: [PATCH 12/19] Prevent needless clone when extracting state from enrichment tables --- lib/vector-vrl/enrichment/src/tables.rs | 11 ++++++++--- src/topology/builder.rs | 6 ++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git 
a/lib/vector-vrl/enrichment/src/tables.rs b/lib/vector-vrl/enrichment/src/tables.rs index bc64137fffb47..a94c460d4a3da 100644 --- a/lib/vector-vrl/enrichment/src/tables.rs +++ b/lib/vector-vrl/enrichment/src/tables.rs @@ -197,10 +197,15 @@ impl TableRegistry { } } - pub fn get(&self, table: &str) -> Option> { + /// Extracts state from the table if available. + pub fn extract_state(&self, table: &str) -> Option> { match &**self.tables.load() { - Some(tables) => tables.get(table).cloned(), - None => None, + Some(tables) => { + tables.get(table).and_then(|t| t.extract_state()) + } + None => { + None + } } } } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index efa7bdf6465f7..94a6b351434cb 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -193,10 +193,8 @@ impl<'a> Builder<'a> { }; let mut prev_state = None; - if !self.diff.enrichment_tables.is_added(name) - && let Some(existing_table) = ENRICHMENT_TABLES.get(&table_name) - { - prev_state = existing_table.extract_state(); + if !self.diff.enrichment_tables.is_added(name) { + prev_state = ENRICHMENT_TABLES.extract_state(&table_name); } let mut table = match table_outer From 79eb2b1b85565a7861f442a3ca69b11721c55513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 8 Jun 2026 17:25:07 +0200 Subject: [PATCH 13/19] Add topology test for enrichment table state extraction --- lib/vector-vrl/enrichment/src/tables.rs | 8 +- src/enrichment_tables/memory/table.rs | 2 + src/topology/running.rs | 39 ++++++- src/topology/test/mod.rs | 3 +- src/topology/test/reload.rs | 135 +++++++++++++++++++++++- 5 files changed, 174 insertions(+), 13 deletions(-) diff --git a/lib/vector-vrl/enrichment/src/tables.rs b/lib/vector-vrl/enrichment/src/tables.rs index a94c460d4a3da..8e727a607896e 100644 --- a/lib/vector-vrl/enrichment/src/tables.rs +++ b/lib/vector-vrl/enrichment/src/tables.rs @@ -200,12 +200,8 @@ impl TableRegistry { /// Extracts state from the table if available. 
pub fn extract_state(&self, table: &str) -> Option> { match &**self.tables.load() { - Some(tables) => { - tables.get(table).and_then(|t| t.extract_state()) - } - None => { - None - } + Some(tables) => tables.get(table).and_then(|t| t.extract_state()), + None => None, } } } diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index ad32e12351a20..cc05c4daa33a7 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -407,6 +407,8 @@ impl Table for Memory { } fn extract_state(&self) -> Option> { + let writer = self.write_handle.lock().expect("mutex poisoned"); + self.flush(writer); Some(Box::new(self.clone())) } } diff --git a/src/topology/running.rs b/src/topology/running.rs index 84545aab49b29..cbfb462a54850 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -421,12 +421,29 @@ impl RunningTopology { ) -> HashMap { // First, we shutdown any changed/removed sources. This ensures that we can allow downstream // components to terminate naturally by virtue of the flow of events stopping. 
- if diff.sources.any_changed_or_removed() { + if diff.sources.any_changed_or_removed() || diff.enrichment_tables.any_changed_or_removed() + { let timeout = Duration::from_secs(30); let mut source_shutdown_handles = Vec::new(); + let to_remove_table_sources = diff + .enrichment_tables + .to_remove + .iter() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_source(key)) + .map(|(key, _)| key.clone()) + }) + .collect::>(); let deadline = Instant::now() + timeout; - for key in &diff.sources.to_remove { + for key in diff + .sources + .to_remove + .iter() + .chain(to_remove_table_sources.iter()) + { debug!(component_id = %key, "Removing source."); let previous = self.tasks.remove(key).unwrap(); @@ -437,7 +454,23 @@ impl RunningTopology { .push(self.shutdown_coordinator.shutdown_source(key, deadline)); } - for key in &diff.sources.to_change { + let to_change_table_sources = diff + .enrichment_tables + .to_change + .iter() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_source(key)) + .map(|(key, _)| key.clone()) + }) + .collect::>(); + for key in diff + .sources + .to_change + .iter() + .chain(to_change_table_sources.iter()) + { debug!(component_id = %key, "Changing source."); self.remove_outputs(key); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index f4ccf974158db..f71710fba4b06 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -44,7 +44,8 @@ mod latency_metrics; feature = "sources-prometheus", feature = "sinks-prometheus", feature = "sources-internal_metrics", - feature = "sources-splunk_hec" + feature = "sources-splunk_hec", + feature = "enrichment-tables-memory", ))] mod reload; #[cfg(all(feature = "sinks-console", feature = "sources-demo_logs"))] diff --git a/src/topology/test/reload.rs b/src/topology/test/reload.rs index 7b2db20d4f337..cfde01e71cae7 100644 --- a/src/topology/test/reload.rs +++ b/src/topology/test/reload.rs @@ -6,21 +6,31 @@ use std::{ }; use 
futures::StreamExt; -use tokio::time::sleep; +use tokio::{sync::oneshot::channel, time::sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; use vector_lib::{ buffers::{BufferConfig, BufferType, MemoryBufferSize, WhenFull}, config::ComponentKey, + event::{Event, EventContainer, LogEvent}, }; use crate::{ - config::Config, + config::{Config, unit_test::UnitTestSourceConfig}, + enrichment_tables::{ + EnrichmentTables, + memory::{MemoryConfig, MemorySourceConfig}, + }, sinks::prometheus::exporter::PrometheusExporterConfig, sources::{ internal_metrics::InternalMetricsConfig, prometheus::PrometheusRemoteWriteConfig, splunk_hec::SplunkConfig, }, - test_util::{self, addr::next_addr, mock::basic_sink, start_topology, temp_dir, wait_for_tcp}, + test_util::{ + self, + addr::next_addr, + mock::{basic_sink, oneshot_sink}, + start_topology, temp_dir, wait_for_tcp, + }, topology::ReloadError::*, }; @@ -451,6 +461,125 @@ async fn topology_disk_buffer_config_change_chained_does_not_stall() { } } +#[tokio::test] +async fn topology_reload_preserves_enrichment_table_state() { + // Changing an enrichment table that has state and supports state preservation should preserve + // the state after reload, even if it was changed (if the state is still valid after the change).
+ test_util::trace_init(); + + let source_event = Event::Log(LogEvent::from("test")); + let (old_tx, old_rx) = channel(); + + let mut old_config = Config::builder(); + let mut old_memory_config = MemoryConfig::default(); + old_memory_config.source_config = Some(MemorySourceConfig { + export_interval: Some(NonZeroU64::new(1).unwrap()), + source_key: "memory_test_source".to_string(), + export_batch_size: None, + remove_after_export: false, + export_expired_items: false, + }); + old_memory_config.ttl = 100; + old_config.add_enrichment_table( + "memory_test", + &["in"], + EnrichmentTables::Memory(old_memory_config), + ); + old_config.add_source( + "in", + UnitTestSourceConfig { + events: vec![source_event.clone()], + }, + ); + old_config.add_sink("out", &["memory_test_source"], oneshot_sink(old_tx)); + + let (new_tx, new_rx) = channel(); + let mut new_config = Config::builder(); + let mut new_memory_config = MemoryConfig::default(); + new_memory_config.source_config = Some(MemorySourceConfig { + export_interval: Some(NonZeroU64::new(1).unwrap()), + source_key: "memory_test_source".to_string(), + export_batch_size: None, + remove_after_export: false, + export_expired_items: false, + }); + new_memory_config.ttl = 101; + new_config.add_enrichment_table( + "memory_test", + // No input to ensure old state is read and not new + &[], + EnrichmentTables::Memory(new_memory_config), + ); + new_config.add_source("in_2", UnitTestSourceConfig { events: vec![] }); + new_config.add_sink("out_2", &["memory_test_source"], oneshot_sink(new_tx)); + + let (mut topology, crash) = start_topology(old_config.build().unwrap(), false).await; + let mut crash_stream = UnboundedReceiverStream::new(crash); + + // Make sure the topology is fully running: other components, etc. 
+ sleep(Duration::from_secs(2)).await; + + // let message = result + // .lines() + // .filter_map(|l| serde_json::from_str::(l).ok()) + // .find(|entry| { + // entry + // .get("key") + // .is_some_and(|k| k.as_str().is_some_and(|k| k == "message")) + // }) + // .and_then(|entry| { + // entry + // .get("value") + // .cloned() + // .map(|m| m.to_string_lossy().into_owned()) + // }); + // assert_eq!(message.unwrap(), log_msg); + tokio::select! { + events = old_rx => { + let events = events.expect("must get event to output"); + let events = events.into_events().collect::>(); + assert_eq!(events.len(), 2); + let message = events.into_iter().filter_map(|e| e.into_log().value().clone().into_object()).find(|e| e.get("key").is_some_and(|k| k.as_str().is_some_and(|k| k == "message"))) + .and_then(|entry| { + entry + .get("value") + .cloned() + .map(|m| m.to_string_lossy().into_owned()) + }); + assert_eq!(message.unwrap(), "test"); + } + _ = crash_stream.next() => panic!(), + } + + // Now reload the topology with the new configuration, and ensure the table still has the same state + topology + .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .await + .unwrap(); + + // Give the old topology configuration a chance to shutdown cleanly, etc. + sleep(Duration::from_secs(2)).await; + tokio::select! 
{ + events = new_rx => { + let events = events.expect("must get event to output"); + let events = events.into_events().collect::>(); + assert_eq!(events.len(), 2); + let message = events.into_iter().filter_map(|e| e.into_log().value().clone().into_object()).find(|e| e.get("key").is_some_and(|k| k.as_str().is_some_and(|k| k == "message"))) + .and_then(|entry| { + entry + .get("value") + .cloned() + .map(|m| m.to_string_lossy().into_owned()) + }); + assert_eq!(message.unwrap(), "test"); + }, + _ = tokio::time::sleep(Duration::from_secs(10)) => { + panic!("Never received the events") + } + _ = crash_stream.next() => panic!(), + } +} + async fn reload_sink_test( old_config: Config, new_config: Config, From 12939a04e61edb51244187792d11256e3e2a8d36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Wed, 17 Jun 2026 12:48:51 +0200 Subject: [PATCH 14/19] Add a configuration option for state preservation for memory table --- src/config/enrichment_table.rs | 5 +++++ src/enrichment_tables/memory/config.rs | 21 +++++++++++++++++++++ src/topology/builder.rs | 4 +++- src/topology/test/reload.rs | 18 ++---------------- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/config/enrichment_table.rs b/src/config/enrichment_table.rs index baf09124da40d..a7595d8c46cbd 100644 --- a/src/config/enrichment_table.rs +++ b/src/config/enrichment_table.rs @@ -127,6 +127,11 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync prev_state: Option>, ) -> crate::Result>; + /// Checks whether this table wants previous state, to try and restore it. 
+ fn wants_previous_state(&self) -> bool { + false + } + fn sink_config( &self, _default_key: &ComponentKey, diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 415f53ebd6657..b0738160c063c 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -68,11 +68,27 @@ pub struct MemoryConfig { #[configurable(derived)] #[serde(default)] pub ttl_field: OptionalValuePath, + /// Behavior for memory table state on configuration reload. + #[configurable(derived)] + #[serde(default)] + pub reload_behavior: ReloadBehavior, #[serde(skip)] memory: Arc>>>, } +/// Behavior for memory enrichment table state on configuration reload. +#[configurable_component] +#[derive(Clone, Default)] +#[serde(rename_all = "kebab-case")] +pub enum ReloadBehavior { + /// Always clear state on configuration reload. + #[default] + ClearState, + /// Try to preserve state when possible. + PreserveState, +} + /// Configuration for memory enrichment table source functionality. 
#[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] @@ -123,6 +139,7 @@ impl Default for MemoryConfig { source_config: None, internal_metrics: InternalMetricsConfig::default(), ttl_field: OptionalValuePath::none(), + reload_behavior: Default::default(), } } } @@ -162,6 +179,10 @@ impl EnrichmentTableConfig for MemoryConfig { Ok(Box::new(self.get_or_build_memory(prev_state).await)) } + fn wants_previous_state(&self) -> bool { + matches!(self.reload_behavior, ReloadBehavior::PreserveState) + } + fn sink_config( &self, default_key: &ComponentKey, diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 140191b4a907a..cf069942e270c 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -199,7 +199,9 @@ impl<'a> Builder<'a> { }; let mut prev_state = None; - if !self.diff.enrichment_tables.is_added(name) { + if !self.diff.enrichment_tables.is_added(name) + && table_outer.inner.wants_previous_state() + { prev_state = ENRICHMENT_TABLES.extract_state(&table_name); } diff --git a/src/topology/test/reload.rs b/src/topology/test/reload.rs index cfde01e71cae7..b86cf24aba0c4 100644 --- a/src/topology/test/reload.rs +++ b/src/topology/test/reload.rs @@ -18,7 +18,7 @@ use crate::{ config::{Config, unit_test::UnitTestSourceConfig}, enrichment_tables::{ EnrichmentTables, - memory::{MemoryConfig, MemorySourceConfig}, + memory::{MemoryConfig, MemorySourceConfig, ReloadBehavior}, }, sinks::prometheus::exporter::PrometheusExporterConfig, sources::{ @@ -496,6 +496,7 @@ async fn topology_reload_preserves_enrichment_table_state() { let (new_tx, new_rx) = channel(); let mut new_config = Config::builder(); let mut new_memory_config = MemoryConfig::default(); + new_memory_config.reload_behavior = ReloadBehavior::PreserveState; new_memory_config.source_config = Some(MemorySourceConfig { export_interval: Some(NonZeroU64::new(1).unwrap()), source_key: "memory_test_source".to_string(), @@ -519,21 +520,6 @@ async fn 
topology_reload_preserves_enrichment_table_state() { // Make sure the topology is fully running: other components, etc. sleep(Duration::from_secs(2)).await; - // let message = result - // .lines() - // .filter_map(|l| serde_json::from_str::(l).ok()) - // .find(|entry| { - // entry - // .get("key") - // .is_some_and(|k| k.as_str().is_some_and(|k| k == "message")) - // }) - // .and_then(|entry| { - // entry - // .get("value") - // .cloned() - // .map(|m| m.to_string_lossy().into_owned()) - // }); - // assert_eq!(message.unwrap(), log_msg); tokio::select! { events = old_rx => { let events = events.expect("must get event to output"); From e7869bd8451920c4ec0f49cc8f706da4701c73ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 13:40:40 +0200 Subject: [PATCH 15/19] Reattach outputs from enrichment_table sinks in `reattach_severed_inputs` --- src/topology/running.rs | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index e1c5089234a58..9bc0147730fa7 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -1105,7 +1105,7 @@ impl RunningTopology { .transforms() .filter(|(key, _)| !diff.transforms.contains(key)); for (transform_key, transform) in unchanged_transforms { - let changed_outputs = get_changed_outputs(diff, transform.inputs.clone()); + let changed_outputs = get_changed_outputs(&self.config, diff, transform.inputs.clone()); for output_id in changed_outputs { debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); @@ -1115,12 +1115,20 @@ impl RunningTopology { } } + let unchanged_table_sinks = self + .config + .enrichment_tables() + .filter(|(key, _)| !diff.enrichment_tables.contains(key)) + .filter_map(|(key, table)| table.as_sink(key)) + .collect::>(); let unchanged_sinks = self .config .sinks() .filter(|(key, _)| !diff.sinks.contains(key)); - 
for (sink_key, sink) in unchanged_sinks { - let changed_outputs = get_changed_outputs(diff, sink.inputs.clone()); + for (sink_key, sink) in + unchanged_sinks.chain(unchanged_table_sinks.iter().map(|(k, v)| (k, v))) + { + let changed_outputs = get_changed_outputs(&self.config, diff, sink.inputs.clone()); for output_id in changed_outputs { debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); @@ -1475,10 +1483,30 @@ impl RunningTopology { } } -fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs) -> Vec { +fn get_changed_outputs( + config: &Config, + diff: &ConfigDiff, + output_ids: Inputs, +) -> Vec { let mut changed_outputs = Vec::new(); - for source_key in &diff.sources.to_change { + let to_change_table_sources = diff + .enrichment_tables + .to_change + .iter() + .filter_map(|key| { + config + .enrichment_table(key) + .and_then(|t| t.as_source(key)) + .map(|(key, _)| key.clone()) + }) + .collect::>(); + for source_key in diff + .sources + .to_change + .iter() + .chain(to_change_table_sources.iter()) + { changed_outputs.extend( output_ids .iter() From c2bb5df08523571c33b7c2f3a3019709229b522c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 13:54:35 +0200 Subject: [PATCH 16/19] Reload enrichment tables if they are added or changed too --- src/enrichment_tables/memory/table.rs | 4 ++-- src/topology/builder.rs | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index cc05c4daa33a7..463df13604f99 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -401,9 +401,9 @@ impl Table for Memory { Vec::new() } - /// Has to be reloaded always, because a new component is created to insert data into it + /// Doesn't need reload, data is written directly fn needs_reload(&self) -> bool { - true + false } fn extract_state(&self) -> 
Option> { diff --git a/src/topology/builder.rs b/src/topology/builder.rs index cf069942e270c..9d41c57d43b88 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -189,7 +189,10 @@ impl<'a> Builder<'a> { // Build enrichment tables 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() { let table_name = name.to_string(); - if ENRICHMENT_TABLES.needs_reload(&table_name) { + if ENRICHMENT_TABLES.needs_reload(&table_name) + || self.diff.enrichment_tables.is_changed(name) + || self.diff.enrichment_tables.is_added(name) + { let indexes = if !self.diff.enrichment_tables.is_added(name) { // If this is an existing enrichment table, we need to store the indexes to reapply // them again post load. From 0bded629cc4cd100e3aa858a0f9cbb88e6fa0493 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 18 Jun 2026 14:00:30 +0200 Subject: [PATCH 17/19] Finish enrichment tables load only on successful builds --- src/topology/builder.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 9d41c57d43b88..4d092f53671b9 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -143,11 +143,11 @@ impl<'a> Builder<'a> { self.build_transforms(enrichment_tables).await; self.build_sinks(enrichment_tables).await; - // We should have all the data for the enrichment tables loaded now, so switch them over to - // readonly. - enrichment_tables.finish_load(); - if self.errors.is_empty() { + // We should have all the data for the enrichment tables loaded now, so switch them over to + // readonly. 
+ enrichment_tables.finish_load(); + Ok(TopologyPieces { inputs: self.inputs, outputs: Self::finalize_outputs(self.outputs), From e696b79da36b7dd26428b29dd3b6873e5938ad23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 22 Jun 2026 20:21:09 +0200 Subject: [PATCH 18/19] Generate component docs --- website/cue/reference/generated/configuration.cue | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue index 20792a94ee322..3231e3d2da574 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -256,6 +256,18 @@ generated: configuration: { required: false relevant_when: "type = \"memory\"" } + reload_behavior: { + type: string: { + enum: { + "clear-state": "Always clear state on configuration reload." + "preserve-state": "Try to preserve state when possible." + } + default: "clear-state" + } + description: "Behavior for memory table state on configuration reload." 
+ required: false + relevant_when: "type = \"memory\"" + } scan_interval: { type: uint: default: 30 description: """ From 2564ca377fe14eae61c283186c98273d5f972830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Tue, 23 Jun 2026 06:37:33 +0200 Subject: [PATCH 19/19] Look up enrichment tables in correct config on reload --- src/topology/running.rs | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 9bc0147730fa7..6aaf694e6d88b 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -329,7 +329,8 @@ impl RunningTopology { .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks) .await { - self.connect_diff(&diff, &mut new_pieces).await; + self.connect_diff(&diff, &mut new_pieces, Some(&new_config)) + .await; self.spawn_diff(&diff, new_pieces); self.config = new_config; @@ -355,7 +356,7 @@ impl RunningTopology { .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks) .await { - self.connect_diff(&diff, &mut new_pieces).await; + self.connect_diff(&diff, &mut new_pieces, None).await; self.spawn_diff(&diff, new_pieces); info!("Old configuration restored successfully."); @@ -762,6 +763,7 @@ impl RunningTopology { &mut self, diff: &ConfigDiff, new_pieces: &mut TopologyPieces, + new_config: Option<&Config>, ) { debug!("Connecting changed/added component(s)."); @@ -822,7 +824,8 @@ impl RunningTopology { .enrichment_tables .changed_and_added() .filter_map(|key| { - self.config + new_config + .unwrap_or(&self.config) .enrichment_table(key) .and_then(|t| t.as_source(key).map(|(key, _)| key)) }) @@ -855,7 +858,8 @@ impl RunningTopology { .enrichment_tables .changed_and_added() .filter_map(|key| { - self.config + new_config + .unwrap_or(&self.config) .enrichment_table(key) .and_then(|t| t.as_sink(key).map(|(key, _)| key)) }) @@ -879,12 +883,17 @@ impl RunningTopology { self.setup_outputs(key, 
new_pieces).await; } - let added_changed_table_sources: Vec<&ComponentKey> = diff + let added_changed_table_sources: Vec = diff .enrichment_tables .changed_and_added() - .filter(|k| new_pieces.source_tasks.contains_key(k)) + .filter_map(|key| { + new_config + .unwrap_or(&self.config) + .enrichment_table(key) + .and_then(|t| t.as_source(key).map(|(key, _)| key)) + }) .collect(); - for key in added_changed_table_sources.iter() { + for key in &added_changed_table_sources { debug!(component_id = %key, "Connecting outputs for enrichment table source."); self.setup_outputs(key, new_pieces).await; } @@ -908,12 +917,17 @@ impl RunningTopology { debug!(component_id = %key, "Connecting inputs for sink."); self.setup_inputs(key, diff, new_pieces).await; } - let added_changed_tables: Vec<&ComponentKey> = diff + let added_changed_tables: Vec = diff .enrichment_tables .changed_and_added() - .filter(|k| new_pieces.inputs.contains_key(k)) + .filter_map(|key| { + new_config + .unwrap_or(&self.config) + .enrichment_table(key) + .and_then(|t| t.as_sink(key).map(|(key, _)| key)) + }) .collect(); - for key in added_changed_tables.iter() { + for key in &added_changed_tables { debug!(component_id = %key, "Connecting inputs for enrichment table sink."); self.setup_inputs(key, diff, new_pieces).await; } @@ -1445,7 +1459,9 @@ impl RunningTopology { { return None; } - running_topology.connect_diff(&diff, &mut pieces).await; + running_topology + .connect_diff(&diff, &mut pieces, None) + .await; running_topology.spawn_diff(&diff, pieces); let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =