Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
use std::sync::Arc;

use enum_map::EnumMap;
use tokio::sync::mpsc;

use crate::{
db::datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData},
execution_context::WorkloadType,
subscription::ExecutionCounters,
};

pub mod datastore;
pub mod db_metrics;
pub mod relational_db;
Expand All @@ -22,3 +33,86 @@ pub struct Config {
/// Specifies the page pool max size in bytes.
pub page_pool_max_size: Option<usize>,
}

/// A message that is processed by the [`spawn_metrics_recorder`] actor.
/// We use a separate task to record metrics to avoid blocking transactions.
pub struct MetricsMessage {
/// The reducer the produced these metrics.
reducer: String,
/// Metrics from a mutable transaction.
metrics_for_writer: Option<TxMetrics>,
/// Metrics from a read-only transaction.
/// A message may have metrics for both types of transactions,
/// because metrics for a reducer and its subscription updates are recorded together.
metrics_for_reader: Option<TxMetrics>,
/// The row updates for an immutable transaction.
/// Needed for insert and delete counters.
tx_data: Option<Arc<TxData>>,
/// Cached metrics counters for each workload type.
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}

/// The handle used to send work to the tx metrics recorder.
#[derive(Clone)]
pub struct MetricsRecorderQueue {
tx: mpsc::UnboundedSender<MetricsMessage>,
}

impl MetricsRecorderQueue {
pub fn send_metrics(
&self,
reducer: String,
metrics_for_writer: Option<TxMetrics>,
metrics_for_reader: Option<TxMetrics>,
tx_data: Option<Arc<TxData>>,
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
) {
if let Err(err) = self.tx.send(MetricsMessage {
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
counters,
}) {
log::warn!("failed to send metrics: {err}");
}
}
}

/// Spawns a task for recording transaction metrics.
/// Returns the handle for pushing metrics to the recorder.
pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
let (tx, mut rx) = mpsc::unbounded_channel();
let abort_handle = tokio::spawn(async move {
while let Some(MetricsMessage {
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
counters,
}) = rx.recv().await
{
if let Some(tx_metrics) = metrics_for_writer {
tx_metrics.report(
// If row updates are present,
// they will always belong to the writer transaction.
tx_data.as_deref(),
&reducer,
|wl| &counters[wl],
);
}
if let Some(tx_metrics) = metrics_for_reader {
tx_metrics.report(
// If row updates are present,
// they will never belong to the reader transaction.
// Passing row updates here will most likely panic.
None,
&reducer,
|wl| &counters[wl],
);
}
}
})
.abort_handle();
(MetricsRecorderQueue { tx }, abort_handle)
}
74 changes: 53 additions & 21 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use super::datastore::{
};
use super::db_metrics::DB_METRICS;
use crate::db::datastore::system_tables::StModuleRow;
use crate::db::MetricsRecorderQueue;
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
use crate::execution_context::{ReducerContext, Workload, WorkloadType};
use crate::messages::control_db::HostType;
Expand Down Expand Up @@ -110,14 +111,17 @@ pub struct RelationalDB {
/// `Some` if `durability` is `Some`, `None` otherwise.
disk_size_fn: Option<DiskSizeFn>,

/// A map from workload types to their cached prometheus counters.
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,

/// An async queue for recording transaction metrics off the main thread
metrics_recorder_queue: Option<MetricsRecorderQueue>,
Comment thread
joshua-spacetime marked this conversation as resolved.

// DO NOT ADD FIELDS AFTER THIS.
// By default, fields are dropped in declaration order.
// We want to release the file lock last.
// TODO(noa): is this lockfile still necessary now that we have data-dir?
_lock: LockFile,

/// A map from workload types to their cached prometheus counters.
workload_type_to_exec_counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -231,6 +235,7 @@ impl RelationalDB {
inner: Locking,
durability: Option<(Arc<Durability>, DiskSizeFn)>,
snapshot_repo: Option<Arc<SnapshotRepository>>,
metrics_recorder_queue: Option<MetricsRecorderQueue>,
Comment thread
joshua-spacetime marked this conversation as resolved.
) -> Self {
let (durability, disk_size_fn) = durability.unzip();
let snapshot_worker =
Expand All @@ -249,8 +254,10 @@ impl RelationalDB {
row_count_fn: default_row_count_fn(database_identity),
disk_size_fn,

_lock: lock,
workload_type_to_exec_counters,
metrics_recorder_queue,

_lock: lock,
}
}

Expand Down Expand Up @@ -324,6 +331,10 @@ impl RelationalDB {
/// If restoring from an existing database, the `snapshot_repo` must
/// store views of the same sequence of TXes as the `history`.
///
/// - `metrics_recorder_queue`
Comment thread
joshua-spacetime marked this conversation as resolved.
///
/// The send side of a queue for recording transaction metrics.
///
/// # Return values
///
/// Alongside `Self`, [`ConnectedClients`] is returned, which is the set of
Expand All @@ -333,13 +344,15 @@ impl RelationalDB {
/// gracefully. The caller is responsible for disconnecting the clients.
///
/// [ModuleHost]: crate::host::module_host::ModuleHost
#[allow(clippy::too_many_arguments)]
pub fn open(
root: &ReplicaDir,
database_identity: Identity,
owner_identity: Identity,
history: impl durability::History<TxData = Txdata>,
durability: Option<(Arc<Durability>, DiskSizeFn)>,
snapshot_repo: Option<Arc<SnapshotRepository>>,
metrics_recorder_queue: Option<MetricsRecorderQueue>,
Comment thread
joshua-spacetime marked this conversation as resolved.
page_pool: PagePool,
) -> Result<(Self, ConnectedClients), DBError> {
log::trace!("[{}] DATABASE: OPEN", database_identity);
Expand Down Expand Up @@ -373,6 +386,7 @@ impl RelationalDB {
inner,
durability,
snapshot_repo,
metrics_recorder_queue,
);

if let Some(meta) = db.metadata()? {
Expand Down Expand Up @@ -749,6 +763,11 @@ impl RelationalDB {
Ok(AlgebraicValue::decode(col_ty, &mut &*bytes)?)
}

/// Returns the execution counters for this database.
pub fn exec_counter_map(&self) -> Arc<EnumMap<WorkloadType, ExecutionCounters>> {
self.workload_type_to_exec_counters.clone()
}

/// Returns the execution counters for `workload_type` for this database.
pub fn exec_counters_for(&self, workload_type: WorkloadType) -> &ExecutionCounters {
&self.workload_type_to_exec_counters[workload_type]
Expand Down Expand Up @@ -988,7 +1007,7 @@ impl RelationalDB {
let mut tx = self.begin_tx(workload);
let res = f(&mut tx);
let (tx_metrics, reducer) = self.release_tx(tx);
self.report_tx_metricses(&reducer, None, None, &tx_metrics);
self.report_read_tx_metrics(reducer, tx_metrics);
res
}

Expand All @@ -999,11 +1018,11 @@ impl RelationalDB {
{
if res.is_err() {
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
self.report(&reducer, &tx_metrics, None);
self.report_mut_tx_metrics(reducer, tx_metrics, None);
Comment thread
joshua-spacetime marked this conversation as resolved.
} else {
match self.commit_tx(tx).map_err(E::from)? {
Some((tx_data, tx_metrics, reducer)) => {
self.report(&reducer, &tx_metrics, Some(&tx_data));
self.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data));
}
None => panic!("TODO: retry?"),
}
Expand All @@ -1018,7 +1037,7 @@ impl RelationalDB {
match res {
Err(e) => {
let (tx_metrics, reducer) = self.rollback_mut_tx(tx);
self.report(&reducer, &tx_metrics, None);
self.report_mut_tx_metrics(reducer, tx_metrics, None);

Err(e)
}
Expand All @@ -1042,17 +1061,22 @@ impl RelationalDB {
/// Reports the `TxMetrics`s passed.
///
/// Should only be called after the tx lock has been fully released.
pub(crate) fn report_tx_metricses(
pub(crate) fn report_tx_metrics(
&self,
reducer: &str,
tx_data: Option<&TxData>,
metrics_mut: Option<&TxMetrics>,
metrics_read: &TxMetrics,
reducer: String,
tx_data: Option<Arc<TxData>>,
metrics_for_writer: Option<TxMetrics>,
metrics_for_reader: Option<TxMetrics>,
) {
if let Some(metrics_mut) = metrics_mut {
self.report(reducer, metrics_mut, tx_data);
if let Some(recorder) = &self.metrics_recorder_queue {
recorder.send_metrics(
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
self.exec_counter_map(),
);
}
self.report(reducer, metrics_read, None);
}
}

Expand Down Expand Up @@ -1403,8 +1427,13 @@ impl RelationalDB {
}

/// Reports the metrics for `reducer`, using counters provided by `db`.
pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) {
metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl));
pub fn report_mut_tx_metrics(&self, reducer: String, metrics: TxMetrics, tx_data: Option<TxData>) {
self.report_tx_metrics(reducer, tx_data.map(Arc::new), Some(metrics), None);
}

/// Reports subscription metrics for `reducer`, using counters provided by `db`.
pub fn report_read_tx_metrics(&self, reducer: String, metrics: TxMetrics) {
self.report_tx_metrics(reducer, None, None, Some(metrics));
}

/// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var`
Expand Down Expand Up @@ -1779,7 +1808,7 @@ pub mod tests_utils {
expected_num_clients: usize,
) -> Result<Self, DBError> {
let dir = TempReplicaDir::new()?;
let db = Self::open_db(&dir, history, None, None, expected_num_clients)?;
let db = Self::open_db(&dir, history, None, None, None, expected_num_clients)?;
Ok(Self {
db,
durable: None,
Expand Down Expand Up @@ -1870,7 +1899,7 @@ pub mod tests_utils {
}

fn in_memory_internal(root: &ReplicaDir) -> Result<RelationalDB, DBError> {
Self::open_db(root, EmptyHistory::new(), None, None, 0)
Self::open_db(root, EmptyHistory::new(), None, None, None, 0)
}

fn durable_internal(
Expand All @@ -1884,7 +1913,7 @@ pub mod tests_utils {
let snapshot_repo = want_snapshot_repo
.then(|| open_snapshot_repo(root.snapshots(), Identity::ZERO, 0))
.transpose()?;
let db = Self::open_db(root, history, Some((durability, disk_size_fn)), snapshot_repo, 0)?;
let db = Self::open_db(root, history, Some((durability, disk_size_fn)), snapshot_repo, None, 0)?;

Ok((db, local))
}
Expand All @@ -1894,6 +1923,7 @@ pub mod tests_utils {
history: impl durability::History<TxData = Txdata>,
durability: Option<(Arc<Durability>, DiskSizeFn)>,
snapshot_repo: Option<Arc<SnapshotRepository>>,
metrics_recorder_queue: Option<MetricsRecorderQueue>,
Comment thread
joshua-spacetime marked this conversation as resolved.
expected_num_clients: usize,
) -> Result<RelationalDB, DBError> {
let (db, connected_clients) = RelationalDB::open(
Expand All @@ -1903,6 +1933,7 @@ pub mod tests_utils {
history,
durability,
snapshot_repo,
metrics_recorder_queue,
PagePool::new_for_test(),
)?;
assert_eq!(connected_clients.len(), expected_num_clients);
Expand Down Expand Up @@ -2151,6 +2182,7 @@ mod tests {
EmptyHistory::new(),
None,
None,
None,
PagePool::new_for_test(),
) {
Ok(_) => {
Expand Down
Loading
Loading