Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions core/src/duckdb/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use itertools::Itertools;
use snafu::prelude::*;
use std::collections::HashSet;
use std::fmt::Display;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use super::DuckDB;
use crate::util::{
Expand Down Expand Up @@ -48,12 +48,17 @@ impl From<TableReference> for RelationName {

/// A table definition, which includes the table name, schema, constraints, and indexes.
/// This is used to store the definition of a table for a dataset, and can be re-used to create one or more tables (like internal data tables).
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug)]
pub struct TableDefinition {
name: RelationName,
schema: SchemaRef,
constraints: Option<Constraints>,
indexes: Vec<(ColumnReference, IndexType)>,
/// Index name prefixes that are managed externally to the write pipeline.
/// Indexes whose names start with any of these prefixes are excluded from
/// the drift-check comparison in [`TableManager::verify_indexes_match`].
/// Uses interior mutability so callers can register prefixes after construction.
ignored_index_prefixes: Mutex<Vec<String>>,
}

impl TableDefinition {
Expand All @@ -64,6 +69,7 @@ impl TableDefinition {
schema,
constraints: None,
indexes: Vec::new(),
ignored_index_prefixes: Mutex::new(Vec::new()),
}
}

Expand All @@ -79,13 +85,27 @@ impl TableDefinition {
self
}

/// Register an index name prefix whose indexes are managed outside the write pipeline.
/// Indexes whose names start with this prefix are excluded from the drift-check
/// comparison so that externally-managed indexes do not cause refresh failures.
///
/// May be called after construction (e.g. after the vector engine is configured).
pub fn add_ignored_index_prefix(&self, prefix: impl Into<String>) {
self.ignored_index_prefixes
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(prefix.into());
}

#[must_use]
pub fn with_name(self, name: RelationName) -> Self {
let prefixes = self.ignored_index_prefixes.into_inner().unwrap_or_default();
Self {
name,
schema: self.schema,
constraints: self.constraints,
indexes: self.indexes,
ignored_index_prefixes: Mutex::new(prefixes),
}
}

Expand All @@ -96,7 +116,35 @@ impl TableDefinition {
pub fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}

impl PartialEq for TableDefinition {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.schema == other.schema
&& self.constraints == other.constraints
&& self.indexes == other.indexes
}
}

impl Clone for TableDefinition {
fn clone(&self) -> Self {
let prefixes = self
.ignored_index_prefixes
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone();
Self {
name: self.name.clone(),
schema: Arc::clone(&self.schema),
constraints: self.constraints.clone(),
indexes: self.indexes.clone(),
ignored_index_prefixes: Mutex::new(prefixes),
}
}
}

impl TableDefinition {
pub fn indexes(&self) -> &[(ColumnReference, IndexType)] {
&self.indexes
}
Expand Down Expand Up @@ -637,8 +685,18 @@ impl TableManager {
.map(|index| index.replace(&self.table_name().to_string(), ""))
.collect::<HashSet<_>>();

let ignored_prefixes = self
.table_definition
.ignored_index_prefixes
.lock()
.unwrap_or_else(|e| e.into_inner());
let actual_indexes_str_map = actual_indexes_str_map
.iter()
.filter(|index| {
!ignored_prefixes
.iter()
.any(|prefix| index.starts_with(prefix.as_str()))
})
.map(|index| index.replace(&other_table.table_name().to_string(), ""))
.collect::<HashSet<_>>();

Expand Down
Loading