diff --git a/core/src/duckdb/creator.rs b/core/src/duckdb/creator.rs index e59bb9a7..12b0a9a5 100644 --- a/core/src/duckdb/creator.rs +++ b/core/src/duckdb/creator.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use snafu::prelude::*; use std::collections::HashSet; use std::fmt::Display; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use super::DuckDB; use crate::util::{ @@ -48,12 +48,17 @@ impl From for RelationName { /// A table definition, which includes the table name, schema, constraints, and indexes. /// This is used to store the definition of a table for a dataset, and can be re-used to create one or more tables (like internal data tables). -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug)] pub struct TableDefinition { name: RelationName, schema: SchemaRef, constraints: Option, indexes: Vec<(ColumnReference, IndexType)>, + /// Index name prefixes that are managed externally to the write pipeline. + /// Indexes whose names start with any of these prefixes are excluded from + /// the drift-check comparison in [`TableManager::verify_indexes_match`]. + /// Uses interior mutability so callers can register prefixes after construction. + ignored_index_prefixes: Mutex>, } impl TableDefinition { @@ -64,6 +69,7 @@ impl TableDefinition { schema, constraints: None, indexes: Vec::new(), + ignored_index_prefixes: Mutex::new(Vec::new()), } } @@ -79,13 +85,27 @@ impl TableDefinition { self } + /// Register an index name prefix whose indexes are managed outside the write pipeline. + /// Indexes whose names start with this prefix are excluded from the drift-check + /// comparison so that externally-managed indexes do not cause refresh failures. + /// + /// May be called after construction (e.g. after the vector engine is configured). + pub fn add_ignored_index_prefix(&self, prefix: impl Into) { + self.ignored_index_prefixes + .lock() + .unwrap_or_else(|e| e.into_inner()) + .push(prefix.into()); + } + #[must_use] pub fn with_name(self, name: RelationName) -> Self { + let prefixes = self.ignored_index_prefixes.into_inner().unwrap_or_default(); Self { name, schema: self.schema, constraints: self.constraints, indexes: self.indexes, + ignored_index_prefixes: Mutex::new(prefixes), } } @@ -96,7 +116,35 @@ impl TableDefinition { pub fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } +} + +impl PartialEq for TableDefinition { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.schema == other.schema + && self.constraints == other.constraints + && self.indexes == other.indexes + } +} +impl Clone for TableDefinition { + fn clone(&self) -> Self { + let prefixes = self + .ignored_index_prefixes + .lock() + .unwrap_or_else(|e| e.into_inner()) + .clone(); + Self { + name: self.name.clone(), + schema: Arc::clone(&self.schema), + constraints: self.constraints.clone(), + indexes: self.indexes.clone(), + ignored_index_prefixes: Mutex::new(prefixes), + } + } +} + +impl TableDefinition { pub fn indexes(&self) -> &[(ColumnReference, IndexType)] { &self.indexes } @@ -637,8 +685,18 @@ impl TableManager { .map(|index| index.replace(&self.table_name().to_string(), "")) .collect::>(); + let ignored_prefixes = self + .table_definition + .ignored_index_prefixes + .lock() + .unwrap_or_else(|e| e.into_inner()); let actual_indexes_str_map = actual_indexes_str_map .iter() + .filter(|index| { + !ignored_prefixes + .iter() + .any(|prefix| index.starts_with(prefix.as_str())) + }) .map(|index| index.replace(&other_table.table_name().to_string(), "")) .collect::>();