Skip to content

Commit 247a9eb

Browse files
Use synchronous runtime for the main wasm execution lane (#5095)
# Description of Changes Before this change, we used a single async-enabled wasm runtime for all requests, even though procedures are the only operation that can yield. Now each module gets two separate runtimes. We continue to use the same async runtime for procedures, but now reducers are executed against a synchronous wasm runtime, backed by a single OS-thread instead of a Tokio runtime. The purpose of this change is to remove from the critical path the overhead associated with async calls that really aren't async at all. Also includes the following fix from #5135: > After #4973, WASM procedures can execute concurrently with later operations on the same WebSocket. > Before this change, the C# regression testsuite queued several procedures, then immediately queued `UnsubscribeThen`. After #4973, the unsubscribe could be applied before the `SubscriptionEventOffset` procedure callback ran, clearing `MyTable` from the local subscribed cache. The callback then failed while asserting that the `offset-test:` row was present. > This change treats unsubscribe as a separate phase. It is scheduled after the main work is queued, but only starts once `waiting == 0`, so all callbacks that inspect subscribed state run before the cache is cleared. # API and ABI breaking changes None # Expected complexity level and risk 2.5 # Testing Pure refactor. Relies on current test coverage. #5078 will ensure the performance is on par with V8.
1 parent 1a2f111 commit 247a9eb

9 files changed

Lines changed: 923 additions & 433 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 151 additions & 117 deletions
Large diffs are not rendered by default.

crates/core/src/host/scheduler.rs

Lines changed: 215 additions & 129 deletions
Large diffs are not rendered by default.

crates/core/src/host/v8/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,14 @@ impl V8RuntimeInner {
240240
// Validate/create the module and spawn the first instance.
241241
let metrics = InstanceManagerMetrics::new(HostType::Js, mcc.replica_ctx.database_identity);
242242
let mcc = Either::Right(mcc);
243+
244+
// The JS main worker and procedure workers run on separate OS threads,
245+
// but they intentionally share one database core allocation.
246+
// When core pinning is enabled, all worker threads pin to the same core
247+
// and rebalance together because they use clones of the same `CorePinner`.
243248
let load_balance_guard = Arc::new(core.guard);
244249
let core_pinner = core.pinner;
250+
245251
let heap_policy = config.heap_policy;
246252
let (common, init_inst) = spawn_main_instance_worker(
247253
program.clone(),
@@ -1405,10 +1411,7 @@ fn handle_main_worker_request(
14051411
}
14061412
JsMainWorkerRequest::ScheduledReducer { reply_tx, params } => {
14071413
handle_worker_request("scheduled_reducer", reply_tx, || {
1408-
let (res, trapped) = instance_common
1409-
.call_scheduled_function(params, inst)
1410-
.now_or_never()
1411-
.expect("our call_scheduled_function implementation is not actually async");
1414+
let (res, trapped) = instance_common.call_scheduled_reducer(params, inst);
14121415
(res, trapped)
14131416
})
14141417
}
@@ -1506,9 +1509,9 @@ fn handle_procedure_worker_request(
15061509
JsProcedureWorkerRequest::ScheduledProcedure { reply_tx, params } => {
15071510
handle_worker_request("scheduled_procedure", reply_tx, || {
15081511
let (res, trapped) = instance_common
1509-
.call_scheduled_function(params, inst)
1512+
.call_scheduled_procedure(params, inst)
15101513
.now_or_never()
1511-
.expect("our call_scheduled_function implementation is not actually async");
1514+
.expect("our call_scheduled_procedure implementation is not actually async");
15121515
(res, trapped)
15131516
})
15141517
}

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,15 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
394394

395395
Ok((module, initial_instance))
396396
}
397+
398+
pub fn with_runtime_module<U: WasmModule>(&self, module: U) -> Result<WasmModuleHostActor<U>, InitializationError> {
399+
let module = module.instantiate_pre()?;
400+
Ok(WasmModuleHostActor {
401+
module,
402+
common: self.common.clone(),
403+
func_names: self.func_names.clone(),
404+
})
405+
}
397406
}
398407

399408
impl<T: WasmModule> WasmModuleHostActor<T> {
@@ -528,11 +537,20 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
528537
res
529538
}
530539

531-
pub(in crate::host) async fn call_scheduled_function(
540+
pub(in crate::host) async fn call_scheduled_procedure(
532541
&mut self,
533542
params: ScheduledFunctionParams,
534543
) -> CallScheduledFunctionResult {
535-
let (res, trapped) = self.common.call_scheduled_function(params, &mut self.instance).await;
544+
let (res, trapped) = self.common.call_scheduled_procedure(params, &mut self.instance).await;
545+
self.trapped = trapped;
546+
res
547+
}
548+
549+
pub(in crate::host) fn call_scheduled_reducer(
550+
&mut self,
551+
params: ScheduledFunctionParams,
552+
) -> CallScheduledFunctionResult {
553+
let (res, trapped) = self.common.call_scheduled_reducer(params, &mut self.instance);
536554
self.trapped = trapped;
537555
res
538556
}
@@ -1363,12 +1381,20 @@ impl InstanceCommon {
13631381
self.info.relational_db().clear_all_clients().map_err(Into::into)
13641382
}
13651383

1366-
pub(crate) async fn call_scheduled_function<I: WasmInstance>(
1384+
pub(crate) async fn call_scheduled_procedure<I: WasmInstance>(
1385+
&mut self,
1386+
params: ScheduledFunctionParams,
1387+
inst: &mut I,
1388+
) -> (CallScheduledFunctionResult, bool) {
1389+
crate::host::scheduler::call_scheduled_procedure(&self.info.clone(), params, self, inst).await
1390+
}
1391+
1392+
pub(crate) fn call_scheduled_reducer<I: WasmInstance>(
13671393
&mut self,
13681394
params: ScheduledFunctionParams,
13691395
inst: &mut I,
13701396
) -> (CallScheduledFunctionResult, bool) {
1371-
crate::host::scheduler::call_scheduled_function(&self.info.clone(), params, self, inst).await
1397+
crate::host::scheduler::call_scheduled_reducer(&self.info.clone(), params, self, inst)
13721398
}
13731399
}
13741400

crates/core/src/host/wasmtime/mod.rs

Lines changed: 105 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@ use spacetimedb_paths::server::ServerDataDir;
1111
use std::borrow::Cow;
1212
use std::time::Duration;
1313
use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut};
14-
pub use wasmtime_module::{WasmtimeInstance, WasmtimeModule};
14+
pub use wasmtime_module::{WasmtimeAsyncModule, WasmtimeInstance, WasmtimeModule};
1515

1616
#[cfg(unix)]
1717
mod pooling_stack_creator;
1818
mod wasm_instance_env;
1919
mod wasmtime_module;
2020

2121
pub struct WasmtimeRuntime {
22-
engine: Engine,
23-
linker: Box<Linker<WasmInstanceEnv>>,
22+
sync_engine: Engine,
23+
sync_linker: Box<Linker<WasmInstanceEnv>>,
24+
async_engine: Engine,
25+
async_linker: Box<Linker<WasmInstanceEnv>>,
2426
config: WasmConfig,
2527
}
2628

@@ -46,76 +48,108 @@ pub(crate) fn epoch_ticker(mut on_tick: impl 'static + Send + FnMut() -> Option<
4648

4749
impl WasmtimeRuntime {
4850
pub fn new(data_dir: Option<&ServerDataDir>, runtime_config: WasmConfig) -> Self {
49-
let mut config = wasmtime::Config::new();
50-
config
51-
.cranelift_opt_level(wasmtime::OptLevel::Speed)
52-
.consume_fuel(true)
53-
.epoch_interruption(true)
54-
.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable)
55-
// We need async support to enable suspending execution of procedures
56-
// when waiting for e.g. HTTP responses or the transaction lock.
57-
// We don't enable either fuel-based or epoch-based yielding
58-
// (see https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.epoch_deadline_async_yield_and_update
59-
// and https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.fuel_async_yield_interval)
60-
// so reducers will always execute to completion during the first `Future::poll` call,
61-
// and procedures will only yield when performing an asynchronous operation.
62-
// These futures are executed on a separate single-threaded executor not related to the "global" Tokio runtime,
63-
// which is responsible only for executing WASM. See `crate::util::jobs` for this infrastructure.
64-
.async_support(true);
51+
let sync_config = wasmtime_config(data_dir, false);
52+
let async_config = wasmtime_config(data_dir, true);
6553

66-
#[cfg(unix)]
67-
config
68-
.async_stack_size(self::pooling_stack_creator::ASYNC_STACK_SIZE)
69-
.with_host_stack(self::pooling_stack_creator::PoolingStackCreator::new());
54+
let sync_engine = Engine::new(&sync_config).unwrap();
55+
let async_engine = Engine::new(&async_config).unwrap();
7056

71-
// Offer a compile-time flag for enabling perfmap generation,
72-
// so `perf` can display JITted symbol names.
73-
// Ideally we would be able to configure this at runtime via a flag to `spacetime start`,
74-
// but this is good enough for now.
75-
#[cfg(feature = "perfmap")]
76-
config.profiler(wasmtime::ProfilingStrategy::PerfMap);
77-
78-
if let Some(data_dir) = data_dir {
79-
let mut cache_config = wasmtime::CacheConfig::new();
80-
cache_config.with_directory(data_dir.wasmtime_cache().0);
81-
match wasmtime::Cache::new(cache_config) {
82-
Ok(cache) => {
83-
config.cache(Some(cache));
84-
}
85-
Err(e) => {
86-
// caching is just an optimization, so if it fails, just log and continue
87-
tracing::warn!("failed to set up wasmtime cache: {e:#}")
88-
}
57+
let weak_sync_engine = sync_engine.weak();
58+
let weak_async_engine = async_engine.weak();
59+
epoch_ticker(move || {
60+
let mut ticked = false;
61+
if let Some(engine) = weak_sync_engine.upgrade() {
62+
engine.increment_epoch();
63+
ticked = true;
8964
}
65+
if let Some(engine) = weak_async_engine.upgrade() {
66+
engine.increment_epoch();
67+
ticked = true;
68+
}
69+
ticked.then_some(())
70+
});
71+
72+
let mut sync_linker = Box::new(Linker::new(&sync_engine));
73+
WasmtimeModule::link_imports(&mut sync_linker).unwrap();
74+
75+
let mut async_linker = Box::new(Linker::new(&async_engine));
76+
WasmtimeAsyncModule::link_imports(&mut async_linker).unwrap();
77+
78+
let config = runtime_config;
79+
WasmtimeRuntime {
80+
sync_engine,
81+
sync_linker,
82+
async_engine,
83+
async_linker,
84+
config,
9085
}
86+
}
87+
}
9188

92-
let engine = Engine::new(&config).unwrap();
89+
fn wasmtime_config(data_dir: Option<&ServerDataDir>, async_support: bool) -> wasmtime::Config {
90+
let mut config = wasmtime::Config::new();
91+
config
92+
.cranelift_opt_level(wasmtime::OptLevel::Speed)
93+
.consume_fuel(true)
94+
.epoch_interruption(true)
95+
.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
9396

94-
let weak_engine = engine.weak();
95-
epoch_ticker(move || {
96-
let engine = weak_engine.upgrade()?;
97-
engine.increment_epoch();
98-
Some(())
99-
});
97+
if async_support {
98+
// Procedure instances need async support to suspend execution when waiting for
99+
// e.g. HTTP responses or the transaction lock. Main-lane instances use a
100+
// separate sync engine so reducers/views do not pay Wasmtime's fiber overhead.
101+
config.async_support(true);
100102

101-
let mut linker = Box::new(Linker::new(&engine));
102-
WasmtimeModule::link_imports(&mut linker).unwrap();
103+
#[cfg(unix)]
104+
config
105+
.async_stack_size(self::pooling_stack_creator::ASYNC_STACK_SIZE)
106+
.with_host_stack(self::pooling_stack_creator::PoolingStackCreator::new());
107+
}
103108

104-
let config = runtime_config;
105-
WasmtimeRuntime { engine, linker, config }
109+
// Offer a compile-time flag for enabling perfmap generation,
110+
// so `perf` can display JITted symbol names.
111+
// Ideally we would be able to configure this at runtime via a flag to `spacetime start`,
112+
// but this is good enough for now.
113+
#[cfg(feature = "perfmap")]
114+
config.profiler(wasmtime::ProfilingStrategy::PerfMap);
115+
116+
if let Some(data_dir) = data_dir {
117+
let mut cache_config = wasmtime::CacheConfig::new();
118+
cache_config.with_directory(data_dir.wasmtime_cache().0);
119+
match wasmtime::Cache::new(cache_config) {
120+
Ok(cache) => {
121+
config.cache(Some(cache));
122+
}
123+
Err(e) => {
124+
// caching is just an optimization, so if it fails, just log and continue
125+
tracing::warn!("failed to set up wasmtime cache: {e:#}")
126+
}
127+
}
106128
}
129+
130+
config
107131
}
108132

109133
pub type Module = WasmModuleHostActor<WasmtimeModule>;
134+
pub type ProcedureModule = WasmModuleHostActor<WasmtimeAsyncModule>;
110135
pub type ModuleInstance = WasmModuleInstance<WasmtimeInstance>;
111136

112-
const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 8;
137+
// Linux thread names expose at most 15 bytes, so keep the database identity
138+
// suffix short enough to survive after the `wasm-main-`/`wasm-proc-` prefix.
139+
const THREAD_NAME_DATABASE_ID_SUFFIX_LEN: usize = 5;
140+
141+
fn wasm_main_worker_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
142+
let hex = database_identity.to_hex();
143+
// We use the tail of the identity to avoid the common structured prefix.
144+
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
145+
format!("wasm-main-{suffix}")
146+
}
113147

114-
fn wasm_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
148+
fn wasm_procedure_executor_thread_name(database_identity: &spacetimedb_lib::Identity) -> String {
115149
let hex = database_identity.to_hex();
116150
// We use the tail of the identity to avoid the common structured prefix.
117151
let suffix = &hex.as_str()[hex.as_str().len() - THREAD_NAME_DATABASE_ID_SUFFIX_LEN..];
118-
format!("wasm-{suffix}")
152+
format!("wasm-proc-{suffix}")
119153
}
120154

121155
impl WasmtimeRuntime {
@@ -126,7 +160,7 @@ impl WasmtimeRuntime {
126160
core: AllocatedJobCore,
127161
) -> anyhow::Result<super::module_host::ModuleWithInstance> {
128162
let module =
129-
wasmtime::Module::new(&self.engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?;
163+
wasmtime::Module::new(&self.sync_engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?;
130164

131165
let func_imports = module
132166
.imports()
@@ -136,17 +170,29 @@ impl WasmtimeRuntime {
136170
abi::verify_supported(WasmtimeModule::IMPLEMENTED_ABI, abi)?;
137171

138172
let module = self
139-
.linker
173+
.sync_linker
140174
.instantiate_pre(&module)
141175
.map_err(InitializationError::Instantiation)?;
176+
let procedure_module =
177+
wasmtime::Module::new(&self.async_engine, program_bytes).map_err(ModuleCreationError::WasmCompileError)?;
178+
let procedure_module = self
179+
.async_linker
180+
.instantiate_pre(&procedure_module)
181+
.map_err(InitializationError::Instantiation)?;
142182

143183
let module = WasmtimeModule::new(module);
144-
let executor_thread_name = wasm_executor_thread_name(&mcc.replica_ctx.database_identity);
184+
let procedure_module = WasmtimeAsyncModule::new(procedure_module);
185+
let main_thread_name = wasm_main_worker_thread_name(&mcc.replica_ctx.database_identity);
186+
let procedure_thread_name = wasm_procedure_executor_thread_name(&mcc.replica_ctx.database_identity);
145187

146188
let (module, init_inst) = WasmModuleHostActor::new(mcc, module)?;
189+
let procedure_module = module.with_runtime_module(procedure_module)?;
147190
Ok(super::module_host::ModuleWithInstance::Wasm {
148191
module,
149-
executor: core.spawn_named_async_executor(executor_thread_name),
192+
procedure_module,
193+
main_thread_name,
194+
procedure_thread_name,
195+
core,
150196
init_inst: Box::new(init_inst),
151197
procedure_instance_pool_size: self.config.procedure_instance_pool_size,
152198
})

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,6 +1785,7 @@ impl WasmInstanceEnv {
17851785
view_call.sender,
17861786
args_source.0,
17871787
result_sink,
1788+
true,
17881789
)?;
17891790

17901791
Ok(code)

0 commit comments

Comments
 (0)