Skip to content

Commit 596e1f6

Browse files
authored
fix(local): fsync create-mode rename source delete (#758)
* fix(local): fsync create-mode rename delete * better
1 parent c219109 commit 596e1f6

1 file changed

Lines changed: 65 additions & 6 deletions

File tree

src/local.rs

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::{
4444
path::{Path, absolute_path_to_url},
4545
util::InvalidGetRange,
4646
};
47-
use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode};
47+
use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
4848

4949
/// A specialized `Error` for filesystem object store-related errors
5050
#[derive(Debug, thiserror::Error)]
@@ -524,7 +524,9 @@ impl ObjectStore for LocalFileSystem {
524524
let config = Arc::clone(&config);
525525
maybe_spawn_blocking(move || {
526526
let location = location?;
527-
Self::delete_location(config, automatic_cleanup, &location)?;
527+
// `with_fsync` does not apply to standalone deletes; only create-mode rename
528+
// fsyncs its internal source removal as part of the durable copy-and-delete.
529+
Self::delete_location(config, automatic_cleanup, &location, false)?;
528530
Ok(location)
529531
})
530532
})
@@ -727,7 +729,14 @@ impl ObjectStore for LocalFileSystem {
727729
},
728730
)
729731
.await?;
730-
self.delete(from).await?;
732+
let config = Arc::clone(&self.config);
733+
let automatic_cleanup = self.automatic_cleanup;
734+
let fsync = self.fsync;
735+
let from = from.clone();
736+
maybe_spawn_blocking(move || {
737+
Self::delete_location(config, automatic_cleanup, &from, fsync)
738+
})
739+
.await?;
731740
Ok(())
732741
}
733742
}
@@ -739,14 +748,26 @@ impl LocalFileSystem {
739748
config: Arc<Config>,
740749
automatic_cleanup: bool,
741750
location: &Path,
751+
fsync: bool,
742752
) -> Result<()> {
743753
let path = config.path_to_filesystem(location)?;
744754
if let Err(e) = std::fs::remove_file(&path) {
745755
Err(match e.kind() {
746756
ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
747757
_ => Error::UnableToDeleteFile { path, source: e }.into(),
748758
})
749-
} else if automatic_cleanup {
759+
} else {
760+
if fsync {
761+
fsync_parent_dir(&path).map_err(|source| Error::UnableToSyncFile {
762+
source,
763+
path: path.clone(),
764+
})?;
765+
}
766+
767+
if !automatic_cleanup {
768+
return Ok(());
769+
}
770+
750771
let root = &config.root;
751772
let root = root
752773
.to_file_path()
@@ -763,8 +784,6 @@ impl LocalFileSystem {
763784
}
764785
}
765786

766-
Ok(())
767-
} else {
768787
Ok(())
769788
}
770789
}
@@ -1481,6 +1500,9 @@ mod tests {
14811500
use futures_util::TryStreamExt;
14821501
use tempfile::TempDir;
14831502

1503+
#[cfg(target_family = "unix")]
1504+
use std::os::unix::fs::PermissionsExt;
1505+
14841506
#[cfg(target_family = "unix")]
14851507
use tempfile::NamedTempFile;
14861508

@@ -1570,6 +1592,43 @@ mod tests {
15701592
assert_eq!(read, data);
15711593
}
15721594

1595+
#[tokio::test]
1596+
#[cfg(target_family = "unix")]
1597+
async fn fsync_rename_if_not_exists_propagates_source_delete_sync_error() {
1598+
let root = TempDir::new().unwrap();
1599+
let integration = LocalFileSystem::new_with_prefix(root.path())
1600+
.unwrap()
1601+
.with_fsync(true);
1602+
1603+
let source = Path::from("source_dir/source_file");
1604+
let dest = Path::from("dest_dir/dest_file");
1605+
integration.put(&source, "data".into()).await.unwrap();
1606+
1607+
let source_dir = root.path().join("source_dir");
1608+
let original_permissions = fs::metadata(&source_dir).unwrap().permissions();
1609+
// Allow unlinking the file from the directory, but prevent opening the
1610+
// directory for fsync. Without the source-parent fsync, rename succeeds;
1611+
// with it, the sync error is propagated after the source is deleted.
1612+
fs::set_permissions(&source_dir, fs::Permissions::from_mode(0o300)).unwrap();
1613+
1614+
let result = integration.rename_if_not_exists(&source, &dest).await;
1615+
fs::set_permissions(&source_dir, original_permissions).unwrap();
1616+
1617+
match result {
1618+
Err(crate::Error::Generic { source, .. }) => {
1619+
assert!(source.to_string().contains("Unable to sync data to disk"));
1620+
}
1621+
_ => panic!("expected source parent fsync to fail"),
1622+
}
1623+
1624+
let read = integration.get(&dest).await.unwrap().bytes().await.unwrap();
1625+
assert_eq!(read, Bytes::from("data"));
1626+
assert!(matches!(
1627+
integration.get(&source).await.unwrap_err(),
1628+
crate::Error::NotFound { .. }
1629+
));
1630+
}
1631+
15731632
#[test]
15741633
#[cfg(target_family = "unix")]
15751634
fn test_non_tokio() {

0 commit comments

Comments
 (0)