Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteActionSuite
org.apache.comet.CometIcebergWriteDetectionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
org.apache.comet.csv.CometCsvNativeReadSuite
org.apache.comet.CometFuzzTestSuite
org.apache.comet.CometFuzzIcebergSuite
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteActionSuite
org.apache.comet.CometIcebergWriteDetectionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
org.apache.comet.csv.CometCsvNativeReadSuite
org.apache.comet.CometFuzzTestSuite
org.apache.comet.CometFuzzIcebergSuite
Expand Down
331 changes: 331 additions & 0 deletions docs/source/user-guide/latest/iceberg-writes.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/source/user-guide/latest/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ to read more.
:hidden:

Iceberg Guide <iceberg>
Iceberg Writes <iceberg-writes>
S3 Credential Providers <s3-credential-providers>
Kubernetes Guide <kubernetes>

Expand Down
1 change: 0 additions & 1 deletion native/core/src/cloud/s3/credential_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ static WARNED_MISSING_EXPIRY: OnceCell<()> = OnceCell::new();
#[derive(Debug, Clone, Copy)]
pub enum AccessMode {
Read = 0,
#[allow(dead_code)]
Write = 1,
}

Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ fn op_name(op: &OpStruct) -> &'static str {
OpStruct::Window(_) => "Window",
OpStruct::NativeScan(_) => "NativeScan",
OpStruct::IcebergScan(_) => "IcebergScan",
OpStruct::IcebergWrite(_) => "IcebergWrite",
OpStruct::ParquetWriter(_) => "ParquetWriter",
OpStruct::Explode(_) => "Explode",
OpStruct::CsvScan(_) => "CsvScan",
Expand Down
142 changes: 142 additions & 0 deletions native/core/src/execution/operators/iceberg_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helpers shared between the Iceberg scan and Iceberg write operators.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::DataFusionError;
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory};

use crate::cloud::s3::credential_bridge::{AccessMode, CometS3CredentialBridge};

/// Activation key for the `CometS3CredentialProvider` SPI, read from a catalog's `s3.*` property
/// bag.
const ICEBERG_PROVIDER_CLASS_PROPERTY: &str = "s3.comet.credential.provider.class";

/// Key prefixes forwarded to iceberg-rust's `FileIO`. The full unfiltered catalog bag (catalog
/// URI, OAuth tokens, credentials.uri, tenant-id, etc.) is kept upstream so
/// `CometS3CredentialBridge` can read whatever the vendor needs.
const STORAGE_PROPERTY_PREFIXES: &[&str] = &["s3.", "gcs.", "adls.", "client."];

/// Pick an OpenDAL storage backend from a URI's scheme. `file` (or no scheme) falls through to
/// the local file system. `memory` is used by the write path to assemble manifest bytes that
/// stay entirely in-process. For S3, the Comet credential bridge is wired in when a provider
/// class is configured; `access_mode` is forwarded to the JVM SPI so the read and write paths can
/// be granted different (e.g. read-only vs read-write) credentials.
pub(crate) fn storage_factory_for(
path: &str,
catalog_properties: &HashMap<String, String>,
catalog_name: &str,
access_mode: AccessMode,
) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
let scheme = if path.contains("://") {
path.split("://").next().unwrap_or("file")
} else {
"file"
};
match scheme {
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
"memory" => Ok(Arc::new(OpenDalStorageFactory::Memory)),
"s3" | "s3a" => {
let customized_credential_load =
build_s3_credential_loader(path, catalog_properties, catalog_name, access_mode);
Ok(Arc::new(OpenDalStorageFactory::S3 {
customized_credential_load,
}))
}
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
_ => Err(DataFusionError::Execution(format!(
"Unsupported storage scheme: {scheme}"
))),
}
}

/// Build a `FileIO` whose storage scheme is inferred from `reference_path` and whose properties
/// come from the catalog. The reference path is the metadata location for reads or the data
/// location for writes — anything that carries the right URI scheme. `catalog_name` is the
/// credential dispatch key and `access_mode` is the access intent forwarded to the S3 credential
/// bridge, so the write path can request write-capable credentials.
pub(crate) fn load_file_io(
catalog_properties: &HashMap<String, String>,
reference_path: &str,
catalog_name: &str,
access_mode: AccessMode,
) -> Result<FileIO, DataFusionError> {
let factory = storage_factory_for(
reference_path,
catalog_properties,
catalog_name,
access_mode,
)?;
let mut file_io_builder = FileIOBuilder::new(factory);

// Narrow to storage-prefix keys before forwarding to iceberg-rust's FileIO. The full
// unfiltered bag (catalog URI, OAuth tokens, credentials.uri, tenant-id, etc.) is kept
// upstream so CometS3CredentialBridge can read whatever the vendor needs.
for (key, value) in catalog_properties {
if STORAGE_PROPERTY_PREFIXES.iter().any(|p| key.starts_with(p)) {
file_io_builder = file_io_builder.with_prop(key, value);
}
}

Ok(file_io_builder.build())
}

/// Wires the configured Comet credential provider into opendal's S3 service, or returns `None`
/// so opendal falls back to its default credential chain.
fn build_s3_credential_loader(
reference_path: &str,
catalog_properties: &HashMap<String, String>,
catalog_name: &str,
access_mode: AccessMode,
) -> Option<CustomAwsCredentialLoader> {
let url = url::Url::parse(reference_path).ok()?;
let bucket = url.host_str()?;
let provider_class = catalog_properties
.get(ICEBERG_PROVIDER_CLASS_PROPERTY)
.map(|s| s.trim())
.filter(|s| !s.is_empty())?;
// Fall back to the bucket when the table has no catalog identity (e.g. HadoopTables loaded by
// raw path).
let dispatch_key: &str = if catalog_name.is_empty() {
bucket
} else {
catalog_name
};
let bridge = CometS3CredentialBridge::new(
provider_class,
dispatch_key,
bucket,
url.path(),
access_mode,
catalog_properties,
);
match bridge {
Ok(b) => Some(CustomAwsCredentialLoader::new(b)),
Err(e) => {
log::warn!(
"Failed to initialize CometS3CredentialBridge for {provider_class}: {e}; \
falling back to default opendal credential chain"
);
None
}
}
}
103 changes: 4 additions & 99 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,16 @@ use datafusion::physical_plan::{
};
use futures::{Stream, StreamExt, TryStreamExt};
use iceberg::arrow::ScanMetrics;
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
use iceberg_storage_opendal::CustomAwsCredentialLoader;
use iceberg_storage_opendal::OpenDalStorageFactory;

use crate::cloud::s3::credential_bridge::{AccessMode, CometS3CredentialBridge};
use crate::cloud::s3::credential_bridge::AccessMode;
use crate::execution::operators::iceberg_common::load_file_io;
use crate::execution::operators::ExecutionError;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
use iceberg::scan::FileScanTask;

/// Activation key for the `CometS3CredentialProvider` SPI on the Iceberg path, read from a Spark
/// catalog's `s3.*` property bag.
const ICEBERG_PROVIDER_CLASS_PROPERTY: &str = "s3.comet.credential.provider.class";

/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
///
/// Executes pre-planned FileScanTasks for efficient parallel scanning.
Expand Down Expand Up @@ -166,10 +160,11 @@ impl IcebergScanExec {
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let output_schema = Arc::clone(&self.output_schema);
let file_io = Self::load_file_io(
let file_io = load_file_io(
&self.catalog_properties,
&self.metadata_location,
&self.catalog_name,
AccessMode::Read,
)?;
let batch_size = context.session_config().batch_size();

Expand Down Expand Up @@ -214,96 +209,6 @@ impl IcebergScanExec {

Ok(Box::pin(wrapped_stream))
}

fn storage_factory_for(
path: &str,
catalog_properties: &HashMap<String, String>,
catalog_name: &str,
) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
let scheme = if path.contains("://") {
path.split("://").next().unwrap_or("file")
} else {
"file"
};
match scheme {
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
"s3" | "s3a" => {
let customized_credential_load =
build_s3_credential_loader(path, catalog_properties, catalog_name);
Ok(Arc::new(OpenDalStorageFactory::S3 {
customized_credential_load,
}))
}
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
_ => Err(DataFusionError::Execution(format!(
"Unsupported storage scheme: {scheme}"
))),
}
}

fn load_file_io(
catalog_properties: &HashMap<String, String>,
metadata_location: &str,
catalog_name: &str,
) -> Result<FileIO, DataFusionError> {
let factory =
Self::storage_factory_for(metadata_location, catalog_properties, catalog_name)?;
let mut file_io_builder = FileIOBuilder::new(factory);

// Narrow to storage-prefix keys before forwarding to iceberg-rust's FileIO. The full
// unfiltered bag (catalog URI, OAuth tokens, credentials.uri, tenant-id, etc.) is kept
// upstream so CometS3CredentialBridge can read whatever the vendor needs.
for (key, value) in catalog_properties {
if STORAGE_PROPERTY_PREFIXES.iter().any(|p| key.starts_with(p)) {
file_io_builder = file_io_builder.with_prop(key, value);
}
}

Ok(file_io_builder.build())
}
}

const STORAGE_PROPERTY_PREFIXES: &[&str] = &["s3.", "gcs.", "adls.", "client."];

/// Wires the configured Comet credential provider into opendal's S3 service, or returns `None`
/// so opendal falls back to its default credential chain.
fn build_s3_credential_loader(
metadata_location: &str,
catalog_properties: &HashMap<String, String>,
catalog_name: &str,
) -> Option<CustomAwsCredentialLoader> {
let url = url::Url::parse(metadata_location).ok()?;
let bucket = url.host_str()?;
let provider_class = catalog_properties
.get(ICEBERG_PROVIDER_CLASS_PROPERTY)
.map(|s| s.trim())
.filter(|s| !s.is_empty())?;
// Fall back to the bucket when the table has no catalog identity (e.g. HadoopTables loaded by
// raw path).
let dispatch_key: &str = if catalog_name.is_empty() {
bucket
} else {
catalog_name
};
let bridge = CometS3CredentialBridge::new(
provider_class,
dispatch_key,
bucket,
url.path(),
AccessMode::Read,
catalog_properties,
);
match bridge {
Ok(b) => Some(CustomAwsCredentialLoader::new(b)),
Err(e) => {
log::warn!(
"Failed to initialize CometS3CredentialBridge for {provider_class}: {e}; \
falling back to default opendal credential chain"
);
None
}
}
}

/// Metrics for IcebergScanExec
Expand Down
Loading
Loading