Skip to content

Commit 0e211f8

Browse files
committed
restructure TxData to store everything for a table together
1 parent 41bc988 commit 0e211f8

11 files changed

Lines changed: 233 additions & 241 deletions

File tree

crates/core/src/db/durability.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@ use spacetimedb_commitlog::payload::{
66
txdata::{Mutations, Ops},
77
Txdata,
88
};
9-
use spacetimedb_data_structures::map::IntSet;
109
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
1110
use spacetimedb_durability::{DurableOffset, TxOffset};
1211
use spacetimedb_lib::Identity;
13-
use spacetimedb_primitives::TableId;
1412
use tokio::{
1513
runtime,
1614
sync::{
@@ -208,32 +206,17 @@ impl DurabilityWorkerActor {
208206
return;
209207
}
210208

211-
let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) };
212-
213209
let inserts: Box<_> = tx_data
214-
.inserts()
215-
// Skip ephemeral tables
216-
.filter(|(table_id, _)| is_persistent_table(table_id))
217-
.map(|(table_id, rowdata)| Ops {
218-
table_id: *table_id,
219-
rowdata: rowdata.clone(),
220-
})
210+
.persistent_inserts()
211+
.map(|(table_id, rowdata)| Ops { table_id, rowdata })
221212
.collect();
222213

223-
let truncates: IntSet<TableId> = tx_data.truncates().collect();
224-
225214
let deletes: Box<_> = tx_data
226-
.deletes()
227-
.filter(|(table_id, _)| is_persistent_table(table_id))
228-
.map(|(table_id, rowdata)| Ops {
229-
table_id: *table_id,
230-
rowdata: rowdata.clone(),
231-
})
232-
// filter out deletes for tables that are truncated in the same transaction.
233-
.filter(|ops| !truncates.contains(&ops.table_id))
215+
.persistent_deletes()
216+
.map(|(table_id, rowdata)| Ops { table_id, rowdata })
234217
.collect();
235218

236-
let truncates: Box<_> = truncates.into_iter().filter(is_persistent_table).collect();
219+
let truncates: Box<[_]> = tx_data.persistent_truncates().collect();
237220

238221
let inputs = reducer_context.map(|rcx| rcx.into());
239222

crates/core/src/host/module_host.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,17 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
4747
use spacetimedb_datastore::error::DatastoreError;
4848
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
4949
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
50-
use spacetimedb_datastore::traits::{DatabaseTableUpdate, IsolationLevel, Program, TxData};
50+
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
5151
use spacetimedb_durability::DurableOffset;
5252
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
5353
use spacetimedb_expr::expr::CollectViews;
5454
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
5555
use spacetimedb_lib::identity::{AuthCtx, RequestId};
5656
use spacetimedb_lib::metrics::ExecutionMetrics;
57-
use spacetimedb_lib::Timestamp;
58-
use spacetimedb_lib::{AlgebraicType, ConnectionId};
57+
use spacetimedb_lib::{ConnectionId, Timestamp};
5958
use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId};
6059
use spacetimedb_query::compile_subscription;
61-
use spacetimedb_sats::AlgebraicTypeRef;
60+
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue};
6261
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
6362
use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef};
6463
use spacetimedb_schema::reducer_name::ReducerName;
@@ -95,9 +94,14 @@ impl DatabaseUpdate {
9594
}
9695

9796
pub fn from_writes(tx_data: &TxData) -> Self {
98-
let updates = tx_data.database_table_updates();
99-
let mut tables = SmallVec::with_capacity(updates.len());
100-
tables.extend(updates);
97+
let entries = tx_data.iter_table_entries();
98+
let mut tables = SmallVec::with_capacity(entries.len());
99+
tables.extend(entries.map(|(table_id, e)| DatabaseTableUpdate {
100+
table_id,
101+
table_name: e.table_name.clone(),
102+
inserts: e.inserts.clone(),
103+
deletes: e.deletes.clone(),
104+
}));
101105
DatabaseUpdate { tables }
102106
}
103107

@@ -107,6 +111,17 @@ impl DatabaseUpdate {
107111
}
108112
}
109113

114+
#[derive(Debug, Clone, PartialEq, Eq)]
115+
pub struct DatabaseTableUpdate {
116+
pub table_id: TableId,
117+
pub table_name: TableName,
118+
// Note: `Arc<[ProductValue]>` allows to cheaply
119+
// use the values from `TxData` without cloning the
120+
// contained `ProductValue`s.
121+
pub inserts: Arc<[ProductValue]>,
122+
pub deletes: Arc<[ProductValue]>,
123+
}
124+
110125
#[derive(Debug)]
111126
pub struct DatabaseUpdateRelValue<'a> {
112127
pub tables: Vec<DatabaseTableUpdateRelValue<'a>>,

crates/core/src/sql/execute.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use crate::energy::EnergyQuanta;
77
use crate::error::DBError;
88
use crate::estimation::estimate_rows_scanned;
99
use crate::host::module_host::{
10-
DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, RefInstance, ViewCallError, ViewCallResult,
11-
ViewOutcome, WasmInstance,
10+
DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, RefInstance, ViewCallError,
11+
ViewCallResult, ViewOutcome, WasmInstance,
1212
};
1313
use crate::host::{ArgsTuple, ModuleHost};
1414
use crate::subscription::module_subscription_actor::{commit_and_broadcast_event, ModuleSubscriptions};
@@ -20,7 +20,7 @@ use anyhow::anyhow;
2020
use smallvec::SmallVec;
2121
use spacetimedb_datastore::execution_context::Workload;
2222
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
23-
use spacetimedb_datastore::traits::{DatabaseTableUpdate, IsolationLevel};
23+
use spacetimedb_datastore::traits::IsolationLevel;
2424
use spacetimedb_expr::statement::Statement;
2525
use spacetimedb_lib::identity::AuthCtx;
2626
use spacetimedb_lib::metrics::ExecutionMetrics;

crates/core/src/subscription/execution_unit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ use super::subscription::{IncrementalJoin, SupportedQuery};
33
use crate::db::relational_db::{RelationalDB, Tx};
44
use crate::error::DBError;
55
use crate::estimation;
6+
use crate::host::module_host::DatabaseTableUpdate;
67
use crate::host::module_host::{DatabaseTableUpdateRelValue, UpdatesRelValue};
78
use crate::messages::websocket::TableUpdate;
89
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
910
use crate::util::slow::SlowQueryLogger;
1011
use crate::vm::{build_query, TxMode};
1112
use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate};
1213
use spacetimedb_datastore::locking_tx_datastore::TxId;
13-
use spacetimedb_datastore::traits::DatabaseTableUpdate;
1414
use spacetimedb_lib::identity::AuthCtx;
1515
use spacetimedb_lib::Identity;
1616
use spacetimedb_primitives::TableId;

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::client::messages::{
66
};
77
use crate::client::{ClientConnectionSender, Protocol};
88
use crate::error::DBError;
9-
use crate::host::module_host::{ModuleEvent, UpdatesRelValue};
9+
use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue};
1010
use crate::messages::websocket::{self as ws, TableUpdate};
1111
use crate::subscription::delta::eval_delta;
1212
use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool};
@@ -24,7 +24,6 @@ use spacetimedb_data_structures::map::{
2424
HashMap, HashSet, IntMap,
2525
};
2626
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
27-
use spacetimedb_datastore::traits::DatabaseTableUpdate;
2827
use spacetimedb_durability::TxOffset;
2928
use spacetimedb_expr::expr::CollectViews;
3029
use spacetimedb_lib::metrics::ExecutionMetrics;

crates/core/src/subscription/query.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ mod tests {
162162
use crate::vm::tests::create_table_with_rows;
163163
use crate::vm::DbProgram;
164164
use itertools::Itertools;
165+
use smallvec::SmallVec;
165166
use spacetimedb_client_api_messages::websocket::{BsatnFormat, CompressableQueryUpdate, Compression};
166167
use spacetimedb_datastore::execution_context::Workload;
167168
use spacetimedb_lib::bsatn;
@@ -196,7 +197,7 @@ mod tests {
196197
let q = Expr::Crud(Box::new(CrudExpr::Query(query.clone())));
197198

198199
let mut result = Vec::with_capacity(1);
199-
let mut updates = Vec::new();
200+
let mut updates = SmallVec::new();
200201
collect_result(&mut result, &mut updates, run_ast(p, q, sources).into())?;
201202
Ok(result)
202203
}
@@ -446,12 +447,13 @@ mod tests {
446447
}
447448

448449
let update = DatabaseUpdate {
449-
tables: vec![DatabaseTableUpdate {
450+
tables: [DatabaseTableUpdate {
450451
table_id,
451452
table_name: TableName::new_from_str("test"),
452453
deletes: deletes.into(),
453454
inserts: [].into(),
454-
}],
455+
}]
456+
.into(),
455457
};
456458

457459
db.commit_tx(tx)?;
@@ -536,7 +538,7 @@ mod tests {
536538
};
537539

538540
let update = DatabaseUpdate {
539-
tables: vec![data.clone()],
541+
tables: [data.clone()].into(),
540542
};
541543

542544
check_query_incr(&db, &tx, &s, &update, 1, &[row])?;
@@ -657,7 +659,7 @@ mod tests {
657659
};
658660

659661
let update = DatabaseUpdate {
660-
tables: vec![data1, data2],
662+
tables: smallvec::smallvec![data1, data2],
661663
};
662664

663665
let row_1 = product!(1u64, "health");
@@ -1161,9 +1163,9 @@ mod tests {
11611163
.collect::<Arc<_>>();
11621164

11631165
let tables = if inserts.is_empty() && deletes.is_empty() {
1164-
vec![]
1166+
smallvec::smallvec![]
11651167
} else {
1166-
vec![DatabaseTableUpdate {
1168+
smallvec::smallvec![DatabaseTableUpdate {
11671169
table_id,
11681170
table_name,
11691171
inserts,

crates/core/src/subscription/subscription.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use super::module_subscription_manager::Plan;
2525
use super::query;
2626
use crate::db::relational_db::{RelationalDB, Tx};
2727
use crate::error::{DBError, SubscriptionError};
28-
use crate::host::module_host::{DatabaseUpdateRelValue, UpdatesRelValue};
28+
use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdateRelValue, UpdatesRelValue};
2929
use crate::messages::websocket as ws;
3030
use crate::sql::ast::SchemaViewer;
3131
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
@@ -36,7 +36,6 @@ use spacetimedb_client_api_messages::websocket::Compression;
3636
use spacetimedb_data_structures::map::HashSet;
3737
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
3838
use spacetimedb_datastore::locking_tx_datastore::TxId;
39-
use spacetimedb_datastore::traits::DatabaseTableUpdate;
4039
use spacetimedb_lib::db::auth::StTableType;
4140
use spacetimedb_lib::identity::AuthCtx;
4241
use spacetimedb_primitives::TableId;

crates/core/src/subscription/tx.rs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,21 @@ impl DeltaTableIndexes {
4646
fn build_indexes_for_rows<'a>(
4747
tx: &'a TxId,
4848
meta: &'a QueriedTableIndexIds,
49-
rows: impl Iterator<Item = (&'a TableId, &'a Arc<[ProductValue]>)>,
49+
rows: impl Iterator<Item = (TableId, &'a Arc<[ProductValue]>)>,
5050
) -> HashMap<(TableId, IndexId), DeltaTableIndex> {
5151
let mut indexes: HashMap<(TableId, IndexId), DeltaTableIndex> = HashMap::new();
5252
for (table_id, rows) in rows {
53-
if let Some(schema) = tx.get_schema(*table_id) {
53+
if let Some(schema) = tx.get_schema(table_id) {
5454
// Fetch the column ids for each index
5555
let mut cols_for_index = vec![];
56-
for index_id in meta.index_ids_for_table(*table_id) {
56+
for index_id in meta.index_ids_for_table(table_id) {
5757
cols_for_index.push((index_id, schema.col_list_for_index_id(index_id)));
5858
}
5959
for (i, row) in rows.iter().enumerate() {
6060
for (index_id, col_list) in &cols_for_index {
6161
if !col_list.is_empty() {
6262
indexes
63-
.entry((*table_id, *index_id))
63+
.entry((table_id, *index_id))
6464
.or_default()
6565
.entry(row.project(col_list).unwrap())
6666
.or_default()
@@ -161,38 +161,24 @@ impl Datastore for DeltaTx<'_> {
161161
impl DeltaStore for DeltaTx<'_> {
162162
fn num_inserts(&self, table_id: TableId) -> usize {
163163
self.data
164-
.and_then(|data| {
165-
data.inserts()
166-
.find(|(id, ..)| **id == table_id)
167-
.map(|(.., rows)| rows.len())
168-
})
164+
.and_then(|data| data.inserts_for_table(table_id).map(|rows| rows.len()))
169165
.unwrap_or_default()
170166
}
171167

172168
fn num_deletes(&self, table_id: TableId) -> usize {
173169
self.data
174-
.and_then(|data| {
175-
data.deletes()
176-
.find(|(id, ..)| **id == table_id)
177-
.map(|(.., rows)| rows.len())
178-
})
170+
.and_then(|data| data.deletes_for_table(table_id).map(|rows| rows.len()))
179171
.unwrap_or_default()
180172
}
181173

182174
fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
183-
self.data.and_then(|data| {
184-
data.inserts()
185-
.find(|(id, ..)| **id == table_id)
186-
.map(|(.., rows)| rows.iter())
187-
})
175+
self.data
176+
.and_then(|data| data.inserts_for_table(table_id).map(|rows| rows.iter()))
188177
}
189178

190179
fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
191-
self.data.and_then(|data| {
192-
data.deletes()
193-
.find(|(id, ..)| **id == table_id)
194-
.map(|(.., rows)| rows.iter())
195-
})
180+
self.data
181+
.and_then(|data| data.deletes_for_table(table_id).map(|rows| rows.iter()))
196182
}
197183

198184
fn index_scan_range_for_delta(

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,7 @@ impl CommittedState {
10611061
);
10621062

10631063
// Record any truncated tables in the `TxData`.
1064-
tx_data.add_truncates(truncates);
1064+
tx_data.set_truncates(truncates);
10651065

10661066
// Merge read sets from the `MutTxId` into the `CommittedState`.
10671067
// It's important that this happens after applying the changes to `tx_data`,

0 commit comments

Comments
 (0)