From 62d85d822fcc5fd8ba582afb048d39a763cdeeb8 Mon Sep 17 00:00:00 2001 From: Noa Date: Tue, 27 May 2025 23:51:10 -0500 Subject: [PATCH 01/25] Pin db threads to cores --- Cargo.lock | 13 ++ crates/core/Cargo.toml | 2 + crates/core/src/host/host_controller.rs | 30 +++- crates/core/src/host/module_host.rs | 35 ++++- crates/core/src/startup.rs | 150 +++++++++++++++---- crates/core/src/subscription/subscription.rs | 3 +- crates/standalone/src/lib.rs | 15 +- crates/standalone/src/main.rs | 18 ++- crates/standalone/src/subcommands/start.rs | 37 ++--- crates/testing/src/modules.rs | 7 +- 10 files changed, 234 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f6ccf4d35a..98a33930d4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -977,6 +977,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "cpp_demangle" version = "0.4.4" @@ -5444,8 +5455,10 @@ dependencies = [ "bytes", "bytestring", "chrono", + "core_affinity", "criterion", "crossbeam-channel", + "crossbeam-queue", "derive_more", "dirs", "enum-as-inner", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 0c473b72f7f..ce081b9c5f8 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -48,6 +48,7 @@ bytes.workspace = true bytestring.workspace = true chrono.workspace = true crossbeam-channel.workspace = true +crossbeam-queue.workspace = true derive_more.workspace = true dirs.workspace = true enum-as-inner.workspace = true @@ -113,6 +114,7 @@ wasmtime.workspace = true jwks.workspace = true async_cache = "0.3.1" faststr = "0.2.23" +core_affinity = "0.8" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = {workspace = true} diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index bb2c47cfc75..befa7abd8c9 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -12,6 +12,7 @@ use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor}; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; +use crate::startup::{DatabaseCore, DatabaseCores}; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::SubscriptionManager; use crate::util::{asyncify, spawn_rayon}; @@ -95,6 +96,8 @@ pub struct HostController { pub page_pool: PagePool, /// The runtimes for running our modules. runtimes: Arc, + /// The CPU cores that are reserved for running databases on. + db_cores: DatabaseCores, } struct HostRuntimes { @@ -169,6 +172,7 @@ impl HostController { program_storage: ProgramStorage, energy_monitor: Arc, durability: Arc, + db_cores: DatabaseCores, ) -> Self { Self { hosts: <_>::default(), @@ -179,6 +183,7 @@ impl HostController { runtimes: HostRuntimes::new(Some(&data_dir)), data_dir, page_pool: PagePool::new(default_config.page_pool_max_size), + db_cores, } } @@ -267,7 +272,14 @@ impl HostController { /// This is not necessary during hotswap publishes, /// as the automigration planner and executor accomplish the same validity checks. pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result> { - Host::try_init_in_memory_to_check(&self.runtimes, self.page_pool.clone(), database, program).await + Host::try_init_in_memory_to_check( + &self.runtimes, + self.page_pool.clone(), + database, + program, + self.db_cores.take(), + ) + .await } /// Run a computation on the [`RelationalDB`] of a [`ModuleHost`] managed by @@ -338,6 +350,7 @@ impl HostController { program, self.energy_monitor.clone(), self.unregister_fn(replica_id), + self.db_cores.take(), ) .await?; @@ -415,6 +428,7 @@ impl HostController { program, self.energy_monitor.clone(), self.unregister_fn(replica_id), + self.db_cores.take(), ) .await?; match update_result { @@ -556,6 +570,7 @@ async fn make_replica_ctx( /// Initialize a module host for the given program. /// The passed replica_ctx may not be configured for this version of the program's database schema yet. +#[allow(clippy::too_many_arguments)] async fn make_module_host( runtimes: Arc, host_type: HostType, @@ -564,6 +579,7 @@ async fn make_module_host( program: Program, energy_monitor: Arc, unregister: impl Fn() + Send + Sync + 'static, + core: DatabaseCore, ) -> anyhow::Result<(Program, ModuleHost)> { spawn_rayon(move || { let module_host = match host_type { @@ -577,7 +593,7 @@ async fn make_module_host( let start = Instant::now(); let actor = runtimes.wasmtime.make_actor(mcc)?; trace!("wasmtime::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister) + ModuleHost::new(actor, unregister, core) } }; Ok((program, module_host)) @@ -610,6 +626,7 @@ async fn launch_module( energy_monitor: Arc, replica_dir: ReplicaDir, runtimes: Arc, + core: DatabaseCore, ) -> anyhow::Result<(Program, LaunchedModule)> { let db_identity = database.database_identity; let host_type = database.host_type; @@ -626,6 +643,7 @@ async fn launch_module( program, energy_monitor.clone(), on_panic, + core, ) .await?; @@ -776,6 +794,7 @@ impl Host { energy_monitor.clone(), replica_dir, runtimes.clone(), + host_controller.db_cores.take(), ) .await?; @@ -834,6 +853,7 @@ impl Host { page_pool: PagePool, database: Database, program: Program, + core: DatabaseCore, ) -> anyhow::Result> { // Even in-memory databases acquire a lockfile. // Grab a tempdir to put that lockfile in. @@ -865,6 +885,7 @@ impl Host { Arc::new(NullEnergyMonitor), phony_replica_dir, runtimes.clone(), + core, ) .await?; @@ -895,6 +916,7 @@ impl Host { program: Program, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, + core: DatabaseCore, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); @@ -907,6 +929,7 @@ impl Host { program, energy_monitor, on_panic, + core, ) .await?; @@ -981,7 +1004,8 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an let runtimes = HostRuntimes::new(None); let page_pool = PagePool::new(None); - let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program).await?; + let core = DatabaseCore::default(); + let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?; let module_info = Arc::into_inner(module_info).unwrap(); Ok(module_info.module_def) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index d88c3b4b5a2..e177a0a826c 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -13,6 +13,7 @@ use crate::messages::control_db::Database; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; use crate::sql::parser::RowLevelExpr; +use crate::startup::DatabaseCore; use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; @@ -46,6 +47,7 @@ use spacetimedb_vm::relation::RelValue; use std::fmt; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; +use tokio::sync::{mpsc, oneshot}; #[derive(Debug, Default, Clone, From)] pub struct DatabaseUpdate { @@ -444,6 +446,7 @@ pub struct ModuleHost { inner: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, + job_tx: mpsc::Sender>, } impl fmt::Debug for ModuleHost { @@ -536,6 +539,7 @@ pub struct WeakModuleHost { info: Arc, inner: Weak, on_panic: Weak, + tx: mpsc::WeakSender>, } #[derive(Debug)] @@ -596,7 +600,11 @@ pub enum ClientConnectedError { } impl ModuleHost { - pub fn new(mut module: impl Module, on_panic: impl Fn() + Send + Sync + 'static) -> Self { + pub(super) fn new( + mut module: impl Module, + on_panic: impl Fn() + Send + Sync + 'static, + core: DatabaseCore, + ) -> Self { let info = module.info(); let instance_pool = LendingPool::new(); instance_pool.add_multiple(module.initial_instances()).unwrap(); @@ -605,7 +613,18 @@ impl ModuleHost { instance_pool, }); let on_panic = Arc::new(on_panic); - ModuleHost { info, inner, on_panic } + let (tx, mut rx) = mpsc::channel::>(8); + core.spawn(move || { + while let Some(f) = rx.blocking_recv() { + f() + } + }); + ModuleHost { + info, + inner, + on_panic, + job_tx: tx, + } } #[inline] @@ -645,7 +664,14 @@ impl ModuleHost { log::warn!("reducer {reducer} panicked"); (self.on_panic)(); }); - let result = asyncify(move || f(&mut *inst)).await; + let (ret_tx, ret_rx) = oneshot::channel(); + self.job_tx + .send(Box::new(move || { + let _ = ret_tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&mut *inst)))); + })) + .await + .unwrap(); + let result = ret_rx.await.unwrap().unwrap_or_else(|e| std::panic::resume_unwind(e)); Ok(result) } @@ -1122,6 +1148,7 @@ impl ModuleHost { info: self.info.clone(), inner: Arc::downgrade(&self.inner), on_panic: Arc::downgrade(&self.on_panic), + tx: self.job_tx.downgrade(), } } @@ -1138,10 +1165,12 @@ impl WeakModuleHost { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; let on_panic = self.on_panic.upgrade()?; + let tx = self.tx.upgrade()?; Some(ModuleHost { info: self.info.clone(), inner, on_panic, + job_tx: tx, }) } } diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index e7f2b240981..08657af5054 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -1,5 +1,8 @@ +use core_affinity::CoreId; +use crossbeam_queue::ArrayQueue; use spacetimedb_paths::server::{ConfigToml, LogsDir}; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use tracing_appender::rolling; use tracing_core::LevelFilter; @@ -12,23 +15,6 @@ use tracing_subscriber::{reload, EnvFilter}; use crate::config::{ConfigFile, LogConfig}; -pub struct StartupOptions { - /// Options for tracing to configure the global tracing subscriber. Tracing will be disabled - /// if `None`. - pub tracing: Option, - /// Whether or not to configure the global rayon threadpool. - pub rayon: bool, -} - -impl Default for StartupOptions { - fn default() -> Self { - Self { - tracing: Some(TracingOptions::default()), - rayon: true, - } - } -} - pub struct TracingOptions { pub config: LogConfig, /// Whether or not to periodically reload the log config in the background. @@ -54,19 +40,12 @@ impl Default for TracingOptions { } } } - -impl StartupOptions { - pub fn configure(self) { - if let Some(tracing_opts) = self.tracing { - configure_tracing(tracing_opts) - } - if self.rayon { - configure_rayon() - } - } +#[must_use] +pub fn pin_threads() -> Cores { + Cores::get().unwrap_or_default() } -fn configure_tracing(opts: TracingOptions) { +pub fn configure_tracing(opts: TracingOptions) { // Use this to change log levels at runtime. // This means you can change the default log level to trace // if you are trying to debug an issue and need more logs on then turn it off @@ -172,10 +151,115 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle Self { + Self { + databases: DatabaseCores::default(), + tokio_workers: TokioCores(None), + rest: std::thread::available_parallelism().map_or(4, |x| x.get()), + } + } +} + +impl Cores { + fn get() -> Option { + let mut cores = core_affinity::get_core_ids() + .filter(|cores| cores.len() >= 8)? + .into_iter(); + + let total = cores.len() as f64; + let mut take = |frac: f64| cores.by_ref().take((total * frac).ceil() as usize).collect::>(); + + let databases = DatabaseCores { + queue: Some(vec_to_queue(take(1.0 / 8.0))), + }; + + let tokio_workers = TokioCores(Some(take(5.0 / 8.0))); + + Some(Self { + databases, + tokio_workers, + rest: cores.len(), + }) + } +} + +type CoreQueue = Arc>; +fn vec_to_queue(cores: Vec) -> CoreQueue { + let queue = Arc::new(ArrayQueue::new(cores.len())); + for core in cores { + queue.push(core).unwrap(); + } + queue +} + +pub struct TokioCores(Option>); + +impl TokioCores { + pub fn configure(self, builder: &mut tokio::runtime::Builder) { + if let Some(cores) = self.0 { + let cores = vec_to_queue(cores); + // `on_thread_start` gets called for both async worker threads and blocking threads, + // but the first `worker_threads` threads that tokio spawns are worker threads, + // so this ends up working fine + builder.worker_threads(cores.len()).on_thread_start(move || { + if let Some(core) = cores.pop() { + core_affinity::set_for_current(core); + } + }); + } + } +} + +#[derive(Clone, Default)] +pub struct DatabaseCores { + queue: Option, +} + +impl DatabaseCores { + pub fn take(&self) -> DatabaseCore { + if let Some(queue) = &self.queue { + if let Some(core) = queue.pop() { + return DatabaseCore { + cores_id: Some((queue.clone(), core)), + }; + } + } + DatabaseCore { cores_id: None } + } +} + +#[derive(Default)] +pub struct DatabaseCore { + cores_id: Option<(CoreQueue, CoreId)>, +} + +impl DatabaseCore { + pub fn spawn(self, f: impl FnOnce() + Send + 'static) { + std::thread::spawn(move || { + if let Some((_, id)) = self.cores_id { + core_affinity::set_for_current(id); + } + scopeguard::defer!({ + if let Some((queue, id)) = self.cores_id { + queue.push(id).unwrap(); + } + }); + f(); + }); + } +} + +pub fn configure_rayon(num_threads: usize, tokio_handle: &tokio::runtime::Handle) { rayon_core::ThreadPoolBuilder::new() .thread_name(|_idx| "rayon-worker".to_string()) - .spawn_handler(thread_spawn_handler(tokio::runtime::Handle::current())) + .spawn_handler(thread_spawn_handler(tokio_handle)) // TODO(perf, pgoldman 2024-02-22): // in the case where we have many modules running many reducers, // we'll wind up with Rayon threads competing with each other and with Tokio threads @@ -187,7 +271,7 @@ fn configure_rayon() { // and Rayon threads to the other. // Then we should give Tokio and Rayon each a number of worker threads // equal to the size of their pool. - .num_threads(std::thread::available_parallelism().unwrap().get() / 2) + .num_threads(num_threads) .build_global() .unwrap() } @@ -205,7 +289,9 @@ fn configure_rayon() { /// i.e. that every async operation they invoke immediately completes. /// I (pgoldman 2024-02-22) believe that our Rayon threads only ever send to unbounded channels, /// and therefore never wait. -fn thread_spawn_handler(rt: tokio::runtime::Handle) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> { +fn thread_spawn_handler( + rt: &tokio::runtime::Handle, +) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> + '_ { move |thread| { let rt = rt.clone(); let mut builder = std::thread::Builder::new(); diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 38a89c4cd07..c80f46189ce 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -32,7 +32,6 @@ use crate::sql::ast::SchemaViewer; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -523,7 +522,7 @@ impl ExecutionSet { let tables = self .exec_units // if you need eval to run single-threaded for debugging, change this to .iter() - .par_iter() + .iter() .filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression)) .collect(); ws::DatabaseUpdate { tables } diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index d19e1a4415f..991623b9fab 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -20,6 +20,7 @@ use spacetimedb::host::{ }; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, Node, Replica}; +use spacetimedb::startup::DatabaseCores; use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb_client_api::auth::{self, LOCALHOST}; use spacetimedb_client_api::{Host, NodeDelegate}; @@ -46,6 +47,7 @@ impl StandaloneEnv { config: Config, certs: &CertificateAuthority, data_dir: Arc, + db_cores: DatabaseCores, ) -> anyhow::Result> { let _pid_file = data_dir.pid_file()?; let meta_path = data_dir.metadata_toml(); @@ -68,6 +70,7 @@ impl StandaloneEnv { program_store.clone(), energy_monitor, durability_provider, + db_cores, ); let client_actor_index = ClientActorIndex::new(); let jwt_keys = certs.get_or_create_keys()?; @@ -461,9 +464,9 @@ impl StandaloneEnv { } } -pub async fn exec_subcommand(cmd: &str, args: &ArgMatches) -> Result<(), anyhow::Error> { +pub async fn exec_subcommand(cmd: &str, args: &ArgMatches, db_cores: DatabaseCores) -> Result<(), anyhow::Error> { match cmd { - "start" => start::exec(args).await, + "start" => start::exec(args, db_cores).await, "extract-schema" => extract_schema::exec(args).await, unknown => Err(anyhow::anyhow!("Invalid subcommand: {}", unknown)), } @@ -479,7 +482,7 @@ pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path: args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()]) } let args = start::cli().try_get_matches_from(args)?; - start::exec(&args).await + start::exec(&args, DatabaseCores::default()).await } #[cfg(test)] @@ -516,9 +519,11 @@ mod tests { page_pool_max_size: None, }; - let _env = StandaloneEnv::init(config, &ca, data_dir.clone()).await?; + let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()).await?; // Ensure that we have a lock. - assert!(StandaloneEnv::init(config, &ca, data_dir.clone()).await.is_err()); + assert!(StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()) + .await + .is_err()); Ok(()) } diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index cb1868fde95..9879b57e9f0 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -1,15 +1,16 @@ use clap::Command; +use spacetimedb::startup; use tokio::runtime::Builder; use spacetimedb_standalone::*; use std::panic; use std::process; -async fn async_main() -> anyhow::Result<()> { +async fn async_main(db_cores: startup::DatabaseCores) -> anyhow::Result<()> { let matches = get_command().get_matches(); let (cmd, subcommand_args) = matches.subcommand().unwrap(); - exec_subcommand(cmd, subcommand_args).await?; + exec_subcommand(cmd, subcommand_args, db_cores).await?; Ok(()) } @@ -66,10 +67,13 @@ fn main() -> anyhow::Result<()> { process::exit(1); })); + let cores = startup::pin_threads(); + // Create a multi-threaded run loop - Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(async_main()) + let mut builder = Builder::new_multi_thread(); + builder.enable_all(); + cores.tokio_workers.configure(&mut builder); + let rt = builder.build().unwrap(); + startup::configure_rayon(cores.rest, rt.handle()); + rt.block_on(async_main(cores.databases)) } diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index e7c2bb69c22..63075bc56bf 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -7,7 +7,7 @@ use clap::ArgAction::SetTrue; use clap::{Arg, ArgMatches}; use spacetimedb::config::{CertificateAuthority, ConfigFile}; use spacetimedb::db::{Config, Storage}; -use spacetimedb::startup::{self, TracingOptions}; +use spacetimedb::startup::{self, DatabaseCores, TracingOptions}; use spacetimedb::worker_metrics; use spacetimedb_client_api::routes::database::DatabaseRoutes; use spacetimedb_client_api::routes::router; @@ -75,7 +75,7 @@ pub fn cli() -> clap::Command { // .after_help("Run `spacetime help start` for more detailed information.") } -pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> { +pub async fn exec(args: &ArgMatches, db_cores: DatabaseCores) -> anyhow::Result<()> { let listen_addr = args.get_one::("listen_addr").unwrap(); let cert_dir = args.get_one::("jwt_key_dir"); let certs = Option::zip( @@ -122,24 +122,20 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> { } }; - startup::StartupOptions { - tracing: Some(TracingOptions { - config: config.logs, - reload_config: cfg!(debug_assertions).then_some(config_path), - disk_logging: std::env::var_os("SPACETIMEDB_DISABLE_DISK_LOGGING") - .is_none() - .then(|| data_dir.logs()), - edition: "standalone".to_owned(), - tracy: enable_tracy || std::env::var_os("SPACETIMEDB_TRACY").is_some(), - flamegraph: std::env::var_os("SPACETIMEDB_FLAMEGRAPH").map(|_| { - std::env::var_os("SPACETIMEDB_FLAMEGRAPH_PATH") - .unwrap_or("/var/log/flamegraph.folded".into()) - .into() - }), + startup::configure_tracing(TracingOptions { + config: config.logs, + reload_config: cfg!(debug_assertions).then_some(config_path), + disk_logging: std::env::var_os("SPACETIMEDB_DISABLE_DISK_LOGGING") + .is_none() + .then(|| data_dir.logs()), + edition: "standalone".to_owned(), + tracy: enable_tracy || std::env::var_os("SPACETIMEDB_TRACY").is_some(), + flamegraph: std::env::var_os("SPACETIMEDB_FLAMEGRAPH").map(|_| { + std::env::var_os("SPACETIMEDB_FLAMEGRAPH_PATH") + .unwrap_or("/var/log/flamegraph.folded".into()) + .into() }), - ..Default::default() - } - .configure(); + }); let certs = certs .or(config.certificate_authority) @@ -147,11 +143,10 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> { .context("cannot omit --jwt-{pub,priv}-key-path when those options are not specified in config.toml")?; let data_dir = Arc::new(data_dir.clone()); - let ctx = StandaloneEnv::init(db_config, &certs, data_dir).await?; + let ctx = StandaloneEnv::init(db_config, &certs, data_dir, db_cores).await?; worker_metrics::spawn_jemalloc_stats(listen_addr.clone()); worker_metrics::spawn_tokio_stats(listen_addr.clone()); worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone()); - let mut db_routes = DatabaseRoutes::default(); db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable()); db_routes.db_put = db_routes.db_put.layer(DefaultBodyLimit::disable()); diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index d8e2556902a..22c55b9af8a 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -178,9 +178,10 @@ impl CompiledModule { }; let certs = CertificateAuthority::in_cli_config_dir(&paths.cli_config_dir); - let env = spacetimedb_standalone::StandaloneEnv::init(config, &certs, paths.data_dir.into()) - .await - .unwrap(); + let env = + spacetimedb_standalone::StandaloneEnv::init(config, &certs, paths.data_dir.into(), Default::default()) + .await + .unwrap(); // TODO: Fix this when we update identity generation. let identity = Identity::ZERO; let db_identity = SpacetimeAuth::alloc(&env).await.unwrap().identity; From 3391f0d1d999885cef49b3748d01dcd299b026d2 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Wed, 28 May 2025 09:56:23 -0700 Subject: [PATCH 02/25] Move the module instance inside the database core. --- crates/core/src/host/module_host.rs | 55 +++++++++++++++++------------ 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index e177a0a826c..9db23c2987e 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -18,7 +18,7 @@ use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::util::asyncify; -use crate::util::lending_pool::{LendingPool, LentResource, PoolClosed}; +use crate::util::lending_pool::{LendingPool, PoolClosed}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; @@ -408,14 +408,14 @@ pub struct CallReducerParams { // TODO: figure out how we want to handle traps. maybe it should just not return to the LendingPool and // let the get_instance logic handle it? struct AutoReplacingModuleInstance { - inst: LentResource, + inst: T::Instance, module: Arc, } impl AutoReplacingModuleInstance { fn check_trap(&mut self) { if self.inst.trapped() { - *self.inst = self.module.create_instance() + self.inst = self.module.create_instance() } } } @@ -446,7 +446,8 @@ pub struct ModuleHost { inner: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, - job_tx: mpsc::Sender>, + // job_tx: mpsc::Sender>, + job_tx: mpsc::Sender>, } impl fmt::Debug for ModuleHost { @@ -460,7 +461,7 @@ impl fmt::Debug for ModuleHost { #[async_trait::async_trait] trait DynModuleHost: Send + Sync + 'static { - async fn get_instance(&self, db: Identity) -> Result, NoSuchModule>; + // async fn get_instance(&self, db: Identity) -> Result, NoSuchModule>; fn replica_ctx(&self) -> &ReplicaContext; async fn exit(&self); async fn exited(&self); @@ -497,9 +498,11 @@ async fn select_first>(fut_a: A, fut_b: B) -> #[async_trait::async_trait] impl DynModuleHost for HostControllerActor { + /* async fn get_instance(&self, db: Identity) -> Result, NoSuchModule> { // in the future we should do something like in the else branch here -- add more instances based on load. // we need to do write-skew retries first - right now there's only ever once instance per module. + /* let inst = if true { self.instance_pool .request_with_context(db) @@ -514,12 +517,16 @@ impl DynModuleHost for HostControllerActor { .await .map_err(|_| NoSuchModule)? }; + + */ Ok(Box::new(AutoReplacingModuleInstance { - inst, + inst: self.module.create_instance(), module: self.module.clone(), })) } + */ + fn replica_ctx(&self) -> &ReplicaContext { self.module.replica_ctx() } @@ -539,7 +546,7 @@ pub struct WeakModuleHost { info: Arc, inner: Weak, on_panic: Weak, - tx: mpsc::WeakSender>, + tx: mpsc::WeakSender>, } #[derive(Debug)] @@ -601,22 +608,28 @@ pub enum ClientConnectedError { impl ModuleHost { pub(super) fn new( - mut module: impl Module, + module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: DatabaseCore, ) -> Self { let info = module.info(); let instance_pool = LendingPool::new(); - instance_pool.add_multiple(module.initial_instances()).unwrap(); + let module = Arc::new(module); let inner = Arc::new(HostControllerActor { - module: Arc::new(module), + module: module.clone(), instance_pool, }); let on_panic = Arc::new(on_panic); - let (tx, mut rx) = mpsc::channel::>(8); + let (tx, mut rx) = mpsc::channel::>(8); + + let module_clone = module.clone(); core.spawn(move || { + let mut instance = AutoReplacingModuleInstance { + inst: module_clone.create_instance(), + module: module_clone, + }; while let Some(f) = rx.blocking_recv() { - f() + f(&mut instance); } }); ModuleHost { @@ -642,14 +655,11 @@ impl ModuleHost { F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static, R: Send + 'static, { - let mut inst = { - // Record the time spent waiting in the queue - let _guard = WORKER_METRICS - .reducer_wait_time - .with_label_values(&self.info.database_identity, reducer) - .start_timer(); - self.inner.get_instance(self.info.database_identity).await? - }; + // Record the time until our function starts running. + let queue_timer = WORKER_METRICS + .reducer_wait_time + .with_label_values(&self.info.database_identity, reducer) + .start_timer(); // Operations on module instances (e.g. calling reducers) is blocking, // partially because the computation can potentialyl take a long time @@ -666,8 +676,9 @@ impl ModuleHost { }); let (ret_tx, ret_rx) = oneshot::channel(); self.job_tx - .send(Box::new(move || { - let _ = ret_tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&mut *inst)))); + .send(Box::new(move |inst| { + queue_timer.stop_and_record(); + let _ = ret_tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(inst)))); })) .await .unwrap(); From 42fe57b466ac77cc96652e928b9046a1eb0d52da Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 28 May 2025 11:01:13 -0500 Subject: [PATCH 03/25] Fix errors --- crates/core/src/host/host_controller.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index befa7abd8c9..3049d336c17 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1006,9 +1006,13 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an let page_pool = PagePool::new(None); let core = DatabaseCore::default(); let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?; - let module_info = Arc::into_inner(module_info).unwrap(); + // this should always succeed, but sometimes it doesn't + let module_def = match Arc::try_unwrap(module_info) { + Ok(info) => info.module_def, + Err(info) => info.module_def.clone(), + }; - Ok(module_info.module_def) + Ok(module_def) } // Remove all gauges associated with a database. From 48f1a3db7c33eb8d6ec03ec736769f54388fa4e4 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Wed, 28 May 2025 10:11:32 -0700 Subject: [PATCH 04/25] Allow 50 messages at a time on the queue. --- crates/core/src/host/module_host.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 9db23c2987e..b96460ea7e8 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -620,7 +620,7 @@ impl ModuleHost { instance_pool, }); let on_panic = Arc::new(on_panic); - let (tx, mut rx) = mpsc::channel::>(8); + let (tx, mut rx) = mpsc::channel::>(50); let module_clone = module.clone(); core.spawn(move || { From 31f1fe8f45b5bd9260d7a98b02e718f5f4da5ef2 Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 28 May 2025 13:37:53 -0500 Subject: [PATCH 05/25] More robust job system --- crates/core/src/host/host_controller.rs | 20 +- crates/core/src/host/module_host.rs | 31 +--- crates/core/src/startup.rs | 54 +----- crates/core/src/util/jobs.rs | 205 +++++++++++++++++++++ crates/core/src/util/mod.rs | 1 + crates/standalone/src/lib.rs | 8 +- crates/standalone/src/main.rs | 3 +- crates/standalone/src/subcommands/start.rs | 5 +- 8 files changed, 239 insertions(+), 88 deletions(-) create mode 100644 crates/core/src/util/jobs.rs diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 3049d336c17..f35d30a0800 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -12,10 +12,10 @@ use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor}; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; -use crate::startup::{DatabaseCore, DatabaseCores}; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::SubscriptionManager; -use crate::util::{asyncify, spawn_rayon}; +use crate::util::asyncify; +use crate::util::jobs::{JobCore, JobCores}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, ensure, Context}; use async_trait::async_trait; @@ -97,7 +97,7 @@ pub struct HostController { /// The runtimes for running our modules. runtimes: Arc, /// The CPU cores that are reserved for running databases on. - db_cores: DatabaseCores, + db_cores: JobCores, } struct HostRuntimes { @@ -172,7 +172,7 @@ impl HostController { program_storage: ProgramStorage, energy_monitor: Arc, durability: Arc, - db_cores: DatabaseCores, + db_cores: JobCores, ) -> Self { Self { hosts: <_>::default(), @@ -579,9 +579,9 @@ async fn make_module_host( program: Program, energy_monitor: Arc, unregister: impl Fn() + Send + Sync + 'static, - core: DatabaseCore, + core: JobCore, ) -> anyhow::Result<(Program, ModuleHost)> { - spawn_rayon(move || { + asyncify(move || { let module_host = match host_type { HostType::Wasm => { let mcc = ModuleCreationContext { @@ -626,7 +626,7 @@ async fn launch_module( energy_monitor: Arc, replica_dir: ReplicaDir, runtimes: Arc, - core: DatabaseCore, + core: JobCore, ) -> anyhow::Result<(Program, LaunchedModule)> { let db_identity = database.database_identity; let host_type = database.host_type; @@ -853,7 +853,7 @@ impl Host { page_pool: PagePool, database: Database, program: Program, - core: DatabaseCore, + core: JobCore, ) -> anyhow::Result> { // Even in-memory databases acquire a lockfile. // Grab a tempdir to put that lockfile in. @@ -916,7 +916,7 @@ impl Host { program: Program, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, - core: DatabaseCore, + core: JobCore, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); @@ -1004,7 +1004,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an let runtimes = HostRuntimes::new(None); let page_pool = PagePool::new(None); - let core = DatabaseCore::default(); + let core = JobCore::default(); let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?; // this should always succeed, but sometimes it doesn't let module_def = match Arc::try_unwrap(module_info) { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index e177a0a826c..cb85693a67f 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -13,11 +13,11 @@ use crate::messages::control_db::Database; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; use crate::sql::parser::RowLevelExpr; -use crate::startup::DatabaseCore; use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::util::asyncify; +use crate::util::jobs::{JobCore, JobThread, WeakJobThread}; use crate::util::lending_pool::{LendingPool, LentResource, PoolClosed}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; @@ -47,7 +47,6 @@ use spacetimedb_vm::relation::RelValue; use std::fmt; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; -use tokio::sync::{mpsc, oneshot}; #[derive(Debug, Default, Clone, From)] pub struct DatabaseUpdate { @@ -446,7 +445,7 @@ pub struct ModuleHost { inner: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, - job_tx: mpsc::Sender>, + job_tx: JobThread<()>, } impl fmt::Debug for ModuleHost { @@ -539,7 +538,7 @@ pub struct WeakModuleHost { info: Arc, inner: Weak, on_panic: Weak, - tx: mpsc::WeakSender>, + tx: WeakJobThread<()>, } #[derive(Debug)] @@ -600,11 +599,7 @@ pub enum ClientConnectedError { } impl ModuleHost { - pub(super) fn new( - mut module: impl Module, - on_panic: impl Fn() + Send + Sync + 'static, - core: DatabaseCore, - ) -> Self { + pub(super) fn new(mut module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self { let info = module.info(); let instance_pool = LendingPool::new(); instance_pool.add_multiple(module.initial_instances()).unwrap(); @@ -613,17 +608,12 @@ impl ModuleHost { instance_pool, }); let on_panic = Arc::new(on_panic); - let (tx, mut rx) = mpsc::channel::>(8); - core.spawn(move || { - while let Some(f) = rx.blocking_recv() { - f() - } - }); + let job_tx = core.start(|| (), |x| x); ModuleHost { info, inner, on_panic, - job_tx: tx, + job_tx, } } @@ -664,14 +654,7 @@ impl ModuleHost { log::warn!("reducer {reducer} panicked"); (self.on_panic)(); }); - let (ret_tx, ret_rx) = oneshot::channel(); - self.job_tx - .send(Box::new(move || { - let _ = ret_tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(&mut *inst)))); - })) - .await - .unwrap(); - let result = ret_rx.await.unwrap().unwrap_or_else(|e| std::panic::resume_unwind(e)); + let result = self.job_tx.run(move |()| f(&mut *inst)).await; Ok(result) } diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 08657af5054..27fb3ea7ae9 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -14,6 +14,7 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::{reload, EnvFilter}; use crate::config::{ConfigFile, LogConfig}; +use crate::util::jobs::JobCores; pub struct TracingOptions { pub config: LogConfig, @@ -152,7 +153,7 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle Self { Self { - databases: DatabaseCores::default(), + databases: JobCores::default(), tokio_workers: TokioCores(None), rest: std::thread::available_parallelism().map_or(4, |x| x.get()), } @@ -169,18 +170,16 @@ impl Default for Cores { impl Cores { fn get() -> Option { - let mut cores = core_affinity::get_core_ids() + let cores = &mut core_affinity::get_core_ids() .filter(|cores| cores.len() >= 8)? .into_iter(); let total = cores.len() as f64; - let mut take = |frac: f64| cores.by_ref().take((total * frac).ceil() as usize).collect::>(); + let frac = |frac: f64| (total * frac).ceil() as usize; - let databases = DatabaseCores { - queue: Some(vec_to_queue(take(1.0 / 8.0))), - }; + let databases = cores.take(frac(1.0 / 8.0)).collect(); - let tokio_workers = TokioCores(Some(take(5.0 / 8.0))); + let tokio_workers = TokioCores(Some(cores.take(frac(5.0 / 8.0)).collect())); Some(Self { databases, @@ -217,45 +216,6 @@ impl TokioCores { } } -#[derive(Clone, Default)] -pub struct DatabaseCores { - queue: Option, -} - -impl DatabaseCores { - pub fn take(&self) -> DatabaseCore { - if let Some(queue) = &self.queue { - if let Some(core) = queue.pop() { - return DatabaseCore { - cores_id: Some((queue.clone(), core)), - }; - } - } - DatabaseCore { cores_id: None } - } -} - -#[derive(Default)] -pub struct DatabaseCore { - cores_id: Option<(CoreQueue, CoreId)>, -} - -impl DatabaseCore { - pub fn spawn(self, f: impl FnOnce() + Send + 'static) { - std::thread::spawn(move || { - if let Some((_, id)) = self.cores_id { - core_affinity::set_for_current(id); - } - scopeguard::defer!({ - if let Some((queue, id)) = self.cores_id { - queue.push(id).unwrap(); - } - }); - f(); - }); - } -} - pub fn configure_rayon(num_threads: usize, tokio_handle: &tokio::runtime::Handle) { rayon_core::ThreadPoolBuilder::new() .thread_name(|_idx| "rayon-worker".to_string()) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs new file mode 100644 index 00000000000..f63b734de5e --- /dev/null +++ b/crates/core/src/util/jobs.rs @@ -0,0 +1,205 @@ +use std::cmp; +use std::sync::{Arc, Mutex, Weak}; + +use core_affinity::CoreId; +use indexmap::IndexMap; +use smallvec::SmallVec; +use spacetimedb_data_structures::map::HashMap; +use tokio::sync::{mpsc, oneshot, watch}; + +#[derive(Default, Clone)] +pub struct JobCores { + inner: Option>>, +} + +struct JobCoresInner { + job_threads: HashMap>, + cores: IndexMap, + next_id: usize, +} + +#[derive(Default)] +struct CoreInfo { + jobs: SmallVec<[JobThreadId; 4]>, +} +fn cores_cmp(_: &CoreId, v1: &CoreInfo, _: &CoreId, v2: &CoreInfo) -> cmp::Ordering { + Ord::cmp(&v1.jobs.len(), &v2.jobs.len()) +} + +#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +struct JobThreadId(usize); + +impl JobCores { + pub fn take(&self) -> JobCore { + let inner = if let Some(inner) = &self.inner { + let cores = Arc::downgrade(inner); + let mut inner = inner.lock().unwrap(); + let id = JobThreadId(inner.next_id); + inner.next_id += 1; + let (&core_id, core) = inner.cores.first_mut().unwrap(); + core.jobs.push(id); + inner.cores.sort_by(cores_cmp); + + let (repin_tx, repin_rx) = watch::channel(core_id); + inner.job_threads.insert(id, repin_tx); + + Some(JobCoreInner { repin_rx, cores, id }) + } else { + None + }; + + JobCore { inner } + } +} + +impl FromIterator for JobCores { + fn from_iter>(iter: T) -> Self { + let cores: IndexMap<_, _> = iter.into_iter().map(|id| (id, CoreInfo::default())).collect(); + let inner = (!cores.is_empty()).then(|| { + Arc::new(Mutex::new(JobCoresInner { + job_threads: HashMap::default(), + cores, + next_id: 0, + })) + }); + Self { inner } + } +} + +impl JobCoresInner { + fn thread_exited(&mut self, id: JobThreadId) { + let core_id = *self.job_threads.remove(&id).unwrap().borrow(); + let core_index = self.cores.get_index_of(&core_id).unwrap(); + let last_index = self.cores.len() - 1; + let (core, last_core) = match self.cores.get_disjoint_indices_mut([core_index, last_index]) { + Ok([(_, core), (_, last)]) => (core, Some(last)), + Err(_) => (&mut self.cores[core_index], None), + }; + let pos = core.jobs.iter().position(|x| *x == id).unwrap(); + core.jobs.remove(pos); + if let Some(job) = last_core.and_then(|last| last.jobs.pop()) { + core.jobs[pos] = job; + let sender = self.job_threads.get_mut(&job).unwrap(); + sender.send_replace(core_id); + } else { + core.jobs.remove(pos); + } + self.cores.sort_by(cores_cmp); + } +} + +#[derive(Default)] +pub struct JobCore { + inner: Option, +} + +struct JobCoreInner { + repin_rx: watch::Receiver, + cores: Weak>, + id: JobThreadId, +} + +impl JobCore { + pub fn start(self, init: F, as_t: F2) -> JobThread + where + F: FnOnce() -> U + Send + 'static, + F2: FnOnce(&mut U) -> &mut T + Send + 'static, + U: 'static, + T: ?Sized + 'static, + { + let (tx, rx) = mpsc::channel::>>(8); + + let handle = tokio::runtime::Handle::current(); + std::thread::spawn(move || { + let mut data = init(); + let data = as_t(&mut data); + self.job_loop(&handle, rx, data); + }); + + JobThread { tx } + } + + fn job_loop(self, handle: &tokio::runtime::Handle, mut rx: mpsc::Receiver>>, data: &mut T) { + let (mut repin_rx, cores_id) = self + .inner + .map(|inner| (inner.repin_rx, (inner.cores, inner.id))) + .unzip(); + if let Some(repin_rx) = &mut repin_rx { + core_affinity::set_for_current(*repin_rx.borrow_and_update()); + } + scopeguard::defer!({ + if let Some((cores, id)) = cores_id { + if let Some(cores) = cores.upgrade() { + cores.lock().unwrap().thread_exited(id); + } + } + }); + handle.block_on(async { + loop { + let repin_fut = async { + let rx = repin_rx.as_mut()?; + rx.changed().await.ok()?; + Some(*rx.borrow_and_update()) + }; + tokio::select! { + Some(core_id) = repin_fut => { + core_affinity::set_for_current(core_id); + } + job = rx.recv() => { + let Some(job) = job else { break }; + tokio::task::block_in_place(|| job(data)) + } + } + } + }); + } +} + +pub struct JobThread { + tx: mpsc::Sender>>, +} + +impl Clone for JobThread { + fn clone(&self) -> Self { + Self { tx: self.tx.clone() } + } +} + +type Job = dyn FnOnce(&mut T) + Send; + +impl JobThread { + pub async fn run(&self, f: F) -> R + where + F: FnOnce(&mut T) -> R + Send + 'static, + R: Send + 'static, + { + let span = tracing::Span::current(); + let (ret_tx, ret_rx) = oneshot::channel(); + self.tx + .send(Box::new(move |data| { + let _entered = span.entered(); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(data))); + if let Err(Err(_panic)) = ret_tx.send(result) { + tracing::warn!("uncaught panic on threadpool") + } + })) + .await + .expect("job thread terminated unexpectedly"); + ret_rx.await.unwrap().unwrap_or_else(|e| std::panic::resume_unwind(e)) + } + + pub fn downgrade(&self) -> WeakJobThread { + let tx = self.tx.downgrade(); + WeakJobThread { tx } + } +} + +pub struct WeakJobThread { + tx: mpsc::WeakSender>>, +} + +impl WeakJobThread { + pub fn upgrade(&self) -> Option> { + self.tx.upgrade().map(|tx| JobThread { tx }) + } +} diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 5830e4b299e..9599f55ee45 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -5,6 +5,7 @@ use tokio::sync::oneshot; pub mod prometheus_handle; +pub mod jobs; pub mod lending_pool; pub mod notify_once; pub mod slow; diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 991623b9fab..6021694c9a7 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -20,7 +20,7 @@ use spacetimedb::host::{ }; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, Node, Replica}; -use spacetimedb::startup::DatabaseCores; +use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb_client_api::auth::{self, LOCALHOST}; use spacetimedb_client_api::{Host, NodeDelegate}; @@ -47,7 +47,7 @@ impl StandaloneEnv { config: Config, certs: &CertificateAuthority, data_dir: Arc, - db_cores: DatabaseCores, + db_cores: JobCores, ) -> anyhow::Result> { let _pid_file = data_dir.pid_file()?; let meta_path = data_dir.metadata_toml(); @@ -464,7 +464,7 @@ impl StandaloneEnv { } } -pub async fn exec_subcommand(cmd: &str, args: &ArgMatches, db_cores: DatabaseCores) -> Result<(), anyhow::Error> { +pub async fn exec_subcommand(cmd: &str, args: &ArgMatches, db_cores: JobCores) -> Result<(), anyhow::Error> { match cmd { "start" => start::exec(args, db_cores).await, "extract-schema" => extract_schema::exec(args).await, @@ -482,7 +482,7 @@ pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path: args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()]) } let args = start::cli().try_get_matches_from(args)?; - start::exec(&args, DatabaseCores::default()).await + start::exec(&args, JobCores::default()).await } #[cfg(test)] diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index 9879b57e9f0..8e754608858 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -1,13 +1,14 @@ use clap::Command; use spacetimedb::startup; +use spacetimedb::util::jobs::JobCores; use tokio::runtime::Builder; use spacetimedb_standalone::*; use std::panic; use std::process; -async fn async_main(db_cores: startup::DatabaseCores) -> anyhow::Result<()> { +async fn async_main(db_cores: JobCores) -> anyhow::Result<()> { let matches = get_command().get_matches(); let (cmd, subcommand_args) = matches.subcommand().unwrap(); exec_subcommand(cmd, subcommand_args, db_cores).await?; diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index 63075bc56bf..54ae5a1cefa 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -7,7 +7,8 @@ use clap::ArgAction::SetTrue; use clap::{Arg, ArgMatches}; use spacetimedb::config::{CertificateAuthority, ConfigFile}; use spacetimedb::db::{Config, Storage}; -use spacetimedb::startup::{self, DatabaseCores, TracingOptions}; +use spacetimedb::startup::{self, TracingOptions}; +use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics; use spacetimedb_client_api::routes::database::DatabaseRoutes; use spacetimedb_client_api::routes::router; @@ -75,7 +76,7 @@ pub fn cli() -> clap::Command { // .after_help("Run `spacetime help start` for more detailed information.") } -pub async fn exec(args: &ArgMatches, db_cores: DatabaseCores) -> anyhow::Result<()> { +pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { let listen_addr = args.get_one::("listen_addr").unwrap(); let cert_dir = args.get_one::("jwt_key_dir"); let certs = Option::zip( From f46b4e97d6f18916fa122a9f1a4f1929dfacf0ed Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 28 May 2025 16:12:31 -0500 Subject: [PATCH 06/25] Try pinning rayon --- crates/core/src/startup.rs | 89 ++++++++++++++++++----------------- crates/standalone/src/main.rs | 4 +- 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 27fb3ea7ae9..dd462ba29c0 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -152,20 +152,11 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle Self { - Self { - databases: JobCores::default(), - tokio_workers: TokioCores(None), - rest: std::thread::available_parallelism().map_or(4, |x| x.get()), - } - } + pub tokio_workers: ThreadPoolCores, + pub rayon: ThreadPoolCores, } impl Cores { @@ -179,12 +170,14 @@ impl Cores { let databases = cores.take(frac(1.0 / 8.0)).collect(); - let tokio_workers = TokioCores(Some(cores.take(frac(5.0 / 8.0)).collect())); + let tokio_workers = ThreadPoolCores(Some(cores.take(frac(4.0 / 8.0)).collect())); + + let rayon = ThreadPoolCores(Some(cores.take(frac(1.0 / 8.0)).collect())); Some(Self { databases, tokio_workers, - rest: cores.len(), + rayon, }) } } @@ -198,42 +191,54 @@ fn vec_to_queue(cores: Vec) -> CoreQueue { queue } -pub struct TokioCores(Option>); +#[derive(Default)] +pub struct ThreadPoolCores(Option>); -impl TokioCores { - pub fn configure(self, builder: &mut tokio::runtime::Builder) { - if let Some(cores) = self.0 { +impl ThreadPoolCores { + fn into_setup_fn(self) -> Option<(usize, impl Fn())> { + self.0.map(|cores| { let cores = vec_to_queue(cores); - // `on_thread_start` gets called for both async worker threads and blocking threads, - // but the first `worker_threads` threads that tokio spawns are worker threads, - // so this ends up working fine - builder.worker_threads(cores.len()).on_thread_start(move || { + (cores.len(), move || { if let Some(core) = cores.pop() { core_affinity::set_for_current(core); } - }); + }) + }) + } + + pub fn configure_tokio(self, builder: &mut tokio::runtime::Builder) { + if let Some((num, setup)) = self.into_setup_fn() { + // `on_thread_start` gets called for both async worker threads and blocking threads, + // but the first `worker_threads` threads that tokio spawns are worker threads, + // so this ends up working fine + builder.worker_threads(num).on_thread_start(setup); } } -} -pub fn configure_rayon(num_threads: usize, tokio_handle: &tokio::runtime::Handle) { - rayon_core::ThreadPoolBuilder::new() - .thread_name(|_idx| "rayon-worker".to_string()) - .spawn_handler(thread_spawn_handler(tokio_handle)) - // TODO(perf, pgoldman 2024-02-22): - // in the case where we have many modules running many reducers, - // we'll wind up with Rayon threads competing with each other and with Tokio threads - // for CPU time. - // - // We should investigate creating two separate CPU pools, - // possibly via https://docs.rs/nix/latest/nix/sched/fn.sched_setaffinity.html, - // and restricting Tokio threads to one CPU pool - // and Rayon threads to the other. - // Then we should give Tokio and Rayon each a number of worker threads - // equal to the size of their pool. - .num_threads(num_threads) - .build_global() - .unwrap() + pub fn configure_rayon(self, tokio_handle: &tokio::runtime::Handle) { + rayon_core::ThreadPoolBuilder::new() + .thread_name(|_idx| "rayon-worker".to_string()) + .spawn_handler(thread_spawn_handler(tokio_handle)) + // TODO(perf, pgoldman 2024-02-22): + // in the case where we have many modules running many reducers, + // we'll wind up with Rayon threads competing with each other and with Tokio threads + // for CPU time. + // + // We should investigate creating two separate CPU pools, + // possibly via https://docs.rs/nix/latest/nix/sched/fn.sched_setaffinity.html, + // and restricting Tokio threads to one CPU pool + // and Rayon threads to the other. + // Then we should give Tokio and Rayon each a number of worker threads + // equal to the size of their pool. + .num_threads(self.0.as_ref().map_or(0, |cores| cores.len())) + .start_handler(move |i| { + if let Some(cores) = &self.0 { + core_affinity::set_for_current(cores[i]); + } + }) + .build_global() + .unwrap() + } } /// A Rayon [spawn_handler](https://docs.rs/rustc-rayon-core/latest/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler) diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index 8e754608858..15fafd60858 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -73,8 +73,8 @@ fn main() -> anyhow::Result<()> { // Create a multi-threaded run loop let mut builder = Builder::new_multi_thread(); builder.enable_all(); - cores.tokio_workers.configure(&mut builder); + cores.tokio_workers.configure_tokio(&mut builder); let rt = builder.build().unwrap(); - startup::configure_rayon(cores.rest, rt.handle()); + cores.rayon.configure_rayon(rt.handle()); rt.block_on(async_main(cores.databases)) } From ea9901a9b6f82d8240af404b8b71e6b4fcd8e618 Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 28 May 2025 16:40:39 -0500 Subject: [PATCH 07/25] fixup! More robust job system --- crates/core/src/util/jobs.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 18265bd15fb..1f9a5708f49 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -76,7 +76,6 @@ impl JobCoresInner { Err(_) => (&mut self.cores[core_index], None), }; let pos = core.jobs.iter().position(|x| *x == id).unwrap(); - core.jobs.remove(pos); if let Some(job) = last_core.and_then(|last| last.jobs.pop()) { core.jobs[pos] = job; let sender = self.job_threads.get_mut(&job).unwrap(); From 31e7cc621cacdcac1557d53f8c14e6c47c43a452 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 28 May 2025 15:55:00 -0700 Subject: [PATCH 08/25] fix lints --- crates/core/src/host/module_host.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index c99d5db0244..8aae5cee69e 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -471,6 +471,7 @@ struct HostControllerActor { } impl HostControllerActor { + #[allow(unused)] fn spinup_new_instance(&self) { let (module, instance_pool) = (self.module.clone(), self.instance_pool.clone()); rayon::spawn(move || { @@ -487,6 +488,7 @@ impl HostControllerActor { /// runs future A and future B concurrently. if A completes before B, B is cancelled. if B completes /// before A, A is polled to completion +#[allow(unused)] async fn select_first>(fut_a: A, fut_b: B) -> A::Output { tokio::select! { ret = fut_a => ret, From 511a6a1b9e1d267c8bf07c442c71ccf0fabaca07 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Wed, 28 May 2025 12:28:01 -0700 Subject: [PATCH 09/25] Bring back queue length stats. --- crates/core/src/host/module_host.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 8aae5cee69e..585e97d0178 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -653,6 +653,17 @@ impl ModuleHost { .reducer_wait_time .with_label_values(&self.info.database_identity, reducer) .start_timer(); + let queue_length_gauge = WORKER_METRICS + .instance_queue_length + .with_label_values(&self.info.database_identity); + queue_length_gauge.inc(); + { + let queue_length = queue_length_gauge.get(); + WORKER_METRICS + .instance_queue_length_histogram + .with_label_values(&self.info.database_identity) + .observe(queue_length as f64); + } // Operations on module instances (e.g. calling reducers) is blocking, // partially because the computation can potentialyl take a long time @@ -671,6 +682,7 @@ impl ModuleHost { .job_tx .run(move |inst| { queue_timer.stop_and_record(); + queue_length_gauge.dec(); f(inst) }) .await; From 72e4e564b8488844fe009ea68b2638e63573895c Mon Sep 17 00:00:00 2001 From: Noa Date: Mon, 2 Jun 2025 15:57:56 -0400 Subject: [PATCH 10/25] Use sched_setaffinity on tokio blocking threads --- Cargo.lock | 15 +++++++- Cargo.toml | 1 + crates/core/Cargo.toml | 3 ++ crates/core/src/startup.rs | 64 +++++++++++++++++++++++------------ crates/standalone/src/main.rs | 4 +-- 5 files changed, 63 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98a33930d4d..012e9f5841c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3289,6 +3289,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nohash-hasher" version = "0.2.0" @@ -4734,7 +4746,7 @@ dependencies = [ "libc", "log", "memchr", - "nix", + "nix 0.26.4", "radix_trie", "scopeguard", "unicode-segmentation", @@ -5478,6 +5490,7 @@ dependencies = [ "lazy_static", "log", "memchr", + "nix 0.30.1", "once_cell", "openssl", "parking_lot 0.12.3", diff --git a/Cargo.toml b/Cargo.toml index 2372cbc497d..1b28274897e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -200,6 +200,7 @@ log = "0.4.17" memchr = "2" mimalloc = "0.1.39" nohash-hasher = "0.2" +nix = "0.30" once_cell = "1.16" parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] } parse-size = "1.1.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ce081b9c5f8..5599fb11615 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -120,6 +120,9 @@ core_affinity = "0.8" tikv-jemallocator = {workspace = true} tikv-jemalloc-ctl = {workspace = true} +[target.'cfg(target_os = "linux")'.dependencies] +nix = { workspace = true, features = ["sched"] } + [features] # Print a warning when doing an unindexed `iter_by_col_range` on a large table. unindexed_iter_by_col_range_warn = [] diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index dd462ba29c0..2b639453585 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -155,8 +155,8 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle) -> CoreQueue { } #[derive(Default)] -pub struct ThreadPoolCores(Option>); +pub struct TokioCores { + workers: Option>, + #[cfg(target_os = "linux")] + blocking: Option, +} -impl ThreadPoolCores { - fn into_setup_fn(self) -> Option<(usize, impl Fn())> { - self.0.map(|cores| { +impl TokioCores { + pub fn configure(self, builder: &mut tokio::runtime::Builder) { + if let Some(cores) = self.workers { + let num = cores.len(); let cores = vec_to_queue(cores); - (cores.len(), move || { - if let Some(core) = cores.pop() { - core_affinity::set_for_current(core); - } - }) - }) - } - - pub fn configure_tokio(self, builder: &mut tokio::runtime::Builder) { - if let Some((num, setup)) = self.into_setup_fn() { // `on_thread_start` gets called for both async worker threads and blocking threads, // but the first `worker_threads` threads that tokio spawns are worker threads, // so this ends up working fine - builder.worker_threads(num).on_thread_start(setup); + builder.worker_threads(num).on_thread_start(move || { + if let Some(core) = cores.pop() { + core_affinity::set_for_current(core); + } else { + #[cfg(target_os = "linux")] + if let Some(cpuset) = &self.blocking { + let this = nix::unistd::Pid::from_raw(0); + let _ = nix::sched::sched_setaffinity(this, cpuset); + } + } + }); } } +} + +#[derive(Default)] +pub struct RayonCores(Option>); - pub fn configure_rayon(self, tokio_handle: &tokio::runtime::Handle) { +impl RayonCores { + pub fn configure(self, tokio_handle: &tokio::runtime::Handle) { rayon_core::ThreadPoolBuilder::new() .thread_name(|_idx| "rayon-worker".to_string()) .spawn_handler(thread_spawn_handler(tokio_handle)) diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index 15fafd60858..81aee00bfa3 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -73,8 +73,8 @@ fn main() -> anyhow::Result<()> { // Create a multi-threaded run loop let mut builder = Builder::new_multi_thread(); builder.enable_all(); - cores.tokio_workers.configure_tokio(&mut builder); + cores.tokio.configure(&mut builder); let rt = builder.build().unwrap(); - cores.rayon.configure_rayon(rt.handle()); + cores.rayon.configure(rt.handle()); rt.block_on(async_main(cores.databases)) } From d3a0323bff171a1cb2c6cb32e9ad45c96e94d014 Mon Sep 17 00:00:00 2001 From: Noa Date: Tue, 3 Jun 2025 12:44:19 -0500 Subject: [PATCH 11/25] Remove unnecessary utility function --- crates/core/src/startup.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 2b639453585..2416f70b701 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -2,7 +2,6 @@ use core_affinity::CoreId; use crossbeam_queue::ArrayQueue; use spacetimedb_paths::server::{ConfigToml, LogsDir}; use std::path::PathBuf; -use std::sync::Arc; use std::time::Duration; use tracing_appender::rolling; use tracing_core::LevelFilter; @@ -194,15 +193,6 @@ impl Cores { } } -type CoreQueue = Arc>; -fn vec_to_queue(cores: Vec) -> CoreQueue { - let queue = Arc::new(ArrayQueue::new(cores.len())); - for core in cores { - queue.push(core).unwrap(); - } - queue -} - #[derive(Default)] pub struct TokioCores { workers: Option>, @@ -213,13 +203,18 @@ pub struct TokioCores { impl TokioCores { pub fn configure(self, builder: &mut tokio::runtime::Builder) { if let Some(cores) = self.workers { - let num = cores.len(); - let cores = vec_to_queue(cores); + builder.worker_threads(cores.len()); + + let cores_queue = Box::new(ArrayQueue::new(cores.len())); + for core in cores { + cores_queue.push(core).unwrap(); + } + // `on_thread_start` gets called for both async worker threads and blocking threads, // but the first `worker_threads` threads that tokio spawns are worker threads, // so this ends up working fine - builder.worker_threads(num).on_thread_start(move || { - if let Some(core) = cores.pop() { + builder.on_thread_start(move || { + if let Some(core) = cores_queue.pop() { core_affinity::set_for_current(core); } else { #[cfg(target_os = "linux")] From 54c9fb82e9e39caf47291cc26e4b5502bce7dd85 Mon Sep 17 00:00:00 2001 From: Noa Date: Tue, 3 Jun 2025 12:51:43 -0500 Subject: [PATCH 12/25] Remove outdated comment --- crates/core/src/startup.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 2416f70b701..103df5ca567 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -236,17 +236,6 @@ impl RayonCores { rayon_core::ThreadPoolBuilder::new() .thread_name(|_idx| "rayon-worker".to_string()) .spawn_handler(thread_spawn_handler(tokio_handle)) - // TODO(perf, pgoldman 2024-02-22): - // in the case where we have many modules running many reducers, - // we'll wind up with Rayon threads competing with each other and with Tokio threads - // for CPU time. - // - // We should investigate creating two separate CPU pools, - // possibly via https://docs.rs/nix/latest/nix/sched/fn.sched_setaffinity.html, - // and restricting Tokio threads to one CPU pool - // and Rayon threads to the other. - // Then we should give Tokio and Rayon each a number of worker threads - // equal to the size of their pool. .num_threads(self.0.as_ref().map_or(0, |cores| cores.len())) .start_handler(move |i| { if let Some(cores) = &self.0 { From 655b48f40d67a2295d884a64b66741b7204ed7d2 Mon Sep 17 00:00:00 2001 From: Noa Date: Tue, 3 Jun 2025 14:07:11 -0500 Subject: [PATCH 13/25] Clarify and and add docs for core::util::jobs --- crates/core/src/util/jobs.rs | 147 ++++++++++++++++++++++++----------- 1 file changed, 101 insertions(+), 46 deletions(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 1f9a5708f49..d9278049ec3 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -7,15 +7,30 @@ use smallvec::SmallVec; use spacetimedb_data_structures::map::HashMap; use tokio::sync::{mpsc, oneshot, watch}; +/// A pool of CPU cores for running jobs on. +/// +/// Each thread is represented by a [`JobThread`], which is pinned to a single +/// core and sequentially runs the jobs that are passed to [`JobThread::run`]. +/// This pool attempts to keep the number of `JobThread`s pinned to each core +/// as equitable as possible; new threads allocated by [`Self::take()`] are +/// assigned to cores in a round-robin fashion, and when a thread exits, it +/// takes a thread pinned to a busier core and repins it to the core it was +/// just running on. +/// +/// Construction is done via the `FromIterator` impl. If created from an empty +/// iterator or via `JobCores::default()`, the job threads will work but not be +/// pinned to any threads. #[derive(Default, Clone)] pub struct JobCores { inner: Option>>, } struct JobCoresInner { + /// A map to the repin_tx for each job thread job_threads: HashMap>, + /// Kept sorted by `CoreInfo.jobs.len()`, in ascending order cores: IndexMap, - next_id: usize, + next_id: JobThreadId, } #[derive(Default)] @@ -30,12 +45,15 @@ fn cores_cmp(_: &CoreId, v1: &CoreInfo, _: &CoreId, v2: &CoreInfo) -> cmp::Order struct JobThreadId(usize); impl JobCores { + /// Reserve a core from the pool to later start a job thread on. pub fn take(&self) -> JobCore { let inner = if let Some(inner) = &self.inner { let cores = Arc::downgrade(inner); let mut inner = inner.lock().unwrap(); - let id = JobThreadId(inner.next_id); - inner.next_id += 1; + + let id = inner.next_id; + inner.next_id.0 += 1; + let (&core_id, core) = inner.cores.first_mut().unwrap(); core.jobs.push(id); inner.cores.sort_by(cores_cmp); @@ -43,7 +61,10 @@ impl JobCores { let (repin_tx, repin_rx) = watch::channel(core_id); inner.job_threads.insert(id, repin_tx); - Some(JobCoreInner { repin_rx, cores, id }) + Some(JobCoreInner { + repin_rx, + _guard: JobCoreGuard { cores, id }, + }) } else { None }; @@ -59,7 +80,7 @@ impl FromIterator for JobCores { Arc::new(Mutex::new(JobCoresInner { job_threads: HashMap::default(), cores, - next_id: 0, + next_id: JobThreadId(0), })) }); Self { inner } @@ -67,26 +88,35 @@ impl FromIterator for JobCores { } impl JobCoresInner { - fn thread_exited(&mut self, id: JobThreadId) { + /// Run when a `JobThread` exits. + fn on_thread_exit(&mut self, id: JobThreadId) { let core_id = *self.job_threads.remove(&id).unwrap().borrow(); + let core_index = self.cores.get_index_of(&core_id).unwrap(); let last_index = self.cores.len() - 1; + + // `last_core` will be Some if `core_index` is not `last_index` + // FIXME(noa): this will fail to level sometimes; we should keep a partition point and + // manually move cores before it when they're low and above it when they're high. let (core, last_core) = match self.cores.get_disjoint_indices_mut([core_index, last_index]) { Ok([(_, core), (_, last)]) => (core, Some(last)), Err(_) => (&mut self.cores[core_index], None), }; - let pos = core.jobs.iter().position(|x| *x == id).unwrap(); + + let job_pos = core.jobs.iter().position(|x| *x == id).unwrap(); + if let Some(job) = last_core.and_then(|last| last.jobs.pop()) { - core.jobs[pos] = job; + core.jobs[job_pos] = job; let sender = self.job_threads.get_mut(&job).unwrap(); sender.send_replace(core_id); } else { - core.jobs.remove(pos); + core.jobs.remove(job_pos); } self.cores.sort_by(cores_cmp); } } +/// A core taken from [`JobCores`], not yet running a job loop. #[derive(Default)] pub struct JobCore { inner: Option, @@ -94,66 +124,82 @@ pub struct JobCore { struct JobCoreInner { repin_rx: watch::Receiver, - cores: Weak>, - id: JobThreadId, + _guard: JobCoreGuard, } impl JobCore { - pub fn start(self, init: F, as_t: F2) -> JobThread + /// Start running a job thread on this core. + /// + /// `init` constructs the data provided to each job, and `unsize` unsizes + /// it to `&mut T`, if necessary. + pub fn start(self, init: F, unsize: F2) -> JobThread where F: FnOnce() -> U + Send + 'static, F2: FnOnce(&mut U) -> &mut T + Send + 'static, U: 'static, T: ?Sized + 'static, { - let (tx, rx) = mpsc::channel::>>(50); + let (tx, rx) = mpsc::channel::>>(Self::JOB_CHANNEL_LENGTH); let handle = tokio::runtime::Handle::current(); std::thread::spawn(move || { let mut data = init(); - let data = as_t(&mut data); - self.job_loop(&handle, rx, data); + let data = unsize(&mut data); + handle.block_on(self.job_loop(rx, data)) }); JobThread { tx } } - fn job_loop(self, handle: &tokio::runtime::Handle, mut rx: mpsc::Receiver>>, data: &mut T) { - let (mut repin_rx, cores_id) = self - .inner - .map(|inner| (inner.repin_rx, (inner.cores, inner.id))) - .unzip(); - if let Some(repin_rx) = &mut repin_rx { - core_affinity::set_for_current(*repin_rx.borrow_and_update()); - } - scopeguard::defer!({ - if let Some((cores, id)) = cores_id { - if let Some(cores) = cores.upgrade() { - cores.lock().unwrap().thread_exited(id); + // this shouldn't matter too much, since callers will need to wait for + // the job to finish anyway. + const JOB_CHANNEL_LENGTH: usize = 50; + + async fn job_loop(mut self, mut rx: mpsc::Receiver>>, data: &mut T) { + // this function is async because we need to recv on the repin channel + // and the jobs channel, but the jobs being run are blocking + + let repin_rx = self.inner.as_mut().map(|inner| &mut inner.repin_rx); + let repin_loop = async { + if let Some(rx) = repin_rx { + rx.mark_changed(); + while rx.changed().await.is_ok() { + core_affinity::set_for_current(*rx.borrow_and_update()); } } - }); - handle.block_on(async { - loop { - let repin_fut = async { - let rx = repin_rx.as_mut()?; - rx.changed().await.ok()?; - Some(*rx.borrow_and_update()) - }; - tokio::select! { - Some(core_id) = repin_fut => { - core_affinity::set_for_current(core_id); - } - job = rx.recv() => { - let Some(job) = job else { break }; - tokio::task::block_in_place(|| job(data)) - } - } + }; + + let job_loop = async { + while let Some(job) = rx.recv().await { + // blocking in place means that other futures on the same task + // won't get polled - in this case, that's just the repin loop, + // which is fine because it can just run before the next job. + tokio::task::block_in_place(|| job(data)) } - }); + }; + + super::also_poll(job_loop, repin_loop).await } } +/// On drop, tells the `JobCores` that this core has been freed up. +struct JobCoreGuard { + cores: Weak>, + id: JobThreadId, +} + +impl Drop for JobCoreGuard { + fn drop(&mut self) { + if let Some(cores) = self.cores.upgrade() { + cores.lock().unwrap().on_thread_exit(self.id); + } + } +} + +/// A handle to a thread running a job loop; see [`JobCores`] for more details. +/// +/// This handle is cheaply cloneäble, and the thread will shut down when all +/// handles to it have been dropped. pub struct JobThread { tx: mpsc::Sender>>, } @@ -167,13 +213,19 @@ impl Clone for JobThread { type Job = dyn FnOnce(&mut T) + Send; impl JobThread { + /// Run a blocking job on this thread. + /// + /// The job (`f`) will be placed in a queue, and will run strictly after + /// jobs ahead of it in the queue. If `f` panics, it will be bubbled up to + /// the calling task. pub async fn run(&self, f: F) -> R where F: FnOnce(&mut T) -> R + Send + 'static, R: Send + 'static, { - let span = tracing::Span::current(); let (ret_tx, ret_rx) = oneshot::channel(); + + let span = tracing::Span::current(); self.tx .send(Box::new(move |data| { let _entered = span.entered(); @@ -184,15 +236,18 @@ impl JobThread { })) .await .expect("job thread terminated unexpectedly"); + ret_rx.await.unwrap().unwrap_or_else(|e| std::panic::resume_unwind(e)) } + /// Obtain a weak version of this handle. pub fn downgrade(&self) -> WeakJobThread { let tx = self.tx.downgrade(); WeakJobThread { tx } } } +/// A weak version of `JobThread` that does not hold the thread open. pub struct WeakJobThread { tx: mpsc::WeakSender>>, } From dcf94a95e85934de180656bc1161002bf78e9b5c Mon Sep 17 00:00:00 2001 From: Noa Date: Tue, 3 Jun 2025 15:55:01 -0500 Subject: [PATCH 14/25] Use a slightly better data structure for JobCores --- crates/core/src/util/jobs.rs | 80 ++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index d9278049ec3..6dfc1add2af 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,4 +1,3 @@ -use std::cmp; use std::sync::{Arc, Mutex, Weak}; use core_affinity::CoreId; @@ -28,8 +27,12 @@ pub struct JobCores { struct JobCoresInner { /// A map to the repin_tx for each job thread job_threads: HashMap>, - /// Kept sorted by `CoreInfo.jobs.len()`, in ascending order cores: IndexMap, + /// An index into `cores` of the next core to put a new job onto. + /// + /// This acts as a partition point in `cores`; all cores in `..index` have + /// one fewer job on them than the cores in `index..`. + next_core: usize, next_id: JobThreadId, } @@ -37,9 +40,6 @@ struct JobCoresInner { struct CoreInfo { jobs: SmallVec<[JobThreadId; 4]>, } -fn cores_cmp(_: &CoreId, v1: &CoreInfo, _: &CoreId, v2: &CoreInfo) -> cmp::Ordering { - Ord::cmp(&v1.jobs.len(), &v2.jobs.len()) -} #[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] struct JobThreadId(usize); @@ -49,18 +49,7 @@ impl JobCores { pub fn take(&self) -> JobCore { let inner = if let Some(inner) = &self.inner { let cores = Arc::downgrade(inner); - let mut inner = inner.lock().unwrap(); - - let id = inner.next_id; - inner.next_id.0 += 1; - - let (&core_id, core) = inner.cores.first_mut().unwrap(); - core.jobs.push(id); - inner.cores.sort_by(cores_cmp); - - let (repin_tx, repin_rx) = watch::channel(core_id); - inner.job_threads.insert(id, repin_tx); - + let (id, repin_rx) = inner.lock().unwrap().allocate(); Some(JobCoreInner { repin_rx, _guard: JobCoreGuard { cores, id }, @@ -80,6 +69,7 @@ impl FromIterator for JobCores { Arc::new(Mutex::new(JobCoresInner { job_threads: HashMap::default(), cores, + next_core: 0, next_id: JobThreadId(0), })) }); @@ -88,31 +78,51 @@ impl FromIterator for JobCores { } impl JobCoresInner { + fn allocate(&mut self) -> (JobThreadId, watch::Receiver) { + let id = self.next_id; + self.next_id.0 += 1; + + let (&core_id, core) = self.cores.get_index_mut(self.next_core).unwrap(); + core.jobs.push(id); + self.next_core = (self.next_core + 1) % self.cores.len(); + + let (repin_tx, repin_rx) = watch::channel(core_id); + self.job_threads.insert(id, repin_tx); + + (id, repin_rx) + } + /// Run when a `JobThread` exits. - fn on_thread_exit(&mut self, id: JobThreadId) { + fn deallocate(&mut self, id: JobThreadId) { let core_id = *self.job_threads.remove(&id).unwrap().borrow(); let core_index = self.cores.get_index_of(&core_id).unwrap(); - let last_index = self.cores.len() - 1; - - // `last_core` will be Some if `core_index` is not `last_index` - // FIXME(noa): this will fail to level sometimes; we should keep a partition point and - // manually move cores before it when they're low and above it when they're high. - let (core, last_core) = match self.cores.get_disjoint_indices_mut([core_index, last_index]) { - Ok([(_, core), (_, last)]) => (core, Some(last)), - Err(_) => (&mut self.cores[core_index], None), - }; - let job_pos = core.jobs.iter().position(|x| *x == id).unwrap(); + // This core is now less busy than it should be - bump `next_core` back + // by 1 and steal a thread from the core there. + // + // This wraps around in the 0 case, so the partition point is simply + // moved to the end of the ring buffer. + + let steal_from = self.next_core.checked_sub(1).unwrap_or(self.cores.len() - 1); + + if let Ok([(_, core), (_, steal_from)]) = self.cores.get_disjoint_indices_mut([core_index, steal_from]) { + // This unwrap will never fail, since cores below `next_core` always have + // at least 1 thread on them. Edge case: if `next_core` is 0, `steal_from` + // would wrap around to the end - but when `next_core` is 0, every core has + // the same number of threads; so, if the last core is empty, all the cores + // would be empty, but we know that's impossible because we're deallocating + // a thread right now. + let stolen = steal_from.jobs.pop().unwrap(); + + let pos = core.jobs.iter().position(|x| *x == id).unwrap(); + core.jobs[pos] = stolen; - if let Some(job) = last_core.and_then(|last| last.jobs.pop()) { - core.jobs[job_pos] = job; - let sender = self.job_threads.get_mut(&job).unwrap(); - sender.send_replace(core_id); + self.job_threads[&stolen].send_replace(core_id); } else { - core.jobs.remove(job_pos); + // this core was already at `next_core - 1` - nothing needs to be done! + self.next_core = steal_from; } - self.cores.sort_by(cores_cmp); } } @@ -191,7 +201,7 @@ struct JobCoreGuard { impl Drop for JobCoreGuard { fn drop(&mut self) { if let Some(cores) = self.cores.upgrade() { - cores.lock().unwrap().on_thread_exit(self.id); + cores.lock().unwrap().deallocate(self.id); } } } From 909ee0b1107b06b737e67193de307af9c376fcdb Mon Sep 17 00:00:00 2001 From: Noa Date: Tue, 3 Jun 2025 17:00:46 -0500 Subject: [PATCH 15/25] More docs --- crates/core/src/host/host_controller.rs | 3 +- crates/core/src/startup.rs | 50 +++++++++++++++++++++++-- crates/core/src/util/jobs.rs | 15 ++++++-- 3 files changed, 59 insertions(+), 9 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index f35d30a0800..858857bc7c5 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -96,7 +96,7 @@ pub struct HostController { pub page_pool: PagePool, /// The runtimes for running our modules. runtimes: Arc, - /// The CPU cores that are reserved for running databases on. + /// The CPU cores that are reserved for ModuleHost operations to run on. db_cores: JobCores, } @@ -581,6 +581,7 @@ async fn make_module_host( unregister: impl Fn() + Send + Sync + 'static, core: JobCore, ) -> anyhow::Result<(Program, ModuleHost)> { + // `make_actor` is blocking, as it needs to compile the wasm to native code. asyncify(move || { let module_host = match host_type { HostType::Wasm => { diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 103df5ca567..36a3f1c867d 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -40,10 +40,6 @@ impl Default for TracingOptions { } } } -#[must_use] -pub fn pin_threads() -> Cores { - Cores::get().unwrap_or_default() -} pub fn configure_tracing(opts: TracingOptions) { // Use this to change log levels at runtime. @@ -151,10 +147,47 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle Cores { + Cores::get().unwrap_or_default() +} + +/// A type holding cores divvied up into different sets. +/// +/// Obtained from [`pin_threads()`]. #[derive(Default)] pub struct Cores { + /// The cores to run database instances on. + /// + /// Currently, this is 1/8 of num_cpus. pub databases: JobCores, + /// The cores to run tokio worker and blocking threads on. + /// + /// Currently, tokio worker threads are 4/8 of num_cpus, and tokio blocking + /// threads are pinned non-exclusively to 2/8 of num_cpus. pub tokio: TokioCores, + /// The cores to run rayon threads on. + /// + /// Currently, this is 1/8 of num_cpus. pub rayon: RayonCores, } @@ -196,11 +229,17 @@ impl Cores { #[derive(Default)] pub struct TokioCores { workers: Option>, + // For blocking threads, we don't want to limit them to a specific number + // and pin them to their own cores - they're supposed to run concurrently + // with each other. However, `core_affinity` doesn't support affinity masks, + // so we just use the Linux-specific API, since this is only a slight boost + // and we don't care enough about performance on other platforms. #[cfg(target_os = "linux")] blocking: Option, } impl TokioCores { + /// Configures `builder` to pin its worker threads to specific cores. pub fn configure(self, builder: &mut tokio::runtime::Builder) { if let Some(cores) = self.workers { builder.worker_threads(cores.len()); @@ -232,6 +271,9 @@ impl TokioCores { pub struct RayonCores(Option>); impl RayonCores { + /// Configures a global rayon threadpool, pinning its threads to specific cores. + /// + /// All rayon threads will be run with `tokio_handle` enetered into. pub fn configure(self, tokio_handle: &tokio::runtime::Handle) { rayon_core::ThreadPoolBuilder::new() .thread_name(|_idx| "rayon-worker".to_string()) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 6dfc1add2af..5ab80581c75 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -6,7 +6,7 @@ use smallvec::SmallVec; use spacetimedb_data_structures::map::HashMap; use tokio::sync::{mpsc, oneshot, watch}; -/// A pool of CPU cores for running jobs on. +/// A handle to a pool of CPU cores for running job threads on. /// /// Each thread is represented by a [`JobThread`], which is pinned to a single /// core and sequentially runs the jobs that are passed to [`JobThread::run`]. @@ -19,6 +19,10 @@ use tokio::sync::{mpsc, oneshot, watch}; /// Construction is done via the `FromIterator` impl. If created from an empty /// iterator or via `JobCores::default()`, the job threads will work but not be /// pinned to any threads. +/// +/// This handle is cheaply cloneable. If all instances of it are dropped, +/// threads will continue running, but will no longer repin each other +/// when one exits. #[derive(Default, Clone)] pub struct JobCores { inner: Option>>, @@ -208,8 +212,11 @@ impl Drop for JobCoreGuard { /// A handle to a thread running a job loop; see [`JobCores`] for more details. /// -/// This handle is cheaply cloneäble, and the thread will shut down when all -/// handles to it have been dropped. +/// The thread stores data of type `T`; jobs run on the thread will be given +/// mutable access to it. +/// +/// This handle is cheaply cloneable. If all strong handles have been dropped, +/// the thread will shut down. pub struct JobThread { tx: mpsc::Sender>>, } @@ -223,7 +230,7 @@ impl Clone for JobThread { type Job = dyn FnOnce(&mut T) + Send; impl JobThread { - /// Run a blocking job on this thread. + /// Run a blocking job on this `JobThread`. /// /// The job (`f`) will be placed in a queue, and will run strictly after /// jobs ahead of it in the queue. If `f` panics, it will be bubbled up to From 759a1656f450383e79f78b9bb7a283dcd1f92833 Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 4 Jun 2025 11:17:32 -0500 Subject: [PATCH 16/25] Allow closing a JobThread --- crates/core/src/host/module_host.rs | 138 ++++-------------- .../src/host/wasm_common/module_host_actor.rs | 21 +-- crates/core/src/util/jobs.rs | 63 ++++++-- 3 files changed, 86 insertions(+), 136 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 585e97d0178..5810dbb6cde 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -17,14 +17,12 @@ use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::util::asyncify; -use crate::util::jobs::{JobCore, JobThread, WeakJobThread}; -use crate::util::lending_pool::{LendingPool, PoolClosed}; +use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; use derive_more::From; -use futures::{Future, FutureExt}; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; @@ -295,14 +293,17 @@ impl ReducersMap { } } -pub trait Module: Send + Sync + 'static { +pub trait DynModule: Send + Sync + 'static { + fn replica_ctx(&self) -> &Arc; + fn scheduler(&self) -> &Scheduler; +} + +pub trait Module: DynModule { type Instance: ModuleInstance; type InitialInstances<'a>: IntoIterator + 'a; fn initial_instances(&mut self) -> Self::InitialInstances<'_>; fn info(&self) -> Arc; fn create_instance(&self) -> Self::Instance; - fn replica_ctx(&self) -> &ReplicaContext; - fn scheduler(&self) -> &Scheduler; } pub trait ModuleInstance: Send + 'static { @@ -442,7 +443,7 @@ impl ModuleInstance for AutoReplacingModuleInstance { #[derive(Clone)] pub struct ModuleHost { pub info: Arc, - inner: Arc, + module: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, job_tx: JobThread, @@ -452,99 +453,14 @@ impl fmt::Debug for ModuleHost { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ModuleHost") .field("info", &self.info) - .field("inner", &Arc::as_ptr(&self.inner)) + .field("module", &Arc::as_ptr(&self.module)) .finish() } } -#[async_trait::async_trait] -trait DynModuleHost: Send + Sync + 'static { - // async fn get_instance(&self, db: Identity) -> Result, NoSuchModule>; - fn replica_ctx(&self) -> &ReplicaContext; - async fn exit(&self); - async fn exited(&self); -} - -struct HostControllerActor { - module: Arc, - instance_pool: LendingPool, -} - -impl HostControllerActor { - #[allow(unused)] - fn spinup_new_instance(&self) { - let (module, instance_pool) = (self.module.clone(), self.instance_pool.clone()); - rayon::spawn(move || { - let instance = module.create_instance(); - match instance_pool.add(instance) { - Ok(()) => {} - Err(PoolClosed) => { - // if the module closed since this new instance was requested, oh well, just throw it away - } - } - }) - } -} - -/// runs future A and future B concurrently. if A completes before B, B is cancelled. if B completes -/// before A, A is polled to completion -#[allow(unused)] -async fn select_first>(fut_a: A, fut_b: B) -> A::Output { - tokio::select! { - ret = fut_a => ret, - Err(x) = fut_b.never_error() => match x {}, - } -} - -#[async_trait::async_trait] -impl DynModuleHost for HostControllerActor { - /* - async fn get_instance(&self, db: Identity) -> Result, NoSuchModule> { - // in the future we should do something like in the else branch here -- add more instances based on load. - // we need to do write-skew retries first - right now there's only ever once instance per module. - /* - let inst = if true { - self.instance_pool - .request_with_context(db) - .await - .map_err(|_| NoSuchModule)? - } else { - const GET_INSTANCE_TIMEOUT: Duration = Duration::from_millis(500); - select_first( - self.instance_pool.request_with_context(db), - tokio::time::sleep(GET_INSTANCE_TIMEOUT).map(|()| self.spinup_new_instance()), - ) - .await - .map_err(|_| NoSuchModule)? - }; - - */ - Ok(Box::new(AutoReplacingModuleInstance { - inst: self.module.create_instance(), - module: self.module.clone(), - })) - } - - */ - - fn replica_ctx(&self) -> &ReplicaContext { - self.module.replica_ctx() - } - - async fn exit(&self) { - self.module.scheduler().close(); - self.instance_pool.close(); - self.exited().await - } - - async fn exited(&self) { - tokio::join!(self.module.scheduler().closed(), self.instance_pool.closed()); - } -} - pub struct WeakModuleHost { info: Arc, - inner: Weak, + inner: Weak, on_panic: Weak, tx: WeakJobThread, } @@ -609,12 +525,7 @@ pub enum ClientConnectedError { impl ModuleHost { pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self { let info = module.info(); - let instance_pool = LendingPool::new(); let module = Arc::new(module); - let inner = Arc::new(HostControllerActor { - module: module.clone(), - instance_pool, - }); let on_panic = Arc::new(on_panic); let module_clone = module.clone(); @@ -627,7 +538,7 @@ impl ModuleHost { ); ModuleHost { info, - inner, + module, on_panic, job_tx, } @@ -678,15 +589,14 @@ impl ModuleHost { log::warn!("reducer {reducer} panicked"); (self.on_panic)(); }); - let result = self - .job_tx + self.job_tx .run(move |inst| { queue_timer.stop_and_record(); queue_length_gauge.dec(); f(inst) }) - .await; - Ok(result) + .await + .map_err(|_: JobThreadClosed| NoSuchModule) } pub async fn disconnect_client(&self, client_id: ClientActorId) { @@ -777,7 +687,7 @@ impl ModuleHost { arg_bsatn: Bytes::new(), }); - let stdb = self.inner.replica_ctx().relational_db.clone(); + let stdb = self.module.replica_ctx().relational_db.clone(); asyncify(move || { stdb.with_auto_commit(workload, |mut_tx| { mut_tx @@ -830,7 +740,7 @@ impl ModuleHost { timestamp: Timestamp::now(), arg_bsatn: Bytes::new(), }); - let stdb = self.inner.replica_ctx().relational_db.clone(); + let stdb = self.module.replica_ctx().relational_db.clone(); let database_identity = self.info.database_identity; asyncify(move || { stdb.with_auto_commit(workload, |mut_tx| { @@ -992,7 +902,7 @@ impl ModuleHost { &self, call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result> + Send + 'static, ) -> Result { - let db = self.inner.replica_ctx().relational_db.clone(); + let db = self.module.replica_ctx().relational_db.clone(); // scheduled reducer name not fetched yet, anyway this is only for logging purpose const REDUCER: &str = "scheduled_reducer"; let module = self.info.clone(); @@ -1038,7 +948,7 @@ impl ModuleHost { } pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { - let replica_ctx = self.inner.replica_ctx().clone(); + let replica_ctx = self.module.replica_ctx().clone(); let info = self.info.clone(); self.call("", move |inst| { init_database(&replica_ctx, &info.module_def, inst, program) @@ -1060,11 +970,13 @@ impl ModuleHost { } pub async fn exit(&self) { - self.inner.exit().await + self.module.scheduler().close(); + self.job_tx.close(); + self.exited().await; } pub async fn exited(&self) { - self.inner.exited().await + tokio::join!(self.module.scheduler().closed(), self.job_tx.closed()); } pub fn inject_logs(&self, log_level: LogLevel, message: &str) { @@ -1160,7 +1072,7 @@ impl ModuleHost { pub fn downgrade(&self) -> WeakModuleHost { WeakModuleHost { info: self.info.clone(), - inner: Arc::downgrade(&self.inner), + inner: Arc::downgrade(&self.module), on_panic: Arc::downgrade(&self.on_panic), tx: self.job_tx.downgrade(), } @@ -1171,7 +1083,7 @@ impl ModuleHost { } pub(crate) fn replica_ctx(&self) -> &ReplicaContext { - self.inner.replica_ctx() + self.module.replica_ctx() } } @@ -1182,7 +1094,7 @@ impl WeakModuleHost { let tx = self.tx.upgrade()?; Some(ModuleHost { info: self.info.clone(), - inner, + module: inner, on_panic, job_tx: tx, }) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 1b1c61ef07d..716102f456d 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -15,7 +15,8 @@ use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerpri use crate::execution_context::{self, ReducerContext, Workload}; use crate::host::instance_env::InstanceEnv; use crate::host::module_host::{ - CallReducerParams, DatabaseUpdate, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo, ModuleInstance, + CallReducerParams, DatabaseUpdate, DynModule, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo, + ModuleInstance, }; use crate::host::{ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult}; use crate::identity::Identity; @@ -200,6 +201,16 @@ impl WasmModuleHostActor { } } +impl DynModule for WasmModuleHostActor { + fn replica_ctx(&self) -> &Arc { + &self.replica_context + } + + fn scheduler(&self) -> &Scheduler { + &self.scheduler + } +} + impl Module for WasmModuleHostActor { type Instance = WasmModuleInstance; @@ -224,14 +235,6 @@ impl Module for WasmModuleHostActor { let _ = instance.extract_descriptions(); self.make_from_instance(instance) } - - fn replica_ctx(&self) -> &ReplicaContext { - &self.replica_context - } - - fn scheduler(&self) -> &Scheduler { - &self.scheduler - } } pub struct WasmModuleInstance { diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 5ab80581c75..65afbee20fc 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,11 +1,15 @@ +use std::pin::pin; use std::sync::{Arc, Mutex, Weak}; use core_affinity::CoreId; +use futures::FutureExt; use indexmap::IndexMap; use smallvec::SmallVec; use spacetimedb_data_structures::map::HashMap; use tokio::sync::{mpsc, oneshot, watch}; +use super::notify_once::NotifyOnce; + /// A handle to a pool of CPU cores for running job threads on. /// /// Each thread is represented by a [`JobThread`], which is pinned to a single @@ -154,22 +158,24 @@ impl JobCore { T: ?Sized + 'static, { let (tx, rx) = mpsc::channel::>>(Self::JOB_CHANNEL_LENGTH); + let close = Arc::new(NotifyOnce::new()); + let closed = close.clone(); let handle = tokio::runtime::Handle::current(); std::thread::spawn(move || { let mut data = init(); let data = unsize(&mut data); - handle.block_on(self.job_loop(rx, data)) + handle.block_on(self.job_loop(rx, closed, data)) }); - JobThread { tx } + JobThread { tx, close } } // this shouldn't matter too much, since callers will need to wait for // the job to finish anyway. const JOB_CHANNEL_LENGTH: usize = 50; - async fn job_loop(mut self, mut rx: mpsc::Receiver>>, data: &mut T) { + async fn job_loop(mut self, mut rx: mpsc::Receiver>>, closed: Arc, data: &mut T) { // this function is async because we need to recv on the repin channel // and the jobs channel, but the jobs being run are blocking @@ -184,11 +190,18 @@ impl JobCore { }; let job_loop = async { - while let Some(job) = rx.recv().await { - // blocking in place means that other futures on the same task - // won't get polled - in this case, that's just the repin loop, - // which is fine because it can just run before the next job. - tokio::task::block_in_place(|| job(data)) + let mut closed = pin!(closed.notified().fuse()); + loop { + tokio::select! { + job = rx.recv() => { + let Some(job) = job else { break }; + // blocking in place means that other futures on the same task + // won't get polled - in this case, that's just the repin loop, + // which is fine because it can just run before the next job. + tokio::task::block_in_place(|| job(data)) + } + () = &mut closed => rx.close(), + } } }; @@ -219,11 +232,15 @@ impl Drop for JobCoreGuard { /// the thread will shut down. pub struct JobThread { tx: mpsc::Sender>>, + close: Arc, } impl Clone for JobThread { fn clone(&self) -> Self { - Self { tx: self.tx.clone() } + Self { + tx: self.tx.clone(), + close: self.close.clone(), + } } } @@ -235,7 +252,7 @@ impl JobThread { /// The job (`f`) will be placed in a queue, and will run strictly after /// jobs ahead of it in the queue. If `f` panics, it will be bubbled up to /// the calling task. - pub async fn run(&self, f: F) -> R + pub async fn run(&self, f: F) -> Result where F: FnOnce(&mut T) -> R + Send + 'static, R: Send + 'static, @@ -252,25 +269,43 @@ impl JobThread { } })) .await - .expect("job thread terminated unexpectedly"); + .map_err(|_| JobThreadClosed)?; - ret_rx.await.unwrap().unwrap_or_else(|e| std::panic::resume_unwind(e)) + match ret_rx.await { + Ok(Ok(ret)) => Ok(ret), + Ok(Err(panic)) => std::panic::resume_unwind(panic), + Err(_closed) => Err(JobThreadClosed), + } + } + + /// Shutdown the job thread. + pub fn close(&self) { + self.close.notify(); + } + + /// Returns a future that resolves once the job thread has been closed. + pub async fn closed(&self) { + self.tx.closed().await } /// Obtain a weak version of this handle. pub fn downgrade(&self) -> WeakJobThread { let tx = self.tx.downgrade(); - WeakJobThread { tx } + let close = Arc::downgrade(&self.close); + WeakJobThread { tx, close } } } +pub struct JobThreadClosed; + /// A weak version of `JobThread` that does not hold the thread open. pub struct WeakJobThread { tx: mpsc::WeakSender>>, + close: Weak, } impl WeakJobThread { pub fn upgrade(&self) -> Option> { - self.tx.upgrade().map(|tx| JobThread { tx }) + Option::zip(self.tx.upgrade(), self.close.upgrade()).map(|(tx, close)| JobThread { tx, close }) } } From 72623f8365c2461691b8e413e733231d6d5a1cb0 Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 4 Jun 2025 11:29:11 -0500 Subject: [PATCH 17/25] Tiny fix --- crates/core/src/util/jobs.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 65afbee20fc..be47de376be 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -129,8 +129,9 @@ impl JobCoresInner { self.job_threads[&stolen].send_replace(core_id); } else { // this core was already at `next_core - 1` - nothing needs to be done! - self.next_core = steal_from; } + + self.next_core = steal_from; } } From 69b77dd84d5c2b168271187dd57ac42cb2465632 Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 4 Jun 2025 11:35:48 -0500 Subject: [PATCH 18/25] Tiny fix 2 --- crates/core/src/util/jobs.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index be47de376be..52ec8a008fb 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -112,9 +112,18 @@ impl JobCoresInner { // This wraps around in the 0 case, so the partition point is simply // moved to the end of the ring buffer. - let steal_from = self.next_core.checked_sub(1).unwrap_or(self.cores.len() - 1); + let steal_from_index = self.next_core.checked_sub(1).unwrap_or(self.cores.len() - 1); - if let Ok([(_, core), (_, steal_from)]) = self.cores.get_disjoint_indices_mut([core_index, steal_from]) { + // if this core was already at `next_core - 1`, we don't need to steal from anywhere + let (core, steal_from) = match self.cores.get_disjoint_indices_mut([core_index, steal_from_index]) { + Ok([(_, core), (_, steal_from)]) => (core, Some(steal_from)), + Err(_) => (&mut self.cores[core_index], None), + }; + + let pos = core.jobs.iter().position(|x| *x == id).unwrap(); + core.jobs.remove(pos); + + if let Some(steal_from) = steal_from { // This unwrap will never fail, since cores below `next_core` always have // at least 1 thread on them. Edge case: if `next_core` is 0, `steal_from` // would wrap around to the end - but when `next_core` is 0, every core has @@ -122,16 +131,13 @@ impl JobCoresInner { // would be empty, but we know that's impossible because we're deallocating // a thread right now. let stolen = steal_from.jobs.pop().unwrap(); - - let pos = core.jobs.iter().position(|x| *x == id).unwrap(); - core.jobs[pos] = stolen; - + // the way we pop and push here means that older job threads will be less + // likely to be repinned, while younger ones are liable to bounce around. + core.jobs.push(stolen); self.job_threads[&stolen].send_replace(core_id); - } else { - // this core was already at `next_core - 1` - nothing needs to be done! } - self.next_core = steal_from; + self.next_core = steal_from_index; } } From 3f872b5adfc8f7a235752697f53959b1de4d2c2b Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 4 Jun 2025 11:56:20 -0500 Subject: [PATCH 19/25] Don't finish the queue before closing --- crates/core/src/util/jobs.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 52ec8a008fb..bd7b5fe9ed4 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,8 +1,6 @@ -use std::pin::pin; use std::sync::{Arc, Mutex, Weak}; use core_affinity::CoreId; -use futures::FutureExt; use indexmap::IndexMap; use smallvec::SmallVec; use spacetimedb_data_structures::map::HashMap; @@ -197,22 +195,20 @@ impl JobCore { }; let job_loop = async { - let mut closed = pin!(closed.notified().fuse()); - loop { - tokio::select! { - job = rx.recv() => { - let Some(job) = job else { break }; - // blocking in place means that other futures on the same task - // won't get polled - in this case, that's just the repin loop, - // which is fine because it can just run before the next job. - tokio::task::block_in_place(|| job(data)) - } - () = &mut closed => rx.close(), - } + while let Some(job) = rx.recv().await { + // blocking in place means that other futures on the same task + // won't get polled - in this case, that's just the repin loop, + // which is fine because it can just run before the next job. + tokio::task::block_in_place(|| job(data)) } }; - super::also_poll(job_loop, repin_loop).await + tokio::select! { + () = super::also_poll(job_loop, repin_loop) => {} + // when we receive a close notification, we immediately drop all + // remaining jobs in the queue. + () = closed.notified() => {} + } } } From cc2b42bec04b1fe460a67e5b00845120d1668937 Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 4 Jun 2025 12:32:57 -0500 Subject: [PATCH 20/25] Bump JOB_CHANNEL_LENGTH to 64 --- crates/core/src/util/jobs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index bd7b5fe9ed4..92848aac98a 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -178,7 +178,7 @@ impl JobCore { // this shouldn't matter too much, since callers will need to wait for // the job to finish anyway. - const JOB_CHANNEL_LENGTH: usize = 50; + const JOB_CHANNEL_LENGTH: usize = 64; async fn job_loop(mut self, mut rx: mpsc::Receiver>>, closed: Arc, data: &mut T) { // this function is async because we need to recv on the repin channel From 7e7e4623a3f86f8eef9c954cf5419c4f54cf0a80 Mon Sep 17 00:00:00 2001 From: Noa Date: Fri, 6 Jun 2025 12:40:38 -0500 Subject: [PATCH 21/25] Update crates/core/src/startup.rs Co-authored-by: Phoebe Goldman Signed-off-by: Noa --- crates/core/src/startup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 36a3f1c867d..41e7ca49ba2 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -156,7 +156,7 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle Date: Fri, 6 Jun 2025 12:51:40 -0500 Subject: [PATCH 22/25] Address review --- crates/core/src/host/host_controller.rs | 5 +++++ crates/core/src/util/jobs.rs | 1 + 2 files changed, 6 insertions(+) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 858857bc7c5..81ad9cc9b99 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -277,6 +277,11 @@ impl HostController { self.page_pool.clone(), database, program, + // This takes a db core to check validity, and we will later take + // another core to actually run the module. Due to the round-robin + // algorithm that JobCores uses, that will likely just be the same + // core - there's not a concern that we'll only end up using 1/2 + // of the actual cores. self.db_cores.take(), ) .await diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 92848aac98a..b31be07727e 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -302,6 +302,7 @@ impl JobThread { pub struct JobThreadClosed; /// A weak version of `JobThread` that does not hold the thread open. +// used in crate::core::module_host::WeakModuleHost pub struct WeakJobThread { tx: mpsc::WeakSender>>, close: Weak, From 75cf78bea2cd11dc8b875889f80e7e742d186759 Mon Sep 17 00:00:00 2001 From: Noa Date: Fri, 6 Jun 2025 12:52:08 -0500 Subject: [PATCH 23/25] Remove lending_pool.rs --- crates/core/src/util/lending_pool.rs | 219 --------------------------- crates/core/src/util/mod.rs | 1 - 2 files changed, 220 deletions(-) delete mode 100644 crates/core/src/util/lending_pool.rs diff --git a/crates/core/src/util/lending_pool.rs b/crates/core/src/util/lending_pool.rs deleted file mode 100644 index 09b55969b2b..00000000000 --- a/crates/core/src/util/lending_pool.rs +++ /dev/null @@ -1,219 +0,0 @@ -//! like, a semaphore but with values. or something - -use std::collections::VecDeque; -use std::future::Future; -use std::mem::ManuallyDrop; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use parking_lot::Mutex; -use spacetimedb_lib::Identity; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; - -use crate::worker_metrics::WORKER_METRICS; - -use super::notify_once::{NotifiedOnce, NotifyOnce}; - -pub struct LendingPool { - sem: Arc, - inner: Arc>, -} - -impl Default for LendingPool { - fn default() -> Self { - Self::new() - } -} - -impl Clone for LendingPool { - fn clone(&self) -> Self { - Self { - sem: self.sem.clone(), - inner: self.inner.clone(), - } - } -} - -struct LendingPoolInner { - closed_notify: NotifyOnce, - vec: Mutex>, -} - -struct PoolVec { - total_count: usize, - deque: Option>, -} - -#[derive(Debug)] -pub struct PoolClosed; - -/// A scope guard for the reducer queue length metric, -/// ensuring an increment is always be paired with one and only one decrement. -struct QueueMetric { - db: Identity, -} - -impl Drop for QueueMetric { - fn drop(&mut self) { - WORKER_METRICS.instance_queue_length.with_label_values(&self.db).dec(); - let queue_length = WORKER_METRICS.instance_queue_length.with_label_values(&self.db).get(); - WORKER_METRICS - .instance_queue_length_histogram - .with_label_values(&self.db) - .observe(queue_length as f64); - } -} - -impl QueueMetric { - fn inc(db: Identity) -> Self { - WORKER_METRICS.instance_queue_length.with_label_values(&db).inc(); - let queue_length = WORKER_METRICS.instance_queue_length.with_label_values(&db).get(); - WORKER_METRICS - .instance_queue_length_histogram - .with_label_values(&db) - .observe(queue_length as f64); - Self { db } - } -} - -impl LendingPool { - pub fn new() -> Self { - Self::from_iter(std::iter::empty()) - } - - pub fn request_with_context(&self, db: Identity) -> impl Future, PoolClosed>> { - let acq = self.sem.clone().acquire_owned(); - let pool_inner = self.inner.clone(); - - async move { - let _guard = QueueMetric::inc(db); - let permit = acq.await.map_err(|_| PoolClosed)?; - let resource = pool_inner - .vec - .lock() - .deque - .as_mut() - .ok_or(PoolClosed)? - .pop_front() - .ok_or(PoolClosed)?; - Ok(LentResource { - resource: ManuallyDrop::new(resource), - permit: ManuallyDrop::new(permit), - pool_inner, - }) - } - } - - pub fn add(&self, resource: T) -> Result<(), PoolClosed> { - self.add_multiple(std::iter::once(resource)) - } - - pub fn add_multiple>(&self, resources: I) -> Result<(), PoolClosed> { - let resources = resources.into_iter(); - let mut inner = self.inner.vec.lock(); - let deque = inner.deque.as_mut().ok_or(PoolClosed)?; - let mut num_new = 0; - deque.extend(resources.inspect(|_| num_new += 1)); - inner.total_count += num_new; - self.sem.add_permits(num_new); - Ok(()) - } - - pub fn num_total(&self) -> usize { - self.inner.vec.lock().total_count - } - - pub fn num_available(&self) -> usize { - self.sem.available_permits() - } - - pub fn close(&self) { - let mut vec = self.inner.vec.lock(); - self.sem.close(); - if let Some(deque) = vec.deque.take() { - vec.total_count -= deque.len(); - } - if vec.total_count == 0 { - self.inner.closed_notify.notify(); - } - } - - pub fn closed(&self) -> Closed<'_> { - Closed { - notified: self.inner.closed_notify.notified(), - } - } -} - -impl FromIterator for LendingPool { - fn from_iter>(iter: I) -> Self { - let deque = VecDeque::from_iter(iter); - Self { - sem: Arc::new(Semaphore::new(deque.len())), - inner: Arc::new(LendingPoolInner { - closed_notify: NotifyOnce::new(), - vec: Mutex::new(PoolVec { - total_count: deque.len(), - deque: Some(deque), - }), - }), - } - } -} - -pin_project_lite::pin_project! { - pub struct Closed<'a> { - #[pin] - notified: NotifiedOnce<'a>, - } -} - -impl Future for Closed<'_> { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().notified.poll(cx) - } -} - -pub struct LentResource { - resource: ManuallyDrop, - permit: ManuallyDrop, - pool_inner: Arc>, -} - -impl Deref for LentResource { - type Target = T; - fn deref(&self) -> &T { - &self.resource - } -} - -impl DerefMut for LentResource { - fn deref_mut(&mut self) -> &mut T { - &mut self.resource - } -} - -impl Drop for LentResource { - fn drop(&mut self) { - let resource = unsafe { ManuallyDrop::take(&mut self.resource) }; - let permit = unsafe { ManuallyDrop::take(&mut self.permit) }; - { - let mut vec = self.pool_inner.vec.lock(); - if let Some(deque) = &mut vec.deque { - deque.push_back(resource); - drop(permit); - } else { - drop(resource); - permit.forget(); - vec.total_count -= 1; - if vec.total_count == 0 { - self.pool_inner.closed_notify.notify(); - } - } - } - } -} diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 9599f55ee45..cc37d75ed38 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -6,7 +6,6 @@ use tokio::sync::oneshot; pub mod prometheus_handle; pub mod jobs; -pub mod lending_pool; pub mod notify_once; pub mod slow; From ba56187456c23256c87c5e6ee17cc90392e25809 Mon Sep 17 00:00:00 2001 From: Noa Date: Fri, 6 Jun 2025 12:57:05 -0500 Subject: [PATCH 24/25] More comments --- crates/core/src/host/host_controller.rs | 3 ++- crates/core/src/startup.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 81ad9cc9b99..1b18fe31bd7 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -586,7 +586,8 @@ async fn make_module_host( unregister: impl Fn() + Send + Sync + 'static, core: JobCore, ) -> anyhow::Result<(Program, ModuleHost)> { - // `make_actor` is blocking, as it needs to compile the wasm to native code. + // `make_actor` is blocking, as it needs to compile the wasm to native code, + // which may be computationally expensive. asyncify(move || { let module_host = match host_type { HostType::Wasm => { diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 41e7ca49ba2..57d9f7177ee 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -206,6 +206,7 @@ impl Cores { let rayon = RayonCores(Some(cores.take(frac(1.0 / 8.0)).collect())); + // see comment on `TokioCores.blocking` #[cfg(target_os = "linux")] let remaining = cores.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| { cpuset.set(core.id).ok()?; From 08d30b22b86293c43d1553e4a9aacb3abc61f607 Mon Sep 17 00:00:00 2001 From: Noa Date: Fri, 6 Jun 2025 13:14:42 -0500 Subject: [PATCH 25/25] Add TODO --- crates/core/src/host/host_controller.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 1b18fe31bd7..e642bcd5211 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -587,7 +587,10 @@ async fn make_module_host( core: JobCore, ) -> anyhow::Result<(Program, ModuleHost)> { // `make_actor` is blocking, as it needs to compile the wasm to native code, - // which may be computationally expensive. + // which may be computationally expensive - sometimes up to 1s for a large module. + // TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking + // threads, but those aren't for computation. Also, wasmtime uses rayon + // to run compilation in parallel, so it'll need to run stuff in rayon anyway. asyncify(move || { let module_host = match host_type { HostType::Wasm => {