Skip to content

Commit f468e00

Browse files
committed
wip
1 parent 8928b43 commit f468e00

5 files changed

Lines changed: 178 additions & 64 deletions

File tree

crates/core/src/host/instance_env.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use core::mem;
88
use parking_lot::{Mutex, MutexGuard};
99
use smallvec::SmallVec;
1010
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
11-
use spacetimedb_lib::Timestamp;
11+
use spacetimedb_lib::{Identity, Timestamp};
1212
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
1313
use spacetimedb_sats::{
1414
bsatn::{self, ToBsatn},
@@ -171,6 +171,11 @@ impl InstanceEnv {
171171
}
172172
}
173173

174+
/// Returns the database's identity.
175+
pub fn database_identity(&self) -> &Identity {
176+
&self.replica_ctx.database.database_identity
177+
}
178+
174179
/// Signal to this `InstanceEnv` that a reducer call is beginning.
175180
pub fn start_reducer(&mut self, ts: Timestamp) {
176181
self.start_time = ts;

crates/core/src/host/v8/mod.rs

Lines changed: 151 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ use crate::host::wasm_common::instrumentation::CallTimes;
99
use crate::host::wasm_common::module_host_actor::{
1010
DescribeError, EnergyStats, ExecuteResult, ExecutionTimings, InstanceCommon, ReducerOp,
1111
};
12+
use crate::host::wasmtime::{epoch_ticker, ticks_in_duration, EPOCH_TICKS_PER_SECOND};
1213
use crate::host::ArgsTuple;
1314
use crate::{host::Scheduler, module_host_context::ModuleCreationContext, replica_context::ReplicaContext};
14-
use core::str;
15+
use core::ffi::c_void;
1516
use core::sync::atomic::{AtomicBool, Ordering};
1617
use core::time::Duration;
18+
use core::{ptr, str};
1719
use de::deserialize_js;
1820
use error::{catch_exception, exception_already_thrown, log_traceback, ExcResult, Throwable};
1921
use from_value::cast;
@@ -22,11 +24,9 @@ use ser::serialize_to_js;
2224
use spacetimedb_client_api_messages::energy::ReducerBudget;
2325
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
2426
use spacetimedb_datastore::traits::Program;
25-
use spacetimedb_lib::RawModuleDef;
26-
use spacetimedb_lib::{ConnectionId, Identity};
2727
use spacetimedb_schema::auto_migrate::MigrationPolicy;
28+
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp};
2829
use std::sync::{Arc, LazyLock};
29-
use std::thread;
3030
use std::time::Instant;
3131
use v8::{
3232
Context, ContextOptions, ContextScope, Function, HandleScope, Isolate, IsolateHandle, Local, OwnedIsolate, Value,
@@ -134,14 +134,16 @@ impl Module for JsModule {
134134
}
135135

136136
fn create_instance(&self) -> Self::Instance {
137-
// TODO(v8): consider some equivalent to `epoch_deadline_callback`
138-
// where we report `Js has been running for ...`s.
139-
140137
// TODO(v8): do we care about preinits / setup or are they unnecessary?
141138

142139
let common = &self.common;
143140
let instance_env = InstanceEnv::new(common.replica_ctx().clone(), common.scheduler().clone());
144-
let instance = JsInstanceEnv { instance_env };
141+
let instance = Some(JsInstanceEnv {
142+
instance_env,
143+
reducer_start: Instant::now(),
144+
call_times: CallTimes::new(),
145+
reducer_name: String::from("<initializing>"),
146+
});
145147

146148
// NOTE(centril): We don't need to do `extract_description` here
147149
// as unlike WASM, we have to recreate the isolate every time.
@@ -157,13 +159,72 @@ impl Module for JsModule {
157159
}
158160
}
159161

162+
const EXPECT_ENV: &str = "there should be a `JsInstanceEnv`";
163+
164+
fn env_on_isolate(isolate: &mut Isolate) -> &mut JsInstanceEnv {
165+
isolate.get_slot_mut().expect(EXPECT_ENV)
166+
}
167+
168+
fn env_on_instance(inst: &mut JsInstance) -> &mut JsInstanceEnv {
169+
inst.instance.as_mut().expect(EXPECT_ENV)
170+
}
171+
160172
struct JsInstanceEnv {
161173
instance_env: InstanceEnv,
174+
175+
/// The point in time the last reducer call started at.
176+
reducer_start: Instant,
177+
178+
/// Track time spent in all wasm instance env calls (aka syscall time).
179+
///
180+
/// Each function, like `insert`, will add the `Duration` spent in it
181+
/// to this tracker.
182+
call_times: CallTimes,
183+
184+
/// The last, including current, reducer to be executed by this environment.
185+
reducer_name: String,
186+
}
187+
188+
impl JsInstanceEnv {
189+
/// Signal to this `WasmInstanceEnv` that a reducer call is beginning.
190+
///
191+
/// Returns the handle used by reducers to read from `args`
192+
/// as well as the handle used to write the error message, if any.
193+
pub fn start_reducer(&mut self, name: &str, ts: Timestamp) {
194+
self.reducer_start = Instant::now();
195+
name.clone_into(&mut self.reducer_name);
196+
self.instance_env.start_reducer(ts);
197+
}
198+
199+
/// Returns the name of the most recent reducer to be run in this environment.
200+
pub fn reducer_name(&self) -> &str {
201+
&self.reducer_name
202+
}
203+
204+
/// Returns the name of the most recent reducer to be run in this environment.
205+
pub fn reducer_start(&self) -> Instant {
206+
self.reducer_start
207+
}
208+
209+
/// Signal to this `WasmInstanceEnv` that a reducer call is over.
210+
/// This resets all of the state associated to a single reducer call,
211+
/// and returns instrumentation records.
212+
pub fn finish_reducer(&mut self) -> ExecutionTimings {
213+
let total_duration = self.reducer_start.elapsed();
214+
215+
// Taking the call times record also resets timings to 0s for the next call.
216+
let wasm_instance_env_call_times = self.call_times.take();
217+
218+
ExecutionTimings {
219+
total_duration,
220+
wasm_instance_env_call_times,
221+
}
222+
}
162223
}
163224

164225
struct JsInstance {
165226
common: InstanceCommon,
166-
instance: JsInstanceEnv,
227+
instance: Option<JsInstanceEnv>,
167228
program: Arc<str>,
168229
}
169230

@@ -178,45 +239,48 @@ impl ModuleInstance for JsInstance {
178239
old_module_info: Arc<ModuleInfo>,
179240
policy: MigrationPolicy,
180241
) -> anyhow::Result<UpdateDatabaseResult> {
181-
let replica_ctx = &self.instance.instance_env.replica_ctx;
242+
let replica_ctx = &env_on_instance(self).instance_env.replica_ctx.clone();
182243
self.common
183244
.update_database(replica_ctx, program, old_module_info, policy)
184245
}
185246

186247
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> super::ReducerCallResult {
187-
self.common.call_reducer_with_tx(
188-
&self.instance.instance_env.replica_ctx.clone(),
189-
tx,
190-
params,
191-
log_traceback,
192-
|tx, op, budget| {
248+
let replica_ctx = env_on_instance(self).instance_env.replica_ctx.clone();
249+
250+
self.common
251+
.call_reducer_with_tx(&replica_ctx, tx, params, log_traceback, |tx, op, budget| {
252+
let callback_every = EPOCH_TICKS_PER_SECOND;
253+
extern "C" fn callback(isolate: &mut Isolate, _: *mut c_void) {
254+
let env = env_on_isolate(isolate);
255+
let database = env.instance_env.replica_ctx.database_identity;
256+
let reducer = env.reducer_name();
257+
let dur = env.reducer_start().elapsed();
258+
tracing::warn!(reducer, ?database, "Wasm has been running for {dur:?}");
259+
}
260+
261+
// Prepare the isolate with the env.
262+
let mut isolate = Isolate::new(<_>::default());
263+
isolate.set_slot(self.instance.take().expect(EXPECT_ENV));
264+
193265
// TODO(v8): snapshots, module->host calls
194266
// Call the reducer.
195-
let (mut isolate, (tx, call_result, total_duration)) =
196-
with_script(&self.program, budget, |scope, _| {
197-
let start = Instant::now();
198-
199-
let (tx, call_result) = self
200-
.instance
267+
env_on_isolate(&mut isolate).instance_env.start_reducer(op.timestamp);
268+
let (mut isolate, (tx, call_result)) =
269+
with_script(isolate, &self.program, callback_every, callback, budget, |scope, _| {
270+
let (tx, call_result) = env_on_isolate(scope)
201271
.instance_env
202272
.tx
203273
.clone()
204274
.set(tx, || call_call_reducer_from_op(scope, op));
205-
206-
let total_duration = start.elapsed();
207-
208-
(tx, call_result, total_duration)
275+
(tx, call_result)
209276
});
277+
let timings = env_on_isolate(&mut isolate).finish_reducer();
278+
self.instance = isolate.remove_slot();
210279

211-
// Handle energy and timings.
212-
let used = duration_to_budget(total_duration);
280+
// Derive energy stats.
281+
let used = duration_to_budget(timings.total_duration);
213282
let remaining = budget - used;
214283
let energy = EnergyStats { budget, remaining };
215-
let timings = ExecutionTimings {
216-
total_duration,
217-
// TODO(v8): call times.
218-
wasm_instance_env_call_times: CallTimes::new(),
219-
};
220284

221285
// Fetch the currently used heap size in V8.
222286
// The used size is ostensibly fairer than the total size.
@@ -229,33 +293,40 @@ impl ModuleInstance for JsInstance {
229293
call_result,
230294
};
231295
(tx, exec_result)
232-
},
233-
)
296+
})
234297
}
235298
}
236299

237300
fn with_script<R>(
301+
isolate: OwnedIsolate,
238302
code: &str,
303+
callback_every: u64,
304+
callback: IsolateCallback,
239305
budget: ReducerBudget,
240306
logic: impl for<'scope> FnOnce(&mut HandleScope<'scope>, Local<'scope, Value>) -> R,
241307
) -> (OwnedIsolate, R) {
242-
with_scope(budget, |scope| {
308+
with_scope(isolate, callback_every, callback, budget, |scope| {
243309
let code = v8::String::new(scope, code).unwrap();
244310
let script_val = v8::Script::compile(scope, code, None).unwrap().run(scope).unwrap();
245311
logic(scope, script_val)
246312
})
247313
}
248314

249315
/// Sets up an isolate and run `logic` with a [`HandleScope`].
250-
pub(crate) fn with_scope<R>(budget: ReducerBudget, logic: impl FnOnce(&mut HandleScope<'_>) -> R) -> (OwnedIsolate, R) {
251-
let mut isolate: OwnedIsolate = Isolate::new(<_>::default());
316+
pub(crate) fn with_scope<R>(
317+
mut isolate: OwnedIsolate,
318+
callback_every: u64,
319+
callback: IsolateCallback,
320+
budget: ReducerBudget,
321+
logic: impl FnOnce(&mut HandleScope<'_>) -> R,
322+
) -> (OwnedIsolate, R) {
252323
isolate.set_capture_stack_trace_for_uncaught_exceptions(true, 1024);
253324
let isolate_handle = isolate.thread_safe_handle();
254325
let mut scope_1 = HandleScope::new(&mut isolate);
255326
let context = Context::new(&mut scope_1, ContextOptions::default());
256327
let mut scope_2 = ContextScope::new(&mut scope_1, context);
257328

258-
let timeout_thread_cancel_flag = run_reducer_timeout(isolate_handle, budget);
329+
let timeout_thread_cancel_flag = run_reducer_timeout(callback_every, callback, budget, isolate_handle);
259330

260331
let ret = logic(&mut scope_2);
261332
drop(scope_2);
@@ -267,26 +338,44 @@ pub(crate) fn with_scope<R>(budget: ReducerBudget, logic: impl FnOnce(&mut Handl
267338
(isolate, ret)
268339
}
269340

341+
type IsolateCallback = extern "C" fn(&mut Isolate, *mut c_void);
342+
270343
/// Spawns a thread that will terminate reducer execution
271344
/// when `budget` has been used up.
272-
fn run_reducer_timeout(isolate_handle: IsolateHandle, budget: ReducerBudget) -> Arc<AtomicBool> {
345+
///
346+
/// Every `callback_every` ticks, `callback` is called.
347+
fn run_reducer_timeout(
348+
callback_every: u64,
349+
callback: IsolateCallback,
350+
budget: ReducerBudget,
351+
isolate_handle: IsolateHandle,
352+
) -> Arc<AtomicBool> {
273353
let execution_done_flag = Arc::new(AtomicBool::new(false));
274354
let execution_done_flag2 = execution_done_flag.clone();
275355
let timeout = budget_to_duration(budget);
356+
let max_ticks = ticks_in_duration(timeout);
276357

277-
// TODO(v8): Using an OS thread is a bit heavy handed...?
278-
thread::spawn(move || {
279-
// Sleep until the timeout.
280-
thread::sleep(timeout);
281-
358+
let mut num_ticks = 0;
359+
epoch_ticker(move || {
360+
// Check if execution completed.
282361
if execution_done_flag2.load(Ordering::Relaxed) {
283-
// The reducer completed successfully.
284-
return;
362+
return None;
285363
}
286364

287-
// Reducer is still running.
288-
// Terminate V8 execution.
289-
isolate_handle.terminate_execution();
365+
// We've reached the number of ticks to call `callback`.
366+
if num_ticks % callback_every == 0 && isolate_handle.request_interrupt(callback, ptr::null_mut()) {
367+
return None;
368+
}
369+
370+
if num_ticks == max_ticks {
371+
// Execution still ongoing while budget has been exhausted.
372+
// Terminate V8 execution.
373+
// This implements "gas" for v8.
374+
isolate_handle.terminate_execution();
375+
}
376+
377+
num_ticks += 1;
378+
Some(())
290379
});
291380

292381
execution_done_flag
@@ -379,9 +468,18 @@ fn call_call_reducer(
379468

380469
/// Extracts the raw module def by running `__describe_module__` in `program`.
381470
fn extract_description(program: &str) -> Result<RawModuleDef, DescribeError> {
382-
let (_, ret) = with_script(program, ReducerBudget::DEFAULT_BUDGET, |scope, _| {
383-
run_describer(log_traceback, || call_describe_module(scope))
384-
});
471+
let budget = ReducerBudget::DEFAULT_BUDGET;
472+
let callback_every = EPOCH_TICKS_PER_SECOND;
473+
extern "C" fn callback(_: &mut Isolate, _: *mut c_void) {}
474+
475+
let (_, ret) = with_script(
476+
Isolate::new(<_>::default()),
477+
program,
478+
callback_every,
479+
callback,
480+
budget,
481+
|scope, _| run_describer(log_traceback, || call_describe_module(scope)),
482+
);
385483
ret
386484
}
387485

crates/core/src/host/wasm_common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ impl<I: ResourceIndex> ResourceSlab<I> {
320320
decl_index!(RowIterIdx => std::vec::IntoIter<Vec<u8>>);
321321
pub(super) type RowIters = ResourceSlab<RowIterIdx>;
322322

323-
pub(super) struct TimingSpan {
323+
pub(crate) struct TimingSpan {
324324
pub start: Instant,
325325
pub name: String,
326326
}

crates/core/src/host/wasmtime/mod.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,21 @@ pub struct WasmtimeRuntime {
2727

2828
const EPOCH_TICK_LENGTH: Duration = Duration::from_millis(10);
2929

30-
const EPOCH_TICKS_PER_SECOND: u64 = Duration::from_secs(1).div_duration_f64(EPOCH_TICK_LENGTH) as u64;
30+
pub(crate) const EPOCH_TICKS_PER_SECOND: u64 = ticks_in_duration(Duration::from_secs(1));
31+
32+
pub(crate) const fn ticks_in_duration(duration: Duration) -> u64 {
33+
duration.div_duration_f64(EPOCH_TICK_LENGTH) as u64
34+
}
35+
36+
pub(crate) fn epoch_ticker(mut on_tick: impl 'static + Send + FnMut() -> Option<()>) {
37+
tokio::spawn(async move {
38+
let mut interval = tokio::time::interval(EPOCH_TICK_LENGTH);
39+
loop {
40+
interval.tick().await;
41+
let Some(()) = on_tick() else { return; };
42+
}
43+
});
44+
}
3145

3246
impl WasmtimeRuntime {
3347
pub fn new(data_dir: Option<&ServerDataDir>) -> Self {
@@ -53,13 +67,10 @@ impl WasmtimeRuntime {
5367
let engine = Engine::new(&config).unwrap();
5468

5569
let weak_engine = engine.weak();
56-
tokio::spawn(async move {
57-
let mut interval = tokio::time::interval(EPOCH_TICK_LENGTH);
58-
loop {
59-
interval.tick().await;
60-
let Some(engine) = weak_engine.upgrade() else { break };
61-
engine.increment_epoch();
62-
}
70+
epoch_ticker(move || {
71+
let engine = weak_engine.upgrade()?;
72+
engine.increment_epoch();
73+
Some(())
6374
});
6475

6576
let mut linker = Box::new(Linker::new(&engine));

0 commit comments

Comments
 (0)