diff --git a/Cargo.lock b/Cargo.lock index 862a2f4d67c..4e8857cbffb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -899,6 +899,7 @@ dependencies = [ "env_logger 0.10.2", "log", "regex", + "reqwest 0.12.24", "serde_json", "spacetimedb-guard", "tempfile", diff --git a/crates/client-api-messages/src/energy.rs b/crates/client-api-messages/src/energy.rs index 955a1566b24..f03cdefd673 100644 --- a/crates/client-api-messages/src/energy.rs +++ b/crates/client-api-messages/src/energy.rs @@ -17,12 +17,12 @@ impl EnergyQuanta { pub const ZERO: Self = EnergyQuanta { quanta: 0 }; #[inline] - pub fn new(quanta: u128) -> Self { + pub const fn new(quanta: u128) -> Self { Self { quanta } } #[inline] - pub fn get(&self) -> u128 { + pub const fn get(&self) -> u128 { self.quanta } @@ -121,46 +121,52 @@ impl fmt::Debug for EnergyBalance { } } -/// A measure of energy representing the energy budget for a reducer or any callable function. +/// A measure of the energy budget for a reducer or any callable function. /// -/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling -/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime: -/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`) -#[derive(Copy, Clone, From, Add, Sub, AddAssign, SubAssign)] +/// This unit is not directly convertible to `EnergyQuanta`. It is currently +/// 1:1 to wasmtime fuel, and we intend to treat it as representing a CPU +/// instruction. +#[derive(Copy, Clone, From, Add, Sub, AddAssign, SubAssign, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct FunctionBudget(u64); impl FunctionBudget { - // 1 second of wasm runtime is roughly 2 TeV, so this is - // roughly 1 minute of wasm runtime - pub const DEFAULT_BUDGET: Self = FunctionBudget(120_000_000_000_000); + /// We've generally assumed that 1 second of wasm runtime uses 2_000_000_000 fuel. + /// Currently, 1 wasmtime fuel unit is equivalent to 1 wasm instructions. Assuming + /// 1 wasm instruction compiles to 1 CPU instruction (which it doesn't), this implies + /// a 1 instruction-per-cycle abstract machine with a CPU frequency of 2GHz. + pub const PER_EXECUTION_SEC: Self = FunctionBudget(2_000_000_000); + + pub const PER_EXECUTION_NANOSEC: Self = Self(Self::PER_EXECUTION_SEC.0 / 1_000_000_000); + + /// Roughly 1 minute of runtime. + pub const DEFAULT_BUDGET: Self = FunctionBudget(Self::PER_EXECUTION_SEC.0 * 60); pub const ZERO: Self = FunctionBudget(0); pub const MAX: Self = FunctionBudget(u64::MAX); - pub fn new(v: u64) -> Self { + pub const fn new(v: u64) -> Self { Self(v) } - pub fn get(&self) -> u64 { + pub const fn get(&self) -> u64 { self.0 } - /// Convert from [`EnergyQuanta`]. Returns `None` if `energy` is too large to be represented. - pub fn from_energy(energy: EnergyQuanta) -> Option { - energy.get().try_into().ok().map(Self) - } -} - -impl From for EnergyQuanta { - fn from(value: FunctionBudget) -> Self { - EnergyQuanta::new(value.0.into()) + /// Convert `FunctionBudget` to `Duration` using the conversion factor + /// [`FunctionBudget::PER_EXECUTION_SEC`]. + pub fn to_duration(self) -> Duration { + Duration::from_nanos(self.0 / Self::PER_EXECUTION_NANOSEC.0) } -} -impl fmt::Debug for FunctionBudget { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReducerBudget") - .field(&EnergyQuanta::from(*self)) - .finish() + /// Convert `Duration` to `FunctionBudget` using the conversion factor + /// [`FunctionBudget::PER_EXECUTION_SEC`]. + /// + /// Returns `None` on overflow, which means that dur > 106.75 days. + // in order for duration_nanos * budget_per_ns >= u64::MAX: + // duration_nanos >= u64::MAX / budget_per_ns + // duration_nanos >= (9223372036854775 ns = 106.75 days) + pub fn from_duration(dur: Duration) -> Option { + let duration_nanos = u64::try_from(dur.as_nanos()).ok()?; + duration_nanos.checked_mul(Self::PER_EXECUTION_NANOSEC.get()).map(Self) } } diff --git a/crates/client-api/src/auth.rs b/crates/client-api/src/auth.rs index 03c122510aa..fc777c8b105 100644 --- a/crates/client-api/src/auth.rs +++ b/crates/client-api/src/auth.rs @@ -12,7 +12,7 @@ use spacetimedb::auth::token_validation::{ new_validator, DefaultValidator, TokenSigner, TokenValidationError, TokenValidator, }; use spacetimedb::auth::JwtKeys; -use spacetimedb::energy::EnergyQuanta; +use spacetimedb::energy::FunctionBudget; use spacetimedb::identity::Identity; use spacetimedb_data_structures::map::HashMap; use std::time::{Duration, SystemTime}; @@ -518,7 +518,7 @@ impl headers::Header for SpacetimeIdentityToken { } } -pub struct SpacetimeEnergyUsed(pub EnergyQuanta); +pub struct SpacetimeEnergyUsed(pub FunctionBudget); impl headers::Header for SpacetimeEnergyUsed { fn name() -> &'static http::HeaderName { static NAME: http::HeaderName = http::HeaderName::from_static("spacetime-energy-used"); @@ -530,9 +530,7 @@ impl headers::Header for SpacetimeEnergyUsed { } fn encode>(&self, values: &mut E) { - let mut buf = itoa::Buffer::new(); - let value = buf.format(self.0.get()); - values.extend([value.try_into().unwrap()]); + values.extend([self.0.get().into()]); } } diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 60576cf752b..893c0ec9c15 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -202,7 +202,7 @@ pub async fn call( let (status, body) = reducer_outcome_response(&owner_identity, &reducer, result.outcome); Ok(( status, - TypedHeader(SpacetimeEnergyUsed(result.energy_used)), + TypedHeader(SpacetimeEnergyUsed(result.execution_budget_used)), TypedHeader(SpacetimeExecutionDurationMicros(result.execution_duration)), body, ) diff --git a/crates/core/src/client/message_handlers_v1.rs b/crates/core/src/client/message_handlers_v1.rs index 0ab284b2da1..60d8eafdb28 100644 --- a/crates/core/src/client/message_handlers_v1.rs +++ b/crates/core/src/client/message_handlers_v1.rs @@ -1,6 +1,6 @@ use super::messages::{SubscriptionUpdateMessage, SwitchedServerMessage, ToProtocol, TransactionUpdateMessage}; use super::{ClientConnection, DataMessage, MessageHandleError, Protocol}; -use crate::energy::EnergyQuanta; +use crate::energy::FunctionBudget; use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall}; use crate::host::{FunctionArgs, ReducerId}; use crate::identity::Identity; @@ -139,7 +139,7 @@ impl MessageExecutionError { }, status: EventStatus::FailedInternal(format!("{:#}", err)), reducer_return_value: None, - energy_quanta_used: EnergyQuanta::ZERO, + execution_budget_used: FunctionBudget::ZERO, host_execution_duration: Duration::ZERO, request_id: Some(RequestId::default()), timer: None, diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index 123c1c75d4a..10183467029 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -7,6 +7,7 @@ use crate::subscription::websocket_building::{brotli_compress, decide_compressio use bytes::{BufMut, Bytes, BytesMut}; use bytestring::ByteString; use derive_more::From; +use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_client_api_messages::websocket::common::{self as ws_common, RowListLen as _}; use spacetimedb_client_api_messages::websocket::v1::{self as ws_v1}; use spacetimedb_client_api_messages::websocket::v2 as ws_v2; @@ -434,7 +435,12 @@ impl ToProtocol for TransactionUpdateMessage { args, request_id, }, - energy_quanta_used: event.energy_quanta_used, + // This conversion is lying. We used to tell the client how much eV a transaction + // used, but now the database just tracks cpu usage, and it's converted to energy + // elsewhere. So, we just pretend that this is `EnergyQuanta` when it's actually + // a different unit, and it doesn't really matter to the client anyway. + // TODO(noa): maybe we could just have this be zero, unconditionally? + energy_quanta_used: EnergyQuanta::new(event.execution_budget_used.get().into()), total_host_execution_duration: event.host_execution_duration.into(), caller_connection_id: event.caller_connection_id.unwrap_or(ConnectionId::ZERO), }; diff --git a/crates/core/src/energy.rs b/crates/core/src/energy.rs index c66378e07a5..446d36c9ced 100644 --- a/crates/core/src/energy.rs +++ b/crates/core/src/energy.rs @@ -17,7 +17,7 @@ pub trait EnergyMonitor: Send + Sync + 'static { fn record_reducer( &self, fingerprint: &FunctionFingerprint<'_>, - energy_used: EnergyQuanta, + execution_budget_used: FunctionBudget, execution_duration: Duration, ); fn record_disk_usage(&self, database: &Database, replica_id: u64, disk_usage: u64, period: Duration); @@ -36,7 +36,7 @@ impl EnergyMonitor for NullEnergyMonitor { fn record_reducer( &self, _fingerprint: &FunctionFingerprint<'_>, - _energy_used: EnergyQuanta, + _execution_budget_used: FunctionBudget, _execution_duration: Duration, ) { } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 2fec6f79ebf..f2092412639 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -9,7 +9,7 @@ use crate::database_logger::DatabaseLogger; use crate::db::persistence::PersistenceProvider; use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata}; use crate::db::{self, spawn_tx_metrics_recorder}; -use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor}; +use crate::energy::{EnergyMonitor, FunctionBudget, NullEnergyMonitor}; use crate::host::v8::V8Runtime; use crate::host::ProcedureCallError; use crate::messages::control_db::{Database, HostType}; @@ -148,7 +148,7 @@ impl HostRuntimes { #[derive(Clone, Debug)] pub struct ReducerCallResult { pub outcome: ReducerOutcome, - pub energy_used: EnergyQuanta, + pub execution_budget_used: FunctionBudget, pub execution_duration: Duration, } diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 6b1c5054cde..9c24c5b3a11 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -13,7 +13,7 @@ use core::mem; use futures::TryFutureExt; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; -use spacetimedb_client_api_messages::energy::EnergyQuanta; +use spacetimedb_client_api_messages::energy::FunctionBudget; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::Workload; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; @@ -793,7 +793,7 @@ impl InstanceEnv { request_id: None, timer: None, // The procedure will pick up the tab for the energy. - energy_quanta_used: EnergyQuanta { quanta: 0 }, + execution_budget_used: FunctionBudget::ZERO, host_execution_duration: Duration::from_millis(0), }; // Commit the tx and broadcast it. diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 70310d7baec..77533d952e4 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -6,7 +6,6 @@ use crate::client::messages::{OneOffQueryResponseMessage, ProcedureResultMessage use crate::client::{ClientActorId, ClientConnectionSender, WsVersion}; use crate::database_logger::{DatabaseLogger, LogLevel, Record}; use crate::db::relational_db::{RelationalDB, Tx}; -use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::{check_row_limit, estimate_rows_scanned}; use crate::hash::Hash; @@ -212,7 +211,7 @@ pub struct ModuleEvent { pub function_call: ModuleFunctionCall, pub status: EventStatus, pub reducer_return_value: Option, - pub energy_quanta_used: EnergyQuanta, + pub execution_budget_used: FunctionBudget, pub host_execution_duration: Duration, pub request_id: Option, pub timer: Option, @@ -1542,7 +1541,7 @@ impl From for ViewOutcome { pub struct ViewCallResult { pub outcome: ViewOutcome, pub tx: MutTxId, - pub energy_used: FunctionBudget, + pub execution_budget_used: FunctionBudget, pub total_duration: Duration, pub abi_duration: Duration, } @@ -1551,7 +1550,7 @@ impl fmt::Debug for ViewCallResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ViewCallResult") .field("outcome", &self.outcome) - .field("energy_used", &self.energy_used) + .field("execution_budget_used", &self.execution_budget_used) .field("total_duration", &self.total_duration) .field("abi_duration", &self.abi_duration) .finish() @@ -1562,7 +1561,7 @@ impl ViewCallResult { pub fn default(tx: MutTxId) -> Self { Self { outcome: ViewOutcome::Success, - energy_used: FunctionBudget::ZERO, + execution_budget_used: FunctionBudget::ZERO, total_duration: Duration::ZERO, abi_duration: Duration::ZERO, tx, @@ -2997,7 +2996,7 @@ impl ModuleHost { // Increment execution stats tx = result.tx; outcome = result.outcome; - energy_used += result.energy_used; + energy_used += result.execution_budget_used; total_duration += result.total_duration; abi_duration += result.abi_duration; trapped |= trap; @@ -3011,7 +3010,7 @@ impl ModuleHost { ViewCallResult { outcome, tx, - energy_used, + execution_budget_used: energy_used, total_duration, abi_duration, }, diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index f001a19ab53..45547ab16ed 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -10,7 +10,7 @@ use anyhow::anyhow; use core::time::Duration; use futures::{FutureExt, StreamExt}; use rustc_hash::FxHashMap; -use spacetimedb_client_api_messages::energy::EnergyQuanta; +use spacetimedb_client_api_messages::energy::FunctionBudget; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::system_tables::{StScheduledFields, ST_SCHEDULED_ID}; @@ -693,7 +693,7 @@ fn refresh_views_then_commit_and_broadcast( status, reducer_return_value: None, //Keeping them 0 as it is internal transaction, not by reducer - energy_quanta_used: EnergyQuanta { quanta: 0 }, + execution_budget_used: FunctionBudget::ZERO, host_execution_duration: Duration::from_millis(0), request_id: None, timer: None, diff --git a/crates/core/src/host/v8/budget.rs b/crates/core/src/host/v8/budget.rs index 5ffefc64741..e37fbe4f8dc 100644 --- a/crates/core/src/host/v8/budget.rs +++ b/crates/core/src/host/v8/budget.rs @@ -7,7 +7,6 @@ //! so we have to invent one using time and timeouts. use super::env_on_isolate; -use crate::host::wasm_common::module_host_actor::EnergyStats; use crate::host::wasmtime::{epoch_ticker, ticks_in_duration}; use core::ptr; use core::sync::atomic::Ordering; @@ -115,14 +114,6 @@ fn budget_to_duration(_budget: FunctionBudget) -> Duration { Duration::MAX } -/// Returns [`EnergyStats`] for a reducer given its `budget` -/// and the `duration` it took to execute. -pub(super) fn energy_from_elapsed(budget: FunctionBudget, duration: Duration) -> EnergyStats { - let used = duration_to_budget(duration); - let remaining = budget - used; - EnergyStats { budget, remaining } -} - /// Converts a [`Duration`] to a [`ReducerBudget`]. fn duration_to_budget(_duration: Duration) -> FunctionBudget { // TODO(v8): This is fake logic that allows minimum energy usage. diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 433fe5e2af8..b595500eb93 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -53,7 +53,6 @@ //! v //! SendWorker //! ``` -use self::budget::energy_from_elapsed; use self::error::{ catch_exception, exception_already_thrown, log_traceback, ErrorOrException, ExcResult, ExceptionThrown, PinTryCatch, Throwable, @@ -81,7 +80,7 @@ use crate::host::module_host::{ use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams}; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{ - AnonymousViewOp, DescribeError, ExecutionError, ExecutionResult, ExecutionStats, ExecutionTimings, + AnonymousViewOp, DescribeError, EnergyStats, ExecutionError, ExecutionResult, ExecutionStats, ExecutionTimings, HttpHandlerExecuteResult, HttpHandlerOp, InstanceCommon, InstanceOp, ProcedureExecuteResult, ProcedureOp, ReducerExecuteResult, ReducerOp, ViewExecuteResult, ViewOp, WasmInstance, }; @@ -2034,7 +2033,12 @@ where let timings = env.finish_funcall(); // Derive energy stats. - let energy = energy_from_elapsed(budget, timings.total_duration); + let energy_used = FunctionBudget::from_duration(timings.total_duration) + // The magnitude that `total_duration` would have to have to cause an overflow here is + // large enough that we don't really have to worry about it (see `from_duration` docs). + // Still, better to saturate than wrap. + .unwrap_or(FunctionBudget::MAX); + let energy = EnergyStats::from_used(budget, energy_used); // Reuse the last periodic heap sample instead of querying V8 on every call. // We use this statistic for energy tracking, so eventual consistency is fine. diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 5a1b79f573c..4489fb9f8bd 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -34,7 +34,6 @@ use core::future::Future; use core::time::Duration; use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_auth::identity::ConnectionAuthCtx; -use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, ViewError}; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; @@ -118,9 +117,15 @@ impl EnergyStats { remaining: FunctionBudget::ZERO, }; + pub fn from_used(budget: FunctionBudget, used: FunctionBudget) -> Self { + // TODO: should this be a saturating_sub? + let remaining = budget - used; + Self { budget, remaining } + } + /// Returns the used energy amount. fn used(&self) -> FunctionBudget { - (self.budget.get() - self.remaining.get()).into() + self.budget - self.remaining } } @@ -240,7 +245,7 @@ pub struct ExecutionStats { } impl ExecutionStats { - fn energy_used(&self) -> FunctionBudget { + fn execution_budget_used(&self) -> FunctionBudget { self.energy.used() } @@ -674,7 +679,7 @@ impl InstanceCommon { log::info!("Database updated, {} host-type={}", stdb.database_identity(), host_type); let succeed = |info: Arc, - energy_quanta_used: EnergyQuanta, + execution_budget_used: FunctionBudget, host_execution_duration: Duration, tx: MutTxId| -> TransactionOffset { @@ -685,7 +690,7 @@ impl InstanceCommon { function_call: ModuleFunctionCall::update(), status: EventStatus::Committed(DatabaseUpdate::default()), reducer_return_value: None, - energy_quanta_used, + execution_budget_used, host_execution_duration, request_id: None, timer: None, @@ -699,7 +704,7 @@ impl InstanceCommon { let res: UpdateDatabaseResult = match res { crate::db::update::UpdateResult::Success => { - let tx_offset = succeed(self.info.clone(), FunctionBudget::ZERO.into(), Duration::ZERO, tx); + let tx_offset = succeed(self.info.clone(), FunctionBudget::ZERO, Duration::ZERO, tx); UpdateDatabaseResult::UpdatePerformed { tx_offset, durable_offset, @@ -721,7 +726,8 @@ impl InstanceCommon { stdb.report_mut_tx_metrics(reducer, tx_metrics, None); UpdateDatabaseResult::ErrorExecutingMigration(anyhow::anyhow!(msg)) } else { - let tx_offset = succeed(self.info.clone(), out.energy_used.into(), out.total_duration, tx); + let tx_offset = + succeed(self.info.clone(), out.execution_budget_used, out.total_duration, tx); UpdateDatabaseResult::UpdatePerformed { tx_offset, durable_offset, @@ -729,7 +735,7 @@ impl InstanceCommon { } } crate::db::update::UpdateResult::RequiresClientDisconnect => { - let tx_offset = succeed(self.info.clone(), FunctionBudget::ZERO.into(), Duration::ZERO, tx); + let tx_offset = succeed(self.info.clone(), FunctionBudget::ZERO, Duration::ZERO, tx); UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { tx_offset, durable_offset, @@ -1058,7 +1064,7 @@ impl InstanceCommon { }; // Account for view execution in reducer reporting metrics - vm_metrics.report_energy_used(out.energy_used); + vm_metrics.report_execution_budget_used(out.execution_budget_used); vm_metrics.report_total_duration(out.total_duration); vm_metrics.report_abi_duration(out.abi_duration); @@ -1071,7 +1077,7 @@ impl InstanceCommon { reducer_return_value = None; } - let energy_quanta_used = result.stats.energy_used().into(); + let execution_budget_used = result.stats.execution_budget_used(); let total_duration = result.stats.total_duration(); let event = ModuleEvent { @@ -1085,7 +1091,7 @@ impl InstanceCommon { }, status, reducer_return_value, - energy_quanta_used, + execution_budget_used, host_execution_duration: total_duration, request_id, timer, @@ -1094,7 +1100,7 @@ impl InstanceCommon { let res = ReducerCallResult { outcome: ReducerOutcome::from(&event.status), - energy_used: energy_quanta_used, + execution_budget_used, execution_duration: total_duration, }; @@ -1137,13 +1143,12 @@ impl InstanceCommon { let result = vm_call_function(budget); let stats: &ExecutionStats = result.as_ref(); - let energy_used = stats.energy.used(); - let energy_quanta_used = energy_used.into(); + let execution_budget_used = stats.energy.used(); let timings = &stats.timings; let memory_allocation = stats.memory_allocation; self.energy_monitor - .record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration); + .record_reducer(&energy_fingerprint, execution_budget_used, timings.total_duration); if self.allocated_memory != memory_allocation { self.metric_wasm_memory_bytes.set(memory_allocation as i64); self.allocated_memory = memory_allocation; @@ -1153,7 +1158,7 @@ impl InstanceCommon { function_span .record("timings.total_duration", tracing::field::debug(timings.total_duration)) - .record("energy.used", tracing::field::debug(energy_used)); + .record("energy.used", tracing::field::debug(execution_budget_used)); result } @@ -1396,7 +1401,7 @@ impl InstanceCommon { let res = ViewCallResult { outcome, tx, - energy_used: result.stats.energy_used(), + execution_budget_used: result.stats.execution_budget_used(), total_duration: result.stats.total_duration(), abi_duration: result.stats.abi_duration(), }; @@ -1448,7 +1453,7 @@ impl InstanceCommon { out.tx = result.tx; out.outcome = result.outcome; - out.energy_used += result.energy_used; + out.execution_budget_used += result.execution_budget_used; out.total_duration += result.total_duration; out.abi_duration += result.abi_duration; @@ -1651,8 +1656,8 @@ impl VmMetrics { self.reducer_plus_query_duration.clone().with_timer(start) } - fn report_energy_used(&self, energy_used: FunctionBudget) { - self.reducer_fuel_used.inc_by(energy_used.get()); + fn report_execution_budget_used(&self, execution_budget_used: FunctionBudget) { + self.reducer_fuel_used.inc_by(execution_budget_used.get()); } fn report_total_duration(&self, duration: Duration) { @@ -1665,10 +1670,10 @@ impl VmMetrics { /// Reports some VM metrics. fn report(&self, stats: &ExecutionStats) { - let energy_used = stats.energy.used(); + let execution_budget_used = stats.energy.used(); let reducer_duration = stats.timings.total_duration; let abi_time = stats.timings.wasm_instance_env_call_times.sum(); - self.report_energy_used(energy_used); + self.report_execution_budget_used(execution_budget_used); self.report_total_duration(reducer_duration); self.report_abi_duration(abi_time); } diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 45eaaab56fc..e8e4a3d4fc2 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -2,7 +2,7 @@ use self::wasm_instance_env::WasmInstanceEnv; use super::wasm_common::module_host_actor::{InitializationError, WasmModuleHostActor, WasmModuleInstance}; use super::wasm_common::{abi, ModuleCreationError}; use crate::config::WasmConfig; -use crate::energy::{EnergyQuanta, FunctionBudget}; +use crate::energy::FunctionBudget; use crate::error::NodesError; use crate::module_host_context::ModuleCreationContext; use crate::util::jobs::AllocatedJobCore; @@ -213,7 +213,7 @@ impl WasmtimeFuel {} impl From for WasmtimeFuel { fn from(v: FunctionBudget) -> Self { - // ReducerBudget being u64 is load-bearing here - if it was u128 and v was ReducerBudget::MAX, + // FunctionBudget being u64 is load-bearing here - if it was u128 and v was FunctionBudget::MAX, // truncating this result would mean that with set_store_fuel(budget.into()), get_store_fuel() // would be wildly different than the original `budget`, and the energy usage for the reducer // would be u64::MAX even if it did nothing. ask how I know. @@ -227,12 +227,6 @@ impl From for FunctionBudget { } } -impl From for EnergyQuanta { - fn from(fuel: WasmtimeFuel) -> Self { - EnergyQuanta::new(u128::from(fuel.0)) - } -} - pub trait WasmPointee { type Pointer; fn write_to(self, mem: &mut MemView, ptr: Self::Pointer) -> Result<(), MemError>; diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index c89131fa08b..8933335c1c5 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -979,7 +979,6 @@ fn get_memory_size(store: &Store) -> usize { #[cfg(test)] mod tests { use super::*; - use crate::energy::EnergyQuanta; #[test] fn test_fuel() { @@ -990,8 +989,8 @@ mod tests { let budget = FunctionBudget::DEFAULT_BUDGET; set_store_fuel(&mut store, budget.into()); store.set_fuel(store.get_fuel().unwrap() - 10).unwrap(); - let remaining: EnergyQuanta = get_store_fuel(&store).into(); - let used = EnergyQuanta::from(budget) - remaining; - assert_eq!(used, EnergyQuanta::new(10)); + let remaining: FunctionBudget = get_store_fuel(&store).into(); + let used = budget - remaining; + assert_eq!(used, FunctionBudget::new(10)); } } diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 2ecf80ad11a..6ff2cd05566 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -3,7 +3,7 @@ use std::time::Duration; use super::ast::SchemaViewer; use crate::db::relational_db::RelationalDB; -use crate::energy::EnergyQuanta; +use crate::energy::FunctionBudget; use crate::error::DBError; use crate::estimation::{check_row_limit, estimate_rows_scanned}; use crate::host::module_host::{ @@ -204,7 +204,7 @@ fn run_inner( }, status: EventStatus::Committed(DatabaseUpdate::default()), reducer_return_value: None, - energy_quanta_used: EnergyQuanta::ZERO, + execution_budget_used: FunctionBudget::ZERO, host_execution_duration: Duration::ZERO, request_id: None, timer: None, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index d748fdd09ab..bffa2b7c314 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1952,7 +1952,7 @@ mod tests { use futures::FutureExt; use itertools::Itertools; use pretty_assertions::assert_matches; - use spacetimedb_client_api_messages::energy::EnergyQuanta; + use spacetimedb_client_api_messages::energy::FunctionBudget; use spacetimedb_client_api_messages::websocket::{common::RowListLen as _, v1 as ws_v1, v2 as ws_v2}; use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; @@ -2158,7 +2158,7 @@ mod tests { function_call: ModuleFunctionCall::default(), status: EventStatus::Committed(DatabaseUpdate::default()), reducer_return_value: None, - energy_quanta_used: EnergyQuanta { quanta: 0 }, + execution_budget_used: FunctionBudget::ZERO, host_execution_duration: Duration::from_millis(0), request_id: None, timer: None, diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 03c3392942f..0700846fc74 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -2211,7 +2211,7 @@ mod tests { use crate::{ client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName}, db::relational_db::{tests_utils::TestDB, RelationalDB}, - energy::EnergyQuanta, + energy::FunctionBudget, host::{ module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}, ArgsTuple, @@ -3183,7 +3183,7 @@ mod tests { }, status: EventStatus::Committed(DatabaseUpdate::default()), reducer_return_value: None, - energy_quanta_used: EnergyQuanta::ZERO, + execution_budget_used: FunctionBudget::ZERO, host_execution_duration: Duration::default(), request_id: None, timer: None, diff --git a/tools/ci/Cargo.toml b/tools/ci/Cargo.toml index 79a3b96e24b..972a0bf9d61 100644 --- a/tools/ci/Cargo.toml +++ b/tools/ci/Cargo.toml @@ -9,6 +9,7 @@ anyhow.workspace = true chrono = { workspace = true, features=["clock"] } clap.workspace = true regex.workspace = true +reqwest = { workspace = true, features = ["blocking"] } serde_json.workspace = true duct.workspace = true tempfile.workspace = true diff --git a/tools/ci/src/keynote_bench.rs b/tools/ci/src/keynote_bench.rs index fc8f56fd282..5a217659454 100644 --- a/tools/ci/src/keynote_bench.rs +++ b/tools/ci/src/keynote_bench.rs @@ -17,6 +17,10 @@ const BENCH_CONCURRENCY: &str = "64"; const MAX_INFLIGHT_PER_WORKER: &str = "96"; const SEED_ACCOUNTS: &str = "100000"; const SEED_INITIAL_BALANCE: &str = "1000000000000"; +const TRANSFER_REDUCER: &str = "transfer"; +const REDUCER_FUEL_METRIC: &str = "reducer_wasmtime_fuel_used"; +const REDUCER_FUEL_METRIC_TOTAL: &str = "reducer_wasmtime_fuel_used_total"; +const MAX_FUEL_RATIO: f64 = 2.0; struct BenchmarkModule { label: &'static str, @@ -24,6 +28,11 @@ struct BenchmarkModule { min_tps: f64, } +struct BenchmarkResult { + label: &'static str, + transfer_fuel_total: f64, +} + const BENCHMARK_MODULES: &[BenchmarkModule] = &[ BenchmarkModule { label: "TypeScript", @@ -43,14 +52,26 @@ pub fn run() -> Result<()> { let cli_config_dir = tempfile::tempdir().context("failed to create temporary CLI config directory")?; let cli_config_path = cli_config_dir.path().join("config.toml"); + let mut results = Vec::with_capacity(BENCHMARK_MODULES.len()); for module in BENCHMARK_MODULES { - run_module_benchmark(module, &cli_path, &cli_config_path, &server.host_url)?; + results.push(run_module_benchmark( + module, + &cli_path, + &cli_config_path, + &server.host_url, + )?); } + check_transfer_fuel_ratio(&results)?; Ok(()) } -fn run_module_benchmark(module: &BenchmarkModule, cli_path: &Path, config_path: &Path, server_url: &str) -> Result<()> { +fn run_module_benchmark( + module: &BenchmarkModule, + cli_path: &Path, + config_path: &Path, + server_url: &str, +) -> Result { eprintln!( "Running keynote benchmark against {} module ({})...", module.label, module.module_dir @@ -60,7 +81,15 @@ fn run_module_benchmark(module: &BenchmarkModule, cli_path: &Path, config_path: generate_module_bindings(module, cli_path, config_path)?; seed_accounts(cli_path, config_path, server_url)?; let runs_dir = tempfile::tempdir().context("failed to create temporary benchmark runs directory")?; + let transfer_fuel_before = transfer_fuel_total(server_url)?; run_benchmark(module, server_url, runs_dir.path())?; + let transfer_fuel_after = transfer_fuel_total(server_url)?; + let transfer_fuel_total = transfer_fuel_after - transfer_fuel_before; + ensure!( + transfer_fuel_total > 0.0, + "{} keynote benchmark did not record any fuel for the {TRANSFER_REDUCER} reducer", + module.label + ); let result_path = find_result_json(runs_dir.path())?; let result_json = fs::read_to_string(&result_path) @@ -79,12 +108,16 @@ fn run_module_benchmark(module: &BenchmarkModule, cli_path: &Path, config_path: } println!( - "Keynote perf check passed for {} module: throughput {tps:.0} TPS >= {:.0} TPS ({})", + "Keynote perf check passed for {} module: throughput {tps:.0} TPS >= {:.0} TPS; \ + {TRANSFER_REDUCER} fuel total {transfer_fuel_total:.0} ({})", module.label, module.min_tps, result_path.display() ); - Ok(()) + Ok(BenchmarkResult { + label: module.label, + transfer_fuel_total, + }) } fn publish_module(module: &BenchmarkModule, cli_path: &Path, config_path: &Path, server_url: &str) -> Result<()> { @@ -214,3 +247,72 @@ fn result_tps(result_json: &str) -> Result { .and_then(Value::as_f64) .context("benchmark result JSON is missing results[0].res.tps") } + +fn transfer_fuel_total(server_url: &str) -> Result { + let metrics_url = format!("{}/v1/metrics", server_url.trim_end_matches('/')); + let metrics = reqwest::blocking::get(&metrics_url) + .with_context(|| format!("failed to fetch metrics from {metrics_url}"))? + .error_for_status() + .with_context(|| format!("metrics request to {metrics_url} failed"))? + .text() + .context("failed to read metrics response body")?; + + let transfer_label = format!(r#"reducer="{TRANSFER_REDUCER}""#); + let mut total = 0.0; + for line in metrics.lines() { + if !is_reducer_fuel_metric_line(line) || !line.contains(&transfer_label) { + continue; + } + let value = line + .split_whitespace() + .nth(1) + .with_context(|| format!("malformed {REDUCER_FUEL_METRIC} metric line: {line}"))? + .parse::() + .with_context(|| format!("invalid {REDUCER_FUEL_METRIC} metric value in line: {line}"))?; + total += value; + } + Ok(total) +} + +fn is_reducer_fuel_metric_line(line: &str) -> bool { + line.starts_with(REDUCER_FUEL_METRIC) || line.starts_with(REDUCER_FUEL_METRIC_TOTAL) +} + +fn check_transfer_fuel_ratio(results: &[BenchmarkResult]) -> Result<()> { + ensure!( + results.len() == 2, + "expected exactly two keynote benchmark results to compare fuel usage, got {}", + results.len() + ); + let [first, second] = results else { + unreachable!("length was checked above") + }; + + let higher = first.transfer_fuel_total.max(second.transfer_fuel_total); + let lower = first.transfer_fuel_total.min(second.transfer_fuel_total); + ensure!( + lower > 0.0, + "keynote benchmark fuel totals must be nonzero: {}={:.0}, {}={:.0}", + first.label, + first.transfer_fuel_total, + second.label, + second.transfer_fuel_total + ); + + let ratio = higher / lower; + println!( + "Keynote transfer fuel comparison: {}={:.0}, {}={:.0}, relative difference {ratio:.2}x", + first.label, first.transfer_fuel_total, second.label, second.transfer_fuel_total + ); + ensure!( + ratio < MAX_FUEL_RATIO, + "keynote benchmark transfer fuel totals differ by {ratio:.2}x, expected strictly less than {MAX_FUEL_RATIO}x: \ + {}={:.0}, {}={:.0}", + first.label, + first.transfer_fuel_total, + second.label, + second.transfer_fuel_total + ); + + Ok(()) +}