diff --git a/crates/cli/src/subcommands/publish.rs b/crates/cli/src/subcommands/publish.rs index df63923eb59..4e3884e1931 100644 --- a/crates/cli/src/subcommands/publish.rs +++ b/crates/cli/src/subcommands/publish.rs @@ -48,6 +48,15 @@ pub fn cli() -> clap::Command { .conflicts_with("build_options") .help("The system path (absolute or relative) to the compiled wasm binary we should publish, instead of building the project."), ) + // TODO(v8): needs better UX but good enough for a demo... + .arg( + Arg::new("javascript") + .long("javascript") + .action(SetTrue) + .requires("wasm_file") + .hide(true) + .help("UNSTABLE: interpret `--bin-path` as a JS module"), + ) .arg( Arg::new("num_replicas") .value_parser(clap::value_parser!(u8)) @@ -84,6 +93,8 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E let force = args.get_flag("force"); let anon_identity = args.get_flag("anon_identity"); let wasm_file = args.get_one::("wasm_file"); + // TODO(v8): needs better UX but good enough for a demo... + let wasm_file_is_really_js = args.get_flag("javascript"); let database_host = config.get_host_url(server)?; let build_options = args.get_one::("build_options").unwrap(); let num_replicas = args.get_one::("num_replicas"); @@ -123,6 +134,11 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E }; let program_bytes = fs::read(path_to_wasm)?; + // TODO(v8): needs better UX but good enough for a demo... + if wasm_file_is_really_js { + builder = builder.query(&[("host_type", "Js")]); + } + let server_address = { let url = Url::parse(&database_host)?; url.host_str().unwrap_or("").to_string() diff --git a/crates/client-api/Cargo.toml b/crates/client-api/Cargo.toml index f19b1495dd8..9e05007f22a 100644 --- a/crates/client-api/Cargo.toml +++ b/crates/client-api/Cargo.toml @@ -63,3 +63,6 @@ toml.workspace = true [lints] workspace = true + +[features] +unstable = [] diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index fdee60f490d..801c906bcc1 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -493,6 +493,8 @@ pub struct PublishDatabaseQueryParams { #[serde(default)] clear: bool, num_replicas: Option, + #[serde(default)] + host_type: HostType, /// [`Hash`] of [`MigrationToken`]` to be checked if `MigrationPolicy::BreakClients` is set. /// /// Users obtain such a hash via the `/database/:name_or_identity/pre-publish POST` route. @@ -532,12 +534,25 @@ pub async fn publish( Query(PublishDatabaseQueryParams { clear, num_replicas, + host_type, token, policy, }): Query, Extension(auth): Extension, body: Bytes, ) -> axum::response::Result> { + // Feature gate V8 modules. + // The host must've been compiled with the `unstable` feature. + // TODO(v8): ungate this when V8 is ready to ship. + #[cfg(not(feature = "unstable"))] + if host_type == HostType::Js { + return Err(( + StatusCode::BAD_REQUEST, + "JS host type requires a host with unstable features", + ) + .into()); + } + // You should not be able to publish to a database that you do not own // so, unless you are the owner, this will fail. @@ -638,7 +653,7 @@ pub async fn publish( database_identity, program_bytes: body.into(), num_replicas, - host_type: HostType::Wasm, + host_type, }, policy, ) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 4d6863a7f27..67e16ccd69a 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -2,12 +2,13 @@ use super::scheduler::{get_schedule_from_row, ScheduleError, Scheduler}; use crate::database_logger::{BacktraceProvider, LogLevel, Record}; use crate::db::relational_db::{MutTx, RelationalDB}; use crate::error::{DBError, DatastoreError, IndexError, NodesError}; +use crate::host::wasm_common::TimingSpan; use crate::replica_context::ReplicaContext; use core::mem; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; use spacetimedb_datastore::locking_tx_datastore::MutTxId; -use spacetimedb_lib::Timestamp; +use spacetimedb_lib::{Identity, Timestamp}; use spacetimedb_primitives::{ColId, ColList, IndexId, TableId}; use spacetimedb_sats::{ bsatn::{self, ToBsatn}, @@ -18,6 +19,7 @@ use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::table::RowRef; use std::ops::DerefMut; use std::sync::Arc; +use std::vec::IntoIter; #[derive(Clone)] pub struct InstanceEnv { @@ -170,6 +172,11 @@ impl InstanceEnv { } } + /// Returns the database's identity. + pub fn database_identity(&self) -> &Identity { + &self.replica_ctx.database.database_identity + } + /// Signal to this `InstanceEnv` that a reducer call is beginning. pub fn start_reducer(&mut self, ts: Timestamp) { self.start_time = ts; @@ -189,6 +196,21 @@ impl InstanceEnv { ); } + /// End a console timer by logging the span at INFO level. + pub(crate) fn console_timer_end(&self, span: &TimingSpan, bt: &dyn BacktraceProvider) { + let elapsed = span.start.elapsed(); + let message = format!("Timing span {:?}: {:?}", &span.name, elapsed); + + let record = Record { + ts: chrono::Utc::now(), + target: None, + filename: None, + line_number: None, + message: &message, + }; + self.console_log(LogLevel::Info, &record, bt); + } + /// Project `cols` in `row_ref` encoded in BSATN to `buffer` /// and return the full length of the BSATN. /// @@ -469,6 +491,33 @@ impl InstanceEnv { Ok(chunks) } + + pub fn fill_buffer_from_iter( + iter: &mut IntoIter>, + mut buffer: &mut [u8], + chunk_pool: &mut ChunkPool, + ) -> usize { + let mut written = 0; + // Fill the buffer as much as possible. + while let Some(chunk) = iter.as_slice().first() { + let Some((buf_chunk, rest)) = buffer.split_at_mut_checked(chunk.len()) else { + // Cannot fit chunk into the buffer, + // either because we already filled it too much, + // or because it is too small. + break; + }; + buf_chunk.copy_from_slice(chunk); + written += chunk.len(); + buffer = rest; + + // Advance the iterator, as we used a chunk. + // SAFETY: We peeked one `chunk`, so there must be one at least. + let chunk = unsafe { iter.next().unwrap_unchecked() }; + chunk_pool.put(chunk); + } + + written + } } impl TxSlot { diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index e93b68c1172..d5826ff54c5 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -5,6 +5,7 @@ use crate::{ energy::EnergyMonitor, host::{ module_host::{DynModule, ModuleInfo}, + wasm_common::{module_host_actor::DescribeError, DESCRIBE_MODULE_DUNDER}, Scheduler, }, module_host_context::ModuleCreationContext, @@ -88,3 +89,22 @@ impl DynModule for ModuleCommon { &self.scheduler } } + +/// Runs the describer of modules in `run` and does some logging around it. +pub(crate) fn run_describer( + log_traceback: impl FnOnce(&str, &str, &anyhow::Error), + run: impl FnOnce() -> anyhow::Result, +) -> Result { + let describer_func_name = DESCRIBE_MODULE_DUNDER; + let start = std::time::Instant::now(); + log::trace!("Start describer \"{describer_func_name}\"..."); + + let result = run(); + + let duration = start.elapsed(); + log::trace!("Describer \"{}\" ran: {} us", describer_func_name, duration.as_micros()); + + result + .inspect_err(|err| log_traceback("describer", describer_func_name, err)) + .map_err(DescribeError::RuntimeError) +} diff --git a/crates/core/src/host/v8/de.rs b/crates/core/src/host/v8/de.rs index 3103cf14095..7828d5bdf65 100644 --- a/crates/core/src/host/v8/de.rs +++ b/crates/core/src/host/v8/de.rs @@ -92,7 +92,7 @@ impl de::Error for Error<'_> { } /// Returns a scratch buffer to fill when deserializing strings. -fn scratch_buf() -> [MaybeUninit; N] { +pub(crate) fn scratch_buf() -> [MaybeUninit; N] { [const { MaybeUninit::uninit() }; N] } diff --git a/crates/core/src/host/v8/error.rs b/crates/core/src/host/v8/error.rs index 1021f9c5f11..9491d326ce8 100644 --- a/crates/core/src/host/v8/error.rs +++ b/crates/core/src/host/v8/error.rs @@ -59,12 +59,12 @@ impl<'scope, M: IntoJsString> IntoException<'scope> for RangeError { } #[derive(Debug)] -pub(super) struct ExceptionThrown { +pub(crate) struct ExceptionThrown { _priv: (), } /// A result where the error indicates that an exception has already been thrown in V8. -pub(super) type ExcResult = Result; +pub(crate) type ExcResult = Result; /// Indicates that the JS side had thrown an exception. pub(super) fn exception_already_thrown() -> ExceptionThrown { @@ -126,7 +126,9 @@ pub(super) struct JsError { impl fmt::Display for JsError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "js error {}", self.msg)?; - writeln!(f, "{}", self.trace)?; + if !f.alternate() { + writeln!(f, "{}", self.trace)?; + } Ok(()) } } @@ -204,7 +206,7 @@ impl fmt::Display for JsStackTraceFrame { // This isn't exactly the same format as chrome uses, // but it's close enough for now. - // TODO(centril): make it more like chrome in the future. + // TODO(v8): make it more like chrome in the future. f.write_fmt(format_args!( "at {} ({}:{}:{})", fn_name, script_name, &self.line, &self.column @@ -249,6 +251,16 @@ impl JsError { } } +pub(super) fn log_traceback(func_type: &str, func: &str, e: &anyhow::Error) { + log::info!("{func_type} \"{func}\" runtime error: {e:#}"); + if let Some(js_err) = e.downcast_ref::() { + log::info!("js error {}", js_err.msg); + for (index, frame) in js_err.trace.frames.iter().enumerate() { + log::info!(" Frame #{index}: {frame}"); + } + } +} + /// Run `body` within a try-catch context and capture any JS exception thrown as a [`JsError`]. pub(super) fn catch_exception<'scope, T>( scope: &mut HandleScope<'scope>, diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 92e1fb0c896..76859dae66c 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -3,27 +3,41 @@ use super::module_common::{build_common_module_from_raw, ModuleCommon}; use super::module_host::{CallReducerParams, DynModule, Module, ModuleInfo, ModuleInstance, ModuleRuntime}; use super::UpdateDatabaseResult; +use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record}; +use crate::host::instance_env::{ChunkPool, InstanceEnv}; +use crate::host::module_common::run_describer; +use crate::host::v8::de::{scratch_buf, v8_interned_string}; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{ - EnergyStats, ExecuteResult, ExecutionTimings, InstanceCommon, ReducerOp, + DescribeError, EnergyStats, ExecuteResult, ExecutionTimings, InstanceCommon, ReducerOp, }; +use crate::host::wasm_common::{RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet}; +use crate::host::wasmtime::{epoch_ticker, ticks_in_duration, EPOCH_TICKS_PER_SECOND}; use crate::host::ArgsTuple; use crate::{host::Scheduler, module_host_context::ModuleCreationContext, replica_context::ReplicaContext}; -use anyhow::anyhow; +use anyhow::Context as _; +use core::ffi::c_void; +use core::sync::atomic::{AtomicBool, Ordering}; use core::time::Duration; +use core::{ptr, str}; use de::deserialize_js; -use error::{catch_exception, exception_already_thrown, ExcResult, Throwable}; +use error::{catch_exception, exception_already_thrown, log_traceback, ExcResult, Throwable}; use from_value::cast; use key_cache::get_or_create_key_cache; use ser::serialize_to_js; -use spacetimedb_client_api_messages::energy::{EnergyQuanta, ReducerBudget}; +use spacetimedb_client_api_messages::energy::ReducerBudget; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::Program; -use spacetimedb_lib::RawModuleDef; -use spacetimedb_lib::{ConnectionId, Identity}; +use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; +use spacetimedb_primitives::{ColId, IndexId, TableId}; +use spacetimedb_sats::Serialize; use spacetimedb_schema::auto_migrate::MigrationPolicy; use std::sync::{Arc, LazyLock}; -use v8::{Context, ContextOptions, ContextScope, Function, HandleScope, Isolate, Local, Value}; +use std::time::Instant; +use v8::{ + Context, ContextOptions, ContextScope, Function, FunctionCallbackArguments, HandleScope, Isolate, IsolateHandle, + Local, Object, OwnedIsolate, ReturnValue, Value, +}; mod de; mod error; @@ -80,21 +94,27 @@ impl V8RuntimeInner { mcc.program.hash, ); - if true { - return Err::(anyhow!("v8_todo")); - } + // TODO(v8): determine min required ABI by module and check that it's supported? + + // TODO(v8): validate function signatures like in WASM? Is that possible with V8? + + // Convert program to a string. + let program: Arc = str::from_utf8(&mcc.program.bytes)?.into(); + + // Run the program as a script and extract the raw module def. + let desc = extract_description(&program)?; - let desc = todo!(); // Validate and create a common module rom the raw definition. let common = build_common_module_from_raw(mcc, desc)?; - Ok(JsModule { common }) + Ok(JsModule { common, program }) } } #[derive(Clone)] struct JsModule { common: ModuleCommon, + program: Arc, } impl DynModule for JsModule { @@ -121,13 +141,111 @@ impl Module for JsModule { } fn create_instance(&self) -> Self::Instance { - todo!() + // TODO(v8): do we care about preinits / setup or are they unnecessary? + + let common = &self.common; + let instance_env = InstanceEnv::new(common.replica_ctx().clone(), common.scheduler().clone()); + let instance = Some(JsInstanceEnv { + instance_env, + reducer_start: Instant::now(), + call_times: CallTimes::new(), + iters: Default::default(), + reducer_name: String::from(""), + chunk_pool: <_>::default(), + timing_spans: <_>::default(), + }); + + // NOTE(centril): We don't need to do `extract_description` here + // as unlike WASM, we have to recreate the isolate every time. + + let common = InstanceCommon::new(common); + let program = self.program.clone(); + + JsInstance { + common, + instance, + program, + } + } +} + +const EXPECT_ENV: &str = "there should be a `JsInstanceEnv`"; + +fn env_on_isolate(isolate: &mut Isolate) -> &mut JsInstanceEnv { + isolate.get_slot_mut().expect(EXPECT_ENV) +} + +fn env_on_instance(inst: &mut JsInstance) -> &mut JsInstanceEnv { + inst.instance.as_mut().expect(EXPECT_ENV) +} + +struct JsInstanceEnv { + instance_env: InstanceEnv, + + /// The slab of `BufferIters` created for this instance. + iters: RowIters, + + /// Track time spent in module-defined spans. + timing_spans: TimingSpanSet, + + /// The point in time the last reducer call started at. + reducer_start: Instant, + + /// Track time spent in all wasm instance env calls (aka syscall time). + /// + /// Each function, like `insert`, will add the `Duration` spent in it + /// to this tracker. + call_times: CallTimes, + + /// The last, including current, reducer to be executed by this environment. + reducer_name: String, + + /// A pool of unused allocated chunks that can be reused. + // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. + chunk_pool: ChunkPool, +} + +impl JsInstanceEnv { + /// Signal to this `WasmInstanceEnv` that a reducer call is beginning. + /// + /// Returns the handle used by reducers to read from `args` + /// as well as the handle used to write the error message, if any. + pub fn start_reducer(&mut self, name: &str, ts: Timestamp) { + self.reducer_start = Instant::now(); + name.clone_into(&mut self.reducer_name); + self.instance_env.start_reducer(ts); + } + + /// Returns the name of the most recent reducer to be run in this environment. + pub fn reducer_name(&self) -> &str { + &self.reducer_name + } + + /// Returns the name of the most recent reducer to be run in this environment. + pub fn reducer_start(&self) -> Instant { + self.reducer_start + } + + /// Signal to this `WasmInstanceEnv` that a reducer call is over. + /// This resets all of the state associated to a single reducer call, + /// and returns instrumentation records. + pub fn finish_reducer(&mut self) -> ExecutionTimings { + let total_duration = self.reducer_start.elapsed(); + + // Taking the call times record also resets timings to 0s for the next call. + let wasm_instance_env_call_times = self.call_times.take(); + + ExecutionTimings { + total_duration, + wasm_instance_env_call_times, + } } } struct JsInstance { common: InstanceCommon, - replica_ctx: Arc, + instance: Option, + program: Arc, } impl ModuleInstance for JsInstance { @@ -141,58 +259,175 @@ impl ModuleInstance for JsInstance { old_module_info: Arc, policy: MigrationPolicy, ) -> anyhow::Result { - let replica_ctx = &self.replica_ctx; + let replica_ctx = &env_on_instance(self).instance_env.replica_ctx.clone(); self.common .update_database(replica_ctx, program, old_module_info, policy) } fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { - // TODO(centril): snapshots, module->host calls - let mut isolate = Isolate::new(<_>::default()); - let scope = &mut HandleScope::new(&mut isolate); - let context = Context::new(scope, ContextOptions::default()); - let scope = &mut ContextScope::new(scope, context); - - self.common.call_reducer_with_tx( - &self.replica_ctx.clone(), - tx, - params, - // TODO(centril): logging. - |_ty, _fun, _err| {}, - |tx, op, _budget| { - let call_result = call_call_reducer_from_op(scope, op); - // TODO(centril): energy metrering. - let energy = EnergyStats { - used: EnergyQuanta::ZERO, - wasmtime_fuel_used: 0, - remaining: ReducerBudget::ZERO, - }; - // TODO(centril): timings. - let timings = ExecutionTimings { - total_duration: Duration::ZERO, - wasm_instance_env_call_times: CallTimes::new(), - }; + let replica_ctx = env_on_instance(self).instance_env.replica_ctx.clone(); + + self.common + .call_reducer_with_tx(&replica_ctx, tx, params, log_traceback, |tx, op, budget| { + let callback_every = EPOCH_TICKS_PER_SECOND; + extern "C" fn callback(isolate: &mut Isolate, _: *mut c_void) { + let env = env_on_isolate(isolate); + let database = env.instance_env.replica_ctx.database_identity; + let reducer = env.reducer_name(); + let dur = env.reducer_start().elapsed(); + tracing::warn!(reducer, ?database, "Wasm has been running for {dur:?}"); + } + + // Prepare the isolate with the env. + let mut isolate = Isolate::new(<_>::default()); + isolate.set_slot(self.instance.take().expect(EXPECT_ENV)); + + // TODO(v8): snapshots, module->host calls + // Call the reducer. + env_on_isolate(&mut isolate).instance_env.start_reducer(op.timestamp); + let (mut isolate, (tx, call_result)) = + with_script(isolate, &self.program, callback_every, callback, budget, |scope, _| { + let (tx, call_result) = env_on_isolate(scope) + .instance_env + .tx + .clone() + .set(tx, || call_call_reducer_from_op(scope, op)); + (tx, call_result) + }); + let timings = env_on_isolate(&mut isolate).finish_reducer(); + self.instance = isolate.remove_slot(); + + // Derive energy stats. + let used = duration_to_budget(timings.total_duration); + let remaining = budget - used; + let energy = EnergyStats { budget, remaining }; + + // Fetch the currently used heap size in V8. + // The used size is ostensibly fairer than the total size. + let memory_allocation = isolate.get_heap_statistics().used_heap_size(); + let exec_result = ExecuteResult { energy, timings, - // TODO(centril): memory allocation. - memory_allocation: 0, + memory_allocation, call_result, }; (tx, exec_result) - }, - ) + }) } } +fn with_script( + isolate: OwnedIsolate, + code: &str, + callback_every: u64, + callback: IsolateCallback, + budget: ReducerBudget, + logic: impl for<'scope> FnOnce(&mut HandleScope<'scope>, Local<'scope, Value>) -> R, +) -> (OwnedIsolate, R) { + with_scope(isolate, callback_every, callback, budget, |scope| { + let code = v8::String::new(scope, code).unwrap(); + let script_val = v8::Script::compile(scope, code, None).unwrap().run(scope).unwrap(); + + register_host_funs(scope); + + logic(scope, script_val) + }) +} + +/// Sets up an isolate and run `logic` with a [`HandleScope`]. +pub(crate) fn with_scope( + mut isolate: OwnedIsolate, + callback_every: u64, + callback: IsolateCallback, + budget: ReducerBudget, + logic: impl FnOnce(&mut HandleScope<'_>) -> R, +) -> (OwnedIsolate, R) { + isolate.set_capture_stack_trace_for_uncaught_exceptions(true, 1024); + let isolate_handle = isolate.thread_safe_handle(); + let mut scope_1 = HandleScope::new(&mut isolate); + let context = Context::new(&mut scope_1, ContextOptions::default()); + let mut scope_2 = ContextScope::new(&mut scope_1, context); + + let timeout_thread_cancel_flag = run_reducer_timeout(callback_every, callback, budget, isolate_handle); + + let ret = logic(&mut scope_2); + drop(scope_2); + drop(scope_1); + + // Cancel the execution timeout in `run_reducer_timeout`. + timeout_thread_cancel_flag.store(true, Ordering::Relaxed); + + (isolate, ret) +} + +type IsolateCallback = extern "C" fn(&mut Isolate, *mut c_void); + +/// Spawns a thread that will terminate reducer execution +/// when `budget` has been used up. +/// +/// Every `callback_every` ticks, `callback` is called. +fn run_reducer_timeout( + callback_every: u64, + callback: IsolateCallback, + budget: ReducerBudget, + isolate_handle: IsolateHandle, +) -> Arc { + let execution_done_flag = Arc::new(AtomicBool::new(false)); + let execution_done_flag2 = execution_done_flag.clone(); + let timeout = budget_to_duration(budget); + let max_ticks = ticks_in_duration(timeout); + + let mut num_ticks = 0; + epoch_ticker(move || { + // Check if execution completed. + if execution_done_flag2.load(Ordering::Relaxed) { + return None; + } + + // We've reached the number of ticks to call `callback`. + if num_ticks % callback_every == 0 && isolate_handle.request_interrupt(callback, ptr::null_mut()) { + return None; + } + + if num_ticks == max_ticks { + // Execution still ongoing while budget has been exhausted. + // Terminate V8 execution. + // This implements "gas" for v8. + isolate_handle.terminate_execution(); + } + + num_ticks += 1; + Some(()) + }); + + execution_done_flag +} + +/// Converts a [`ReducerBudget`] to a [`Duration`]. +fn budget_to_duration(_budget: ReducerBudget) -> Duration { + // TODO(v8): This is fake logic that allows a maximum timeout. + // Replace with sensible math. + Duration::MAX +} + +/// Converts a [`Duration`] to a [`ReducerBudget`]. +fn duration_to_budget(_duration: Duration) -> ReducerBudget { + // TODO(v8): This is fake logic that allows minimum energy usage. + // Replace with sensible math. + ReducerBudget::ZERO +} + +fn global<'scope>(scope: &mut HandleScope<'scope>) -> Local<'scope, Object> { + scope.get_current_context().global(scope) +} + /// Returns the global property `key`. fn get_global_property<'scope>( scope: &mut HandleScope<'scope>, key: Local<'scope, v8::String>, ) -> ExcResult> { - scope - .get_current_context() - .global(scope) + global(scope) .get(scope, key.into()) .ok_or_else(exception_already_thrown) } @@ -256,6 +491,23 @@ fn call_call_reducer( .map_err(Into::into) } +/// Extracts the raw module def by running `__describe_module__` in `program`. +fn extract_description(program: &str) -> Result { + let budget = ReducerBudget::DEFAULT_BUDGET; + let callback_every = EPOCH_TICKS_PER_SECOND; + extern "C" fn callback(_: &mut Isolate, _: *mut c_void) {} + + let (_, ret) = with_script( + Isolate::new(<_>::default()), + program, + callback_every, + callback, + budget, + |scope, _| run_describer(log_traceback, || call_describe_module(scope)), + ); + ret +} + // Calls the `__describe_module__` function on the global proxy object to extract a [`RawModuleDef`]. fn call_describe_module(scope: &mut HandleScope<'_>) -> anyhow::Result { // Get a cached version of the `__describe_module__` property. @@ -278,6 +530,340 @@ fn call_describe_module(scope: &mut HandleScope<'_>) -> anyhow::Result(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let name: &str = deserialize_js(scope, args.get(0))?; + let id = env_on_isolate(scope).instance_env.table_id_from_name(name).unwrap(); + let ret = serialize_to_js(scope, &id)?; + Ok(ret) +} + +fn index_id_from_name<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let name: &str = deserialize_js(scope, args.get(0))?; + let id = env_on_isolate(scope).instance_env.index_id_from_name(name).unwrap(); + let ret = serialize_to_js(scope, &id)?; + Ok(ret) +} + +fn datastore_table_row_count<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let table_id: TableId = deserialize_js(scope, args.get(0))?; + let count = env_on_isolate(scope) + .instance_env + .datastore_table_row_count(table_id) + .unwrap(); + serialize_to_js(scope, &count) +} + +fn datastore_table_scan_bsatn<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let table_id: TableId = deserialize_js(scope, args.get(0))?; + + let env = env_on_isolate(scope); + // Collect the iterator chunks. + let chunks = env + .instance_env + .datastore_table_scan_bsatn_chunks(&mut env.chunk_pool, table_id) + .unwrap(); + + // Register the iterator and get back the index to write to `out`. + // Calls to the iterator are done through dynamic dispatch. + let idx = env.iters.insert(chunks.into_iter()); + + let ret = serialize_to_js(scope, &idx.0)?; + Ok(ret) +} + +fn convert_u32_to_col_id(col_id: u32) -> anyhow::Result { + let col_id: u16 = col_id.try_into().context("ABI violation, a `ColId` must be a `u16`")?; + Ok(col_id.into()) +} + +fn datastore_index_scan_range_bsatn<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let index_id: IndexId = deserialize_js(scope, args.get(0))?; + + let prefix_elems: u32 = deserialize_js(scope, args.get(2))?; + let prefix_elems = convert_u32_to_col_id(prefix_elems).unwrap(); + + let prefix: &[u8] = if prefix_elems.idx() == 0 { + &[] + } else { + deserialize_js(scope, args.get(1))? + }; + + let rstart: &[u8] = deserialize_js(scope, args.get(3))?; + let rend: &[u8] = deserialize_js(scope, args.get(4))?; + + let env = env_on_isolate(scope); + + // Find the relevant rows. + let chunks = env + .instance_env + .datastore_index_scan_range_bsatn_chunks(&mut env.chunk_pool, index_id, prefix, prefix_elems, rstart, rend) + .unwrap(); + + // Insert the encoded + concatenated rows into a new buffer and return its id. + let idx = env.iters.insert(chunks.into_iter()); + + let ret = serialize_to_js(scope, &idx.0)?; + Ok(ret) +} + +fn row_iter_bsatn_advance<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let row_iter_idx: u32 = deserialize_js(scope, args.get(0))?; + let row_iter_idx = RowIterIdx(row_iter_idx); + let buffer_max_len: u32 = deserialize_js(scope, args.get(1))?; + + // Retrieve the iterator by `row_iter_idx`, or error. + let env = env_on_isolate(scope); + let iter = env.iters.get_mut(row_iter_idx).unwrap(); + + // Allocate a buffer with `buffer_max_len` capacity. + let mut buffer = vec![0; buffer_max_len as usize]; + // Fill the buffer as much as possible. + let written = InstanceEnv::fill_buffer_from_iter(iter, &mut buffer, &mut env.chunk_pool); + buffer.truncate(written); + + let ret = match (written, iter.as_slice().first()) { + // Nothing was written and the iterator is not exhausted. + (0, Some(_chunk)) => { + unimplemented!() + } + // The iterator is exhausted, destroy it, and tell the caller. + (_, None) => { + env.iters.take(row_iter_idx); + serialize_to_js(scope, &AdvanceRet { flag: -1, buffer })? + } + // Something was written, but the iterator is not exhausted. + (_, Some(_)) => serialize_to_js(scope, &AdvanceRet { flag: 0, buffer })?, + }; + Ok(ret) +} + +#[derive(Serialize)] +struct AdvanceRet { + buffer: Vec, + flag: i32, +} + +fn row_iter_bsatn_close<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let row_iter_idx: u32 = deserialize_js(scope, args.get(0))?; + let row_iter_idx = RowIterIdx(row_iter_idx); + + // Retrieve the iterator by `row_iter_idx`, or error. + let env = env_on_isolate(scope); + + // Retrieve the iterator by `row_iter_idx`, or error. + Ok(match env.iters.take(row_iter_idx) { + None => unimplemented!(), + // TODO(Centril): consider putting these into a pool for reuse. + Some(_) => serialize_to_js(scope, &0u32)?, + }) +} + +fn datastore_insert_bsatn<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let table_id: TableId = deserialize_js(scope, args.get(0))?; + let mut row: Vec = deserialize_js(scope, args.get(1))?; + + // Insert the row into the DB and write back the generated column values. + let env: &mut JsInstanceEnv = env_on_isolate(scope); + let row_len = env.instance_env.insert(table_id, &mut row).unwrap(); + row.truncate(row_len); + + serialize_to_js(scope, &row) +} + +fn datastore_update_bsatn<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let table_id: TableId = deserialize_js(scope, args.get(0))?; + let index_id: IndexId = deserialize_js(scope, args.get(1))?; + let mut row: Vec = deserialize_js(scope, args.get(2))?; + + // Insert the row into the DB and write back the generated column values. + let env: &mut JsInstanceEnv = env_on_isolate(scope); + let row_len = env.instance_env.update(table_id, index_id, &mut row).unwrap(); + row.truncate(row_len); + + serialize_to_js(scope, &row) +} + +fn datastore_delete_by_index_scan_range_bsatn<'s>( + scope: &mut HandleScope<'s>, + args: FunctionCallbackArguments<'s>, +) -> FnRet<'s> { + let index_id: IndexId = deserialize_js(scope, args.get(0))?; + + let prefix_elems: u32 = deserialize_js(scope, args.get(2))?; + let prefix_elems = convert_u32_to_col_id(prefix_elems).unwrap(); + + let prefix: &[u8] = if prefix_elems.idx() == 0 { + &[] + } else { + deserialize_js(scope, args.get(1))? + }; + + let rstart: &[u8] = deserialize_js(scope, args.get(3))?; + let rend: &[u8] = deserialize_js(scope, args.get(4))?; + + let env = env_on_isolate(scope); + + // Delete the relevant rows. + let num = env + .instance_env + .datastore_delete_by_index_scan_range_bsatn(index_id, prefix, prefix_elems, rstart, rend) + .unwrap(); + + serialize_to_js(scope, &num) +} + +fn datastore_delete_all_by_eq_bsatn<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let table_id: TableId = deserialize_js(scope, args.get(0))?; + let relation: &[u8] = deserialize_js(scope, args.get(1))?; + + let env = env_on_isolate(scope); + let num = env + .instance_env + .datastore_delete_all_by_eq_bsatn(table_id, relation) + .unwrap(); + + serialize_to_js(scope, &num) +} + +fn volatile_nonatomic_schedule_immediate<'s>( + scope: &mut HandleScope<'s>, + args: FunctionCallbackArguments<'s>, +) -> FnRet<'s> { + let name: String = deserialize_js(scope, args.get(0))?; + let args: Vec = deserialize_js(scope, args.get(1))?; + + let env = env_on_isolate(scope); + env.instance_env + .scheduler + .volatile_nonatomic_schedule_immediate(name, crate::host::ReducerArgs::Bsatn(args.into())); + + Ok(v8::undefined(scope).into()) +} + +fn console_log<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let level: u32 = deserialize_js(scope, args.get(0))?; + + let msg = args.get(1).cast::(); + let mut buf = scratch_buf::<128>(); + let msg = msg.to_rust_cow_lossy(scope, &mut buf); + let frame: Local<'_, v8::StackFrame> = v8::StackTrace::current_stack_trace(scope, 2) + .ok_or_else(exception_already_thrown)? + .get_frame(scope, 1) + .ok_or_else(exception_already_thrown)?; + let mut buf = scratch_buf::<32>(); + let filename = frame + .get_script_name(scope) + .map(|s| s.to_rust_cow_lossy(scope, &mut buf)); + let record = Record { + // TODO: figure out whether to use walltime now or logical reducer now (env.reducer_start) + ts: chrono::Utc::now(), + target: None, + filename: filename.as_deref(), + line_number: Some(frame.get_line_number() as u32), + message: &msg, + }; + + let env = env_on_isolate(scope); + env.instance_env.console_log((level as u8).into(), &record, &Noop); + + Ok(v8::undefined(scope).into()) +} + +struct Noop; +impl BacktraceProvider for Noop { + fn capture(&self) -> Box { + Box::new(Noop) + } +} +impl ModuleBacktrace for Noop { + fn frames(&self) -> Vec> { + Vec::new() + } +} + +fn console_timer_start<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let name = args.get(0).cast::(); + let mut buf = scratch_buf::<128>(); + let name = name.to_rust_cow_lossy(scope, &mut buf).into_owned(); + + let env = env_on_isolate(scope); + let span_id = env.timing_spans.insert(TimingSpan::new(name)).0; + serialize_to_js(scope, &span_id) +} + +fn console_timer_end<'s>(scope: &mut HandleScope<'s>, args: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let span_id: u32 = deserialize_js(scope, args.get(0))?; + + let env = env_on_isolate(scope); + let span = env.timing_spans.take(TimingSpanIdx(span_id)).unwrap(); + env.instance_env.console_timer_end(&span, &Noop); + + serialize_to_js(scope, &0u32) +} + +fn identity<'s>(scope: &mut HandleScope<'s>, _: FunctionCallbackArguments<'s>) -> FnRet<'s> { + let env = env_on_isolate(scope); + let identity = *env.instance_env.database_identity(); + serialize_to_js(scope, &identity) +} + +fn register_host_funs(scope: &mut HandleScope<'_>) { + register_host_fun(scope, "table_id_from_name", table_id_from_name); + register_host_fun(scope, "index_id_from_name", index_id_from_name); + register_host_fun(scope, "datastore_table_row_count", datastore_table_row_count); + register_host_fun(scope, "datastore_table_scan_bsatn", datastore_table_scan_bsatn); + register_host_fun( + scope, + "datastore_index_scan_range_bsatn", + datastore_index_scan_range_bsatn, + ); + register_host_fun(scope, "row_iter_bsatn_advance", row_iter_bsatn_advance); + register_host_fun(scope, "row_iter_bsatn_close", row_iter_bsatn_close); + register_host_fun(scope, "datastore_insert_bsatn", datastore_insert_bsatn); + register_host_fun(scope, "datastore_update_bsatn", datastore_update_bsatn); + register_host_fun( + scope, + "datastore_delete_by_index_scan_range_bsatn", + datastore_delete_by_index_scan_range_bsatn, + ); + register_host_fun( + scope, + "datastore_delete_all_by_eq_bsatn", + datastore_delete_all_by_eq_bsatn, + ); + register_host_fun( + scope, + "volatile_nonatomic_schedule_immediate", + volatile_nonatomic_schedule_immediate, + ); + register_host_fun(scope, "console_log", console_log); + register_host_fun(scope, "console_timer_start", console_timer_start); + register_host_fun(scope, "console_timer_end", console_timer_end); + register_host_fun(scope, "identity", identity); +} + +type FnRet<'s> = ExcResult>; + +fn register_host_fun( + scope: &mut HandleScope<'_>, + name: &str, + fun: impl Copy + for<'s> Fn(&mut HandleScope<'s>, FunctionCallbackArguments<'s>) -> FnRet<'s>, +) { + let name = v8_interned_string(scope, name).into(); + let fun = Function::new(scope, adapt_fun(fun)).unwrap().into(); + global(scope).set(scope, name, fun).unwrap(); +} + +fn adapt_fun( + fun: impl Copy + for<'s> Fn(&mut HandleScope<'s>, FunctionCallbackArguments<'s>) -> FnRet<'s>, +) -> impl Copy + for<'s> Fn(&mut HandleScope<'s>, FunctionCallbackArguments<'s>, ReturnValue) { + move |scope, args, mut rv| { + if let Ok(value) = fun(scope, args) { + rv.set(value); + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/crates/core/src/host/v8/ser.rs b/crates/core/src/host/v8/ser.rs index 62f6130ba08..2965df81aa1 100644 --- a/crates/core/src/host/v8/ser.rs +++ b/crates/core/src/host/v8/ser.rs @@ -150,7 +150,7 @@ impl<'this, 'scope> ser::Serializer for Serializer<'this, 'scope> { } fn serialize_named_product(self, _len: usize) -> Result { - // TODO(noa): this can be more efficient if we tell it the names ahead of time + // TODO(v8, noa): this can be more efficient if we tell it the names ahead of time let object = Object::new(self.scope); Ok(SerializeNamedProduct { inner: self, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index 666cc808ea3..f7ef73dd82a 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -320,7 +320,7 @@ impl ResourceSlab { decl_index!(RowIterIdx => std::vec::IntoIter>); pub(super) type RowIters = ResourceSlab; -pub(super) struct TimingSpan { +pub(crate) struct TimingSpan { pub start: Instant, pub name: String, } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index f46f69ed2be..7eeb35d3505 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -8,7 +8,7 @@ use tracing::span::EnteredSpan; use super::instrumentation::CallTimes; use crate::client::ClientConnectionSender; use crate::database_logger; -use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint}; +use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint}; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::{ @@ -60,11 +60,17 @@ pub trait WasmInstance: Send + Sync + 'static { } pub struct EnergyStats { - pub used: EnergyQuanta, - pub wasmtime_fuel_used: u64, + pub budget: ReducerBudget, pub remaining: ReducerBudget, } +impl EnergyStats { + /// Returns the used energy amount. + fn used(&self) -> ReducerBudget { + (self.budget.get() - self.remaining.get()).into() + } +} + pub struct ExecutionTimings { pub total_duration: Duration, pub wasm_instance_env_call_times: CallTimes, @@ -160,16 +166,7 @@ impl WasmModuleHostActor { impl WasmModuleHostActor { fn make_from_instance(&self, instance: T::Instance) -> WasmModuleInstance { - let common = InstanceCommon { - info: self.common.info(), - energy_monitor: self.common.energy_monitor(), - // will be updated on the first reducer call - allocated_memory: 0, - metric_wasm_memory_bytes: WORKER_METRICS - .wasm_memory_bytes - .with_label_values(self.common.database_identity()), - trapped: false, - }; + let common = InstanceCommon::new(&self.common); WasmModuleInstance { instance, common } } } @@ -273,6 +270,19 @@ pub(crate) struct InstanceCommon { } impl InstanceCommon { + pub(crate) fn new(module: &ModuleCommon) -> Self { + Self { + info: module.info(), + energy_monitor: module.energy_monitor(), + // Will be updated on the first reducer call. + allocated_memory: 0, + metric_wasm_memory_bytes: WORKER_METRICS + .wasm_memory_bytes + .with_label_values(module.database_identity()), + trapped: false, + } + } + #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn update_database( &mut self, @@ -407,14 +417,16 @@ impl InstanceCommon { call_result, } = result; + let energy_used = energy.used(); + let energy_quanta_used = energy_used.into(); vm_metrics.report( - energy.wasmtime_fuel_used, + energy_used.get(), timings.total_duration, &timings.wasm_instance_env_call_times, ); self.energy_monitor - .record_reducer(&energy_fingerprint, energy.used, timings.total_duration); + .record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration); if self.allocated_memory != memory_allocation { self.metric_wasm_memory_bytes.set(memory_allocation as i64); self.allocated_memory = memory_allocation; @@ -422,7 +434,7 @@ impl InstanceCommon { reducer_span .record("timings.total_duration", tracing::field::debug(timings.total_duration)) - .record("energy.used", tracing::field::debug(energy.used)); + .record("energy.used", tracing::field::debug(energy_used)); maybe_log_long_running_reducer(reducer_name, timings.total_duration); reducer_span.exit(); @@ -480,7 +492,7 @@ impl InstanceCommon { args, }, status, - energy_quanta_used: energy.used, + energy_quanta_used, host_execution_duration: timings.total_duration, request_id, timer, @@ -489,7 +501,7 @@ impl InstanceCommon { ReducerCallResult { outcome: ReducerOutcome::from(&event.status), - energy_used: energy.used, + energy_used: energy_quanta_used, execution_duration: timings.total_duration, } } diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 5a61d5e23ab..06d80f85da1 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -27,7 +27,23 @@ pub struct WasmtimeRuntime { const EPOCH_TICK_LENGTH: Duration = Duration::from_millis(10); -const EPOCH_TICKS_PER_SECOND: u64 = Duration::from_secs(1).div_duration_f64(EPOCH_TICK_LENGTH) as u64; +pub(crate) const EPOCH_TICKS_PER_SECOND: u64 = ticks_in_duration(Duration::from_secs(1)); + +pub(crate) const fn ticks_in_duration(duration: Duration) -> u64 { + duration.div_duration_f64(EPOCH_TICK_LENGTH) as u64 +} + +pub(crate) fn epoch_ticker(mut on_tick: impl 'static + Send + FnMut() -> Option<()>) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(EPOCH_TICK_LENGTH); + loop { + interval.tick().await; + let Some(()) = on_tick() else { + return; + }; + } + }); +} impl WasmtimeRuntime { pub fn new(data_dir: Option<&ServerDataDir>) -> Self { @@ -53,13 +69,10 @@ impl WasmtimeRuntime { let engine = Engine::new(&config).unwrap(); let weak_engine = engine.weak(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(EPOCH_TICK_LENGTH); - loop { - interval.tick().await; - let Some(engine) = weak_engine.upgrade() else { break }; - engine.increment_epoch(); - } + epoch_ticker(move || { + let engine = weak_engine.upgrade()?; + engine.increment_epoch(); + Some(()) }); let mut linker = Box::new(Linker::new(&engine)); diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index f4605274986..4a905fc4b90 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -71,6 +71,7 @@ pub(super) struct WasmInstanceEnv { /// The last, including current, reducer to be executed by this environment. reducer_name: String, + /// A pool of unused allocated chunks that can be reused. // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. chunk_pool: ChunkPool, @@ -601,27 +602,12 @@ impl WasmInstanceEnv { // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`. let buffer_len = u32::read_from(mem, buffer_len_ptr)?; let write_buffer_len = |mem, len| u32::try_from(len).unwrap().write_to(mem, buffer_len_ptr); + // Get a mutable view to the `buffer`. - let mut buffer = mem.deref_slice_mut(buffer_ptr, buffer_len)?; + let buffer = mem.deref_slice_mut(buffer_ptr, buffer_len)?; - let mut written = 0; // Fill the buffer as much as possible. - while let Some(chunk) = iter.as_slice().first() { - let Some((buf_chunk, rest)) = buffer.split_at_mut_checked(chunk.len()) else { - // Cannot fit chunk into the buffer, - // either because we already filled it too much, - // or because it is too small. - break; - }; - buf_chunk.copy_from_slice(chunk); - written += chunk.len(); - buffer = rest; - - // Advance the iterator, as we used a chunk. - // SAFETY: We peeked one `chunk`, so there must be one at least. - let chunk = unsafe { iter.next().unwrap_unchecked() }; - env.chunk_pool.put(chunk); - } + let written = InstanceEnv::fill_buffer_from_iter(iter, buffer, &mut env.chunk_pool); let ret = match (written, iter.as_slice().first()) { // Nothing was written and the iterator is not exhausted. @@ -1188,22 +1174,10 @@ impl WasmInstanceEnv { let Some(span) = caller.data_mut().timing_spans.take(TimingSpanIdx(span_id)) else { return Ok(errno::NO_SUCH_CONSOLE_TIMER.get().into()); }; - - let elapsed = span.start.elapsed(); - let message = format!("Timing span {:?}: {:?}", &span.name, elapsed); - - let record = Record { - ts: chrono::Utc::now(), - target: None, - filename: None, - line_number: None, - message: &message, - }; - caller.data().instance_env.console_log( - crate::database_logger::LogLevel::Info, - &record, - &caller.as_context(), - ); + caller + .data() + .instance_env + .console_timer_end(&span, &caller.as_context()); Ok(0) }) } @@ -1220,7 +1194,7 @@ impl WasmInstanceEnv { // as we want to possibly trap, but not to return an error code. Self::with_span(caller, AbiCall::Identity, |caller| { let (mem, env) = Self::mem_env(caller); - let identity = env.instance_env.replica_ctx.database.database_identity; + let identity = env.instance_env.database_identity(); // We're implicitly casting `out_ptr` to `WasmPtr` here. // (Both types are actually `u32`.) // This works because `Identity::write_to` does not require an aligned pointer, diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 141f7455518..48f4cd96af7 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -4,6 +4,7 @@ use super::wasm_instance_env::WasmInstanceEnv; use super::{Mem, WasmtimeFuel, EPOCH_TICKS_PER_SECOND}; use crate::energy::ReducerBudget; use crate::host::instance_env::InstanceEnv; +use crate::host::module_common::run_describer; use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError}; use crate::host::wasm_common::*; use crate::util::string_from_utf8_lossy_owned; @@ -158,29 +159,18 @@ pub struct WasmtimeInstance { impl module_host_actor::WasmInstance for WasmtimeInstance { fn extract_descriptions(&mut self) -> Result, DescribeError> { let describer_func_name = DESCRIBE_MODULE_DUNDER; - let store = &mut self.store; - let describer = self.instance.get_func(&mut *store, describer_func_name).unwrap(); - let describer = describer - .typed::(&mut *store) + let describer = self + .instance + .get_typed_func::(&mut self.store, describer_func_name) .map_err(|_| DescribeError::Signature)?; - let sink = store.data_mut().setup_standard_bytes_sink(); - - let start = std::time::Instant::now(); - log::trace!("Start describer \"{describer_func_name}\"..."); - - let result = describer.call(&mut *store, sink); + let sink = self.store.data_mut().setup_standard_bytes_sink(); - let duration = start.elapsed(); - log::trace!("Describer \"{}\" ran: {} us", describer_func_name, duration.as_micros()); - - result - .inspect_err(|err| log_traceback("describer", describer_func_name, err)) - .map_err(DescribeError::RuntimeError)?; + run_describer(log_traceback, || describer.call(&mut self.store, sink))?; // Fetch the bsatn returned by the describer call. - let bytes = store.data_mut().take_standard_bytes_sink(); + let bytes = self.store.data_mut().take_standard_bytes_sink(); Ok(bytes) } @@ -192,11 +182,8 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { #[tracing::instrument(level = "trace", skip_all)] fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> module_host_actor::ExecuteResult { let store = &mut self.store; - // note that ReducerBudget being a u64 is load-bearing here - although we convert budget right back into - // EnergyQuanta at the end of this function, from_energy_quanta clamps it to a u64 range. - // otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate. + // Set the fuel budget in WASM. set_store_fuel(store, budget.into()); - let original_fuel = get_store_fuel(store); store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND); // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. @@ -230,14 +217,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let call_result = call_result.map(|code| handle_error_sink_code(code, error)); + // Compute fuel and heap usage. let remaining_fuel = get_store_fuel(store); - let remaining: ReducerBudget = remaining_fuel.into(); - let energy = module_host_actor::EnergyStats { - used: (budget - remaining).into(), - wasmtime_fuel_used: original_fuel.0 - remaining_fuel.0, - remaining, - }; + let energy = module_host_actor::EnergyStats { budget, remaining }; let memory_allocation = store.data().get_mem().memory.data_size(&store); module_host_actor::ExecuteResult { diff --git a/crates/core/src/messages/control_db.rs b/crates/core/src/messages/control_db.rs index 8299875e339..8f92f350324 100644 --- a/crates/core/src/messages/control_db.rs +++ b/crates/core/src/messages/control_db.rs @@ -75,9 +75,12 @@ pub struct NodeStatus { /// SEE: pub state: String, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Default, Serialize, Deserialize, serde::Deserialize, +)] #[repr(i32)] pub enum HostType { + #[default] Wasm = 0, Js = 1, } diff --git a/crates/standalone/Cargo.toml b/crates/standalone/Cargo.toml index e719721721d..17b8d547062 100644 --- a/crates/standalone/Cargo.toml +++ b/crates/standalone/Cargo.toml @@ -19,6 +19,7 @@ required-features = [] # Features required to build this target (N/A for lib) [features] # Perfmaps for profiling modules perfmap = ["spacetimedb-core/perfmap"] +unstable = ["spacetimedb-client-api/unstable"] [dependencies] spacetimedb-client-api-messages.workspace = true