Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 10 additions & 46 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::{
db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
},
subscription::ExecutionCounters,
};
use crate::{
db::relational_db::RelationalDB,
execution_context::{Workload, WorkloadType},
use crate::db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
};
use crate::execution_context::{Workload, WorkloadType};
use crate::{
db::{
datastore::{
Expand All @@ -36,7 +30,6 @@ use crate::{
};
use anyhow::{anyhow, Context};
use core::{cell::RefCell, ops::RangeBounds};
use enum_map::EnumMap;
use parking_lot::{Mutex, RwLock};
use spacetimedb_commitlog::payload::{txdata, Txdata};
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
Expand Down Expand Up @@ -74,9 +67,6 @@ pub struct Locking {
sequence_state: Arc<Mutex<SequencesState>>,
/// The identity of this database.
pub(crate) database_identity: Identity,

/// A map from workload types to their cached prometheus counters.
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}

impl MemoryUsage for Locking {
Expand All @@ -85,7 +75,6 @@ impl MemoryUsage for Locking {
committed_state,
sequence_state,
database_identity,
workload_type_to_exec_counters: _,
} = self;
std::mem::size_of_val(&**committed_state)
+ committed_state.read().heap_usage()
Expand All @@ -97,14 +86,10 @@ impl MemoryUsage for Locking {

impl Locking {
pub fn new(database_identity: Identity, page_pool: PagePool) -> Self {
let workload_type_to_exec_counters =
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));

Self {
committed_state: Arc::new(RwLock::new(CommittedState::new(page_pool))),
sequence_state: <_>::default(),
database_identity,
workload_type_to_exec_counters,
}
}

Expand Down Expand Up @@ -321,10 +306,6 @@ impl Locking {

tx.alter_table_access(table_id, access)
}

pub(crate) fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
&self.workload_type_to_exec_counters[workload_type]
}
}

impl DataRow for Locking {
Expand Down Expand Up @@ -702,6 +683,10 @@ struct TableStats {
num_indices: usize,
}

pub trait MetricsRecorder {
fn record(&self, metrics: &ExecutionMetrics);
}

impl TxMetrics {
/// Compute transaction metrics that we can report once the tx lock is released.
pub(super) fn new(
Expand Down Expand Up @@ -751,11 +736,11 @@ impl TxMetrics {
}

/// Reports the metrics for `reducer` using `get_exec_counter` to retrieve the metrics counters.
pub fn report<'a>(
pub fn report<'a, R: MetricsRecorder + 'a>(
&self,
tx_data: Option<&TxData>,
reducer: &str,
get_exec_counter: impl FnOnce(WorkloadType) -> &'a ExecutionCounters,
get_exec_counter: impl FnOnce(WorkloadType) -> &'a R,
) {
let workload = &self.workload;
let db = &self.database_identity;
Expand Down Expand Up @@ -844,27 +829,6 @@ impl TxMetrics {
}
}
}

/// Reports the metrics for `reducer`, using counters provided by `db`.
pub(crate) fn report_with_db(&self, reducer: &str, db: &RelationalDB, tx_data: Option<&TxData>) {
self.report(tx_data, reducer, |wl| db.exec_counters_for(wl));
}
}

/// Reports the `TxMetrics`s passed.
///
/// Should only be called after the tx lock has been fully released.
pub fn report_tx_metricses(
reducer: &str,
db: &RelationalDB,
tx_data: Option<&TxData>,
metrics_mut: Option<&TxMetrics>,
metrics_read: &TxMetrics,
) {
if let Some(metrics_mut) = metrics_mut {
metrics_mut.report_with_db(reducer, db, tx_data);
}
metrics_read.report_with_db(reducer, db, None);
}

impl MutTx for Locking {
Expand Down
95 changes: 0 additions & 95 deletions crates/core/src/db/datastore/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
//! - Use [`st_fields_enum`] to define its column enum.
//! - Register its schema in [`system_module_def`], making sure to call `validate_system_table` at the end of the function.

use crate::db::relational_db::RelationalDB;
use crate::error::DBError;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::db::raw_def::v9::{btree, RawSql};
Expand All @@ -21,7 +20,6 @@ use spacetimedb_lib::ser::Serialize;
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::{ConnectionId, Identity, ProductValue, SpacetimeType};
use spacetimedb_primitives::*;
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::hash::Hash;
use spacetimedb_sats::product_value::InvalidFieldError;
Expand All @@ -34,16 +32,11 @@ use spacetimedb_schema::schema::{
TableSchema,
};
use spacetimedb_table::table::RowRef;
use spacetimedb_vm::errors::{ErrorType, ErrorVm};
use spacetimedb_vm::ops::parse;
use std::cell::RefCell;
use std::str::FromStr;
use strum::Display;
use v9::{RawModuleDefV9Builder, TableType};

use super::locking_tx_datastore::tx::TxId;
use super::locking_tx_datastore::MutTxId;

/// The static ID of the table that defines tables
pub(crate) const ST_TABLE_ID: TableId = TableId(1);
/// The static ID of the table that defines columns
Expand Down Expand Up @@ -950,81 +943,6 @@ impl TryFrom<RowRef<'_>> for StClientRow {
}
}

/// A handle for reading system variables from `st_var`
pub struct StVarTable;

impl StVarTable {
/// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var`
pub fn row_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
let data = Self::read_var(db, tx, StVarName::RowLimit);

if let Some(StVarValue::U64(limit)) = data? {
return Ok(Some(limit));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var`
pub fn query_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowQryThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var`
pub fn sub_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowSubThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_INC] from `st_var`
pub fn incr_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowIncThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of a system variable from `st_var`
pub fn read_var(db: &RelationalDB, tx: &TxId, name: StVarName) -> Result<Option<StVarValue>, DBError> {
if let Some(row_ref) = db
.iter_by_col_eq(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())?
.next()
{
return Ok(Some(StVarRow::try_from(row_ref)?.value));
}
Ok(None)
}

/// Update the value of a system variable in `st_var`
pub fn write_var(db: &RelationalDB, tx: &mut MutTxId, name: StVarName, literal: &str) -> Result<(), DBError> {
let value = Self::parse_var(name, literal)?;
if let Some(row_ref) = db
.iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())?
.next()
{
db.delete(tx, ST_VAR_ID, [row_ref.pointer()]);
}
tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?;
Ok(())
}

/// Parse the literal representation of a system variable
fn parse_var(name: StVarName, literal: &str) -> Result<StVarValue, DBError> {
StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| {
ErrorVm::Type(ErrorType::Parse {
value: literal.to_string(),
ty: fmt_algebraic_type(&name.type_of()).to_string(),
err: format!("error parsing value: {:?}", v),
})
.into()
})
}
}

/// System table [ST_VAR_NAME]
///
/// | name | value |
Expand Down Expand Up @@ -1210,19 +1128,6 @@ fn to_product_value<T: Serialize>(value: &T) -> ProductValue {
#[cfg(test)]
mod tests {
use super::*;
use crate::db::relational_db::tests_utils::{with_auto_commit, with_read_only, TestDB};

#[test]
fn test_system_variables() {
let db = TestDB::durable().expect("failed to create db");
let _ = with_auto_commit(&db, |tx| StVarTable::write_var(&db, tx, StVarName::RowLimit, "5"));
assert_eq!(
5,
with_read_only(&db, |tx| StVarTable::row_limit(&db, tx))
.expect("failed to read from st_var")
.expect("row_limit does not exist")
);
}

#[test]
fn test_sequences_within_reserved_range() {
Expand Down
Loading
Loading