Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,29 @@ impl TableProvider for DataFusionTable {
}
}

/// Builds a synthetic [`ObjectStoreUrl`] under which a table's object store can be registered.
///
/// Different table paths should produce fake URLs that differ in the host name, because
/// DataFusion's `DefaultObjectStoreRegistry` only takes the host name into account for
/// object store keys. To achieve that, the path of `table_location_url` is hex-encoded and
/// appended to the original host (separated by `-`); the path, query and fragment are
/// then cleared from the URL.
///
/// Returns `None` when:
/// * `table_location_url` cannot be parsed as a URL,
/// * the derived host cannot be set on the URL (for example on cannot-be-a-base URLs
///   such as `mailto:` ones, or when the combined host fails host parsing), or
/// * the resulting URL is rejected by `ObjectStoreUrl::parse`.
fn fake_object_store_url(table_location_url: &str) -> Option<ObjectStoreUrl> {
    let mut u = url::Url::parse(table_location_url).ok()?;

    // Hex-encode the path to ensure it produces a valid hostname
    let hex_path: String = u
        .path()
        .as_bytes()
        .iter()
        .map(|b| format!("{:02x}", b))
        .collect();
    let fake_host = format!("{}-{}", u.host_str().unwrap_or(""), hex_path);

    // `set_host` is fallible; report a failure as `None` instead of panicking so that
    // callers can apply their own fallback.
    u.set_host(Some(&fake_host)).ok()?;
    u.set_path("");
    u.set_query(None);
    u.set_fragment(None);
    ObjectStoreUrl::parse(&u).ok()
}

#[allow(clippy::too_many_arguments)]
async fn table_scan(
table: &Table,
Expand All @@ -278,8 +301,8 @@ async fn table_scan(
.unwrap_or_else(|| table.current_schema(None).unwrap().clone());

// Create a unique URI for this particular object store
let object_store_url = ObjectStoreUrl::parse(&table.metadata().location)
.unwrap_or_else(|_| ObjectStoreUrl::local_filesystem());
let object_store_url = fake_object_store_url(&table.metadata().location)
.unwrap_or_else(ObjectStoreUrl::local_filesystem);
session
.runtime_env()
.register_object_store(object_store_url.as_ref(), table.object_store());
Expand Down Expand Up @@ -359,7 +382,8 @@ async fn table_scan(

let manifests = table
.manifests(snapshot_range.0, snapshot_range.1)
.await.map_err(DataFusionIcebergError::from)?;
.await
.map_err(DataFusionIcebergError::from)?;

// If there is a filter expression on the partition column, the manifest files to read are pruned.
let data_files: Vec<ManifestEntry> = if let Some(predicate) = partition_predicates {
Expand Down Expand Up @@ -485,7 +509,7 @@ async fn table_scan(
.with_pushdown_filters(true)
} else {
ParquetSource::default()
}
},
);

// Create plan for every partition with delete files
Expand Down Expand Up @@ -577,11 +601,14 @@ async fn table_scan(
let delete_file_source = Arc::new(
if let Some(physical_predicate) = physical_predicate.clone() {
ParquetSource::default()
.with_predicate(Arc::clone(&delete_file_schema), physical_predicate)
.with_predicate(
Arc::clone(&delete_file_schema),
physical_predicate,
)
.with_pushdown_filters(true)
} else {
ParquetSource::default()
}
},
);

let delete_file_scan_config = FileScanConfig::new(
Expand Down Expand Up @@ -872,7 +899,9 @@ fn value_to_scalarvalue(value: &Value) -> Result<ScalarValue, DataFusionError> {
#[cfg(test)]
mod tests {

use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
use datafusion::{
arrow::array::Int64Array, execution::object_store::ObjectStoreUrl, prelude::SessionContext,
};
use iceberg_rust::{
catalog::tabular::Tabular,
object_store::ObjectStoreBuilder,
Expand All @@ -895,7 +924,7 @@ mod tests {

use std::{ops::Deref, sync::Arc};

use crate::{catalog::catalog::IcebergCatalog, DataFusionTable};
use crate::{catalog::catalog::IcebergCatalog, table::fake_object_store_url, DataFusionTable};

#[tokio::test]
pub async fn test_datafusion_table_insert() {
Expand Down Expand Up @@ -1643,4 +1672,17 @@ mod tests {
}
}
}

#[test]
fn test_fake_object_store_url() {
    // Each case pairs a table location with the fake object store URL expected for it:
    // the original host, a `-`, and the hex-encoded path.
    let valid_cases = [("s3://a", "s3://a-"), ("s3://a/b", "s3://a-2f62")];
    for (location, expected) in valid_cases {
        let expected_url = ObjectStoreUrl::parse(expected).unwrap();
        assert_eq!(fake_object_store_url(location), Some(expected_url));
    }

    // A location that does not parse as a URL yields no fake URL at all.
    assert_eq!(fake_object_store_url("invalid url"), None);
}
}