Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,15 @@ impl RelationalDB {
Ok(self.inner.alter_table_primary_key_mut_tx(tx, name, primary_key)?)
}

pub(crate) fn alter_index_source_name(
&self,
tx: &mut MutTx,
index_id: IndexId,
source_name: spacetimedb_sats::raw_identifier::RawIdentifier,
) -> Result<(), DBError> {
Ok(self.inner.alter_index_source_name_mut_tx(tx, index_id, source_name)?)
}

pub(crate) fn alter_table_row_type(
&self,
tx: &mut MutTx,
Expand Down
141 changes: 137 additions & 4 deletions crates/core/src/db/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,36 @@ fn auto_migrate_database(
.indexes
.iter()
.find(|index| index.index_name[..] == index_name[..])
.unwrap();
.ok_or_else(|| anyhow::anyhow!("Index `{index_name}` not found in table `{}`", table_def.name))?;

log!(logger, "Dropping index `{}` on table `{}`", index_name, table_def.name);
stdb.drop_index(tx, index_schema.index_id)?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangeIndexSourceName(index_name) => {
let old_table_def = plan.old.stored_in_table_def(index_name).unwrap();
let new_table_def = plan.new.stored_in_table_def(index_name).unwrap();
let new_index_def = new_table_def.indexes.get(index_name).unwrap();

let table_id = stdb.table_id_from_name_mut(tx, &old_table_def.name)?.unwrap();
let table_schema = stdb.schema_for_table_mut(tx, table_id)?;
let index_schema = table_schema
.indexes
.iter()
.find(|index| index.index_name[..] == index_name[..])
.ok_or_else(|| {
anyhow::anyhow!("Index `{index_name}` not found in table `{}`", old_table_def.name)
})?;

log!(
logger,
"Changing index source name for `{}` on table `{}` from `{}` to `{}`",
index_name,
old_table_def.name,
index_schema.alias.as_deref().unwrap_or(""),
new_index_def.source_name,
);
stdb.alter_index_source_name(tx, index_schema.index_id, new_index_def.source_name.clone())?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::RemoveConstraint(constraint_name) => {
let table_def = plan.old.stored_in_table_def(constraint_name).unwrap();

Expand Down Expand Up @@ -340,9 +365,13 @@ mod test {
host::module_host::create_table_from_def,
};
use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange;
use spacetimedb_lib::db::raw_def::v9::{btree, RawIndexAlgorithm, RawModuleDefV9Builder, TableAccess};
use spacetimedb_sats::{product, AlgebraicType, AlgebraicType::U64};
use spacetimedb_schema::{auto_migrate::ponder_migrate, def::ModuleDef};
use spacetimedb_lib::db::raw_def::{
v10::{ExplicitNames, RawModuleDefV10Builder},
v9::{btree, RawIndexAlgorithm, RawModuleDefV9Builder, TableAccess},
};
use spacetimedb_sats::{product, raw_identifier::RawIdentifier, AlgebraicType, AlgebraicType::U64, ProductType};
use spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateStep, MigratePlan};
use spacetimedb_schema::def::ModuleDef;

struct TestLogger;
impl UpdateLogger for TestLogger {
Expand Down Expand Up @@ -424,6 +453,110 @@ mod test {
Ok(())
}

#[test]
fn update_db_change_index_source_name_updates_lookup_and_persists() -> anyhow::Result<()> {
let auth_ctx = AuthCtx::for_testing();
let stdb = TestDB::durable()?;

fn module_def(table_source_name: &str, index_source_name: &str) -> ModuleDef {
let mut builder = RawModuleDefV10Builder::new();
builder
.build_table_with_new_type(
table_source_name.to_owned(),
ProductType::from([("id", U64), ("emailAddress", AlgebraicType::String)]),
true,
)
.with_access(TableAccess::Public)
.with_index(btree(1), index_source_name.to_owned(), "emailAddress")
.finish();

if table_source_name != "users" {
let mut explicit_names = ExplicitNames::default();
explicit_names.insert_table(table_source_name.to_owned(), "users");
builder.add_explicit_names(explicit_names);
}

builder
.finish()
.try_into()
.expect("builder should create a valid database definition")
}

let old_source_name = "users_emailAddress_idx_btree";
let new_source_name = "appUsers_emailAddress_idx_btree";
let old = module_def("users", old_source_name);
let new = module_def("appUsers", new_source_name);

let mut tx = begin_mut_tx(&stdb);
for def in old.tables() {
create_table_from_def(&stdb, &mut tx, &old, def)?;
}
stdb.commit_tx(tx)?;

let tx = begin_mut_tx(&stdb);
let table_id = stdb
.table_id_from_name_mut(&tx, "users")?
.expect("there should be a table named users");
let table_schema = stdb.schema_for_table_mut(&tx, table_id)?;
let index_schema = table_schema
.indexes
.first()
.expect("there should be a single index")
.clone();
let canonical_index_name = index_schema.index_name.to_string();
let index_id = index_schema.index_id;
assert_eq!(stdb.index_id_from_name_mut(&tx, old_source_name)?, Some(index_id));
assert_eq!(stdb.index_id_from_name_mut(&tx, new_source_name)?, None);
assert_eq!(stdb.index_id_from_name_mut(&tx, &canonical_index_name)?, Some(index_id));
drop(tx);

let MigratePlan::Auto(plan) = ponder_migrate(&old, &new)? else {
panic!("expected automatic migration");
};
let index_name = RawIdentifier::new(canonical_index_name.as_str());
assert!(
plan.steps
.contains(&AutoMigrateStep::ChangeIndexSourceName(&index_name)),
"plan steps: {:?}",
plan.steps
);
assert!(
!plan.steps.contains(&AutoMigrateStep::RemoveIndex(&index_name)),
"plan steps: {:?}",
plan.steps
);
assert!(
!plan.steps.contains(&AutoMigrateStep::AddIndex(&index_name)),
"plan steps: {:?}",
plan.steps
);
let mut tx = begin_mut_tx(&stdb);
let res = update_database(&stdb, &mut tx, auth_ctx, MigratePlan::Auto(plan), &TestLogger)?;
assert!(matches!(res, UpdateResult::Success));

assert_eq!(stdb.index_id_from_name_mut(&tx, old_source_name)?, None);
assert_eq!(stdb.index_id_from_name_mut(&tx, new_source_name)?, Some(index_id));
assert_eq!(stdb.index_id_from_name_mut(&tx, &canonical_index_name)?, Some(index_id));
assert!(
tx.pending_schema_changes().iter().any(|change| matches!(
change,
PendingSchemaChange::IndexAlterSourceName(tid, iid, Some(old_alias))
if *tid == table_id && *iid == index_id && old_alias.as_ref() == old_source_name
)),
"pending schema changes: {:?}",
tx.pending_schema_changes()
);
stdb.commit_tx(tx)?;

let stdb = stdb.reopen()?;
let tx = begin_mut_tx(&stdb);
assert_eq!(stdb.index_id_from_name_mut(&tx, old_source_name)?, None);
assert_eq!(stdb.index_id_from_name_mut(&tx, new_source_name)?, Some(index_id));
assert_eq!(stdb.index_id_from_name_mut(&tx, &canonical_index_name)?, Some(index_id));

Ok(())
}

/// Regression test for #3934: removing a primary key annotation and then
/// re-publishing causes "Primary key mismatch" on the NEXT publish.
#[test]
Expand Down
12 changes: 12 additions & 0 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,18 @@ impl CommittedState {
table.with_mut_schema(|s| s.remove_index(index_id));
self.index_id_map.remove(&index_id);
}
// An index alias/source-name changed. Change it back.
IndexAlterSourceName(table_id, index_id, old_alias) => {
let table = self.tables.get_mut(&table_id)?;
let mut index_schema = table
.get_schema()
.indexes
.iter()
.find(|x| x.index_id == index_id)?
.clone();
index_schema.alias = old_alias;
table.with_mut_schema(|s| s.update_index(index_schema));
}
// A table was removed. Add it back.
TableRemoved(table_id, table) => {
let is_view_table = table.schema.is_view();
Expand Down
18 changes: 18 additions & 0 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,15 @@ impl Locking {
tx.alter_table_primary_key(table_id, primary_key)
}

pub fn alter_index_source_name_mut_tx(
&self,
tx: &mut MutTxId,
index_id: IndexId,
source_name: spacetimedb_sats::raw_identifier::RawIdentifier,
) -> Result<()> {
tx.alter_index_source_name(index_id, source_name)
}

pub fn alter_table_row_type_mut_tx(
&self,
tx: &mut MutTxId,
Expand Down Expand Up @@ -527,6 +536,15 @@ impl MutTxDatastore for Locking {
tx.drop_index(index_id)
}

fn alter_index_source_name_mut_tx(
&self,
tx: &mut Self::MutTx,
index_id: IndexId,
source_name: spacetimedb_sats::raw_identifier::RawIdentifier,
) -> Result<()> {
tx.alter_index_source_name(index_id, source_name)
}

fn index_id_from_name_mut_tx(&self, tx: &Self::MutTx, index_name: &str) -> Result<Option<IndexId>> {
tx.index_id_from_name_or_alias(index_name)
}
Expand Down
40 changes: 40 additions & 0 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,46 @@ impl MutTxId {
Ok(())
}

/// Change the runtime source-name alias of the index identified by `index_id`.
pub(crate) fn alter_index_source_name(
&mut self,
index_id: IndexId,
source_name: spacetimedb_sats::raw_identifier::RawIdentifier,
) -> Result<()> {
let st_index_ref = self
.iter_by_col_eq(ST_INDEX_ID, StIndexFields::IndexId, &index_id.into())?
.next()
.ok_or_else(|| TableError::IdNotFound(SystemTable::st_index, index_id.into()))?;
let st_index_row = StIndexRow::try_from(st_index_ref)?;
let table_id = st_index_row.table_id;

let old_alias = self
.find_st_index_accessor_row_by_index_name(st_index_row.index_name.as_ref())?
.map(|row| row.accessor_name);

if old_alias.as_ref() == Some(&source_name) {
return Ok(());
}

let ((tx_table, ..), (commit_table, ..)) = self.get_or_create_insert_table_mut(table_id)?;
let mut index_schema = tx_table
.get_schema()
.indexes
.iter()
.find(|index| index.index_id == index_id)
.cloned()
.ok_or_else(|| TableError::IdNotFound(SystemTable::st_index, index_id.into()))?;
index_schema.alias = Some(source_name.clone());
tx_table.with_mut_schema_and_clone(commit_table, |s| s.update_index(index_schema));

self.drop_st_index_accessor(&st_index_row.index_name)?;
self.insert_st_index_accessor(&st_index_row.index_name, Some(&source_name))?;

self.push_schema_change(PendingSchemaChange::IndexAlterSourceName(table_id, index_id, old_alias));

Ok(())
}

/// Change the row type of the table identified by `table_id`.
///
/// In practice, this should not error,
Expand Down
27 changes: 25 additions & 2 deletions crates/datastore/src/locking_tx_datastore/state_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use core::ops::RangeBounds;
use spacetimedb_lib::ConnectionId;
use spacetimedb_primitives::{ColList, TableId};
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_schema::schema::{ColumnSchema, TableSchema, ViewDefInfo};
use spacetimedb_schema::schema::{ColumnSchema, IndexSchema, TableSchema, ViewDefInfo};
use spacetimedb_table::table::IndexScanPointIter;
use spacetimedb_table::{
blob_store::HashMapBlobStore,
Expand Down Expand Up @@ -120,6 +120,22 @@ pub trait StateView {
.transpose()
}

/// Look up an `st_index_accessor` row by its canonical index name.
fn find_st_index_accessor_row_by_index_name(&self, index_name: &str) -> Result<Option<StIndexAccessorRow>> {
match self.iter_by_col_eq(
ST_INDEX_ACCESSOR_ID,
StIndexAccessorFields::IndexName,
&index_name.into(),
) {
Ok(mut iter) => iter.next().map(StIndexAccessorRow::try_from).transpose(),
// `schema_for_table_raw` is called while restoring snapshots,
// before `migrate_system_tables` creates newer system tables.
// Treat a missing `st_index_accessor` as "no aliases yet" here.
Err(DatastoreError::Table(TableError::IdNotFound(..))) => Ok(None),
Err(e) => Err(e),
}
}

/// Look up an `st_column_accessor` row by its canonical table and column names
fn find_st_column_accessor_row(&self, table_name: &str, col_name: &str) -> Result<Option<StColumnAccessorRow>> {
match self.iter_by_col_eq(
Expand Down Expand Up @@ -187,7 +203,14 @@ pub trait StateView {
// Look up the indexes for the table in question.
let indexes = self
.iter_by_col_eq(ST_INDEX_ID, StIndexFields::TableId, value_eq)?
.map(|row| StIndexRow::try_from(row).map(Into::into))
.map(|row| {
let row = StIndexRow::try_from(row)?;
let mut index_schema = IndexSchema::from(row);
index_schema.alias = self
.find_st_index_accessor_row_by_index_name(index_schema.index_name.as_ref())?
.map(|row| row.accessor_name);
Comment thread
joshua-spacetime marked this conversation as resolved.
Ok(index_schema)
})
.collect::<Result<Vec<_>>>()?;

let schedule = self
Expand Down
10 changes: 10 additions & 0 deletions crates/datastore/src/locking_tx_datastore/tx_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ pub enum PendingSchemaChange {
/// If adding this index caused the pointer map to be removed,
/// it will be present here.
IndexAdded(TableId, IndexId, Option<PointerMap>),
/// The source-name alias of the index with [`IndexId`] changed.
/// The old alias is stored for rollback.
IndexAlterSourceName(
TableId,
IndexId,
Option<spacetimedb_sats::raw_identifier::RawIdentifier>,
),
/// The [`Table`] with [`TableId`] was removed.
TableRemoved(TableId, Table),
/// The table with [`TableId`] was added.
Expand Down Expand Up @@ -145,6 +152,9 @@ impl MemoryUsage for PendingSchemaChange {
Self::IndexAdded(table_id, index_id, pointer_map) => {
table_id.heap_usage() + index_id.heap_usage() + pointer_map.heap_usage()
}
Self::IndexAlterSourceName(table_id, index_id, alias) => {
table_id.heap_usage() + index_id.heap_usage() + alias.heap_usage()
}
Self::TableRemoved(table_id, table) => table_id.heap_usage() + table.heap_usage(),
Self::TableAdded(table_id) => table_id.heap_usage(),
Self::TableAlterAccess(table_id, st_access) => table_id.heap_usage() + st_access.heap_usage(),
Expand Down
6 changes: 6 additions & 0 deletions crates/datastore/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,12 @@ pub trait MutTxDatastore: TxDatastore + MutTx {

fn create_index_mut_tx(&self, tx: &mut Self::MutTx, index_schema: IndexSchema, is_unique: bool) -> Result<IndexId>;
fn drop_index_mut_tx(&self, tx: &mut Self::MutTx, index_id: IndexId) -> Result<()>;
fn alter_index_source_name_mut_tx(
&self,
tx: &mut Self::MutTx,
index_id: IndexId,
source_name: spacetimedb_sats::raw_identifier::RawIdentifier,
) -> Result<()>;
fn index_id_from_name_mut_tx(&self, tx: &Self::MutTx, index_name: &str) -> super::Result<Option<IndexId>>;

// TODO: Index data
Expand Down
Loading
Loading