Skip to content
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2a58cea
datastore: add clear_table op
Centril Sep 1, 2025
58e82f5
datastore: fix drop_table not deleting rows
Centril Sep 1, 2025
16f8d1e
datastore: test & handle `expect` during tx_metrics reporting
Shubham8287 Sep 2, 2025
5f9e2c7
fix typo
Centril Sep 2, 2025
b4bec5b
enhance test_drop_table_is_transactional further
Centril Sep 2, 2025
b7bb652
Add columns to table and test
Shubham8287 Sep 5, 2025
d7d94f9
lint
Shubham8287 Sep 5, 2025
2706c4c
datastore: add clear_table op
Centril Sep 1, 2025
6ae80bb
datastore: fix drop_table not deleting rows
Centril Sep 1, 2025
4b6e2aa
datastore: test & handle `expect` during tx_metrics reporting
Shubham8287 Sep 2, 2025
fb9bfec
fix typo
Centril Sep 2, 2025
b48e42e
enhance test_drop_table_is_transactional further
Centril Sep 2, 2025
22b068b
replay_delete_by_rel: delete in-mmory table during replay
Centril Sep 5, 2025
813f34c
TxMetrics::report: remove_label_values for dropped tables
Centril Sep 5, 2025
a50df64
replay_delete_by_rel: actually populate 'table_dropped'
Centril Sep 8, 2025
ae14efe
comments address && test
Shubham8287 Sep 11, 2025
03f6b7d
rollback test
Shubham8287 Sep 12, 2025
7ae2d4b
Merge branch 'centril/clear-table-and-fix-drop-table' into shub/add-c…
Shubham8287 Sep 12, 2025
8c48efb
Merge branch 'master' into centril/clear-table-and-fix-drop-table
Shubham8287 Sep 18, 2025
e6b147a
Merge branch 'master' into centril/clear-table-and-fix-drop-table
Shubham8287 Sep 19, 2025
b9b958c
datastore: add truncation support
Centril Sep 2, 2025
e81b83c
fix truncate
Shubham8287 Sep 19, 2025
29b0ad9
Merge branch 'centril/truncate' into shub/add-columns
Shubham8287 Sep 19, 2025
81c7bd5
merge conflicts
Shubham8287 Sep 19, 2025
1903e1c
Merge branch 'master' into centril/clear-table-and-fix-drop-table
Shubham8287 Sep 22, 2025
f44dcfa
datastore: add truncation support
Centril Sep 2, 2025
431f7bb
decided on truncate after processing inserts
Shubham8287 Sep 22, 2025
f25ae4a
cache table name on replay truncates
Shubham8287 Sep 22, 2025
e4e6b6a
rename: truncated_table -> dropped_table_names
Shubham8287 Sep 22, 2025
f2b54b9
uncomment test assert
Shubham8287 Sep 22, 2025
c3dc868
Merge branch 'shub/truncate' into shub/add-columns
Shubham8287 Sep 22, 2025
7ae4f0f
introduced type_check, and handle redundant defaults
Shubham8287 Sep 23, 2025
df7fa04
fix: truncate (#3261)
Shubham8287 Sep 23, 2025
40df4b7
Apply suggestions from code review
Shubham8287 Sep 23, 2025
5ecc584
Merge branch 'master' into centril/truncate
Shubham8287 Sep 23, 2025
1de7520
Merge branch 'centril/truncate' into shub/add-columns
Shubham8287 Sep 23, 2025
00ff271
comments
Shubham8287 Sep 23, 2025
a8860e8
test lint
Shubham8287 Sep 23, 2025
1ec5add
Apply suggestions from code review
Shubham8287 Sep 23, 2025
3945d75
Merge branch 'master' into shub/add-columns
Shubham8287 Sep 23, 2025
1d0d0ac
lint
Shubham8287 Sep 23, 2025
cc6753a
lint
Shubham8287 Sep 23, 2025
44e12c4
Merge branch 'master' into shub/add-columns
Shubham8287 Sep 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::channel::mpsc;
use futures::StreamExt;
use parking_lot::RwLock;
use spacetimedb_commitlog as commitlog;
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
Expand Down Expand Up @@ -889,12 +890,17 @@ impl RelationalDB {
rowdata: rowdata.clone(),
})
.collect();

let truncates: IntSet<TableId> = tx_data.truncates().collect();

let deletes: Box<_> = tx_data
.deletes()
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
// filter out deletes for tables that are truncated in the same transaction.
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

let inputs = reducer_context.map(|rcx| rcx.into());
Expand All @@ -905,7 +911,7 @@ impl RelationalDB {
mutations: Some(Mutations {
inserts,
deletes,
truncates: [].into(),
truncates: truncates.into_iter().collect(),
}),
};

Expand Down Expand Up @@ -1082,6 +1088,18 @@ impl RelationalDB {
Ok(self.inner.alter_table_row_type_mut_tx(tx, table_id, column_schemas)?)
}

pub(crate) fn add_columns_to_table(
&self,
tx: &mut MutTx,
table_id: TableId,
column_schemas: Vec<ColumnSchema>,
default_values: Vec<AlgebraicValue>,
) -> Result<TableId, DBError> {
Ok(self
.inner
.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?)
}

/// Reports the `TxMetrics`s passed.
///
/// Should only be called after the tx lock has been fully released.
Expand Down
12 changes: 12 additions & 0 deletions crates/core/src/db/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ fn auto_migrate_database(
log!(logger, "Removing-row level security `{sql_rls}`");
stdb.drop_row_level_security(tx, sql_rls.clone())?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::AddColumns(table_name) => {
let table_def = plan.new.stored_in_table_def(table_name).expect("table must exist");
let table_id = stdb.table_id_from_name_mut(tx, table_name).unwrap().unwrap();
let column_schemas = column_schemas_from_defs(plan.new, &table_def.columns, table_id);

let default_values: Vec<AlgebraicValue> = table_def
.columns
.iter()
.filter_map(|col_def| col_def.default_value.clone())
.collect();
stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?;
}
_ => anyhow::bail!("migration step not implemented: {step:?}"),
}
}
Expand Down
16 changes: 8 additions & 8 deletions crates/core/src/subscription/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ impl DeltaStore for DeltaTx<'_> {
self.data
.and_then(|data| {
data.inserts()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.len())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.unwrap_or_default()
}
Expand All @@ -143,25 +143,25 @@ impl DeltaStore for DeltaTx<'_> {
self.data
.and_then(|data| {
data.deletes()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.len())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.unwrap_or_default()
}

fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.inserts()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.iter())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
}

fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.deletes()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.iter())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
}

Expand Down
2 changes: 2 additions & 0 deletions crates/datastore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub enum TableError {
#[error(transparent)]
// Error here is `Box`ed to avoid triggering https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err .
ChangeColumnsError(#[from] Box<table::ChangeColumnsError>),
#[error(transparent)]
AddColumnsError(#[from] Box<table::AddColumnsError>),
}

#[derive(Error, Debug, PartialEq, Eq)]
Expand Down
108 changes: 83 additions & 25 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,19 +315,35 @@ impl CommittedState {
Ok(())
}

pub(super) fn replay_truncate(&mut self, table_id: TableId) -> Result<()> {
// (1) Table dropped? Avoid an error and just ignore the row instead.
if self.table_dropped.contains(&table_id) {
return Ok(());
}

// Get the table for mutation.
let (table, blob_store, ..) = self.get_table_and_blob_store_mut(table_id)?;

// We do not need to consider a truncation of `st_table` itself,
// as if that happens, the database is bricked.

table.clear(blob_store);

Ok(())
}

pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
// (1) Table dropped? Avoid an error and just ignore the row instead.
if self.table_dropped.contains(&table_id) {
return Ok(());
}

// Get the table for mutation.
let table = match self.tables.get_mut(&table_id) {
Some(t) => t,
// (1) If it was dropped, avoid an error and just ignore the row instead.
None if self.table_dropped.contains(&table_id) => return Ok(()),
None => return Err(TableError::IdNotFoundState(table_id).into()),
};
let (table, blob_store, _, page_pool) = self.get_table_and_blob_store_mut(table_id)?;

// Delete the row.
let blob_store = &mut self.blob_store;
table
.delete_equal_row(&self.page_pool, blob_store, row)
.delete_equal_row(page_pool, blob_store, row)
.map_err(TableError::Bflatn)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;

Expand Down Expand Up @@ -461,9 +477,9 @@ impl CommittedState {
for index_row in rows {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
panic!("Cannot create index for table which doesn't exist in committed state");
};
let (table, blob_store, index_id_map, _) = self
.get_table_and_blob_store_mut(table_id)
.expect("index should exist in committed state; cannot create it");
let algo: IndexAlgorithm = index_row.index_algorithm.into();
let columns: ColSet = algo.columns().into();
let is_unique = unique_constraints.contains(&(table_id, columns));
Expand Down Expand Up @@ -564,8 +580,7 @@ impl CommittedState {
"Cannot get TX_STATE RowPointer from CommittedState.",
);
let table = self
.tables
.get(&table_id)
.get_table(table_id)
.expect("Attempt to get COMMITTED_STATE row from table not present in tables.");
// TODO(perf, deep-integration): Use `get_row_ref_unchecked`.
table.get_row_ref(&self.blob_store, row_ptr).unwrap()
Expand Down Expand Up @@ -595,13 +610,27 @@ impl CommittedState {

pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();
let mut truncates = IntSet::default();

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);
self.merge_apply_deletes(
&mut tx_data,
tx_state.delete_tables,
tx_state.pending_schema_changes,
&mut truncates,
);

// Then, apply inserts. This will re-fill the holes freed by deletions
// before allocating new pages.
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
self.merge_apply_inserts(
&mut tx_data,
tx_state.insert_tables,
tx_state.blob_store,
&mut truncates,
);

// Record any truncated tables in the `TxData`.
tx_data.add_truncates(truncates);

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
Expand All @@ -618,6 +647,7 @@ impl CommittedState {
tx_data: &mut TxData,
delete_tables: BTreeMap<TableId, DeleteTable>,
pending_schema_changes: ThinVec<PendingSchemaChange>,
truncates: &mut IntSet<TableId>,
) {
fn delete_rows(
tx_data: &mut TxData,
Expand All @@ -626,6 +656,7 @@ impl CommittedState {
blob_store: &mut dyn BlobStore,
row_ptrs_len: usize,
row_ptrs: impl Iterator<Item = RowPointer>,
truncates: &mut IntSet<TableId>,
) {
let mut deletes = Vec::with_capacity(row_ptrs_len);

Expand All @@ -645,17 +676,27 @@ impl CommittedState {

if !deletes.is_empty() {
let table_name = &*table.get_schema().table_name;
// TODO(centril): Pass this along to record truncated tables.
let _truncated = table.row_count == 0;
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
let truncated = table.row_count == 0;
if truncated {
truncates.insert(table_id);
}
}
}

for (table_id, row_ptrs) in delete_tables {
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
} else if !row_ptrs.is_empty() {
panic!("Deletion for non-existent table {table_id:?}... huh?");
match self.get_table_and_blob_store_mut(table_id) {
Ok((table, blob_store, ..)) => delete_rows(
tx_data,
table_id,
table,
blob_store,
row_ptrs.len(),
row_ptrs.iter(),
truncates,
),
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
Err(_) => {}
}
}

Expand All @@ -665,13 +706,15 @@ impl CommittedState {
for change in pending_schema_changes {
if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
let row_ptrs = table.scan_all_row_ptrs();
truncates.insert(table_id);
delete_rows(
tx_data,
table_id,
&mut table,
&mut self.blob_store,
row_ptrs.len(),
row_ptrs.into_iter(),
truncates,
);
}
}
Expand All @@ -682,6 +725,7 @@ impl CommittedState {
tx_data: &mut TxData,
insert_tables: BTreeMap<TableId, Table>,
tx_blob_store: impl BlobStore,
truncates: &mut IntSet<TableId>,
) {
// TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
// rather than copying individual rows out of them.
Expand Down Expand Up @@ -710,6 +754,11 @@ impl CommittedState {
if !inserts.is_empty() {
let table_name = &*commit_table.get_schema().table_name;
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());

// if table has inserted rows, it cannot be truncated
if truncates.contains(&table_id) {
truncates.remove(&table_id);
}
}

let (schema, _indexes, pages) = tx_table.consume_for_merge();
Expand Down Expand Up @@ -835,12 +884,21 @@ impl CommittedState {
pub(super) fn get_table_and_blob_store_mut(
&mut self,
table_id: TableId,
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
(
self.tables.get_mut(&table_id),
) -> Result<(&mut Table, &mut dyn BlobStore, &mut IndexIdMap, &PagePool)> {
// NOTE(centril): `TableError` is a fairly large type.
// Not making this lazy made `TableError::drop` show up in perf.
// TODO(centril): Box all the errors.
#[allow(clippy::unnecessary_lazy_evaluations)]
let table = self
.tables
.get_mut(&table_id)
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
Ok((
table,
&mut self.blob_store as &mut dyn BlobStore,
&mut self.index_id_map,
)
&self.page_pool,
))
}

fn make_table(schema: Arc<TableSchema>) -> Table {
Expand Down
Loading
Loading