@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717use spacetimedb_datastore:: locking_tx_datastore:: state_view:: {
1818 IterByColEqMutTx , IterByColRangeMutTx , IterMutTx , StateView ,
1919} ;
20- use spacetimedb_datastore:: locking_tx_datastore:: { IndexScanPointOrRange , MutTxId , TxId } ;
20+ use spacetimedb_datastore:: locking_tx_datastore:: { ApplyHistoryCounters , IndexScanPointOrRange , MutTxId , TxId } ;
2121use spacetimedb_datastore:: system_tables:: {
2222 system_tables, StModuleRow , ST_CLIENT_ID , ST_CONNECTION_CREDENTIALS_ID , ST_VIEW_SUB_ID ,
2323} ;
@@ -1617,62 +1617,20 @@ impl RelationalDB {
16171617 }
16181618}
16191619
1620- fn apply_history < H > ( datastore : & Locking , database_identity : Identity , history : H ) -> Result < ( ) , DBError >
1621- where
1622- H : durability:: History < TxData = Txdata > ,
1623- {
1624- log:: info!( "[{database_identity}] DATABASE: applying transaction history..." ) ;
1625-
1626- // TODO: Revisit once we actually replay history suffixes, ie. starting
1627- // from an offset larger than the history's min offset.
1628- // TODO: We may want to require that a `tokio::runtime::Handle` is
1629- // always supplied when constructing a `RelationalDB`. This would allow
1630- // to spawn a timer task here which just prints the progress periodically
1631- // in case the history is finite but very long.
1632- let ( _, max_tx_offset) = history. tx_range_hint ( ) ;
1633- let mut last_logged_percentage = 0 ;
1634- let progress = |tx_offset : u64 | {
1635- if let Some ( max_tx_offset) = max_tx_offset {
1636- let percentage = f64:: floor ( ( tx_offset as f64 / max_tx_offset as f64 ) * 100.0 ) as i32 ;
1637- if percentage > last_logged_percentage && percentage % 10 == 0 {
1638- log:: info!( "[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})" ) ;
1639- last_logged_percentage = percentage;
1640- }
1641- // Print _something_ even if we don't know what's still ahead.
1642- } else if tx_offset. is_multiple_of ( 10_000 ) {
1643- log:: info!( "[{database_identity}] Loading transaction {tx_offset}" ) ;
1644- }
1620+ fn apply_history (
1621+ datastore : & Locking ,
1622+ database_identity : Identity ,
1623+ history : impl durability:: History < TxData = Txdata > ,
1624+ ) -> Result < ( ) , DBError > {
1625+ let counters = ApplyHistoryCounters {
1626+ replay_commitlog_time_seconds : WORKER_METRICS
1627+ . replay_commitlog_time_seconds
1628+ . with_label_values ( & database_identity) ,
1629+ replay_commitlog_num_commits : WORKER_METRICS
1630+ . replay_commitlog_num_commits
1631+ . with_label_values ( & database_identity) ,
16451632 } ;
1646-
1647- let time_before = std:: time:: Instant :: now ( ) ;
1648-
1649- let mut replay = datastore. replay (
1650- progress,
1651- // We don't want to instantiate an incorrect state;
1652- // if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
1653- spacetimedb_datastore:: locking_tx_datastore:: datastore:: ErrorBehavior :: FailFast ,
1654- ) ;
1655- let start_tx_offset = replay. next_tx_offset ( ) ;
1656- history
1657- . fold_transactions_from ( start_tx_offset, & mut replay)
1658- . map_err ( anyhow:: Error :: from) ?;
1659-
1660- let time_elapsed = time_before. elapsed ( ) ;
1661- WORKER_METRICS
1662- . replay_commitlog_time_seconds
1663- . with_label_values ( & database_identity)
1664- . set ( time_elapsed. as_secs_f64 ( ) ) ;
1665-
1666- let end_tx_offset = replay. next_tx_offset ( ) ;
1667- WORKER_METRICS
1668- . replay_commitlog_num_commits
1669- . with_label_values ( & database_identity)
1670- . set ( ( end_tx_offset - start_tx_offset) as _ ) ;
1671-
1672- log:: info!( "[{database_identity}] DATABASE: applied transaction history" ) ;
1673- datastore. rebuild_state_after_replay ( ) ?;
1674- log:: info!( "[{database_identity}] DATABASE: rebuilt state after replay" ) ;
1675-
1633+ spacetimedb_datastore:: locking_tx_datastore:: apply_history ( datastore, database_identity, history, counters) ?;
16761634 Ok ( ( ) )
16771635}
16781636
0 commit comments