Skip to content

Commit 431f7bb

Browse files
committed
decided on truncate after processing inserts
1 parent f44dcfa commit 431f7bb

5 files changed

Lines changed: 76 additions & 64 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use futures::channel::mpsc;
1010
use futures::StreamExt;
1111
use parking_lot::RwLock;
1212
use spacetimedb_commitlog as commitlog;
13+
use spacetimedb_data_structures::map::IntSet;
1314
use spacetimedb_datastore::db_metrics::DB_METRICS;
1415
use spacetimedb_datastore::error::{DatastoreError, TableError};
1516
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
@@ -23,7 +24,7 @@ use spacetimedb_datastore::system_tables::{system_tables, StModuleRow};
2324
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
2425
use spacetimedb_datastore::traits::{
2526
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
26-
TxTableTruncated, UpdateFlags,
27+
UpdateFlags,
2728
};
2829
use spacetimedb_datastore::{
2930
locking_tx_datastore::{
@@ -889,18 +890,17 @@ impl RelationalDB {
889890
rowdata: rowdata.clone(),
890891
})
891892
.collect();
893+
894+
let truncates: IntSet<TableId> = tx_data.truncates().collect();
895+
892896
let deletes: Box<_> = tx_data
893897
.deletes()
894-
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::No)
895-
.map(|(table_id, _, rowdata)| Ops {
898+
.map(|(table_id, rowdata)| Ops {
896899
table_id: *table_id,
897900
rowdata: rowdata.clone(),
898901
})
899-
.collect();
900-
let truncates: Box<_> = tx_data
901-
.deletes()
902-
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes)
903-
.map(|(table_id, ..)| *table_id)
902+
// filter out deletes for tables that are truncated in the same transaction.
903+
.filter(|ops| !truncates.contains(&ops.table_id))
904904
.collect();
905905

906906
let inputs = reducer_context.map(|rcx| rcx.into());
@@ -911,7 +911,7 @@ impl RelationalDB {
911911
mutations: Some(Mutations {
912912
inserts,
913913
deletes,
914-
truncates,
914+
truncates: truncates.into_iter().collect(),
915915
}),
916916
};
917917

crates/core/src/subscription/tx.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,9 @@ impl DeltaTableIndexes {
7676
indexes
7777
}
7878

79-
let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows));
8079
Self {
8180
inserts: build_indexes_for_rows(tx, meta, data.inserts()),
82-
deletes: build_indexes_for_rows(tx, meta, deletes),
81+
deletes: build_indexes_for_rows(tx, meta, data.deletes()),
8382
}
8483
}
8584
}

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -610,13 +610,27 @@ impl CommittedState {
610610

611611
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
612612
let mut tx_data = TxData::default();
613+
let mut truncates = IntSet::default();
613614

614615
// First, apply deletes. This will free up space in the committed tables.
615-
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);
616+
self.merge_apply_deletes(
617+
&mut tx_data,
618+
tx_state.delete_tables,
619+
tx_state.pending_schema_changes,
620+
&mut truncates,
621+
);
616622

617623
// Then, apply inserts. This will re-fill the holes freed by deletions
618624
// before allocating new pages.
619-
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
625+
self.merge_apply_inserts(
626+
&mut tx_data,
627+
tx_state.insert_tables,
628+
tx_state.blob_store,
629+
&mut truncates,
630+
);
631+
632+
// Record any truncated tables in the `TxData`.
633+
tx_data.add_truncates(truncates);
620634

621635
// If the TX will be logged, record its projected tx offset,
622636
// then increment the counter.
@@ -633,6 +647,7 @@ impl CommittedState {
633647
tx_data: &mut TxData,
634648
delete_tables: BTreeMap<TableId, DeleteTable>,
635649
pending_schema_changes: ThinVec<PendingSchemaChange>,
650+
truncates: &mut IntSet<TableId>,
636651
) {
637652
fn delete_rows(
638653
tx_data: &mut TxData,
@@ -641,6 +656,7 @@ impl CommittedState {
641656
blob_store: &mut dyn BlobStore,
642657
row_ptrs_len: usize,
643658
row_ptrs: impl Iterator<Item = RowPointer>,
659+
truncates: &mut IntSet<TableId>,
644660
) {
645661
let mut deletes = Vec::with_capacity(row_ptrs_len);
646662

@@ -660,16 +676,25 @@ impl CommittedState {
660676

661677
if !deletes.is_empty() {
662678
let table_name = &*table.get_schema().table_name;
679+
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
663680
let truncated = table.row_count == 0;
664-
tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated);
681+
if truncated {
682+
truncates.insert(table_id);
683+
}
665684
}
666685
}
667686

668687
for (table_id, row_ptrs) in delete_tables {
669688
match self.get_table_and_blob_store_mut(table_id) {
670-
Ok((table, blob_store, ..)) => {
671-
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter())
672-
}
689+
Ok((table, blob_store, ..)) => delete_rows(
690+
tx_data,
691+
table_id,
692+
table,
693+
blob_store,
694+
row_ptrs.len(),
695+
row_ptrs.iter(),
696+
truncates,
697+
),
673698
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
674699
Err(_) => {}
675700
}
@@ -681,13 +706,15 @@ impl CommittedState {
681706
for change in pending_schema_changes {
682707
if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
683708
let row_ptrs = table.scan_all_row_ptrs();
709+
truncates.insert(table_id);
684710
delete_rows(
685711
tx_data,
686712
table_id,
687713
&mut table,
688714
&mut self.blob_store,
689715
row_ptrs.len(),
690716
row_ptrs.into_iter(),
717+
truncates,
691718
);
692719
}
693720
}
@@ -698,6 +725,7 @@ impl CommittedState {
698725
tx_data: &mut TxData,
699726
insert_tables: BTreeMap<TableId, Table>,
700727
tx_blob_store: impl BlobStore,
728+
truncates: &mut IntSet<TableId>,
701729
) {
702730
// TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
703731
// rather than copying individual rows out of them.
@@ -726,6 +754,11 @@ impl CommittedState {
726754
if !inserts.is_empty() {
727755
let table_name = &*commit_table.get_schema().table_name;
728756
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());
757+
758+
// if table has inserted rows, it cannot be truncated
759+
if truncates.contains(&table_id) {
760+
truncates.remove(&table_id);
761+
}
729762
}
730763

731764
let (schema, _indexes, pages) = tx_table.consume_for_merge();

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ mod tests {
12311231
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
12321232
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
12331233
};
1234-
use crate::traits::{IsolationLevel, MutTx, TxTableTruncated};
1234+
use crate::traits::{IsolationLevel, MutTx};
12351235
use crate::Result;
12361236
use bsatn::to_vec;
12371237
use core::{fmt, mem};
@@ -3201,12 +3201,12 @@ mod tests {
32013201
// Now drop the table again and commit.
32023202
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
32033203
let tx_data = commit(&datastore, tx)?;
3204-
let (_, truncated, deleted_rows) = tx_data
3204+
let (_, deleted_rows) = tx_data
32053205
.deletes()
32063206
.find(|(id, ..)| **id == table_id)
32073207
.expect("should have deleted rows for `table_id`");
32083208
assert_eq!(&**deleted_rows, [row]);
3209-
assert_eq!(truncated, TxTableTruncated::Yes);
3209+
assert!(tx_data.truncates().contains(&table_id), "table should be truncated");
32103210

32113211
// In the next transaction, the table doesn't exist.
32123212
assert!(
@@ -3410,9 +3410,9 @@ mod tests {
34103410
let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap();
34113411
let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
34123412
assert_eq!(&**inserts, [to_product(&columns[1])].as_slice());
3413-
let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
3413+
let (_, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
34143414
assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice());
3415-
assert_eq!(truncated, TxTableTruncated::No);
3415+
// assert_eq!(truncated, TxTableTruncated::No);
34163416

34173417
// Check that we can successfully scan using the new schema type post commit.
34183418
let tx = begin_tx(&datastore);

crates/datastore/src/traits.rs

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::system_tables::ModuleKind;
88
use super::Result;
99
use crate::execution_context::{ReducerContext, Workload};
1010
use crate::system_tables::ST_TABLE_ID;
11-
use spacetimedb_data_structures::map::IntMap;
11+
use spacetimedb_data_structures::map::{IntMap, IntSet};
1212
use spacetimedb_durability::TxOffset;
1313
use spacetimedb_lib::{hash_bytes, Identity};
1414
use spacetimedb_primitives::*;
@@ -175,9 +175,13 @@ pub struct TxData {
175175
/// The inserted rows per table.
176176
inserts: BTreeMap<TableId, Arc<[ProductValue]>>,
177177
/// The deleted rows per table.
178+
deletes: BTreeMap<TableId, Arc<[ProductValue]>>,
179+
180+
/// The set of `TableId`s for tables that have been truncated.
178181
///
179-
/// Also stores per table whether it has been truncated.
180-
deletes: BTreeMap<TableId, TxDeleteEntry>,
182+
/// “Truncating” means that all rows in the table have been deleted, or the table
183+
/// itself has been dropped
184+
truncates: IntSet<TableId>,
181185
/// Map of all `TableId`s in both `inserts` and `deletes` to their
182186
/// corresponding table name.
183187
// TODO: Store table name as ref counted string.
@@ -189,24 +193,6 @@ pub struct TxData {
189193
tx_offset: Option<u64>,
190194
}
191195

192-
/// A record of a list of deletes for and potential truncation of a table,
193-
/// within a transaction.
194-
pub struct TxDeleteEntry {
195-
/// Were all rows previously in the table deleted within this transaction?
196-
truncated: TxTableTruncated,
197-
/// The deleted rows of the table.
198-
rows: Arc<[ProductValue]>,
199-
}
200-
201-
/// Whether a table was truncated in a transaction.
202-
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
203-
pub enum TxTableTruncated {
204-
/// It was truncated.
205-
Yes,
206-
/// It was not truncated.
207-
No,
208-
}
209-
210196
impl TxData {
211197
/// Set `tx_offset` as the expected on-disk transaction offset of this transaction.
212198
pub fn set_tx_offset(&mut self, tx_offset: u64) {
@@ -231,23 +217,15 @@ impl TxData {
231217
/// Set `rows` as the deleted rows for `(table_id, table_name)`.
232218
///
233219
/// When `truncated` is set, the table has been emptied in this transaction.
234-
pub fn set_deletes_for_table(
235-
&mut self,
236-
table_id: TableId,
237-
table_name: &str,
238-
rows: Arc<[ProductValue]>,
239-
truncated: bool,
240-
) {
241-
let truncated = if truncated {
242-
TxTableTruncated::Yes
243-
} else {
244-
TxTableTruncated::No
245-
};
246-
let entry = TxDeleteEntry { truncated, rows };
247-
self.deletes.insert(table_id, entry);
220+
pub fn set_deletes_for_table(&mut self, table_id: TableId, table_name: &str, rows: Arc<[ProductValue]>) {
221+
self.deletes.insert(table_id, rows);
248222
self.tables.entry(table_id).or_insert_with(|| table_name.to_owned());
249223
}
250224

225+
pub fn add_truncates(&mut self, truncated_tables: impl IntoIterator<Item = TableId>) {
226+
self.truncates.extend(truncated_tables);
227+
}
228+
251229
/// Obtain an iterator over the inserted rows per table.
252230
pub fn inserts(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
253231
self.inserts.iter()
@@ -273,31 +251,33 @@ impl TxData {
273251
}
274252

275253
/// Obtain an iterator over the deleted rows per table.
276-
pub fn deletes(&self) -> impl Iterator<Item = (&TableId, TxTableTruncated, &Arc<[ProductValue]>)> + '_ {
277-
self.deletes
278-
.iter()
279-
.map(|(table_id, entry)| (table_id, entry.truncated, &entry.rows))
254+
pub fn deletes(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
255+
self.deletes.iter()
280256
}
281257

282258
/// Get the `i`th deleted row for `table_id` if it exists
283259
pub fn get_ith_delete(&self, table_id: TableId, i: usize) -> Option<&ProductValue> {
284-
self.deletes.get(&table_id).and_then(|entry| entry.rows.get(i))
260+
self.deletes.get(&table_id).and_then(|rows| rows.get(i))
285261
}
286262

287263
/// Obtain an iterator over the inserted rows per table.
288264
///
289265
/// If you don't need access to the table name, [`Self::deletes`] is
290266
/// slightly more efficient.
291267
pub fn deletes_with_table_name(&self) -> impl Iterator<Item = (&TableId, &str, &Arc<[ProductValue]>)> + '_ {
292-
self.deletes.iter().map(|(table_id, entry)| {
268+
self.deletes.iter().map(|(table_id, rows)| {
293269
let table_name = self
294270
.tables
295271
.get(table_id)
296272
.expect("invalid `TxData`: partial table name mapping");
297-
(table_id, table_name.as_str(), &entry.rows)
273+
(table_id, table_name.as_str(), rows)
298274
})
299275
}
300276

277+
pub fn truncates(&self) -> impl Iterator<Item = TableId> + '_ {
278+
self.truncates.iter().copied()
279+
}
280+
301281
/// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations.
302282
///
303283
/// This is used to determine if a transaction should be written to disk.

0 commit comments

Comments
 (0)