Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,9 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
)
.into());
}
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {}
UpdateDatabaseResult::NoUpdateNeeded
| UpdateDatabaseResult::UpdatePerformed
| UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => {}
}
}

Expand Down
27 changes: 21 additions & 6 deletions crates/core/src/db/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ impl UpdateLogger for SystemLogger {
}
}

/// The outcome of a successfully applied database update.
///
/// Tells the caller whether connected clients should be disconnected
/// once the update is complete. Applying the update never disconnects
/// anyone by itself; acting on [`UpdateResult::RequiresClientDisconnect`]
/// is the caller's responsibility.
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateResult {
    /// The update was applied; connected clients may stay connected.
    Success,
    /// The update was applied, and the caller should disconnect all clients.
    RequiresClientDisconnect,
}

/// Update the database according to the migration plan.
///
/// The update is performed within the transactional context `tx`.
Expand All @@ -39,7 +47,7 @@ pub fn update_database(
auth_ctx: AuthCtx,
plan: MigratePlan,
logger: &dyn UpdateLogger,
) -> anyhow::Result<()> {
) -> anyhow::Result<UpdateResult> {
let existing_tables = stdb.get_all_tables_mut(tx)?;

// TODO: consider using `ErrorStream` here.
Expand Down Expand Up @@ -68,7 +76,7 @@ fn manual_migrate_database(
_plan: ManualMigratePlan,
_logger: &dyn UpdateLogger,
_existing_tables: Vec<Arc<TableSchema>>,
) -> anyhow::Result<()> {
) -> anyhow::Result<UpdateResult> {
unimplemented!("Manual database migrations are not yet implemented")
}

Expand All @@ -88,7 +96,7 @@ fn auto_migrate_database(
plan: AutoMigratePlan,
logger: &dyn UpdateLogger,
existing_tables: Vec<Arc<TableSchema>>,
) -> anyhow::Result<()> {
) -> anyhow::Result<UpdateResult> {
// We have already checked in `migrate_database` that `existing_tables` are compatible with the `old` definition in `plan`.
// So we can look up tables in there using unwrap.

Expand Down Expand Up @@ -127,6 +135,7 @@ fn auto_migrate_database(
}

log::info!("Running database update steps: {}", stdb.database_identity());
let mut res = UpdateResult::Success;

for step in plan.steps {
match step {
Expand Down Expand Up @@ -264,12 +273,17 @@ fn auto_migrate_database(
.collect();
stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?;
}
_ => anyhow::bail!("migration step not implemented: {step:?}"),
spacetimedb_schema::auto_migrate::AutoMigrateStep::DisconnectAllUsers => {
log!(logger, "Disconnecting all users");
// This step does not disconnect clients right away;
// it only records in the result that the caller should drop all clients.
res = UpdateResult::RequiresClientDisconnect;
}
}
}

log::info!("Database update complete");
Ok(())
Ok(res)
}

#[cfg(test)]
Expand Down Expand Up @@ -349,7 +363,8 @@ mod test {
// Try to update the db.
let mut tx = begin_mut_tx(&stdb);
let plan = ponder_migrate(&old, &new)?;
update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;
let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;
matches!(res, UpdateResult::Success);

// Expect the schema change.
let idx_b_id = stdb
Expand Down
43 changes: 37 additions & 6 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
use super::scheduler::SchedulerStarter;
use super::wasmtime::WasmtimeRuntime;
use super::{Scheduler, UpdateDatabaseResult};
use crate::client::{ClientActorId, ClientName};
use crate::database_logger::DatabaseLogger;
use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
use crate::db::{self, spawn_tx_metrics_recorder};
Expand Down Expand Up @@ -415,6 +416,7 @@ impl HostController {
.await?;

*guard = Some(host);

Ok::<_, anyhow::Error>(update_result)
})
.await??;
Expand Down Expand Up @@ -974,14 +976,43 @@ impl Host {

let update_result =
update_module(&replica_ctx.relational_db, &module, program, old_module_info, policy).await?;
trace!("update result: {update_result:?}");

// Only replace the module + scheduler if the update succeeded.
// Otherwise, we want the database to continue running with the old state.
if update_result.was_successful() {
self.scheduler = scheduler;
scheduler_starter.start(&module)?;
let old_module = self.module.send_replace(module);
old_module.exit().await;
match update_result {
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
self.scheduler = scheduler;
scheduler_starter.start(&module)?;
let old_module = self.module.send_replace(module);
old_module.exit().await;
}

// In this case, we need to disconnect all clients connected to the old module
UpdateDatabaseResult::UpdatePerformedWithClientDisconnect => {
// Replace the module first, so that new clients get the new module.
let old_watcher = std::mem::replace(&mut self.module, watch::Sender::new(module.clone()));

// Disconnect all clients connected to the old module.
let connected_clients = replica_ctx.relational_db.connected_clients()?;
for (identity, connection_id) in connected_clients {
let client_actor_id = ClientActorId {
identity,
connection_id,
name: ClientName(0),
};
// NOTE: This calls the disconnect reducer of the new module, not the old one.
// That makes sense, as the relational database has already been updated to the new module.
module.disconnect_client(client_actor_id).await;
}

self.scheduler = scheduler;
scheduler_starter.start(&module)?;
// exit the old module, drop the `old_watcher` afterwards,
// which will signal websocket clients that the module is gone.
let old_module = old_watcher.borrow().clone();
old_module.exit().await;
}
_ => {}
}

Ok(update_result)
Expand Down
66 changes: 52 additions & 14 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ pub struct WeakModuleHost {
/// Outcome of an attempt to update a database to a new module version.
///
/// `was_successful` reports `true` for the first three variants;
/// the remaining two carry the error that prevented the update.
pub enum UpdateDatabaseResult {
/// Nothing had to be changed.
NoUpdateNeeded,
/// The update was applied; connected clients are left alone.
UpdatePerformed,
/// The update was applied, and all clients connected to the old module
/// need to be disconnected.
UpdatePerformedWithClientDisconnect,
/// Errors reported by auto-migration.
/// NOTE(review): presumably produced while planning the migration — confirm at the construction site.
AutoMigrateError(ErrorStream<AutoMigrateError>),
/// Executing the migration failed with the contained error.
ErrorExecutingMigration(anyhow::Error),
}
Expand All @@ -514,7 +515,9 @@ impl UpdateDatabaseResult {
/// Reports whether this result represents a successful outcome.
///
/// Returns `true` for `UpdatePerformed`, `NoUpdateNeeded` and
/// `UpdatePerformedWithClientDisconnect`; every other variant yields `false`.
pub fn was_successful(&self) -> bool {
matches!(
self,
UpdateDatabaseResult::UpdatePerformed | UpdateDatabaseResult::NoUpdateNeeded
UpdateDatabaseResult::UpdatePerformed
| UpdateDatabaseResult::NoUpdateNeeded
| UpdateDatabaseResult::UpdatePerformedWithClientDisconnect
)
}
}
Expand Down Expand Up @@ -779,32 +782,53 @@ impl ModuleHost {
.map_err(Into::<ReducerCallError>::into)?
}

/// Invokes the `client_disconnected` reducer, if present,
/// then deletes the client’s rows from `st_client` and `st_connection_credentials`.
/// If the reducer fails, the rows are still deleted.
/// Calling this on an already-disconnected client is a no-op.
pub fn call_identity_disconnected_inner(
&self,
caller_identity: Identity,
caller_connection_id: ConnectionId,
inst: &mut dyn ModuleInstance,
) -> Result<(), ReducerCallError> {
let me = self.clone();
let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
let reducer_name = reducer_lookup
.as_ref()
.map(|(_, def)| &*def.name)
.unwrap_or("__identity_disconnected__");

// A fallback transaction that deletes the client from `st_client`.
let fallback = || {
let reducer_name = reducer_lookup
.as_ref()
.map(|(_, def)| &*def.name)
.unwrap_or("__identity_disconnected__");
let is_client_exist = |mut_tx: &MutTxId| {
mut_tx
.st_client_row(caller_identity, caller_connection_id)
.map(|row_opt| row_opt.is_some())
};

let workload = Workload::Reducer(ReducerContext {
let workload = || {
Workload::Reducer(ReducerContext {
name: reducer_name.to_owned(),
caller_identity,
caller_connection_id,
timestamp: Timestamp::now(),
arg_bsatn: Bytes::new(),
});
let stdb = me.module.replica_ctx().relational_db.clone();
})
};

let me = self.clone();
let stdb = me.module.replica_ctx().relational_db.clone();

// A fallback transaction that deletes the client from `st_client`.
let fallback = || {
let database_identity = me.info.database_identity;
stdb.with_auto_commit(workload, |mut_tx| {
stdb.with_auto_commit(workload(), |mut_tx| {
if !is_client_exist(mut_tx)? {
// The client is already gone. Nothing to do.
log::debug!(
"`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do",
);
return Ok(());
}

mut_tx
.delete_st_client(caller_identity, caller_connection_id, database_identity)
.map_err(DBError::from)
Expand All @@ -822,11 +846,25 @@ impl ModuleHost {
};

if let Some((reducer_id, reducer_def)) = reducer_lookup {
let stdb = me.module.replica_ctx().relational_db.clone();
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());

if !is_client_exist(&mut_tx).map_err(|e| InvalidReducerArguments {
err: e.into(),
reducer: reducer_name.into(),
})? {
// The client is already gone. Nothing to do.
log::debug!(
"`call_identity_disconnected`: no row in `st_client` for ({caller_identity}, {caller_connection_id}), nothing to do",
);
return Ok(());
}

// The module defined a lifecycle reducer to handle disconnects. Call it.
// If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured
// that `st_client` is updated appropriately.
let result = me.call_reducer_inner_with_inst(
None,
Some(mut_tx),
caller_identity,
Some(caller_connection_id),
None,
Expand Down
9 changes: 7 additions & 2 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,18 @@ impl InstanceCommon {
stdb.report_mut_tx_metrics(reducer, tx_metrics, None);
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
}
Ok(()) => {
Ok(res) => {
if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
}
system_logger.info("Database updated");
log::info!("Database updated, {}", stdb.database_identity());
Ok(UpdateDatabaseResult::UpdatePerformed)
match res {
crate::db::update::UpdateResult::Success => Ok(UpdateDatabaseResult::UpdatePerformed),
crate::db::update::UpdateResult::RequiresClientDisconnect => {
Ok(UpdateDatabaseResult::UpdatePerformedWithClientDisconnect)
}
}
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,23 @@ impl MutTxId {
self.delete_st_client_credentials(database_identity, connection_id)
}

/// Look up a client row by identity and connection ID in the `st_clients` system table.
///
/// `Ok(None)` if no such row exists.
pub fn st_client_row(&self, identity: Identity, connection_id: ConnectionId) -> Result<Option<RowPointer>> {
Comment thread
Shubham8287 marked this conversation as resolved.
Outdated
let row = StClientRow {
identity: identity.into(),
connection_id: connection_id.into(),
};

self.iter_by_col_eq(
ST_CLIENT_ID,
col_list![StClientFields::Identity, StClientFields::ConnectionId],
&AlgebraicValue::product(row),
)
.map(|mut it| it.next().map(|row_ref| row_ref.pointer()))
}

pub fn insert_via_serialize_bsatn<'a, T: Serialize>(
&'a mut self,
table_id: TableId,
Expand Down
Loading