Skip to content

Commit 2d756db

Browse files
authored
Wrap try_init_host futures in tokio::spawn so they don't get cancelled (#2857)
1 parent f4f698e commit 2d756db

1 file changed

Lines changed: 146 additions & 73 deletions

File tree

crates/core/src/host/host_controller.rs

Lines changed: 146 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ pub type ProgramStorage = Arc<dyn ExternalStorage>;
7676

7777
/// A host controller manages the lifecycle of spacetime databases and their
7878
/// associated modules.
79+
///
80+
/// This type is, and must remain, cheap to clone.
81+
/// All of its fields should either be [`Copy`], enclosed in an [`Arc`],
82+
/// or have some other fast [`Clone`] implementation.
7983
#[derive(Clone)]
8084
pub struct HostController {
8185
/// Map of all hosts managed by this controller,
@@ -253,10 +257,31 @@ impl HostController {
253257
}
254258

255259
trace!("launch host {}/{}", database.database_identity, replica_id);
256-
let host = self.try_init_host(database, replica_id).await?;
257260

258-
let rx = host.module.subscribe();
259-
*guard = Some(host);
261+
// `HostController::clone` is fast,
262+
// as all of its fields are either `Copy` or wrapped in `Arc`.
263+
let this = self.clone();
264+
265+
// `try_init_host` is not cancel safe, as it will spawn other async tasks
266+
// which hold a filesystem lock past when `try_init_host` returns or is cancelled.
267+
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
268+
//
269+
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
270+
// and this method is called from Axum handlers, e.g. for the subscribe route.
271+
// `tokio::spawn` a task to build the `Host` and install it in the `guard`,
272+
// so that it will run to completion even if the caller goes away.
273+
//
274+
// Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
275+
// at which point we won't be calling `try_init_host` again anyways.
276+
let rx = tokio::spawn(async move {
277+
let host = this.try_init_host(database, replica_id).await?;
278+
279+
let rx = host.module.subscribe();
280+
*guard = Some(host);
281+
282+
Ok::<_, anyhow::Error>(rx)
283+
})
284+
.await??;
260285

261286
Ok(rx)
262287
}
@@ -338,28 +363,52 @@ impl HostController {
338363
);
339364

340365
let mut guard = self.acquire_write_lock(replica_id).await;
341-
let mut host = match guard.take() {
342-
None => {
343-
trace!("host not running, try_init");
344-
self.try_init_host(database, replica_id).await?
345-
}
346-
Some(host) => {
347-
trace!("host found, updating");
348-
host
349-
}
350-
};
351-
let update_result = host
352-
.update_module(
353-
self.runtimes.clone(),
354-
host_type,
355-
program,
356-
self.energy_monitor.clone(),
357-
self.unregister_fn(replica_id),
358-
self.db_cores.take(),
359-
)
360-
.await?;
361-
362-
*guard = Some(host);
366+
367+
// `HostController::clone` is fast,
368+
// as all of its fields are either `Copy` or wrapped in `Arc`.
369+
let this = self.clone();
370+
371+
// `try_init_host` is not cancel safe, as it will spawn other async tasks
372+
// which hold a filesystem lock past when `try_init_host` returns or is cancelled.
373+
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
374+
//
375+
// The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
376+
// at the start of the block and then store back into it at the end.
377+
//
378+
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
379+
// and this method is called from Axum handlers, e.g. for the publish route.
380+
// `tokio::spawn` a task to update the contents of `guard`,
381+
// so that it will run to completion even if the caller goes away.
382+
//
383+
// Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
384+
// at which point we won't be calling `try_init_host` again anyways.
385+
let update_result = tokio::spawn(async move {
386+
let mut host = match guard.take() {
387+
None => {
388+
trace!("host not running, try_init");
389+
this.try_init_host(database, replica_id).await?
390+
}
391+
Some(host) => {
392+
trace!("host found, updating");
393+
host
394+
}
395+
};
396+
let update_result = host
397+
.update_module(
398+
this.runtimes.clone(),
399+
host_type,
400+
program,
401+
this.energy_monitor.clone(),
402+
this.unregister_fn(replica_id),
403+
this.db_cores.take(),
404+
)
405+
.await?;
406+
407+
*guard = Some(host);
408+
Ok::<_, anyhow::Error>(update_result)
409+
})
410+
.await??;
411+
363412
Ok(update_result)
364413
}
365414

@@ -395,59 +444,83 @@ impl HostController {
395444
let program_hash = database.initial_program;
396445

397446
let mut guard = self.acquire_write_lock(replica_id).await;
398-
let mut host = match guard.take() {
399-
Some(host) => host,
400-
None => self.try_init_host(database, replica_id).await?,
401-
};
402-
let module = host.module.subscribe();
403447

404-
// The program is now either:
448+
// `HostController::clone` is fast,
449+
// as all of its fields are either `Copy` or wrapped in `Arc`.
450+
let this = self.clone();
451+
452+
// `try_init_host` is not cancel safe, as it will spawn other async tasks
453+
// which hold a filesystem lock past when `try_init_host` returns or is cancelled.
454+
// This means that, if `try_init_host` is cancelled, subsequent calls will fail.
405455
//
406-
// - the desired one from [Database], in which case we do nothing
407-
// - `Some` expected hash, in which case we update to the desired one
408-
// - `None` expected hash, in which case we also update
409-
let stored_hash = stored_program_hash(host.db())?
410-
.with_context(|| format!("[{}] database improperly initialized", db_addr))?;
411-
if stored_hash == program_hash {
412-
info!("[{}] database up-to-date with {}", db_addr, program_hash);
413-
*guard = Some(host);
414-
} else {
415-
if let Some(expected_hash) = expected_hash {
416-
ensure!(
417-
expected_hash == stored_hash,
418-
"[{}] expected program {} found {}",
419-
db_addr,
420-
expected_hash,
421-
stored_hash
422-
);
423-
}
424-
info!(
425-
"[{}] updating database from `{}` to `{}`",
426-
db_addr, stored_hash, program_hash
427-
);
428-
let program = load_program(&self.program_storage, program_hash).await?;
429-
let update_result = host
430-
.update_module(
431-
self.runtimes.clone(),
432-
host_type,
433-
program,
434-
self.energy_monitor.clone(),
435-
self.unregister_fn(replica_id),
436-
self.db_cores.take(),
437-
)
438-
.await?;
439-
match update_result {
440-
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
441-
*guard = Some(host);
442-
}
443-
UpdateDatabaseResult::AutoMigrateError(e) => {
444-
return Err(anyhow::anyhow!(e));
456+
// The rest of this future is also not cancel safe, as it will `Option::take` out of the guard
457+
// at the start of the block and then store back into it at the end.
458+
//
459+
// This is problematic because Axum will cancel its handler tasks if the client disconnects,
460+
// and this method is called from Axum handlers, e.g. for the publish route.
461+
// `tokio::spawn` a task to update the contents of `guard`,
462+
// so that it will run to completion even if the caller goes away.
463+
//
464+
// Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
465+
// at which point we won't be calling `try_init_host` again anyways.
466+
let module = tokio::spawn(async move {
467+
let mut host = match guard.take() {
468+
Some(host) => host,
469+
None => this.try_init_host(database, replica_id).await?,
470+
};
471+
let module = host.module.subscribe();
472+
473+
// The program is now either:
474+
//
475+
// - the desired one from [Database], in which case we do nothing
476+
// - `Some` expected hash, in which case we update to the desired one
477+
// - `None` expected hash, in which case we also update
478+
let stored_hash = stored_program_hash(host.db())?
479+
.with_context(|| format!("[{}] database improperly initialized", db_addr))?;
480+
if stored_hash == program_hash {
481+
info!("[{}] database up-to-date with {}", db_addr, program_hash);
482+
*guard = Some(host);
483+
} else {
484+
if let Some(expected_hash) = expected_hash {
485+
ensure!(
486+
expected_hash == stored_hash,
487+
"[{}] expected program {} found {}",
488+
db_addr,
489+
expected_hash,
490+
stored_hash
491+
);
445492
}
446-
UpdateDatabaseResult::ErrorExecutingMigration(e) => {
447-
return Err(e);
493+
info!(
494+
"[{}] updating database from `{}` to `{}`",
495+
db_addr, stored_hash, program_hash
496+
);
497+
let program = load_program(&this.program_storage, program_hash).await?;
498+
let update_result = host
499+
.update_module(
500+
this.runtimes.clone(),
501+
host_type,
502+
program,
503+
this.energy_monitor.clone(),
504+
this.unregister_fn(replica_id),
505+
this.db_cores.take(),
506+
)
507+
.await?;
508+
match update_result {
509+
UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
510+
*guard = Some(host);
511+
}
512+
UpdateDatabaseResult::AutoMigrateError(e) => {
513+
return Err(anyhow::anyhow!(e));
514+
}
515+
UpdateDatabaseResult::ErrorExecutingMigration(e) => {
516+
return Err(e);
517+
}
448518
}
449519
}
450-
}
520+
521+
Ok::<_, anyhow::Error>(module)
522+
})
523+
.await??;
451524

452525
Ok(module)
453526
}

0 commit comments

Comments
 (0)