Commit af79eae

DerGutLi0k authored and committed
feat(storage-azdls): Add Azure Datalake Storage support (apache#1368)
- Closes apache#1360.

This PR adds an integration for the Azure Datalake storage service. At its core, it adds parsing logic for configuration properties. The finished config struct is simply passed down to OpenDAL. In addition, it adds logic to parse fully qualified file URIs and match them against expected (previously configured) values.

It also creates a new `Storage::Azdls` enum variant based on OpenDAL's existing `Scheme::Azdls` enum variant. It then fits the parsing logic into the existing framework to build the storage integration from an `io::FileIOBuilder`.

Other Iceberg ADLS integrations ([pyiceberg + Java](https://github.com/apache/iceberg-go/pull/313/files#r2021460617)) also support the `wasb://` and `wasbs://` schemes. WASB refers to a client-side implementation of hierarchical namespaces on top of Blob Storage. ADLS (v2), on the other hand, is a service offered by Azure, also built on top of Blob Storage. IIUC we can accept both schemes because objects written to Blob Storage via `wasb://` will also be accessible via `abfs://` (which operates on the same Blob Storage). Even though the URIs differ slightly in format when they refer to the same object, we can largely reuse existing logic.

```diff
-wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>
+abfs[s]://<filesystemname>@<accountname>.dfs.core.windows.net/<path>
```

I added minor unit tests to validate the configuration property parsing logic. I decided **not** to add integration tests because

1. ADLS is not S3-compatible, which means that we can't reuse our Minio setup
2. the Azure-specific alternative to local testing - Azurite - doesn't support ADLS

I have yet to test it in a functioning environment.

---------

Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
1 parent cccb65d commit af79eae
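
The URI shapes described in the commit message can be illustrated with a small, std-only sketch. This is not the parser added by this commit (that code lives in `storage_azdls.rs`, which is not shown on this page); the `AzureLocation` type and the `parse_azure_uri` function are hypothetical names chosen for illustration, and the rule that `abfs[s]` must point at a `dfs` endpoint while `wasb[s]` must point at a `blob` endpoint is an assumption read off the two URI templates above.

```rust
/// Components of an ADLS-style location (hypothetical type, illustration only).
#[derive(Debug, PartialEq)]
struct AzureLocation<'a> {
    scheme: &'a str,
    filesystem: &'a str,
    account: &'a str,
    endpoint_suffix: &'a str,
    path: &'a str,
}

/// Splits `<scheme>://<filesystem>@<account>.<service>.<suffix>/<path>` into parts.
/// Assumption: `abfs[s]` pairs with the `dfs` service and `wasb[s]` with `blob`.
fn parse_azure_uri(uri: &str) -> Result<AzureLocation<'_>, String> {
    let (scheme, rest) = uri
        .split_once("://")
        .ok_or_else(|| format!("missing scheme in `{uri}`"))?;

    let expected_service = match scheme {
        "abfs" | "abfss" => "dfs",
        "wasb" | "wasbs" => "blob",
        other => return Err(format!("unsupported scheme `{other}`")),
    };

    // Everything before the first `/` is the authority; the rest is the object path.
    let (authority, path) = rest.split_once('/').unwrap_or((rest, ""));
    let (filesystem, host) = authority
        .split_once('@')
        .ok_or_else(|| format!("missing `<filesystem>@` in `{uri}`"))?;
    if filesystem.is_empty() {
        return Err(format!("empty filesystem name in `{uri}`"));
    }

    // The host has the form `<account>.<service>.<endpoint-suffix>`.
    let mut parts = host.splitn(3, '.');
    let (account, service, endpoint_suffix) = match (parts.next(), parts.next(), parts.next()) {
        (Some(a), Some(s), Some(e)) if !a.is_empty() && !e.is_empty() => (a, s, e),
        _ => return Err(format!("unexpected host `{host}`")),
    };
    if service != expected_service {
        return Err(format!(
            "scheme `{scheme}` expects a `{expected_service}` endpoint, found `{service}`"
        ));
    }

    Ok(AzureLocation { scheme, filesystem, account, endpoint_suffix, path })
}

fn main() {
    let uri = "abfss://myfs@myaccount.dfs.core.windows.net/warehouse/t1/data.parquet";
    match parse_azure_uri(uri) {
        Ok(loc) => println!(
            "{} {} {} {} {}",
            loc.scheme, loc.filesystem, loc.account, loc.endpoint_suffix, loc.path
        ),
        Err(e) => eprintln!("error: {e}"),
    }
}
```

Borrowing every component from the input string keeps the sketch allocation-free on the success path; a real integration would additionally compare the parsed account and filesystem against the previously configured values, as the commit message describes.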

9 files changed

Lines changed: 667 additions & 63 deletions

Cargo.lock

Lines changed: 12 additions & 19 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -78,7 +78,7 @@ mockito = "1"
 murmur3 = "0.5.2"
 num-bigint = "0.4.6"
 once_cell = "1.20"
-opendal = "0.51.2"
+opendal = "0.53.3"
 ordered-float = "4"
 parquet = "54.1.0"
 paste = "1.0.15"
@@ -97,7 +97,7 @@ serde_json = "1.0.138"
 serde_repr = "0.1.16"
 serde_with = "3.4"
 tempfile = "3.18"
-thrift = "0.17.0"
+thrift = "0.17.0"
 tokio = { version = "1.36", default-features = false }
 typed-builder = "0.20"
 url = "2.5.4"

crates/iceberg/Cargo.toml

Lines changed: 4 additions & 2 deletions
@@ -32,10 +32,12 @@ keywords = ["iceberg"]
 default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
 storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs", "storage-azblob"]
 
-storage-memory = ["opendal/services-memory"]
+storage-azdls = ["opendal/services-azdls"]
 storage-fs = ["opendal/services-fs"]
-storage-s3 = ["opendal/services-s3"]
 storage-gcs = ["opendal/services-gcs"]
+storage-memory = ["opendal/services-memory"]
+storage-oss = ["opendal/services-oss"]
+storage-s3 = ["opendal/services-s3"]
 storage-azblob = ["opendal/services-azblob"]
 
 async-std = ["dep:async-std"]

crates/iceberg/README.md

Lines changed: 24 additions & 0 deletions
@@ -57,3 +57,27 @@ async fn main() -> Result<()> {
     Ok(())
 }
 ```
+
+## IO Support
+
+Iceberg Rust provides various storage backends through feature flags. Here are the currently supported storage backends:
+
+| Storage Backend | Feature Flag | Status | Description |
+| -------------------- | ---------------- | -------------- | --------------------------------------------- |
+| Memory | `storage-memory` | ✅ Stable | In-memory storage for testing and development |
+| Local Filesystem | `storage-fs` | ✅ Stable | Local filesystem storage |
+| Amazon S3 | `storage-s3` | ✅ Stable | Amazon S3 storage |
+| Google Cloud Storage | `storage-gcs` | ✅ Stable | Google Cloud Storage |
+| Alibaba Cloud OSS | `storage-oss` | 🧪 Experimental | Alibaba Cloud Object Storage Service |
+| Azure Datalake | `storage-azdls` | 🧪 Experimental | Azure Datalake Storage v2 |
+
+You can enable all stable storage backends at once using the `storage-all` feature flag.
+
+> Note that `storage-oss` and `storage-azdls` are currently experimental and not included in `storage-all`.
+
+Example usage in `Cargo.toml`:
+
+```toml
+[dependencies]
+iceberg = { version = "x.y.z", features = ["storage-s3", "storage-fs"] }
+```

crates/iceberg/src/io/file_io.rs

Lines changed: 11 additions & 8 deletions
@@ -42,13 +42,15 @@ pub const IO_CHUNK_SIZE: &str = "io.write.chunk-size";
 ///
 /// Supported storages:
 ///
-/// | Storage | Feature Flag | Schemes |
-/// |--------------------|-------------------|------------|
-/// | Local file system | `storage-fs` | `file` |
-/// | Memory | `storage-memory` | `memory` |
-/// | S3 | `storage-s3` | `s3`, `s3a`|
-/// | GCS | `storage-gcs` | `gs`, `gcs`|
-/// | AZBLOB | `storage-azblob` | `azblob` |
+/// | Storage | Feature Flag | Expected Path Format | Schemes |
+/// |--------------------|-------------------|----------------------------------| ------------------------------|
+/// | Local file system | `storage-fs` | `file` | `file://path/to/file` |
+/// | Memory | `storage-memory` | `memory` | `memory://path/to/file` |
+/// | S3 | `storage-s3` | `s3`, `s3a` | `s3://<bucket>/path/to/file` |
+/// | GCS | `storage-gcs` | `gs`, `gcs` | `gs://<bucket>/path/to/file` |
+/// | OSS | `storage-oss` | `oss` | `oss://<bucket>/path/to/file` |
+/// | AZBLOB | `storage-azblob` | `azblob` | |
+/// | Azure Datalake | `storage-azdls` | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
 #[derive(Clone, Debug)]
 pub struct FileIO {
     builder: FileIOBuilder,
@@ -336,7 +338,8 @@ impl FileWrite for opendal::Writer {
     }
 
     async fn close(&mut self) -> crate::Result<()> {
-        Ok(opendal::Writer::close(self).await?)
+        let _ = opendal::Writer::close(self).await?;
+        Ok(())
     }
 }
 

crates/iceberg/src/io/mod.rs

Lines changed: 20 additions & 10 deletions
@@ -67,32 +67,42 @@
 //! - `new_output`: Create output file for writing.
 
 mod file_io;
+mod storage;
+
 pub use file_io::*;
+pub(crate) mod object_cache;
 
-mod storage;
+#[cfg(feature = "storage-azdls")]
+mod storage_azdls;
+#[cfg(feature = "storage-fs")]
+mod storage_fs;
+#[cfg(feature = "storage-gcs")]
+mod storage_gcs;
 #[cfg(feature = "storage-memory")]
 mod storage_memory;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
+#[cfg(feature = "storage-oss")]
+mod storage_oss;
 #[cfg(feature = "storage-s3")]
 mod storage_s3;
-#[cfg(feature = "storage-s3")]
-pub use storage_s3::*;
-pub(crate) mod object_cache;
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
 
+#[cfg(feature = "storage-azdls")]
+pub use storage_azdls::*;
 #[cfg(feature = "storage-fs")]
 use storage_fs::*;
 #[cfg(feature = "storage-gcs")]
-mod storage_gcs;
-#[cfg(feature = "storage-gcs")]
 pub use storage_gcs::*;
 
 #[cfg(feature = "storage-azblob")]
 mod storage_azblob;
 #[cfg(feature = "storage-azblob")]
 pub use storage_azblob::*;
+#[cfg(feature = "storage-memory")]
+use storage_memory::*;
+#[cfg(feature = "storage-oss")]
+pub use storage_oss::*;
+#[cfg(feature = "storage-s3")]
+pub use storage_s3::*;
+
 pub(crate) fn is_truthy(value: &str) -> bool {
     ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
 }

crates/iceberg/src/io/storage.rs

Lines changed: 40 additions & 13 deletions
@@ -20,12 +20,16 @@ use std::sync::Arc;
 use opendal::layers::{RetryLayer, TimeoutLayer};
 #[cfg(feature = "storage-azblob")]
 use opendal::services::AzblobConfig;
+#[cfg(feature = "storage-azdls")]
+use opendal::services::AzdlsConfig;
 #[cfg(feature = "storage-gcs")]
 use opendal::services::GcsConfig;
 #[cfg(feature = "storage-s3")]
 use opendal::services::S3Config;
 use opendal::{Operator, Scheme};
 
+#[cfg(feature = "storage-azdls")]
+use super::AzureStorageScheme;
 use super::FileIOBuilder;
 use crate::{Error, ErrorKind};
 
@@ -36,22 +40,30 @@ pub(crate) enum Storage {
     Memory(Operator),
     #[cfg(feature = "storage-fs")]
     LocalFs,
+    /// Expects paths of the form `s3[a]://<bucket>/<path>`.
     #[cfg(feature = "storage-s3")]
     S3 {
         /// s3 storage could have `s3://` and `s3a://`.
         /// Storing the scheme string here to return the correct path.
-        scheme_str: String,
-        /// uses the same client for one FileIO Storage.
-        ///
-        /// TODO: allow users to configure this client.
-        client: reqwest::Client,
+        configured_scheme: String,
         config: Arc<S3Config>,
     },
     #[cfg(feature = "storage-gcs")]
     Gcs { config: Arc<GcsConfig> },
-
     #[cfg(feature = "storage-azblob")]
     Azblob { config: Arc<AzblobConfig> },
+    #[cfg(feature = "storage-oss")]
+    Oss { config: Arc<OssConfig> },
+    /// Expects paths of the form
+    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
+    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
+    #[cfg(feature = "storage-azdls")]
+    Azdls {
+        /// Because Azdls accepts multiple possible schemes, we store the full
+        /// passed scheme here to later validate schemes passed via paths.
+        configured_scheme: AzureStorageScheme,
+        config: Arc<AzdlsConfig>,
+    },
 }
 
 impl Storage {
@@ -67,8 +79,7 @@ impl Storage {
             Scheme::Fs => Ok(Self::LocalFs),
             #[cfg(feature = "storage-s3")]
             Scheme::S3 => Ok(Self::S3 {
-                scheme_str,
-                client: reqwest::Client::new(),
+                configured_scheme: scheme_str,
                 config: super::s3_config_parse(props)?.into(),
             }),
             #[cfg(feature = "storage-gcs")]
@@ -79,6 +90,14 @@ impl Storage {
             Scheme::Azblob => Ok(Self::Azblob {
                 config: super::azblob_config_parse(props)?.into(),
             }),
+            #[cfg(feature = "storage-azdls")]
+            Scheme::Azdls => {
+                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
+                Ok(Self::Azdls {
+                    config: super::azdls_config_parse(props)?.into(),
+                    configured_scheme: scheme,
+                })
+            }
             // Update doc on [`FileIO`] when adding new schemes.
             _ => Err(Error::new(
                 ErrorKind::FeatureUnsupported,
@@ -125,15 +144,14 @@ impl Storage {
             }
             #[cfg(feature = "storage-s3")]
             Storage::S3 {
-                scheme_str,
-                client,
+                configured_scheme,
                 config,
             } => {
-                let op = super::s3_config_build(client, config, path)?;
+                let op = super::s3_config_build(config, path)?;
                 let op_info = op.info();
 
                 // Check prefix of s3 path.
-                let prefix = format!("{}://{}/", scheme_str, op_info.name());
+                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
                 if path.starts_with(&prefix) {
                     Ok((op, &path[prefix.len()..]))
                 } else {
@@ -169,11 +187,18 @@ impl Storage {
                     ))
                 }
             }
+            #[cfg(feature = "storage-azdls")]
+            Storage::Azdls {
+                configured_scheme,
+                config,
+            } => super::azdls_create_operator(path, config, configured_scheme),
             #[cfg(all(
                 not(feature = "storage-s3"),
                 not(feature = "storage-fs"),
                 not(feature = "storage-gcs"),
-                not(feature = "storage-azblob")
+                not(feature = "storage-azblob"),
+                not(feature = "storage-oss"),
+                // not(feature = "storage-azdls"),
             ))]
             _ => Err(Error::new(
                 ErrorKind::FeatureUnsupported,
@@ -195,6 +220,8 @@ impl Storage {
             "file" | "" => Ok(Scheme::Fs),
             "s3" | "s3a" => Ok(Scheme::S3),
             "gs" | "gcs" => Ok(Scheme::Gcs),
+            "oss" => Ok(Scheme::Oss),
+            "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
             s => Ok(s.parse::<Scheme>()?),
         }
     }
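
The diff above keeps the scheme that was passed to the builder (`configured_scheme`) so that schemes appearing in later paths can be validated against it. The following std-only sketch shows one way such a check could look. It is an illustration under assumptions, not the code from `storage_azdls.rs` (which is not shown on this page): the `AzureScheme` enum, its variants, and `strip_configured_scheme` are hypothetical names, and requiring an exact scheme match is only one plausible validation rule.

```rust
use std::str::FromStr;

/// Hypothetical stand-in for the scheme type stored in `Storage::Azdls`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AzureScheme {
    Abfs,
    Abfss,
    Wasb,
    Wasbs,
}

impl FromStr for AzureScheme {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "abfs" => Ok(Self::Abfs),
            "abfss" => Ok(Self::Abfss),
            "wasb" => Ok(Self::Wasb),
            "wasbs" => Ok(Self::Wasbs),
            other => Err(format!("unexpected Azure scheme `{other}`")),
        }
    }
}

/// Returns the part of `path` after `<scheme>://` when the path's scheme equals
/// the configured one. Assumption: an exact match is required.
fn strip_configured_scheme(configured: AzureScheme, path: &str) -> Result<&str, String> {
    let (scheme, rest) = path
        .split_once("://")
        .ok_or_else(|| format!("missing scheme in `{path}`"))?;
    let found: AzureScheme = scheme.parse()?;
    if found == configured {
        Ok(rest)
    } else {
        Err(format!(
            "path uses `{scheme}`, but the storage was configured with {configured:?}"
        ))
    }
}

fn main() {
    let configured: AzureScheme = "abfss".parse().expect("known scheme");
    let path = "abfss://myfs@myaccount.dfs.core.windows.net/t1/data.parquet";
    match strip_configured_scheme(configured, path) {
        Ok(rest) => println!("{rest}"),
        Err(e) => eprintln!("error: {e}"),
    }
}
```

Parsing the scheme into an enum once, at build time, means a misspelled scheme is rejected before any operator is created, and later path checks become a cheap equality comparison.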
