Skip to content

Commit c1e5ace

Browse files
authored
feat(io): Add delete_stream to Storage trait (apache#2216)
2 parents 7ef4063 + 170bece commit c1e5ace

9 files changed

Lines changed: 503 additions & 3 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/io/file_io.rs

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ use std::ops::Range;
1919
use std::sync::{Arc, OnceLock};
2020

2121
use bytes::Bytes;
22+
use futures::{Stream, StreamExt};
2223

2324
use super::storage::{
2425
LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
@@ -140,6 +141,18 @@ impl FileIO {
140141
self.get_storage()?.delete_prefix(path.as_ref()).await
141142
}
142143

144+
/// Delete multiple files from a stream of paths.
145+
///
146+
/// # Arguments
147+
///
148+
/// * paths: A stream of absolute paths starting with the scheme string used to construct [`FileIO`].
///
/// # Errors
///
/// Returns an error if the storage backend cannot be obtained, or if the backend's
/// `delete_stream` call fails.
149+
pub async fn delete_stream(
150+
&self,
151+
paths: impl Stream<Item = String> + Send + 'static,
152+
) -> Result<()> {
153+
// `boxed()` turns the caller's generic stream into the `BoxStream` that the storage's
// `delete_stream` takes, so this method can stay generic over the stream type.
self.get_storage()?.delete_stream(paths.boxed()).await
154+
}
155+
143156
/// Check file exists.
144157
///
145158
/// # Arguments

crates/iceberg/src/io/storage/local_fs.rs

Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,8 @@ use std::sync::Arc;
2929

3030
use async_trait::async_trait;
3131
use bytes::Bytes;
32+
use futures::StreamExt;
33+
use futures::stream::BoxStream;
3234
use serde::{Deserialize, Serialize};
3335

3436
use crate::io::{
@@ -200,6 +202,13 @@ impl Storage for LocalFsStorage {
200202
Ok(())
201203
}
202204

205+
/// Deletes every path yielded by `paths`, one at a time and in stream order.
///
/// The first `delete` that fails ends the loop and its error is returned; paths still
/// pending in the stream are never pulled. An empty stream returns `Ok(())`.
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
206+
while let Some(path) = paths.next().await {
207+
// `?` makes the deletion fail-fast: earlier paths stay deleted, later ones are skipped.
self.delete(&path).await?;
208+
}
209+
Ok(())
210+
}
211+
203212
fn new_input(&self, path: &str) -> Result<InputFile> {
204213
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
205214
}
@@ -534,4 +543,61 @@ mod tests {
534543

535544
assert!(path.exists());
536545
}
546+
547+
// Writes three files into a temp dir, streams two of their paths into `delete_stream`,
// and checks that exactly those two are gone while the third is left in place.
#[tokio::test]
548+
async fn test_local_fs_storage_delete_stream() {
549+
use futures::stream;
550+
551+
let tmp_dir = TempDir::new().unwrap();
552+
let storage = LocalFsStorage::new();
553+
554+
// Create multiple files
555+
let file1 = tmp_dir.path().join("file1.txt");
556+
let file2 = tmp_dir.path().join("file2.txt");
557+
let file3 = tmp_dir.path().join("file3.txt");
558+
559+
storage
560+
.write(file1.to_str().unwrap(), Bytes::from("1"))
561+
.await
562+
.unwrap();
563+
storage
564+
.write(file2.to_str().unwrap(), Bytes::from("2"))
565+
.await
566+
.unwrap();
567+
storage
568+
.write(file3.to_str().unwrap(), Bytes::from("3"))
569+
.await
570+
.unwrap();
571+
572+
// Verify files exist
573+
assert!(storage.exists(file1.to_str().unwrap()).await.unwrap());
574+
assert!(storage.exists(file2.to_str().unwrap()).await.unwrap());
575+
assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
576+
577+
// Delete multiple files using stream
578+
let paths = vec![
579+
file1.to_str().unwrap().to_string(),
580+
file2.to_str().unwrap().to_string(),
581+
];
582+
let path_stream = stream::iter(paths).boxed();
583+
storage.delete_stream(path_stream).await.unwrap();
584+
585+
// Verify deleted files no longer exist
586+
assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap());
587+
assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap());
588+
589+
// Verify file3 still exists
590+
assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
591+
}
592+
593+
// Edge case: a stream that yields no paths must complete with `Ok(())`.
#[tokio::test]
594+
async fn test_local_fs_storage_delete_stream_empty() {
595+
use futures::stream;
596+
597+
let storage = LocalFsStorage::new();
598+
599+
// Delete with empty stream should succeed
600+
let path_stream = stream::iter(Vec::<String>::new()).boxed();
601+
storage.delete_stream(path_stream).await.unwrap();
602+
}
537603
}

crates/iceberg/src/io/storage/memory.rs

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,8 @@ use std::sync::{Arc, RwLock};
2828

2929
use async_trait::async_trait;
3030
use bytes::Bytes;
31+
use futures::StreamExt;
32+
use futures::stream::BoxStream;
3133
use serde::{Deserialize, Serialize};
3234

3335
use crate::io::{
@@ -220,6 +222,13 @@ impl Storage for MemoryStorage {
220222
Ok(())
221223
}
222224

225+
/// Deletes every path yielded by `paths`, one at a time and in stream order.
///
/// The first `delete` that fails ends the loop and its error is returned; paths still
/// pending in the stream are never pulled. An empty stream returns `Ok(())`.
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
226+
while let Some(path) = paths.next().await {
227+
// `?` makes the deletion fail-fast: earlier paths stay deleted, later ones are skipped.
self.delete(&path).await?;
228+
}
229+
Ok(())
230+
}
231+
223232
fn new_input(&self, path: &str) -> Result<InputFile> {
224233
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
225234
}
@@ -594,4 +603,56 @@ mod tests {
594603
assert_eq!(storage.read("/path/to/file").await.unwrap(), content);
595604
assert_eq!(storage.read("path/to/file").await.unwrap(), content);
596605
}
606+
607+
// Writes three `memory://` files, streams two of their paths into `delete_stream`,
// and checks that exactly those two are gone while the third is left in place.
#[tokio::test]
608+
async fn test_memory_storage_delete_stream() {
609+
use futures::stream;
610+
611+
let storage = MemoryStorage::new();
612+
613+
// Create multiple files
614+
storage
615+
.write("memory://file1.txt", Bytes::from("1"))
616+
.await
617+
.unwrap();
618+
storage
619+
.write("memory://file2.txt", Bytes::from("2"))
620+
.await
621+
.unwrap();
622+
storage
623+
.write("memory://file3.txt", Bytes::from("3"))
624+
.await
625+
.unwrap();
626+
627+
// Verify files exist
628+
assert!(storage.exists("memory://file1.txt").await.unwrap());
629+
assert!(storage.exists("memory://file2.txt").await.unwrap());
630+
assert!(storage.exists("memory://file3.txt").await.unwrap());
631+
632+
// Delete multiple files using stream
633+
let paths = vec![
634+
"memory://file1.txt".to_string(),
635+
"memory://file2.txt".to_string(),
636+
];
637+
let path_stream = stream::iter(paths).boxed();
638+
storage.delete_stream(path_stream).await.unwrap();
639+
640+
// Verify deleted files no longer exist
641+
assert!(!storage.exists("memory://file1.txt").await.unwrap());
642+
assert!(!storage.exists("memory://file2.txt").await.unwrap());
643+
644+
// Verify file3 still exists
645+
assert!(storage.exists("memory://file3.txt").await.unwrap());
646+
}
647+
648+
// Edge case: a stream that yields no paths must complete with `Ok(())`.
#[tokio::test]
649+
async fn test_memory_storage_delete_stream_empty() {
650+
use futures::stream;
651+
652+
let storage = MemoryStorage::new();
653+
654+
// Delete with empty stream should succeed
655+
let path_stream = stream::iter(Vec::<String>::new()).boxed();
656+
storage.delete_stream(path_stream).await.unwrap();
657+
}
597658
}

crates/iceberg/src/io/storage/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ use std::sync::Arc;
2727
use async_trait::async_trait;
2828
use bytes::Bytes;
2929
pub use config::*;
30+
use futures::stream::BoxStream;
3031
pub use local_fs::{LocalFsStorage, LocalFsStorageFactory};
3132
pub use memory::{MemoryStorage, MemoryStorageFactory};
3233

@@ -93,6 +94,9 @@ pub trait Storage: Debug + Send + Sync {
9394
/// Delete all files with the given prefix
9495
async fn delete_prefix(&self, path: &str) -> Result<()>;
9596

97+
/// Delete multiple files from a stream of paths.
///
/// The stream is boxed and `'static`, so it must own the paths it yields.
///
/// NOTE(review): the trait does not state what happens when one deletion fails. The
/// local-fs and memory implementations stop at the first error and leave the remaining
/// paths untouched; confirm whether other backends are expected to match.
98+
async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()>;
99+
96100
/// Create a new input file for reading
97101
fn new_input(&self, path: &str) -> Result<InputFile>;
98102

crates/storage/opendal/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ reqwest = { workspace = true }
4949
serde = { workspace = true }
5050
typetag = { workspace = true }
5151
url = { workspace = true }
52+
futures = { workspace = true }
5253

5354
[dev-dependencies]
5455
async-trait = { workspace = true }

crates/storage/opendal/src/azdls.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -160,7 +160,7 @@ impl FromStr for AzureStorageScheme {
160160
}
161161

162162
/// Validates whether the given path matches what's configured for the backend.
163-
fn match_path_with_config(
163+
pub(crate) fn match_path_with_config(
164164
path: &AzureStoragePath,
165165
config: &AzdlsConfig,
166166
configured_scheme: &AzureStorageScheme,
@@ -220,7 +220,7 @@ fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result<o
220220

221221
/// Represents a fully qualified path to blob/ file in Azure Storage.
222222
#[derive(Debug, PartialEq)]
223-
struct AzureStoragePath {
223+
pub(crate) struct AzureStoragePath {
224224
/// The scheme of the URL, e.g., `abfss`, `abfs`, `wasbs`, or `wasb`.
225225
scheme: AzureStorageScheme,
226226

@@ -236,7 +236,7 @@ struct AzureStoragePath {
236236
/// Path to the file.
237237
///
238238
/// It is relative to the `root` of the `AzdlsConfig`.
239-
path: String,
239+
pub(crate) path: String,
240240
}
241241

242242
impl AzureStoragePath {

0 commit comments

Comments
 (0)