diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index a3231df6dfd..38acc94bee9 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -76,6 +76,10 @@ pub type ProgramStorage = Arc; /// A host controller manages the lifecycle of spacetime databases and their /// associated modules. +/// +/// This type is, and must remain, cheap to clone. +/// All of its fields should either be [`Copy`], enclosed in an [`Arc`], +/// or have some other fast [`Clone`] implementation. #[derive(Clone)] pub struct HostController { /// Map of all hosts managed by this controller, @@ -253,10 +257,31 @@ impl HostController { } trace!("launch host {}/{}", database.database_identity, replica_id); - let host = self.try_init_host(database, replica_id).await?; - let rx = host.module.subscribe(); - *guard = Some(host); + // `HostController::clone` is fast, + // as all of its fields are either `Copy` or wrapped in `Arc`. + let this = self.clone(); + + // `try_init_host` is not cancel safe, as it will spawn other async tasks + // which hold a filesystem lock past when `try_init_host` returns or is cancelled. + // This means that, if `try_init_host` is cancelled, subsequent calls will fail. + // + // This is problematic because Axum will cancel its handler tasks if the client disconnects, + // and this method is called from Axum handlers, e.g. for the subscribe route. + // `tokio::spawn` a task to build the `Host` and install it in the `guard`, + // so that it will run to completion even if the caller goes away. + // + // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down, + // at which point we won't be calling `try_init_host` again anyways. + let rx = tokio::spawn(async move { + let host = this.try_init_host(database, replica_id).await?; + + let rx = host.module.subscribe(); + *guard = Some(host); + + Ok::<_, anyhow::Error>(rx) + }) + .await??; Ok(rx) } @@ -338,28 +363,52 @@ impl HostController { ); let mut guard = self.acquire_write_lock(replica_id).await; - let mut host = match guard.take() { - None => { - trace!("host not running, try_init"); - self.try_init_host(database, replica_id).await? - } - Some(host) => { - trace!("host found, updating"); - host - } - }; - let update_result = host - .update_module( - self.runtimes.clone(), - host_type, - program, - self.energy_monitor.clone(), - self.unregister_fn(replica_id), - self.db_cores.take(), - ) - .await?; - - *guard = Some(host); + + // `HostController::clone` is fast, + // as all of its fields are either `Copy` or wrapped in `Arc`. + let this = self.clone(); + + // `try_init_host` is not cancel safe, as it will spawn other async tasks + // which hold a filesystem lock past when `try_init_host` returns or is cancelled. + // This means that, if `try_init_host` is cancelled, subsequent calls will fail. + // + // The rest of this future is also not cancel safe, as it will `Option::take` out of the guard + // at the start of the block and then store back into it at the end. + // + // This is problematic because Axum will cancel its handler tasks if the client disconnects, + // and this method is called from Axum handlers, e.g. for the publish route. + // `tokio::spawn` a task to update the contents of `guard`, + // so that it will run to completion even if the caller goes away. + // + // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down, + // at which point we won't be calling `try_init_host` again anyways. + let update_result = tokio::spawn(async move { + let mut host = match guard.take() { + None => { + trace!("host not running, try_init"); + this.try_init_host(database, replica_id).await? + } + Some(host) => { + trace!("host found, updating"); + host + } + }; + let update_result = host + .update_module( + this.runtimes.clone(), + host_type, + program, + this.energy_monitor.clone(), + this.unregister_fn(replica_id), + this.db_cores.take(), + ) + .await?; + + *guard = Some(host); + Ok::<_, anyhow::Error>(update_result) + }) + .await??; + Ok(update_result) } @@ -395,59 +444,83 @@ impl HostController { let program_hash = database.initial_program; let mut guard = self.acquire_write_lock(replica_id).await; - let mut host = match guard.take() { - Some(host) => host, - None => self.try_init_host(database, replica_id).await?, - }; - let module = host.module.subscribe(); - // The program is now either: + // `HostController::clone` is fast, + // as all of its fields are either `Copy` or wrapped in `Arc`. + let this = self.clone(); + + // `try_init_host` is not cancel safe, as it will spawn other async tasks + // which hold a filesystem lock past when `try_init_host` returns or is cancelled. + // This means that, if `try_init_host` is cancelled, subsequent calls will fail. // - // - the desired one from [Database], in which case we do nothing - // - `Some` expected hash, in which case we update to the desired one - // - `None` expected hash, in which case we also update - let stored_hash = stored_program_hash(host.db())? - .with_context(|| format!("[{}] database improperly initialized", db_addr))?; - if stored_hash == program_hash { - info!("[{}] database up-to-date with {}", db_addr, program_hash); - *guard = Some(host); - } else { - if let Some(expected_hash) = expected_hash { - ensure!( - expected_hash == stored_hash, - "[{}] expected program {} found {}", - db_addr, - expected_hash, - stored_hash - ); - } - info!( - "[{}] updating database from `{}` to `{}`", - db_addr, stored_hash, program_hash - ); - let program = load_program(&self.program_storage, program_hash).await?; - let update_result = host - .update_module( - self.runtimes.clone(), - host_type, - program, - self.energy_monitor.clone(), - self.unregister_fn(replica_id), - self.db_cores.take(), - ) - .await?; - match update_result { - UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => { - *guard = Some(host); - } - UpdateDatabaseResult::AutoMigrateError(e) => { - return Err(anyhow::anyhow!(e)); + // The rest of this future is also not cancel safe, as it will `Option::take` out of the guard + // at the start of the block and then store back into it at the end. + // + // This is problematic because Axum will cancel its handler tasks if the client disconnects, + // and this method is called from Axum handlers, e.g. for the publish route. + // `tokio::spawn` a task to update the contents of `guard`, + // so that it will run to completion even if the caller goes away. + // + // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down, + // at which point we won't be calling `try_init_host` again anyways. + let module = tokio::spawn(async move { + let mut host = match guard.take() { + Some(host) => host, + None => this.try_init_host(database, replica_id).await?, + }; + let module = host.module.subscribe(); + + // The program is now either: + // + // - the desired one from [Database], in which case we do nothing + // - `Some` expected hash, in which case we update to the desired one + // - `None` expected hash, in which case we also update + let stored_hash = stored_program_hash(host.db())? + .with_context(|| format!("[{}] database improperly initialized", db_addr))?; + if stored_hash == program_hash { + info!("[{}] database up-to-date with {}", db_addr, program_hash); + *guard = Some(host); + } else { + if let Some(expected_hash) = expected_hash { + ensure!( + expected_hash == stored_hash, + "[{}] expected program {} found {}", + db_addr, + expected_hash, + stored_hash + ); } - UpdateDatabaseResult::ErrorExecutingMigration(e) => { - return Err(e); + info!( + "[{}] updating database from `{}` to `{}`", + db_addr, stored_hash, program_hash + ); + let program = load_program(&this.program_storage, program_hash).await?; + let update_result = host + .update_module( + this.runtimes.clone(), + host_type, + program, + this.energy_monitor.clone(), + this.unregister_fn(replica_id), + this.db_cores.take(), + ) + .await?; + match update_result { + UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => { + *guard = Some(host); + } + UpdateDatabaseResult::AutoMigrateError(e) => { + return Err(anyhow::anyhow!(e)); + } + UpdateDatabaseResult::ErrorExecutingMigration(e) => { + return Err(e); + } } } - } + + Ok::<_, anyhow::Error>(module) + }) + .await??; Ok(module) }