Skip to content

Commit 06ccb32

Browse files
committed
Add ADLS storage support
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
1 parent aa24cf4 commit 06ccb32

5 files changed

Lines changed: 190 additions & 24 deletions

File tree

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ repository = { workspace = true }
3030

3131
[features]
3232
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
33-
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
33+
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs", "storage-azdls"]
3434

35+
storage-azdls = ["opendal/services-azdls"]
3536
storage-fs = ["opendal/services-fs"]
3637
storage-gcs = ["opendal/services-gcs"]
3738
storage-memory = ["opendal/services-memory"]

crates/iceberg/src/io/file_io.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ use crate::{Error, ErrorKind, Result};
3535
///
3636
/// Supported storages:
3737
///
38-
/// | Storage | Feature Flag | Schemes |
39-
/// |--------------------|-------------------|------------|
40-
/// | Local file system | `storage-fs` | `file` |
41-
/// | Memory | `storage-memory` | `memory` |
42-
/// | S3 | `storage-s3` | `s3`, `s3a`|
43-
/// | GCS | `storage-gcs` | `gs`, `gcs`|
38+
/// | Storage | Feature Flag | Schemes |
39+
/// |--------------------|-------------------|-----------------|
40+
/// | Local file system | `storage-fs` | `file` |
41+
/// | Memory | `storage-memory` | `memory` |
42+
/// | S3 | `storage-s3` | `s3`, `s3a` |
43+
/// | GCS | `storage-gcs` | `gs`, `gcs` |
44+
/// | Azure Datalake | `storage-azdls` | `adfs`, `adfss` |
4445
#[derive(Clone, Debug)]
4546
pub struct FileIO {
4647
builder: FileIOBuilder,

crates/iceberg/src/io/mod.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,32 +67,36 @@
6767
//! - `new_output`: Create output file for writing.
6868
6969
mod file_io;
70+
mod storage;
71+
7072
pub use file_io::*;
73+
pub(crate) mod object_cache;
7174

72-
mod storage;
75+
#[cfg(feature = "storage-azdls")]
76+
mod storage_azdls;
77+
#[cfg(feature = "storage-fs")]
78+
mod storage_fs;
79+
#[cfg(feature = "storage-gcs")]
80+
mod storage_gcs;
7381
#[cfg(feature = "storage-memory")]
7482
mod storage_memory;
75-
#[cfg(feature = "storage-memory")]
76-
use storage_memory::*;
83+
#[cfg(feature = "storage-oss")]
84+
mod storage_oss;
7785
#[cfg(feature = "storage-s3")]
7886
mod storage_s3;
79-
#[cfg(feature = "storage-s3")]
80-
pub use storage_s3::*;
81-
pub(crate) mod object_cache;
82-
#[cfg(feature = "storage-fs")]
83-
mod storage_fs;
8487

88+
#[cfg(feature = "storage-azdls")]
89+
pub use storage_azdls::*;
8590
#[cfg(feature = "storage-fs")]
8691
use storage_fs::*;
8792
#[cfg(feature = "storage-gcs")]
88-
mod storage_gcs;
89-
#[cfg(feature = "storage-gcs")]
9093
pub use storage_gcs::*;
91-
92-
#[cfg(feature = "storage-oss")]
93-
mod storage_oss;
94+
#[cfg(feature = "storage-memory")]
95+
use storage_memory::*;
9496
#[cfg(feature = "storage-oss")]
9597
pub use storage_oss::*;
98+
#[cfg(feature = "storage-s3")]
99+
pub use storage_s3::*;
96100

97101
pub(crate) fn is_truthy(value: &str) -> bool {
98102
["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())

crates/iceberg/src/io/storage.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use opendal::layers::RetryLayer;
21+
#[cfg(feature = "storage-azdls")]
22+
use opendal::services::AzdlsConfig;
2123
#[cfg(feature = "storage-gcs")]
2224
use opendal::services::GcsConfig;
2325
#[cfg(feature = "storage-oss")]
@@ -43,10 +45,17 @@ pub(crate) enum Storage {
4345
scheme_str: String,
4446
config: Arc<S3Config>,
4547
},
46-
#[cfg(feature = "storage-oss")]
47-
Oss { config: Arc<OssConfig> },
4848
#[cfg(feature = "storage-gcs")]
4949
Gcs { config: Arc<GcsConfig> },
50+
#[cfg(feature = "storage-oss")]
51+
Oss { config: Arc<OssConfig> },
52+
#[cfg(feature = "storage-azdls")]
53+
Azdls {
54+
/// Azdls storage may have `abfs://` or `abfss://`.
55+
/// Storing the scheme string here to return the correct path.
56+
scheme_str: String,
57+
config: Arc<AzdlsConfig>,
58+
},
5059
}
5160

5261
impl Storage {
@@ -73,6 +82,10 @@ impl Storage {
7382
Scheme::Oss => Ok(Self::Oss {
7483
config: super::oss_config_parse(props)?.into(),
7584
}),
85+
Scheme::Azdls => Ok(Self::Azdls {
86+
scheme_str,
87+
config: super::azdls_config_parse(props)?.into(),
88+
}),
7689
// Update doc on [`FileIO`] when adding new schemes.
7790
_ => Err(Error::new(
7891
ErrorKind::FeatureUnsupported,
@@ -133,7 +146,6 @@ impl Storage {
133146
))
134147
}
135148
}
136-
137149
#[cfg(feature = "storage-gcs")]
138150
Storage::Gcs { config } => {
139151
let operator = super::gcs_config_build(config, path)?;
@@ -162,11 +174,28 @@ impl Storage {
162174
))
163175
}
164176
}
177+
#[cfg(feature = "storage-azdls")]
178+
Storage::Azdls { scheme_str, config } => {
179+
let op = super::azdls_config_build(config, path)?;
180+
181+
let filesystem = op.info().name();
182+
let prefix = format!("{}://{}", scheme_str, filesystem);
183+
184+
if path.starts_with(&prefix) {
185+
Ok((op, &path[prefix.len()..]))
186+
} else {
187+
Err(Error::new(
188+
ErrorKind::DataInvalid,
189+
format!("Invalid azdls url: {}, should start with {}", path, prefix),
190+
))
191+
}
192+
}
165193
#[cfg(all(
166194
not(feature = "storage-s3"),
167195
not(feature = "storage-fs"),
168196
not(feature = "storage-gcs"),
169-
not(feature = "storage-oss")
197+
not(feature = "storage-oss"),
198+
not(feature = "storage-azdls"),
170199
))]
171200
_ => Err(Error::new(
172201
ErrorKind::FeatureUnsupported,
@@ -188,6 +217,7 @@ impl Storage {
188217
"file" | "" => Ok(Scheme::Fs),
189218
"s3" | "s3a" => Ok(Scheme::S3),
190219
"gs" | "gcs" => Ok(Scheme::Gcs),
220+
"abfs" | "abfss" => Ok(Scheme::Azdls),
191221
"oss" => Ok(Scheme::Oss),
192222
s => Ok(s.parse::<Scheme>()?),
193223
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
use opendal::Configurator;
21+
use opendal::services::AzdlsConfig;
22+
use url::Url;
23+
24+
use crate::{Error, ErrorKind, Result};
25+
26+
/// A connection string.
27+
///
28+
/// This could be used to use FileIO with any adls-compatible object storage
29+
/// service that has a different endpoint (like Azurite).
30+
///
31+
/// Note, this string is parsed first, and any other passed adls.* properties
32+
/// will override values from the connection string.
33+
const ADLS_CONNECTION_STRING: &str = "adls.connection-string";
34+
35+
/// The account that you want to connect to.
36+
pub const ADLS_ACCOUNT_NAME: &str = "adls.account-name";
37+
38+
/// The key to authentication against the account.
39+
pub const ADLS_ACCOUNT_KEY: &str = "adls.account-key";
40+
41+
/// The shared access signature.
42+
pub const ADLS_SAS_TOKEN: &str = "adls.sas-token";
43+
44+
/// The tenant-id.
45+
pub const ADLS_TENANT_ID: &str = "adls.tenant-id";
46+
47+
/// The client-id.
48+
pub const ADLS_CLIENT_ID: &str = "adls.client-id";
49+
50+
/// The client-secret.
51+
pub const ADLS_CLIENT_SECRET: &str = "adls.client-secret";
52+
53+
/// Parses adls.* prefixed configuration properties.
54+
pub(crate) fn azdls_config_parse(mut m: HashMap<String, String>) -> Result<AzdlsConfig> {
55+
let mut cfg = AzdlsConfig::default();
56+
57+
if let Some(_conn_str) = m.remove(ADLS_CONNECTION_STRING) {
58+
return Err(Error::new(
59+
ErrorKind::FeatureUnsupported,
60+
"Azdls: connection string currently not supported",
61+
));
62+
}
63+
64+
if let Some(account_name) = m.remove(ADLS_ACCOUNT_NAME) {
65+
cfg.account_name = Some(account_name);
66+
}
67+
68+
if let Some(account_key) = m.remove(ADLS_ACCOUNT_KEY) {
69+
cfg.account_key = Some(account_key);
70+
}
71+
72+
if let Some(sas_token) = m.remove(ADLS_SAS_TOKEN) {
73+
cfg.sas_token = Some(sas_token);
74+
}
75+
76+
if let Some(tenant_id) = m.remove(ADLS_TENANT_ID) {
77+
cfg.tenant_id = Some(tenant_id);
78+
}
79+
80+
if let Some(client_id) = m.remove(ADLS_CLIENT_ID) {
81+
cfg.client_id = Some(client_id);
82+
}
83+
84+
if let Some(client_secret) = m.remove(ADLS_CLIENT_SECRET) {
85+
cfg.client_secret = Some(client_secret);
86+
}
87+
88+
Ok(cfg)
89+
}
90+
91+
pub(crate) fn azdls_config_build(cfg: &AzdlsConfig, path: &str) -> Result<opendal::Operator> {
92+
let url = Url::parse(path)?;
93+
94+
// This is ok to be empty, OpenDAL will use the default filesystem.
95+
let filesystem = url.username();
96+
97+
let builder = cfg.clone().into_builder().filesystem(filesystem);
98+
99+
Ok(opendal::Operator::new(builder)?.finish())
100+
}
101+
102+
#[cfg(test)]
103+
mod tests {
104+
use opendal::services::AzdlsConfig;
105+
106+
use crate::io::{azdls_config_build, azdls_config_parse};
107+
108+
#[test]
109+
fn test_azdls_config_parse() {
110+
let mut m = std::collections::HashMap::new();
111+
m.insert(super::ADLS_ACCOUNT_NAME.to_string(), "test".to_string());
112+
m.insert(super::ADLS_ACCOUNT_KEY.to_string(), "secret".to_string());
113+
114+
let config = azdls_config_parse(m).unwrap();
115+
116+
assert_eq!(config.account_name, Some("test".to_string()));
117+
assert_eq!(config.account_key, Some("secret".to_string()));
118+
}
119+
120+
#[test]
121+
fn test_azdls_config_build() {
122+
let mut config = AzdlsConfig::default();
123+
config.endpoint = Some("https://myaccount.dfs.core.windows.net".to_string());
124+
125+
let path = "abfss://myfs@myaccount.dfs.core.windows.net/mydir/myfile.parquet";
126+
127+
let op = azdls_config_build(&config, path).unwrap();
128+
assert_eq!(op.info().name(), "myfs");
129+
}
130+
}

0 commit comments

Comments
 (0)