From 7e3008040e1e28387ea3465d21c1c189779d6a12 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Tue, 23 Sep 2025 15:17:58 +0530 Subject: [PATCH 01/11] update: disconnect clients --- crates/core/src/db/update.rs | 23 ++++++++++++++++--- .../src/host/wasm_common/module_host_actor.rs | 9 +++++++- .../subscription/module_subscription_actor.rs | 5 ++++ .../module_subscription_manager.rs | 6 +++++ 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index fa3ea5bdc31..92be4d5643d 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -1,6 +1,7 @@ use super::relational_db::RelationalDB; use crate::database_logger::SystemLogger; use crate::sql::parser::RowLevelExpr; +use crate::subscription::module_subscription_actor::ModuleSubscriptions; use spacetimedb_data_structures::map::HashMap; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_lib::db::auth::StTableType; @@ -35,6 +36,7 @@ impl UpdateLogger for SystemLogger { // drop_* become transactional. pub fn update_database( stdb: &RelationalDB, + subscriptions: &ModuleSubscriptions, tx: &mut MutTxId, auth_ctx: AuthCtx, plan: MigratePlan, @@ -57,7 +59,9 @@ pub fn update_database( match plan { MigratePlan::Manual(plan) => manual_migrate_database(stdb, tx, plan, logger, existing_tables), - MigratePlan::Auto(plan) => auto_migrate_database(stdb, tx, auth_ctx, plan, logger, existing_tables), + MigratePlan::Auto(plan) => { + auto_migrate_database(stdb, subscriptions, tx, auth_ctx, plan, logger, existing_tables) + } } } @@ -83,6 +87,7 @@ macro_rules! log { /// Automatically migrate a database. fn auto_migrate_database( stdb: &RelationalDB, + subscriptions: &ModuleSubscriptions, tx: &mut MutTxId, auth_ctx: AuthCtx, plan: AutoMigratePlan, @@ -264,7 +269,12 @@ fn auto_migrate_database( .collect(); stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?; } - _ => anyhow::bail!("migration step not implemented: {step:?}"), + spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => { + // Disconnect all clients from subscriptions. + // Any dangling clients will be handled during the launch of module hosts, + // which invokes `ModuleHost::call_identity_disconnected`. + subscriptions.remove_all_subscribers(); + } } } @@ -349,7 +359,14 @@ mod test { // Try to update the db. let mut tx = begin_mut_tx(&stdb); let plan = ponder_migrate(&old, &new)?; - update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; + update_database( + &stdb, + &ModuleSubscriptions::for_test_new_runtime(Arc::new(stdb.db.clone())).0, + &mut tx, + auth_ctx, + plan, + &TestLogger, + )?; // Expect the schema change. let idx_b_id = stdb diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index db5a26eb86a..724cc78fa02 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -306,7 +306,14 @@ impl InstanceCommon { system_logger.info(&format!("Updated program to {program_hash}")); let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity); - let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, system_logger); + let res = crate::db::update::update_database( + stdb, + &replica_ctx.subscriptions, + &mut tx, + auth_ctx, + plan, + system_logger, + ); match res { Err(e) => { diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 4df0ff9306c..13b5544fc51 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -853,6 +853,11 @@ impl ModuleSubscriptions { subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id)); } + pub fn remove_all_subscribers(&self) { + let mut subscriptions = self.subscriptions.write(); + subscriptions.remove_all_clients(); + } + /// Commit a transaction and broadcast its ModuleEvent to all interested subscribers. /// /// The returned [`ExecutionMetrics`] are reported in this method via `report_tx_metrics`. diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index eab10f22aa9..0079074ce0a 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -779,6 +779,12 @@ impl SubscriptionManager { } } + pub fn remove_all_clients(&mut self) { + for id in self.clients.keys().copied().collect::>() { + self.remove_all_subscriptions(&id); + } + } + /// Remove a single subscription for a client. /// This will return an error if the client does not have a subscription with the given query id. pub fn remove_subscription(&mut self, client_id: ClientId, query_id: ClientQueryId) -> Result, DBError> { From 49cfb51a7c74beae09715ab2c831494427d987f5 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 24 Sep 2025 18:06:13 +0530 Subject: [PATCH 02/11] disconnect client by dropping module --- crates/client-api/src/routes/database.rs | 4 +- crates/core/src/db/update.rs | 40 +++++++++---------- crates/core/src/host/host_controller.rs | 13 ++++-- crates/core/src/host/module_host.rs | 10 ++++- .../src/host/wasm_common/module_host_actor.rs | 18 ++++----- .../subscription/module_subscription_actor.rs | 5 --- .../module_subscription_manager.rs | 6 --- 7 files changed, 48 insertions(+), 48 deletions(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 5ff5102d355..5e01f43a6fc 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -664,7 +664,9 @@ pub async fn publish( ) .into()); } - UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {} + UpdateDatabaseResult::NoUpdateNeeded + | UpdateDatabaseResult::UpdatePerformed + | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => {} } } diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 92be4d5643d..ea97381f523 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -1,7 +1,6 @@ use super::relational_db::RelationalDB; use crate::database_logger::SystemLogger; use crate::sql::parser::RowLevelExpr; -use crate::subscription::module_subscription_actor::ModuleSubscriptions; use spacetimedb_data_structures::map::HashMap; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_lib::db::auth::StTableType; @@ -24,6 +23,13 @@ impl UpdateLogger for SystemLogger { } } +/// The result of a database update. +/// Indicates whether clients should be disconnected when the update is complete. +pub enum UpdateResult { + Success, + RequiresClientDisconnect, +} + /// Update the database according to the migration plan. /// /// The update is performed within the transactional context `tx`. @@ -36,12 +42,11 @@ impl UpdateLogger for SystemLogger { // drop_* become transactional. pub fn update_database( stdb: &RelationalDB, - subscriptions: &ModuleSubscriptions, tx: &mut MutTxId, auth_ctx: AuthCtx, plan: MigratePlan, logger: &dyn UpdateLogger, -) -> anyhow::Result<()> { +) -> anyhow::Result { let existing_tables = stdb.get_all_tables_mut(tx)?; // TODO: consider using `ErrorStream` here. @@ -59,9 +64,7 @@ pub fn update_database( match plan { MigratePlan::Manual(plan) => manual_migrate_database(stdb, tx, plan, logger, existing_tables), - MigratePlan::Auto(plan) => { - auto_migrate_database(stdb, subscriptions, tx, auth_ctx, plan, logger, existing_tables) - } + MigratePlan::Auto(plan) => auto_migrate_database(stdb, tx, auth_ctx, plan, logger, existing_tables), } } @@ -72,7 +75,7 @@ fn manual_migrate_database( _plan: ManualMigratePlan, _logger: &dyn UpdateLogger, _existing_tables: Vec>, -) -> anyhow::Result<()> { +) -> anyhow::Result { unimplemented!("Manual database migrations are not yet implemented") } @@ -87,13 +90,12 @@ macro_rules! log { /// Automatically migrate a database. fn auto_migrate_database( stdb: &RelationalDB, - subscriptions: &ModuleSubscriptions, tx: &mut MutTxId, auth_ctx: AuthCtx, plan: AutoMigratePlan, logger: &dyn UpdateLogger, existing_tables: Vec>, -) -> anyhow::Result<()> { +) -> anyhow::Result { // We have already checked in `migrate_database` that `existing_tables` are compatible with the `old` definition in `plan`. // So we can look up tables in there using unwrap. @@ -132,6 +134,7 @@ fn auto_migrate_database( } log::info!("Running database update steps: {}", stdb.database_identity()); + let mut res = UpdateResult::Success; for step in plan.steps { match step { @@ -270,16 +273,16 @@ fn auto_migrate_database( stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?; } spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => { - // Disconnect all clients from subscriptions. - // Any dangling clients will be handled during the launch of module hosts, - // which invokes `ModuleHost::call_identity_disconnected`. - subscriptions.remove_all_subscribers(); + log!(logger, "Disconnecting all users"); + // It does disconnect clients right away, + // but send response indicated that caller should drop clients + res = UpdateResult::RequiresClientDisconnect; } } } log::info!("Database update complete"); - Ok(()) + Ok(res) } #[cfg(test)] @@ -359,14 +362,7 @@ mod test { // Try to update the db. let mut tx = begin_mut_tx(&stdb); let plan = ponder_migrate(&old, &new)?; - update_database( - &stdb, - &ModuleSubscriptions::for_test_new_runtime(Arc::new(stdb.db.clone())).0, - &mut tx, - auth_ctx, - plan, - &TestLogger, - )?; + update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; // Expect the schema change. let idx_b_id = stdb diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 6c586bb1465..3709b734e44 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -19,7 +19,7 @@ use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use async_trait::async_trait; use durability::{Durability, EmptyHistory}; -use log::{info, trace, warn}; +use log::{debug, info, trace, warn}; use parking_lot::Mutex; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::IntMap; @@ -414,7 +414,14 @@ impl HostController { ) .await?; - *guard = Some(host); + // If hotswap is disabled, we drop the host after the update. + // which will drop all connected clients. + if !update_result.hotswap_disabled() { + *guard = Some(host); + } else { + debug!("dropping host after update with hotswap disabled"); + } + Ok::<_, anyhow::Error>(update_result) }) .await??; @@ -977,7 +984,7 @@ impl Host { trace!("update result: {update_result:?}"); // Only replace the module + scheduler if the update succeeded. // Otherwise, we want the database to continue running with the old state. - if update_result.was_successful() { + if update_result.was_successful() && !update_result.hotswap_disabled() { self.scheduler = scheduler; scheduler_starter.start(&module)?; let old_module = self.module.send_replace(module); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index b655dd76303..dbf828ddea8 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -506,6 +506,7 @@ pub struct WeakModuleHost { pub enum UpdateDatabaseResult { NoUpdateNeeded, UpdatePerformed, + UpdatePerformedWithClientDisconnect, AutoMigrateError(ErrorStream), ErrorExecutingMigration(anyhow::Error), } @@ -514,9 +515,16 @@ impl UpdateDatabaseResult { pub fn was_successful(&self) -> bool { matches!( self, - UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded + UpdateDatabaseResult::UpdatePerformed + | UpdateDatabaseResult::NoUpdateNeeded + | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect ) } + + /// Check if hotswap was disabled due to the update. + pub fn hotswap_disabled(&self) -> bool { + matches!(self, UpdateDatabaseResult::UpdatePerformedWithClientDisconnect) + } } #[derive(thiserror::Error, Debug)] diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 724cc78fa02..c93b81226d2 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -306,14 +306,7 @@ impl InstanceCommon { system_logger.info(&format!("Updated program to {program_hash}")); let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity); - let res = crate::db::update::update_database( - stdb, - &replica_ctx.subscriptions, - &mut tx, - auth_ctx, - plan, - system_logger, - ); + let res = crate::db::update::update_database(stdb, &mut tx, auth_ctx, plan, system_logger); match res { Err(e) => { @@ -323,13 +316,18 @@ impl InstanceCommon { stdb.report_mut_tx_metrics(reducer, tx_metrics, None); Ok(UpdateDatabaseResult::ErrorExecutingMigration(e)) } - Ok(()) => { + Ok(res) => { if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } system_logger.info("Database updated"); log::info!("Database updated, {}", stdb.database_identity()); - Ok(UpdateDatabaseResult::UpdatePerformed) + match res { + crate::db::update::UpdateResult::Success => Ok(UpdateDatabaseResult::UpdatePerformed), + crate::db::update::UpdateResult::RequiresClientDisconnect => { + Ok(UpdateDatabaseResult::UpdatePerformedWithClientDisconnect) + } + } } } } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 13b5544fc51..4df0ff9306c 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -853,11 +853,6 @@ impl ModuleSubscriptions { subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id)); } - pub fn remove_all_subscribers(&self) { - let mut subscriptions = self.subscriptions.write(); - subscriptions.remove_all_clients(); - } - /// Commit a transaction and broadcast its ModuleEvent to all interested subscribers. /// /// The returned [`ExecutionMetrics`] are reported in this method via `report_tx_metrics`. diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 0079074ce0a..eab10f22aa9 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -779,12 +779,6 @@ impl SubscriptionManager { } } - pub fn remove_all_clients(&mut self) { - for id in self.clients.keys().copied().collect::>() { - self.remove_all_subscriptions(&id); - } - } - /// Remove a single subscription for a client. /// This will return an error if the client does not have a subscription with the given query id. pub fn remove_subscription(&mut self, client_id: ClientId, query_id: ClientQueryId) -> Result, DBError> { From af068a16737d9da22997c939d1a6ab44d8a766c7 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 25 Sep 2025 01:23:10 +0530 Subject: [PATCH 03/11] disconnect clients without dropping ModuleHost --- crates/core/src/host/host_controller.rs | 28 +++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 3709b734e44..8520ee37c83 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -981,14 +981,30 @@ impl Host { let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info, policy).await?; - trace!("update result: {update_result:?}"); + // Only replace the module + scheduler if the update succeeded. // Otherwise, we want the database to continue running with the old state. - if update_result.was_successful() && !update_result.hotswap_disabled() { - self.scheduler = scheduler; - scheduler_starter.start(&module)?; - let old_module = self.module.send_replace(module); - old_module.exit().await; + match update_result { + UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => { + self.scheduler = scheduler; + scheduler_starter.start(&module)?; + let old_module = self.module.send_replace(module); + old_module.exit().await; + } + + // To disconnect clients, drop `self.module` (a `watch::Sender`) so that subscribed + // clients observe that the module is gone and will close their connections. + // The `ws_client_actor` in `spacetimedb_client_api::routes::subscribe` handles + // cleanup of client state on disconnection, so we don’t need to perform it here. + UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => { + let old_module = self.module.borrow().clone(); + self.module = watch::Sender::new(module.clone()); + + self.scheduler = scheduler; + scheduler_starter.start(&module)?; + old_module.exit().await; + } + _ => {} } Ok(update_result) From 068d84f20a474e06c516d4847881615fd347a250 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 25 Sep 2025 01:58:32 +0530 Subject: [PATCH 04/11] fixup! remove hotwap_disable --- crates/core/src/host/host_controller.rs | 10 ++-------- crates/core/src/host/module_host.rs | 5 ----- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 8520ee37c83..47b3fe7a20b 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -19,7 +19,7 @@ use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use async_trait::async_trait; use durability::{Durability, EmptyHistory}; -use log::{debug, info, trace, warn}; +use log::{info, trace, warn}; use parking_lot::Mutex; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::IntMap; @@ -414,13 +414,7 @@ impl HostController { ) .await?; - // If hotswap is disabled, we drop the host after the update. - // which will drop all connected clients. - if !update_result.hotswap_disabled() { - *guard = Some(host); - } else { - debug!("dropping host after update with hotswap disabled"); - } + *guard = Some(host); Ok::<_, anyhow::Error>(update_result) }) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index dbf828ddea8..7e61c8ed2ab 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -520,11 +520,6 @@ impl UpdateDatabaseResult { | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect ) } - - /// Check if hotswap was disabled due to the update. - pub fn hotswap_disabled(&self) -> bool { - matches!(self, UpdateDatabaseResult::UpdatePerformedWithClientDisconnect) - } } #[derive(thiserror::Error, Debug)] From 3dfba6cda67df50ac7d8ca796ded82f039782e55 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 25 Sep 2025 19:58:23 +0530 Subject: [PATCH 05/11] Update crates/core/src/db/update.rs Co-authored-by: Phoebe Goldman Signed-off-by: Shubham Mishra --- crates/core/src/db/update.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index ea97381f523..204d26787c5 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -25,6 +25,7 @@ impl UpdateLogger for SystemLogger { /// The result of a database update. /// Indicates whether clients should be disconnected when the update is complete. +#[must_use] pub enum UpdateResult { Success, RequiresClientDisconnect, From 4256d780ed14aed64e396f480c80f2c38f7c0f64 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 25 Sep 2025 20:26:16 +0530 Subject: [PATCH 06/11] fix: test lint --- crates/core/src/db/update.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 204d26787c5..207c186f123 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -363,7 +363,8 @@ mod test { // Try to update the db. let mut tx = begin_mut_tx(&stdb); let plan = ponder_migrate(&old, &new)?; - update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; + let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; + matches!(res, UpdateResult::Success); // Expect the schema change. let idx_b_id = stdb From 6eb572839665a20bf89eedf9ff096264183670f7 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 26 Sep 2025 18:11:55 +0530 Subject: [PATCH 07/11] fix client disconnection by calling explicitly from . --- crates/core/src/db/relational_db.rs | 27 ++++++++++++++++- crates/core/src/host/host_controller.rs | 26 +++++++++++++---- crates/core/src/host/module_host.rs | 39 ++++++++++++++++++++----- 3 files changed, 78 insertions(+), 14 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 3dfc8e496fa..bbbcd820e4c 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -20,7 +20,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; -use spacetimedb_datastore::system_tables::{system_tables, StModuleRow}; +use spacetimedb_datastore::system_tables::{system_tables, StClientFields, StClientRow, StModuleRow, ST_CLIENT_ID}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -502,6 +502,31 @@ impl RelationalDB { .map_err(DBError::from) } + /// Returns `Ok(Some(pointer))` if the given client is currently connected to the database, + /// `Ok(None)` if not, and `Err(_)` on error. + pub fn st_client_row( + &self, + identity: Identity, + connection_id: ConnectionId, + ) -> Result, DBError> { + let row = StClientRow { + identity: identity.into(), + connection_id: connection_id.into(), + }; + + self.with_read_only(Workload::Internal, |tx| { + self.inner + .iter_by_col_range_tx( + tx, + ST_CLIENT_ID, + col_list![StClientFields::Identity, StClientFields::ConnectionId], + &AlgebraicValue::product(row), + ) + .map(|mut it| it.next().map(|row_ref| row_ref.pointer())) + }) + .map_err(DBError::from) + } + /// Update the module associated with this database. /// /// The caller must ensure that: diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 47b3fe7a20b..2f0f3a160f4 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -2,6 +2,7 @@ use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; use super::wasmtime::WasmtimeRuntime; use super::{Scheduler, UpdateDatabaseResult}; +use crate::client::{ClientActorId, ClientName}; use crate::database_logger::DatabaseLogger; use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata}; use crate::db::{self, spawn_tx_metrics_recorder}; @@ -986,16 +987,29 @@ impl Host { old_module.exit().await; } - // To disconnect clients, drop `self.module` (a `watch::Sender`) so that subscribed - // clients observe that the module is gone and will close their connections. - // The `ws_client_actor` in `spacetimedb_client_api::routes::subscribe` handles - // cleanup of client state on disconnection, so we don’t need to perform it here. + // In this case, we need to disconnect all clients connected to the old module UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => { - let old_module = self.module.borrow().clone(); - self.module = watch::Sender::new(module.clone()); + // Replace the module first, so that new clients get the new module. + let old_watcher = std::mem::replace(&mut self.module, watch::Sender::new(module.clone())); + + // Disconnect all clients connected to the old module. + let connected_clients = replica_ctx.relational_db.connected_clients()?; + for (identity, connection_id) in connected_clients { + let client_actor_id = ClientActorId { + identity, + connection_id, + name: ClientName(0), + }; + //NOTE: This will call disconnect reducer of the new module, not the old one. + //It makes sense, as relationaldb is already updated to the new module. + module.disconnect_client(client_actor_id).await; + } self.scheduler = scheduler; scheduler_starter.start(&module)?; + // exit the old module, drop the `old_watcher` afterwards, + // which will signal websocket clients that the module is gone. + let old_module = old_watcher.borrow().clone(); old_module.exit().await; } _ => {} diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 7e61c8ed2ab..9554006bb61 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -782,22 +782,47 @@ impl ModuleHost { .map_err(Into::::into)? } + /// Invokes the `client_disconnected` reducer, if present, + /// then deletes the client’s rows from `st_client` and `st_connection_credentials`. + /// If the reducer fails, the rows are still deleted. + /// Calling this on an already-disconnected client is a no-op. pub fn call_identity_disconnected_inner( &self, caller_identity: Identity, caller_connection_id: ConnectionId, inst: &mut dyn ModuleInstance, ) -> Result<(), ReducerCallError> { - let me = self.clone(); - let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); + let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); + let reducer_name = reducer_lookup + .as_ref() + .map(|(_, def)| &*def.name) + .unwrap_or("__identity_disconnected__"); + + match self + .module + .replica_ctx() + .relational_db + .st_client_row(caller_identity, caller_connection_id) + { + Ok(None) => { + log::debug!( + "client: {caller_identity} with connection_id: {caller_connection_id} already disconnected" + ); + return Ok(()); + } + Err(e) => { + return Err(InvalidReducerArguments { + err: e.into(), + reducer: reducer_name.into(), + } + .into()) + } + _ => {} // Ok(true) – client is connected; proceed + } + let me = self.clone(); // A fallback transaction that deletes the client from `st_client`. let fallback = || { - let reducer_name = reducer_lookup - .as_ref() - .map(|(_, def)| &*def.name) - .unwrap_or("__identity_disconnected__"); - let workload = Workload::Reducer(ReducerContext { name: reducer_name.to_owned(), caller_identity, From 27ca50e61c9aa01dcc0084b83157c28477985e3d Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 26 Sep 2025 19:07:54 +0530 Subject: [PATCH 08/11] fix doc --- crates/core/src/db/relational_db.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index bbbcd820e4c..9d51182c5d8 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -502,8 +502,9 @@ impl RelationalDB { .map_err(DBError::from) } - /// Returns `Ok(Some(pointer))` if the given client is currently connected to the database, - /// `Ok(None)` if not, and `Err(_)` on error. + /// Look up a client row by identity and connection ID in the `st_clients` system table. + /// + /// `Ok(None)` if no such row exists. pub fn st_client_row( &self, identity: Identity, From 1901887af7c9d3e8d5744e1243cead700eae6e44 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 26 Sep 2025 19:11:19 +0530 Subject: [PATCH 09/11] fix doc --- crates/core/src/db/update.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index 207c186f123..365d5d56463 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -275,7 +275,7 @@ fn auto_migrate_database( } spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => { log!(logger, "Disconnecting all users"); - // It does disconnect clients right away, + // It does not disconnect clients right away, // but send response indicated that caller should drop clients res = UpdateResult::RequiresClientDisconnect; } From 8765aec5efcf7924cae2616131e0190241b41d4c Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 26 Sep 2025 22:36:47 +0530 Subject: [PATCH 10/11] make disconnect_client method atomic --- crates/core/src/db/relational_db.rs | 28 +------- crates/core/src/host/module_host.rs | 68 +++++++++++-------- .../src/locking_tx_datastore/mut_tx.rs | 17 +++++ 3 files changed, 57 insertions(+), 56 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9d51182c5d8..3dfc8e496fa 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -20,7 +20,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; -use spacetimedb_datastore::system_tables::{system_tables, StClientFields, StClientRow, StModuleRow, ST_CLIENT_ID}; +use spacetimedb_datastore::system_tables::{system_tables, StModuleRow}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -502,32 +502,6 @@ impl RelationalDB { .map_err(DBError::from) } - /// Look up a client row by identity and connection ID in the `st_clients` system table. - /// - /// `Ok(None)` if no such row exists. - pub fn st_client_row( - &self, - identity: Identity, - connection_id: ConnectionId, - ) -> Result, DBError> { - let row = StClientRow { - identity: identity.into(), - connection_id: connection_id.into(), - }; - - self.with_read_only(Workload::Internal, |tx| { - self.inner - .iter_by_col_range_tx( - tx, - ST_CLIENT_ID, - col_list![StClientFields::Identity, StClientFields::ConnectionId], - &AlgebraicValue::product(row), - ) - .map(|mut it| it.next().map(|row_ref| row_ref.pointer())) - }) - .map_err(DBError::from) - } - /// Update the module associated with this database. /// /// The caller must ensure that: diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 9554006bb61..24527c71d54 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -798,41 +798,37 @@ impl ModuleHost { .map(|(_, def)| &*def.name) .unwrap_or("__identity_disconnected__"); - match self - .module - .replica_ctx() - .relational_db - .st_client_row(caller_identity, caller_connection_id) - { - Ok(None) => { - log::debug!( - "client: {caller_identity} with connection_id: {caller_connection_id} already disconnected" - ); - return Ok(()); - } - Err(e) => { - return Err(InvalidReducerArguments { - err: e.into(), - reducer: reducer_name.into(), - } - .into()) - } - _ => {} // Ok(true) – client is connected; proceed - } + let is_client_exist = |mut_tx: &MutTxId| { + mut_tx + .st_client_row(caller_identity, caller_connection_id) + .map(|row_opt| row_opt.is_some()) + }; - let me = self.clone(); - // A fallback transaction that deletes the client from `st_client`. - let fallback = || { - let workload = Workload::Reducer(ReducerContext { + let workload = || { + Workload::Reducer(ReducerContext { name: reducer_name.to_owned(), caller_identity, caller_connection_id, timestamp: Timestamp::now(), arg_bsatn: Bytes::new(), - }); - let stdb = me.module.replica_ctx().relational_db.clone(); + }) + }; + + let me = self.clone(); + let stdb = me.module.replica_ctx().relational_db.clone(); + + // A fallback transaction that deletes the client from `st_client`. + let fallback = || { let database_identity = me.info.database_identity; - stdb.with_auto_commit(workload, |mut_tx| { + stdb.with_auto_commit(workload(), |mut_tx| { + if !is_client_exist(mut_tx)? { + // The client is already gone. Nothing to do. + log::debug!( + "`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do", + ); + return Ok(()); + } + mut_tx .delete_st_client(caller_identity, caller_connection_id, database_identity) .map_err(DBError::from) @@ -850,11 +846,25 @@ impl ModuleHost { }; if let Some((reducer_id, reducer_def)) = reducer_lookup { + let stdb = me.module.replica_ctx().relational_db.clone(); + let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); + + if !is_client_exist(&mut_tx).map_err(|e| InvalidReducerArguments { + err: e.into(), + reducer: reducer_name.into(), + })? { + // The client is already gone. Nothing to do. + log::debug!( + "`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do", + ); + return Ok(()); + } + // The module defined a lifecycle reducer to handle disconnects. Call it. // If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured // that `st_client` is updated appropriately. let result = me.call_reducer_inner_with_inst( - None, + Some(mut_tx), caller_identity, Some(caller_connection_id), None, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index e311af7688b..0f430931598 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1576,6 +1576,23 @@ impl MutTxId { self.delete_st_client_credentials(database_identity, connection_id) } + /// Look up a client row by identity and connection ID in the `st_clients` system table. + /// + /// `Ok(None)` if no such row exists. + pub fn st_client_row(&self, identity: Identity, connection_id: ConnectionId) -> Result> { + let row = StClientRow { + identity: identity.into(), + connection_id: connection_id.into(), + }; + + self.iter_by_col_eq( + ST_CLIENT_ID, + col_list![StClientFields::Identity, StClientFields::ConnectionId], + &AlgebraicValue::product(row), + ) + .map(|mut it| it.next().map(|row_ref| row_ref.pointer())) + } + pub fn insert_via_serialize_bsatn<'a, T: Serialize>( &'a mut self, table_id: TableId, From 1805a00d962744f225f984300861864e2559f5e9 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 29 Sep 2025 22:11:28 +0530 Subject: [PATCH 11/11] panic if st_client_row fails --- crates/core/src/host/module_host.rs | 13 +++---------- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 8 ++++---- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 24527c71d54..5e93a350d53 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -798,11 +798,7 @@ impl ModuleHost { .map(|(_, def)| &*def.name) .unwrap_or("__identity_disconnected__"); - let is_client_exist = |mut_tx: &MutTxId| { - mut_tx - .st_client_row(caller_identity, caller_connection_id) - .map(|row_opt| row_opt.is_some()) - }; + let is_client_exist = |mut_tx: &MutTxId| mut_tx.st_client_row(caller_identity, caller_connection_id).is_some(); let workload = || { Workload::Reducer(ReducerContext { @@ -821,7 +817,7 @@ impl ModuleHost { let fallback = || { let database_identity = me.info.database_identity; stdb.with_auto_commit(workload(), |mut_tx| { - if !is_client_exist(mut_tx)? { + if !is_client_exist(mut_tx) { // The client is already gone. Nothing to do. log::debug!( "`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do", @@ -849,10 +845,7 @@ impl ModuleHost { let stdb = me.module.replica_ctx().relational_db.clone(); let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); - if !is_client_exist(&mut_tx).map_err(|e| InvalidReducerArguments { - err: e.into(), - reducer: reducer_name.into(), - })? { + if !is_client_exist(&mut_tx) { // The client is already gone. Nothing to do. log::debug!( "`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do", diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 0f430931598..06889c0ed5c 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1577,9 +1577,7 @@ impl MutTxId { } /// Look up a client row by identity and connection ID in the `st_clients` system table. - /// - /// `Ok(None)` if no such row exists. - pub fn st_client_row(&self, identity: Identity, connection_id: ConnectionId) -> Result> { + pub fn st_client_row(&self, identity: Identity, connection_id: ConnectionId) -> Option { let row = StClientRow { identity: identity.into(), connection_id: connection_id.into(), @@ -1590,7 +1588,9 @@ impl MutTxId { col_list![StClientFields::Identity, StClientFields::ConnectionId], &AlgebraicValue::product(row), ) - .map(|mut it| it.next().map(|row_ref| row_ref.pointer())) + .expect("failed to read from st_client system table") + .next() + .map(|row| row.pointer()) } pub fn insert_via_serialize_bsatn<'a, T: Serialize>(