diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs
index 07447119..6d1cfbc5 100644
--- a/datafusion_iceberg/src/table.rs
+++ b/datafusion_iceberg/src/table.rs
@@ -261,6 +261,29 @@ impl TableProvider for DataFusionTable {
     }
 }
 
+/// Builds a synthetic object store URL that is unique for each table location.
+///
+/// DataFusion's `DefaultObjectStoreRegistry` only takes the hostname into account for
+/// object store keys, so the hex-encoded path is appended to the host to keep tables
+/// with different paths apart. Path, query and fragment are then cleared.
+///
+/// Returns `None` when `table_location_url` is not a valid URL, when the URL cannot
+/// carry a host (e.g. `mailto:` style cannot-be-a-base URLs), or when the rewritten
+/// URL is rejected by `ObjectStoreUrl::parse`.
+fn fake_object_store_url(table_location_url: &str) -> Option<ObjectStoreUrl> {
+    let mut u = url::Url::parse(table_location_url).ok()?;
+    // Hex-encode the path to ensure it produces a valid hostname
+    let path_hex: String = u.path().bytes().map(|b| format!("{:02x}", b)).collect();
+    let host = format!("{}-{}", u.host_str().unwrap_or(""), path_hex);
+    // Propagate a `set_host` failure as `None` instead of panicking, so that the
+    // caller's local-filesystem fallback can kick in.
+    u.set_host(Some(&host)).ok()?;
+    u.set_path("");
+    u.set_query(None);
+    u.set_fragment(None);
+    ObjectStoreUrl::parse(&u).ok()
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn table_scan(
     table: &Table,
@@ -278,8 +301,8 @@ async fn table_scan(
         .unwrap_or_else(|| table.current_schema(None).unwrap().clone());
 
     // Create a unique URI for this particular object store
-    let object_store_url = ObjectStoreUrl::parse(&table.metadata().location)
-        .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem());
+    let object_store_url = fake_object_store_url(&table.metadata().location)
+        .unwrap_or_else(ObjectStoreUrl::local_filesystem);
     session
         .runtime_env()
         .register_object_store(object_store_url.as_ref(), table.object_store());
@@ -359,7 +382,8 @@ async fn table_scan(
 
     let manifests = table
         .manifests(snapshot_range.0, snapshot_range.1)
-        .await.map_err(DataFusionIcebergError::from)?;
+        .await
+        .map_err(DataFusionIcebergError::from)?;
 
     // If there is a filter expression on the partition column, the manifest files to read are pruned.
     let data_files: Vec = if let Some(predicate) = partition_predicates {
@@ -485,7 +509,7 @@ async fn table_scan(
                     .with_pushdown_filters(true)
             } else {
                 ParquetSource::default()
-            }
+            },
         );
 
         // Create plan for every partition with delete files
@@ -577,11 +601,14 @@ async fn table_scan(
                         let delete_file_source = Arc::new(
                             if let Some(physical_predicate) = physical_predicate.clone() {
                                 ParquetSource::default()
-                                    .with_predicate(Arc::clone(&delete_file_schema), physical_predicate)
+                                    .with_predicate(
+                                        Arc::clone(&delete_file_schema),
+                                        physical_predicate,
+                                    )
                                     .with_pushdown_filters(true)
                             } else {
                                 ParquetSource::default()
-                            }
+                            },
                         );
 
                         let delete_file_scan_config = FileScanConfig::new(
@@ -872,7 +899,9 @@ fn value_to_scalarvalue(value: &Value) -> Result {
 
 #[cfg(test)]
 mod tests {
-    use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
+    use datafusion::{
+        arrow::array::Int64Array, execution::object_store::ObjectStoreUrl, prelude::SessionContext,
+    };
     use iceberg_rust::{
         catalog::tabular::Tabular,
         object_store::ObjectStoreBuilder,
@@ -895,7 +924,7 @@ mod tests {
 
     use std::{ops::Deref, sync::Arc};
 
-    use crate::{catalog::catalog::IcebergCatalog, DataFusionTable};
+    use crate::{catalog::catalog::IcebergCatalog, table::fake_object_store_url, DataFusionTable};
 
     #[tokio::test]
     pub async fn test_datafusion_table_insert() {
@@ -1643,4 +1672,19 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn test_fake_object_store_url() {
+        assert_eq!(
+            fake_object_store_url("s3://a"),
+            Some(ObjectStoreUrl::parse("s3://a-").unwrap()),
+        );
+        assert_eq!(
+            fake_object_store_url("s3://a/b"),
+            Some(ObjectStoreUrl::parse("s3://a-2f62").unwrap()),
+        );
+        assert_eq!(fake_object_store_url("invalid url"), None);
+        // Cannot-be-a-base URLs have no host to rewrite; this must yield None, not panic
+        assert_eq!(fake_object_store_url("mailto:rms@example.net"), None);
+    }
 }