Skip to content

Commit f44dcfa

Browse files
Centril authored and Shubham8287 committed
datastore: add truncation support
1 parent 1903e1c commit f44dcfa

6 files changed

Lines changed: 143 additions & 60 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use spacetimedb_datastore::system_tables::{system_tables, StModuleRow};
2323
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
2424
use spacetimedb_datastore::traits::{
2525
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
26-
UpdateFlags,
26+
TxTableTruncated, UpdateFlags,
2727
};
2828
use spacetimedb_datastore::{
2929
locking_tx_datastore::{
@@ -891,11 +891,17 @@ impl RelationalDB {
891891
.collect();
892892
let deletes: Box<_> = tx_data
893893
.deletes()
894-
.map(|(table_id, rowdata)| Ops {
894+
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::No)
895+
.map(|(table_id, _, rowdata)| Ops {
895896
table_id: *table_id,
896897
rowdata: rowdata.clone(),
897898
})
898899
.collect();
900+
let truncates: Box<_> = tx_data
901+
.deletes()
902+
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes)
903+
.map(|(table_id, ..)| *table_id)
904+
.collect();
899905

900906
let inputs = reducer_context.map(|rcx| rcx.into());
901907

@@ -905,7 +911,7 @@ impl RelationalDB {
905911
mutations: Some(Mutations {
906912
inserts,
907913
deletes,
908-
truncates: [].into(),
914+
truncates,
909915
}),
910916
};
911917

crates/core/src/subscription/tx.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ impl DeltaTableIndexes {
7676
indexes
7777
}
7878

79+
let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows));
7980
Self {
8081
inserts: build_indexes_for_rows(tx, meta, data.inserts()),
81-
deletes: build_indexes_for_rows(tx, meta, data.deletes()),
82+
deletes: build_indexes_for_rows(tx, meta, deletes),
8283
}
8384
}
8485
}
@@ -133,8 +134,8 @@ impl DeltaStore for DeltaTx<'_> {
133134
self.data
134135
.and_then(|data| {
135136
data.inserts()
136-
.find(|(id, _)| **id == table_id)
137-
.map(|(_, rows)| rows.len())
137+
.find(|(id, ..)| **id == table_id)
138+
.map(|(.., rows)| rows.len())
138139
})
139140
.unwrap_or_default()
140141
}
@@ -143,25 +144,25 @@ impl DeltaStore for DeltaTx<'_> {
143144
self.data
144145
.and_then(|data| {
145146
data.deletes()
146-
.find(|(id, _)| **id == table_id)
147-
.map(|(_, rows)| rows.len())
147+
.find(|(id, ..)| **id == table_id)
148+
.map(|(.., rows)| rows.len())
148149
})
149150
.unwrap_or_default()
150151
}
151152

152153
fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
153154
self.data.and_then(|data| {
154155
data.inserts()
155-
.find(|(id, _)| **id == table_id)
156-
.map(|(_, rows)| rows.iter())
156+
.find(|(id, ..)| **id == table_id)
157+
.map(|(.., rows)| rows.iter())
157158
})
158159
}
159160

160161
fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
161162
self.data.and_then(|data| {
162163
data.deletes()
163-
.find(|(id, _)| **id == table_id)
164-
.map(|(_, rows)| rows.iter())
164+
.find(|(id, ..)| **id == table_id)
165+
.map(|(.., rows)| rows.iter())
165166
})
166167
}
167168

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -315,19 +315,35 @@ impl CommittedState {
315315
Ok(())
316316
}
317317

318+
pub(super) fn replay_truncate(&mut self, table_id: TableId) -> Result<()> {
319+
// (1) Table dropped? Avoid an error and just ignore the truncation instead.
320+
if self.table_dropped.contains(&table_id) {
321+
return Ok(());
322+
}
323+
324+
// Get the table for mutation.
325+
let (table, blob_store, ..) = self.get_table_and_blob_store_mut(table_id)?;
326+
327+
// We do not need to consider a truncation of `st_table` itself,
328+
// as if that happens, the database is bricked.
329+
330+
table.clear(blob_store);
331+
332+
Ok(())
333+
}
334+
318335
pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
336+
// (1) Table dropped? Avoid an error and just ignore the row instead.
337+
if self.table_dropped.contains(&table_id) {
338+
return Ok(());
339+
}
340+
319341
// Get the table for mutation.
320-
let table = match self.tables.get_mut(&table_id) {
321-
Some(t) => t,
322-
// (1) If it was dropped, avoid an error and just ignore the row instead.
323-
None if self.table_dropped.contains(&table_id) => return Ok(()),
324-
None => return Err(TableError::IdNotFoundState(table_id).into()),
325-
};
342+
let (table, blob_store, _, page_pool) = self.get_table_and_blob_store_mut(table_id)?;
326343

327344
// Delete the row.
328-
let blob_store = &mut self.blob_store;
329345
table
330-
.delete_equal_row(&self.page_pool, blob_store, row)
346+
.delete_equal_row(page_pool, blob_store, row)
331347
.map_err(TableError::Bflatn)?
332348
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
333349

@@ -461,9 +477,9 @@ impl CommittedState {
461477
for index_row in rows {
462478
let index_id = index_row.index_id;
463479
let table_id = index_row.table_id;
464-
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
465-
panic!("Cannot create index for table which doesn't exist in committed state");
466-
};
480+
let (table, blob_store, index_id_map, _) = self
481+
.get_table_and_blob_store_mut(table_id)
482+
.expect("index should exist in committed state; cannot create it");
467483
let algo: IndexAlgorithm = index_row.index_algorithm.into();
468484
let columns: ColSet = algo.columns().into();
469485
let is_unique = unique_constraints.contains(&(table_id, columns));
@@ -564,8 +580,7 @@ impl CommittedState {
564580
"Cannot get TX_STATE RowPointer from CommittedState.",
565581
);
566582
let table = self
567-
.tables
568-
.get(&table_id)
583+
.get_table(table_id)
569584
.expect("Attempt to get COMMITTED_STATE row from table not present in tables.");
570585
// TODO(perf, deep-integration): Use `get_row_ref_unchecked`.
571586
table.get_row_ref(&self.blob_store, row_ptr).unwrap()
@@ -645,17 +660,18 @@ impl CommittedState {
645660

646661
if !deletes.is_empty() {
647662
let table_name = &*table.get_schema().table_name;
648-
// TODO(centril): Pass this along to record truncated tables.
649-
let _truncated = table.row_count == 0;
650-
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
663+
let truncated = table.row_count == 0;
664+
tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated);
651665
}
652666
}
653667

654668
for (table_id, row_ptrs) in delete_tables {
655-
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
656-
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
657-
} else if !row_ptrs.is_empty() {
658-
panic!("Deletion for non-existent table {table_id:?}... huh?");
669+
match self.get_table_and_blob_store_mut(table_id) {
670+
Ok((table, blob_store, ..)) => {
671+
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter())
672+
}
673+
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
674+
Err(_) => {}
659675
}
660676
}
661677

@@ -835,12 +851,21 @@ impl CommittedState {
835851
pub(super) fn get_table_and_blob_store_mut(
836852
&mut self,
837853
table_id: TableId,
838-
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
839-
(
840-
self.tables.get_mut(&table_id),
854+
) -> Result<(&mut Table, &mut dyn BlobStore, &mut IndexIdMap, &PagePool)> {
855+
// NOTE(centril): `TableError` is a fairly large type.
856+
// Not making this lazy made `TableError::drop` show up in perf.
857+
// TODO(centril): Box all the errors.
858+
#[allow(clippy::unnecessary_lazy_evaluations)]
859+
let table = self
860+
.tables
861+
.get_mut(&table_id)
862+
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
863+
Ok((
864+
table,
841865
&mut self.blob_store as &mut dyn BlobStore,
842866
&mut self.index_id_map,
843-
)
867+
&self.page_pool,
868+
))
844869
}
845870

846871
fn make_table(schema: Arc<TableSchema>) -> Table {

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,8 +1157,26 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
11571157
Ok(row)
11581158
}
11591159

1160-
fn visit_truncate(&mut self, _table_id: TableId) -> std::result::Result<(), Self::Error> {
1161-
Err(anyhow!("visit: truncate not yet supported").into())
1160+
fn visit_truncate(&mut self, table_id: TableId) -> std::result::Result<(), Self::Error> {
1161+
let schema = self.committed_state.schema_for_table(table_id)?;
1162+
// TODO: avoid clone
1163+
let table_name = schema.table_name.clone();
1164+
1165+
self.committed_state.replay_truncate(table_id).with_context(|| {
1166+
format!(
1167+
"Error truncating table {:?} during transaction {:?} playback",
1168+
table_id, self.committed_state.next_tx_offset
1169+
)
1170+
})?;
1171+
1172+
// NOTE: the `rdb_num_table_rows` metric is used by the query optimizer,
1173+
// and therefore has performance implications and must not be disabled.
1174+
DB_METRICS
1175+
.rdb_num_table_rows
1176+
.with_label_values(self.database_identity, &table_id.into(), &table_name)
1177+
.set(0);
1178+
1179+
Ok(())
11621180
}
11631181

11641182
fn visit_tx_start(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
@@ -1213,7 +1231,7 @@ mod tests {
12131231
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
12141232
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
12151233
};
1216-
use crate::traits::{IsolationLevel, MutTx};
1234+
use crate::traits::{IsolationLevel, MutTx, TxTableTruncated};
12171235
use crate::Result;
12181236
use bsatn::to_vec;
12191237
use core::{fmt, mem};
@@ -2859,7 +2877,7 @@ mod tests {
28592877
let tx_data_2 = commit(&datastore, tx)?;
28602878
// Ensure that none of the commits deleted rows in our table.
28612879
for tx_data in [&tx_data_1, &tx_data_2] {
2862-
assert_eq!(tx_data.deletes().find(|(tid, _)| **tid == table_id), None);
2880+
assert_eq!(tx_data.deletes().find(|(tid, ..)| **tid == table_id), None);
28632881
}
28642882
// Ensure that the first commit added the row but that the second didn't.
28652883
for (tx_data, expected_rows) in [(&tx_data_1, vec![row.clone()]), (&tx_data_2, vec![])] {
@@ -3183,11 +3201,12 @@ mod tests {
31833201
// Now drop the table again and commit.
31843202
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
31853203
let tx_data = commit(&datastore, tx)?;
3186-
let (_, deleted) = tx_data
3204+
let (_, truncated, deleted_rows) = tx_data
31873205
.deletes()
3188-
.find(|(id, _)| **id == table_id)
3206+
.find(|(id, ..)| **id == table_id)
31893207
.expect("should have deleted rows for `table_id`");
3190-
assert_eq!(&**deleted, [row]);
3208+
assert_eq!(&**deleted_rows, [row]);
3209+
assert_eq!(truncated, TxTableTruncated::Yes);
31913210

31923211
// In the next transaction, the table doesn't exist.
31933212
assert!(
@@ -3391,8 +3410,9 @@ mod tests {
33913410
let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap();
33923411
let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
33933412
assert_eq!(&**inserts, [to_product(&columns[1])].as_slice());
3394-
let (_, deletes) = tx_data.deletes().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
3413+
let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
33953414
assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice());
3415+
assert_eq!(truncated, TxTableTruncated::No);
33963416

33973417
// Check that we can successfully scan using the new schema type post commit.
33983418
let tx = begin_tx(&datastore);

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -420,13 +420,8 @@ impl MutTxId {
420420
TxTableForInsertion<'_>,
421421
(&mut Table, &mut dyn BlobStore, &mut IndexIdMap),
422422
)> {
423-
let (commit_table, commit_bs, idx_map) = self.committed_state_write_lock.get_table_and_blob_store_mut(table_id);
424-
// NOTE(centril): `TableError` is a fairly large type.
425-
// Not making this lazy made `TableError::drop` show up in perf.
426-
// TODO(centril): Box all the errors.
427-
#[allow(clippy::unnecessary_lazy_evaluations)]
428-
let commit_table = commit_table.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
429-
423+
let (commit_table, commit_bs, idx_map, _) =
424+
self.committed_state_write_lock.get_table_and_blob_store_mut(table_id)?;
430425
// Get the insert table, so we can write the row into it.
431426
let tx = self
432427
.tx_state

0 commit comments

Comments
 (0)