Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 42 additions & 4 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,29 @@ impl TableProvider for DataFusionTable {
}
}

// Create a fake object store URL. Different table paths should produce fake URLs
// that differ in the host name, because DF's DefaultObjectStoreRegistry only takes
// hostname into account for object store keys
fn fake_object_store_url(table_location_url: &str) -> Option<ObjectStoreUrl> {
let mut u = url::Url::parse(table_location_url).ok()?;
u.set_host(Some(&format!(
"{}-{}",
u.host_str().unwrap_or(""),
// Hex-encode the path to ensure it produces a valid hostname
u.path()
.as_bytes()
.iter()
.map(|b| format!("{:02x}", b))
.collect::<Vec<_>>()
.join("")
)))
.unwrap();
u.set_path("");
u.set_query(None);
u.set_fragment(None);
ObjectStoreUrl::parse(&u).ok()
}

#[allow(clippy::too_many_arguments)]
async fn table_scan(
table: &Table,
Expand All @@ -278,8 +301,8 @@ async fn table_scan(
.unwrap_or_else(|| table.current_schema(None).unwrap().clone());

// Create a unique URI for this particular object store
let object_store_url = ObjectStoreUrl::parse(&table.metadata().location)
.unwrap_or_else(|_| ObjectStoreUrl::local_filesystem());
let object_store_url = fake_object_store_url(&table.metadata().location)
.unwrap_or_else(ObjectStoreUrl::local_filesystem);
session
.runtime_env()
.register_object_store(object_store_url.as_ref(), table.object_store());
Expand Down Expand Up @@ -885,7 +908,9 @@ fn value_to_scalarvalue(value: &Value) -> Result<ScalarValue, DataFusionError> {
#[cfg(test)]
mod tests {

use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
use datafusion::{
arrow::array::Int64Array, execution::object_store::ObjectStoreUrl, prelude::SessionContext,
};
use iceberg_rust::{
catalog::tabular::Tabular,
object_store::ObjectStoreBuilder,
Expand All @@ -908,7 +933,7 @@ mod tests {

use std::{ops::Deref, sync::Arc};

use crate::{catalog::catalog::IcebergCatalog, DataFusionTable};
use crate::{catalog::catalog::IcebergCatalog, table::fake_object_store_url, DataFusionTable};

#[tokio::test]
pub async fn test_datafusion_table_insert() {
Expand Down Expand Up @@ -1656,4 +1681,17 @@ mod tests {
}
}
}

#[test]
fn test_fake_object_store_url() {
assert_eq!(
fake_object_store_url("s3://a"),
Some(ObjectStoreUrl::parse("s3://a-").unwrap()),
);
assert_eq!(
fake_object_store_url("s3://a/b"),
Some(ObjectStoreUrl::parse("s3://a-2f62").unwrap()),
);
assert_eq!(fake_object_store_url("invalid url"), None);
}
}
1 change: 1 addition & 0 deletions iceberg-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
derive-getters = { workspace = true }
derive_builder = { workspace = true }
flate2 = { version = "1.1", features = ["zlib-rs"], default-features = false }
futures = { workspace = true }
getrandom = { workspace = true }
iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.7.0" }
Expand Down
4 changes: 2 additions & 2 deletions iceberg-rust/src/catalog/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl CreateViewBuilder<Option<()>> {
if version.default_namespace().is_empty() {
version.default_namespace = namespace.to_vec()
}
if version.default_catalog().is_none() {
if version.default_catalog().is_none() && !catalog.name().is_empty() {
version.default_catalog = Some(catalog.name().to_string())
}
}
Expand Down Expand Up @@ -349,7 +349,7 @@ impl CreateMaterializedViewBuilder {
if version.default_namespace().is_empty() {
version.default_namespace = namespace.to_vec()
}
if version.default_catalog().is_none() {
if version.default_catalog().is_none() && !catalog.name().is_empty() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code (whole if let some(version)) is repeating with the one on line 220 and could be moved to a separate function.

version.default_catalog = Some(catalog.name().to_string())
}
}
Expand Down
3 changes: 3 additions & 0 deletions iceberg-rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub enum Error {
/// Conversion error
#[error("Failed to convert {0} to {1}.")]
Conversion(String, String),
/// Failed to decompress gzip data
#[error("Failed to decompress gzip data: {0}")]
Decompress(String),
/// Not found
#[error("{0} not found.")]
NotFound(String),
Expand Down
8 changes: 4 additions & 4 deletions iceberg-rust/src/materialized_view/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ impl<'view> Transaction<'view> {
}

/// Update the schmema of the view
pub fn update_representation(
pub fn update_representations(
mut self,
representation: ViewRepresentation,
representations: Vec<ViewRepresentation>,
schema: StructType,
) -> Self {
self.view_operations
.push(ViewOperation::UpdateRepresentation {
representation,
.push(ViewOperation::UpdateRepresentations {
representations,
schema,
branch: self.branch.clone(),
});
Expand Down
2 changes: 1 addition & 1 deletion iceberg-rust/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Display for Bucket<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Bucket::S3(s) => write!(f, "s3://{}", s),
Bucket::GCS(s) => write!(f, "gcs://{}", s),
Bucket::GCS(s) => write!(f, "gs://{}", s),
Bucket::Local => write!(f, ""),
}
}
Expand Down
Loading