Skip to content

Commit abdf059

Browse files
add trap handling for reducers, pooled instances for procedures
1 parent ae7de47 commit abdf059

3 files changed

Lines changed: 520 additions & 96 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 110 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,8 @@ struct WasmtimeModuleHost {
359359

360360
struct V8ModuleHost {
361361
module: super::v8::JsModule,
362-
instance: super::v8::JsInstance,
362+
reducer_lane: super::v8::JsReducerLane,
363+
procedure_instances: ModuleInstanceManager<super::v8::JsModule>,
363364
}
364365

365366
/// A module; used as a bound on `InstanceManager`.
@@ -407,7 +408,7 @@ impl GenericModule for super::v8::JsModule {
407408

408409
impl GenericModuleInstance for super::v8::JsInstance {
409410
fn trapped(&self) -> bool {
410-
false
411+
self.trapped()
411412
}
412413
}
413414

@@ -615,6 +616,7 @@ pub fn call_identity_connected(
615616
}
616617
}
617618

619+
#[derive(Clone)]
618620
pub struct CallReducerParams {
619621
pub timestamp: Timestamp,
620622
pub caller_identity: Identity,
@@ -801,6 +803,23 @@ impl<M: GenericModule> ModuleInstanceManager<M> {
801803
}
802804
}
803805

806+
fn new_empty(module: M, database_identity: Identity) -> Self {
807+
let host_type = module.host_type();
808+
let create_instance_time_metric = CreateInstanceTimeMetric {
809+
metric: WORKER_METRICS
810+
.module_create_instance_time_seconds
811+
.with_label_values(&database_identity, &host_type),
812+
host_type,
813+
database_identity,
814+
};
815+
816+
Self {
817+
instances: Mutex::new(VecDeque::new()),
818+
module,
819+
create_instance_time_metric,
820+
}
821+
}
822+
804823
async fn with_instance<R>(&self, f: impl AsyncFnOnce(M::Instance) -> (R, M::Instance)) -> R {
805824
let inst = self.get_instance().await;
806825
let (res, inst) = f(inst).await;
@@ -1029,9 +1048,12 @@ impl ModuleHost {
10291048
}
10301049
ModuleWithInstance::Js { module, init_inst } => {
10311050
info = module.info();
1051+
let reducer_lane = super::v8::JsReducerLane::new(module.clone(), init_inst);
1052+
let procedure_instances = ModuleInstanceManager::new_empty(module.clone(), database_identity);
10321053
Arc::new(ModuleHostInner::Js(V8ModuleHost {
10331054
module,
1034-
instance: init_inst,
1055+
reducer_lane,
1056+
procedure_instances,
10351057
}))
10361058
}
10371059
};
@@ -1100,13 +1122,13 @@ impl ModuleHost {
11001122
})
11011123
.await
11021124
}
1103-
ModuleHostInner::Js(V8ModuleHost { instance, .. }) => {
1104-
instance
1125+
ModuleHostInner::Js(V8ModuleHost { reducer_lane, .. }) => {
1126+
reducer_lane
11051127
.run_on_thread(async move || {
11061128
drop(timer_guard);
11071129
f().await
11081130
})
1109-
.await
1131+
.await?
11101132
}
11111133
})
11121134
}
@@ -1145,7 +1167,7 @@ impl ModuleHost {
11451167
arg: A,
11461168
timer: impl FnOnce(&str) -> Guard,
11471169
work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box<ModuleInstance>, A) -> (R, Box<ModuleInstance>),
1148-
work_js: impl AsyncFnOnce(Guard, &JsInstance, A) -> R,
1170+
work_js: impl AsyncFnOnce(Guard, &super::v8::JsReducerLane, A) -> R,
11491171
) -> Result<R, NoSuchModule> {
11501172
self.guard_closed()?;
11511173
let timer_guard = timer(label);
@@ -1172,7 +1194,7 @@ impl ModuleHost {
11721194
.with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await)
11731195
.await
11741196
}
1175-
ModuleHostInner::Js(V8ModuleHost { instance, .. }) => work_js(timer_guard, instance, arg).await,
1197+
ModuleHostInner::Js(V8ModuleHost { reducer_lane, .. }) => work_js(timer_guard, reducer_lane, arg).await,
11761198
})
11771199
}
11781200

@@ -1185,7 +1207,7 @@ impl ModuleHost {
11851207
label: &str,
11861208
arg: A,
11871209
wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static,
1188-
js: impl AsyncFnOnce(A, &JsInstance) -> R,
1210+
js: impl AsyncFnOnce(A, &super::v8::JsReducerLane) -> R,
11891211
) -> Result<R, NoSuchModule>
11901212
where
11911213
R: Send + 'static,
@@ -1217,6 +1239,35 @@ impl ModuleHost {
12171239
.await
12181240
}
12191241

1242+
async fn with_js_procedure_instance<R>(
1243+
&self,
1244+
label: &str,
1245+
f: impl AsyncFnOnce(&JsInstance) -> R,
1246+
) -> Result<R, NoSuchModule> {
1247+
self.guard_closed()?;
1248+
let timer_guard = self.start_call_timer(label);
1249+
1250+
scopeguard::defer_on_unwind!({
1251+
log::warn!("procedure {label} panicked");
1252+
(self.on_panic)();
1253+
});
1254+
1255+
Ok(match &*self.inner {
1256+
ModuleHostInner::Wasm(_) => unreachable!("WASM procedures should not use the JS procedure instance path"),
1257+
ModuleHostInner::Js(V8ModuleHost {
1258+
procedure_instances, ..
1259+
}) => {
1260+
procedure_instances
1261+
.with_instance(async |inst| {
1262+
drop(timer_guard);
1263+
let res = f(&inst).await;
1264+
(res, inst)
1265+
})
1266+
.await
1267+
}
1268+
})
1269+
}
1270+
12201271
pub async fn disconnect_client(&self, client_id: ClientActorId) {
12211272
log::trace!("disconnecting client {client_id}");
12221273
if let Err(e) = self
@@ -1833,26 +1884,62 @@ impl ModuleHost {
18331884
name: &str,
18341885
params: CallProcedureParams,
18351886
) -> Result<CallProcedureReturn, NoSuchModule> {
1836-
self.call(
1837-
name,
1838-
params,
1839-
async move |params, inst| inst.call_procedure(params).await,
1840-
async move |params, inst| inst.call_procedure(params).await,
1841-
)
1842-
.await
1887+
match &*self.inner {
1888+
ModuleHostInner::Wasm(_) => {
1889+
self.call(
1890+
name,
1891+
params,
1892+
async move |params, inst| inst.call_procedure(params).await,
1893+
async move |_params, _inst| unreachable!("JS procedure lane is not used for WASM modules"),
1894+
)
1895+
.await
1896+
}
1897+
ModuleHostInner::Js(_) => {
1898+
self.with_js_procedure_instance(name, async move |inst| inst.call_procedure(params).await)
1899+
.await
1900+
}
1901+
}
18431902
}
18441903

18451904
pub(super) async fn call_scheduled_function(
18461905
&self,
18471906
params: ScheduledFunctionParams,
18481907
) -> Result<CallScheduledFunctionResult, NoSuchModule> {
1849-
self.call(
1850-
"unknown scheduled function",
1851-
params,
1852-
async move |params, inst| inst.call_scheduled_function(params).await,
1853-
async move |params, inst| inst.call_scheduled_function(params).await,
1854-
)
1855-
.await
1908+
match &*self.inner {
1909+
ModuleHostInner::Wasm(_) => {
1910+
self.call(
1911+
"unknown scheduled function",
1912+
params,
1913+
async move |params, inst| inst.call_scheduled_function(params).await,
1914+
async move |_params, _inst| unreachable!("JS scheduled-function lane is not used for WASM modules"),
1915+
)
1916+
.await
1917+
}
1918+
ModuleHostInner::Js(V8ModuleHost { module, .. }) => {
1919+
let use_procedure_lane =
1920+
match params.uses_procedure_lane(&self.info, module.replica_ctx().relational_db.as_ref()) {
1921+
Ok(use_procedure_lane) => use_procedure_lane,
1922+
Err(err) => {
1923+
log::error!("failed to classify scheduled JS function; routing to procedure lane: {err:#}");
1924+
true
1925+
}
1926+
};
1927+
if use_procedure_lane {
1928+
self.with_js_procedure_instance("unknown scheduled function", async move |inst| {
1929+
inst.call_scheduled_function(params).await
1930+
})
1931+
.await
1932+
} else {
1933+
self.call(
1934+
"unknown scheduled function",
1935+
params,
1936+
async move |params, inst| inst.call_scheduled_function(params).await,
1937+
async move |params, inst| inst.call_scheduled_function(params).await,
1938+
)
1939+
.await
1940+
}
1941+
}
1942+
}
18561943
}
18571944

18581945
/// Materializes the views return by the `view_collector`, if not already materialized,

crates/core/src/host/scheduler.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,16 +268,26 @@ struct SchedulerActor {
268268
module_host: WeakModuleHost,
269269
}
270270

271+
#[derive(Clone)]
271272
enum QueueItem {
272273
Id { id: ScheduledFunctionId, at: Timestamp },
273274
VolatileNonatomicImmediate { function_name: String, args: FunctionArgs },
274275
}
275276

277+
#[derive(Clone)]
276278
pub(crate) struct ScheduledFunctionParams(QueueItem);
277279

278280
#[cfg(target_pointer_width = "64")]
279281
spacetimedb_table::static_assert_size!(QueueItem, 64);
280282

283+
impl ScheduledFunctionParams {
284+
pub(crate) fn uses_procedure_lane(&self, module: &ModuleInfo, db: &RelationalDB) -> anyhow::Result<bool> {
285+
let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
286+
let params = call_params_for_queued_item(module, db, &tx, self.0.clone())?;
287+
Ok(matches!(params, Some((_timestamp, _instant, CallParams::Procedure(_)))))
288+
}
289+
}
290+
281291
impl SchedulerActor {
282292
async fn run(mut self) {
283293
loop {

0 commit comments

Comments
 (0)