Skip to content

Commit ffd6454

Browse files
authored
feat(storage): implement opendal resolving storage (#2231)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #2210 ## What changes are included in this PR? - Add OpenDalResolvingStorage <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? Added a new test <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->
1 parent a54e442 commit ffd6454

3 files changed

Lines changed: 619 additions & 0 deletions

File tree

crates/storage/opendal/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ cfg_if! {
9090
}
9191
}
9292

93+
// Scheme-resolving storage: inspects each path's URL scheme and delegates to
// the matching OpenDAL backend; see the `resolving` module.
mod resolving;
pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
95+
9396
/// OpenDAL-based storage factory.
9497
///
9598
/// Maps scheme to the corresponding OpenDalStorage storage variant.
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Resolving storage that auto-detects the scheme from a path and delegates
19+
//! to the appropriate [`OpenDalStorage`] variant.
20+
21+
use std::collections::HashMap;
22+
use std::sync::{Arc, RwLock};
23+
24+
use async_trait::async_trait;
25+
use bytes::Bytes;
26+
use futures::StreamExt;
27+
use futures::stream::BoxStream;
28+
use iceberg::io::{
29+
FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
30+
StorageFactory,
31+
};
32+
use iceberg::{Error, ErrorKind, Result};
33+
use opendal::Scheme;
34+
use serde::{Deserialize, Serialize};
35+
use url::Url;
36+
37+
use crate::OpenDalStorage;
38+
#[cfg(feature = "opendal-s3")]
39+
use crate::s3::CustomAwsCredentialLoader;
40+
41+
// Schemes supported by `OpenDalResolvingStorage`. Each constant gets its own
// doc comment so every public item is documented (a single group `///` would
// attach to the first constant only).

/// In-memory storage scheme.
pub const SCHEME_MEMORY: &str = "memory";
/// Local filesystem scheme.
pub const SCHEME_FILE: &str = "file";
/// Amazon S3 scheme.
pub const SCHEME_S3: &str = "s3";
/// Hadoop S3A scheme; resolved to the S3 backend.
pub const SCHEME_S3A: &str = "s3a";
/// Legacy Hadoop S3N scheme; resolved to the S3 backend.
pub const SCHEME_S3N: &str = "s3n";
/// Google Cloud Storage scheme.
pub const SCHEME_GS: &str = "gs";
/// Google Cloud Storage scheme (alternate spelling); resolved to GCS.
pub const SCHEME_GCS: &str = "gcs";
/// Alibaba Cloud OSS scheme.
pub const SCHEME_OSS: &str = "oss";
/// Azure Data Lake Storage Gen2 scheme (TLS); resolved to the Azdls backend.
pub const SCHEME_ABFSS: &str = "abfss";
/// Azure Data Lake Storage Gen2 scheme; resolved to the Azdls backend.
pub const SCHEME_ABFS: &str = "abfs";
/// Azure Blob scheme (TLS); resolved to the Azdls backend.
pub const SCHEME_WASBS: &str = "wasbs";
/// Azure Blob scheme; resolved to the Azdls backend.
pub const SCHEME_WASB: &str = "wasb";
54+
55+
/// Parse a URL scheme string into an [`opendal::Scheme`].
56+
fn parse_scheme(scheme: &str) -> Result<Scheme> {
57+
match scheme {
58+
SCHEME_MEMORY => Ok(Scheme::Memory),
59+
SCHEME_FILE | "" => Ok(Scheme::Fs),
60+
SCHEME_S3 | SCHEME_S3A | SCHEME_S3N => Ok(Scheme::S3),
61+
SCHEME_GS | SCHEME_GCS => Ok(Scheme::Gcs),
62+
SCHEME_OSS => Ok(Scheme::Oss),
63+
SCHEME_ABFSS | SCHEME_ABFS | SCHEME_WASBS | SCHEME_WASB => Ok(Scheme::Azdls),
64+
s => s.parse::<Scheme>().map_err(|e| {
65+
Error::new(
66+
ErrorKind::FeatureUnsupported,
67+
format!("Unsupported storage scheme: {s}: {e}"),
68+
)
69+
}),
70+
}
71+
}
72+
73+
/// Extract the scheme string from a path URL.
74+
fn extract_scheme(path: &str) -> Result<String> {
75+
let url = Url::parse(path).map_err(|e| {
76+
Error::new(
77+
ErrorKind::DataInvalid,
78+
format!("Invalid path: {path}, failed to parse URL: {e}"),
79+
)
80+
})?;
81+
Ok(url.scheme().to_string())
82+
}
83+
84+
/// Build an [`OpenDalStorage`] variant for the given scheme and config properties.
///
/// Each backend is compiled in only when its corresponding cargo feature is
/// enabled; the `customized_credential_load` parameter exists only with the
/// `opendal-s3` feature.
///
/// # Errors
///
/// Returns [`ErrorKind::FeatureUnsupported`] for schemes whose backend is not
/// handled here, and propagates any config-parsing error from the backend's
/// `*_config_parse` helper.
fn build_storage_for_scheme(
    scheme: &str,
    props: &HashMap<String, String>,
    #[cfg(feature = "opendal-s3")] customized_credential_load: &Option<CustomAwsCredentialLoader>,
) -> Result<OpenDalStorage> {
    match parse_scheme(scheme)? {
        #[cfg(feature = "opendal-s3")]
        Scheme::S3 => {
            let config = crate::s3::s3_config_parse(props.clone())?;
            Ok(OpenDalStorage::S3 {
                // Keep the caller's original scheme (s3/s3a/s3n) so generated
                // paths round-trip with the scheme the caller used.
                configured_scheme: scheme.to_string(),
                config: Arc::new(config),
                customized_credential_load: customized_credential_load.clone(),
            })
        }
        #[cfg(feature = "opendal-gcs")]
        Scheme::Gcs => {
            let config = crate::gcs::gcs_config_parse(props.clone())?;
            Ok(OpenDalStorage::Gcs {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-oss")]
        Scheme::Oss => {
            let config = crate::oss::oss_config_parse(props.clone())?;
            Ok(OpenDalStorage::Oss {
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-azdls")]
        Scheme::Azdls => {
            // Azure accepts several scheme spellings (abfs/abfss/wasb/wasbs);
            // the parsed `AzureStorageScheme` preserves which one was used.
            let configured_scheme: crate::azdls::AzureStorageScheme = scheme.parse()?;
            let config = crate::azdls::azdls_config_parse(props.clone())?;
            Ok(OpenDalStorage::Azdls {
                configured_scheme,
                config: Arc::new(config),
            })
        }
        #[cfg(feature = "opendal-fs")]
        Scheme::Fs => Ok(OpenDalStorage::LocalFs),
        #[cfg(feature = "opendal-memory")]
        Scheme::Memory => Ok(OpenDalStorage::Memory(crate::memory::memory_config_build()?)),
        // Reached both for schemes with no match arm and for schemes whose
        // feature is disabled at compile time.
        unsupported => Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!("Unsupported storage scheme: {unsupported}"),
        )),
    }
}
133+
134+
/// A resolving storage factory that creates [`OpenDalResolvingStorage`] instances.
///
/// This factory accepts paths from any supported storage system and dynamically
/// delegates operations to the appropriate [`OpenDalStorage`] variant based on
/// the path scheme.
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use iceberg::io::FileIOBuilder;
/// use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
///
/// let factory = OpenDalResolvingStorageFactory::new();
/// let file_io = FileIOBuilder::new(Arc::new(factory))
///     .with_prop("s3.region", "us-east-1")
///     .build();
/// ```
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorageFactory {
    /// Custom AWS credential loader for S3 storage.
    /// Skipped by serde (not serializable); after deserialization it must be
    /// re-attached via [`Self::with_s3_credential_loader`] if needed.
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
159+
160+
impl Default for OpenDalResolvingStorageFactory {
161+
fn default() -> Self {
162+
Self::new()
163+
}
164+
}
165+
166+
impl OpenDalResolvingStorageFactory {
    /// Create a new resolving storage factory.
    ///
    /// No custom S3 credential loader is attached by default.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: None,
        }
    }

    /// Set a custom AWS credential loader for S3 storage.
    ///
    /// The loader is copied into every [`OpenDalResolvingStorage`] this
    /// factory builds, and from there into each S3 backend it resolves.
    #[cfg(feature = "opendal-s3")]
    pub fn with_s3_credential_loader(mut self, loader: CustomAwsCredentialLoader) -> Self {
        self.customized_credential_load = Some(loader);
        self
    }
}
182+
183+
#[typetag::serde]
impl StorageFactory for OpenDalResolvingStorageFactory {
    /// Build a resolving storage that shares this factory's configuration.
    ///
    /// The returned storage starts with an empty backend cache; concrete
    /// backends are built lazily, per scheme, on first use.
    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
        Ok(Arc::new(OpenDalResolvingStorage {
            // One shared property map serves all backends.
            props: config.props().clone(),
            storages: RwLock::new(HashMap::new()),
            #[cfg(feature = "opendal-s3")]
            customized_credential_load: self.customized_credential_load.clone(),
        }))
    }
}
194+
195+
/// A resolving storage that auto-detects the scheme from a path and delegates
/// to the appropriate [`OpenDalStorage`] variant.
///
/// Sub-storages are lazily created on first use for each scheme and cached
/// for subsequent operations.
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDalResolvingStorage {
    /// Configuration properties shared across all backends.
    props: HashMap<String, String>,
    /// Cache of scheme → storage mappings.
    /// Skipped by serde and rebuilt empty on deserialization; entries are
    /// re-created lazily by `resolve`.
    #[serde(skip, default)]
    storages: RwLock<HashMap<String, Arc<OpenDalStorage>>>,
    /// Custom AWS credential loader for S3 storage.
    /// Skipped by serde (not serializable).
    #[cfg(feature = "opendal-s3")]
    #[serde(skip)]
    customized_credential_load: Option<CustomAwsCredentialLoader>,
}
212+
213+
impl OpenDalResolvingStorage {
214+
/// Resolve the storage for the given path by extracting the scheme and
215+
/// returning the cached or newly-created [`OpenDalStorage`].
216+
fn resolve(&self, path: &str) -> Result<Arc<OpenDalStorage>> {
217+
let scheme = extract_scheme(path)?;
218+
219+
// Fast path: check read lock first.
220+
{
221+
let cache = self
222+
.storages
223+
.read()
224+
.map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
225+
if let Some(storage) = cache.get(&scheme) {
226+
return Ok(storage.clone());
227+
}
228+
}
229+
230+
// Slow path: build and insert under write lock.
231+
let mut cache = self
232+
.storages
233+
.write()
234+
.map_err(|_| Error::new(ErrorKind::Unexpected, "Storage cache lock poisoned"))?;
235+
236+
// Double-check after acquiring write lock.
237+
if let Some(storage) = cache.get(&scheme) {
238+
return Ok(storage.clone());
239+
}
240+
241+
let storage = build_storage_for_scheme(
242+
&scheme,
243+
&self.props,
244+
#[cfg(feature = "opendal-s3")]
245+
&self.customized_credential_load,
246+
)?;
247+
let storage = Arc::new(storage);
248+
cache.insert(scheme, storage.clone());
249+
Ok(storage)
250+
}
251+
}
252+
253+
#[async_trait]
254+
#[typetag::serde]
255+
impl Storage for OpenDalResolvingStorage {
256+
async fn exists(&self, path: &str) -> Result<bool> {
257+
self.resolve(path)?.exists(path).await
258+
}
259+
260+
async fn metadata(&self, path: &str) -> Result<FileMetadata> {
261+
self.resolve(path)?.metadata(path).await
262+
}
263+
264+
async fn read(&self, path: &str) -> Result<Bytes> {
265+
self.resolve(path)?.read(path).await
266+
}
267+
268+
async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
269+
self.resolve(path)?.reader(path).await
270+
}
271+
272+
async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
273+
self.resolve(path)?.write(path, bs).await
274+
}
275+
276+
async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
277+
self.resolve(path)?.writer(path).await
278+
}
279+
280+
async fn delete(&self, path: &str) -> Result<()> {
281+
self.resolve(path)?.delete(path).await
282+
}
283+
284+
async fn delete_prefix(&self, path: &str) -> Result<()> {
285+
self.resolve(path)?.delete_prefix(path).await
286+
}
287+
288+
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
289+
// Group paths by scheme so each resolved storage receives a batch,
290+
// avoiding repeated operator creation per path.
291+
let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
292+
while let Some(path) = paths.next().await {
293+
let scheme = extract_scheme(&path)?;
294+
grouped.entry(scheme).or_default().push(path);
295+
}
296+
297+
for (_, paths) in grouped {
298+
let storage = self.resolve(&paths[0])?;
299+
storage
300+
.delete_stream(futures::stream::iter(paths).boxed())
301+
.await?;
302+
}
303+
Ok(())
304+
}
305+
306+
fn new_input(&self, path: &str) -> Result<InputFile> {
307+
Ok(InputFile::new(
308+
Arc::new(self.resolve(path)?.as_ref().clone()),
309+
path.to_string(),
310+
))
311+
}
312+
313+
fn new_output(&self, path: &str) -> Result<OutputFile> {
314+
Ok(OutputFile::new(
315+
Arc::new(self.resolve(path)?.as_ref().clone()),
316+
path.to_string(),
317+
))
318+
}
319+
}

0 commit comments

Comments
 (0)