From 14c570da4f09c153792e6c98ba6ef2eb2fdeabe0 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 26 May 2025 20:12:51 -0400 Subject: [PATCH 1/4] Small non-functional change to make datastore extraction easier --- .../datastore/locking_tx_datastore/datastore.rs | 17 ++++++++--------- crates/core/src/db/relational_db.rs | 11 ++++++++--- crates/core/src/host/module_host.rs | 2 +- .../src/host/wasm_common/module_host_actor.rs | 4 ++-- crates/core/src/sql/execute.rs | 2 +- crates/core/src/subscription/mod.rs | 11 ++++++++++- .../subscription/module_subscription_actor.rs | 12 ++++++------ 7 files changed, 36 insertions(+), 23 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 747cb054ef0..7cbca35f8ad 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -702,6 +702,10 @@ struct TableStats { num_indices: usize, } +pub trait MetricsRecorder { + fn record(&self, metrics: &ExecutionMetrics); +} + impl TxMetrics { /// Compute transaction metrics that we can report once the tx lock is released. pub(super) fn new( @@ -751,11 +755,11 @@ impl TxMetrics { } /// Reports the metrics for `reducer` using `get_exec_counter` to retrieve the metrics counters. - pub fn report<'a>( + pub fn report<'a, R: MetricsRecorder + 'a>( &self, tx_data: Option<&TxData>, reducer: &str, - get_exec_counter: impl FnOnce(WorkloadType) -> &'a ExecutionCounters, + get_exec_counter: impl FnOnce(WorkloadType) -> &'a R, ) { let workload = &self.workload; let db = &self.database_identity; @@ -844,11 +848,6 @@ impl TxMetrics { } } } - - /// Reports the metrics for `reducer`, using counters provided by `db`. - pub(crate) fn report_with_db(&self, reducer: &str, db: &RelationalDB, tx_data: Option<&TxData>) { - self.report(tx_data, reducer, |wl| db.exec_counters_for(wl)); - } } /// Reports the `TxMetrics`s passed. @@ -862,9 +861,9 @@ pub fn report_tx_metricses( metrics_read: &TxMetrics, ) { if let Some(metrics_mut) = metrics_mut { - metrics_mut.report_with_db(reducer, db, tx_data); + db.report(reducer, metrics_mut, tx_data); } - metrics_read.report_with_db(reducer, db, None); + db.report(reducer, metrics_read, None); } impl MutTx for Locking { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9b001bb50a7..3b6b349010c 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -983,11 +983,11 @@ impl RelationalDB { { if res.is_err() { let (tx_metrics, reducer) = self.rollback_mut_tx(tx); - tx_metrics.report_with_db(&reducer, self, None); + self.report(&reducer, &tx_metrics, None); } else { match self.commit_tx(tx).map_err(E::from)? { Some((tx_data, tx_metrics, reducer)) => { - tx_metrics.report_with_db(&reducer, self, Some(&tx_data)); + self.report(&reducer, &tx_metrics, Some(&tx_data)); } None => panic!("TODO: retry?"), } @@ -1002,7 +1002,7 @@ impl RelationalDB { match res { Err(e) => { let (tx_metrics, reducer) = self.rollback_mut_tx(tx); - tx_metrics.report_with_db(&reducer, self, None); + self.report(&reducer, &tx_metrics, None); Err(e) } @@ -1360,6 +1360,11 @@ impl RelationalDB { pub fn drop_constraint(&self, tx: &mut MutTx, constraint_id: ConstraintId) -> Result<(), DBError> { self.inner.drop_constraint_mut_tx(tx, constraint_id) } + + /// Reports the metrics for `reducer`, using counters provided by `db`. + pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) { + metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl)); + } } #[allow(unused)] diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index a6c653fd315..17ae5adeac5 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -364,7 +364,7 @@ fn init_database( let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) { None => { if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { - tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data)); + stdb.report(&reducer, &tx_metrics, Some(&tx_data)); } None } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 4a1249e9ad0..e53e270e0b6 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -290,12 +290,12 @@ impl ModuleInstance for WasmModuleInstance { log::warn!("Database update failed: {} @ {}", e, stdb.database_identity()); self.system_logger().warn(&format!("Database update failed: {e}")); let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx); - tx_metrics.report_with_db(&reducer, stdb, None); + stdb.report(&reducer, &tx_metrics, None); Ok(UpdateDatabaseResult::ErrorExecutingMigration(e)) } Ok(()) => { if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { - tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data)); + stdb.report(&reducer, &tx_metrics, Some(&tx_data)); } self.system_logger().info("Database updated"); log::info!("Database updated, {}", stdb.database_identity()); diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index b570c9d4178..1f1f5230f10 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -255,7 +255,7 @@ pub fn run( let metrics = tx.metrics; return db.commit_tx(tx).map(|tx_opt| { if let Some((tx_data, tx_metrics, reducer)) = tx_opt { - tx_metrics.report_with_db(&reducer, db, Some(&tx_data)); + db.report(&reducer, &tx_metrics, Some(&tx_data)); } SqlResult { rows: vec![], metrics } }); diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 9b185a855ae..2b5078dbff6 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -12,7 +12,10 @@ use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; use spacetimedb_primitives::TableId; use crate::{ - db::db_metrics::DB_METRICS, error::DBError, execution_context::WorkloadType, worker_metrics::WORKER_METRICS, + db::{datastore::locking_tx_datastore::datastore::MetricsRecorder, db_metrics::DB_METRICS}, + error::DBError, + execution_context::WorkloadType, + worker_metrics::WORKER_METRICS, }; pub mod delta; @@ -84,6 +87,12 @@ impl ExecutionCounters { } } +impl MetricsRecorder for ExecutionCounters { + fn record(&self, metrics: &ExecutionMetrics) { + self.record(metrics); + } +} + /// Execute a subscription query pub fn execute_plan(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)> where diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 2924c7199f5..3ca8c1b5d04 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -264,7 +264,7 @@ impl ModuleSubscriptions { let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let existing_query = { @@ -357,7 +357,7 @@ impl ModuleSubscriptions { let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let auth = AuthCtx::new(self.owner_identity, sender.id.identity); let (table_rows, metrics) = return_on_err_with_sql!( @@ -403,7 +403,7 @@ impl ModuleSubscriptions { // Always lock the db before the subscription lock to avoid deadlocks. let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let removed_queries = { @@ -475,7 +475,7 @@ impl ModuleSubscriptions { // We always get the db lock before the subscription lock to avoid deadlocks. let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let guard = self.subscriptions.read(); @@ -536,7 +536,7 @@ impl ModuleSubscriptions { ); let tx = scopeguard::guard(tx, |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); // We minimize locking so that other clients can add subscriptions concurrently. @@ -592,7 +592,7 @@ impl ModuleSubscriptions { let (queries, auth, tx) = self.compile_queries(sender.id.identity, subscription.query_strings, num_queries)?; let tx = scopeguard::guard(tx, |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); check_row_limit( From 917fd66e3f76ca6593289cfd6ac84882e88a3c7a Mon Sep 17 00:00:00 2001 From: = Date: Mon, 26 May 2025 20:48:31 -0400 Subject: [PATCH 2/4] Fixed report_tx_metricses function --- .../locking_tx_datastore/datastore.rs | 21 +------------------ crates/core/src/db/relational_db.rs | 20 ++++++++++++++++-- crates/core/src/sql/execute.rs | 9 +------- .../subscription/module_subscription_actor.rs | 9 +------- 4 files changed, 21 insertions(+), 38 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 7cbca35f8ad..ae4a175c09b 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -6,6 +6,7 @@ use super::{ tx::TxId, tx_state::TxState, }; +use crate::execution_context::{Workload, WorkloadType}; use crate::{ db::datastore::{ locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx}, @@ -13,10 +14,6 @@ use crate::{ }, subscription::ExecutionCounters, }; -use crate::{ - db::relational_db::RelationalDB, - execution_context::{Workload, WorkloadType}, -}; use crate::{ db::{ datastore::{ @@ -850,22 +847,6 @@ impl TxMetrics { } } -/// Reports the `TxMetrics`s passed. -/// -/// Should only be called after the tx lock has been fully released. -pub fn report_tx_metricses( - reducer: &str, - db: &RelationalDB, - tx_data: Option<&TxData>, - metrics_mut: Option<&TxMetrics>, - metrics_read: &TxMetrics, -) { - if let Some(metrics_mut) = metrics_mut { - db.report(reducer, metrics_mut, tx_data); - } - db.report(reducer, metrics_read, None); -} - impl MutTx for Locking { type MutTx = MutTxId; diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 3b6b349010c..95a51917781 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,5 +1,5 @@ use super::datastore::locking_tx_datastore::committed_state::CommittedState; -use super::datastore::locking_tx_datastore::datastore::{report_tx_metricses, TxMetrics}; +use super::datastore::locking_tx_datastore::datastore::TxMetrics; use super::datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; @@ -972,7 +972,7 @@ impl RelationalDB { let mut tx = self.begin_tx(workload); let res = f(&mut tx); let (tx_metics, reducer) = self.release_tx(tx); - report_tx_metricses(&reducer, self, None, None, &tx_metics); + self.report_tx_metricses(&reducer, None, None, &tx_metics); res } @@ -1013,6 +1013,22 @@ impl RelationalDB { pub(crate) fn alter_table_access(&self, tx: &mut MutTx, name: Box, access: StAccess) -> Result<(), DBError> { self.inner.alter_table_access_mut_tx(tx, name, access) } + + /// Reports the `TxMetrics`s passed. + /// + /// Should only be called after the tx lock has been fully released. + pub(crate) fn report_tx_metricses( + &self, + reducer: &str, + tx_data: Option<&TxData>, + metrics_mut: Option<&TxMetrics>, + metrics_read: &TxMetrics, + ) { + if let Some(metrics_mut) = metrics_mut { + self.report(reducer, metrics_mut, tx_data); + } + self.report(reducer, metrics_read, None); + } } impl RelationalDB { diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 1f1f5230f10..26d72b86cb8 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -1,7 +1,6 @@ use std::time::Duration; use super::ast::SchemaViewer; -use crate::db::datastore::locking_tx_datastore::datastore::report_tx_metricses; use crate::db::datastore::locking_tx_datastore::state_view::StateView; use crate::db::datastore::system_tables::StVarTable; use crate::db::datastore::traits::IsolationLevel; @@ -204,13 +203,7 @@ pub fn run( // Release the tx on drop, so that we record metrics. let mut tx = scopeguard::guard(tx, |tx| { let (tx_metrics_downgrade, reducer) = db.release_tx(tx); - report_tx_metricses( - &reducer, - db, - Some(&tx_data), - Some(&tx_metrics_mut), - &tx_metrics_downgrade, - ); + db.report_tx_metricses(&reducer, Some(&tx_data), Some(&tx_metrics_mut), &tx_metrics_downgrade); }); // Compute the header for the result set diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 3ca8c1b5d04..a84b4718702 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -8,7 +8,6 @@ use crate::client::messages::{ SubscriptionUpdateMessage, TransactionUpdateMessage, }; use crate::client::{ClientActorId, ClientConnectionSender, Protocol}; -use crate::db::datastore::locking_tx_datastore::datastore::report_tx_metricses; use crate::db::datastore::locking_tx_datastore::tx::TxId; use crate::db::db_metrics::DB_METRICS; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; @@ -677,13 +676,7 @@ impl ModuleSubscriptions { // When we're done with this method, release the tx and report metrics. let mut read_tx = scopeguard::guard(read_tx, |tx| { let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx); - report_tx_metricses( - &reducer, - stdb, - tx_data.as_ref(), - Some(&tx_metrics_mut), - &tx_metrics_read, - ); + stdb.report_tx_metricses(&reducer, tx_data.as_ref(), Some(&tx_metrics_mut), &tx_metrics_read); }); // Create the delta transaction we'll use to eval updates against. let delta_read_tx = tx_data From 41fa66f057865badf0c7afc2933f4033e2a0ec09 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 26 May 2025 20:54:17 -0400 Subject: [PATCH 3/4] Removed a reference to ExecutionCounters in the datastore module --- .../locking_tx_datastore/datastore.rs | 24 ++++--------------- crates/core/src/db/relational_db.rs | 9 ++++++- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index ae4a175c09b..b29afce31a5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -6,14 +6,11 @@ use super::{ tx::TxId, tx_state::TxState, }; -use crate::execution_context::{Workload, WorkloadType}; -use crate::{ - db::datastore::{ - locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx}, - traits::{InsertFlags, UpdateFlags}, - }, - subscription::ExecutionCounters, +use crate::db::datastore::{ + locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx}, + traits::{InsertFlags, UpdateFlags}, }; +use crate::execution_context::{Workload, WorkloadType}; use crate::{ db::{ datastore::{ @@ -33,7 +30,6 @@ use crate::{ }; use anyhow::{anyhow, Context}; use core::{cell::RefCell, ops::RangeBounds}; -use enum_map::EnumMap; use parking_lot::{Mutex, RwLock}; use spacetimedb_commitlog::payload::{txdata, Txdata}; use spacetimedb_data_structures::map::{HashCollectionExt, HashMap}; @@ -71,9 +67,6 @@ pub struct Locking { sequence_state: Arc>, /// The identity of this database. pub(crate) database_identity: Identity, - - /// A map from workload types to their cached prometheus counters. - workload_type_to_exec_counters: Arc>, } impl MemoryUsage for Locking { @@ -82,7 +75,6 @@ impl MemoryUsage for Locking { committed_state, sequence_state, database_identity, - workload_type_to_exec_counters: _, } = self; std::mem::size_of_val(&**committed_state) + committed_state.read().heap_usage() @@ -94,14 +86,10 @@ impl MemoryUsage for Locking { impl Locking { pub fn new(database_identity: Identity, page_pool: PagePool) -> Self { - let workload_type_to_exec_counters = - Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity))); - Self { committed_state: Arc::new(RwLock::new(CommittedState::new(page_pool))), sequence_state: <_>::default(), database_identity, - workload_type_to_exec_counters, } } @@ -318,10 +306,6 @@ impl Locking { tx.alter_table_access(table_id, access) } - - pub(crate) fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters { - &self.workload_type_to_exec_counters[workload_type] - } } impl DataRow for Locking { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 95a51917781..4b874801d94 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -23,6 +23,7 @@ use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; use crate::util::{asyncify, spawn_rayon}; use anyhow::{anyhow, Context}; +use enum_map::EnumMap; use fs2::FileExt; use futures::channel::mpsc; use futures::StreamExt; @@ -107,6 +108,9 @@ pub struct RelationalDB { // We want to release the file lock last. // TODO(noa): is this lockfile still necessary now that we have data-dir? _lock: LockFile, + + /// A map from workload types to their cached prometheus counters. + workload_type_to_exec_counters: Arc>, } #[derive(Clone)] @@ -224,6 +228,8 @@ impl RelationalDB { let (durability, disk_size_fn) = durability.unzip(); let snapshot_worker = snapshot_repo.map(|repo| SnapshotWorker::new(inner.committed_state.clone(), repo.clone())); + let workload_type_to_exec_counters = + Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity))); Self { inner, @@ -237,6 +243,7 @@ impl RelationalDB { disk_size_fn, _lock: lock, + workload_type_to_exec_counters, } } @@ -735,7 +742,7 @@ impl RelationalDB { /// Returns the execution counters for `workload_type` for this database. pub fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters { - self.inner.exec_counters_for(workload_type) + &self.workload_type_to_exec_counters[workload_type] } /// Begin a transaction. From 4a28bf0a8706b646db0fed3dbaeb931463076a43 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 26 May 2025 20:29:28 -0400 Subject: [PATCH 4/4] Remove StVarTable in furtherance of datastore extraction --- crates/core/src/db/datastore/system_tables.rs | 95 ------------------- crates/core/src/db/relational_db.rs | 94 +++++++++++++++++- crates/core/src/sql/execute.rs | 3 +- crates/core/src/vm.rs | 8 +- 4 files changed, 97 insertions(+), 103 deletions(-) diff --git a/crates/core/src/db/datastore/system_tables.rs b/crates/core/src/db/datastore/system_tables.rs index 9039d66668c..bbbe71f450b 100644 --- a/crates/core/src/db/datastore/system_tables.rs +++ b/crates/core/src/db/datastore/system_tables.rs @@ -11,7 +11,6 @@ //! - Use [`st_fields_enum`] to define its column enum. //! - Register its schema in [`system_module_def`], making sure to call `validate_system_table` at the end of the function. -use crate::db::relational_db::RelationalDB; use crate::error::DBError; use spacetimedb_lib::db::auth::{StAccess, StTableType}; use spacetimedb_lib::db::raw_def::v9::{btree, RawSql}; @@ -21,7 +20,6 @@ use spacetimedb_lib::ser::Serialize; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::{ConnectionId, Identity, ProductValue, SpacetimeType}; use spacetimedb_primitives::*; -use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::hash::Hash; use spacetimedb_sats::product_value::InvalidFieldError; @@ -34,16 +32,11 @@ use spacetimedb_schema::schema::{ TableSchema, }; use spacetimedb_table::table::RowRef; -use spacetimedb_vm::errors::{ErrorType, ErrorVm}; -use spacetimedb_vm::ops::parse; use std::cell::RefCell; use std::str::FromStr; use strum::Display; use v9::{RawModuleDefV9Builder, TableType}; -use super::locking_tx_datastore::tx::TxId; -use super::locking_tx_datastore::MutTxId; - /// The static ID of the table that defines tables pub(crate) const ST_TABLE_ID: TableId = TableId(1); /// The static ID of the table that defines columns @@ -950,81 +943,6 @@ impl TryFrom> for StClientRow { } } -/// A handle for reading system variables from `st_var` -pub struct StVarTable; - -impl StVarTable { - /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var` - pub fn row_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - let data = Self::read_var(db, tx, StVarName::RowLimit); - - if let Some(StVarValue::U64(limit)) = data? { - return Ok(Some(limit)); - } - Ok(None) - } - - /// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var` - pub fn query_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowQryThreshold)? { - return Ok(Some(ms)); - } - Ok(None) - } - - /// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var` - pub fn sub_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowSubThreshold)? { - return Ok(Some(ms)); - } - Ok(None) - } - - /// Read the value of [ST_VARNAME_SLOW_INC] from `st_var` - pub fn incr_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowIncThreshold)? { - return Ok(Some(ms)); - } - Ok(None) - } - - /// Read the value of a system variable from `st_var` - pub fn read_var(db: &RelationalDB, tx: &TxId, name: StVarName) -> Result, DBError> { - if let Some(row_ref) = db - .iter_by_col_eq(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? - .next() - { - return Ok(Some(StVarRow::try_from(row_ref)?.value)); - } - Ok(None) - } - - /// Update the value of a system variable in `st_var` - pub fn write_var(db: &RelationalDB, tx: &mut MutTxId, name: StVarName, literal: &str) -> Result<(), DBError> { - let value = Self::parse_var(name, literal)?; - if let Some(row_ref) = db - .iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? - .next() - { - db.delete(tx, ST_VAR_ID, [row_ref.pointer()]); - } - tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?; - Ok(()) - } - - /// Parse the literal representation of a system variable - fn parse_var(name: StVarName, literal: &str) -> Result { - StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| { - ErrorVm::Type(ErrorType::Parse { - value: literal.to_string(), - ty: fmt_algebraic_type(&name.type_of()).to_string(), - err: format!("error parsing value: {:?}", v), - }) - .into() - }) - } -} - /// System table [ST_VAR_NAME] /// /// | name | value | @@ -1210,19 +1128,6 @@ fn to_product_value(value: &T) -> ProductValue { #[cfg(test)] mod tests { use super::*; - use crate::db::relational_db::tests_utils::{with_auto_commit, with_read_only, TestDB}; - - #[test] - fn test_system_variables() { - let db = TestDB::durable().expect("failed to create db"); - let _ = with_auto_commit(&db, |tx| StVarTable::write_var(&db, tx, StVarName::RowLimit, "5")); - assert_eq!( - 5, - with_read_only(&db, |tx| StVarTable::row_limit(&db, tx)) - .expect("failed to read from st_var") - .expect("row_limit does not exist") - ); - } #[test] fn test_sequences_within_reserved_range() { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 4b874801d94..befa9c8303d 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -3,7 +3,7 @@ use super::datastore::locking_tx_datastore::datastore::TxMetrics; use super::datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; -use super::datastore::system_tables::ST_MODULE_ID; +use super::datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use super::datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, UpdateFlags, @@ -32,10 +32,12 @@ use spacetimedb_commitlog as commitlog; use spacetimedb_durability::{self as durability, TxOffset}; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; +use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; +use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; use spacetimedb_schema::def::{ModuleDef, TableDef}; use spacetimedb_schema::schema::{IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema}; @@ -44,6 +46,8 @@ use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::page_pool::PagePool; use spacetimedb_table::table::RowRef; use spacetimedb_table::MemoryUsage; +use spacetimedb_vm::errors::{ErrorType, ErrorVm}; +use spacetimedb_vm::ops::parse; use std::borrow::Cow; use std::collections::HashSet; use std::fmt; @@ -1388,6 +1392,78 @@ impl RelationalDB { pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) { metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl)); } + + /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var` + pub(crate) fn row_limit(&self, tx: &Tx) -> Result, DBError> { + let data = self.read_var(tx, StVarName::RowLimit); + + if let Some(StVarValue::U64(limit)) = data? { + return Ok(Some(limit)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var` + pub(crate) fn query_limit(&self, tx: &Tx) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowQryThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var` + #[allow(dead_code)] + pub(crate) fn sub_limit(&self, tx: &Tx) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowSubThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_INC] from `st_var` + #[allow(dead_code)] + pub(crate) fn incr_limit(&self, tx: &Tx) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowIncThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of a system variable from `st_var` + pub(crate) fn read_var(&self, tx: &Tx, name: StVarName) -> Result, DBError> { + if let Some(row_ref) = self + .iter_by_col_eq(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? + .next() + { + return Ok(Some(StVarRow::try_from(row_ref)?.value)); + } + Ok(None) + } + + /// Update the value of a system variable in `st_var` + pub(crate) fn write_var(&self, tx: &mut MutTx, name: StVarName, literal: &str) -> Result<(), DBError> { + let value = Self::parse_var(name, literal)?; + if let Some(row_ref) = self + .iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? + .next() + { + self.delete(tx, ST_VAR_ID, [row_ref.pointer()]); + } + tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?; + Ok(()) + } + + /// Parse the literal representation of a system variable + fn parse_var(name: StVarName, literal: &str) -> Result { + StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| { + ErrorVm::Type(ErrorType::Parse { + value: literal.to_string(), + ty: fmt_algebraic_type(&name.type_of()).to_string(), + err: format!("error parsing value: {:?}", v), + }) + .into() + }) + } } #[allow(unused)] @@ -1948,7 +2024,9 @@ mod tests { system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }; - use crate::db::relational_db::tests_utils::{begin_tx, insert, make_snapshot, TestDB}; + use crate::db::relational_db::tests_utils::{ + begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB, + }; use crate::error::IndexError; use crate::execution_context::ReducerContext; use anyhow::bail; @@ -2032,6 +2110,18 @@ mod tests { Ok(()) } + #[test] + fn test_system_variables() { + let db = TestDB::durable().expect("failed to create db"); + let _ = with_auto_commit(&db, |tx| db.write_var(tx, StVarName::RowLimit, "5")); + assert_eq!( + 5, + with_read_only(&db, |tx| db.row_limit(tx)) + .expect("failed to read from st_var") + .expect("row_limit does not exist") + ); + } + #[test] fn test_open_twice() -> ResultTest<()> { let stdb = TestDB::durable()?; diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 26d72b86cb8..4e1b8ed02e7 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -2,7 +2,6 @@ use std::time::Duration; use super::ast::SchemaViewer; use crate::db::datastore::locking_tx_datastore::state_view::StateView; -use crate::db::datastore::system_tables::StVarTable; use crate::db::datastore::traits::IsolationLevel; use crate::db::relational_db::{RelationalDB, Tx}; use crate::energy::EnergyQuanta; @@ -72,7 +71,7 @@ fn execute( updates: &mut Vec, ) -> Result, DBError> { let slow_query_threshold = if let TxMode::Tx(tx) = p.tx { - StVarTable::query_limit(p.db, tx)?.map(Duration::from_millis) + p.db.query_limit(tx)?.map(Duration::from_millis) } else { None }; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 4c03bf4d513..07b67b20863 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -3,7 +3,7 @@ use crate::db::datastore::locking_tx_datastore::state_view::IterByColRangeMutTx; use crate::db::datastore::locking_tx_datastore::tx::TxId; use crate::db::datastore::locking_tx_datastore::IterByColRangeTx; -use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow, StVarTable}; +use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow}; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; use crate::estimation; @@ -467,7 +467,7 @@ pub fn check_row_limit( auth: &AuthCtx, ) -> Result<(), DBError> { if auth.caller != auth.owner { - if let Some(limit) = StVarTable::row_limit(db, tx)? { + if let Some(limit) = db.row_limit(tx)? { let mut estimate: u64 = 0; for query in queries { estimate = estimate.saturating_add(row_est(query, tx)); @@ -603,7 +603,7 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { fn _set_var(&mut self, name: String, literal: String) -> Result { let tx = self.tx.unwrap_mut(); - StVarTable::write_var(self.db, tx, StVarName::from_str(&name)?, &literal)?; + self.db.write_var(tx, StVarName::from_str(&name)?, &literal)?; Ok(Code::Pass(None)) } @@ -611,7 +611,7 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { fn read_key_into_table(env: &DbProgram, name: &str) -> Result { if let TxMode::Tx(tx) = &env.tx { let name = StVarName::from_str(name)?; - if let Some(value) = StVarTable::read_var(env.db, tx, name)? { + if let Some(value) = env.db.read_var(tx, name)? { return Ok(MemTable::from_iter( Arc::new(st_var_schema().into()), [ProductValue::from(StVarRow { name, value })],