Skip to content

Commit 3b28744

Browse files
authored
Re-land Replay extraction PRs (#4893)
# Description of Changes See the corresponding commits/PRs for descriptions. # API and ABI breaking changes None # Expected complexity level and risk 1 -- individual PRs already reviewed. # Testing No semantic changes.
1 parent b150cf6 commit 3b28744

5 files changed

Lines changed: 1146 additions & 1026 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 14 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
1919
};
20-
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
20+
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
2121
use spacetimedb_datastore::system_tables::{
2222
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
2323
};
@@ -1617,62 +1617,20 @@ impl RelationalDB {
16171617
}
16181618
}
16191619

1620-
fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError>
1621-
where
1622-
H: durability::History<TxData = Txdata>,
1623-
{
1624-
log::info!("[{database_identity}] DATABASE: applying transaction history...");
1625-
1626-
// TODO: Revisit once we actually replay history suffixes, ie. starting
1627-
// from an offset larger than the history's min offset.
1628-
// TODO: We may want to require that a `tokio::runtime::Handle` is
1629-
// always supplied when constructing a `RelationalDB`. This would allow
1630-
// to spawn a timer task here which just prints the progress periodically
1631-
// in case the history is finite but very long.
1632-
let (_, max_tx_offset) = history.tx_range_hint();
1633-
let mut last_logged_percentage = 0;
1634-
let progress = |tx_offset: u64| {
1635-
if let Some(max_tx_offset) = max_tx_offset {
1636-
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
1637-
if percentage > last_logged_percentage && percentage % 10 == 0 {
1638-
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
1639-
last_logged_percentage = percentage;
1640-
}
1641-
// Print _something_ even if we don't know what's still ahead.
1642-
} else if tx_offset.is_multiple_of(10_000) {
1643-
log::info!("[{database_identity}] Loading transaction {tx_offset}");
1644-
}
1620+
fn apply_history(
1621+
datastore: &Locking,
1622+
database_identity: Identity,
1623+
history: impl durability::History<TxData = Txdata>,
1624+
) -> Result<(), DBError> {
1625+
let counters = ApplyHistoryCounters {
1626+
replay_commitlog_time_seconds: WORKER_METRICS
1627+
.replay_commitlog_time_seconds
1628+
.with_label_values(&database_identity),
1629+
replay_commitlog_num_commits: WORKER_METRICS
1630+
.replay_commitlog_num_commits
1631+
.with_label_values(&database_identity),
16451632
};
1646-
1647-
let time_before = std::time::Instant::now();
1648-
1649-
let mut replay = datastore.replay(
1650-
progress,
1651-
// We don't want to instantiate an incorrect state;
1652-
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
1653-
spacetimedb_datastore::locking_tx_datastore::datastore::ErrorBehavior::FailFast,
1654-
);
1655-
let start_tx_offset = replay.next_tx_offset();
1656-
history
1657-
.fold_transactions_from(start_tx_offset, &mut replay)
1658-
.map_err(anyhow::Error::from)?;
1659-
1660-
let time_elapsed = time_before.elapsed();
1661-
WORKER_METRICS
1662-
.replay_commitlog_time_seconds
1663-
.with_label_values(&database_identity)
1664-
.set(time_elapsed.as_secs_f64());
1665-
1666-
let end_tx_offset = replay.next_tx_offset();
1667-
WORKER_METRICS
1668-
.replay_commitlog_num_commits
1669-
.with_label_values(&database_identity)
1670-
.set((end_tx_offset - start_tx_offset) as _);
1671-
1672-
log::info!("[{database_identity}] DATABASE: applied transaction history");
1673-
datastore.rebuild_state_after_replay()?;
1674-
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
1675-
1633+
spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
16761634
Ok(())
16771635
}
16781636

0 commit comments

Comments
 (0)