Skip to content

Commit a54e442

Browse files
authored
feat(io): Add delete_stream to Storage trait (#2216)
## Which issue does this PR close?
<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->
- Closes #2065

## What changes are included in this PR?
- Add `delete_stream` to the `Storage` trait to support batch deletion
- Expose `delete_stream` in `FileIO` as well
<!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. -->

## Are these changes tested?
- Added unit tests
- Added integration tests for OpenDAL
<!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->
1 parent cc256f5 commit a54e442

9 files changed

Lines changed: 503 additions & 3 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/io/file_io.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::ops::Range;
1919
use std::sync::{Arc, OnceLock};
2020

2121
use bytes::Bytes;
22+
use futures::{Stream, StreamExt};
2223

2324
use super::storage::{
2425
LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
@@ -140,6 +141,18 @@ impl FileIO {
140141
self.get_storage()?.delete_prefix(path.as_ref()).await
141142
}
142143

144+
/// Delete multiple files from a stream of paths.
145+
///
146+
/// # Arguments
147+
///
148+
/// * paths: A stream of absolute paths starting with the scheme string used to construct [`FileIO`].
///
/// # Errors
///
/// Returns an error if `get_storage` fails, or if the storage's own `delete_stream` call
/// returns one.
149+
pub async fn delete_stream(
150+
&self,
151+
paths: impl Stream<Item = String> + Send + 'static,
152+
) -> Result<()> {
153+
// `boxed()` erases the caller's concrete stream type before it is handed to the storage.
self.get_storage()?.delete_stream(paths.boxed()).await
154+
}
155+
143156
/// Check file exists.
144157
///
145158
/// # Arguments

crates/iceberg/src/io/storage/local_fs.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use std::sync::Arc;
2929

3030
use async_trait::async_trait;
3131
use bytes::Bytes;
32+
use futures::StreamExt;
33+
use futures::stream::BoxStream;
3234
use serde::{Deserialize, Serialize};
3335

3436
use crate::io::{
@@ -200,6 +202,13 @@ impl Storage for LocalFsStorage {
200202
Ok(())
201203
}
202204

205+
/// Deletes each path yielded by `paths`, in stream order, by awaiting `self.delete` on it.
///
/// Paths are handled one at a time. The first failing `delete` is propagated through `?`,
/// which ends the loop, so the rest of the stream is not polled. No rollback of earlier
/// deletions is attempted here. An empty stream yields `Ok(())`.
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
206+
while let Some(path) = paths.next().await {
207+
self.delete(&path).await?;
208+
}
209+
Ok(())
210+
}
211+
203212
/// Builds an `InputFile` for `path`, backed by a clone of this storage wrapped in an `Arc`.
fn new_input(&self, path: &str) -> Result<InputFile> {
204213
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
205214
}
@@ -534,4 +543,61 @@ mod tests {
534543

535544
assert!(path.exists());
536545
}
546+
547+
#[tokio::test]
548+
/// Writes three files into a temporary directory, deletes two of them through
/// `delete_stream`, and checks that exactly those two are gone while the third remains.
async fn test_local_fs_storage_delete_stream() {
549+
use futures::stream;
550+
551+
let tmp_dir = TempDir::new().unwrap();
552+
let storage = LocalFsStorage::new();
553+
554+
// Create multiple files
555+
let file1 = tmp_dir.path().join("file1.txt");
556+
let file2 = tmp_dir.path().join("file2.txt");
557+
let file3 = tmp_dir.path().join("file3.txt");
558+
559+
storage
560+
.write(file1.to_str().unwrap(), Bytes::from("1"))
561+
.await
562+
.unwrap();
563+
storage
564+
.write(file2.to_str().unwrap(), Bytes::from("2"))
565+
.await
566+
.unwrap();
567+
storage
568+
.write(file3.to_str().unwrap(), Bytes::from("3"))
569+
.await
570+
.unwrap();
571+
572+
// Verify files exist
573+
assert!(storage.exists(file1.to_str().unwrap()).await.unwrap());
574+
assert!(storage.exists(file2.to_str().unwrap()).await.unwrap());
575+
assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
576+
577+
// Delete multiple files using stream
// Only file1 and file2 are put in the stream; file3 is deliberately left out.
578+
let paths = vec![
579+
file1.to_str().unwrap().to_string(),
580+
file2.to_str().unwrap().to_string(),
581+
];
582+
let path_stream = stream::iter(paths).boxed();
583+
storage.delete_stream(path_stream).await.unwrap();
584+
585+
// Verify deleted files no longer exist
586+
assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap());
587+
assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap());
588+
589+
// Verify file3 still exists
590+
assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
591+
}
592+
593+
#[tokio::test]
594+
/// Checks that `delete_stream` returns `Ok` when the stream yields no paths at all.
async fn test_local_fs_storage_delete_stream_empty() {
595+
use futures::stream;
596+
597+
let storage = LocalFsStorage::new();
598+
599+
// Delete with empty stream should succeed
600+
let path_stream = stream::iter(Vec::<String>::new()).boxed();
601+
storage.delete_stream(path_stream).await.unwrap();
602+
}
537603
}

crates/iceberg/src/io/storage/memory.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use std::sync::{Arc, RwLock};
2828

2929
use async_trait::async_trait;
3030
use bytes::Bytes;
31+
use futures::StreamExt;
32+
use futures::stream::BoxStream;
3133
use serde::{Deserialize, Serialize};
3234

3335
use crate::io::{
@@ -220,6 +222,13 @@ impl Storage for MemoryStorage {
220222
Ok(())
221223
}
222224

225+
/// Deletes each path yielded by `paths`, in stream order, by awaiting `self.delete` on it.
///
/// Paths are handled one at a time. The first failing `delete` is propagated through `?`,
/// which ends the loop, so the rest of the stream is not polled. No rollback of earlier
/// deletions is attempted here. An empty stream yields `Ok(())`.
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
226+
while let Some(path) = paths.next().await {
227+
self.delete(&path).await?;
228+
}
229+
Ok(())
230+
}
231+
223232
/// Builds an `InputFile` for `path`, backed by a clone of this storage wrapped in an `Arc`.
fn new_input(&self, path: &str) -> Result<InputFile> {
224233
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
225234
}
@@ -594,4 +603,56 @@ mod tests {
594603
assert_eq!(storage.read("/path/to/file").await.unwrap(), content);
595604
assert_eq!(storage.read("path/to/file").await.unwrap(), content);
596605
}
606+
607+
#[tokio::test]
608+
/// Writes three `memory://` files, deletes two of them through `delete_stream`, and checks
/// that exactly those two are gone while the third remains.
async fn test_memory_storage_delete_stream() {
609+
use futures::stream;
610+
611+
let storage = MemoryStorage::new();
612+
613+
// Create multiple files
614+
storage
615+
.write("memory://file1.txt", Bytes::from("1"))
616+
.await
617+
.unwrap();
618+
storage
619+
.write("memory://file2.txt", Bytes::from("2"))
620+
.await
621+
.unwrap();
622+
storage
623+
.write("memory://file3.txt", Bytes::from("3"))
624+
.await
625+
.unwrap();
626+
627+
// Verify files exist
628+
assert!(storage.exists("memory://file1.txt").await.unwrap());
629+
assert!(storage.exists("memory://file2.txt").await.unwrap());
630+
assert!(storage.exists("memory://file3.txt").await.unwrap());
631+
632+
// Delete multiple files using stream
// Only file1 and file2 are put in the stream; file3 is deliberately left out.
633+
let paths = vec![
634+
"memory://file1.txt".to_string(),
635+
"memory://file2.txt".to_string(),
636+
];
637+
let path_stream = stream::iter(paths).boxed();
638+
storage.delete_stream(path_stream).await.unwrap();
639+
640+
// Verify deleted files no longer exist
641+
assert!(!storage.exists("memory://file1.txt").await.unwrap());
642+
assert!(!storage.exists("memory://file2.txt").await.unwrap());
643+
644+
// Verify file3 still exists
645+
assert!(storage.exists("memory://file3.txt").await.unwrap());
646+
}
647+
648+
#[tokio::test]
649+
/// Checks that `delete_stream` returns `Ok` when the stream yields no paths at all.
async fn test_memory_storage_delete_stream_empty() {
650+
use futures::stream;
651+
652+
let storage = MemoryStorage::new();
653+
654+
// Delete with empty stream should succeed
655+
let path_stream = stream::iter(Vec::<String>::new()).boxed();
656+
storage.delete_stream(path_stream).await.unwrap();
657+
}
597658
}

crates/iceberg/src/io/storage/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::sync::Arc;
2727
use async_trait::async_trait;
2828
use bytes::Bytes;
2929
pub use config::*;
30+
use futures::stream::BoxStream;
3031
pub use local_fs::{LocalFsStorage, LocalFsStorageFactory};
3132
pub use memory::{MemoryStorage, MemoryStorageFactory};
3233

@@ -93,6 +94,9 @@ pub trait Storage: Debug + Send + Sync {
9394
/// Delete all files with the given prefix
9495
async fn delete_prefix(&self, path: &str) -> Result<()>;
9596

97+
/// Delete multiple files from a stream of paths.
///
/// `paths` is a boxed, `'static` stream of owned path strings.
///
/// NOTE(review): this declaration does not state whether an implementation must stop at the
/// first failed deletion or keep going — confirm the intended contract.
98+
async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()>;
99+
96100
/// Create a new input file for reading
97101
fn new_input(&self, path: &str) -> Result<InputFile>;
98102

crates/storage/opendal/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ reqwest = { workspace = true }
4949
serde = { workspace = true }
5050
typetag = { workspace = true }
5151
url = { workspace = true }
52+
futures = { workspace = true }
5253

5354
[dev-dependencies]
5455
async-trait = { workspace = true }

crates/storage/opendal/src/azdls.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl FromStr for AzureStorageScheme {
160160
}
161161

162162
/// Validates whether the given path matches what's configured for the backend.
163-
fn match_path_with_config(
163+
pub(crate) fn match_path_with_config(
164164
path: &AzureStoragePath,
165165
config: &AzdlsConfig,
166166
configured_scheme: &AzureStorageScheme,
@@ -220,7 +220,7 @@ fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result<o
220220

221221
/// Represents a fully qualified path to blob/ file in Azure Storage.
222222
#[derive(Debug, PartialEq)]
223-
struct AzureStoragePath {
223+
pub(crate) struct AzureStoragePath {
224224
/// The scheme of the URL, e.g., `abfss`, `abfs`, `wasbs`, or `wasb`.
225225
scheme: AzureStorageScheme,
226226

@@ -236,7 +236,7 @@ struct AzureStoragePath {
236236
/// Path to the file.
237237
///
238238
/// It is relative to the `root` of the `AzdlsConfig`.
239-
path: String,
239+
pub(crate) path: String,
240240
}
241241

242242
impl AzureStoragePath {

0 commit comments

Comments
 (0)