Skip to content

Commit 7375323

Browse files
Datastore Extraction: TxMetrics refactor, small non-functional change to make datastore extraction easier (#2794)
1 parent e7b9c61 commit 7375323

7 files changed

Lines changed: 66 additions & 79 deletions

File tree

crates/core/src/db/datastore/locking_tx_datastore/datastore.rs

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,11 @@ use super::{
66
tx::TxId,
77
tx_state::TxState,
88
};
9-
use crate::{
10-
db::datastore::{
11-
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
12-
traits::{InsertFlags, UpdateFlags},
13-
},
14-
subscription::ExecutionCounters,
15-
};
16-
use crate::{
17-
db::relational_db::RelationalDB,
18-
execution_context::{Workload, WorkloadType},
9+
use crate::db::datastore::{
10+
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
11+
traits::{InsertFlags, UpdateFlags},
1912
};
13+
use crate::execution_context::{Workload, WorkloadType};
2014
use crate::{
2115
db::{
2216
datastore::{
@@ -36,7 +30,6 @@ use crate::{
3630
};
3731
use anyhow::{anyhow, Context};
3832
use core::{cell::RefCell, ops::RangeBounds};
39-
use enum_map::EnumMap;
4033
use parking_lot::{Mutex, RwLock};
4134
use spacetimedb_commitlog::payload::{txdata, Txdata};
4235
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
@@ -74,9 +67,6 @@ pub struct Locking {
7467
sequence_state: Arc<Mutex<SequencesState>>,
7568
/// The identity of this database.
7669
pub(crate) database_identity: Identity,
77-
78-
/// A map from workload types to their cached prometheus counters.
79-
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
8070
}
8171

8272
impl MemoryUsage for Locking {
@@ -85,7 +75,6 @@ impl MemoryUsage for Locking {
8575
committed_state,
8676
sequence_state,
8777
database_identity,
88-
workload_type_to_exec_counters: _,
8978
} = self;
9079
std::mem::size_of_val(&**committed_state)
9180
+ committed_state.read().heap_usage()
@@ -97,14 +86,10 @@ impl MemoryUsage for Locking {
9786

9887
impl Locking {
9988
pub fn new(database_identity: Identity, page_pool: PagePool) -> Self {
100-
let workload_type_to_exec_counters =
101-
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));
102-
10389
Self {
10490
committed_state: Arc::new(RwLock::new(CommittedState::new(page_pool))),
10591
sequence_state: <_>::default(),
10692
database_identity,
107-
workload_type_to_exec_counters,
10893
}
10994
}
11095

@@ -321,10 +306,6 @@ impl Locking {
321306

322307
tx.alter_table_access(table_id, access)
323308
}
324-
325-
pub(crate) fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
326-
&self.workload_type_to_exec_counters[workload_type]
327-
}
328309
}
329310

330311
impl DataRow for Locking {
@@ -702,6 +683,10 @@ struct TableStats {
702683
num_indices: usize,
703684
}
704685

686+
pub trait MetricsRecorder {
687+
fn record(&self, metrics: &ExecutionMetrics);
688+
}
689+
705690
impl TxMetrics {
706691
/// Compute transaction metrics that we can report once the tx lock is released.
707692
pub(super) fn new(
@@ -751,11 +736,11 @@ impl TxMetrics {
751736
}
752737

753738
/// Reports the metrics for `reducer` using `get_exec_counter` to retrieve the metrics counters.
754-
pub fn report<'a>(
739+
pub fn report<'a, R: MetricsRecorder + 'a>(
755740
&self,
756741
tx_data: Option<&TxData>,
757742
reducer: &str,
758-
get_exec_counter: impl FnOnce(WorkloadType) -> &'a ExecutionCounters,
743+
get_exec_counter: impl FnOnce(WorkloadType) -> &'a R,
759744
) {
760745
let workload = &self.workload;
761746
let db = &self.database_identity;
@@ -844,27 +829,6 @@ impl TxMetrics {
844829
}
845830
}
846831
}
847-
848-
/// Reports the metrics for `reducer`, using counters provided by `db`.
849-
pub(crate) fn report_with_db(&self, reducer: &str, db: &RelationalDB, tx_data: Option<&TxData>) {
850-
self.report(tx_data, reducer, |wl| db.exec_counters_for(wl));
851-
}
852-
}
853-
854-
/// Reports the `TxMetrics`s passed.
855-
///
856-
/// Should only be called after the tx lock has been fully released.
857-
pub fn report_tx_metricses(
858-
reducer: &str,
859-
db: &RelationalDB,
860-
tx_data: Option<&TxData>,
861-
metrics_mut: Option<&TxMetrics>,
862-
metrics_read: &TxMetrics,
863-
) {
864-
if let Some(metrics_mut) = metrics_mut {
865-
metrics_mut.report_with_db(reducer, db, tx_data);
866-
}
867-
metrics_read.report_with_db(reducer, db, None);
868832
}
869833

870834
impl MutTx for Locking {

crates/core/src/db/relational_db.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::datastore::locking_tx_datastore::committed_state::CommittedState;
2-
use super::datastore::locking_tx_datastore::datastore::{report_tx_metricses, TxMetrics};
2+
use super::datastore::locking_tx_datastore::datastore::TxMetrics;
33
use super::datastore::locking_tx_datastore::state_view::{
44
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
55
};
@@ -23,6 +23,7 @@ use crate::messages::control_db::HostType;
2323
use crate::subscription::ExecutionCounters;
2424
use crate::util::{asyncify, spawn_rayon};
2525
use anyhow::{anyhow, Context};
26+
use enum_map::EnumMap;
2627
use fs2::FileExt;
2728
use futures::channel::mpsc;
2829
use futures::StreamExt;
@@ -107,6 +108,9 @@ pub struct RelationalDB {
107108
// We want to release the file lock last.
108109
// TODO(noa): is this lockfile still necessary now that we have data-dir?
109110
_lock: LockFile,
111+
112+
/// A map from workload types to their cached prometheus counters.
113+
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
110114
}
111115

112116
#[derive(Clone)]
@@ -224,6 +228,8 @@ impl RelationalDB {
224228
let (durability, disk_size_fn) = durability.unzip();
225229
let snapshot_worker =
226230
snapshot_repo.map(|repo| SnapshotWorker::new(inner.committed_state.clone(), repo.clone()));
231+
let workload_type_to_exec_counters =
232+
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));
227233

228234
Self {
229235
inner,
@@ -237,6 +243,7 @@ impl RelationalDB {
237243
disk_size_fn,
238244

239245
_lock: lock,
246+
workload_type_to_exec_counters,
240247
}
241248
}
242249

@@ -735,7 +742,7 @@ impl RelationalDB {
735742

736743
/// Returns the execution counters for `workload_type` for this database.
737744
pub fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
738-
self.inner.exec_counters_for(workload_type)
745+
&self.workload_type_to_exec_counters[workload_type]
739746
}
740747

741748
/// Begin a transaction.
@@ -972,7 +979,7 @@ impl RelationalDB {
972979
let mut tx = self.begin_tx(workload);
973980
let res = f(&mut tx);
974981
let (tx_metics, reducer) = self.release_tx(tx);
975-
report_tx_metricses(&reducer, self, None, None, &tx_metics);
982+
self.report_tx_metricses(&reducer, None, None, &tx_metics);
976983
res
977984
}
978985

@@ -983,11 +990,11 @@ impl RelationalDB {
983990
{
984991
if res.is_err() {
985992
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
986-
tx_metrics.report_with_db(&reducer, self, None);
993+
self.report(&reducer, &tx_metrics, None);
987994
} else {
988995
match self.commit_tx(tx).map_err(E::from)? {
989996
Some((tx_data, tx_metrics, reducer)) => {
990-
tx_metrics.report_with_db(&reducer, self, Some(&tx_data));
997+
self.report(&reducer, &tx_metrics, Some(&tx_data));
991998
}
992999
None => panic!("TODO: retry?"),
9931000
}
@@ -1002,7 +1009,7 @@ impl RelationalDB {
10021009
match res {
10031010
Err(e) => {
10041011
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
1005-
tx_metrics.report_with_db(&reducer, self, None);
1012+
self.report(&reducer, &tx_metrics, None);
10061013

10071014
Err(e)
10081015
}
@@ -1013,6 +1020,22 @@ impl RelationalDB {
10131020
pub(crate) fn alter_table_access(&self, tx: &mut MutTx, name: Box<str>, access: StAccess) -> Result<(), DBError> {
10141021
self.inner.alter_table_access_mut_tx(tx, name, access)
10151022
}
1023+
1024+
/// Reports the `TxMetrics`s passed.
1025+
///
1026+
/// Should only be called after the tx lock has been fully released.
1027+
pub(crate) fn report_tx_metricses(
1028+
&self,
1029+
reducer: &str,
1030+
tx_data: Option<&TxData>,
1031+
metrics_mut: Option<&TxMetrics>,
1032+
metrics_read: &TxMetrics,
1033+
) {
1034+
if let Some(metrics_mut) = metrics_mut {
1035+
self.report(reducer, metrics_mut, tx_data);
1036+
}
1037+
self.report(reducer, metrics_read, None);
1038+
}
10161039
}
10171040

10181041
impl RelationalDB {
@@ -1360,6 +1383,11 @@ impl RelationalDB {
13601383
pub fn drop_constraint(&self, tx: &mut MutTx, constraint_id: ConstraintId) -> Result<(), DBError> {
13611384
self.inner.drop_constraint_mut_tx(tx, constraint_id)
13621385
}
1386+
1387+
/// Reports the metrics for `reducer`, using counters provided by `db`.
1388+
pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) {
1389+
metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl));
1390+
}
13631391
}
13641392

13651393
#[allow(unused)]

crates/core/src/host/module_host.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ fn init_database(
364364
let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) {
365365
None => {
366366
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
367-
tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data));
367+
stdb.report(&reducer, &tx_metrics, Some(&tx_data));
368368
}
369369
None
370370
}

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,12 +290,12 @@ impl<T: WasmInstance> ModuleInstance for WasmModuleInstance<T> {
290290
log::warn!("Database update failed: {} @ {}", e, stdb.database_identity());
291291
self.system_logger().warn(&format!("Database update failed: {e}"));
292292
let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
293-
tx_metrics.report_with_db(&reducer, stdb, None);
293+
stdb.report(&reducer, &tx_metrics, None);
294294
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
295295
}
296296
Ok(()) => {
297297
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
298-
tx_metrics.report_with_db(&reducer, stdb, Some(&tx_data));
298+
stdb.report(&reducer, &tx_metrics, Some(&tx_data));
299299
}
300300
self.system_logger().info("Database updated");
301301
log::info!("Database updated, {}", stdb.database_identity());

crates/core/src/sql/execute.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::time::Duration;
22

33
use super::ast::SchemaViewer;
4-
use crate::db::datastore::locking_tx_datastore::datastore::report_tx_metricses;
54
use crate::db::datastore::locking_tx_datastore::state_view::StateView;
65
use crate::db::datastore::system_tables::StVarTable;
76
use crate::db::datastore::traits::IsolationLevel;
@@ -204,13 +203,7 @@ pub fn run(
204203
// Release the tx on drop, so that we record metrics.
205204
let mut tx = scopeguard::guard(tx, |tx| {
206205
let (tx_metrics_downgrade, reducer) = db.release_tx(tx);
207-
report_tx_metricses(
208-
&reducer,
209-
db,
210-
Some(&tx_data),
211-
Some(&tx_metrics_mut),
212-
&tx_metrics_downgrade,
213-
);
206+
db.report_tx_metricses(&reducer, Some(&tx_data), Some(&tx_metrics_mut), &tx_metrics_downgrade);
214207
});
215208

216209
// Compute the header for the result set
@@ -255,7 +248,7 @@ pub fn run(
255248
let metrics = tx.metrics;
256249
return db.commit_tx(tx).map(|tx_opt| {
257250
if let Some((tx_data, tx_metrics, reducer)) = tx_opt {
258-
tx_metrics.report_with_db(&reducer, db, Some(&tx_data));
251+
db.report(&reducer, &tx_metrics, Some(&tx_data));
259252
}
260253
SqlResult { rows: vec![], metrics }
261254
});

crates/core/src/subscription/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
1212
use spacetimedb_primitives::TableId;
1313

1414
use crate::{
15-
db::db_metrics::DB_METRICS, error::DBError, execution_context::WorkloadType, worker_metrics::WORKER_METRICS,
15+
db::{datastore::locking_tx_datastore::datastore::MetricsRecorder, db_metrics::DB_METRICS},
16+
error::DBError,
17+
execution_context::WorkloadType,
18+
worker_metrics::WORKER_METRICS,
1619
};
1720

1821
pub mod delta;
@@ -84,6 +87,12 @@ impl ExecutionCounters {
8487
}
8588
}
8689

90+
impl MetricsRecorder for ExecutionCounters {
91+
fn record(&self, metrics: &ExecutionMetrics) {
92+
self.record(metrics);
93+
}
94+
}
95+
8796
/// Execute a subscription query
8897
pub fn execute_plan<Tx, F>(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)>
8998
where

0 commit comments

Comments
 (0)