diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 5ff5102d355..5e01f43a6fc 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -664,7 +664,9 @@ pub async fn publish( ) .into()); } - UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {} + UpdateDatabaseResult::NoUpdateNeeded + | UpdateDatabaseResult::UpdatePerformed + | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => {} } } diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs index fa3ea5bdc31..365d5d56463 100644 --- a/crates/core/src/db/update.rs +++ b/crates/core/src/db/update.rs @@ -23,6 +23,14 @@ impl UpdateLogger for SystemLogger { } } +/// The result of a database update. +/// Indicates whether clients should be disconnected when the update is complete. +#[must_use] +pub enum UpdateResult { + Success, + RequiresClientDisconnect, +} + /// Update the database according to the migration plan. /// /// The update is performed within the transactional context `tx`. @@ -39,7 +47,7 @@ pub fn update_database( auth_ctx: AuthCtx, plan: MigratePlan, logger: &dyn UpdateLogger, -) -> anyhow::Result<()> { +) -> anyhow::Result { let existing_tables = stdb.get_all_tables_mut(tx)?; // TODO: consider using `ErrorStream` here. @@ -68,7 +76,7 @@ fn manual_migrate_database( _plan: ManualMigratePlan, _logger: &dyn UpdateLogger, _existing_tables: Vec>, -) -> anyhow::Result<()> { +) -> anyhow::Result { unimplemented!("Manual database migrations are not yet implemented") } @@ -88,7 +96,7 @@ fn auto_migrate_database( plan: AutoMigratePlan, logger: &dyn UpdateLogger, existing_tables: Vec>, -) -> anyhow::Result<()> { +) -> anyhow::Result { // We have already checked in `migrate_database` that `existing_tables` are compatible with the `old` definition in `plan`. // So we can look up tables in there using unwrap. @@ -127,6 +135,7 @@ fn auto_migrate_database( } log::info!("Running database update steps: {}", stdb.database_identity()); + let mut res = UpdateResult::Success; for step in plan.steps { match step { @@ -264,12 +273,17 @@ fn auto_migrate_database( .collect(); stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?; } - _ => anyhow::bail!("migration step not implemented: {step:?}"), + spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => { + log!(logger, "Disconnecting all users"); + // It does not disconnect clients right away, + // but send response indicated that caller should drop clients + res = UpdateResult::RequiresClientDisconnect; + } } } log::info!("Database update complete"); - Ok(()) + Ok(res) } #[cfg(test)] @@ -349,7 +363,8 @@ mod test { // Try to update the db. let mut tx = begin_mut_tx(&stdb); let plan = ponder_migrate(&old, &new)?; - update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; + let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?; + matches!(res, UpdateResult::Success); // Expect the schema change. let idx_b_id = stdb diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index ab4cd55575f..75b0a7b0cc7 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -2,6 +2,7 @@ use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; use super::wasmtime::WasmtimeRuntime; use super::{Scheduler, UpdateDatabaseResult}; +use crate::client::{ClientActorId, ClientName}; use crate::database_logger::DatabaseLogger; use crate::db::persistence::PersistenceProvider; use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata}; @@ -409,6 +410,7 @@ impl HostController { .await?; *guard = Some(host); + Ok::<_, anyhow::Error>(update_result) }) .await??; @@ -959,14 +961,43 @@ impl Host { let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info, policy).await?; - trace!("update result: {update_result:?}"); + // Only replace the module + scheduler if the update succeeded. // Otherwise, we want the database to continue running with the old state. - if update_result.was_successful() { - self.scheduler = scheduler; - scheduler_starter.start(&module)?; - let old_module = self.module.send_replace(module); - old_module.exit().await; + match update_result { + UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => { + self.scheduler = scheduler; + scheduler_starter.start(&module)?; + let old_module = self.module.send_replace(module); + old_module.exit().await; + } + + // In this case, we need to disconnect all clients connected to the old module + UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => { + // Replace the module first, so that new clients get the new module. + let old_watcher = std::mem::replace(&mut self.module, watch::Sender::new(module.clone())); + + // Disconnect all clients connected to the old module. + let connected_clients = replica_ctx.relational_db.connected_clients()?; + for (identity, connection_id) in connected_clients { + let client_actor_id = ClientActorId { + identity, + connection_id, + name: ClientName(0), + }; + //NOTE: This will call disconnect reducer of the new module, not the old one. + //It makes sense, as relationaldb is already updated to the new module. + module.disconnect_client(client_actor_id).await; + } + + self.scheduler = scheduler; + scheduler_starter.start(&module)?; + // exit the old module, drop the `old_watcher` afterwards, + // which will signal websocket clients that the module is gone. + let old_module = old_watcher.borrow().clone(); + old_module.exit().await; + } + _ => {} } Ok(update_result) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index b655dd76303..5e93a350d53 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -506,6 +506,7 @@ pub struct WeakModuleHost { pub enum UpdateDatabaseResult { NoUpdateNeeded, UpdatePerformed, + UpdatePerformedWithClientDisconnect, AutoMigrateError(ErrorStream), ErrorExecutingMigration(anyhow::Error), } @@ -514,7 +515,9 @@ impl UpdateDatabaseResult { pub fn was_successful(&self) -> bool { matches!( self, - UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded + UpdateDatabaseResult::UpdatePerformed + | UpdateDatabaseResult::NoUpdateNeeded + | UpdateDatabaseResult::UpdatePerformedWithClientDisconnect ) } } @@ -779,32 +782,49 @@ impl ModuleHost { .map_err(Into::::into)? } + /// Invokes the `client_disconnected` reducer, if present, + /// then deletes the client’s rows from `st_client` and `st_connection_credentials`. + /// If the reducer fails, the rows are still deleted. + /// Calling this on an already-disconnected client is a no-op. pub fn call_identity_disconnected_inner( &self, caller_identity: Identity, caller_connection_id: ConnectionId, inst: &mut dyn ModuleInstance, ) -> Result<(), ReducerCallError> { - let me = self.clone(); - let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); + let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); + let reducer_name = reducer_lookup + .as_ref() + .map(|(_, def)| &*def.name) + .unwrap_or("__identity_disconnected__"); - // A fallback transaction that deletes the client from `st_client`. - let fallback = || { - let reducer_name = reducer_lookup - .as_ref() - .map(|(_, def)| &*def.name) - .unwrap_or("__identity_disconnected__"); + let is_client_exist = |mut_tx: &MutTxId| mut_tx.st_client_row(caller_identity, caller_connection_id).is_some(); - let workload = Workload::Reducer(ReducerContext { + let workload = || { + Workload::Reducer(ReducerContext { name: reducer_name.to_owned(), caller_identity, caller_connection_id, timestamp: Timestamp::now(), arg_bsatn: Bytes::new(), - }); - let stdb = me.module.replica_ctx().relational_db.clone(); + }) + }; + + let me = self.clone(); + let stdb = me.module.replica_ctx().relational_db.clone(); + + // A fallback transaction that deletes the client from `st_client`. + let fallback = || { let database_identity = me.info.database_identity; - stdb.with_auto_commit(workload, |mut_tx| { + stdb.with_auto_commit(workload(), |mut_tx| { + if !is_client_exist(mut_tx) { + // The client is already gone. Nothing to do. + log::debug!( + "`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do", + ); + return Ok(()); + } + mut_tx .delete_st_client(caller_identity, caller_connection_id, database_identity) .map_err(DBError::from) @@ -822,11 +842,22 @@ impl ModuleHost { }; if let Some((reducer_id, reducer_def)) = reducer_lookup { + let stdb = me.module.replica_ctx().relational_db.clone(); + let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); + + if !is_client_exist(&mut_tx) { + // The client is already gone. Nothing to do. + log::debug!( + "`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do", + ); + return Ok(()); + } + // The module defined a lifecycle reducer to handle disconnects. Call it. // If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured // that `st_client` is updated appropriately. let result = me.call_reducer_inner_with_inst( - None, + Some(mut_tx), caller_identity, Some(caller_connection_id), None, diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index db5a26eb86a..c93b81226d2 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -316,13 +316,18 @@ impl InstanceCommon { stdb.report_mut_tx_metrics(reducer, tx_metrics, None); Ok(UpdateDatabaseResult::ErrorExecutingMigration(e)) } - Ok(()) => { + Ok(res) => { if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } system_logger.info("Database updated"); log::info!("Database updated, {}", stdb.database_identity()); - Ok(UpdateDatabaseResult::UpdatePerformed) + match res { + crate::db::update::UpdateResult::Success => Ok(UpdateDatabaseResult::UpdatePerformed), + crate::db::update::UpdateResult::RequiresClientDisconnect => { + Ok(UpdateDatabaseResult::UpdatePerformedWithClientDisconnect) + } + } } } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index e311af7688b..06889c0ed5c 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1576,6 +1576,23 @@ impl MutTxId { self.delete_st_client_credentials(database_identity, connection_id) } + /// Look up a client row by identity and connection ID in the `st_clients` system table. + pub fn st_client_row(&self, identity: Identity, connection_id: ConnectionId) -> Option { + let row = StClientRow { + identity: identity.into(), + connection_id: connection_id.into(), + }; + + self.iter_by_col_eq( + ST_CLIENT_ID, + col_list![StClientFields::Identity, StClientFields::ConnectionId], + &AlgebraicValue::product(row), + ) + .expect("failed to read from st_client system table") + .next() + .map(|row| row.pointer()) + } + pub fn insert_via_serialize_bsatn<'a, T: Serialize>( &'a mut self, table_id: TableId,