Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::channel::mpsc;
use futures::StreamExt;
use parking_lot::RwLock;
use spacetimedb_commitlog as commitlog;
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
Expand All @@ -23,7 +24,7 @@ use spacetimedb_datastore::system_tables::{system_tables, StModuleRow};
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
use spacetimedb_datastore::traits::{
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
TxTableTruncated, UpdateFlags,
UpdateFlags,
};
use spacetimedb_datastore::{
locking_tx_datastore::{
Expand Down Expand Up @@ -889,18 +890,17 @@ impl RelationalDB {
rowdata: rowdata.clone(),
})
.collect();

let truncates: IntSet<TableId> = tx_data.truncates().collect();

let deletes: Box<_> = tx_data
.deletes()
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::No)
.map(|(table_id, _, rowdata)| Ops {
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();
let truncates: Box<_> = tx_data
.deletes()
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes)
.map(|(table_id, ..)| *table_id)
// filter out deletes for tables that are truncated in the same transaction.
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

let inputs = reducer_context.map(|rcx| rcx.into());
Expand All @@ -911,7 +911,7 @@ impl RelationalDB {
mutations: Some(Mutations {
inserts,
deletes,
truncates,
truncates: truncates.into_iter().collect(),
}),
};

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/subscription/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,9 @@ impl DeltaTableIndexes {
indexes
}

let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows));
Self {
inserts: build_indexes_for_rows(tx, meta, data.inserts()),
deletes: build_indexes_for_rows(tx, meta, deletes),
deletes: build_indexes_for_rows(tx, meta, data.deletes()),
}
}
}
Expand Down
45 changes: 39 additions & 6 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,13 +610,27 @@ impl CommittedState {

pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();
let mut truncates = IntSet::default();

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);
self.merge_apply_deletes(
&mut tx_data,
tx_state.delete_tables,
tx_state.pending_schema_changes,
&mut truncates,
);

// Then, apply inserts. This will re-fill the holes freed by deletions
// before allocating new pages.
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
self.merge_apply_inserts(
&mut tx_data,
tx_state.insert_tables,
tx_state.blob_store,
&mut truncates,
);

// Record any truncated tables in the `TxData`.
tx_data.add_truncates(truncates);

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
Expand All @@ -633,6 +647,7 @@ impl CommittedState {
tx_data: &mut TxData,
delete_tables: BTreeMap<TableId, DeleteTable>,
pending_schema_changes: ThinVec<PendingSchemaChange>,
truncates: &mut IntSet<TableId>,
) {
fn delete_rows(
tx_data: &mut TxData,
Expand All @@ -641,6 +656,7 @@ impl CommittedState {
blob_store: &mut dyn BlobStore,
row_ptrs_len: usize,
row_ptrs: impl Iterator<Item = RowPointer>,
truncates: &mut IntSet<TableId>,
) {
let mut deletes = Vec::with_capacity(row_ptrs_len);

Expand All @@ -660,16 +676,25 @@ impl CommittedState {

if !deletes.is_empty() {
let table_name = &*table.get_schema().table_name;
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
let truncated = table.row_count == 0;
tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated);
if truncated {
truncates.insert(table_id);
}
}
}

for (table_id, row_ptrs) in delete_tables {
match self.get_table_and_blob_store_mut(table_id) {
Ok((table, blob_store, ..)) => {
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter())
}
Ok((table, blob_store, ..)) => delete_rows(
tx_data,
table_id,
table,
blob_store,
row_ptrs.len(),
row_ptrs.iter(),
truncates,
),
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
Err(_) => {}
}
Expand All @@ -681,13 +706,15 @@ impl CommittedState {
for change in pending_schema_changes {
if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
let row_ptrs = table.scan_all_row_ptrs();
truncates.insert(table_id);
Comment thread
Shubham8287 marked this conversation as resolved.
Outdated
delete_rows(
tx_data,
table_id,
&mut table,
&mut self.blob_store,
row_ptrs.len(),
row_ptrs.into_iter(),
truncates,
);
}
}
Expand All @@ -698,6 +725,7 @@ impl CommittedState {
tx_data: &mut TxData,
insert_tables: BTreeMap<TableId, Table>,
tx_blob_store: impl BlobStore,
truncates: &mut IntSet<TableId>,
) {
// TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
// rather than copying individual rows out of them.
Expand Down Expand Up @@ -726,6 +754,11 @@ impl CommittedState {
if !inserts.is_empty() {
let table_name = &*commit_table.get_schema().table_name;
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());

// if table has inserted rows, it cannot be truncated
if truncates.contains(&table_id) {
truncates.remove(&table_id);
}
}

let (schema, _indexes, pages) = tx_table.consume_for_merge();
Expand Down
10 changes: 5 additions & 5 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ mod tests {
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
};
use crate::traits::{IsolationLevel, MutTx, TxTableTruncated};
use crate::traits::{IsolationLevel, MutTx};
use crate::Result;
use bsatn::to_vec;
use core::{fmt, mem};
Expand Down Expand Up @@ -3201,12 +3201,12 @@ mod tests {
// Now drop the table again and commit.
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
let tx_data = commit(&datastore, tx)?;
let (_, truncated, deleted_rows) = tx_data
let (_, deleted_rows) = tx_data
.deletes()
.find(|(id, ..)| **id == table_id)
.expect("should have deleted rows for `table_id`");
assert_eq!(&**deleted_rows, [row]);
assert_eq!(truncated, TxTableTruncated::Yes);
assert!(tx_data.truncates().contains(&table_id), "table should be truncated");

// In the next transaction, the table doesn't exist.
assert!(
Expand Down Expand Up @@ -3410,9 +3410,9 @@ mod tests {
let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap();
let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
assert_eq!(&**inserts, [to_product(&columns[1])].as_slice());
let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
let (_, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice());
assert_eq!(truncated, TxTableTruncated::No);
// assert_eq!(truncated, TxTableTruncated::No);

// Check that we can successfully scan using the new schema type post commit.
let tx = begin_tx(&datastore);
Expand Down
64 changes: 22 additions & 42 deletions crates/datastore/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::system_tables::ModuleKind;
use super::Result;
use crate::execution_context::{ReducerContext, Workload};
use crate::system_tables::ST_TABLE_ID;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::{hash_bytes, Identity};
use spacetimedb_primitives::*;
Expand Down Expand Up @@ -175,9 +175,13 @@ pub struct TxData {
/// The inserted rows per table.
inserts: BTreeMap<TableId, Arc<[ProductValue]>>,
/// The deleted rows per table.
deletes: BTreeMap<TableId, Arc<[ProductValue]>>,

/// The set of `TableId`s for tables that have been truncated.
///
/// Also stores per table whether it has been truncated.
deletes: BTreeMap<TableId, TxDeleteEntry>,
/// “Truncating” means that all rows in the table have been deleted, or the table
/// itself has been dropped
Comment thread
Shubham8287 marked this conversation as resolved.
Outdated
truncates: IntSet<TableId>,
/// Map of all `TableId`s in both `inserts` and `deletes` to their
/// corresponding table name.
// TODO: Store table name as ref counted string.
Expand All @@ -189,24 +193,6 @@ pub struct TxData {
tx_offset: Option<u64>,
}

/// A record of a list of deletes for and potential truncation of a table,
/// within a transaction.
pub struct TxDeleteEntry {
/// Were all rows previously in the table deleted within this transaction?
truncated: TxTableTruncated,
/// The deleted rows of the table.
rows: Arc<[ProductValue]>,
}

/// Whether a table was truncated in a transaction.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub enum TxTableTruncated {
/// It was truncated.
Yes,
/// It was not truncated.
No,
}

impl TxData {
/// Set `tx_offset` as the expected on-disk transaction offset of this transaction.
pub fn set_tx_offset(&mut self, tx_offset: u64) {
Expand All @@ -231,23 +217,15 @@ impl TxData {
/// Set `rows` as the deleted rows for `(table_id, table_name)`.
///
/// When `truncated` is set, the table has been emptied in this transaction.
pub fn set_deletes_for_table(
&mut self,
table_id: TableId,
table_name: &str,
rows: Arc<[ProductValue]>,
truncated: bool,
) {
let truncated = if truncated {
TxTableTruncated::Yes
} else {
TxTableTruncated::No
};
let entry = TxDeleteEntry { truncated, rows };
self.deletes.insert(table_id, entry);
pub fn set_deletes_for_table(&mut self, table_id: TableId, table_name: &str, rows: Arc<[ProductValue]>) {
self.deletes.insert(table_id, rows);
self.tables.entry(table_id).or_insert_with(|| table_name.to_owned());
}

pub fn add_truncates(&mut self, truncated_tables: impl IntoIterator<Item = TableId>) {
self.truncates.extend(truncated_tables);
}

/// Obtain an iterator over the inserted rows per table.
pub fn inserts(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
self.inserts.iter()
Expand All @@ -273,31 +251,33 @@ impl TxData {
}

/// Obtain an iterator over the deleted rows per table.
pub fn deletes(&self) -> impl Iterator<Item = (&TableId, TxTableTruncated, &Arc<[ProductValue]>)> + '_ {
self.deletes
.iter()
.map(|(table_id, entry)| (table_id, entry.truncated, &entry.rows))
pub fn deletes(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
self.deletes.iter()
}

/// Get the `i`th deleted row for `table_id` if it exists
pub fn get_ith_delete(&self, table_id: TableId, i: usize) -> Option<&ProductValue> {
self.deletes.get(&table_id).and_then(|entry| entry.rows.get(i))
self.deletes.get(&table_id).and_then(|rows| rows.get(i))
}

/// Obtain an iterator over the inserted rows per table.
///
/// If you don't need access to the table name, [`Self::deletes`] is
/// slightly more efficient.
pub fn deletes_with_table_name(&self) -> impl Iterator<Item = (&TableId, &str, &Arc<[ProductValue]>)> + '_ {
self.deletes.iter().map(|(table_id, entry)| {
self.deletes.iter().map(|(table_id, rows)| {
let table_name = self
.tables
.get(table_id)
.expect("invalid `TxData`: partial table name mapping");
(table_id, table_name.as_str(), &entry.rows)
(table_id, table_name.as_str(), rows)
})
}

pub fn truncates(&self) -> impl Iterator<Item = TableId> + '_ {
self.truncates.iter().copied()
}

/// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations.
///
/// This is used to determine if a transaction should be written to disk.
Expand Down