Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::channel::mpsc;
use futures::StreamExt;
use parking_lot::RwLock;
use spacetimedb_commitlog as commitlog;
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
Expand Down Expand Up @@ -889,12 +890,17 @@ impl RelationalDB {
rowdata: rowdata.clone(),
})
.collect();

let truncates: IntSet<TableId> = tx_data.truncates().collect();

let deletes: Box<_> = tx_data
.deletes()
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
// filter out deletes for tables that are truncated in the same transaction.
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

let inputs = reducer_context.map(|rcx| rcx.into());
Expand All @@ -905,7 +911,7 @@ impl RelationalDB {
mutations: Some(Mutations {
inserts,
deletes,
truncates: [].into(),
truncates: truncates.into_iter().collect(),
}),
};

Expand Down Expand Up @@ -1427,13 +1433,9 @@ impl RelationalDB {
}

/// Clear all rows from a table without dropping it.
pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> {
let relation = self
.iter_mut(tx, table_id)?
.map(|row_ref| row_ref.pointer())
.collect::<Vec<_>>();
self.delete(tx, table_id, relation);
Ok(())
pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<usize, DBError> {
let rows_deleted = tx.clear_table(table_id)?;
Ok(rows_deleted)
}

pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ impl ModuleHost {
let workload = Workload::Internal;
stdb.with_auto_commit(workload, |mut_tx| {
stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
stdb.clear_table(mut_tx, ST_CLIENT_ID)
stdb.clear_table(mut_tx, ST_CLIENT_ID)?;
Ok::<(), DBError>(())
})
})
.await?
Expand Down
16 changes: 8 additions & 8 deletions crates/core/src/subscription/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ impl DeltaStore for DeltaTx<'_> {
self.data
.and_then(|data| {
data.inserts()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.len())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.unwrap_or_default()
}
Expand All @@ -143,25 +143,25 @@ impl DeltaStore for DeltaTx<'_> {
self.data
.and_then(|data| {
data.deletes()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.len())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.unwrap_or_default()
}

fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.inserts()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.iter())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
}

fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.deletes()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.iter())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
}

Expand Down
Loading
Loading