Skip to content

Commit 72e4b3e

Browse files
authored
Merge pull request #178 from splitgraph/fix/store-mixing
feat: upstream object store deduplication
2 parents 2579dd8 + b085663 commit 72e4b3e

1 file changed

Lines changed: 42 additions & 4 deletions

File tree

datafusion_iceberg/src/table.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,29 @@ impl TableProvider for DataFusionTable {
261261
}
262262
}
263263

264+
// Create a fake object store URL. Different table paths should produce fake URLs
265+
// that differ in the host name, because DF's DefaultObjectStoreRegistry only takes
266+
// hostname into account for object store keys
267+
fn fake_object_store_url(table_location_url: &str) -> Option<ObjectStoreUrl> {
268+
let mut u = url::Url::parse(table_location_url).ok()?;
269+
u.set_host(Some(&format!(
270+
"{}-{}",
271+
u.host_str().unwrap_or(""),
272+
// Hex-encode the path to ensure it produces a valid hostname
273+
u.path()
274+
.as_bytes()
275+
.iter()
276+
.map(|b| format!("{:02x}", b))
277+
.collect::<Vec<_>>()
278+
.join("")
279+
)))
280+
.unwrap();
281+
u.set_path("");
282+
u.set_query(None);
283+
u.set_fragment(None);
284+
ObjectStoreUrl::parse(&u).ok()
285+
}
286+
264287
#[allow(clippy::too_many_arguments)]
265288
async fn table_scan(
266289
table: &Table,
@@ -278,8 +301,8 @@ async fn table_scan(
278301
.unwrap_or_else(|| table.current_schema(None).unwrap().clone());
279302

280303
// Create a unique URI for this particular object store
281-
let object_store_url = ObjectStoreUrl::parse(&table.metadata().location)
282-
.unwrap_or_else(|_| ObjectStoreUrl::local_filesystem());
304+
let object_store_url = fake_object_store_url(&table.metadata().location)
305+
.unwrap_or_else(ObjectStoreUrl::local_filesystem);
283306
session
284307
.runtime_env()
285308
.register_object_store(object_store_url.as_ref(), table.object_store());
@@ -885,7 +908,9 @@ fn value_to_scalarvalue(value: &Value) -> Result<ScalarValue, DataFusionError> {
885908
#[cfg(test)]
886909
mod tests {
887910

888-
use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
911+
use datafusion::{
912+
arrow::array::Int64Array, execution::object_store::ObjectStoreUrl, prelude::SessionContext,
913+
};
889914
use iceberg_rust::{
890915
catalog::tabular::Tabular,
891916
object_store::ObjectStoreBuilder,
@@ -908,7 +933,7 @@ mod tests {
908933

909934
use std::{ops::Deref, sync::Arc};
910935

911-
use crate::{catalog::catalog::IcebergCatalog, DataFusionTable};
936+
use crate::{catalog::catalog::IcebergCatalog, table::fake_object_store_url, DataFusionTable};
912937

913938
#[tokio::test]
914939
pub async fn test_datafusion_table_insert() {
@@ -1656,4 +1681,17 @@ mod tests {
16561681
}
16571682
}
16581683
}
1684+
1685+
#[test]
1686+
fn test_fake_object_store_url() {
1687+
assert_eq!(
1688+
fake_object_store_url("s3://a"),
1689+
Some(ObjectStoreUrl::parse("s3://a-").unwrap()),
1690+
);
1691+
assert_eq!(
1692+
fake_object_store_url("s3://a/b"),
1693+
Some(ObjectStoreUrl::parse("s3://a-2f62").unwrap()),
1694+
);
1695+
assert_eq!(fake_object_store_url("invalid url"), None);
1696+
}
16591697
}

0 commit comments

Comments
 (0)