diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 96c8893e611cb..d325d1fc070d3 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -12,7 +12,7 @@ use metrics::Counter; use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire}; use tokio::{ select, - sync::{mpsc::UnboundedSender, oneshot}, + sync::{Mutex as AsyncMutex, mpsc::UnboundedSender, oneshot}, time::timeout, }; use tracing::{Instrument, Span}; @@ -66,6 +66,9 @@ use crate::{ static ENRICHMENT_TABLES: LazyLock = LazyLock::new(vector_lib::enrichment::TableRegistry::default); +// `TableRegistry::load` and `finish_load` are separate operations on a process-global registry. +// Keep topology builds and enrichment-table reloads from interleaving that transition. +static ENRICHMENT_TABLES_LOAD_LOCK: LazyLock> = LazyLock::new(AsyncMutex::default); static METRICS_STORAGE: LazyLock = LazyLock::new(MetricsStorage::default); pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock = @@ -133,6 +136,7 @@ impl<'a> Builder<'a> { /// Builds the new pieces of the topology found in `self.diff`. async fn build(mut self) -> Result> { + let _enrichment_tables_load_guard = ENRICHMENT_TABLES_LOAD_LOCK.lock().await; let enrichment_tables = self.load_enrichment_tables().await; let source_tasks = self.build_sources(enrichment_tables).await; self.build_transforms(enrichment_tables).await; @@ -1003,6 +1007,7 @@ async fn run_source_output_pump( } pub async fn reload_enrichment_tables(config: &Config) { + let _enrichment_tables_load_guard = ENRICHMENT_TABLES_LOAD_LOCK.lock().await; let mut enrichment_tables = HashMap::new(); // Build enrichment tables 'tables: for (name, table_outer) in config.enrichment_tables.iter() {