Skip to content

Commit 6563a75

Browse files
Record transaction metrics off the main thread
1 parent a7293a3 commit 6563a75

6 files changed

Lines changed: 125 additions & 29 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::db::datastore::system_tables::StModuleRow;
2121
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
2222
use crate::execution_context::{ReducerContext, Workload, WorkloadType};
2323
use crate::messages::control_db::HostType;
24+
use crate::subscription::module_subscription_manager::{spawn_metrics_recorder, MetricsRecorderQueue};
2425
use crate::subscription::ExecutionCounters;
2526
use crate::util::{asyncify, spawn_rayon};
2627
use anyhow::{anyhow, Context};
@@ -118,6 +119,9 @@ pub struct RelationalDB {
118119

119120
/// A map from workload types to their cached prometheus counters.
120121
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
122+
123+
/// An async queue for recording transaction metrics off the main thread
124+
metrics_recorder_queue: MetricsRecorderQueue,
121125
}
122126

123127
#[derive(Clone)]
@@ -237,6 +241,7 @@ impl RelationalDB {
237241
snapshot_repo.map(|repo| SnapshotWorker::new(inner.committed_state.clone(), repo.clone()));
238242
let workload_type_to_exec_counters =
239243
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));
244+
let metrics_recorder_queue = spawn_metrics_recorder();
240245

241246
Self {
242247
inner,
@@ -251,6 +256,7 @@ impl RelationalDB {
251256

252257
_lock: lock,
253258
workload_type_to_exec_counters,
259+
metrics_recorder_queue,
254260
}
255261
}
256262

@@ -749,6 +755,11 @@ impl RelationalDB {
749755
Ok(AlgebraicValue::decode(col_ty, &mut &*bytes)?)
750756
}
751757

758+
/// Returns the execution counters for this database.
759+
pub fn exec_counter_map(&self) -> Arc<EnumMap<WorkloadType, ExecutionCounters>> {
760+
self.workload_type_to_exec_counters.clone()
761+
}
762+
752763
/// Returns the execution counters for `workload_type` for this database.
753764
pub fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
754765
&self.workload_type_to_exec_counters[workload_type]
@@ -988,7 +999,7 @@ impl RelationalDB {
988999
let mut tx = self.begin_tx(workload);
9891000
let res = f(&mut tx);
9901001
let (tx_metrics, reducer) = self.release_tx(tx);
991-
self.report_tx_metricses(&reducer, None, None, &tx_metrics);
1002+
self.report_tx_metrics(reducer, Arc::new(None), None, Some(tx_metrics));
9921003
res
9931004
}
9941005

@@ -999,11 +1010,11 @@ impl RelationalDB {
9991010
{
10001011
if res.is_err() {
10011012
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
1002-
self.report(&reducer, &tx_metrics, None);
1013+
self.report(reducer, tx_metrics, None);
10031014
} else {
10041015
match self.commit_tx(tx).map_err(E::from)? {
10051016
Some((tx_data, tx_metrics, reducer)) => {
1006-
self.report(&reducer, &tx_metrics, Some(&tx_data));
1017+
self.report(reducer, tx_metrics, Some(tx_data));
10071018
}
10081019
None => panic!("TODO: retry?"),
10091020
}
@@ -1018,7 +1029,7 @@ impl RelationalDB {
10181029
match res {
10191030
Err(e) => {
10201031
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
1021-
self.report(&reducer, &tx_metrics, None);
1032+
self.report(reducer, tx_metrics, None);
10221033

10231034
Err(e)
10241035
}
@@ -1042,17 +1053,20 @@ impl RelationalDB {
10421053
/// Reports the `TxMetrics`s passed.
10431054
///
10441055
/// Should only be called after the tx lock has been fully released.
1045-
pub(crate) fn report_tx_metricses(
1056+
pub(crate) fn report_tx_metrics(
10461057
&self,
1047-
reducer: &str,
1048-
tx_data: Option<&TxData>,
1049-
metrics_mut: Option<&TxMetrics>,
1050-
metrics_read: &TxMetrics,
1058+
reducer: String,
1059+
tx_data: Arc<Option<TxData>>,
1060+
metrics_for_writer: Option<TxMetrics>,
1061+
metrics_for_reader: Option<TxMetrics>,
10511062
) {
1052-
if let Some(metrics_mut) = metrics_mut {
1053-
self.report(reducer, metrics_mut, tx_data);
1054-
}
1055-
self.report(reducer, metrics_read, None);
1063+
self.metrics_recorder_queue.send_metrics(
1064+
reducer,
1065+
metrics_for_writer,
1066+
metrics_for_reader,
1067+
tx_data,
1068+
self.exec_counter_map(),
1069+
);
10561070
}
10571071
}
10581072

@@ -1403,8 +1417,8 @@ impl RelationalDB {
14031417
}
14041418

14051419
/// Reports the metrics for `reducer`, using counters provided by `db`.
1406-
pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) {
1407-
metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl));
1420+
pub fn report(&self, reducer: String, metrics: TxMetrics, tx_data: Option<TxData>) {
1421+
self.report_tx_metrics(reducer, Arc::new(tx_data), Some(metrics), None);
14081422
}
14091423

14101424
/// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var`

crates/core/src/host/module_host.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ pub struct ModuleEvent {
185185
}
186186

187187
/// Information about a running module.
188-
#[derive(Debug)]
189188
pub struct ModuleInfo {
190189
/// The definition of the module.
191190
/// Loaded by loading the module's program from the system tables, extracting its definition,
@@ -205,6 +204,17 @@ pub struct ModuleInfo {
205204
pub metrics: ModuleMetrics,
206205
}
207206

207+
impl fmt::Debug for ModuleInfo {
208+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209+
f.debug_struct("ModuleInfo")
210+
.field("module_def", &self.module_def)
211+
.field("owner_identity", &self.owner_identity)
212+
.field("database_identity", &self.database_identity)
213+
.field("module_hash", &self.module_hash)
214+
.finish()
215+
}
216+
}
217+
208218
#[derive(Debug)]
209219
pub struct ModuleMetrics {
210220
pub connected_clients: IntGauge,
@@ -368,7 +378,7 @@ fn init_database(
368378
let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) {
369379
None => {
370380
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
371-
stdb.report(&reducer, &tx_metrics, Some(&tx_data));
381+
stdb.report(reducer, tx_metrics, Some(tx_data));
372382
}
373383
None
374384
}

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,12 +293,12 @@ impl<T: WasmInstance> ModuleInstance for WasmModuleInstance<T> {
293293
log::warn!("Database update failed: {} @ {}", e, stdb.database_identity());
294294
self.system_logger().warn(&format!("Database update failed: {e}"));
295295
let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
296-
stdb.report(&reducer, &tx_metrics, None);
296+
stdb.report(reducer, tx_metrics, None);
297297
Ok(UpdateDatabaseResult::ErrorExecutingMigration(e))
298298
}
299299
Ok(()) => {
300300
if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? {
301-
stdb.report(&reducer, &tx_metrics, Some(&tx_data));
301+
stdb.report(reducer, tx_metrics, Some(tx_data));
302302
}
303303
self.system_logger().info("Database updated");
304304
log::info!("Database updated, {}", stdb.database_identity());

crates/core/src/sql/execute.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::Arc;
12
use std::time::Duration;
23

34
use super::ast::SchemaViewer;
@@ -202,7 +203,12 @@ pub fn run(
202203
// Release the tx on drop, so that we record metrics.
203204
let mut tx = scopeguard::guard(tx, |tx| {
204205
let (tx_metrics_downgrade, reducer) = db.release_tx(tx);
205-
db.report_tx_metricses(&reducer, Some(&tx_data), Some(&tx_metrics_mut), &tx_metrics_downgrade);
206+
db.report_tx_metrics(
207+
reducer,
208+
Arc::new(Some(tx_data)),
209+
Some(tx_metrics_mut),
210+
Some(tx_metrics_downgrade),
211+
);
206212
});
207213

208214
// Compute the header for the result set
@@ -247,7 +253,7 @@ pub fn run(
247253
let metrics = tx.metrics;
248254
return db.commit_tx(tx).map(|tx_opt| {
249255
if let Some((tx_data, tx_metrics, reducer)) = tx_opt {
250-
db.report(&reducer, &tx_metrics, Some(&tx_data));
256+
db.report(reducer, tx_metrics, Some(tx_data));
251257
}
252258
SqlResult { rows: vec![], metrics }
253259
});

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::{sync::Arc, time::Instant};
3737

3838
type Subscriptions = Arc<RwLock<SubscriptionManager>>;
3939

40-
#[derive(Debug, Clone)]
40+
#[derive(Clone)]
4141
pub struct ModuleSubscriptions {
4242
relational_db: Arc<RelationalDB>,
4343
/// If taking a lock (tx) on the db at the same time, ALWAYS lock the db first.
@@ -318,7 +318,7 @@ impl ModuleSubscriptions {
318318

319319
let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| {
320320
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
321-
self.relational_db.report(&reducer, &tx_metrics, None);
321+
self.relational_db.report(reducer, tx_metrics, None);
322322
});
323323

324324
let existing_query = {
@@ -420,7 +420,7 @@ impl ModuleSubscriptions {
420420

421421
let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| {
422422
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
423-
self.relational_db.report(&reducer, &tx_metrics, None);
423+
self.relational_db.report(reducer, tx_metrics, None);
424424
});
425425
let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
426426
let (table_rows, metrics) = return_on_err_with_sql!(
@@ -482,7 +482,7 @@ impl ModuleSubscriptions {
482482
// Always lock the db before the subscription lock to avoid deadlocks.
483483
let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| {
484484
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
485-
self.relational_db.report(&reducer, &tx_metrics, None);
485+
self.relational_db.report(reducer, tx_metrics, None);
486486
});
487487

488488
let removed_queries = {
@@ -578,7 +578,7 @@ impl ModuleSubscriptions {
578578
// We always get the db lock before the subscription lock to avoid deadlocks.
579579
let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| {
580580
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
581-
self.relational_db.report(&reducer, &tx_metrics, None);
581+
self.relational_db.report(reducer, tx_metrics, None);
582582
});
583583

584584
let compile_timer = metrics.compilation_time.start_timer();
@@ -680,7 +680,7 @@ impl ModuleSubscriptions {
680680
);
681681
let tx = scopeguard::guard(tx, |tx| {
682682
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
683-
self.relational_db.report(&reducer, &tx_metrics, None);
683+
self.relational_db.report(reducer, tx_metrics, None);
684684
});
685685

686686
// We minimize locking so that other clients can add subscriptions concurrently.
@@ -774,7 +774,7 @@ impl ModuleSubscriptions {
774774
)?;
775775
let tx = scopeguard::guard(tx, |tx| {
776776
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
777-
self.relational_db.report(&reducer, &tx_metrics, None);
777+
self.relational_db.report(reducer, tx_metrics, None);
778778
});
779779

780780
check_row_limit(
@@ -884,13 +884,17 @@ impl ModuleSubscriptions {
884884
}
885885
};
886886

887+
let tx_data = Arc::new(tx_data);
888+
887889
// When we're done with this method, release the tx and report metrics.
888890
let mut read_tx = scopeguard::guard(read_tx, |tx| {
889891
let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
890-
stdb.report_tx_metricses(&reducer, tx_data.as_ref(), Some(&tx_metrics_mut), &tx_metrics_read);
892+
self.relational_db
893+
.report_tx_metrics(reducer, tx_data.clone(), Some(tx_metrics_mut), Some(tx_metrics_read));
891894
});
892895
// Create the delta transaction we'll use to eval updates against.
893896
let delta_read_tx = tx_data
897+
.as_ref()
894898
.as_ref()
895899
.map(|tx_data| DeltaTx::new(&read_tx, tx_data, subscriptions.index_ids_for_subscriptions()))
896900
.unwrap_or_else(|| DeltaTx::from(&*read_tx));

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@ use crate::client::messages::{
55
TransactionUpdateMessage,
66
};
77
use crate::client::{ClientConnectionSender, Protocol};
8+
use crate::db::datastore::locking_tx_datastore::datastore::TxMetrics;
89
use crate::db::datastore::locking_tx_datastore::state_view::StateView;
10+
use crate::db::datastore::traits::TxData;
911
use crate::error::DBError;
12+
use crate::execution_context::WorkloadType;
1013
use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue};
1114
use crate::messages::websocket::{self as ws, TableUpdate};
1215
use crate::subscription::delta::eval_delta;
16+
use crate::subscription::ExecutionCounters;
1317
use crate::worker_metrics::WORKER_METRICS;
1418
use core::mem;
19+
use enum_map::EnumMap;
1520
use hashbrown::hash_map::OccupiedError;
1621
use hashbrown::{HashMap, HashSet};
1722
use parking_lot::RwLock;
@@ -26,6 +31,7 @@ use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue};
2631
use spacetimedb_primitives::{ColId, IndexId, TableId};
2732
use spacetimedb_subscription::{JoinEdge, SubscriptionPlan, TableName};
2833
use std::collections::BTreeMap;
34+
use std::fmt::Debug;
2935
use std::sync::atomic::{AtomicBool, Ordering};
3036
use std::sync::Arc;
3137
use tokio::sync::mpsc;
@@ -1283,6 +1289,62 @@ impl SubscriptionManager {
12831289
}
12841290
}
12851291

1292+
struct MetricsMessage {
1293+
reducer: String,
1294+
metrics_for_writer: Option<TxMetrics>,
1295+
metrics_for_reader: Option<TxMetrics>,
1296+
tx_data: Arc<Option<TxData>>,
1297+
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
1298+
}
1299+
1300+
#[derive(Clone)]
1301+
pub struct MetricsRecorderQueue {
1302+
tx: mpsc::UnboundedSender<MetricsMessage>,
1303+
}
1304+
1305+
impl MetricsRecorderQueue {
1306+
pub fn send_metrics(
1307+
&self,
1308+
reducer: String,
1309+
metrics_for_writer: Option<TxMetrics>,
1310+
metrics_for_reader: Option<TxMetrics>,
1311+
tx_data: Arc<Option<TxData>>,
1312+
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
1313+
) {
1314+
if let Err(err) = self.tx.send(MetricsMessage {
1315+
reducer,
1316+
metrics_for_writer,
1317+
metrics_for_reader,
1318+
tx_data,
1319+
counters,
1320+
}) {
1321+
log::warn!("failed to send metrics: {err}");
1322+
}
1323+
}
1324+
}
1325+
1326+
pub fn spawn_metrics_recorder() -> MetricsRecorderQueue {
1327+
let (tx, mut rx) = mpsc::unbounded_channel();
1328+
tokio::spawn(async move {
1329+
while let Some(MetricsMessage {
1330+
reducer,
1331+
metrics_for_writer,
1332+
metrics_for_reader,
1333+
tx_data,
1334+
counters,
1335+
}) = rx.recv().await
1336+
{
1337+
if let Some(tx_metrics) = metrics_for_writer {
1338+
tx_metrics.report(tx_data.as_ref().as_ref(), &reducer, |wl| &counters[wl]);
1339+
}
1340+
if let Some(tx_metrics) = metrics_for_reader {
1341+
tx_metrics.report(tx_data.as_ref().as_ref(), &reducer, |wl| &counters[wl]);
1342+
}
1343+
}
1344+
});
1345+
MetricsRecorderQueue { tx }
1346+
}
1347+
12861348
struct SendWorkerClient {
12871349
/// This flag is set if an error occurs during a tx update.
12881350
/// It will be cleaned up async or on resubscribe.

0 commit comments

Comments
 (0)