Skip to content

Commit 1e50c7d

Browse files
Aggregate and broadcast DbUpdates off the main thread (#2793)
Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
1 parent ac18790 commit 1e50c7d

14 files changed

Lines changed: 505 additions & 175 deletions

File tree

crates/client-api-messages/src/websocket.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -622,13 +622,19 @@ pub struct TableUpdate<F: WebsocketFormat> {
622622
pub updates: SmallVec<[F::QueryUpdate; 1]>,
623623
}
624624

625+
/// Computed update for a single query, annotated with the number of matching rows.
626+
pub struct SingleQueryUpdate<F: WebsocketFormat> {
627+
pub update: F::QueryUpdate,
628+
pub num_rows: u64,
629+
}
630+
625631
impl<F: WebsocketFormat> TableUpdate<F> {
626-
pub fn new(table_id: TableId, table_name: Box<str>, (update, num_rows): (F::QueryUpdate, u64)) -> Self {
632+
pub fn new(table_id: TableId, table_name: Box<str>, update: SingleQueryUpdate<F>) -> Self {
627633
Self {
628634
table_id,
629635
table_name,
630-
num_rows,
631-
updates: [update].into(),
636+
num_rows: update.num_rows,
637+
updates: [update.update].into(),
632638
}
633639
}
634640

@@ -641,9 +647,9 @@ impl<F: WebsocketFormat> TableUpdate<F> {
641647
}
642648
}
643649

644-
pub fn push(&mut self, (update, num_rows): (F::QueryUpdate, u64)) {
645-
self.updates.push(update);
646-
self.num_rows += num_rows;
650+
pub fn push(&mut self, update: SingleQueryUpdate<F>) {
651+
self.updates.push(update.update);
652+
self.num_rows += update.num_rows;
647653
}
648654

649655
pub fn num_rows(&self) -> usize {

crates/core/src/host/host_controller.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::messages::control_db::{Database, HostType};
1313
use crate::module_host_context::ModuleCreationContext;
1414
use crate::replica_context::ReplicaContext;
1515
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
16+
use crate::subscription::module_subscription_manager::SubscriptionManager;
1617
use crate::util::{asyncify, spawn_rayon};
1718
use crate::worker_metrics::WORKER_METRICS;
1819
use anyhow::{anyhow, ensure, Context};
@@ -525,7 +526,9 @@ async fn make_replica_ctx(
525526
relational_db: Arc<RelationalDB>,
526527
) -> anyhow::Result<ReplicaContext> {
527528
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
528-
let subscriptions = <_>::default();
529+
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::for_database(
530+
database.database_identity,
531+
)));
529532
let downgraded = Arc::downgrade(&subscriptions);
530533
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);
531534

crates/core/src/host/instance_env.rs

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -532,38 +532,41 @@ mod test {
532532
Ok(DatabaseLogger::open(path))
533533
}
534534

535-
/// An `InstanceEnv` requires `ModuleSubscriptions`
536-
fn subscription_actor(relational_db: Arc<RelationalDB>) -> ModuleSubscriptions {
537-
ModuleSubscriptions::new(relational_db, <_>::default(), Identity::ZERO)
538-
}
539-
540535
/// An `InstanceEnv` requires a `ReplicaContext`.
541536
/// For our purposes this is just a wrapper for `RelationalDB`.
542-
fn replica_ctx(relational_db: Arc<RelationalDB>) -> Result<ReplicaContext> {
543-
Ok(ReplicaContext {
544-
database: Database {
545-
id: 0,
546-
database_identity: Identity::ZERO,
547-
owner_identity: Identity::ZERO,
548-
host_type: HostType::Wasm,
549-
initial_program: Hash::ZERO,
537+
fn replica_ctx(relational_db: Arc<RelationalDB>) -> Result<(ReplicaContext, tokio::runtime::Runtime)> {
538+
let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db.clone());
539+
Ok((
540+
ReplicaContext {
541+
database: Database {
542+
id: 0,
543+
database_identity: Identity::ZERO,
544+
owner_identity: Identity::ZERO,
545+
host_type: HostType::Wasm,
546+
initial_program: Hash::ZERO,
547+
},
548+
replica_id: 0,
549+
logger: Arc::new(temp_logger()?),
550+
subscriptions: subs,
551+
relational_db,
550552
},
551-
replica_id: 0,
552-
logger: Arc::new(temp_logger()?),
553-
subscriptions: subscription_actor(relational_db.clone()),
554-
relational_db,
555-
})
553+
runtime,
554+
))
556555
}
557556

558557
/// An `InstanceEnv` used for testing the database syscalls.
559-
fn instance_env(db: Arc<RelationalDB>) -> Result<InstanceEnv> {
558+
fn instance_env(db: Arc<RelationalDB>) -> Result<(InstanceEnv, tokio::runtime::Runtime)> {
560559
let (scheduler, _) = Scheduler::open(db.clone());
561-
Ok(InstanceEnv {
562-
replica_ctx: Arc::new(replica_ctx(db)?),
563-
scheduler,
564-
tx: TxSlot::default(),
565-
start_time: Timestamp::now(),
566-
})
560+
let (replica_context, runtime) = replica_ctx(db)?;
561+
Ok((
562+
InstanceEnv {
563+
replica_ctx: Arc::new(replica_context),
564+
scheduler,
565+
tx: TxSlot::default(),
566+
start_time: Timestamp::now(),
567+
},
568+
runtime,
569+
))
567570
}
568571

569572
/// An in-memory `RelationalDB` for testing.
@@ -662,7 +665,7 @@ mod test {
662665
#[test]
663666
fn table_scan_metrics() -> Result<()> {
664667
let db = relational_db()?;
665-
let env = instance_env(db.clone())?;
668+
let (env, _runtime) = instance_env(db.clone())?;
666669

667670
let (table_id, _) = create_table_with_index(&db)?;
668671

@@ -694,7 +697,7 @@ mod test {
694697
#[test]
695698
fn index_scan_metrics() -> Result<()> {
696699
let db = relational_db()?;
697-
let env = instance_env(db.clone())?;
700+
let (env, _runtime) = instance_env(db.clone())?;
698701

699702
let (_, index_id) = create_table_with_index(&db)?;
700703

@@ -746,7 +749,7 @@ mod test {
746749
#[test]
747750
fn insert_metrics() -> Result<()> {
748751
let db = relational_db()?;
749-
let env = instance_env(db.clone())?;
752+
let (env, _runtime) = instance_env(db.clone())?;
750753

751754
let (table_id, _) = create_table_with_index(&db)?;
752755

@@ -783,7 +786,7 @@ mod test {
783786
#[test]
784787
fn update_metrics() -> Result<()> {
785788
let db = relational_db()?;
786-
let env = instance_env(db.clone())?;
789+
let (env, _runtime) = instance_env(db.clone())?;
787790

788791
let (table_id, index_id) = create_table_with_unique_index(&db)?;
789792

@@ -810,7 +813,7 @@ mod test {
810813
#[test]
811814
fn delete_by_index_metrics() -> Result<()> {
812815
let db = relational_db()?;
813-
let env = instance_env(db.clone())?;
816+
let (env, _runtime) = instance_env(db.clone())?;
814817

815818
let (_, index_id) = create_table_with_index(&db)?;
816819

@@ -838,7 +841,7 @@ mod test {
838841
#[test]
839842
fn delete_by_value_metrics() -> Result<()> {
840843
let db = relational_db()?;
841-
let env = instance_env(db.clone())?;
844+
let (env, _runtime) = instance_env(db.clone())?;
842845

843846
let (table_id, _) = create_table_with_index(&db)?;
844847

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
510510
let (event, _) = match self
511511
.info
512512
.subscriptions
513-
.commit_and_broadcast_event(client.as_deref(), event, tx)
513+
.commit_and_broadcast_event(client, event, tx)
514514
.unwrap()
515515
{
516516
Ok(ev) => ev,

crates/core/src/sql/execute.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,8 @@ pub fn translate_col(tx: &Tx, field: FieldName) -> Option<Box<str>> {
301301

302302
#[cfg(test)]
303303
pub(crate) mod tests {
304+
use std::sync::Arc;
305+
304306
use super::*;
305307
use crate::db::datastore::system_tables::{
306308
StRowLevelSecurityRow, StTableFields, ST_ROW_LEVEL_SECURITY_ID, ST_TABLE_ID, ST_TABLE_NAME,
@@ -317,20 +319,19 @@ pub(crate) mod tests {
317319
use spacetimedb_primitives::{col_list, ColId, TableId};
318320
use spacetimedb_sats::{product, AlgebraicType, ArrayValue, ProductType};
319321
use spacetimedb_vm::eval::test_helpers::create_game_data;
320-
use std::sync::Arc;
321322

322323
pub(crate) fn execute_for_testing(
323324
db: &RelationalDB,
324325
sql_text: &str,
325326
q: Vec<CrudExpr>,
326327
) -> Result<Vec<MemTable>, DBError> {
327-
let subs = ModuleSubscriptions::new(Arc::new(db.clone()), <_>::default(), Identity::ZERO);
328+
let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(Arc::new(db.clone()));
328329
execute_sql(db, sql_text, q, AuthCtx::for_testing(), Some(&subs))
329330
}
330331

331332
/// Short-cut for simplify test execution
332333
pub(crate) fn run_for_testing(db: &RelationalDB, sql_text: &str) -> Result<Vec<ProductValue>, DBError> {
333-
let subs = ModuleSubscriptions::new(Arc::new(db.clone()), <_>::default(), Identity::ZERO);
334+
let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(Arc::new(db.clone()));
334335
run(db, sql_text, AuthCtx::for_testing(), Some(&subs), &mut vec![]).map(|x| x.rows)
335336
}
336337

crates/core/src/subscription/execution_unit.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue,
88
use crate::messages::websocket::TableUpdate;
99
use crate::util::slow::SlowQueryLogger;
1010
use crate::vm::{build_query, TxMode};
11-
use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, WebsocketFormat};
11+
use spacetimedb_client_api_messages::websocket::{
12+
Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
13+
};
1214
use spacetimedb_lib::db::error::AuthError;
1315
use spacetimedb_lib::relation::DbTable;
1416
use spacetimedb_lib::{Identity, ProductValue};
@@ -254,7 +256,11 @@ impl ExecutionUnit {
254256
let deletes = F::List::default();
255257
let qu = QueryUpdate { deletes, inserts };
256258
let update = F::into_query_update(qu, compression);
257-
TableUpdate::new(self.return_table(), self.return_name(), (update, num_rows))
259+
TableUpdate::new(
260+
self.return_table(),
261+
self.return_name(),
262+
SingleQueryUpdate { update, num_rows },
263+
)
258264
})
259265
}
260266

crates/core/src/subscription/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use module_subscription_manager::Plan;
55
use prometheus::IntCounter;
66
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
77
use spacetimedb_client_api_messages::websocket::{
8-
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, TableUpdate, WebsocketFormat,
8+
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
99
};
1010
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
1111
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
@@ -151,7 +151,10 @@ where
151151
// after we release the tx lock.
152152
// There's no need to compress the inner table update too.
153153
let update = F::into_query_update(qu, Compression::None);
154-
(TableUpdate::new(table_id, table_name, (update, num_rows)), metrics)
154+
(
155+
TableUpdate::new(table_id, table_name, SingleQueryUpdate { update, num_rows }),
156+
metrics,
157+
)
155158
})
156159
}
157160

@@ -180,7 +183,7 @@ where
180183
.clone()
181184
.optimize()
182185
.map(|plan| (sql, PipelinedProject::from(plan)))
183-
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, table_name.into(), tx, update_type))
186+
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type))
184187
.map_err(|err| DBError::WithSql {
185188
sql: sql.into(),
186189
error: Box::new(DBError::Other(err)),

0 commit comments

Comments
 (0)