Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
use crate::messages::control_db::HostType;
use crate::subscription::ExecutionCounters;
use crate::util::{asyncify, spawn_rayon};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context};
use enum_map::EnumMap;
use fs2::FileExt;
Expand Down Expand Up @@ -378,6 +379,9 @@ impl RelationalDB {
let (min_commitlog_offset, _) = history.tx_range_hint();

log::info!("[{database_identity}] DATABASE: durable_tx_offset is {durable_tx_offset:?}");

let start_time = std::time::Instant::now();

let inner = Self::restore_from_snapshot_or_bootstrap(
database_identity,
snapshot_repo.as_deref(),
Expand All @@ -387,6 +391,13 @@ impl RelationalDB {
)?;

apply_history(&inner, database_identity, history)?;

let elapsed_time = start_time.elapsed();
WORKER_METRICS
.replay_total_time_seconds
.with_label_values(&database_identity)
.set(elapsed_time.as_secs_f64());

let db = Self::new(
lock,
database_identity,
Expand Down Expand Up @@ -532,13 +543,20 @@ impl RelationalDB {
) -> Result<ReconstructedSnapshot, Box<SnapshotError>> {
log::info!("[{database_identity}] DATABASE: restoring snapshot of tx_offset {snapshot_offset}");
let start = std::time::Instant::now();

let snapshot = snapshot_repo
.read_snapshot(snapshot_offset, page_pool)
.map_err(Box::new)?;

let elapsed_time = start.elapsed();

WORKER_METRICS
.replay_snapshot_read_time_seconds
.with_label_values(database_identity)
.set(elapsed_time.as_secs_f64());

log::info!(
"[{database_identity}] DATABASE: read snapshot of tx_offset {} in {:?}",
snapshot_offset,
start.elapsed(),
"[{database_identity}] DATABASE: read snapshot of tx_offset {snapshot_offset} in {elapsed_time:?}",
);

Ok(snapshot)
Expand All @@ -554,10 +572,12 @@ impl RelationalDB {
let snapshot_offset = snapshot.tx_offset;
Locking::restore_from_snapshot(snapshot, page_pool)
.inspect(|_| {
let elapsed_time = start.elapsed();

WORKER_METRICS.replay_snapshot_restore_time_seconds.with_label_values(database_identity).set(elapsed_time.as_secs_f64());

log::info!(
"[{database_identity}] DATABASE: restored from snapshot of tx_offset {} in {:?}",
snapshot_offset,
start.elapsed(),
"[{database_identity}] DATABASE: restored from snapshot of tx_offset {snapshot_offset} in {elapsed_time:?}",
)
})
.inspect_err(|e| {
Expand Down Expand Up @@ -1602,11 +1622,26 @@ where
}
};

let time_before = std::time::Instant::now();

let mut replay = datastore.replay(progress);
let start = replay.next_tx_offset();
let start_tx_offset = replay.next_tx_offset();
history
.fold_transactions_from(start, &mut replay)
.fold_transactions_from(start_tx_offset, &mut replay)
.map_err(anyhow::Error::from)?;

let time_elapsed = time_before.elapsed();
WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity)
.set(time_elapsed.as_secs_f64());

let end_tx_offset = replay.next_tx_offset();
WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity)
.set((end_tx_offset - start_tx_offset) as _);

log::info!("[{database_identity}] DATABASE: applied transaction history");
datastore.rebuild_state_after_replay()?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
Expand Down
28 changes: 28 additions & 0 deletions crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,34 @@ metrics_group!(
#[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"]
#[labels(db: Identity)]
pub total_outgoing_queue_length: IntGaugeVec,

#[name = spacetime_replay_total_time_seconds]
#[help = "Total time spent replaying a database upon restart, including snapshot read, snapshot restore and commitlog replay"]
#[labels(db: Identity)]
// We expect only a small number of observations per label
// (exactly one for non-replicated databases, and one per leader change for replicated databases),
// so we just store a `Gauge` holding the most recent observation for each database.
pub replay_total_time_seconds: GaugeVec,

#[name = spacetime_replay_snapshot_read_time_seconds]
#[help = "Time spent reading a snapshot from disk before restoring the snapshot upon restart"]
#[labels(db: Identity)]
pub replay_snapshot_read_time_seconds: GaugeVec,

#[name = spacetime_replay_snapshot_restore_time_seconds]
#[help = "Time spent restoring a database from a snapshot after reading the snapshot and before commitlog replay upon restart"]
#[labels(db: Identity)]
pub replay_snapshot_restore_time_seconds: GaugeVec,

#[name = spacetime_replay_commitlog_time_seconds]
#[help = "Time spent replaying the commitlog after restoring from a snapshot upon restart"]
#[labels(db: Identity)]
pub replay_commitlog_time_seconds: GaugeVec,

#[name = spacetime_replay_commitlog_num_commits]
#[help = "Number of commits replayed after restoring from a snapshot upon restart"]
#[labels(db: Identity)]
pub replay_commitlog_num_commits: IntGaugeVec,
}
);

Expand Down
Loading