Skip to content

Commit ddf26c7

Browse files
committed
v8: wire more of call-reducer
1 parent 413c8cb commit ddf26c7

3 files changed

Lines changed: 66 additions & 15 deletions

File tree

crates/core/src/host/v8/mod.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,25 @@
33
use super::module_common::{build_common_module_from_raw, ModuleCommon};
44
use super::module_host::{CallReducerParams, DynModule, Module, ModuleInfo, ModuleInstance, ModuleRuntime};
55
use super::UpdateDatabaseResult;
6-
use crate::host::wasm_common::module_host_actor::InstanceCommon;
6+
use crate::host::wasm_common::instrumentation::CallTimes;
7+
use crate::host::wasm_common::module_host_actor::{
8+
EnergyStats, ExecuteResult, ExecutionTimings, InstanceCommon, ReducerOp,
9+
};
710
use crate::host::ArgsTuple;
811
use crate::{host::Scheduler, module_host_context::ModuleCreationContext, replica_context::ReplicaContext};
912
use anyhow::anyhow;
13+
use core::time::Duration;
1014
use de::deserialize_js;
1115
use error::{catch_exception, exception_already_thrown, ExcResult, Throwable};
1216
use from_value::cast;
1317
use key_cache::get_or_create_key_cache;
1418
use ser::serialize_to_js;
19+
use spacetimedb_client_api_messages::energy::{EnergyQuanta, ReducerBudget};
1520
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
1621
use spacetimedb_datastore::traits::Program;
1722
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef};
1823
use std::sync::{Arc, LazyLock};
19-
use v8::{Function, HandleScope, Local, Value};
24+
use v8::{Context, ContextOptions, ContextScope, Function, HandleScope, Isolate, Local, Value};
2025

2126
mod de;
2227
mod error;
@@ -110,7 +115,7 @@ impl Module for JsModule {
110115
}
111116

112117
fn info(&self) -> Arc<ModuleInfo> {
113-
todo!()
118+
self.common.info().clone()
114119
}
115120

116121
fn create_instance(&self) -> Self::Instance {
@@ -137,8 +142,42 @@ impl ModuleInstance for JsInstance {
137142
self.common.update_database(replica_ctx, program, old_module_info)
138143
}
139144

140-
fn call_reducer(&mut self, _tx: Option<MutTxId>, _params: CallReducerParams) -> super::ReducerCallResult {
141-
todo!()
145+
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> super::ReducerCallResult {
146+
// TODO(centril): snapshots, module->host calls
147+
let mut isolate = Isolate::new(<_>::default());
148+
let scope = &mut HandleScope::new(&mut isolate);
149+
let context = Context::new(scope, ContextOptions::default());
150+
let scope = &mut ContextScope::new(scope, context);
151+
152+
self.common.call_reducer_with_tx(
153+
&self.replica_ctx.clone(),
154+
tx,
155+
params,
156+
// TODO(centril): logging.
157+
|_ty, _fun, _err| {},
158+
|tx, op, _budget| {
159+
let call_result = call_call_reducer_from_op(scope, op);
160+
// TODO(centril): energy metrering.
161+
let energy = EnergyStats {
162+
used: EnergyQuanta::ZERO,
163+
wasmtime_fuel_used: 0,
164+
remaining: ReducerBudget::ZERO,
165+
};
166+
// TODO(centril): timings.
167+
let timings = ExecutionTimings {
168+
total_duration: Duration::ZERO,
169+
wasm_instance_env_call_times: CallTimes::new(),
170+
};
171+
let exec_result = ExecuteResult {
172+
energy,
173+
timings,
174+
// TODO(centril): memory allocation.
175+
memory_allocation: 0,
176+
call_result,
177+
};
178+
(tx, exec_result)
179+
},
180+
)
142181
}
143182
}
144183

@@ -163,13 +202,25 @@ fn call_free_fun<'scope>(
163202
fun.call(scope, receiver, args).ok_or_else(exception_already_thrown)
164203
}
165204

205+
// Calls the `__call_reducer__` function on the global proxy object using `op`.
206+
fn call_call_reducer_from_op(scope: &mut HandleScope<'_>, op: ReducerOp<'_>) -> anyhow::Result<Result<(), Box<str>>> {
207+
call_call_reducer(
208+
scope,
209+
op.id.into(),
210+
op.caller_identity,
211+
op.caller_connection_id,
212+
op.timestamp.to_micros_since_unix_epoch(),
213+
op.args,
214+
)
215+
}
216+
166217
// Calls the `__call_reducer__` function on the global proxy object.
167218
fn call_call_reducer(
168219
scope: &mut HandleScope<'_>,
169220
reducer_id: u32,
170221
sender: &Identity,
171222
conn_id: &ConnectionId,
172-
timestamp: u64,
223+
timestamp: i64,
173224
reducer_args: &ArgsTuple,
174225
) -> anyhow::Result<Result<(), Box<str>>> {
175226
// Get a cached version of the `__call_reducer__` property.

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use bytes::Bytes;
21
use prometheus::{Histogram, IntCounter, IntGauge};
32
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
43
use spacetimedb_schema::auto_migrate::ponder_migrate;
@@ -16,7 +15,7 @@ use crate::host::module_host::{
1615
CallReducerParams, DatabaseUpdate, DynModule, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo,
1716
ModuleInstance,
1817
};
19-
use crate::host::{ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult};
18+
use crate::host::{ArgsTuple, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult};
2019
use crate::identity::Identity;
2120
use crate::messages::control_db::HostType;
2221
use crate::module_host_context::ModuleCreationContext;
@@ -333,7 +332,7 @@ impl InstanceCommon {
333332
/// The method also performs various measurements and records energy usage,
334333
/// as well as broadcasting a [`ModuleEvent`] containing information about
335334
/// the outcome of the call.
336-
fn call_reducer_with_tx(
335+
pub(crate) fn call_reducer_with_tx(
337336
&mut self,
338337
replica_ctx: &ReplicaContext,
339338
tx: Option<MutTxId>,
@@ -379,7 +378,7 @@ impl InstanceCommon {
379378
caller_identity: &caller_identity,
380379
caller_connection_id: &caller_connection_id,
381380
timestamp,
382-
arg_bytes: args.get_bsatn().clone(),
381+
args: &args,
383382
};
384383

385384
let workload = Workload::Reducer(ReducerContext::from(op.clone()));
@@ -626,8 +625,8 @@ pub struct ReducerOp<'a> {
626625
pub caller_identity: &'a Identity,
627626
pub caller_connection_id: &'a ConnectionId,
628627
pub timestamp: Timestamp,
629-
/// The BSATN-serialized arguments passed to the reducer.
630-
pub arg_bytes: Bytes,
628+
/// The arguments passed to the reducer.
629+
pub args: &'a ArgsTuple,
631630
}
632631

633632
impl From<ReducerOp<'_>> for execution_context::ReducerContext {
@@ -638,15 +637,15 @@ impl From<ReducerOp<'_>> for execution_context::ReducerContext {
638637
caller_identity,
639638
caller_connection_id,
640639
timestamp,
641-
arg_bytes,
640+
args,
642641
}: ReducerOp<'_>,
643642
) -> Self {
644643
Self {
645644
name: name.to_owned(),
646645
caller_identity: *caller_identity,
647646
caller_connection_id: *caller_connection_id,
648647
timestamp,
649-
arg_bsatn: arg_bytes.clone(),
648+
arg_bsatn: args.get_bsatn().clone(),
650649
}
651650
}
652651
}

crates/core/src/host/wasmtime/wasmtime_module.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
204204
let [conn_id_0, conn_id_1] = bytemuck::must_cast(op.caller_connection_id.as_le_byte_array());
205205

206206
// Prepare arguments to the reducer + the error sink & start timings.
207-
let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, op.arg_bytes, op.timestamp);
207+
let args_bytes = op.args.get_bsatn().clone();
208+
let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, args_bytes, op.timestamp);
208209

209210
let call_result = self.call_reducer.call(
210211
&mut *store,

0 commit comments

Comments
 (0)