Skip to content

Commit 3d1a91c

Browse files
kimShubham8287
andauthored
Handle snapshot restore more robustly (#2735)
Signed-off-by: Kim Altintop <kim@eagain.io> Signed-off-by: Shubham Mishra <shivam828787@gmail.com> Co-authored-by: Shubham Mishra <shubham@clockworklabs.io>
1 parent 2c5d78d commit 3d1a91c

8 files changed

Lines changed: 273 additions & 47 deletions

File tree

crates/commitlog/src/commitlog.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,15 @@ impl<R: Repo, T> Generic<R, T> {
182182
self.head.next_tx_offset().checked_sub(1)
183183
}
184184

185+
/// The first transaction offset written to disk, or `None` if nothing has
186+
/// been written yet.
187+
pub fn min_committed_offset(&self) -> Option<u64> {
188+
self.tail
189+
.first()
190+
.copied()
191+
.or_else(|| (!self.head.is_empty()).then(|| self.head.min_tx_offset()))
192+
}
193+
185194
// Helper to obtain a list of the segment offsets which include transaction
186195
// offset `offset`.
187196
//

crates/commitlog/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ impl<T> Commitlog<T> {
131131
self.inner.read().unwrap().max_committed_offset()
132132
}
133133

134+
/// Determine the minimum transaction offset in the log.
135+
///
136+
/// The offset is `None` if the log hasn't been flushed to disk yet.
137+
pub fn min_committed_offset(&self) -> Option<u64> {
138+
self.inner.read().unwrap().min_committed_offset()
139+
}
140+
134141
/// Get the current epoch.
135142
///
136143
/// See also: [`Commit::epoch`].

crates/core/src/db/relational_db.rs

Lines changed: 151 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use super::datastore::{
1616
};
1717
use super::db_metrics::DB_METRICS;
1818
use crate::db::datastore::system_tables::{StModuleRow, WASM_MODULE};
19-
use crate::error::{DBError, DatabaseError, TableError};
19+
use crate::error::{DBError, DatabaseError, RestoreSnapshotError, TableError};
2020
use crate::execution_context::{ReducerContext, Workload};
2121
use crate::messages::control_db::HostType;
2222
use crate::util::{asyncify, spawn_rayon};
@@ -336,12 +336,14 @@ impl RelationalDB {
336336
.map(|pair| pair.0.clone())
337337
.as_deref()
338338
.and_then(|durability| durability.durable_tx_offset());
339+
let (min_commitlog_offset, _) = history.tx_range_hint();
339340

340341
log::info!("[{database_identity}] DATABASE: durable_tx_offset is {durable_tx_offset:?}");
341342
let inner = Self::restore_from_snapshot_or_bootstrap(
342343
database_identity,
343344
snapshot_repo.as_deref(),
344345
durable_tx_offset,
346+
min_commitlog_offset,
345347
page_pool,
346348
)?;
347349

@@ -460,43 +462,102 @@ impl RelationalDB {
460462
database_identity: Identity,
461463
snapshot_repo: Option<&SnapshotRepository>,
462464
durable_tx_offset: Option<TxOffset>,
465+
min_commitlog_offset: TxOffset,
463466
page_pool: PagePool,
464-
) -> Result<Locking, DBError> {
465-
if let Some(snapshot_repo) = snapshot_repo {
466-
if let Some(durable_tx_offset) = durable_tx_offset {
467-
// Don't restore from a snapshot newer than the `durable_tx_offset`,
468-
// so that you drop TXes which were committed but not durable before the restart.
469-
if let Some(tx_offset) = snapshot_repo.latest_snapshot_older_than(durable_tx_offset)? {
470-
// Mark any newer snapshots as invalid, as the new history will diverge from their state.
471-
snapshot_repo.invalidate_newer_snapshots(durable_tx_offset)?;
472-
log::info!("[{database_identity}] DATABASE: restoring snapshot of tx_offset {tx_offset}");
473-
let start = std::time::Instant::now();
474-
let snapshot = snapshot_repo.read_snapshot(tx_offset, &page_pool)?;
467+
) -> Result<Locking, RestoreSnapshotError> {
468+
fn try_restore_snapshot(
469+
snapshot_repo: &SnapshotRepository,
470+
snapshot_offset: TxOffset,
471+
database_identity: Identity,
472+
page_pool: PagePool,
473+
) -> Result<Locking, RestoreSnapshotError> {
474+
log::info!(
475+
"[{database_identity}] DATABASE: restoring snapshot of tx_offset {}",
476+
snapshot_offset
477+
);
478+
let start = std::time::Instant::now();
479+
let snapshot = snapshot_repo
480+
.read_snapshot(snapshot_offset, &page_pool)
481+
.map_err(Box::new)?;
482+
log::info!(
483+
"[{database_identity}] DATABASE: read snapshot of tx_offset {} in {:?}",
484+
snapshot_offset,
485+
start.elapsed(),
486+
);
487+
if snapshot.database_identity != database_identity {
488+
return Err(RestoreSnapshotError::IdentityMismatch {
489+
expected: database_identity,
490+
actual: snapshot.database_identity,
491+
});
492+
}
493+
let start = std::time::Instant::now();
494+
Locking::restore_from_snapshot(snapshot, page_pool)
495+
.inspect(|_| {
475496
log::info!(
476-
"[{database_identity}] DATABASE: read snapshot of tx_offset {tx_offset} in {:?}",
497+
"[{database_identity}] DATABASE: restored from snapshot of tx_offset {} in {:?}",
498+
snapshot_offset,
477499
start.elapsed(),
478-
);
479-
if snapshot.database_identity != database_identity {
480-
// TODO: return a proper typed error
481-
return Err(anyhow::anyhow!(
482-
"Snapshot has incorrect database_identity: expected {database_identity} but found {}",
483-
snapshot.database_identity,
484-
)
485-
.into());
500+
)
501+
})
502+
.inspect_err(|e| {
503+
log::warn!(
504+
"[{database_identity}] DATABASE: failed to restore snapshot of tx_offset {}: {}",
505+
snapshot_offset,
506+
e
507+
)
508+
})
509+
.map_err(Box::new)
510+
.map_err(RestoreSnapshotError::Datastore)
511+
}
512+
513+
if let Some((snapshot_repo, durable_tx_offset)) = snapshot_repo.zip(durable_tx_offset) {
514+
// Mark any newer snapshots as invalid, as the history past
515+
// `durable_tx_offset` may have been reset and thus diverge from
516+
// any snapshots taken earlier.
517+
snapshot_repo
518+
.invalidate_newer_snapshots(durable_tx_offset)
519+
.map_err(Box::new)?;
520+
521+
// Try to restore from any snapshot that was taken within the
522+
// range `(min_commitlog_offset + 1)..=durable_tx_offset`.
523+
let mut upper_bound = durable_tx_offset;
524+
loop {
525+
let Some(snapshot_offset) = snapshot_repo
526+
.latest_snapshot_older_than(upper_bound)
527+
.map_err(Box::new)?
528+
else {
529+
break;
530+
};
531+
if min_commitlog_offset + 1 > snapshot_offset {
532+
break;
533+
}
534+
if let Ok(datastore) =
535+
try_restore_snapshot(snapshot_repo, snapshot_offset, database_identity, page_pool.clone())
536+
{
537+
return Ok(datastore);
538+
} else {
539+
// `latest_snapshot_older_than` is inclusive of the
540+
// upper bound, so subtract one and give up if there
541+
// are no more offsets to try.
542+
match snapshot_offset.checked_sub(1) {
543+
None => break,
544+
Some(older_than) => upper_bound = older_than,
486545
}
487-
let start = std::time::Instant::now();
488-
let res = Locking::restore_from_snapshot(snapshot, page_pool);
489-
log::info!(
490-
"[{database_identity}] DATABASE: restored from snapshot of tx_offset {tx_offset} in {:?}",
491-
start.elapsed(),
492-
);
493-
return res;
494546
}
495547
}
496-
log::info!("[{database_identity}] DATABASE: no snapshot on disk");
548+
}
549+
log::info!("[{database_identity}] DATABASE: no usable snapshot on disk");
550+
551+
// If we didn't find a snapshot and the commitlog doesn't start at the
552+
// zero-th commit (e.g. due to archiving), there is no way to restore
553+
// the database.
554+
if min_commitlog_offset > 0 {
555+
return Err(RestoreSnapshotError::NoConnectedSnapshot { min_commitlog_offset });
497556
}
498557

499558
Locking::bootstrap(database_identity, page_pool)
559+
.map_err(Box::new)
560+
.map_err(RestoreSnapshotError::Bootstrap)
500561
}
501562

502563
/// Apply the provided [`spacetimedb_durability::History`] onto the database
@@ -1267,7 +1328,7 @@ where
12671328
// always supplied when constructing a `RelationalDB`. This would allow
12681329
// to spawn a timer task here which just prints the progress periodically
12691330
// in case the history is finite but very long.
1270-
let max_tx_offset = history.max_tx_offset();
1331+
let (_, max_tx_offset) = history.tx_range_hint();
12711332
let mut last_logged_percentage = 0;
12721333
let progress = |tx_offset: u64| {
12731334
if let Some(max_tx_offset) = max_tx_offset {
@@ -1665,13 +1726,15 @@ pub mod tests_utils {
16651726

16661727
impl durability::History for TestHistory {
16671728
type TxData = Txdata;
1729+
16681730
fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
16691731
where
16701732
D: commitlog::Decoder,
16711733
D::Error: From<commitlog::error::Traversal>,
16721734
{
16731735
self.0.fold_transactions_from(offset, decoder)
16741736
}
1737+
16751738
fn transactions_from<'a, D>(
16761739
&self,
16771740
offset: TxOffset,
@@ -1684,8 +1747,12 @@ pub mod tests_utils {
16841747
{
16851748
self.0.transactions_from(offset, decoder)
16861749
}
1687-
fn max_tx_offset(&self) -> Option<TxOffset> {
1688-
self.0.max_committed_offset()
1750+
1751+
fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
1752+
let min = self.0.min_committed_offset().unwrap_or_default();
1753+
let max = self.0.max_committed_offset();
1754+
1755+
(min, max)
16891756
}
16901757
}
16911758

@@ -1721,6 +1788,7 @@ mod tests {
17211788
#![allow(clippy::disallowed_macros)]
17221789

17231790
use std::cell::RefCell;
1791+
use std::fs::OpenOptions;
17241792
use std::path::PathBuf;
17251793
use std::rc::Rc;
17261794

@@ -1737,7 +1805,7 @@ mod tests {
17371805
use commitlog::payload::txdata;
17381806
use commitlog::Commitlog;
17391807
use durability::EmptyHistory;
1740-
use pretty_assertions::assert_eq;
1808+
use pretty_assertions::{assert_eq, assert_matches};
17411809
use spacetimedb_data_structures::map::IntMap;
17421810
use spacetimedb_fs_utils::compression::{CompressCount, CompressType};
17431811
use spacetimedb_lib::db::raw_def::v9::{btree, RawTableDefBuilder};
@@ -2769,6 +2837,7 @@ mod tests {
27692837
Identity::ZERO,
27702838
Some(&repo),
27712839
Some(last_compress),
2840+
0,
27722841
PagePool::new_for_test(),
27732842
)?;
27742843

@@ -2795,7 +2864,7 @@ mod tests {
27952864

27962865
let last = repo.latest_snapshot()?;
27972866
let stdb =
2798-
RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, PagePool::new_for_test())?;
2867+
RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, 0, PagePool::new_for_test())?;
27992868

28002869
let out = TempDir::with_prefix("snapshot_test")?;
28012870
let dir = SnapshotsPath::from_path_unchecked(out.path());
@@ -2808,4 +2877,52 @@ mod tests {
28082877

28092878
Ok(())
28102879
}
2880+
2881+
#[test]
2882+
fn tries_older_snapshots() -> ResultTest<()> {
2883+
let stdb = TestDB::in_memory()?;
2884+
stdb.path().snapshots().create()?;
2885+
let repo = SnapshotRepository::open(stdb.path().snapshots(), stdb.database_identity(), 85)?;
2886+
2887+
stdb.take_snapshot(&repo)?.expect("failed to take snapshot");
2888+
{
2889+
let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
2890+
let schema = my_table(AlgebraicType::I32);
2891+
let table_id = stdb.create_table(&mut tx, schema)?;
2892+
for v in 0..3 {
2893+
insert(&stdb, &mut tx, table_id, &product![v])?;
2894+
}
2895+
stdb.commit_tx(tx)?;
2896+
}
2897+
stdb.take_snapshot(&repo)?.expect("failed to take snapshot");
2898+
2899+
let try_restore = |durable_tx_offset, min_commitlog_offset| {
2900+
RelationalDB::restore_from_snapshot_or_bootstrap(
2901+
stdb.database_identity(),
2902+
Some(&repo),
2903+
Some(durable_tx_offset),
2904+
min_commitlog_offset,
2905+
PagePool::new_for_test(),
2906+
)
2907+
};
2908+
2909+
try_restore(1, 0)?;
2910+
// We can restore from the previous snapshot
2911+
// if the snapshot file is corrupted
2912+
repo.snapshot_dir_path(1)
2913+
.snapshot_file(1)
2914+
.open_file(OpenOptions::new().write(true))?
2915+
.set_len(1)?;
2916+
try_restore(1, 0)?;
2917+
// Also if it's gone
2918+
std::fs::remove_file(repo.snapshot_dir_path(1).snapshot_file(1))?;
2919+
try_restore(1, 0)?;
2920+
// But not if the commitlog starts after the previous snapshot
2921+
assert_matches!(
2922+
try_restore(1, 1).map(drop),
2923+
Err(RestoreSnapshotError::NoConnectedSnapshot { .. })
2924+
);
2925+
2926+
Ok(())
2927+
}
28112928
}

crates/core/src/error.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::sync::{MutexGuard, PoisonError};
55

66
use enum_as_inner::EnumAsInner;
77
use hex::FromHexError;
8+
use spacetimedb_commitlog::repo::TxOffset;
89
use spacetimedb_expr::errors::TypingError;
10+
use spacetimedb_lib::Identity;
911
use spacetimedb_sats::AlgebraicType;
1012
use spacetimedb_schema::error::ValidationErrors;
1113
use spacetimedb_snapshot::SnapshotError;
@@ -236,6 +238,8 @@ pub enum DBError {
236238
error: Box<DBError>,
237239
sql: Box<str>,
238240
},
241+
#[error(transparent)]
242+
RestoreSnapshot(#[from] RestoreSnapshotError),
239243
}
240244

241245
impl From<bflatn_to::Error> for DBError {
@@ -409,3 +413,17 @@ impl From<ErrorVm> for NodesError {
409413
DBError::from(err).into()
410414
}
411415
}
416+
417+
#[derive(Debug, Error)]
418+
pub enum RestoreSnapshotError {
419+
#[error("Snapshot has incorrect database_identity: expected {expected} but found {actual}")]
420+
IdentityMismatch { expected: Identity, actual: Identity },
421+
#[error("Failed to restore datastore from snapshot")]
422+
Datastore(#[source] Box<DBError>),
423+
#[error("Failed to read snapshot")]
424+
Snapshot(#[from] Box<SnapshotError>),
425+
#[error("Failed to bootstrap datastore without snapshot")]
426+
Bootstrap(#[source] Box<DBError>),
427+
#[error("No connected snapshot found, commitlog starts at {min_commitlog_offset}")]
428+
NoConnectedSnapshot { min_commitlog_offset: TxOffset },
429+
}

crates/core/src/host/host_controller.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -698,8 +698,7 @@ impl Host {
698698
///
699699
/// Note that this does **not** run module initialization routines, but may
700700
/// create on-disk artifacts if the host / database did not exist.
701-
702-
#[tracing::instrument(level = "debug", skip_all, err)]
701+
#[tracing::instrument(level = "debug", skip_all)]
703702
async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
704703
let HostController {
705704
data_dir,
@@ -738,7 +737,17 @@ impl Host {
738737
Some(durability),
739738
Some(snapshot_repo),
740739
page_pool.clone(),
741-
)?;
740+
)
741+
// Make sure we log the source chain of the error
742+
// as a single line, with the help of `anyhow`.
743+
.map_err(anyhow::Error::from)
744+
.inspect_err(|e| {
745+
tracing::error!(
746+
database = %database.database_identity,
747+
replica = replica_id,
748+
"Failed to open database: {e:#}"
749+
);
750+
})?;
742751
if let Some(start_snapshot_watcher) = start_snapshot_watcher {
743752
let watcher = db.subscribe_to_snapshots().expect("we passed snapshot_repo");
744753
start_snapshot_watcher(watcher)

crates/durability/src/imp/local.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,10 @@ impl<T: Encode + 'static> History for Local<T> {
345345
self.clog.transactions_from(offset, decoder)
346346
}
347347

348-
fn max_tx_offset(&self) -> Option<TxOffset> {
349-
self.clog.max_committed_offset()
348+
fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
349+
let min = self.clog.min_committed_offset().unwrap_or_default();
350+
let max = self.clog.max_committed_offset();
351+
352+
(min, max)
350353
}
351354
}

0 commit comments

Comments
 (0)