Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 10 additions & 46 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::{
db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
},
subscription::ExecutionCounters,
};
use crate::{
db::relational_db::RelationalDB,
execution_context::{Workload, WorkloadType},
use crate::db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
};
use crate::execution_context::{Workload, WorkloadType};
use crate::{
db::{
datastore::{
Expand All @@ -36,7 +30,6 @@ use crate::{
};
use anyhow::{anyhow, Context};
use core::{cell::RefCell, ops::RangeBounds};
use enum_map::EnumMap;
use parking_lot::{Mutex, RwLock};
use spacetimedb_commitlog::payload::{txdata, Txdata};
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
Expand Down Expand Up @@ -74,9 +67,6 @@ pub struct Locking {
sequence_state: Arc<Mutex<SequencesState>>,
/// The identity of this database.
pub(crate) database_identity: Identity,

/// A map from workload types to their cached prometheus counters.
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}

impl MemoryUsage for Locking {
Expand All @@ -85,7 +75,6 @@ impl MemoryUsage for Locking {
committed_state,
sequence_state,
database_identity,
workload_type_to_exec_counters: _,
} = self;
std::mem::size_of_val(&**committed_state)
+ committed_state.read().heap_usage()
Expand All @@ -97,14 +86,10 @@ impl MemoryUsage for Locking {

impl Locking {
pub fn new(database_identity: Identity, page_pool: PagePool) -> Self {
let workload_type_to_exec_counters =
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));

Self {
committed_state: Arc::new(RwLock::new(CommittedState::new(page_pool))),
sequence_state: <_>::default(),
database_identity,
workload_type_to_exec_counters,
}
}

Expand Down Expand Up @@ -321,10 +306,6 @@ impl Locking {

tx.alter_table_access(table_id, access)
}

pub(crate) fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
&self.workload_type_to_exec_counters[workload_type]
}
}

impl DataRow for Locking {
Expand Down Expand Up @@ -702,6 +683,10 @@ struct TableStats {
num_indices: usize,
}

pub trait MetricsRecorder {
Comment thread
cloutiertyler marked this conversation as resolved.
fn record(&self, metrics: &ExecutionMetrics);
}

impl TxMetrics {
/// Compute transaction metrics that we can report once the tx lock is released.
pub(super) fn new(
Expand Down Expand Up @@ -751,11 +736,11 @@ impl TxMetrics {
}

/// Reports the metrics for `reducer` using `get_exec_counter` to retrieve the metrics counters.
pub fn report<'a>(
pub fn report<'a, R: MetricsRecorder + 'a>(
&self,
tx_data: Option<&TxData>,
reducer: &str,
get_exec_counter: impl FnOnce(WorkloadType) -> &'a ExecutionCounters,
get_exec_counter: impl FnOnce(WorkloadType) -> &'a R,
) {
let workload = &self.workload;
let db = &self.database_identity;
Expand Down Expand Up @@ -844,27 +829,6 @@ impl TxMetrics {
}
}
}

/// Reports the metrics for `reducer`, using counters provided by `db`.
pub(crate) fn report_with_db(&self, reducer: &str, db: &RelationalDB, tx_data: Option<&TxData>) {
self.report(tx_data, reducer, |wl| db.exec_counters_for(wl));
}
}

/// Reports the `TxMetrics`s passed.
///
/// Should only be called after the tx lock has been fully released.
pub fn report_tx_metricses(
reducer: &str,
db: &RelationalDB,
tx_data: Option<&TxData>,
metrics_mut: Option<&TxMetrics>,
metrics_read: &TxMetrics,
) {
if let Some(metrics_mut) = metrics_mut {
metrics_mut.report_with_db(reducer, db, tx_data);
}
metrics_read.report_with_db(reducer, db, None);
}

impl MutTx for Locking {
Expand Down
40 changes: 34 additions & 6 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::datastore::locking_tx_datastore::committed_state::CommittedState;
use super::datastore::locking_tx_datastore::datastore::{report_tx_metricses, TxMetrics};
use super::datastore::locking_tx_datastore::datastore::TxMetrics;
use super::datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
};
Expand All @@ -23,6 +23,7 @@ use crate::messages::control_db::HostType;
use crate::subscription::ExecutionCounters;
use crate::util::{asyncify, spawn_rayon};
use anyhow::{anyhow, Context};
use enum_map::EnumMap;
use fs2::FileExt;
use futures::channel::mpsc;
use futures::StreamExt;
Expand Down Expand Up @@ -107,6 +108,9 @@ pub struct RelationalDB {
// We want to release the file lock last.
// TODO(noa): is this lockfile still necessary now that we have data-dir?
_lock: LockFile,

/// A map from workload types to their cached prometheus counters.
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -224,6 +228,8 @@ impl RelationalDB {
let (durability, disk_size_fn) = durability.unzip();
let snapshot_worker =
snapshot_repo.map(|repo| SnapshotWorker::new(inner.committed_state.clone(), repo.clone()));
let workload_type_to_exec_counters =
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));

Self {
inner,
Expand All @@ -237,6 +243,7 @@ impl RelationalDB {
disk_size_fn,

_lock: lock,
workload_type_to_exec_counters,
}
}

Expand Down Expand Up @@ -735,7 +742,7 @@ impl RelationalDB {

/// Returns the execution counters for `workload_type` for this database.
pub fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
self.inner.exec_counters_for(workload_type)
&self.workload_type_to_exec_counters[workload_type]
}

/// Begin a transaction.
Expand Down Expand Up @@ -972,7 +979,7 @@ impl RelationalDB {
let mut tx = self.begin_tx(workload);
let res = f(&mut tx);
let (tx_metics, reducer) = self.release_tx(tx);
report_tx_metricses(&reducer, self, None, None, &tx_metics);
self.report_tx_metricses(&reducer, None, None, &tx_metics);
res
}

Expand All @@ -983,11 +990,11 @@ impl RelationalDB {
{
if res.is_err() {
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
tx_metrics.report_with_db(&reducer, self, None);
self.report(&reducer, &tx_metrics, None);
} else {
match self.commit_tx(tx).map_err(E::from)? {
Some((tx_data, tx_metrics, reducer)) => {
tx_metrics.report_with_db(&reducer, self, Some(&tx_data));
self.report(&reducer, &tx_metrics, Some(&tx_data));
}
None => panic!("TODO: retry?"),
}
Expand All @@ -1002,7 +1009,7 @@ impl RelationalDB {
match res {
Err(e) => {
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
tx_metrics.report_with_db(&reducer, self, None);
self.report(&reducer, &tx_metrics, None);

Err(e)
}
Expand All @@ -1013,6 +1020,22 @@ impl RelationalDB {
pub(crate) fn alter_table_access(&self, tx: &mut MutTx, name: Box<str>, access: StAccess) -> Result<(), DBError> {
self.inner.alter_table_access_mut_tx(tx, name, access)
}

/// Reports the `TxMetrics`s passed.
///
/// Should only be called after the tx lock has been fully released.
pub(crate) fn report_tx_metricses(
&self,
reducer: &str,
tx_data: Option<&TxData>,
metrics_mut: Option<&TxMetrics>,
metrics_read: &TxMetrics,
) {
if let Some(metrics_mut) = metrics_mut {
self.report(reducer, metrics_mut, tx_data);
}
self.report(reducer, metrics_read, None);
}
}

impl RelationalDB {
Expand Down Expand Up @@ -1360,6 +1383,11 @@ impl RelationalDB {
pub fn drop_constraint(&self, tx: &mut MutTx, constraint_id: ConstraintId) -> Result<(), DBError> {
self.inner.drop_constraint_mut_tx(tx, constraint_id)
}

/// Reports the metrics for `reducer`, using counters provided by `db`.
pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) {
metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl));
}
}

#[allow(unused)]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ fn init_database(
let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) {
None => {
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data));
stdb.report(&reducer, &tx_metrics, Some(&tx_data));
}
None
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ impl<T: WasmInstance> ModuleInstance for WasmModuleInstance<T> {
log::warn!("Database update failed: {} @ {}", e, stdb.database_identity());
self.system_logger().warn(&format!("Database update failed: {e}"));
let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
tx_metrics.report_with_db(&reducer, stdb, None);
stdb.report(&reducer, &tx_metrics, None);
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
}
Ok(()) => {
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data));
stdb.report(&reducer, &tx_metrics, Some(&tx_data));
}
self.system_logger().info("Database updated");
log::info!("Database updated, {}", stdb.database_identity());
Expand Down
11 changes: 2 additions & 9 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use super::ast::SchemaViewer;
use crate::db::datastore::locking_tx_datastore::datastore::report_tx_metricses;
use crate::db::datastore::locking_tx_datastore::state_view::StateView;
use crate::db::datastore::system_tables::StVarTable;
use crate::db::datastore::traits::IsolationLevel;
Expand Down Expand Up @@ -204,13 +203,7 @@ pub fn run(
// Release the tx on drop, so that we record metrics.
let mut tx = scopeguard::guard(tx, |tx| {
let (tx_metrics_downgrade, reducer) = db.release_tx(tx);
report_tx_metricses(
&reducer,
db,
Some(&tx_data),
Some(&tx_metrics_mut),
&tx_metrics_downgrade,
);
db.report_tx_metricses(&reducer, Some(&tx_data), Some(&tx_metrics_mut), &tx_metrics_downgrade);
});

// Compute the header for the result set
Expand Down Expand Up @@ -255,7 +248,7 @@ pub fn run(
let metrics = tx.metrics;
return db.commit_tx(tx).map(|tx_opt| {
if let Some((tx_data, tx_metrics, reducer)) = tx_opt {
tx_metrics.report_with_db(&reducer, db, Some(&tx_data));
db.report(&reducer, &tx_metrics, Some(&tx_data));
}
SqlResult { rows: vec![], metrics }
});
Expand Down
11 changes: 10 additions & 1 deletion crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
use spacetimedb_primitives::TableId;

use crate::{
db::db_metrics::DB_METRICS, error::DBError, execution_context::WorkloadType, worker_metrics::WORKER_METRICS,
db::{datastore::locking_tx_datastore::datastore::MetricsRecorder, db_metrics::DB_METRICS},
error::DBError,
execution_context::WorkloadType,
worker_metrics::WORKER_METRICS,
};

pub mod delta;
Expand Down Expand Up @@ -84,6 +87,12 @@ impl ExecutionCounters {
}
}

impl MetricsRecorder for ExecutionCounters {
fn record(&self, metrics: &ExecutionMetrics) {
self.record(metrics);
}
}

/// Execute a subscription query
pub fn execute_plan<Tx, F>(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)>
where
Expand Down
Loading
Loading