diff --git a/crates/core/src/db/datastore/system_tables.rs b/crates/core/src/db/datastore/system_tables.rs index 9039d66668c..bbbe71f450b 100644 --- a/crates/core/src/db/datastore/system_tables.rs +++ b/crates/core/src/db/datastore/system_tables.rs @@ -11,7 +11,6 @@ //! - Use [`st_fields_enum`] to define its column enum. //! - Register its schema in [`system_module_def`], making sure to call `validate_system_table` at the end of the function. -use crate::db::relational_db::RelationalDB; use crate::error::DBError; use spacetimedb_lib::db::auth::{StAccess, StTableType}; use spacetimedb_lib::db::raw_def::v9::{btree, RawSql}; @@ -21,7 +20,6 @@ use spacetimedb_lib::ser::Serialize; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::{ConnectionId, Identity, ProductValue, SpacetimeType}; use spacetimedb_primitives::*; -use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::hash::Hash; use spacetimedb_sats::product_value::InvalidFieldError; @@ -34,16 +32,11 @@ use spacetimedb_schema::schema::{ TableSchema, }; use spacetimedb_table::table::RowRef; -use spacetimedb_vm::errors::{ErrorType, ErrorVm}; -use spacetimedb_vm::ops::parse; use std::cell::RefCell; use std::str::FromStr; use strum::Display; use v9::{RawModuleDefV9Builder, TableType}; -use super::locking_tx_datastore::tx::TxId; -use super::locking_tx_datastore::MutTxId; - /// The static ID of the table that defines tables pub(crate) const ST_TABLE_ID: TableId = TableId(1); /// The static ID of the table that defines columns @@ -950,81 +943,6 @@ impl TryFrom> for StClientRow { } } -/// A handle for reading system variables from `st_var` -pub struct StVarTable; - -impl StVarTable { - /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var` - pub fn row_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - let data = Self::read_var(db, tx, StVarName::RowLimit); - - if let Some(StVarValue::U64(limit)) = data? { - return Ok(Some(limit)); - } - Ok(None) - } - - /// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var` - pub fn query_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowQryThreshold)? { - return Ok(Some(ms)); - } - Ok(None) - } - - /// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var` - pub fn sub_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowSubThreshold)? { - return Ok(Some(ms)); - } - Ok(None) - } - - /// Read the value of [ST_VARNAME_SLOW_INC] from `st_var` - pub fn incr_limit(db: &RelationalDB, tx: &TxId) -> Result, DBError> { - if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowIncThreshold)? { - return Ok(Some(ms)); - } - Ok(None) - } - - /// Read the value of a system variable from `st_var` - pub fn read_var(db: &RelationalDB, tx: &TxId, name: StVarName) -> Result, DBError> { - if let Some(row_ref) = db - .iter_by_col_eq(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? - .next() - { - return Ok(Some(StVarRow::try_from(row_ref)?.value)); - } - Ok(None) - } - - /// Update the value of a system variable in `st_var` - pub fn write_var(db: &RelationalDB, tx: &mut MutTxId, name: StVarName, literal: &str) -> Result<(), DBError> { - let value = Self::parse_var(name, literal)?; - if let Some(row_ref) = db - .iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? - .next() - { - db.delete(tx, ST_VAR_ID, [row_ref.pointer()]); - } - tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?; - Ok(()) - } - - /// Parse the literal representation of a system variable - fn parse_var(name: StVarName, literal: &str) -> Result { - StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| { - ErrorVm::Type(ErrorType::Parse { - value: literal.to_string(), - ty: fmt_algebraic_type(&name.type_of()).to_string(), - err: format!("error parsing value: {:?}", v), - }) - .into() - }) - } -} - /// System table [ST_VAR_NAME] /// /// | name | value | @@ -1210,19 +1128,6 @@ fn to_product_value(value: &T) -> ProductValue { #[cfg(test)] mod tests { use super::*; - use crate::db::relational_db::tests_utils::{with_auto_commit, with_read_only, TestDB}; - - #[test] - fn test_system_variables() { - let db = TestDB::durable().expect("failed to create db"); - let _ = with_auto_commit(&db, |tx| StVarTable::write_var(&db, tx, StVarName::RowLimit, "5")); - assert_eq!( - 5, - with_read_only(&db, |tx| StVarTable::row_limit(&db, tx)) - .expect("failed to read from st_var") - .expect("row_limit does not exist") - ); - } #[test] fn test_sequences_within_reserved_range() { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 4b874801d94..befa9c8303d 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -3,7 +3,7 @@ use super::datastore::locking_tx_datastore::datastore::TxMetrics; use super::datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; -use super::datastore::system_tables::ST_MODULE_ID; +use super::datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use super::datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, UpdateFlags, @@ -32,10 +32,12 @@ use spacetimedb_commitlog as commitlog; use spacetimedb_durability::{self as durability, TxOffset}; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; +use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; +use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; use spacetimedb_schema::def::{ModuleDef, TableDef}; use spacetimedb_schema::schema::{IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema}; @@ -44,6 +46,8 @@ use spacetimedb_table::indexes::RowPointer; use spacetimedb_table::page_pool::PagePool; use spacetimedb_table::table::RowRef; use spacetimedb_table::MemoryUsage; +use spacetimedb_vm::errors::{ErrorType, ErrorVm}; +use spacetimedb_vm::ops::parse; use std::borrow::Cow; use std::collections::HashSet; use std::fmt; @@ -1388,6 +1392,78 @@ impl RelationalDB { pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) { metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl)); } + + /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var` + pub(crate) fn row_limit(&self, tx: &Tx) -> Result, DBError> { + let data = self.read_var(tx, StVarName::RowLimit); + + if let Some(StVarValue::U64(limit)) = data? { + return Ok(Some(limit)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var` + pub(crate) fn query_limit(&self, tx: &Tx) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowQryThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var` + #[allow(dead_code)] + pub(crate) fn sub_limit(&self, tx: &Tx) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowSubThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_INC] from `st_var` + #[allow(dead_code)] + pub(crate) fn incr_limit(&self, tx: &Tx) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowIncThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of a system variable from `st_var` + pub(crate) fn read_var(&self, tx: &Tx, name: StVarName) -> Result, DBError> { + if let Some(row_ref) = self + .iter_by_col_eq(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? + .next() + { + return Ok(Some(StVarRow::try_from(row_ref)?.value)); + } + Ok(None) + } + + /// Update the value of a system variable in `st_var` + pub(crate) fn write_var(&self, tx: &mut MutTx, name: StVarName, literal: &str) -> Result<(), DBError> { + let value = Self::parse_var(name, literal)?; + if let Some(row_ref) = self + .iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? + .next() + { + self.delete(tx, ST_VAR_ID, [row_ref.pointer()]); + } + tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?; + Ok(()) + } + + /// Parse the literal representation of a system variable + fn parse_var(name: StVarName, literal: &str) -> Result { + StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| { + ErrorVm::Type(ErrorType::Parse { + value: literal.to_string(), + ty: fmt_algebraic_type(&name.type_of()).to_string(), + err: format!("error parsing value: {:?}", v), + }) + .into() + }) + } } #[allow(unused)] @@ -1948,7 +2024,9 @@ mod tests { system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }; - use crate::db::relational_db::tests_utils::{begin_tx, insert, make_snapshot, TestDB}; + use crate::db::relational_db::tests_utils::{ + begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB, + }; use crate::error::IndexError; use crate::execution_context::ReducerContext; use anyhow::bail; @@ -2032,6 +2110,18 @@ mod tests { Ok(()) } + #[test] + fn test_system_variables() { + let db = TestDB::durable().expect("failed to create db"); + let _ = with_auto_commit(&db, |tx| db.write_var(tx, StVarName::RowLimit, "5")); + assert_eq!( + 5, + with_read_only(&db, |tx| db.row_limit(tx)) + .expect("failed to read from st_var") + .expect("row_limit does not exist") + ); + } + #[test] fn test_open_twice() -> ResultTest<()> { let stdb = TestDB::durable()?; diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 26d72b86cb8..4e1b8ed02e7 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -2,7 +2,6 @@ use std::time::Duration; use super::ast::SchemaViewer; use crate::db::datastore::locking_tx_datastore::state_view::StateView; -use crate::db::datastore::system_tables::StVarTable; use crate::db::datastore::traits::IsolationLevel; use crate::db::relational_db::{RelationalDB, Tx}; use crate::energy::EnergyQuanta; @@ -72,7 +71,7 @@ fn execute( updates: &mut Vec, ) -> Result, DBError> { let slow_query_threshold = if let TxMode::Tx(tx) = p.tx { - StVarTable::query_limit(p.db, tx)?.map(Duration::from_millis) + p.db.query_limit(tx)?.map(Duration::from_millis) } else { None }; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 4c03bf4d513..07b67b20863 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -3,7 +3,7 @@ use crate::db::datastore::locking_tx_datastore::state_view::IterByColRangeMutTx; use crate::db::datastore::locking_tx_datastore::tx::TxId; use crate::db::datastore::locking_tx_datastore::IterByColRangeTx; -use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow, StVarTable}; +use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow}; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; use crate::estimation; @@ -467,7 +467,7 @@ pub fn check_row_limit( auth: &AuthCtx, ) -> Result<(), DBError> { if auth.caller != auth.owner { - if let Some(limit) = StVarTable::row_limit(db, tx)? { + if let Some(limit) = db.row_limit(tx)? { let mut estimate: u64 = 0; for query in queries { estimate = estimate.saturating_add(row_est(query, tx)); @@ -603,7 +603,7 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { fn _set_var(&mut self, name: String, literal: String) -> Result { let tx = self.tx.unwrap_mut(); - StVarTable::write_var(self.db, tx, StVarName::from_str(&name)?, &literal)?; + self.db.write_var(tx, StVarName::from_str(&name)?, &literal)?; Ok(Code::Pass(None)) } @@ -611,7 +611,7 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { fn read_key_into_table(env: &DbProgram, name: &str) -> Result { if let TxMode::Tx(tx) = &env.tx { let name = StVarName::from_str(name)?; - if let Some(value) = StVarTable::read_var(env.db, tx, name)? { + if let Some(value) = env.db.read_var(tx, name)? { return Ok(MemTable::from_iter( Arc::new(st_var_schema().into()), [ProductValue::from(StVarRow { name, value })],