Skip to content

Commit 04b47ba

Browse files
authored
Use the same single threaded runtime for all wasm operations (#5172)
# Description of Changes Merged `SingleThreadedExecutor` into `SingleCoreExecutor`. Before this change, we would allocate two database worker threads for WASM modules - one for reducers and one for procedures. With this change, `procedures` run on a Tokio `LocalSet`, while reducers run inline on the OS thread. The goal is to avoid spawning more than one thread. I don’t think this meaningfully changes runtime behavior (or we can call it bit of an optimisaion), and it still avoids the `async` overhead for reducer calls. It also better aligns with the “thread per core” principle and allows the single-threaded DST executor to perform close to production simulation. # API and ABI breaking changes NA # Expected complexity level and risk 2, simple runtime behaviour change but it could have hidden implications if done wrong. # Testing I think existing tests should be enough, considering we also have benchmark tests in CI.
1 parent cb92b6f commit 04b47ba

3 files changed

Lines changed: 100 additions & 194 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRo
2828
use crate::subscription::tx::DeltaTx;
2929
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
3030
use crate::subscription::{execute_plan, execute_plan_for_view};
31-
use crate::util::jobs::{AllocatedJobCore, SingleCoreExecutor, SingleThreadedExecutor};
31+
use crate::util::jobs::{AllocatedJobCore, SingleThreadedExecutor};
3232
use crate::worker_metrics::WORKER_METRICS;
3333
use anyhow::Context;
3434
use bytes::Bytes;
@@ -342,8 +342,7 @@ pub enum ModuleWithInstance {
342342
Wasm {
343343
module: super::wasmtime::Module,
344344
procedure_module: super::wasmtime::ProcedureModule,
345-
main_thread_name: String,
346-
procedure_thread_name: String,
345+
thread_name: String,
347346
core: AllocatedJobCore,
348347
init_inst: Box<super::wasmtime::ModuleInstance>,
349348
procedure_instance_pool_size: NonZeroUsize,
@@ -407,9 +406,8 @@ impl WasmtimeModuleState {
407406
}
408407
}
409408

410-
/// Wasm uses a single-core executor backed by a Tokio single threaded runtime
411-
/// for async procedures. It uses an executor backed by a single OS-thread for
412-
/// everything else.
409+
/// Wasm uses a single executor backed by a single OS thread with a Tokio LocalSet
410+
/// for async procedures; synchronous reducers run inline on the same thread.
413411
///
414412
/// Note, procedures acquire a module instance from the async procedure pool
415413
/// before being enqueued by the executor.
@@ -418,8 +416,7 @@ impl WasmtimeModuleState {
418416
/// to acquire.
419417
struct WasmtimeModuleHost {
420418
module: Arc<super::wasmtime::Module>,
421-
main_executor: SingleThreadedExecutor<WasmtimeModuleState>,
422-
procedure_executor: SingleCoreExecutor,
419+
executor: SingleThreadedExecutor<WasmtimeModuleState>,
423420
procedure_instances: Arc<WasmtimeProcedureInstanceManager>,
424421
}
425422

@@ -435,7 +432,7 @@ impl WasmtimeModuleHost {
435432
A: Send + 'static,
436433
{
437434
let label = label.to_owned();
438-
self.main_executor.enqueue_job(move |state| {
435+
self.executor.enqueue_sync_job(move |state| {
439436
scopeguard::defer_on_unwind!({
440437
log::warn!("wasm main operation {label} panicked");
441438
on_panic();
@@ -461,7 +458,7 @@ impl WasmtimeModuleHost {
461458
let instance_manager = self.procedure_instances.clone();
462459
let ModuleInstanceLease { instance, slot } = instance_manager.get_instance().await;
463460
let label = label.to_owned();
464-
self.procedure_executor.enqueue_job(async move || {
461+
self.executor.enqueue_async_job(async move || {
465462
scopeguard::defer_on_unwind!({
466463
log::warn!("wasm procedure {label} panicked");
467464
on_panic();
@@ -1709,8 +1706,7 @@ impl ModuleHost {
17091706
ModuleWithInstance::Wasm {
17101707
module,
17111708
procedure_module,
1712-
main_thread_name,
1713-
procedure_thread_name,
1709+
thread_name,
17141710
core,
17151711
init_inst,
17161712
procedure_instance_pool_size,
@@ -1721,20 +1717,7 @@ impl ModuleHost {
17211717
let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity);
17221718
let main_state = WasmtimeModuleState::new(module.clone(), init_inst, metrics.clone());
17231719

1724-
// The wasm main and procedure executors run on separate OS threads,
1725-
// but they intentionally share one database core allocation.
1726-
// When core pinning is enabled, both threads pin to the same core
1727-
// and rebalance together because they use clones of the same `CorePinner`.
1728-
let (load_balance_guard, core_pinner) = core.into_shared();
1729-
1730-
let main_executor = AllocatedJobCore::spawn_executor(
1731-
load_balance_guard.clone(),
1732-
core_pinner.clone(),
1733-
main_state,
1734-
main_thread_name,
1735-
);
1736-
let procedure_executor =
1737-
AllocatedJobCore::spawn_async_executor(load_balance_guard, core_pinner, procedure_thread_name);
1720+
let executor = core.spawn_executor(main_state, thread_name);
17381721
let procedure_instances = Arc::new(ModuleInstanceManager::new_bounded_with_metrics(
17391722
procedure_module,
17401723
None,
@@ -1743,8 +1726,7 @@ impl ModuleHost {
17431726
));
17441727
Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost {
17451728
module,
1746-
main_executor,
1747-
procedure_executor,
1729+
executor,
17481730
procedure_instances,
17491731
})))
17501732
}
@@ -1853,9 +1835,9 @@ impl ModuleHost {
18531835

18541836
Ok(match &*self.inner {
18551837
ModuleHostInner::Wasm(host) => {
1856-
let executor = host.main_executor.clone();
1838+
let executor = host.executor.clone();
18571839
executor
1858-
.run_job(move |state| {
1840+
.run_sync_job(move |state| {
18591841
state.with_instance(move |inst| {
18601842
drop(timer_guard);
18611843
wasm(arg, inst)
@@ -1897,12 +1879,12 @@ impl ModuleHost {
18971879

18981880
Ok(match &*self.inner {
18991881
ModuleHostInner::Wasm(host) => {
1900-
let executor = host.procedure_executor.clone();
1882+
let executor = host.executor.clone();
19011883
let instance_manager = host.procedure_instances.clone();
19021884
instance_manager
19031885
.with_instance(async move |mut inst| {
19041886
executor
1905-
.run_job(async move || {
1887+
.run_async_job(async move || {
19061888
drop(timer_guard);
19071889
let res = wasm(arg, &mut inst).await;
19081890
(res, inst)
@@ -3202,10 +3184,10 @@ impl ModuleHost {
32023184
request,
32033185
|request, inst, on_panic| async move { inst.enqueue_one_off_query(request, on_panic).await },
32043186
move |request, wasm_host, on_panic, timer_guard| {
3205-
let executor = wasm_host.main_executor.clone();
3187+
let executor = wasm_host.executor.clone();
32063188
let info = wasm_host.module.info();
32073189
let label = label.to_owned();
3208-
executor.enqueue_job(move |_| {
3190+
executor.enqueue_sync_job(move |_| {
32093191
scopeguard::defer_on_unwind!({
32103192
log::warn!("websocket one-off query operation {label} panicked");
32113193
on_panic();

crates/core/src/host/wasmtime/mod.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,21 +135,14 @@ pub type ProcedureModule = WasmModuleHostActor<WasmtimeAsyncModule>;
135135
pub type ModuleInstance = WasmModuleInstance<WasmtimeInstance>;
136136

137137
// Linux thread names expose at most 15 bytes, so keep the database identity
138-
// suffix short enough to survive after the `wasm-main-`/`wasm-proc-` prefix.
139-
const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 5;
138+
// suffix short enough to survive after the `wasm-` prefix.
139+
const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 10;
140140

141-
fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
141+
fn wasm_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
142142
let hex = database_identity.to_hex();
143143
// We use the tail of the identity to avoid the common structured prefix.
144144
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
145-
format!("wasm-main-{suffix}")
146-
}
147-
148-
fn wasm_procedure_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
149-
let hex = database_identity.to_hex();
150-
// We use the tail of the identity to avoid the common structured prefix.
151-
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
152-
format!("wasm-proc-{suffix}")
145+
format!("wasm-{suffix}")
153146
}
154147

155148
impl WasmtimeRuntime {
@@ -182,16 +175,14 @@ impl WasmtimeRuntime {
182175

183176
let module = WasmtimeModule::new(module);
184177
let procedure_module = WasmtimeAsyncModule::new(procedure_module);
185-
let main_thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity);
186-
let procedure_thread_name = wasm_procedure_executor_thread_name(&mcc.replica_ctx.database_identity);
178+
let thread_name = wasm_worker_thread_name(&mcc.replica_ctx.database_identity);
187179

188180
let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?;
189181
let procedure_module = module.with_runtime_module(procedure_module)?;
190182
Ok(super::module_host::ModuleWithInstance::Wasm {
191183
module,
192184
procedure_module,
193-
main_thread_name,
194-
procedure_thread_name,
185+
thread_name,
195186
core,
196187
init_inst: Box::new(init_inst),
197188
procedure_instance_pool_size: self.config.procedure_instance_pool_size,

0 commit comments

Comments
 (0)