Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 146 additions & 73 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub type ProgramStorage = Arc<dyn ExternalStorage>;

/// A host controller manages the lifecycle of spacetime databases and their
/// associated modules.
///
/// This type is, and must remain, cheap to clone.
/// All of its fields should either be [`Copy`], enclosed in an [`Arc`],
/// or have some other fast [`Clone`] implementation.
#[derive(Clone)]
pub struct HostController {
/// Map of all hosts managed by this controller,
Expand Down Expand Up @@ -253,10 +257,31 @@ impl HostController {
}

trace!("launch host {}/{}", database.database_identity, replica_id);
let host = self.try_init_host(database, replica_id).await?;

let rx = host.module.subscribe();
*guard = Some(host);
// `HostController::clone` is cheap,
// as all of its fields are `Copy`, wrapped in an `Arc`, or otherwise fast to clone.
let this = self.clone();

// `try_init_host` is not cancel safe: it spawns other async tasks that keep holding
// a filesystem lock after `try_init_host` itself has returned or been cancelled.
Comment thread
gefjon marked this conversation as resolved.
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
//
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
// and this method is called from Axum handlers, e.g. for the subscribe route.
// `tokio::spawn` a task to build the `Host` and install it in the `guard`,
// so that it will run to completion even if the caller goes away.
//
// Note that a task started with `tokio::spawn` is only cancelled when the runtime shuts down,
// at which point we won't be calling `try_init_host` again anyway.
let rx = tokio::spawn(async move {
let host = this.try_init_host(database, replica_id).await?;

let rx = host.module.subscribe();
*guard = Some(host);

Ok::<_, anyhow::Error>(rx)
})
.await??;

Ok(rx)
}
Expand Down Expand Up @@ -338,28 +363,52 @@ impl HostController {
);

let mut guard = self.acquire_write_lock(replica_id).await;
let mut host = match guard.take() {
None => {
trace!("host not running, try_init");
self.try_init_host(database, replica_id).await?
}
Some(host) => {
trace!("host found, updating");
host
}
};
let update_result = host
.update_module(
self.runtimes.clone(),
host_type,
program,
self.energy_monitor.clone(),
self.unregister_fn(replica_id),
self.db_cores.take(),
)
.await?;

*guard = Some(host);

// `HostController::clone` is fast,
// as all of its fields are either `Copy` or wrapped in `Arc`.
let this = self.clone();

// `try_init_host` is not cancel safe, as it will spawn other async tasks
// which hold a filesystem lock past when `try_init_host` returns or is cancelled.
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
//
// The rest of this future is also not cancel safe, as it will `Option::take` the host out of
// the guard at the start of the block and only store it back at the end;
// if cancelled in between, the guard would be left holding `None`.
//
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
// and this method is called from Axum handlers, e.g. for the publish route.
// `tokio::spawn` a task to update the contents of `guard`,
// so that it will run to completion even if the caller goes away.
//
// Note that a task started with `tokio::spawn` is only cancelled when the runtime shuts down,
// at which point we won't be calling `try_init_host` again anyway.
let update_result = tokio::spawn(async move {
let mut host = match guard.take() {
None => {
trace!("host not running, try_init");
this.try_init_host(database, replica_id).await?
}
Some(host) => {
trace!("host found, updating");
host
}
};
let update_result = host
.update_module(
this.runtimes.clone(),
host_type,
program,
this.energy_monitor.clone(),
this.unregister_fn(replica_id),
this.db_cores.take(),
)
.await?;

*guard = Some(host);
Ok::<_, anyhow::Error>(update_result)
})
.await??;

Ok(update_result)
}

Expand Down Expand Up @@ -395,59 +444,83 @@ impl HostController {
let program_hash = database.initial_program;

let mut guard = self.acquire_write_lock(replica_id).await;
let mut host = match guard.take() {
Some(host) => host,
None => self.try_init_host(database, replica_id).await?,
};
let module = host.module.subscribe();

// The program is now either:
// `HostController::clone` is fast,
// as all of its fields are either `Copy` or wrapped in `Arc`.
let this = self.clone();

// `try_init_host` is not cancel safe, as it will spawn other async tasks
// which hold a filesystem lock past when `try_init_host` returns or is cancelled.
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
//
// - the desired one from [Database], in which case we do nothing
// - `Some` expected hash, in which case we update to the desired one
// - `None` expected hash, in which case we also update
let stored_hash = stored_program_hash(host.db())?
.with_context(|| format!("[{}] database improperly initialized", db_addr))?;
if stored_hash == program_hash {
info!("[{}] database up-to-date with {}", db_addr, program_hash);
*guard = Some(host);
} else {
if let Some(expected_hash) = expected_hash {
ensure!(
expected_hash == stored_hash,
"[{}] expected program {} found {}",
db_addr,
expected_hash,
stored_hash
);
}
info!(
"[{}] updating database from `{}` to `{}`",
db_addr, stored_hash, program_hash
);
let program = load_program(&self.program_storage, program_hash).await?;
let update_result = host
.update_module(
self.runtimes.clone(),
host_type,
program,
self.energy_monitor.clone(),
self.unregister_fn(replica_id),
self.db_cores.take(),
)
.await?;
match update_result {
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
*guard = Some(host);
}
UpdateDatabaseResult::AutoMigrateError(e) => {
return Err(anyhow::anyhow!(e));
// The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
// at the start of the block and then store back into it at the end.
//
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
// and this method is called from Axum handlers, e.g. for the publish route.
// `tokio::spawn` a task to update the contents of `guard`,
// so that it will run to completion even if the caller goes away.
//
// Note that a task started with `tokio::spawn` is only cancelled when the runtime shuts down,
// at which point we won't be calling `try_init_host` again anyway.
let module = tokio::spawn(async move {
let mut host = match guard.take() {
Some(host) => host,
None => this.try_init_host(database, replica_id).await?,
};
let module = host.module.subscribe();

// The program is now either:
//
// - the desired one from [Database], in which case we do nothing
// - `Some` expected hash, in which case we update to the desired one
// - `None` expected hash, in which case we also update
let stored_hash = stored_program_hash(host.db())?
.with_context(|| format!("[{}] database improperly initialized", db_addr))?;
if stored_hash == program_hash {
info!("[{}] database up-to-date with {}", db_addr, program_hash);
*guard = Some(host);
} else {
if let Some(expected_hash) = expected_hash {
ensure!(
expected_hash == stored_hash,
"[{}] expected program {} found {}",
db_addr,
expected_hash,
stored_hash
);
}
UpdateDatabaseResult::ErrorExecutingMigration(e) => {
return Err(e);
info!(
"[{}] updating database from `{}` to `{}`",
db_addr, stored_hash, program_hash
);
let program = load_program(&this.program_storage, program_hash).await?;
let update_result = host
.update_module(
this.runtimes.clone(),
host_type,
program,
this.energy_monitor.clone(),
this.unregister_fn(replica_id),
this.db_cores.take(),
)
.await?;
match update_result {
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
*guard = Some(host);
}
UpdateDatabaseResult::AutoMigrateError(e) => {
return Err(anyhow::anyhow!(e));
}
UpdateDatabaseResult::ErrorExecutingMigration(e) => {
return Err(e);
}
}
}
}

Ok::<_, anyhow::Error>(module)
})
.await??;

Ok(module)
}
Expand Down
Loading