Skip to content

Commit a81d360

Browse files
committed
datastore: add truncation support
1 parent 813f34c commit a81d360

6 files changed

Lines changed: 141 additions & 58 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use spacetimedb_datastore::system_tables::StModuleRow;
2323
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
2424
use spacetimedb_datastore::traits::{
2525
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
26-
UpdateFlags,
26+
TxTableTruncated, UpdateFlags,
2727
};
2828
use spacetimedb_datastore::{
2929
locking_tx_datastore::{
@@ -869,11 +869,17 @@ impl RelationalDB {
869869
.collect();
870870
let deletes: Box<_> = tx_data
871871
.deletes()
872-
.map(|(table_id, rowdata)| Ops {
872+
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::No)
873+
.map(|(table_id, _, rowdata)| Ops {
873874
table_id: *table_id,
874875
rowdata: rowdata.clone(),
875876
})
876877
.collect();
878+
let truncates: Box<_> = tx_data
879+
.deletes()
880+
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes)
881+
.map(|(table_id, ..)| *table_id)
882+
.collect();
877883

878884
let inputs = reducer_context.map(|rcx| rcx.into());
879885

@@ -883,7 +889,7 @@ impl RelationalDB {
883889
mutations: Some(Mutations {
884890
inserts,
885891
deletes,
886-
truncates: [].into(),
892+
truncates,
887893
}),
888894
};
889895

crates/core/src/subscription/tx.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ impl DeltaTableIndexes {
7676
indexes
7777
}
7878

79+
let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows));
7980
Self {
8081
inserts: build_indexes_for_rows(tx, meta, data.inserts()),
81-
deletes: build_indexes_for_rows(tx, meta, data.deletes()),
82+
deletes: build_indexes_for_rows(tx, meta, deletes),
8283
}
8384
}
8485
}
@@ -133,8 +134,8 @@ impl DeltaStore for DeltaTx<'_> {
133134
self.data
134135
.and_then(|data| {
135136
data.inserts()
136-
.find(|(id, _)| **id == table_id)
137-
.map(|(_, rows)| rows.len())
137+
.find(|(id, ..)| **id == table_id)
138+
.map(|(.., rows)| rows.len())
138139
})
139140
.unwrap_or_default()
140141
}
@@ -143,25 +144,25 @@ impl DeltaStore for DeltaTx<'_> {
143144
self.data
144145
.and_then(|data| {
145146
data.deletes()
146-
.find(|(id, _)| **id == table_id)
147-
.map(|(_, rows)| rows.len())
147+
.find(|(id, ..)| **id == table_id)
148+
.map(|(.., rows)| rows.len())
148149
})
149150
.unwrap_or_default()
150151
}
151152

152153
fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
153154
self.data.and_then(|data| {
154155
data.inserts()
155-
.find(|(id, _)| **id == table_id)
156-
.map(|(_, rows)| rows.iter())
156+
.find(|(id, ..)| **id == table_id)
157+
.map(|(.., rows)| rows.iter())
157158
})
158159
}
159160

160161
fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
161162
self.data.and_then(|data| {
162163
data.deletes()
163-
.find(|(id, _)| **id == table_id)
164-
.map(|(_, rows)| rows.iter())
164+
.find(|(id, ..)| **id == table_id)
165+
.map(|(.., rows)| rows.iter())
165166
})
166167
}
167168

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -345,19 +345,35 @@ impl CommittedState {
345345
Ok(())
346346
}
347347

348+
pub(super) fn replay_truncate(&mut self, table_id: TableId) -> Result<()> {
349+
// Get the table for mutation.
350+
// If it was dropped, avoid an error and just ignore the row instead.
351+
let (table, blob_store, ..) = match self.get_table_and_blob_store_mut(table_id) {
352+
Ok(t) => t,
353+
Err(_) if self.table_dropped.contains(&table_id) => return Ok(()),
354+
Err(e) => return Err(e),
355+
};
356+
357+
// We do not need to consider a truncation of `st_table` itself,
358+
// as if that happens, the database is bricked.
359+
360+
table.clear(blob_store);
361+
362+
Ok(())
363+
}
364+
348365
pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
349366
// Get the table for mutation.
350367
// If it was dropped, avoid an error and just ignore the row instead.
351-
let table = match self.tables.get_mut(&table_id) {
352-
Some(t) => t,
353-
None if self.table_dropped.contains(&table_id) => return Ok(()),
354-
None => return Err(TableError::IdNotFoundState(table_id).into()),
368+
let (table, blob_store, _, page_pool) = match self.get_table_and_blob_store_mut(table_id) {
369+
Ok(t) => t,
370+
Err(_) if self.table_dropped.contains(&table_id) => return Ok(()),
371+
Err(e) => return Err(e),
355372
};
356373

357374
// Delete the row.
358-
let blob_store = &mut self.blob_store;
359375
table
360-
.delete_equal_row(&self.page_pool, blob_store, row)
376+
.delete_equal_row(page_pool, blob_store, row)
361377
.map_err(TableError::Bflatn)?
362378
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
363379

@@ -488,9 +504,9 @@ impl CommittedState {
488504
for index_row in rows {
489505
let index_id = index_row.index_id;
490506
let table_id = index_row.table_id;
491-
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
492-
panic!("Cannot create index for table which doesn't exist in committed state");
493-
};
507+
let (table, blob_store, index_id_map, _) = self
508+
.get_table_and_blob_store_mut(table_id)
509+
.expect("index should exist in committed state; cannot create it");
494510
let algo: IndexAlgorithm = index_row.index_algorithm.into();
495511
let columns: ColSet = algo.columns().into();
496512
let is_unique = unique_constraints.contains(&(table_id, columns));
@@ -591,8 +607,7 @@ impl CommittedState {
591607
"Cannot get TX_STATE RowPointer from CommittedState.",
592608
);
593609
let table = self
594-
.tables
595-
.get(&table_id)
610+
.get_table(table_id)
596611
.expect("Attempt to get COMMITTED_STATE row from table not present in tables.");
597612
// TODO(perf, deep-integration): Use `get_row_ref_unchecked`.
598613
table.get_row_ref(&self.blob_store, row_ptr).unwrap()
@@ -672,17 +687,18 @@ impl CommittedState {
672687

673688
if !deletes.is_empty() {
674689
let table_name = &*table.get_schema().table_name;
675-
// TODO(centril): Pass this along to record truncated tables.
676-
let _truncated = table.row_count == 0;
677-
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
690+
let truncated = table.row_count == 0;
691+
tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated);
678692
}
679693
}
680694

681695
for (table_id, row_ptrs) in delete_tables {
682-
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
683-
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
684-
} else if !row_ptrs.is_empty() {
685-
panic!("Deletion for non-existent table {table_id:?}... huh?");
696+
match self.get_table_and_blob_store_mut(table_id) {
697+
Ok((table, blob_store, ..)) => {
698+
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter())
699+
}
700+
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
701+
Err(_) => {}
686702
}
687703
}
688704

@@ -861,12 +877,21 @@ impl CommittedState {
861877
pub(super) fn get_table_and_blob_store_mut(
862878
&mut self,
863879
table_id: TableId,
864-
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
865-
(
866-
self.tables.get_mut(&table_id),
880+
) -> Result<(&mut Table, &mut dyn BlobStore, &mut IndexIdMap, &PagePool)> {
881+
// NOTE(centril): `TableError` is a fairly large type.
882+
// Not making this lazy made `TableError::drop` show up in perf.
883+
// TODO(centril): Box all the errors.
884+
#[allow(clippy::unnecessary_lazy_evaluations)]
885+
let table = self
886+
.tables
887+
.get_mut(&table_id)
888+
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
889+
Ok((
890+
table,
867891
&mut self.blob_store as &mut dyn BlobStore,
868892
&mut self.index_id_map,
869-
)
893+
&self.page_pool,
894+
))
870895
}
871896

872897
fn make_table(schema: Arc<TableSchema>) -> Table {

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,8 +1139,26 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
11391139
Ok(row)
11401140
}
11411141

1142-
fn visit_truncate(&mut self, _table_id: TableId) -> std::result::Result<(), Self::Error> {
1143-
Err(anyhow!("visit: truncate not yet supported").into())
1142+
fn visit_truncate(&mut self, table_id: TableId) -> std::result::Result<(), Self::Error> {
1143+
let schema = self.committed_state.schema_for_table(table_id)?;
1144+
// TODO: avoid clone
1145+
let table_name = schema.table_name.clone();
1146+
1147+
self.committed_state.replay_truncate(table_id).with_context(|| {
1148+
format!(
1149+
"Error truncating table {:?} during transaction {:?} playback",
1150+
table_id, self.committed_state.next_tx_offset
1151+
)
1152+
})?;
1153+
1154+
// NOTE: the `rdb_num_table_rows` metric is used by the query optimizer,
1155+
// and therefore has performance implications and must not be disabled.
1156+
DB_METRICS
1157+
.rdb_num_table_rows
1158+
.with_label_values(self.database_identity, &table_id.into(), &table_name)
1159+
.set(0);
1160+
1161+
Ok(())
11441162
}
11451163

11461164
fn visit_tx_start(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
@@ -1194,7 +1212,7 @@ mod tests {
11941212
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
11951213
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
11961214
};
1197-
use crate::traits::{IsolationLevel, MutTx};
1215+
use crate::traits::{IsolationLevel, MutTx, TxTableTruncated};
11981216
use crate::Result;
11991217
use bsatn::to_vec;
12001218
use core::{fmt, mem};
@@ -2816,7 +2834,7 @@ mod tests {
28162834
let tx_data_2 = commit(&datastore, tx)?;
28172835
// Ensure that none of the commits deleted rows in our table.
28182836
for tx_data in [&tx_data_1, &tx_data_2] {
2819-
assert_eq!(tx_data.deletes().find(|(tid, _)| **tid == table_id), None);
2837+
assert_eq!(tx_data.deletes().find(|(tid, ..)| **tid == table_id), None);
28202838
}
28212839
// Ensure that the first commit added the row but that the second didn't.
28222840
for (tx_data, expected_rows) in [(&tx_data_1, vec![row.clone()]), (&tx_data_2, vec![])] {
@@ -3140,11 +3158,12 @@ mod tests {
31403158
// Now drop the table again and commit.
31413159
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
31423160
let tx_data = commit(&datastore, tx)?;
3143-
let (_, deleted) = tx_data
3161+
let (_, truncated, deleted_rows) = tx_data
31443162
.deletes()
3145-
.find(|(id, _)| **id == table_id)
3163+
.find(|(id, ..)| **id == table_id)
31463164
.expect("should have deleted rows for `table_id`");
3147-
assert_eq!(&**deleted, [row]);
3165+
assert_eq!(&**deleted_rows, [row]);
3166+
assert_eq!(truncated, TxTableTruncated::Yes);
31483167

31493168
// In the next transaction, the table doesn't exist.
31503169
assert!(
@@ -3348,8 +3367,9 @@ mod tests {
33483367
let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap();
33493368
let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
33503369
assert_eq!(&**inserts, [to_product(&columns[1])].as_slice());
3351-
let (_, deletes) = tx_data.deletes().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
3370+
let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
33523371
assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice());
3372+
assert_eq!(truncated, TxTableTruncated::No);
33533373

33543374
// Check that we can successfully scan using the new schema type post commit.
33553375
let tx = begin_tx(&datastore);

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -414,13 +414,8 @@ impl MutTxId {
414414
TxTableForInsertion<'_>,
415415
(&mut Table, &mut dyn BlobStore, &mut IndexIdMap),
416416
)> {
417-
let (commit_table, commit_bs, idx_map) = self.committed_state_write_lock.get_table_and_blob_store_mut(table_id);
418-
// NOTE(centril): `TableError` is a fairly large type.
419-
// Not making this lazy made `TableError::drop` show up in perf.
420-
// TODO(centril): Box all the errors.
421-
#[allow(clippy::unnecessary_lazy_evaluations)]
422-
let commit_table = commit_table.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
423-
417+
let (commit_table, commit_bs, idx_map, _) =
418+
self.committed_state_write_lock.get_table_and_blob_store_mut(table_id)?;
424419
// Get the insert table, so we can write the row into it.
425420
let tx = self
426421
.tx_state

0 commit comments

Comments
 (0)