diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 747cb054ef0..b29afce31a5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -6,17 +6,11 @@ use super::{ tx::TxId, tx_state::TxState, }; -use crate::{ - db::datastore::{ - locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx}, - traits::{InsertFlags, UpdateFlags}, - }, - subscription::ExecutionCounters, -}; -use crate::{ - db::relational_db::RelationalDB, - execution_context::{Workload, WorkloadType}, +use crate::db::datastore::{ + locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx}, + traits::{InsertFlags, UpdateFlags}, }; +use crate::execution_context::{Workload, WorkloadType}; use crate::{ db::{ datastore::{ @@ -36,7 +30,6 @@ use crate::{ }; use anyhow::{anyhow, Context}; use core::{cell::RefCell, ops::RangeBounds}; -use enum_map::EnumMap; use parking_lot::{Mutex, RwLock}; use spacetimedb_commitlog::payload::{txdata, Txdata}; use spacetimedb_data_structures::map::{HashCollectionExt, HashMap}; @@ -74,9 +67,6 @@ pub struct Locking { sequence_state: Arc>, /// The identity of this database. pub(crate) database_identity: Identity, - - /// A map from workload types to their cached prometheus counters. - workload_type_to_exec_counters: Arc>, } impl MemoryUsage for Locking { @@ -85,7 +75,6 @@ impl MemoryUsage for Locking { committed_state, sequence_state, database_identity, - workload_type_to_exec_counters: _, } = self; std::mem::size_of_val(&**committed_state) + committed_state.read().heap_usage() @@ -97,14 +86,10 @@ impl MemoryUsage for Locking { impl Locking { pub fn new(database_identity: Identity, page_pool: PagePool) -> Self { - let workload_type_to_exec_counters = - Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity))); - Self { committed_state: Arc::new(RwLock::new(CommittedState::new(page_pool))), sequence_state: <_>::default(), database_identity, - workload_type_to_exec_counters, } } @@ -321,10 +306,6 @@ impl Locking { tx.alter_table_access(table_id, access) } - - pub(crate) fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters { - &self.workload_type_to_exec_counters[workload_type] - } } impl DataRow for Locking { @@ -702,6 +683,10 @@ struct TableStats { num_indices: usize, } +pub trait MetricsRecorder { + fn record(&self, metrics: &ExecutionMetrics); +} + impl TxMetrics { /// Compute transaction metrics that we can report once the tx lock is released. pub(super) fn new( @@ -751,11 +736,11 @@ impl TxMetrics { } /// Reports the metrics for `reducer` using `get_exec_counter` to retrieve the metrics counters. - pub fn report<'a>( + pub fn report<'a, R: MetricsRecorder + 'a>( &self, tx_data: Option<&TxData>, reducer: &str, - get_exec_counter: impl FnOnce(WorkloadType) -> &'a ExecutionCounters, + get_exec_counter: impl FnOnce(WorkloadType) -> &'a R, ) { let workload = &self.workload; let db = &self.database_identity; @@ -844,27 +829,6 @@ impl TxMetrics { } } } - - /// Reports the metrics for `reducer`, using counters provided by `db`. - pub(crate) fn report_with_db(&self, reducer: &str, db: &RelationalDB, tx_data: Option<&TxData>) { - self.report(tx_data, reducer, |wl| db.exec_counters_for(wl)); - } -} - -/// Reports the `TxMetrics`s passed. -/// -/// Should only be called after the tx lock has been fully released. -pub fn report_tx_metricses( - reducer: &str, - db: &RelationalDB, - tx_data: Option<&TxData>, - metrics_mut: Option<&TxMetrics>, - metrics_read: &TxMetrics, -) { - if let Some(metrics_mut) = metrics_mut { - metrics_mut.report_with_db(reducer, db, tx_data); - } - metrics_read.report_with_db(reducer, db, None); } impl MutTx for Locking { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9b001bb50a7..4b874801d94 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,5 +1,5 @@ use super::datastore::locking_tx_datastore::committed_state::CommittedState; -use super::datastore::locking_tx_datastore::datastore::{report_tx_metricses, TxMetrics}; +use super::datastore::locking_tx_datastore::datastore::TxMetrics; use super::datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; @@ -23,6 +23,7 @@ use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; use crate::util::{asyncify, spawn_rayon}; use anyhow::{anyhow, Context}; +use enum_map::EnumMap; use fs2::FileExt; use futures::channel::mpsc; use futures::StreamExt; @@ -107,6 +108,9 @@ pub struct RelationalDB { // We want to release the file lock last. // TODO(noa): is this lockfile still necessary now that we have data-dir? _lock: LockFile, + + /// A map from workload types to their cached prometheus counters. + workload_type_to_exec_counters: Arc>, } #[derive(Clone)] @@ -224,6 +228,8 @@ impl RelationalDB { let (durability, disk_size_fn) = durability.unzip(); let snapshot_worker = snapshot_repo.map(|repo| SnapshotWorker::new(inner.committed_state.clone(), repo.clone())); + let workload_type_to_exec_counters = + Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity))); Self { inner, @@ -237,6 +243,7 @@ impl RelationalDB { disk_size_fn, _lock: lock, + workload_type_to_exec_counters, } } @@ -735,7 +742,7 @@ impl RelationalDB { /// Returns the execution counters for `workload_type` for this database. pub fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters { - self.inner.exec_counters_for(workload_type) + &self.workload_type_to_exec_counters[workload_type] } /// Begin a transaction. @@ -972,7 +979,7 @@ impl RelationalDB { let mut tx = self.begin_tx(workload); let res = f(&mut tx); let (tx_metics, reducer) = self.release_tx(tx); - report_tx_metricses(&reducer, self, None, None, &tx_metics); + self.report_tx_metricses(&reducer, None, None, &tx_metics); res } @@ -983,11 +990,11 @@ impl RelationalDB { { if res.is_err() { let (tx_metrics, reducer) = self.rollback_mut_tx(tx); - tx_metrics.report_with_db(&reducer, self, None); + self.report(&reducer, &tx_metrics, None); } else { match self.commit_tx(tx).map_err(E::from)? { Some((tx_data, tx_metrics, reducer)) => { - tx_metrics.report_with_db(&reducer, self, Some(&tx_data)); + self.report(&reducer, &tx_metrics, Some(&tx_data)); } None => panic!("TODO: retry?"), } @@ -1002,7 +1009,7 @@ impl RelationalDB { match res { Err(e) => { let (tx_metrics, reducer) = self.rollback_mut_tx(tx); - tx_metrics.report_with_db(&reducer, self, None); + self.report(&reducer, &tx_metrics, None); Err(e) } @@ -1013,6 +1020,22 @@ impl RelationalDB { pub(crate) fn alter_table_access(&self, tx: &mut MutTx, name: Box, access: StAccess) -> Result<(), DBError> { self.inner.alter_table_access_mut_tx(tx, name, access) } + + /// Reports the `TxMetrics`s passed. + /// + /// Should only be called after the tx lock has been fully released. + pub(crate) fn report_tx_metricses( + &self, + reducer: &str, + tx_data: Option<&TxData>, + metrics_mut: Option<&TxMetrics>, + metrics_read: &TxMetrics, + ) { + if let Some(metrics_mut) = metrics_mut { + self.report(reducer, metrics_mut, tx_data); + } + self.report(reducer, metrics_read, None); + } } impl RelationalDB { @@ -1360,6 +1383,11 @@ impl RelationalDB { pub fn drop_constraint(&self, tx: &mut MutTx, constraint_id: ConstraintId) -> Result<(), DBError> { self.inner.drop_constraint_mut_tx(tx, constraint_id) } + + /// Reports the metrics for `reducer`, using counters provided by `db`. + pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) { + metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl)); + } } #[allow(unused)] diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index a6c653fd315..17ae5adeac5 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -364,7 +364,7 @@ fn init_database( let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) { None => { if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { - tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data)); + stdb.report(&reducer, &tx_metrics, Some(&tx_data)); } None } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 4a1249e9ad0..e53e270e0b6 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -290,12 +290,12 @@ impl ModuleInstance for WasmModuleInstance { log::warn!("Database update failed: {} @ {}", e, stdb.database_identity()); self.system_logger().warn(&format!("Database update failed: {e}")); let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx); - tx_metrics.report_with_db(&reducer, stdb, None); + stdb.report(&reducer, &tx_metrics, None); Ok(UpdateDatabaseResult::ErrorExecutingMigration(e)) } Ok(()) => { if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { - tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data)); + stdb.report(&reducer, &tx_metrics, Some(&tx_data)); } self.system_logger().info("Database updated"); log::info!("Database updated, {}", stdb.database_identity()); diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index b570c9d4178..26d72b86cb8 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -1,7 +1,6 @@ use std::time::Duration; use super::ast::SchemaViewer; -use crate::db::datastore::locking_tx_datastore::datastore::report_tx_metricses; use crate::db::datastore::locking_tx_datastore::state_view::StateView; use crate::db::datastore::system_tables::StVarTable; use crate::db::datastore::traits::IsolationLevel; @@ -204,13 +203,7 @@ pub fn run( // Release the tx on drop, so that we record metrics. let mut tx = scopeguard::guard(tx, |tx| { let (tx_metrics_downgrade, reducer) = db.release_tx(tx); - report_tx_metricses( - &reducer, - db, - Some(&tx_data), - Some(&tx_metrics_mut), - &tx_metrics_downgrade, - ); + db.report_tx_metricses(&reducer, Some(&tx_data), Some(&tx_metrics_mut), &tx_metrics_downgrade); }); // Compute the header for the result set @@ -255,7 +248,7 @@ pub fn run( let metrics = tx.metrics; return db.commit_tx(tx).map(|tx_opt| { if let Some((tx_data, tx_metrics, reducer)) = tx_opt { - tx_metrics.report_with_db(&reducer, db, Some(&tx_data)); + db.report(&reducer, &tx_metrics, Some(&tx_data)); } SqlResult { rows: vec![], metrics } }); diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 9b185a855ae..2b5078dbff6 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -12,7 +12,10 @@ use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; use spacetimedb_primitives::TableId; use crate::{ - db::db_metrics::DB_METRICS, error::DBError, execution_context::WorkloadType, worker_metrics::WORKER_METRICS, + db::{datastore::locking_tx_datastore::datastore::MetricsRecorder, db_metrics::DB_METRICS}, + error::DBError, + execution_context::WorkloadType, + worker_metrics::WORKER_METRICS, }; pub mod delta; @@ -84,6 +87,12 @@ impl ExecutionCounters { } } +impl MetricsRecorder for ExecutionCounters { + fn record(&self, metrics: &ExecutionMetrics) { + self.record(metrics); + } +} + /// Execute a subscription query pub fn execute_plan(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)> where diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 2924c7199f5..a84b4718702 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -8,7 +8,6 @@ use crate::client::messages::{ SubscriptionUpdateMessage, TransactionUpdateMessage, }; use crate::client::{ClientActorId, ClientConnectionSender, Protocol}; -use crate::db::datastore::locking_tx_datastore::datastore::report_tx_metricses; use crate::db::datastore::locking_tx_datastore::tx::TxId; use crate::db::db_metrics::DB_METRICS; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; @@ -264,7 +263,7 @@ impl ModuleSubscriptions { let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let existing_query = { @@ -357,7 +356,7 @@ impl ModuleSubscriptions { let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let auth = AuthCtx::new(self.owner_identity, sender.id.identity); let (table_rows, metrics) = return_on_err_with_sql!( @@ -403,7 +402,7 @@ impl ModuleSubscriptions { // Always lock the db before the subscription lock to avoid deadlocks. let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let removed_queries = { @@ -475,7 +474,7 @@ impl ModuleSubscriptions { // We always get the db lock before the subscription lock to avoid deadlocks. let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); let guard = self.subscriptions.read(); @@ -536,7 +535,7 @@ impl ModuleSubscriptions { ); let tx = scopeguard::guard(tx, |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); // We minimize locking so that other clients can add subscriptions concurrently. @@ -592,7 +591,7 @@ impl ModuleSubscriptions { let (queries, auth, tx) = self.compile_queries(sender.id.identity, subscription.query_strings, num_queries)?; let tx = scopeguard::guard(tx, |tx| { let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - tx_metrics.report_with_db(&reducer, &self.relational_db, None); + self.relational_db.report(&reducer, &tx_metrics, None); }); check_row_limit( @@ -677,13 +676,7 @@ impl ModuleSubscriptions { // When we're done with this method, release the tx and report metrics. let mut read_tx = scopeguard::guard(read_tx, |tx| { let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx); - report_tx_metricses( - &reducer, - stdb, - tx_data.as_ref(), - Some(&tx_metrics_mut), - &tx_metrics_read, - ); + stdb.report_tx_metricses(&reducer, tx_data.as_ref(), Some(&tx_metrics_mut), &tx_metrics_read); }); // Create the delta transaction we'll use to eval updates against. let delta_read_tx = tx_data