Skip to content

Commit 7b4bffa

Browse files
committed
feat(local): add fsync to LocalFileSystem for durability
Call sync_all() on written files and fsync parent directories at all write-path boundaries (put, copy, rename, multipart complete) so that a successful return guarantees data is durable on disk, matching the implicit contract of cloud object stores.
1 parent 030f29d commit 7b4bffa

1 file changed

Lines changed: 161 additions & 33 deletions

File tree

src/local.rs

Lines changed: 161 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ pub(crate) enum Error {
114114
#[error("Filenames containing trailing '/#\\d+/' are not supported: {}", path)]
115115
InvalidPath { path: String },
116116

117+
#[error("Unable to sync data to file {}: {}", path.display(), source)]
118+
UnableToSyncFile { source: io::Error, path: PathBuf },
119+
117120
#[error("Upload aborted")]
118121
Aborted,
119122
}
@@ -197,11 +200,13 @@ impl From<Error> for super::Error {
197200
/// [`LocalFileSystem::copy_opts`] is implemented using [`std::fs::hard_link`], and therefore
198201
/// does not support copying across filesystem boundaries.
199202
///
200-
#[derive(Debug)]
203+
#[derive(Clone, Debug)]
201204
pub struct LocalFileSystem {
202205
config: Arc<Config>,
203206
// if you want to delete empty directories when deleting files
204207
automatic_cleanup: bool,
208+
// if true, call fsync on files and directories after writes
209+
fsync: bool,
205210
}
206211

207212
#[derive(Debug)]
@@ -229,6 +234,7 @@ impl LocalFileSystem {
229234
root: Url::parse("file:///").unwrap(),
230235
}),
231236
automatic_cleanup: false,
237+
fsync: false,
232238
}
233239
}
234240

@@ -247,6 +253,7 @@ impl LocalFileSystem {
247253
root: absolute_path_to_url(path)?,
248254
}),
249255
automatic_cleanup: false,
256+
fsync: false,
250257
})
251258
}
252259

@@ -260,6 +267,20 @@ impl LocalFileSystem {
260267
self.automatic_cleanup = automatic_cleanup;
261268
self
262269
}
270+
271+
/// Enable fsync after writes to ensure durability
272+
///
273+
/// When enabled, [`LocalFileSystem`] will call [`File::sync_all`] on written files
274+
/// and fsync parent directories after write operations ([`put_opts`](ObjectStore::put_opts),
275+
/// [`copy_opts`](ObjectStore::copy_opts), [`rename_opts`](ObjectStore::rename_opts),
276+
/// and multipart upload completion), ensuring that when an operation returns success,
277+
/// both the file contents and the directory entries are durable on stable storage.
278+
///
279+
/// This is disabled by default.
280+
pub fn with_fsync(mut self, fsync: bool) -> Self {
281+
self.fsync = fsync;
282+
self
283+
}
263284
}
264285

265286
impl Config {
@@ -348,8 +369,9 @@ impl ObjectStore for LocalFileSystem {
348369
}
349370

350371
let path = self.path_to_filesystem(location)?;
372+
let fsync = self.fsync;
351373
maybe_spawn_blocking(move || {
352-
let (mut file, staging_path) = new_staged_upload(&path)?;
374+
let (mut file, staging_path) = new_staged_upload(&path, fsync)?;
353375
let mut e_tag = None;
354376

355377
let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
@@ -361,27 +383,49 @@ impl ObjectStore for LocalFileSystem {
361383
e_tag = Some(get_etag(&metadata));
362384
match opts.mode {
363385
PutMode::Overwrite => {
386+
if fsync {
387+
file.sync_all().map_err(|source| Error::UnableToSyncFile {
388+
source,
389+
path: staging_path.clone(),
390+
})?;
391+
}
364392
// For some fuse types of file systems, the file must be closed first
365393
// to trigger the upload operation, and then renamed, such as Blobfuse
366394
std::mem::drop(file);
367395
match std::fs::rename(&staging_path, &path) {
368-
Ok(_) => None,
396+
Ok(_) => {
397+
if fsync {
398+
fsync_parent_dir(&path)?;
399+
}
400+
None
401+
}
369402
Err(source) => Some(Error::UnableToRenameFile { source }),
370403
}
371404
}
372-
PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
373-
Ok(_) => {
374-
let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
375-
None
376-
}
377-
Err(source) => match source.kind() {
378-
ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
379-
path: path.to_str().unwrap().to_string(),
405+
PutMode::Create => {
406+
if fsync {
407+
file.sync_all().map_err(|source| Error::UnableToSyncFile {
380408
source,
381-
}),
382-
_ => Some(Error::UnableToRenameFile { source }),
383-
},
384-
},
409+
path: staging_path.clone(),
410+
})?;
411+
}
412+
match std::fs::hard_link(&staging_path, &path) {
413+
Ok(_) => {
414+
let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
415+
if fsync {
416+
fsync_parent_dir(&path)?;
417+
}
418+
None
419+
}
420+
Err(source) => match source.kind() {
421+
ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
422+
path: path.to_str().unwrap().to_string(),
423+
source,
424+
}),
425+
_ => Some(Error::UnableToRenameFile { source }),
426+
},
427+
}
428+
}
385429
PutMode::Update(_) => unreachable!(),
386430
}
387431
}
@@ -414,8 +458,8 @@ impl ObjectStore for LocalFileSystem {
414458
}
415459

416460
let dest = self.path_to_filesystem(location)?;
417-
let (file, src) = new_staged_upload(&dest)?;
418-
Ok(Box::new(LocalUpload::new(src, dest, file)))
461+
let (file, src) = new_staged_upload(&dest, self.fsync)?;
462+
Ok(Box::new(LocalUpload::new(src, dest, file, self.fsync)))
419463
}
420464

421465
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
@@ -549,6 +593,7 @@ impl ObjectStore for LocalFileSystem {
549593

550594
let from = self.path_to_filesystem(from)?;
551595
let to = self.path_to_filesystem(to)?;
596+
let fsync = self.fsync;
552597

553598
match mode {
554599
CopyMode::Overwrite => {
@@ -564,15 +609,25 @@ impl ObjectStore for LocalFileSystem {
564609
let staged = staged_upload_path(&to, &id.to_string());
565610
match std::fs::hard_link(&from, &staged) {
566611
Ok(_) => {
567-
return std::fs::rename(&staged, &to).map_err(|source| {
568-
let _ = std::fs::remove_file(&staged); // Attempt to clean up
569-
Error::UnableToCopyFile { from, to, source }.into()
570-
});
612+
match std::fs::rename(&staged, &to) {
613+
Ok(_) => {
614+
if fsync {
615+
fsync_parent_dir(&to)?;
616+
}
617+
return Ok(());
618+
}
619+
Err(source) => {
620+
let _ = std::fs::remove_file(&staged); // Attempt to clean up
621+
return Err(
622+
Error::UnableToCopyFile { from, to, source }.into()
623+
);
624+
}
625+
}
571626
}
572627
Err(source) => match source.kind() {
573628
ErrorKind::AlreadyExists => id += 1,
574629
ErrorKind::NotFound => match from.exists() {
575-
true => create_parent_dirs(&to, source)?,
630+
true => create_parent_dirs(&to, source, fsync)?,
576631
false => {
577632
return Err(Error::NotFound { path: from, source }.into());
578633
}
@@ -590,7 +645,12 @@ impl ObjectStore for LocalFileSystem {
590645
maybe_spawn_blocking(move || {
591646
loop {
592647
match std::fs::hard_link(&from, &to) {
593-
Ok(_) => return Ok(()),
648+
Ok(_) => {
649+
if fsync {
650+
fsync_parent_dir(&to)?;
651+
}
652+
return Ok(());
653+
}
594654
Err(source) => match source.kind() {
595655
ErrorKind::AlreadyExists => {
596656
return Err(Error::AlreadyExists {
@@ -600,7 +660,7 @@ impl ObjectStore for LocalFileSystem {
600660
.into());
601661
}
602662
ErrorKind::NotFound => match from.exists() {
603-
true => create_parent_dirs(&to, source)?,
663+
true => create_parent_dirs(&to, source, fsync)?,
604664
false => {
605665
return Err(Error::NotFound { path: from, source }.into());
606666
}
@@ -628,13 +688,22 @@ impl ObjectStore for LocalFileSystem {
628688
RenameTargetMode::Overwrite => {
629689
let from = self.path_to_filesystem(from)?;
630690
let to = self.path_to_filesystem(to)?;
691+
let fsync = self.fsync;
631692
maybe_spawn_blocking(move || {
632693
loop {
633694
match std::fs::rename(&from, &to) {
634-
Ok(_) => return Ok(()),
695+
Ok(_) => {
696+
if fsync {
697+
fsync_parent_dir(&to)?;
698+
if from.parent() != to.parent() {
699+
fsync_parent_dir(&from)?;
700+
}
701+
}
702+
return Ok(());
703+
}
635704
Err(source) => match source.kind() {
636705
ErrorKind::NotFound => match from.exists() {
637-
true => create_parent_dirs(&to, source)?,
706+
true => create_parent_dirs(&to, source, fsync)?,
638707
false => {
639708
return Err(Error::NotFound { path: from, source }.into());
640709
}
@@ -786,23 +855,69 @@ impl LocalFileSystem {
786855
}
787856

788857
/// Creates the parent directories of `path` or returns an error based on `source` if no parent
789-
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
858+
///
859+
/// When `fsync` is true, fsyncs each newly created directory and the first pre-existing
860+
/// ancestor to ensure the new directory entries are durable.
861+
fn create_parent_dirs(path: &std::path::Path, source: io::Error, fsync: bool) -> Result<()> {
790862
let parent = path.parent().ok_or_else(|| {
791863
let path = path.to_path_buf();
792864
Error::UnableToCreateFile { path, source }
793865
})?;
794866

795-
std::fs::create_dir_all(parent).map_err(|source| {
796-
let path = parent.into();
797-
Error::UnableToCreateDir { source, path }
867+
if fsync {
868+
let mut first_existing = parent;
869+
while !first_existing.exists() {
870+
first_existing = first_existing.parent().unwrap_or(first_existing);
871+
}
872+
873+
std::fs::create_dir_all(parent).map_err(|source| {
874+
let path = parent.into();
875+
Error::UnableToCreateDir { source, path }
876+
})?;
877+
878+
let mut dir = parent;
879+
loop {
880+
fsync_dir(dir)?;
881+
if dir == first_existing {
882+
break;
883+
}
884+
dir = match dir.parent() {
885+
Some(p) => p,
886+
None => break,
887+
};
888+
}
889+
} else {
890+
std::fs::create_dir_all(parent).map_err(|source| {
891+
let path = parent.into();
892+
Error::UnableToCreateDir { source, path }
893+
})?;
894+
}
895+
Ok(())
896+
}
897+
898+
/// Fsyncs a directory to ensure its entries are durable
899+
fn fsync_dir(dir_path: &std::path::Path) -> Result<()> {
900+
let dir = File::open(dir_path).map_err(|source| Error::UnableToOpenFile {
901+
source,
902+
path: dir_path.into(),
903+
})?;
904+
dir.sync_all().map_err(|source| Error::UnableToSyncFile {
905+
source,
906+
path: dir_path.into(),
798907
})?;
799908
Ok(())
800909
}
801910

911+
/// Fsyncs the parent directory of `path` to ensure directory entry durability
912+
fn fsync_parent_dir(path: &std::path::Path) -> Result<()> {
913+
let parent = path.parent().unwrap_or(path);
914+
fsync_dir(parent)
915+
}
916+
802917
/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path`
803918
///
804919
/// Creates any directories if necessary
805-
fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
920+
fn new_staged_upload(base: &std::path::Path, fsync: bool) -> Result<(File, PathBuf)> {
806921
let mut multipart_id = 1;
807922
loop {
808923
let suffix = multipart_id.to_string();
@@ -812,7 +927,7 @@ fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
812927
Ok(f) => return Ok((f, path)),
813928
Err(source) => match source.kind() {
814929
ErrorKind::AlreadyExists => multipart_id += 1,
815-
ErrorKind::NotFound => create_parent_dirs(&path, source)?,
930+
ErrorKind::NotFound => create_parent_dirs(&path, source, fsync)?,
816931
_ => return Err(Error::UnableToOpenFile { source, path }.into()),
817932
},
818933
}
@@ -835,6 +950,8 @@ struct LocalUpload {
835950
src: Option<PathBuf>,
836951
/// The next offset to write into the file
837952
offset: u64,
953+
/// Whether to fsync on complete
954+
fsync: bool,
838955
}
839956

840957
#[derive(Debug)]
@@ -844,14 +961,15 @@ struct UploadState {
844961
}
845962

846963
impl LocalUpload {
847-
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
964+
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, fsync: bool) -> Self {
848965
Self {
849966
state: Arc::new(UploadState {
850967
dest,
851968
file: Mutex::new(file),
852969
}),
853970
src: Some(src),
854971
offset: 0,
972+
fsync,
855973
}
856974
}
857975
}
@@ -882,11 +1000,21 @@ impl MultipartUpload for LocalUpload {
8821000
async fn complete(&mut self) -> Result<PutResult> {
8831001
let src = self.src.take().ok_or(Error::Aborted)?;
8841002
let s = Arc::clone(&self.state);
1003+
let fsync = self.fsync;
8851004
maybe_spawn_blocking(move || {
8861005
// Ensure no inflight writes
8871006
let file = s.file.lock();
1007+
if fsync {
1008+
file.sync_all().map_err(|source| Error::UnableToSyncFile {
1009+
source,
1010+
path: src.clone(),
1011+
})?;
1012+
}
8881013
std::fs::rename(&src, &s.dest)
8891014
.map_err(|source| Error::UnableToRenameFile { source })?;
1015+
if fsync {
1016+
fsync_parent_dir(&s.dest)?;
1017+
}
8901018
let metadata = file.metadata().map_err(|e| Error::Metadata {
8911019
source: e.into(),
8921020
path: src.to_string_lossy().to_string(),

0 commit comments

Comments
 (0)