Skip to content

Commit 818e9b2

Browse files
authored
snapshot: Ensure all snapshot files are durable (#4891)
When creating or compressing a snapshot, `fsync` all files and directories, so as to ensure that the snapshot is durable on the local disk. This obviously amounts to a large number of `fsync` calls, which may negatively impact performance of taking a snapshot -- since we hold a transaction lock while taking a snapshot, this is not to be taken lightly. # Expected complexity level and risk 3 -- performance impact # Testing I haven't quantified the performance impact.
1 parent f22f800 commit 818e9b2

5 files changed

Lines changed: 157 additions & 36 deletions

File tree

crates/core/src/db/snapshot.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,15 @@ impl SnapshotWorker {
144144
struct SnapshotMetrics {
145145
snapshot_timing_total: Histogram,
146146
snapshot_timing_inner: Histogram,
147+
snapshot_timing_fsync: Histogram,
147148
}
148149

149150
impl SnapshotMetrics {
150151
fn new(db: Identity) -> Self {
151152
Self {
152153
snapshot_timing_total: WORKER_METRICS.snapshot_creation_time_total.with_label_values(&db),
153154
snapshot_timing_inner: WORKER_METRICS.snapshot_creation_time_inner.with_label_values(&db),
155+
snapshot_timing_fsync: WORKER_METRICS.snapshot_creation_time_fsync.with_label_values(&db),
154156
}
155157
}
156158
}
@@ -221,27 +223,29 @@ impl SnapshotWorkerActor {
221223

222224
let database_identity = self.snapshot_repo.database_identity();
223225

224-
let maybe_offset = asyncify(move || {
226+
let maybe_snapshot = asyncify(move || {
225227
let _timer = inner_timer.start_timer();
226228
Locking::take_snapshot_internal(&state, &snapshot_repo)
227229
})
228230
.await
229231
.with_context(|| format!("error capturing snapshot of database {}", database_identity))?;
230-
maybe_offset
231-
.map(|(offset, _path)| offset)
232-
.inspect(|snapshot_offset| {
233-
let elapsed = Duration::from_secs_f64(timer.stop_and_record());
234-
info!(
235-
"Captured snapshot of database {} at TX offset {} in {:?}",
236-
database_identity, snapshot_offset, elapsed,
237-
);
238-
})
239-
.with_context(|| {
240-
format!(
241-
"refusing to take snapshot of database {} at TX offset -1",
242-
database_identity
243-
)
244-
})
232+
let (snapshot_offset, unflushed_snapshot) = maybe_snapshot.with_context(|| {
233+
format!(
234+
"refusing to take snapshot of database {} at TX offset -1",
235+
database_identity
236+
)
237+
})?;
238+
self.metrics
239+
.snapshot_timing_fsync
240+
.observe_closure_duration(|| unflushed_snapshot.sync_all())?;
241+
242+
let elapsed = Duration::from_secs_f64(timer.stop_and_record());
243+
info!(
244+
"Captured snapshot of database {} at TX offset {} in {:?}",
245+
database_identity, snapshot_offset, elapsed,
246+
);
247+
248+
Ok(snapshot_offset)
245249
}
246250

247251
async fn maybe_compress_snapshots(&mut self, latest_snapshot: TxOffset) {

crates/core/src/worker_metrics/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,12 @@ metrics_group!(
434434
#[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)]
435435
pub snapshot_creation_time_inner: HistogramVec,
436436

437+
#[name = spacetime_snapshot_creation_time_fsync_sec]
438+
#[help = "The time (in seconds) it took to fsync a database snapshot, excluding scheduling overhead"]
439+
#[labels(db: Identity)]
440+
#[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)]
441+
pub snapshot_creation_time_fsync: HistogramVec,
442+
437443
#[name = spacetime_snapshot_compression_time_total_sec]
438444
#[help = "The time (in seconds) it took to do a compression pass on the snapshot repository, including scheduling overhead"]
439445
#[labels(db: Identity)]

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use spacetimedb_schema::{
3939
reducer_name::ReducerName,
4040
schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema},
4141
};
42-
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository};
42+
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository, UnflushedSnapshot};
4343
use spacetimedb_table::{
4444
indexes::RowPointer,
4545
page_pool::PagePool,
@@ -228,8 +228,10 @@ impl Locking {
228228
/// Returns an error if [`SnapshotRepository::create_snapshot`] returns an
229229
/// error.
230230
pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result<Option<SnapshotDirPath>> {
231-
let maybe_offset_and_path = Self::take_snapshot_internal(&self.committed_state, repo)?;
232-
Ok(maybe_offset_and_path.map(|(_, path)| path))
231+
Self::take_snapshot_internal(&self.committed_state, repo)?
232+
.map(|(_offset, snap)| snap.sync_all())
233+
.transpose()
234+
.map_err(Into::into)
233235
}
234236

235237
pub fn assert_system_tables_match(&self) -> Result<()> {
@@ -240,7 +242,7 @@ impl Locking {
240242
pub fn take_snapshot_internal(
241243
committed_state: &RwLock<CommittedState>,
242244
repo: &SnapshotRepository,
243-
) -> Result<Option<(TxOffset, SnapshotDirPath)>> {
245+
) -> Result<Option<(TxOffset, UnflushedSnapshot)>> {
244246
let mut committed_state = committed_state.write();
245247
let Some(tx_offset) = committed_state.next_tx_offset.checked_sub(1) else {
246248
return Ok(None);
@@ -253,9 +255,9 @@ impl Locking {
253255
);
254256

255257
let (tables, blob_store) = committed_state.persistent_tables_and_blob_store();
256-
let snapshot_dir = repo.create_snapshot(tables, blob_store, tx_offset)?;
258+
let unflushed_snapshot = repo.create_snapshot(tables, blob_store, tx_offset)?;
257259

258-
Ok(Some((tx_offset, snapshot_dir)))
260+
Ok(Some((tx_offset, unflushed_snapshot)))
259261
}
260262

261263
/// Returns a list over all the currently connected clients,

crates/snapshot/src/lib.rs

Lines changed: 115 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#![allow(clippy::result_large_err)]
2525

26+
use log::warn;
2627
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap};
2728
use spacetimedb_durability::TxOffset;
2829
use spacetimedb_fs_utils::compression::{
@@ -43,8 +44,10 @@ use spacetimedb_table::{
4344
page_pool::PagePool,
4445
table::Table,
4546
};
46-
use std::fs;
47+
use std::fs::{self, File};
48+
use std::io;
4749
use std::ops::RangeBounds;
50+
use std::path::Path;
4851
use std::time::{Duration, Instant};
4952
use std::{
5053
collections::BTreeMap,
@@ -173,6 +176,91 @@ struct TableEntry {
173176
pages: Vec<blake3::Hash>,
174177
}
175178

179+
/// A non-durable snapshot created via [SnapshotRepository::create_snapshot].
180+
///
181+
/// When [SnapshotRepository::create_snapshot] returns, all objects will have
182+
/// been written to the underlying object repository, but not `fsync`'ed.
183+
///
184+
/// Because this means that the snapshot may be incomplete, the [Snapshot] file
185+
/// will _not_ have been written, and the snapshot remains locked (via a [Lockfile]).
186+
///
187+
/// To turn an [UnflushedSnapshot] into a durable snapshot, call
188+
/// [UnflushedSnapshot::sync_all]. This will:
189+
///
190+
/// - sync all objects the snapshot references
191+
/// - sync the object repository root
192+
/// - write and sync the snapshot file
193+
/// - drop the lock file
194+
///
195+
/// This ensures that the snapshot file is present only if all objects are
196+
/// present and durable, and that the snapshot is considered invalid otherwise.
197+
///
198+
/// If [UnflushedSnapshot] is dropped without calling `sync_all`, the [Drop]
199+
/// impl will attempt to call `sync_all` and log any errors.
200+
///
201+
/// This two-stage snapshot creation exists in order to not introduce additional
202+
/// latency while the datastore is locked for snapshotting.
203+
#[must_use = "snapshots are not durable until `sync_all` is called"]
204+
pub struct UnflushedSnapshot {
205+
inner: Option<UnflushedSnapshotInner>,
206+
}
207+
208+
impl UnflushedSnapshot {
209+
/// Sync all objects in the snapshot and write out the snapshot file.
210+
///
211+
/// Returns the [SnapshotDirPath] on success.
212+
pub fn sync_all(mut self) -> Result<SnapshotDirPath, SnapshotError> {
213+
self.inner.take().unwrap().sync_all()
214+
}
215+
}
216+
217+
impl Drop for UnflushedSnapshot {
218+
fn drop(&mut self) {
219+
if let Some(inner) = self.inner.take()
220+
&& let Err(e) = inner.sync_all()
221+
{
222+
warn!("failed to sync unflushed snapshot dropped without syncing: {e}");
223+
}
224+
}
225+
}
226+
227+
struct UnflushedSnapshotInner {
228+
snapshot: Snapshot,
229+
snapshot_dir: SnapshotDirPath,
230+
snapshot_repo: SnapshotRepository,
231+
object_repo: DirTrie,
232+
lockfile: Lockfile,
233+
}
234+
235+
impl UnflushedSnapshotInner {
236+
fn sync_all(self) -> Result<SnapshotDirPath, SnapshotError> {
237+
fn fsync(path: &Path) -> io::Result<()> {
238+
File::open(path)
239+
.and_then(|fd| fd.sync_all())
240+
.map_err(|e| io::Error::new(e.kind(), format!("failed to fsync {}: {}", path.display(), e)))
241+
}
242+
243+
// Sync all objects and their parent directories.
244+
// The paths yielded by the [Snapshot::files] iterator are constructed
245+
// by [DirTree::file_path], which creates a path with a parent.
246+
// `parent()` is thus known to succeed.
247+
for (_, path) in self.snapshot.files(&self.object_repo) {
248+
fsync(&path)?;
249+
fsync(path.parent().unwrap())?;
250+
}
251+
// Sync the root directory of the object repo
252+
fsync(self.object_repo.root())?;
253+
// Write out the snapshot file (syncs internally).
254+
self.snapshot_repo
255+
.write_snapshot_file(&self.snapshot_dir, self.snapshot)?;
256+
257+
// We can now drop the lockfile.
258+
drop(self.lockfile);
259+
260+
Ok(self.snapshot_dir)
261+
}
262+
}
263+
176264
#[derive(Clone, Serialize, Deserialize)]
177265
pub struct Snapshot {
178266
/// A magic number: must be equal to [`MAGIC`].
@@ -650,15 +738,19 @@ impl SnapshotRepository {
650738
/// where `tables` is the committed state of all the tables in the database,
651739
/// and `blobs` is the committed state's blob store.
652740
///
653-
/// Returns the path of the newly-created snapshot directory.
741+
/// The returned [UnflushedSnapshot] is **not** durable -- call
742+
/// [UnflushedSnapshot::sync_all] to finalize it (see the struct docs for
743+
/// more details).
654744
///
655-
/// **NOTE**: The current snapshot is uncompressed to avoid the potential slowdown.
745+
/// Also note that the snapshot remains locked for as long as [UnflushedSnapshot]
746+
/// is alive. It will not appear in [Self::all_snapshots] nor can it be
747+
/// modified by methods in [SnapshotRepository].
656748
pub fn create_snapshot<'db>(
657749
&self,
658750
tables: impl Iterator<Item = &'db mut Table>,
659751
blobs: &'db dyn BlobStore,
660752
tx_offset: TxOffset,
661-
) -> Result<SnapshotDirPath, SnapshotError> {
753+
) -> Result<UnflushedSnapshot, SnapshotError> {
662754
// Invalidate equal to or newer than `tx_offset`.
663755
//
664756
// This is because snapshots don't currently track the epoch in which
@@ -692,7 +784,7 @@ impl SnapshotRepository {
692784
// Before performing any observable operations,
693785
// acquire a lockfile on the snapshot you want to create.
694786
// Because we could be compressing the snapshot.
695-
let _lock = Lockfile::for_file(&snapshot_dir)?;
787+
let lockfile = Lockfile::for_file(&snapshot_dir)?;
696788

697789
// Create the snapshot directory.
698790
snapshot_dir.create()?;
@@ -708,8 +800,6 @@ impl SnapshotRepository {
708800
snapshot.write_all_blobs(&object_repo, blobs, prev_snapshot.as_ref(), &mut counter)?;
709801
snapshot.write_all_tables(&object_repo, tables, prev_snapshot.as_ref(), &mut counter)?;
710802

711-
self.write_snapshot_file(&snapshot_dir, snapshot)?;
712-
713803
log::info!(
714804
"[{}] SNAPSHOT {:0>20}: Hardlinked {} objects and wrote {} objects",
715805
self.database_identity,
@@ -718,9 +808,15 @@ impl SnapshotRepository {
718808
counter.objects_written,
719809
);
720810

721-
// Success! return the directory of the newly-created snapshot.
722-
// The lockfile will be dropped here.
723-
Ok(snapshot_dir)
811+
Ok(UnflushedSnapshot {
812+
inner: Some(UnflushedSnapshotInner {
813+
snapshot,
814+
snapshot_dir,
815+
snapshot_repo: self.clone(),
816+
object_repo,
817+
lockfile,
818+
}),
819+
})
724820
}
725821

726822
/// Write the on-disk snapshot file containing the BSATN-encoded `snapshot`
@@ -744,6 +840,12 @@ impl SnapshotRepository {
744840
snapshot_file.write_all(hash.as_bytes())?;
745841
snapshot_file.write_all(&snapshot_bsatn)?;
746842
snapshot_file.flush()?;
843+
// fsync file + enclosing directory.
844+
snapshot_file
845+
.into_inner()
846+
.expect("buffered writer just flushed")
847+
.sync_all()?;
848+
File::open(&snapshot_dir.0)?.sync_all()?;
747849
}
748850

749851
Ok(())
@@ -1102,6 +1204,7 @@ impl SnapshotRepository {
11021204
if old_file.is_compressed() {
11031205
std::fs::hard_link(old_path, src.with_extension("_tmp"))?;
11041206
std::fs::rename(src.with_extension("_tmp"), src)?;
1207+
File::open(src.parent().unwrap())?.sync_all()?;
11051208
if let Some(stats) = stats {
11061209
stats.hardlinked += 1;
11071210
}
@@ -1134,6 +1237,7 @@ impl SnapshotRepository {
11341237
log::error!("Failed to compress object file {path:?}: {err}");
11351238
})?;
11361239
}
1240+
File::open(dir.root())?.sync_all()?;
11371241

11381242
// Compress the snapshot file last,
11391243
// which marks the whole snapshot as compressed.
@@ -1142,6 +1246,7 @@ impl SnapshotRepository {
11421246
compress(&old, &snapshot_file.0, None, None).inspect_err(|err| {
11431247
log::error!("Failed to compress snapshot file {snapshot_file:?}: {err}");
11441248
})?;
1249+
File::open(&snapshot_dir.0)?.sync_all()?;
11451250

11461251
log::info!(
11471252
"Compressed snapshot {snapshot_dir:?} of replica {}: {compress_type:?}",

crates/snapshot/src/remote.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -727,10 +727,13 @@ where
727727
Some(file_path) => {
728728
let dir = file_path.parent().expect("file not in a directory").to_owned();
729729
fs::create_dir_all(&dir).await?;
730-
let (tmp_file, tmp_out) = spawn_blocking(move || {
731-
let tmp = NamedTempFile::new_in(dir)?;
732-
let out = tmp.reopen()?;
733-
Ok::<_, io::Error>((tmp, out))
730+
let (tmp_file, tmp_out) = spawn_blocking({
731+
let dir = dir.clone();
732+
move || {
733+
let tmp = NamedTempFile::new_in(dir)?;
734+
let out = tmp.reopen()?;
735+
Ok::<_, io::Error>((tmp, out))
736+
}
734737
})
735738
.await
736739
.unwrap()?;
@@ -743,6 +746,7 @@ where
743746
.await
744747
.unwrap()
745748
.map_err(|e| e.error)?;
749+
fs::File::open(dir).await?.sync_all().await?;
746750
}
747751

748752
None => {

0 commit comments

Comments
 (0)