diff --git a/Cargo.lock b/Cargo.lock index 4f6ccf4d35a..012e9f5841c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -977,6 +977,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "cpp_demangle" version = "0.4.4" @@ -3278,6 +3289,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nohash-hasher" version = "0.2.0" @@ -4723,7 +4746,7 @@ dependencies = [ "libc", "log", "memchr", - "nix", + "nix 0.26.4", "radix_trie", "scopeguard", "unicode-segmentation", @@ -5444,8 +5467,10 @@ dependencies = [ "bytes", "bytestring", "chrono", + "core_affinity", "criterion", "crossbeam-channel", + "crossbeam-queue", "derive_more", "dirs", "enum-as-inner", @@ -5465,6 +5490,7 @@ dependencies = [ "lazy_static", "log", "memchr", + "nix 0.30.1", "once_cell", "openssl", "parking_lot 0.12.3", diff --git a/Cargo.toml b/Cargo.toml index 2372cbc497d..1b28274897e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -200,6 +200,7 @@ log = "0.4.17" memchr = "2" mimalloc = "0.1.39" nohash-hasher = "0.2" +nix = "0.30" once_cell = "1.16" parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] } parse-size = "1.1.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 0c473b72f7f..5599fb11615 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -48,6 +48,7 @@ bytes.workspace = true bytestring.workspace = true chrono.workspace = true crossbeam-channel.workspace = true +crossbeam-queue.workspace = true derive_more.workspace = true dirs.workspace = true enum-as-inner.workspace = true @@ -113,11 +114,15 @@ wasmtime.workspace = true jwks.workspace = true async_cache = "0.3.1" faststr = "0.2.23" +core_affinity = "0.8" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = {workspace = true} tikv-jemalloc-ctl = {workspace = true} +[target.'cfg(target_os = "linux")'.dependencies] +nix = { workspace = true, features = ["sched"] } + [features] # Print a warning when doing an unindexed `iter_by_col_range` on a large table. unindexed_iter_by_col_range_warn = [] diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index bb2c47cfc75..e642bcd5211 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -14,7 +14,8 @@ use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::SubscriptionManager; -use crate::util::{asyncify, spawn_rayon}; +use crate::util::asyncify; +use crate::util::jobs::{JobCore, JobCores}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, ensure, Context}; use async_trait::async_trait; @@ -95,6 +96,8 @@ pub struct HostController { pub page_pool: PagePool, /// The runtimes for running our modules. runtimes: Arc, + /// The CPU cores that are reserved for ModuleHost operations to run on. + db_cores: JobCores, } struct HostRuntimes { @@ -169,6 +172,7 @@ impl HostController { program_storage: ProgramStorage, energy_monitor: Arc, durability: Arc, + db_cores: JobCores, ) -> Self { Self { hosts: <_>::default(), @@ -179,6 +183,7 @@ impl HostController { runtimes: HostRuntimes::new(Some(&data_dir)), data_dir, page_pool: PagePool::new(default_config.page_pool_max_size), + db_cores, } } @@ -267,7 +272,19 @@ impl HostController { /// This is not necessary during hotswap publishes, /// as the automigration planner and executor accomplish the same validity checks. pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result> { - Host::try_init_in_memory_to_check(&self.runtimes, self.page_pool.clone(), database, program).await + Host::try_init_in_memory_to_check( + &self.runtimes, + self.page_pool.clone(), + database, + program, + // This takes a db core to check validity, and we will later take + // another core to actually run the module. Due to the round-robin + // algorithm that JobCores uses, that will likely just be the same + // core - there's not a concern that we'll only end up using 1/2 + // of the actual cores. + self.db_cores.take(), + ) + .await } /// Run a computation on the [`RelationalDB`] of a [`ModuleHost`] managed by @@ -338,6 +355,7 @@ impl HostController { program, self.energy_monitor.clone(), self.unregister_fn(replica_id), + self.db_cores.take(), ) .await?; @@ -415,6 +433,7 @@ impl HostController { program, self.energy_monitor.clone(), self.unregister_fn(replica_id), + self.db_cores.take(), ) .await?; match update_result { @@ -556,6 +575,7 @@ async fn make_replica_ctx( /// Initialize a module host for the given program. /// The passed replica_ctx may not be configured for this version of the program's database schema yet. +#[allow(clippy::too_many_arguments)] async fn make_module_host( runtimes: Arc, host_type: HostType, @@ -564,8 +584,14 @@ async fn make_module_host( program: Program, energy_monitor: Arc, unregister: impl Fn() + Send + Sync + 'static, + core: JobCore, ) -> anyhow::Result<(Program, ModuleHost)> { - spawn_rayon(move || { + // `make_actor` is blocking, as it needs to compile the wasm to native code, + // which may be computationally expensive - sometimes up to 1s for a large module. + // TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking + // threads, but those aren't for computation. Also, wasmtime uses rayon + // to run compilation in parallel, so it'll need to run stuff in rayon anyway. + asyncify(move || { let module_host = match host_type { HostType::Wasm => { let mcc = ModuleCreationContext { @@ -577,7 +603,7 @@ async fn make_module_host( let start = Instant::now(); let actor = runtimes.wasmtime.make_actor(mcc)?; trace!("wasmtime::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister) + ModuleHost::new(actor, unregister, core) } }; Ok((program, module_host)) @@ -610,6 +636,7 @@ async fn launch_module( energy_monitor: Arc, replica_dir: ReplicaDir, runtimes: Arc, + core: JobCore, ) -> anyhow::Result<(Program, LaunchedModule)> { let db_identity = database.database_identity; let host_type = database.host_type; @@ -626,6 +653,7 @@ async fn launch_module( program, energy_monitor.clone(), on_panic, + core, ) .await?; @@ -776,6 +804,7 @@ impl Host { energy_monitor.clone(), replica_dir, runtimes.clone(), + host_controller.db_cores.take(), ) .await?; @@ -834,6 +863,7 @@ impl Host { page_pool: PagePool, database: Database, program: Program, + core: JobCore, ) -> anyhow::Result> { // Even in-memory databases acquire a lockfile. // Grab a tempdir to put that lockfile in. @@ -865,6 +895,7 @@ impl Host { Arc::new(NullEnergyMonitor), phony_replica_dir, runtimes.clone(), + core, ) .await?; @@ -895,6 +926,7 @@ impl Host { program: Program, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, + core: JobCore, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); @@ -907,6 +939,7 @@ impl Host { program, energy_monitor, on_panic, + core, ) .await?; @@ -981,10 +1014,15 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an let runtimes = HostRuntimes::new(None); let page_pool = PagePool::new(None); - let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program).await?; - let module_info = Arc::into_inner(module_info).unwrap(); + let core = JobCore::default(); + let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?; + // this should always succeed, but sometimes it doesn't + let module_def = match Arc::try_unwrap(module_info) { + Ok(info) => info.module_def, + Err(info) => info.module_def.clone(), + }; - Ok(module_info.module_def) + Ok(module_def) } // Remove all gauges associated with a database. diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index d88c3b4b5a2..5810dbb6cde 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -17,13 +17,12 @@ use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::util::asyncify; -use crate::util::lending_pool::{LendingPool, LentResource, PoolClosed}; +use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; use derive_more::From; -use futures::{Future, FutureExt}; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; @@ -294,14 +293,17 @@ impl ReducersMap { } } -pub trait Module: Send + Sync + 'static { +pub trait DynModule: Send + Sync + 'static { + fn replica_ctx(&self) -> &Arc; + fn scheduler(&self) -> &Scheduler; +} + +pub trait Module: DynModule { type Instance: ModuleInstance; type InitialInstances<'a>: IntoIterator + 'a; fn initial_instances(&mut self) -> Self::InitialInstances<'_>; fn info(&self) -> Arc; fn create_instance(&self) -> Self::Instance; - fn replica_ctx(&self) -> &ReplicaContext; - fn scheduler(&self) -> &Scheduler; } pub trait ModuleInstance: Send + 'static { @@ -406,14 +408,14 @@ pub struct CallReducerParams { // TODO: figure out how we want to handle traps. maybe it should just not return to the LendingPool and // let the get_instance logic handle it? struct AutoReplacingModuleInstance { - inst: LentResource, + inst: T::Instance, module: Arc, } impl AutoReplacingModuleInstance { fn check_trap(&mut self) { if self.inst.trapped() { - *self.inst = self.module.create_instance() + self.inst = self.module.create_instance() } } } @@ -441,101 +443,26 @@ impl ModuleInstance for AutoReplacingModuleInstance { #[derive(Clone)] pub struct ModuleHost { pub info: Arc, - inner: Arc, + module: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, + job_tx: JobThread, } impl fmt::Debug for ModuleHost { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ModuleHost") .field("info", &self.info) - .field("inner", &Arc::as_ptr(&self.inner)) + .field("module", &Arc::as_ptr(&self.module)) .finish() } } -#[async_trait::async_trait] -trait DynModuleHost: Send + Sync + 'static { - async fn get_instance(&self, db: Identity) -> Result, NoSuchModule>; - fn replica_ctx(&self) -> &ReplicaContext; - async fn exit(&self); - async fn exited(&self); -} - -struct HostControllerActor { - module: Arc, - instance_pool: LendingPool, -} - -impl HostControllerActor { - fn spinup_new_instance(&self) { - let (module, instance_pool) = (self.module.clone(), self.instance_pool.clone()); - rayon::spawn(move || { - let instance = module.create_instance(); - match instance_pool.add(instance) { - Ok(()) => {} - Err(PoolClosed) => { - // if the module closed since this new instance was requested, oh well, just throw it away - } - } - }) - } -} - -/// runs future A and future B concurrently. if A completes before B, B is cancelled. if B completes -/// before A, A is polled to completion -async fn select_first>(fut_a: A, fut_b: B) -> A::Output { - tokio::select! { - ret = fut_a => ret, - Err(x) = fut_b.never_error() => match x {}, - } -} - -#[async_trait::async_trait] -impl DynModuleHost for HostControllerActor { - async fn get_instance(&self, db: Identity) -> Result, NoSuchModule> { - // in the future we should do something like in the else branch here -- add more instances based on load. - // we need to do write-skew retries first - right now there's only ever once instance per module. - let inst = if true { - self.instance_pool - .request_with_context(db) - .await - .map_err(|_| NoSuchModule)? - } else { - const GET_INSTANCE_TIMEOUT: Duration = Duration::from_millis(500); - select_first( - self.instance_pool.request_with_context(db), - tokio::time::sleep(GET_INSTANCE_TIMEOUT).map(|()| self.spinup_new_instance()), - ) - .await - .map_err(|_| NoSuchModule)? - }; - Ok(Box::new(AutoReplacingModuleInstance { - inst, - module: self.module.clone(), - })) - } - - fn replica_ctx(&self) -> &ReplicaContext { - self.module.replica_ctx() - } - - async fn exit(&self) { - self.module.scheduler().close(); - self.instance_pool.close(); - self.exited().await - } - - async fn exited(&self) { - tokio::join!(self.module.scheduler().closed(), self.instance_pool.closed()); - } -} - pub struct WeakModuleHost { info: Arc, - inner: Weak, + inner: Weak, on_panic: Weak, + tx: WeakJobThread, } #[derive(Debug)] @@ -596,16 +523,25 @@ pub enum ClientConnectedError { } impl ModuleHost { - pub fn new(mut module: impl Module, on_panic: impl Fn() + Send + Sync + 'static) -> Self { + pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self { let info = module.info(); - let instance_pool = LendingPool::new(); - instance_pool.add_multiple(module.initial_instances()).unwrap(); - let inner = Arc::new(HostControllerActor { - module: Arc::new(module), - instance_pool, - }); + let module = Arc::new(module); let on_panic = Arc::new(on_panic); - ModuleHost { info, inner, on_panic } + + let module_clone = module.clone(); + let job_tx = core.start( + move || AutoReplacingModuleInstance { + inst: module_clone.create_instance(), + module: module_clone, + }, + |x| x as &mut dyn ModuleInstance, + ); + ModuleHost { + info, + module, + on_panic, + job_tx, + } } #[inline] @@ -623,14 +559,22 @@ impl ModuleHost { F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static, R: Send + 'static, { - let mut inst = { - // Record the time spent waiting in the queue - let _guard = WORKER_METRICS - .reducer_wait_time - .with_label_values(&self.info.database_identity, reducer) - .start_timer(); - self.inner.get_instance(self.info.database_identity).await? - }; + // Record the time until our function starts running. + let queue_timer = WORKER_METRICS + .reducer_wait_time + .with_label_values(&self.info.database_identity, reducer) + .start_timer(); + let queue_length_gauge = WORKER_METRICS + .instance_queue_length + .with_label_values(&self.info.database_identity); + queue_length_gauge.inc(); + { + let queue_length = queue_length_gauge.get(); + WORKER_METRICS + .instance_queue_length_histogram + .with_label_values(&self.info.database_identity) + .observe(queue_length as f64); + } // Operations on module instances (e.g. calling reducers) is blocking, // partially because the computation can potentialyl take a long time @@ -645,8 +589,14 @@ impl ModuleHost { log::warn!("reducer {reducer} panicked"); (self.on_panic)(); }); - let result = asyncify(move || f(&mut *inst)).await; - Ok(result) + self.job_tx + .run(move |inst| { + queue_timer.stop_and_record(); + queue_length_gauge.dec(); + f(inst) + }) + .await + .map_err(|_: JobThreadClosed| NoSuchModule) } pub async fn disconnect_client(&self, client_id: ClientActorId) { @@ -737,7 +687,7 @@ impl ModuleHost { arg_bsatn: Bytes::new(), }); - let stdb = self.inner.replica_ctx().relational_db.clone(); + let stdb = self.module.replica_ctx().relational_db.clone(); asyncify(move || { stdb.with_auto_commit(workload, |mut_tx| { mut_tx @@ -790,7 +740,7 @@ impl ModuleHost { timestamp: Timestamp::now(), arg_bsatn: Bytes::new(), }); - let stdb = self.inner.replica_ctx().relational_db.clone(); + let stdb = self.module.replica_ctx().relational_db.clone(); let database_identity = self.info.database_identity; asyncify(move || { stdb.with_auto_commit(workload, |mut_tx| { @@ -952,7 +902,7 @@ impl ModuleHost { &self, call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result> + Send + 'static, ) -> Result { - let db = self.inner.replica_ctx().relational_db.clone(); + let db = self.module.replica_ctx().relational_db.clone(); // scheduled reducer name not fetched yet, anyway this is only for logging purpose const REDUCER: &str = "scheduled_reducer"; let module = self.info.clone(); @@ -998,7 +948,7 @@ impl ModuleHost { } pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { - let replica_ctx = self.inner.replica_ctx().clone(); + let replica_ctx = self.module.replica_ctx().clone(); let info = self.info.clone(); self.call("", move |inst| { init_database(&replica_ctx, &info.module_def, inst, program) @@ -1020,11 +970,13 @@ impl ModuleHost { } pub async fn exit(&self) { - self.inner.exit().await + self.module.scheduler().close(); + self.job_tx.close(); + self.exited().await; } pub async fn exited(&self) { - self.inner.exited().await + tokio::join!(self.module.scheduler().closed(), self.job_tx.closed()); } pub fn inject_logs(&self, log_level: LogLevel, message: &str) { @@ -1120,8 +1072,9 @@ impl ModuleHost { pub fn downgrade(&self) -> WeakModuleHost { WeakModuleHost { info: self.info.clone(), - inner: Arc::downgrade(&self.inner), + inner: Arc::downgrade(&self.module), on_panic: Arc::downgrade(&self.on_panic), + tx: self.job_tx.downgrade(), } } @@ -1130,7 +1083,7 @@ impl ModuleHost { } pub(crate) fn replica_ctx(&self) -> &ReplicaContext { - self.inner.replica_ctx() + self.module.replica_ctx() } } @@ -1138,10 +1091,12 @@ impl WeakModuleHost { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; let on_panic = self.on_panic.upgrade()?; + let tx = self.tx.upgrade()?; Some(ModuleHost { info: self.info.clone(), - inner, + module: inner, on_panic, + job_tx: tx, }) } } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 1b1c61ef07d..716102f456d 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -15,7 +15,8 @@ use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerpri use crate::execution_context::{self, ReducerContext, Workload}; use crate::host::instance_env::InstanceEnv; use crate::host::module_host::{ - CallReducerParams, DatabaseUpdate, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo, ModuleInstance, + CallReducerParams, DatabaseUpdate, DynModule, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo, + ModuleInstance, }; use crate::host::{ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult}; use crate::identity::Identity; @@ -200,6 +201,16 @@ impl WasmModuleHostActor { } } +impl DynModule for WasmModuleHostActor { + fn replica_ctx(&self) -> &Arc { + &self.replica_context + } + + fn scheduler(&self) -> &Scheduler { + &self.scheduler + } +} + impl Module for WasmModuleHostActor { type Instance = WasmModuleInstance; @@ -224,14 +235,6 @@ impl Module for WasmModuleHostActor { let _ = instance.extract_descriptions(); self.make_from_instance(instance) } - - fn replica_ctx(&self) -> &ReplicaContext { - &self.replica_context - } - - fn scheduler(&self) -> &Scheduler { - &self.scheduler - } } pub struct WasmModuleInstance { diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index e7f2b240981..57d9f7177ee 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -1,3 +1,5 @@ +use core_affinity::CoreId; +use crossbeam_queue::ArrayQueue; use spacetimedb_paths::server::{ConfigToml, LogsDir}; use std::path::PathBuf; use std::time::Duration; @@ -11,23 +13,7 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::{reload, EnvFilter}; use crate::config::{ConfigFile, LogConfig}; - -pub struct StartupOptions { - /// Options for tracing to configure the global tracing subscriber. Tracing will be disabled - /// if `None`. - pub tracing: Option, - /// Whether or not to configure the global rayon threadpool. - pub rayon: bool, -} - -impl Default for StartupOptions { - fn default() -> Self { - Self { - tracing: Some(TracingOptions::default()), - rayon: true, - } - } -} +use crate::util::jobs::JobCores; pub struct TracingOptions { pub config: LogConfig, @@ -55,18 +41,7 @@ impl Default for TracingOptions { } } -impl StartupOptions { - pub fn configure(self) { - if let Some(tracing_opts) = self.tracing { - configure_tracing(tracing_opts) - } - if self.rayon { - configure_rayon() - } - } -} - -fn configure_tracing(opts: TracingOptions) { +pub fn configure_tracing(opts: TracingOptions) { // Use this to change log levels at runtime. // This means you can change the default log level to trace // if you are trying to debug an issue and need more logs on then turn it off @@ -172,24 +147,147 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle Cores { + Cores::get().unwrap_or_default() +} + +/// A type holding cores divvied up into different sets. +/// +/// Obtained from [`pin_threads()`]. +#[derive(Default)] +pub struct Cores { + /// The cores to run database instances on. + /// + /// Currently, this is 1/8 of num_cpus. + pub databases: JobCores, + /// The cores to run tokio worker and blocking threads on. + /// + /// Currently, tokio worker threads are 4/8 of num_cpus, and tokio blocking + /// threads are pinned non-exclusively to 2/8 of num_cpus. + pub tokio: TokioCores, + /// The cores to run rayon threads on. + /// + /// Currently, this is 1/8 of num_cpus. + pub rayon: RayonCores, +} + +impl Cores { + fn get() -> Option { + let cores = &mut core_affinity::get_core_ids() + .filter(|cores| cores.len() >= 8)? + .into_iter(); + + let total = cores.len() as f64; + let frac = |frac: f64| (total * frac).ceil() as usize; + + let databases = cores.take(frac(1.0 / 8.0)).collect(); + + let tokio_workers = cores.take(frac(4.0 / 8.0)).collect(); + + let rayon = RayonCores(Some(cores.take(frac(1.0 / 8.0)).collect())); + + // see comment on `TokioCores.blocking` + #[cfg(target_os = "linux")] + let remaining = cores.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| { + cpuset.set(core.id).ok()?; + Some(cpuset) + }); + + let tokio = TokioCores { + workers: Some(tokio_workers), + #[cfg(target_os = "linux")] + blocking: remaining, + }; + + Some(Self { + databases, + tokio, + rayon, + }) + } +} + +#[derive(Default)] +pub struct TokioCores { + workers: Option>, + // For blocking threads, we don't want to limit them to a specific number + // and pin them to their own cores - they're supposed to run concurrently + // with each other. However, `core_affinity` doesn't support affinity masks, + // so we just use the Linux-specific API, since this is only a slight boost + // and we don't care enough about performance on other platforms. + #[cfg(target_os = "linux")] + blocking: Option, +} + +impl TokioCores { + /// Configures `builder` to pin its worker threads to specific cores. + pub fn configure(self, builder: &mut tokio::runtime::Builder) { + if let Some(cores) = self.workers { + builder.worker_threads(cores.len()); + + let cores_queue = Box::new(ArrayQueue::new(cores.len())); + for core in cores { + cores_queue.push(core).unwrap(); + } + + // `on_thread_start` gets called for both async worker threads and blocking threads, + // but the first `worker_threads` threads that tokio spawns are worker threads, + // so this ends up working fine + builder.on_thread_start(move || { + if let Some(core) = cores_queue.pop() { + core_affinity::set_for_current(core); + } else { + #[cfg(target_os = "linux")] + if let Some(cpuset) = &self.blocking { + let this = nix::unistd::Pid::from_raw(0); + let _ = nix::sched::sched_setaffinity(this, cpuset); + } + } + }); + } + } +} + +#[derive(Default)] +pub struct RayonCores(Option>); + +impl RayonCores { + /// Configures a global rayon threadpool, pinning its threads to specific cores. + /// + /// All rayon threads will be run with `tokio_handle` enetered into. + pub fn configure(self, tokio_handle: &tokio::runtime::Handle) { + rayon_core::ThreadPoolBuilder::new() + .thread_name(|_idx| "rayon-worker".to_string()) + .spawn_handler(thread_spawn_handler(tokio_handle)) + .num_threads(self.0.as_ref().map_or(0, |cores| cores.len())) + .start_handler(move |i| { + if let Some(cores) = &self.0 { + core_affinity::set_for_current(cores[i]); + } + }) + .build_global() + .unwrap() + } } /// A Rayon [spawn_handler](https://docs.rs/rustc-rayon-core/latest/rayon_core/struct.ThreadPoolBuilder.html#method.spawn_handler) @@ -205,7 +303,9 @@ fn configure_rayon() { /// i.e. that every async operation they invoke immediately completes. /// I (pgoldman 2024-02-22) believe that our Rayon threads only ever send to unbounded channels, /// and therefore never wait. -fn thread_spawn_handler(rt: tokio::runtime::Handle) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> { +fn thread_spawn_handler( + rt: &tokio::runtime::Handle, +) -> impl FnMut(rayon::ThreadBuilder) -> Result<(), std::io::Error> + '_ { move |thread| { let rt = rt.clone(); let mut builder = std::thread::Builder::new(); diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 38a89c4cd07..c80f46189ce 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -32,7 +32,6 @@ use crate::sql::ast::SchemaViewer; use crate::vm::{build_query, TxMode}; use anyhow::Context; use itertools::Either; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -523,7 +522,7 @@ impl ExecutionSet { let tables = self .exec_units // if you need eval to run single-threaded for debugging, change this to .iter() - .par_iter() + .iter() .filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression)) .collect(); ws::DatabaseUpdate { tables } diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs new file mode 100644 index 00000000000..b31be07727e --- /dev/null +++ b/crates/core/src/util/jobs.rs @@ -0,0 +1,315 @@ +use std::sync::{Arc, Mutex, Weak}; + +use core_affinity::CoreId; +use indexmap::IndexMap; +use smallvec::SmallVec; +use spacetimedb_data_structures::map::HashMap; +use tokio::sync::{mpsc, oneshot, watch}; + +use super::notify_once::NotifyOnce; + +/// A handle to a pool of CPU cores for running job threads on. +/// +/// Each thread is represented by a [`JobThread`], which is pinned to a single +/// core and sequentially runs the jobs that are passed to [`JobThread::run`]. +/// This pool attempts to keep the number of `JobThread`s pinned to each core +/// as equitable as possible; new threads allocated by [`Self::take()`] are +/// assigned to cores in a round-robin fashion, and when a thread exits, it +/// takes a thread pinned to a busier core and repins it to the core it was +/// just running on. +/// +/// Construction is done via the `FromIterator` impl. If created from an empty +/// iterator or via `JobCores::default()`, the job threads will work but not be +/// pinned to any threads. +/// +/// This handle is cheaply cloneable. If all instances of it are dropped, +/// threads will continue running, but will no longer repin each other +/// when one exits. +#[derive(Default, Clone)] +pub struct JobCores { + inner: Option>>, +} + +struct JobCoresInner { + /// A map to the repin_tx for each job thread + job_threads: HashMap>, + cores: IndexMap, + /// An index into `cores` of the next core to put a new job onto. + /// + /// This acts as a partition point in `cores`; all cores in `..index` have + /// one fewer job on them than the cores in `index..`. + next_core: usize, + next_id: JobThreadId, +} + +#[derive(Default)] +struct CoreInfo { + jobs: SmallVec<[JobThreadId; 4]>, +} + +#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +struct JobThreadId(usize); + +impl JobCores { + /// Reserve a core from the pool to later start a job thread on. + pub fn take(&self) -> JobCore { + let inner = if let Some(inner) = &self.inner { + let cores = Arc::downgrade(inner); + let (id, repin_rx) = inner.lock().unwrap().allocate(); + Some(JobCoreInner { + repin_rx, + _guard: JobCoreGuard { cores, id }, + }) + } else { + None + }; + + JobCore { inner } + } +} + +impl FromIterator for JobCores { + fn from_iter>(iter: T) -> Self { + let cores: IndexMap<_, _> = iter.into_iter().map(|id| (id, CoreInfo::default())).collect(); + let inner = (!cores.is_empty()).then(|| { + Arc::new(Mutex::new(JobCoresInner { + job_threads: HashMap::default(), + cores, + next_core: 0, + next_id: JobThreadId(0), + })) + }); + Self { inner } + } +} + +impl JobCoresInner { + fn allocate(&mut self) -> (JobThreadId, watch::Receiver) { + let id = self.next_id; + self.next_id.0 += 1; + + let (&core_id, core) = self.cores.get_index_mut(self.next_core).unwrap(); + core.jobs.push(id); + self.next_core = (self.next_core + 1) % self.cores.len(); + + let (repin_tx, repin_rx) = watch::channel(core_id); + self.job_threads.insert(id, repin_tx); + + (id, repin_rx) + } + + /// Run when a `JobThread` exits. + fn deallocate(&mut self, id: JobThreadId) { + let core_id = *self.job_threads.remove(&id).unwrap().borrow(); + + let core_index = self.cores.get_index_of(&core_id).unwrap(); + + // This core is now less busy than it should be - bump `next_core` back + // by 1 and steal a thread from the core there. + // + // This wraps around in the 0 case, so the partition point is simply + // moved to the end of the ring buffer. + + let steal_from_index = self.next_core.checked_sub(1).unwrap_or(self.cores.len() - 1); + + // if this core was already at `next_core - 1`, we don't need to steal from anywhere + let (core, steal_from) = match self.cores.get_disjoint_indices_mut([core_index, steal_from_index]) { + Ok([(_, core), (_, steal_from)]) => (core, Some(steal_from)), + Err(_) => (&mut self.cores[core_index], None), + }; + + let pos = core.jobs.iter().position(|x| *x == id).unwrap(); + core.jobs.remove(pos); + + if let Some(steal_from) = steal_from { + // This unwrap will never fail, since cores below `next_core` always have + // at least 1 thread on them. Edge case: if `next_core` is 0, `steal_from` + // would wrap around to the end - but when `next_core` is 0, every core has + // the same number of threads; so, if the last core is empty, all the cores + // would be empty, but we know that's impossible because we're deallocating + // a thread right now. + let stolen = steal_from.jobs.pop().unwrap(); + // the way we pop and push here means that older job threads will be less + // likely to be repinned, while younger ones are liable to bounce around. + core.jobs.push(stolen); + self.job_threads[&stolen].send_replace(core_id); + } + + self.next_core = steal_from_index; + } +} + +/// A core taken from [`JobCores`], not yet running a job loop. +#[derive(Default)] +pub struct JobCore { + inner: Option, +} + +struct JobCoreInner { + repin_rx: watch::Receiver, + _guard: JobCoreGuard, +} + +impl JobCore { + /// Start running a job thread on this core. + /// + /// `init` constructs the data provided to each job, and `unsize` unsizes + /// it to `&mut T`, if necessary. + pub fn start(self, init: F, unsize: F2) -> JobThread + where + F: FnOnce() -> U + Send + 'static, + F2: FnOnce(&mut U) -> &mut T + Send + 'static, + U: 'static, + T: ?Sized + 'static, + { + let (tx, rx) = mpsc::channel::>>(Self::JOB_CHANNEL_LENGTH); + let close = Arc::new(NotifyOnce::new()); + + let closed = close.clone(); + let handle = tokio::runtime::Handle::current(); + std::thread::spawn(move || { + let mut data = init(); + let data = unsize(&mut data); + handle.block_on(self.job_loop(rx, closed, data)) + }); + + JobThread { tx, close } + } + + // this shouldn't matter too much, since callers will need to wait for + // the job to finish anyway. + const JOB_CHANNEL_LENGTH: usize = 64; + + async fn job_loop(mut self, mut rx: mpsc::Receiver>>, closed: Arc, data: &mut T) { + // this function is async because we need to recv on the repin channel + // and the jobs channel, but the jobs being run are blocking + + let repin_rx = self.inner.as_mut().map(|inner| &mut inner.repin_rx); + let repin_loop = async { + if let Some(rx) = repin_rx { + rx.mark_changed(); + while rx.changed().await.is_ok() { + core_affinity::set_for_current(*rx.borrow_and_update()); + } + } + }; + + let job_loop = async { + while let Some(job) = rx.recv().await { + // blocking in place means that other futures on the same task + // won't get polled - in this case, that's just the repin loop, + // which is fine because it can just run before the next job. + tokio::task::block_in_place(|| job(data)) + } + }; + + tokio::select! { + () = super::also_poll(job_loop, repin_loop) => {} + // when we receive a close notification, we immediately drop all + // remaining jobs in the queue. + () = closed.notified() => {} + } + } +} + +/// On drop, tells the `JobCores` that this core has been freed up. +struct JobCoreGuard { + cores: Weak>, + id: JobThreadId, +} + +impl Drop for JobCoreGuard { + fn drop(&mut self) { + if let Some(cores) = self.cores.upgrade() { + cores.lock().unwrap().deallocate(self.id); + } + } +} + +/// A handle to a thread running a job loop; see [`JobCores`] for more details. +/// +/// The thread stores data of type `T`; jobs run on the thread will be given +/// mutable access to it. +/// +/// This handle is cheaply cloneable. If all strong handles have been dropped, +/// the thread will shut down. +pub struct JobThread { + tx: mpsc::Sender>>, + close: Arc, +} + +impl Clone for JobThread { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + close: self.close.clone(), + } + } +} + +type Job = dyn FnOnce(&mut T) + Send; + +impl JobThread { + /// Run a blocking job on this `JobThread`. + /// + /// The job (`f`) will be placed in a queue, and will run strictly after + /// jobs ahead of it in the queue. If `f` panics, it will be bubbled up to + /// the calling task. + pub async fn run(&self, f: F) -> Result + where + F: FnOnce(&mut T) -> R + Send + 'static, + R: Send + 'static, + { + let (ret_tx, ret_rx) = oneshot::channel(); + + let span = tracing::Span::current(); + self.tx + .send(Box::new(move |data| { + let _entered = span.entered(); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(data))); + if let Err(Err(_panic)) = ret_tx.send(result) { + tracing::warn!("uncaught panic on threadpool") + } + })) + .await + .map_err(|_| JobThreadClosed)?; + + match ret_rx.await { + Ok(Ok(ret)) => Ok(ret), + Ok(Err(panic)) => std::panic::resume_unwind(panic), + Err(_closed) => Err(JobThreadClosed), + } + } + + /// Shutdown the job thread. + pub fn close(&self) { + self.close.notify(); + } + + /// Returns a future that resolves once the job thread has been closed. + pub async fn closed(&self) { + self.tx.closed().await + } + + /// Obtain a weak version of this handle. + pub fn downgrade(&self) -> WeakJobThread { + let tx = self.tx.downgrade(); + let close = Arc::downgrade(&self.close); + WeakJobThread { tx, close } + } +} + +pub struct JobThreadClosed; + +/// A weak version of `JobThread` that does not hold the thread open. +// used in crate::core::module_host::WeakModuleHost +pub struct WeakJobThread { + tx: mpsc::WeakSender>>, + close: Weak, +} + +impl WeakJobThread { + pub fn upgrade(&self) -> Option> { + Option::zip(self.tx.upgrade(), self.close.upgrade()).map(|(tx, close)| JobThread { tx, close }) + } +} diff --git a/crates/core/src/util/lending_pool.rs b/crates/core/src/util/lending_pool.rs deleted file mode 100644 index 09b55969b2b..00000000000 --- a/crates/core/src/util/lending_pool.rs +++ /dev/null @@ -1,219 +0,0 @@ -//! like, a semaphore but with values. or something - -use std::collections::VecDeque; -use std::future::Future; -use std::mem::ManuallyDrop; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use parking_lot::Mutex; -use spacetimedb_lib::Identity; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; - -use crate::worker_metrics::WORKER_METRICS; - -use super::notify_once::{NotifiedOnce, NotifyOnce}; - -pub struct LendingPool { - sem: Arc, - inner: Arc>, -} - -impl Default for LendingPool { - fn default() -> Self { - Self::new() - } -} - -impl Clone for LendingPool { - fn clone(&self) -> Self { - Self { - sem: self.sem.clone(), - inner: self.inner.clone(), - } - } -} - -struct LendingPoolInner { - closed_notify: NotifyOnce, - vec: Mutex>, -} - -struct PoolVec { - total_count: usize, - deque: Option>, -} - -#[derive(Debug)] -pub struct PoolClosed; - -/// A scope guard for the reducer queue length metric, -/// ensuring an increment is always be paired with one and only one decrement. -struct QueueMetric { - db: Identity, -} - -impl Drop for QueueMetric { - fn drop(&mut self) { - WORKER_METRICS.instance_queue_length.with_label_values(&self.db).dec(); - let queue_length = WORKER_METRICS.instance_queue_length.with_label_values(&self.db).get(); - WORKER_METRICS - .instance_queue_length_histogram - .with_label_values(&self.db) - .observe(queue_length as f64); - } -} - -impl QueueMetric { - fn inc(db: Identity) -> Self { - WORKER_METRICS.instance_queue_length.with_label_values(&db).inc(); - let queue_length = WORKER_METRICS.instance_queue_length.with_label_values(&db).get(); - WORKER_METRICS - .instance_queue_length_histogram - .with_label_values(&db) - .observe(queue_length as f64); - Self { db } - } -} - -impl LendingPool { - pub fn new() -> Self { - Self::from_iter(std::iter::empty()) - } - - pub fn request_with_context(&self, db: Identity) -> impl Future, PoolClosed>> { - let acq = self.sem.clone().acquire_owned(); - let pool_inner = self.inner.clone(); - - async move { - let _guard = QueueMetric::inc(db); - let permit = acq.await.map_err(|_| PoolClosed)?; - let resource = pool_inner - .vec - .lock() - .deque - .as_mut() - .ok_or(PoolClosed)? - .pop_front() - .ok_or(PoolClosed)?; - Ok(LentResource { - resource: ManuallyDrop::new(resource), - permit: ManuallyDrop::new(permit), - pool_inner, - }) - } - } - - pub fn add(&self, resource: T) -> Result<(), PoolClosed> { - self.add_multiple(std::iter::once(resource)) - } - - pub fn add_multiple>(&self, resources: I) -> Result<(), PoolClosed> { - let resources = resources.into_iter(); - let mut inner = self.inner.vec.lock(); - let deque = inner.deque.as_mut().ok_or(PoolClosed)?; - let mut num_new = 0; - deque.extend(resources.inspect(|_| num_new += 1)); - inner.total_count += num_new; - self.sem.add_permits(num_new); - Ok(()) - } - - pub fn num_total(&self) -> usize { - self.inner.vec.lock().total_count - } - - pub fn num_available(&self) -> usize { - self.sem.available_permits() - } - - pub fn close(&self) { - let mut vec = self.inner.vec.lock(); - self.sem.close(); - if let Some(deque) = vec.deque.take() { - vec.total_count -= deque.len(); - } - if vec.total_count == 0 { - self.inner.closed_notify.notify(); - } - } - - pub fn closed(&self) -> Closed<'_> { - Closed { - notified: self.inner.closed_notify.notified(), - } - } -} - -impl FromIterator for LendingPool { - fn from_iter>(iter: I) -> Self { - let deque = VecDeque::from_iter(iter); - Self { - sem: Arc::new(Semaphore::new(deque.len())), - inner: Arc::new(LendingPoolInner { - closed_notify: NotifyOnce::new(), - vec: Mutex::new(PoolVec { - total_count: deque.len(), - deque: Some(deque), - }), - }), - } - } -} - -pin_project_lite::pin_project! { - pub struct Closed<'a> { - #[pin] - notified: NotifiedOnce<'a>, - } -} - -impl Future for Closed<'_> { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().notified.poll(cx) - } -} - -pub struct LentResource { - resource: ManuallyDrop, - permit: ManuallyDrop, - pool_inner: Arc>, -} - -impl Deref for LentResource { - type Target = T; - fn deref(&self) -> &T { - &self.resource - } -} - -impl DerefMut for LentResource { - fn deref_mut(&mut self) -> &mut T { - &mut self.resource - } -} - -impl Drop for LentResource { - fn drop(&mut self) { - let resource = unsafe { ManuallyDrop::take(&mut self.resource) }; - let permit = unsafe { ManuallyDrop::take(&mut self.permit) }; - { - let mut vec = self.pool_inner.vec.lock(); - if let Some(deque) = &mut vec.deque { - deque.push_back(resource); - drop(permit); - } else { - drop(resource); - permit.forget(); - vec.total_count -= 1; - if vec.total_count == 0 { - self.pool_inner.closed_notify.notify(); - } - } - } - } -} diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 5830e4b299e..cc37d75ed38 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -5,7 +5,7 @@ use tokio::sync::oneshot; pub mod prometheus_handle; -pub mod lending_pool; +pub mod jobs; pub mod notify_once; pub mod slow; diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index d19e1a4415f..6021694c9a7 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -20,6 +20,7 @@ use spacetimedb::host::{ }; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, Node, Replica}; +use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb_client_api::auth::{self, LOCALHOST}; use spacetimedb_client_api::{Host, NodeDelegate}; @@ -46,6 +47,7 @@ impl StandaloneEnv { config: Config, certs: &CertificateAuthority, data_dir: Arc, + db_cores: JobCores, ) -> anyhow::Result> { let _pid_file = data_dir.pid_file()?; let meta_path = data_dir.metadata_toml(); @@ -68,6 +70,7 @@ impl StandaloneEnv { program_store.clone(), energy_monitor, durability_provider, + db_cores, ); let client_actor_index = ClientActorIndex::new(); let jwt_keys = certs.get_or_create_keys()?; @@ -461,9 +464,9 @@ impl StandaloneEnv { } } -pub async fn exec_subcommand(cmd: &str, args: &ArgMatches) -> Result<(), anyhow::Error> { +pub async fn exec_subcommand(cmd: &str, args: &ArgMatches, db_cores: JobCores) -> Result<(), anyhow::Error> { match cmd { - "start" => start::exec(args).await, + "start" => start::exec(args, db_cores).await, "extract-schema" => extract_schema::exec(args).await, unknown => Err(anyhow::anyhow!("Invalid subcommand: {}", unknown)), } @@ -479,7 +482,7 @@ pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path: args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()]) } let args = start::cli().try_get_matches_from(args)?; - start::exec(&args).await + start::exec(&args, JobCores::default()).await } #[cfg(test)] @@ -516,9 +519,11 @@ mod tests { page_pool_max_size: None, }; - let _env = StandaloneEnv::init(config, &ca, data_dir.clone()).await?; + let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()).await?; // Ensure that we have a lock. - assert!(StandaloneEnv::init(config, &ca, data_dir.clone()).await.is_err()); + assert!(StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()) + .await + .is_err()); Ok(()) } diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index cb1868fde95..81aee00bfa3 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -1,15 +1,17 @@ use clap::Command; +use spacetimedb::startup; +use spacetimedb::util::jobs::JobCores; use tokio::runtime::Builder; use spacetimedb_standalone::*; use std::panic; use std::process; -async fn async_main() -> anyhow::Result<()> { +async fn async_main(db_cores: JobCores) -> anyhow::Result<()> { let matches = get_command().get_matches(); let (cmd, subcommand_args) = matches.subcommand().unwrap(); - exec_subcommand(cmd, subcommand_args).await?; + exec_subcommand(cmd, subcommand_args, db_cores).await?; Ok(()) } @@ -66,10 +68,13 @@ fn main() -> anyhow::Result<()> { process::exit(1); })); + let cores = startup::pin_threads(); + // Create a multi-threaded run loop - Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(async_main()) + let mut builder = Builder::new_multi_thread(); + builder.enable_all(); + cores.tokio.configure(&mut builder); + let rt = builder.build().unwrap(); + cores.rayon.configure(rt.handle()); + rt.block_on(async_main(cores.databases)) } diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index e7c2bb69c22..54ae5a1cefa 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -8,6 +8,7 @@ use clap::{Arg, ArgMatches}; use spacetimedb::config::{CertificateAuthority, ConfigFile}; use spacetimedb::db::{Config, Storage}; use spacetimedb::startup::{self, TracingOptions}; +use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics; use spacetimedb_client_api::routes::database::DatabaseRoutes; use spacetimedb_client_api::routes::router; @@ -75,7 +76,7 @@ pub fn cli() -> clap::Command { // .after_help("Run `spacetime help start` for more detailed information.") } -pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> { +pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { let listen_addr = args.get_one::("listen_addr").unwrap(); let cert_dir = args.get_one::("jwt_key_dir"); let certs = Option::zip( @@ -122,24 +123,20 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> { } }; - startup::StartupOptions { - tracing: Some(TracingOptions { - config: config.logs, - reload_config: cfg!(debug_assertions).then_some(config_path), - disk_logging: std::env::var_os("SPACETIMEDB_DISABLE_DISK_LOGGING") - .is_none() - .then(|| data_dir.logs()), - edition: "standalone".to_owned(), - tracy: enable_tracy || std::env::var_os("SPACETIMEDB_TRACY").is_some(), - flamegraph: std::env::var_os("SPACETIMEDB_FLAMEGRAPH").map(|_| { - std::env::var_os("SPACETIMEDB_FLAMEGRAPH_PATH") - .unwrap_or("/var/log/flamegraph.folded".into()) - .into() - }), + startup::configure_tracing(TracingOptions { + config: config.logs, + reload_config: cfg!(debug_assertions).then_some(config_path), + disk_logging: std::env::var_os("SPACETIMEDB_DISABLE_DISK_LOGGING") + .is_none() + .then(|| data_dir.logs()), + edition: "standalone".to_owned(), + tracy: enable_tracy || std::env::var_os("SPACETIMEDB_TRACY").is_some(), + flamegraph: std::env::var_os("SPACETIMEDB_FLAMEGRAPH").map(|_| { + std::env::var_os("SPACETIMEDB_FLAMEGRAPH_PATH") + .unwrap_or("/var/log/flamegraph.folded".into()) + .into() }), - ..Default::default() - } - .configure(); + }); let certs = certs .or(config.certificate_authority) @@ -147,11 +144,10 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> { .context("cannot omit --jwt-{pub,priv}-key-path when those options are not specified in config.toml")?; let data_dir = Arc::new(data_dir.clone()); - let ctx = StandaloneEnv::init(db_config, &certs, data_dir).await?; + let ctx = StandaloneEnv::init(db_config, &certs, data_dir, db_cores).await?; worker_metrics::spawn_jemalloc_stats(listen_addr.clone()); worker_metrics::spawn_tokio_stats(listen_addr.clone()); worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone()); - let mut db_routes = DatabaseRoutes::default(); db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable()); db_routes.db_put = db_routes.db_put.layer(DefaultBodyLimit::disable()); diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index d8e2556902a..22c55b9af8a 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -178,9 +178,10 @@ impl CompiledModule { }; let certs = CertificateAuthority::in_cli_config_dir(&paths.cli_config_dir); - let env = spacetimedb_standalone::StandaloneEnv::init(config, &certs, paths.data_dir.into()) - .await - .unwrap(); + let env = + spacetimedb_standalone::StandaloneEnv::init(config, &certs, paths.data_dir.into(), Default::default()) + .await + .unwrap(); // TODO: Fix this when we update identity generation. let identity = Identity::ZERO; let db_identity = SpacetimeAuth::alloc(&env).await.unwrap().identity;