Skip to content

Commit 9af1cd9

Browse files
author
Jordan Epstein
committed
feat: implement native Iceberg V2 writer via iceberg-rust
1 parent 92d58ea commit 9af1cd9

28 files changed

Lines changed: 3748 additions & 335 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ jobs:
290290
org.apache.comet.CometIcebergWriteActionSuite
291291
org.apache.comet.CometIcebergWriteDetectionSuite
292292
org.apache.comet.iceberg.IcebergReflectionSuite
293+
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
293294
org.apache.comet.csv.CometCsvNativeReadSuite
294295
org.apache.comet.CometFuzzTestSuite
295296
org.apache.comet.CometFuzzIcebergSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ jobs:
132132
org.apache.comet.CometIcebergWriteActionSuite
133133
org.apache.comet.CometIcebergWriteDetectionSuite
134134
org.apache.comet.iceberg.IcebergReflectionSuite
135+
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
135136
org.apache.comet.csv.CometCsvNativeReadSuite
136137
org.apache.comet.CometFuzzTestSuite
137138
org.apache.comet.CometFuzzIcebergSuite

docs/source/user-guide/latest/iceberg-writes.md

Lines changed: 145 additions & 43 deletions
Large diffs are not rendered by default.

native/core/src/cloud/s3/credential_bridge.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ static WARNED_MISSING_EXPIRY: OnceCell<()> = OnceCell::new();
5353
#[derive(Debug, Clone, Copy)]
5454
pub enum AccessMode {
5555
Read = 0,
56-
#[allow(dead_code)]
5756
Write = 1,
5857
}
5958

native/core/src/execution/jni_api.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ fn op_name(op: &OpStruct) -> &'static str {
236236
OpStruct::Window(_) => "Window",
237237
OpStruct::NativeScan(_) => "NativeScan",
238238
OpStruct::IcebergScan(_) => "IcebergScan",
239+
OpStruct::IcebergWrite(_) => "IcebergWrite",
239240
OpStruct::ParquetWriter(_) => "ParquetWriter",
240241
OpStruct::Explode(_) => "Explode",
241242
OpStruct::CsvScan(_) => "CsvScan",
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Helpers shared between the Iceberg scan and Iceberg write operators.
19+
20+
use std::collections::HashMap;
21+
use std::sync::Arc;
22+
23+
use datafusion::common::DataFusionError;
24+
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25+
use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory};
26+
27+
use crate::cloud::s3::credential_bridge::{AccessMode, CometS3CredentialBridge};
28+
29+
/// Activation key for the `CometS3CredentialProvider` SPI, read from a catalog's `s3.*` property
30+
/// bag.
31+
const ICEBERG_PROVIDER_CLASS_PROPERTY: &str = "s3.comet.credential.provider.class";
32+
33+
/// Key prefixes forwarded to iceberg-rust's `FileIO`. The full unfiltered catalog bag (catalog
34+
/// URI, OAuth tokens, credentials.uri, tenant-id, etc.) is kept upstream so
35+
/// `CometS3CredentialBridge` can read whatever the vendor needs.
36+
const STORAGE_PROPERTY_PREFIXES: &[&str] = &["s3.", "gcs.", "adls.", "client."];
37+
38+
/// Pick an OpenDAL storage backend from a URI's scheme. `file` (or no scheme) falls through to
39+
/// the local file system. `memory` is used by the write path to assemble manifest bytes that
40+
/// stay entirely in-process. For S3, the Comet credential bridge is wired in when a provider
41+
/// class is configured; `access_mode` is forwarded to the JVM SPI so the read and write paths can
42+
/// be granted different (e.g. read-only vs read-write) credentials.
43+
pub(crate) fn storage_factory_for(
44+
path: &str,
45+
catalog_properties: &HashMap<String, String>,
46+
catalog_name: &str,
47+
access_mode: AccessMode,
48+
) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
49+
let scheme = if path.contains("://") {
50+
path.split("://").next().unwrap_or("file")
51+
} else {
52+
"file"
53+
};
54+
match scheme {
55+
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
56+
"memory" => Ok(Arc::new(OpenDalStorageFactory::Memory)),
57+
"s3" | "s3a" => {
58+
let customized_credential_load =
59+
build_s3_credential_loader(path, catalog_properties, catalog_name, access_mode);
60+
Ok(Arc::new(OpenDalStorageFactory::S3 {
61+
customized_credential_load,
62+
}))
63+
}
64+
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
65+
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
66+
_ => Err(DataFusionError::Execution(format!(
67+
"Unsupported storage scheme: {scheme}"
68+
))),
69+
}
70+
}
71+
72+
/// Build a `FileIO` whose storage scheme is inferred from `reference_path` and whose properties
73+
/// come from the catalog. The reference path is the metadata location for reads or the data
74+
/// location for writes — anything that carries the right URI scheme. `catalog_name` is the
75+
/// credential dispatch key and `access_mode` is the access intent forwarded to the S3 credential
76+
/// bridge, so the write path can request write-capable credentials.
77+
pub(crate) fn load_file_io(
78+
catalog_properties: &HashMap<String, String>,
79+
reference_path: &str,
80+
catalog_name: &str,
81+
access_mode: AccessMode,
82+
) -> Result<FileIO, DataFusionError> {
83+
let factory = storage_factory_for(
84+
reference_path,
85+
catalog_properties,
86+
catalog_name,
87+
access_mode,
88+
)?;
89+
let mut file_io_builder = FileIOBuilder::new(factory);
90+
91+
// Narrow to storage-prefix keys before forwarding to iceberg-rust's FileIO. The full
92+
// unfiltered bag (catalog URI, OAuth tokens, credentials.uri, tenant-id, etc.) is kept
93+
// upstream so CometS3CredentialBridge can read whatever the vendor needs.
94+
for (key, value) in catalog_properties {
95+
if STORAGE_PROPERTY_PREFIXES.iter().any(|p| key.starts_with(p)) {
96+
file_io_builder = file_io_builder.with_prop(key, value);
97+
}
98+
}
99+
100+
Ok(file_io_builder.build())
101+
}
102+
103+
/// Wires the configured Comet credential provider into opendal's S3 service, or returns `None`
104+
/// so opendal falls back to its default credential chain.
105+
fn build_s3_credential_loader(
106+
reference_path: &str,
107+
catalog_properties: &HashMap<String, String>,
108+
catalog_name: &str,
109+
access_mode: AccessMode,
110+
) -> Option<CustomAwsCredentialLoader> {
111+
let url = url::Url::parse(reference_path).ok()?;
112+
let bucket = url.host_str()?;
113+
let provider_class = catalog_properties
114+
.get(ICEBERG_PROVIDER_CLASS_PROPERTY)
115+
.map(|s| s.trim())
116+
.filter(|s| !s.is_empty())?;
117+
// Fall back to the bucket when the table has no catalog identity (e.g. HadoopTables loaded by
118+
// raw path).
119+
let dispatch_key: &str = if catalog_name.is_empty() {
120+
bucket
121+
} else {
122+
catalog_name
123+
};
124+
let bridge = CometS3CredentialBridge::new(
125+
provider_class,
126+
dispatch_key,
127+
bucket,
128+
url.path(),
129+
access_mode,
130+
catalog_properties,
131+
);
132+
match bridge {
133+
Ok(b) => Some(CustomAwsCredentialLoader::new(b)),
134+
Err(e) => {
135+
log::warn!(
136+
"Failed to initialize CometS3CredentialBridge for {provider_class}: {e}; \
137+
falling back to default opendal credential chain"
138+
);
139+
None
140+
}
141+
}
142+
}

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 4 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,16 @@ use datafusion::physical_plan::{
3939
};
4040
use futures::{Stream, StreamExt, TryStreamExt};
4141
use iceberg::arrow::ScanMetrics;
42-
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
43-
use iceberg_storage_opendal::CustomAwsCredentialLoader;
44-
use iceberg_storage_opendal::OpenDalStorageFactory;
4542

46-
use crate::cloud::s3::credential_bridge::{AccessMode, CometS3CredentialBridge};
43+
use crate::cloud::s3::credential_bridge::AccessMode;
44+
use crate::execution::operators::iceberg_common::load_file_io;
4745
use crate::execution::operators::ExecutionError;
4846
use crate::parquet::parquet_support::SparkParquetOptions;
4947
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
5048
use datafusion_comet_spark_expr::EvalMode;
5149
use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
5250
use iceberg::scan::FileScanTask;
5351

54-
/// Activation key for the `CometS3CredentialProvider` SPI on the Iceberg path, read from a Spark
55-
/// catalog's `s3.*` property bag.
56-
const ICEBERG_PROVIDER_CLASS_PROPERTY: &str = "s3.comet.credential.provider.class";
57-
5852
/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
5953
///
6054
/// Executes pre-planned FileScanTasks for efficient parallel scanning.
@@ -166,10 +160,11 @@ impl IcebergScanExec {
166160
context: Arc<TaskContext>,
167161
) -> DFResult<SendableRecordBatchStream> {
168162
let output_schema = Arc::clone(&self.output_schema);
169-
let file_io = Self::load_file_io(
163+
let file_io = load_file_io(
170164
&self.catalog_properties,
171165
&self.metadata_location,
172166
&self.catalog_name,
167+
AccessMode::Read,
173168
)?;
174169
let batch_size = context.session_config().batch_size();
175170

@@ -214,96 +209,6 @@ impl IcebergScanExec {
214209

215210
Ok(Box::pin(wrapped_stream))
216211
}
217-
218-
fn storage_factory_for(
219-
path: &str,
220-
catalog_properties: &HashMap<String, String>,
221-
catalog_name: &str,
222-
) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
223-
let scheme = if path.contains("://") {
224-
path.split("://").next().unwrap_or("file")
225-
} else {
226-
"file"
227-
};
228-
match scheme {
229-
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
230-
"s3" | "s3a" => {
231-
let customized_credential_load =
232-
build_s3_credential_loader(path, catalog_properties, catalog_name);
233-
Ok(Arc::new(OpenDalStorageFactory::S3 {
234-
customized_credential_load,
235-
}))
236-
}
237-
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
238-
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
239-
_ => Err(DataFusionError::Execution(format!(
240-
"Unsupported storage scheme: {scheme}"
241-
))),
242-
}
243-
}
244-
245-
fn load_file_io(
246-
catalog_properties: &HashMap<String, String>,
247-
metadata_location: &str,
248-
catalog_name: &str,
249-
) -> Result<FileIO, DataFusionError> {
250-
let factory =
251-
Self::storage_factory_for(metadata_location, catalog_properties, catalog_name)?;
252-
let mut file_io_builder = FileIOBuilder::new(factory);
253-
254-
// Narrow to storage-prefix keys before forwarding to iceberg-rust's FileIO. The full
255-
// unfiltered bag (catalog URI, OAuth tokens, credentials.uri, tenant-id, etc.) is kept
256-
// upstream so CometS3CredentialBridge can read whatever the vendor needs.
257-
for (key, value) in catalog_properties {
258-
if STORAGE_PROPERTY_PREFIXES.iter().any(|p| key.starts_with(p)) {
259-
file_io_builder = file_io_builder.with_prop(key, value);
260-
}
261-
}
262-
263-
Ok(file_io_builder.build())
264-
}
265-
}
266-
267-
const STORAGE_PROPERTY_PREFIXES: &[&str] = &["s3.", "gcs.", "adls.", "client."];
268-
269-
/// Wires the configured Comet credential provider into opendal's S3 service, or returns `None`
270-
/// so opendal falls back to its default credential chain.
271-
fn build_s3_credential_loader(
272-
metadata_location: &str,
273-
catalog_properties: &HashMap<String, String>,
274-
catalog_name: &str,
275-
) -> Option<CustomAwsCredentialLoader> {
276-
let url = url::Url::parse(metadata_location).ok()?;
277-
let bucket = url.host_str()?;
278-
let provider_class = catalog_properties
279-
.get(ICEBERG_PROVIDER_CLASS_PROPERTY)
280-
.map(|s| s.trim())
281-
.filter(|s| !s.is_empty())?;
282-
// Fall back to the bucket when the table has no catalog identity (e.g. HadoopTables loaded by
283-
// raw path).
284-
let dispatch_key: &str = if catalog_name.is_empty() {
285-
bucket
286-
} else {
287-
catalog_name
288-
};
289-
let bridge = CometS3CredentialBridge::new(
290-
provider_class,
291-
dispatch_key,
292-
bucket,
293-
url.path(),
294-
AccessMode::Read,
295-
catalog_properties,
296-
);
297-
match bridge {
298-
Ok(b) => Some(CustomAwsCredentialLoader::new(b)),
299-
Err(e) => {
300-
log::warn!(
301-
"Failed to initialize CometS3CredentialBridge for {provider_class}: {e}; \
302-
falling back to default opendal credential chain"
303-
);
304-
None
305-
}
306-
}
307212
}
308213

309214
/// Metrics for IcebergScanExec

0 commit comments

Comments
 (0)