From f13b91a326e426304a661c946bb60ac1a158a091 Mon Sep 17 00:00:00 2001 From: Sergei Patiakin Date: Wed, 23 Apr 2025 10:08:02 +0200 Subject: [PATCH 1/5] Avoid store mixing for different paths --- datafusion_iceberg/src/table.rs | 38 +++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index 07447119..63c785cc 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -261,6 +261,25 @@ impl TableProvider for DataFusionTable { } } +// Create a fake object store URL. Different table paths should produce fake URLs +// that differ in the host name, because DF's DefaultObjectStoreRegistry only takes +// hostname into account +fn fake_object_store_url(table_location_url: &str) -> Option { + let mut u = url::Url::parse(&table_location_url).ok()?; + u.set_host(Some(&format!( + "{}{}", + u.host_str().unwrap_or("-"), + u.path() + .replace(object_store::path::DELIMITER, "-") + .replace(':', "-") + ))) + .unwrap(); + u.set_path(""); + u.set_query(None); + u.set_fragment(None); + ObjectStoreUrl::parse(u.to_string()).ok() +} + #[allow(clippy::too_many_arguments)] async fn table_scan( table: &Table, @@ -278,8 +297,8 @@ async fn table_scan( .unwrap_or_else(|| table.current_schema(None).unwrap().clone()); // Create a unique URI for this particular object store - let object_store_url = ObjectStoreUrl::parse(&table.metadata().location) - .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()); + let object_store_url = fake_object_store_url(&table.metadata().location) + .unwrap_or_else(|| ObjectStoreUrl::local_filesystem()); session .runtime_env() .register_object_store(object_store_url.as_ref(), table.object_store()); @@ -872,7 +891,9 @@ fn value_to_scalarvalue(value: &Value) -> Result { #[cfg(test)] mod tests { - use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; + use datafusion::{ + arrow::array::Int64Array, execution::object_store::ObjectStoreUrl, prelude::SessionContext, + }; use iceberg_rust::{ catalog::tabular::Tabular, object_store::ObjectStoreBuilder, @@ -895,7 +916,7 @@ mod tests { use std::{ops::Deref, sync::Arc}; - use crate::{catalog::catalog::IcebergCatalog, DataFusionTable}; + use crate::{catalog::catalog::IcebergCatalog, table::fake_object_store_url, DataFusionTable}; #[tokio::test] pub async fn test_datafusion_table_insert() { @@ -1643,4 +1664,13 @@ mod tests { } } } + + #[test] + fn test_fake_object_store_url() { + assert_eq!( + fake_object_store_url("s3://aaa/bbb/ccc"), + Some(ObjectStoreUrl::parse("s3://aaa-bbb-ccc").unwrap()), + ); + assert_eq!(fake_object_store_url("invalid url"), None); + } } From 4842ed46e597cea21484d7f490e97d0d70081aa6 Mon Sep 17 00:00:00 2001 From: Sergei Patiakin Date: Wed, 23 Apr 2025 10:29:41 +0200 Subject: [PATCH 2/5] Clippy --- datafusion_iceberg/src/table.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index 63c785cc..87b73abf 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -265,7 +265,7 @@ impl TableProvider for DataFusionTable { // that differ in the host name, because DF's DefaultObjectStoreRegistry only takes // hostname into account fn fake_object_store_url(table_location_url: &str) -> Option { - let mut u = url::Url::parse(&table_location_url).ok()?; + let mut u = url::Url::parse(table_location_url).ok()?; u.set_host(Some(&format!( "{}{}", u.host_str().unwrap_or("-"), @@ -277,7 +277,7 @@ fn fake_object_store_url(table_location_url: &str) -> Option { u.set_path(""); u.set_query(None); u.set_fragment(None); - ObjectStoreUrl::parse(u.to_string()).ok() + ObjectStoreUrl::parse(&u).ok() } #[allow(clippy::too_many_arguments)] @@ -298,7 +298,7 @@ async fn table_scan( // Create a unique URI for this particular object store let object_store_url = fake_object_store_url(&table.metadata().location) - .unwrap_or_else(|| ObjectStoreUrl::local_filesystem()); + .unwrap_or_else(ObjectStoreUrl::local_filesystem); session .runtime_env() .register_object_store(object_store_url.as_ref(), table.object_store()); @@ -378,7 +378,8 @@ async fn table_scan( let manifests = table .manifests(snapshot_range.0, snapshot_range.1) - .await.map_err(DataFusionIcebergError::from)?; + .await + .map_err(DataFusionIcebergError::from)?; // If there is a filter expression on the partition column, the manifest files to read are pruned. let data_files: Vec = if let Some(predicate) = partition_predicates { @@ -504,7 +505,7 @@ async fn table_scan( .with_pushdown_filters(true) } else { ParquetSource::default() - } + }, ); // Create plan for every partition with delete files @@ -596,11 +597,14 @@ async fn table_scan( let delete_file_source = Arc::new( if let Some(physical_predicate) = physical_predicate.clone() { ParquetSource::default() - .with_predicate(Arc::clone(&delete_file_schema), physical_predicate) + .with_predicate( + Arc::clone(&delete_file_schema), + physical_predicate, + ) .with_pushdown_filters(true) } else { ParquetSource::default() - } + }, ); let delete_file_scan_config = FileScanConfig::new( From b87e4f63ffcc4eae1e4c9499acd5617e1d6a797b Mon Sep 17 00:00:00 2001 From: Sergei Patiakin Date: Wed, 23 Apr 2025 11:24:23 +0200 Subject: [PATCH 3/5] Avoid collisions --- datafusion_iceberg/src/table.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index 87b73abf..f1bd8921 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -263,15 +263,16 @@ impl TableProvider for DataFusionTable { // Create a fake object store URL. Different table paths should produce fake URLs // that differ in the host name, because DF's DefaultObjectStoreRegistry only takes -// hostname into account +// hostname into account for object store keys fn fake_object_store_url(table_location_url: &str) -> Option { let mut u = url::Url::parse(table_location_url).ok()?; u.set_host(Some(&format!( - "{}{}", - u.host_str().unwrap_or("-"), + "{}-0{}", + u.host_str().unwrap_or("").replace('-', "-1"), u.path() - .replace(object_store::path::DELIMITER, "-") - .replace(':', "-") + .replace('-', "-1") + .replace(object_store::path::DELIMITER, "-2") + .replace(':', "-3") ))) .unwrap(); u.set_path(""); @@ -1673,7 +1674,11 @@ mod tests { fn test_fake_object_store_url() { assert_eq!( fake_object_store_url("s3://aaa/bbb/ccc"), - Some(ObjectStoreUrl::parse("s3://aaa-bbb-ccc").unwrap()), + Some(ObjectStoreUrl::parse("s3://aaa-0-2bbb-2ccc").unwrap()), + ); + assert_eq!( + fake_object_store_url("s3://aaa/bbb-ccc"), + Some(ObjectStoreUrl::parse("s3://aaa-0-2bbb-1ccc").unwrap()), ); assert_eq!(fake_object_store_url("invalid url"), None); } From 53f71eacd1caf8d3f90f80d6cf4cf4957f2ba67f Mon Sep 17 00:00:00 2001 From: Sergei Patiakin Date: Wed, 23 Apr 2025 14:26:48 +0200 Subject: [PATCH 4/5] Use hex encoding --- datafusion_iceberg/src/table.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index f1bd8921..e8935105 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -267,12 +267,15 @@ impl TableProvider for DataFusionTable { fn fake_object_store_url(table_location_url: &str) -> Option { let mut u = url::Url::parse(table_location_url).ok()?; u.set_host(Some(&format!( - "{}-0{}", - u.host_str().unwrap_or("").replace('-', "-1"), + "{}-{}", + u.host_str().unwrap_or(""), + // Append hex-encoded path to ensure we get a valid hostname u.path() - .replace('-', "-1") - .replace(object_store::path::DELIMITER, "-2") - .replace(':', "-3") + .as_bytes() + .iter() + .map(|b| format!("{:02x}", b)) + .collect::>() + .join("") ))) .unwrap(); u.set_path(""); @@ -1673,12 +1676,12 @@ mod tests { #[test] fn test_fake_object_store_url() { assert_eq!( - fake_object_store_url("s3://aaa/bbb/ccc"), - Some(ObjectStoreUrl::parse("s3://aaa-0-2bbb-2ccc").unwrap()), + fake_object_store_url("s3://a"), + Some(ObjectStoreUrl::parse("s3://a-").unwrap()), ); assert_eq!( - fake_object_store_url("s3://aaa/bbb-ccc"), - Some(ObjectStoreUrl::parse("s3://aaa-0-2bbb-1ccc").unwrap()), + fake_object_store_url("s3://a/b"), + Some(ObjectStoreUrl::parse("s3://a-2f62").unwrap()), ); assert_eq!(fake_object_store_url("invalid url"), None); } From b0856634d4d5c3a39b4e218f6de9c1060be894b9 Mon Sep 17 00:00:00 2001 From: Sergei Patiakin Date: Wed, 23 Apr 2025 14:45:03 +0200 Subject: [PATCH 5/5] Improve comment --- datafusion_iceberg/src/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index e8935105..6d1cfbc5 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -269,7 +269,7 @@ fn fake_object_store_url(table_location_url: &str) -> Option { u.set_host(Some(&format!( "{}-{}", u.host_str().unwrap_or(""), - // Append hex-encoded path to ensure we get a valid hostname + // Hex-encode the path to ensure it produces a valid hostname u.path() .as_bytes() .iter()