Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed;

mod disk_storage;
mod host_controller;
mod module_common;
#[allow(clippy::too_many_arguments)]
pub mod module_host;
pub mod scheduler;
Expand Down
90 changes: 90 additions & 0 deletions crates/core/src/host/module_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! Module backend infrastructure, shared between different runtimes,
//! like WASM and V8.

use crate::{
energy::EnergyMonitor,
host::{
module_host::{DynModule, ModuleInfo},
Scheduler,
},
module_host_context::ModuleCreationContext,
replica_context::ReplicaContext,
};
use spacetimedb_lib::{Identity, RawModuleDef};
use spacetimedb_schema::{def::ModuleDef, error::ValidationErrors};
use std::sync::Arc;

/// Builds a [`ModuleCommon`] from a [`RawModuleDef`].
pub fn build_common_module_from_raw(
mcc: ModuleCreationContext,
raw_def: RawModuleDef,
) -> Result<ModuleCommon, ValidationErrors> {
// Perform a bunch of validation on the raw definition.
let def: ModuleDef = raw_def.try_into()?;

let replica_ctx = mcc.replica_ctx;
let log_tx = replica_ctx.logger.tx.clone();

// Note: assigns Reducer IDs based on the alphabetical order of reducer names.
let info = ModuleInfo::new(
def,
replica_ctx.owner_identity,
replica_ctx.database_identity,
mcc.program.hash,
log_tx,
replica_ctx.subscriptions.clone(),
);

Ok(ModuleCommon::new(replica_ctx, mcc.scheduler, info, mcc.energy_monitor))
}

/// Non-runtime-specific parts of a module.
#[derive(Clone)]
pub(crate) struct ModuleCommon {
replica_context: Arc<ReplicaContext>,
scheduler: Scheduler,
info: Arc<ModuleInfo>,
energy_monitor: Arc<dyn EnergyMonitor>,
}

impl ModuleCommon {
/// Returns a new common module.
fn new(
replica_context: Arc<ReplicaContext>,
scheduler: Scheduler,
info: Arc<ModuleInfo>,
energy_monitor: Arc<dyn EnergyMonitor>,
) -> Self {
Self {
replica_context,
scheduler,
info,
energy_monitor,
}
}

/// Returns the module info.
pub fn info(&self) -> Arc<ModuleInfo> {
self.info.clone()
}

/// Returns the identity of the database.
pub fn database_identity(&self) -> &Identity {
&self.info.database_identity
}

/// Returns the energy monitor.
pub fn energy_monitor(&self) -> Arc<dyn EnergyMonitor> {
self.energy_monitor.clone()
}
}

impl DynModule for ModuleCommon {
fn replica_ctx(&self) -> &Arc<ReplicaContext> {
&self.replica_context
}

fn scheduler(&self) -> &Scheduler {
&self.scheduler
}
}
29 changes: 24 additions & 5 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
host::{
module_common::{build_common_module_from_raw, ModuleCommon},
module_host::{DynModule, Module, ModuleInfo, ModuleInstance, ModuleRuntime},
Scheduler,
},
Expand Down Expand Up @@ -44,21 +45,39 @@ impl V8RuntimeInner {
Self { _priv: () }
}

fn make_actor(&self, _: ModuleCreationContext<'_>) -> anyhow::Result<impl Module> {
Err::<JsModule, _>(anyhow!("v8_todo"))
fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result<impl Module> {
#![allow(unreachable_code, unused_variables)]

log::trace!(
"Making new V8 module host actor for database {} with module {}",
mcc.replica_ctx.database_identity,
mcc.program.hash,
);

if true {
return Err::<JsModule, _>(anyhow!("v8_todo"));
}

let desc = todo!();
// Validate and create a common module rom the raw definition.
let common = build_common_module_from_raw(mcc, desc)?;

Ok(JsModule { common })
}
}

#[derive(Clone)]
struct JsModule;
struct JsModule {
common: ModuleCommon,
}

impl DynModule for JsModule {
fn replica_ctx(&self) -> &Arc<ReplicaContext> {
todo!()
self.common.replica_ctx()
}

fn scheduler(&self) -> &Scheduler {
todo!()
self.common.scheduler()
}
}

Expand Down
73 changes: 25 additions & 48 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use bytes::Bytes;
use prometheus::IntGauge;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_schema::auto_migrate::ponder_migrate;
use spacetimedb_schema::def::ModuleDef;
use std::sync::Arc;
use std::time::Duration;

use super::instrumentation::CallTimes;
use crate::database_logger::{self, SystemLogger};
use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint};
use crate::host::instance_env::InstanceEnv;
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
use crate::host::module_host::{
CallReducerParams, DatabaseUpdate, DynModule, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo,
ModuleInstance,
Expand Down Expand Up @@ -81,11 +81,8 @@ pub struct ExecuteResult<E> {
pub(crate) struct WasmModuleHostActor<T: WasmModule> {
module: T::InstancePre,
initial_instance: Option<Box<WasmModuleInstance<T::Instance>>>,
replica_context: Arc<ReplicaContext>,
scheduler: Scheduler,
common: ModuleCommon,
func_names: Arc<FuncNames>,
info: Arc<ModuleInfo>,
energy_monitor: Arc<dyn EnergyMonitor>,
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -126,56 +123,35 @@ pub enum DescribeError {

impl<T: WasmModule> WasmModuleHostActor<T> {
pub fn new(mcc: ModuleCreationContext, module: T) -> Result<Self, InitializationError> {
let ModuleCreationContext {
replica_ctx: replica_context,
scheduler,
program,
energy_monitor,
} = mcc;
let module_hash = program.hash;
log::trace!(
"Making new module host actor for database {} with module {}",
replica_context.database_identity,
module_hash,
"Making new WASM module host actor for database {} with module {}",
mcc.replica_ctx.database_identity,
mcc.program.hash,
);
let log_tx = replica_context.logger.tx.clone();

FuncNames::check_required(|name| module.get_export(name))?;
let mut func_names = FuncNames::default();
module.for_each_export(|sym, ty| func_names.update_from_general(sym, ty))?;
func_names.preinits.sort_unstable();

let func_names = {
FuncNames::check_required(|name| module.get_export(name))?;
let mut func_names = FuncNames::default();
module.for_each_export(|sym, ty| func_names.update_from_general(sym, ty))?;
func_names.preinits.sort_unstable();
func_names
};
let uninit_instance = module.instantiate_pre()?;
let mut instance = uninit_instance.instantiate(
InstanceEnv::new(replica_context.clone(), scheduler.clone()),
&func_names,
)?;
let instance_env = InstanceEnv::new(mcc.replica_ctx.clone(), mcc.scheduler.clone());
let mut instance = uninit_instance.instantiate(instance_env, &func_names)?;

let desc = instance.extract_descriptions()?;
let desc: RawModuleDef = bsatn::from_slice(&desc).map_err(DescribeError::Decode)?;

// Perform a bunch of validation on the raw definition.
let def: ModuleDef = desc.try_into()?;

// Note: assigns Reducer IDs based on the alphabetical order of reducer names.
let info = ModuleInfo::new(
def,
replica_context.owner_identity,
replica_context.database_identity,
module_hash,
log_tx,
replica_context.subscriptions.clone(),
);
// Validate and create a common module rom the raw definition.
let common = build_common_module_from_raw(mcc, desc)?;

let func_names = Arc::new(func_names);
let mut module = WasmModuleHostActor {
module: uninit_instance,
initial_instance: None,
func_names,
info,
replica_context,
scheduler,
energy_monitor,
common,
};
module.initial_instance = Some(Box::new(module.make_from_instance(instance)));

Expand All @@ -187,25 +163,25 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
fn make_from_instance(&self, instance: T::Instance) -> WasmModuleInstance<T::Instance> {
WasmModuleInstance {
instance,
info: self.info.clone(),
energy_monitor: self.energy_monitor.clone(),
info: self.common.info(),
energy_monitor: self.common.energy_monitor(),
// will be updated on the first reducer call
allocated_memory: 0,
metric_wasm_memory_bytes: WORKER_METRICS
.wasm_memory_bytes
.with_label_values(&self.info.database_identity),
.with_label_values(self.common.database_identity()),
trapped: false,
}
}
}

impl<T: WasmModule> DynModule for WasmModuleHostActor<T> {
fn replica_ctx(&self) -> &Arc<ReplicaContext> {
&self.replica_context
self.common.replica_ctx()
}

fn scheduler(&self) -> &Scheduler {
&self.scheduler
self.common.scheduler()
}
}

Expand All @@ -219,11 +195,12 @@ impl<T: WasmModule> Module for WasmModuleHostActor<T> {
}

fn info(&self) -> Arc<ModuleInfo> {
self.info.clone()
self.common.info()
}

fn create_instance(&self) -> Self::Instance {
let env = InstanceEnv::new(self.replica_context.clone(), self.scheduler.clone());
let common = &self.common;
let env = InstanceEnv::new(common.replica_ctx().clone(), common.scheduler().clone());
// this shouldn't fail, since we already called module.create_instance()
// before and it didn't error, and ideally they should be deterministic
let mut instance = self
Expand Down
Loading