Skip to content

Commit ddbebbc

Browse files
CentrilShubham8287
andauthored
datastore: add support for truncation (#3215)
# Description of Changes Adds support for truncation in the datastore. The commitlog format already supported it. A table is considered truncated in a transaction when there are deletes and when the row count post-deletes is 0. # API and ABI breaking changes None # Expected complexity level and risk 3? # Testing Some existing datastore tests have been amended to check whether `TxData` contain truncation or not. --------- Signed-off-by: Mazdak Farrokhzad <twingoow@gmail.com> Co-authored-by: Shubham Mishra <shubham@clockworklabs.io> Co-authored-by: Shubham Mishra <shivam828787@gmail.com>
1 parent e93de33 commit ddbebbc

6 files changed

Lines changed: 174 additions & 51 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use futures::channel::mpsc;
1010
use futures::StreamExt;
1111
use parking_lot::RwLock;
1212
use spacetimedb_commitlog as commitlog;
13+
use spacetimedb_data_structures::map::IntSet;
1314
use spacetimedb_datastore::db_metrics::DB_METRICS;
1415
use spacetimedb_datastore::error::{DatastoreError, TableError};
1516
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
@@ -889,12 +890,17 @@ impl RelationalDB {
889890
rowdata: rowdata.clone(),
890891
})
891892
.collect();
893+
894+
let truncates: IntSet<TableId> = tx_data.truncates().collect();
895+
892896
let deletes: Box<_> = tx_data
893897
.deletes()
894898
.map(|(table_id, rowdata)| Ops {
895899
table_id: *table_id,
896900
rowdata: rowdata.clone(),
897901
})
902+
// filter out deletes for tables that are truncated in the same transaction.
903+
.filter(|ops| !truncates.contains(&ops.table_id))
898904
.collect();
899905

900906
let inputs = reducer_context.map(|rcx| rcx.into());
@@ -905,7 +911,7 @@ impl RelationalDB {
905911
mutations: Some(Mutations {
906912
inserts,
907913
deletes,
908-
truncates: [].into(),
914+
truncates: truncates.into_iter().collect(),
909915
}),
910916
};
911917

crates/core/src/subscription/tx.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ impl DeltaStore for DeltaTx<'_> {
133133
self.data
134134
.and_then(|data| {
135135
data.inserts()
136-
.find(|(id, _)| **id == table_id)
137-
.map(|(_, rows)| rows.len())
136+
.find(|(id, ..)| **id == table_id)
137+
.map(|(.., rows)| rows.len())
138138
})
139139
.unwrap_or_default()
140140
}
@@ -143,25 +143,25 @@ impl DeltaStore for DeltaTx<'_> {
143143
self.data
144144
.and_then(|data| {
145145
data.deletes()
146-
.find(|(id, _)| **id == table_id)
147-
.map(|(_, rows)| rows.len())
146+
.find(|(id, ..)| **id == table_id)
147+
.map(|(.., rows)| rows.len())
148148
})
149149
.unwrap_or_default()
150150
}
151151

152152
fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
153153
self.data.and_then(|data| {
154154
data.inserts()
155-
.find(|(id, _)| **id == table_id)
156-
.map(|(_, rows)| rows.iter())
155+
.find(|(id, ..)| **id == table_id)
156+
.map(|(.., rows)| rows.iter())
157157
})
158158
}
159159

160160
fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
161161
self.data.and_then(|data| {
162162
data.deletes()
163-
.find(|(id, _)| **id == table_id)
164-
.map(|(_, rows)| rows.iter())
163+
.find(|(id, ..)| **id == table_id)
164+
.map(|(.., rows)| rows.iter())
165165
})
166166
}
167167

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -315,19 +315,35 @@ impl CommittedState {
315315
Ok(())
316316
}
317317

318+
pub(super) fn replay_truncate(&mut self, table_id: TableId) -> Result<()> {
319+
// (1) Table dropped? Avoid an error and just ignore the row instead.
320+
if self.table_dropped.contains(&table_id) {
321+
return Ok(());
322+
}
323+
324+
// Get the table for mutation.
325+
let (table, blob_store, ..) = self.get_table_and_blob_store_mut(table_id)?;
326+
327+
// We do not need to consider a truncation of `st_table` itself,
328+
// as if that happens, the database is bricked.
329+
330+
table.clear(blob_store);
331+
332+
Ok(())
333+
}
334+
318335
pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
336+
// (1) Table dropped? Avoid an error and just ignore the row instead.
337+
if self.table_dropped.contains(&table_id) {
338+
return Ok(());
339+
}
340+
319341
// Get the table for mutation.
320-
let table = match self.tables.get_mut(&table_id) {
321-
Some(t) => t,
322-
// (1) If it was dropped, avoid an error and just ignore the row instead.
323-
None if self.table_dropped.contains(&table_id) => return Ok(()),
324-
None => return Err(TableError::IdNotFoundState(table_id).into()),
325-
};
342+
let (table, blob_store, _, page_pool) = self.get_table_and_blob_store_mut(table_id)?;
326343

327344
// Delete the row.
328-
let blob_store = &mut self.blob_store;
329345
table
330-
.delete_equal_row(&self.page_pool, blob_store, row)
346+
.delete_equal_row(page_pool, blob_store, row)
331347
.map_err(TableError::Bflatn)?
332348
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
333349

@@ -461,9 +477,9 @@ impl CommittedState {
461477
for index_row in rows {
462478
let index_id = index_row.index_id;
463479
let table_id = index_row.table_id;
464-
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
465-
panic!("Cannot create index for table which doesn't exist in committed state");
466-
};
480+
let (table, blob_store, index_id_map, _) = self
481+
.get_table_and_blob_store_mut(table_id)
482+
.expect("index should exist in committed state; cannot create it");
467483
let algo: IndexAlgorithm = index_row.index_algorithm.into();
468484
let columns: ColSet = algo.columns().into();
469485
let is_unique = unique_constraints.contains(&(table_id, columns));
@@ -564,8 +580,7 @@ impl CommittedState {
564580
"Cannot get TX_STATE RowPointer from CommittedState.",
565581
);
566582
let table = self
567-
.tables
568-
.get(&table_id)
583+
.get_table(table_id)
569584
.expect("Attempt to get COMMITTED_STATE row from table not present in tables.");
570585
// TODO(perf, deep-integration): Use `get_row_ref_unchecked`.
571586
table.get_row_ref(&self.blob_store, row_ptr).unwrap()
@@ -595,13 +610,27 @@ impl CommittedState {
595610

596611
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
597612
let mut tx_data = TxData::default();
613+
let mut truncates = IntSet::default();
598614

599615
// First, apply deletes. This will free up space in the committed tables.
600-
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);
616+
self.merge_apply_deletes(
617+
&mut tx_data,
618+
tx_state.delete_tables,
619+
tx_state.pending_schema_changes,
620+
&mut truncates,
621+
);
601622

602623
// Then, apply inserts. This will re-fill the holes freed by deletions
603624
// before allocating new pages.
604-
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
625+
self.merge_apply_inserts(
626+
&mut tx_data,
627+
tx_state.insert_tables,
628+
tx_state.blob_store,
629+
&mut truncates,
630+
);
631+
632+
// Record any truncated tables in the `TxData`.
633+
tx_data.add_truncates(truncates);
605634

606635
// If the TX will be logged, record its projected tx offset,
607636
// then increment the counter.
@@ -618,6 +647,7 @@ impl CommittedState {
618647
tx_data: &mut TxData,
619648
delete_tables: BTreeMap<TableId, DeleteTable>,
620649
pending_schema_changes: ThinVec<PendingSchemaChange>,
650+
truncates: &mut IntSet<TableId>,
621651
) {
622652
fn delete_rows(
623653
tx_data: &mut TxData,
@@ -626,6 +656,7 @@ impl CommittedState {
626656
blob_store: &mut dyn BlobStore,
627657
row_ptrs_len: usize,
628658
row_ptrs: impl Iterator<Item = RowPointer>,
659+
truncates: &mut IntSet<TableId>,
629660
) {
630661
let mut deletes = Vec::with_capacity(row_ptrs_len);
631662

@@ -645,17 +676,27 @@ impl CommittedState {
645676

646677
if !deletes.is_empty() {
647678
let table_name = &*table.get_schema().table_name;
648-
// TODO(centril): Pass this along to record truncated tables.
649-
let _truncated = table.row_count == 0;
650679
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
680+
let truncated = table.row_count == 0;
681+
if truncated {
682+
truncates.insert(table_id);
683+
}
651684
}
652685
}
653686

654687
for (table_id, row_ptrs) in delete_tables {
655-
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
656-
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
657-
} else if !row_ptrs.is_empty() {
658-
panic!("Deletion for non-existent table {table_id:?}... huh?");
688+
match self.get_table_and_blob_store_mut(table_id) {
689+
Ok((table, blob_store, ..)) => delete_rows(
690+
tx_data,
691+
table_id,
692+
table,
693+
blob_store,
694+
row_ptrs.len(),
695+
row_ptrs.iter(),
696+
truncates,
697+
),
698+
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
699+
Err(_) => {}
659700
}
660701
}
661702

@@ -672,6 +713,7 @@ impl CommittedState {
672713
&mut self.blob_store,
673714
row_ptrs.len(),
674715
row_ptrs.into_iter(),
716+
truncates,
675717
);
676718
}
677719
}
@@ -682,6 +724,7 @@ impl CommittedState {
682724
tx_data: &mut TxData,
683725
insert_tables: BTreeMap<TableId, Table>,
684726
tx_blob_store: impl BlobStore,
727+
truncates: &mut IntSet<TableId>,
685728
) {
686729
// TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
687730
// rather than copying individual rows out of them.
@@ -710,6 +753,11 @@ impl CommittedState {
710753
if !inserts.is_empty() {
711754
let table_name = &*commit_table.get_schema().table_name;
712755
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());
756+
757+
// if table has inserted rows, it cannot be truncated
758+
if truncates.contains(&table_id) {
759+
truncates.remove(&table_id);
760+
}
713761
}
714762

715763
let (schema, _indexes, pages) = tx_table.consume_for_merge();
@@ -835,12 +883,21 @@ impl CommittedState {
835883
pub(super) fn get_table_and_blob_store_mut(
836884
&mut self,
837885
table_id: TableId,
838-
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
839-
(
840-
self.tables.get_mut(&table_id),
886+
) -> Result<(&mut Table, &mut dyn BlobStore, &mut IndexIdMap, &PagePool)> {
887+
// NOTE(centril): `TableError` is a fairly large type.
888+
// Not making this lazy made `TableError::drop` show up in perf.
889+
// TODO(centril): Box all the errors.
890+
#[allow(clippy::unnecessary_lazy_evaluations)]
891+
let table = self
892+
.tables
893+
.get_mut(&table_id)
894+
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
895+
Ok((
896+
table,
841897
&mut self.blob_store as &mut dyn BlobStore,
842898
&mut self.index_id_map,
843-
)
899+
&self.page_pool,
900+
))
844901
}
845902

846903
fn make_table(schema: Arc<TableSchema>) -> Table {

0 commit comments

Comments
 (0)