diff --git a/Cargo.lock b/Cargo.lock index 447ae14f..708d0067 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3218,6 +3218,7 @@ dependencies = [ "chrono", "derive-getters", "derive_builder", + "flate2", "futures", "getrandom 0.3.2", "iceberg-rust-spec", diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index bc45ff21..23f3b2fd 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -261,6 +261,29 @@ impl TableProvider for DataFusionTable { } } +// Create a fake object store URL. Different table paths should produce fake URLs +// that differ in the host name, because DF's DefaultObjectStoreRegistry only takes +// hostname into account for object store keys +fn fake_object_store_url(table_location_url: &str) -> Option { + let mut u = url::Url::parse(table_location_url).ok()?; + u.set_host(Some(&format!( + "{}-{}", + u.host_str().unwrap_or(""), + // Hex-encode the path to ensure it produces a valid hostname + u.path() + .as_bytes() + .iter() + .map(|b| format!("{:02x}", b)) + .collect::>() + .join("") + ))) + .unwrap(); + u.set_path(""); + u.set_query(None); + u.set_fragment(None); + ObjectStoreUrl::parse(&u).ok() +} + #[allow(clippy::too_many_arguments)] async fn table_scan( table: &Table, @@ -278,8 +301,8 @@ async fn table_scan( .unwrap_or_else(|| table.current_schema(None).unwrap().clone()); // Create a unique URI for this particular object store - let object_store_url = ObjectStoreUrl::parse(&table.metadata().location) - .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()); + let object_store_url = fake_object_store_url(&table.metadata().location) + .unwrap_or_else(ObjectStoreUrl::local_filesystem); session .runtime_env() .register_object_store(object_store_url.as_ref(), table.object_store()); @@ -885,7 +908,9 @@ fn value_to_scalarvalue(value: &Value) -> Result { #[cfg(test)] mod tests { - use datafusion::{arrow::array::Int64Array, prelude::SessionContext}; + use datafusion::{ + arrow::array::Int64Array, execution::object_store::ObjectStoreUrl, prelude::SessionContext, + }; use iceberg_rust::{ catalog::tabular::Tabular, object_store::ObjectStoreBuilder, @@ -908,7 +933,7 @@ mod tests { use std::{ops::Deref, sync::Arc}; - use crate::{catalog::catalog::IcebergCatalog, DataFusionTable}; + use crate::{catalog::catalog::IcebergCatalog, table::fake_object_store_url, DataFusionTable}; #[tokio::test] pub async fn test_datafusion_table_insert() { @@ -1656,4 +1681,17 @@ mod tests { } } } + + #[test] + fn test_fake_object_store_url() { + assert_eq!( + fake_object_store_url("s3://a"), + Some(ObjectStoreUrl::parse("s3://a-").unwrap()), + ); + assert_eq!( + fake_object_store_url("s3://a/b"), + Some(ObjectStoreUrl::parse("s3://a-2f62").unwrap()), + ); + assert_eq!(fake_object_store_url("invalid url"), None); + } } diff --git a/iceberg-rust/Cargo.toml b/iceberg-rust/Cargo.toml index 65325b4a..f03a05e4 100644 --- a/iceberg-rust/Cargo.toml +++ b/iceberg-rust/Cargo.toml @@ -16,6 +16,7 @@ async-trait = { workspace = true } bytes = { workspace = true } derive-getters = { workspace = true } derive_builder = { workspace = true } +flate2 = { version = "1.1", features = ["zlib-rs"], default-features = false } futures = { workspace = true } getrandom = { workspace = true } iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.7.0" } diff --git a/iceberg-rust/src/catalog/create.rs b/iceberg-rust/src/catalog/create.rs index 73d0915d..8deda7e5 100644 --- a/iceberg-rust/src/catalog/create.rs +++ b/iceberg-rust/src/catalog/create.rs @@ -221,7 +221,7 @@ impl CreateViewBuilder> { if version.default_namespace().is_empty() { version.default_namespace = namespace.to_vec() } - if version.default_catalog().is_none() { + if version.default_catalog().is_none() && !catalog.name().is_empty() { version.default_catalog = Some(catalog.name().to_string()) } } @@ -349,7 +349,7 @@ impl CreateMaterializedViewBuilder { if version.default_namespace().is_empty() { version.default_namespace = namespace.to_vec() } - if version.default_catalog().is_none() { + if version.default_catalog().is_none() && !catalog.name().is_empty() { version.default_catalog = Some(catalog.name().to_string()) } } diff --git a/iceberg-rust/src/error.rs b/iceberg-rust/src/error.rs index df95652c..33a14eaf 100644 --- a/iceberg-rust/src/error.rs +++ b/iceberg-rust/src/error.rs @@ -20,6 +20,9 @@ pub enum Error { /// Conversion error #[error("Failed to convert {0} to {1}.")] Conversion(String, String), + /// Failed to decompress gzip data + #[error("Failed to decompress gzip data: {0}")] + Decompress(String), /// Not found #[error("{0} not found.")] NotFound(String), diff --git a/iceberg-rust/src/materialized_view/transaction/mod.rs b/iceberg-rust/src/materialized_view/transaction/mod.rs index 3cee1d27..a5adf07e 100644 --- a/iceberg-rust/src/materialized_view/transaction/mod.rs +++ b/iceberg-rust/src/materialized_view/transaction/mod.rs @@ -41,14 +41,14 @@ impl<'view> Transaction<'view> { } /// Update the schmema of the view - pub fn update_representation( + pub fn update_representations( mut self, - representation: ViewRepresentation, + representations: Vec, schema: StructType, ) -> Self { self.view_operations - .push(ViewOperation::UpdateRepresentation { - representation, + .push(ViewOperation::UpdateRepresentations { + representations, schema, branch: self.branch.clone(), }); diff --git a/iceberg-rust/src/object_store/mod.rs b/iceberg-rust/src/object_store/mod.rs index 19669d20..b41c0885 100644 --- a/iceberg-rust/src/object_store/mod.rs +++ b/iceberg-rust/src/object_store/mod.rs @@ -31,7 +31,7 @@ impl Display for Bucket<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Bucket::S3(s) => write!(f, "s3://{}", s), - Bucket::GCS(s) => write!(f, "gcs://{}", s), + Bucket::GCS(s) => write!(f, "gs://{}", s), Bucket::Local => write!(f, ""), } } diff --git a/iceberg-rust/src/object_store/store.rs b/iceberg-rust/src/object_store/store.rs index ca1b23d6..25a3f0b2 100644 --- a/iceberg-rust/src/object_store/store.rs +++ b/iceberg-rust/src/object_store/store.rs @@ -8,6 +8,8 @@ use iceberg_rust_spec::{ use object_store::{Attributes, ObjectStore, PutOptions, TagSet}; use crate::error::Error; +use flate2::read::GzDecoder; +use std::io::Read; /// Simplify interaction with iceberg files #[async_trait] @@ -32,7 +34,8 @@ impl IcebergStore for T { .await? .bytes() .await?; - serde_json::from_slice(&bytes).map_err(Error::from) + + parse_metadata(location, &bytes) } async fn put_metadata( @@ -80,9 +83,23 @@ fn version_hint_path(original: &str) -> Option { ) } +fn parse_metadata(location: &str, bytes: &[u8]) -> Result { + if location.ends_with(".gz.metadata.json") { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed_data = Vec::new(); + decoder + .read_to_end(&mut decompressed_data) + .map_err(|e| Error::Decompress(e.to_string()))?; + serde_json::from_slice(&decompressed_data).map_err(Error::from) + } else { + serde_json::from_slice(bytes).map_err(Error::from) + } +} + #[cfg(test)] mod tests { use super::*; + use std::io::Write; #[test] fn test_version_hint_path_normal_case() { @@ -124,4 +141,309 @@ mod tests { let expected = "/path/to/version-hint.text"; assert_eq!(version_hint_path(input), Some(expected.to_string())); } + + #[test] + fn test_parse_metadata_table_plain_json() { + let location = "/path/to/metadata/v1.metadata.json"; + let json_data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 1, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 1, + "last-partition-id": 1, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [], + "default-sort-order-id": 0 + } + "#; + let bytes = json_data.as_bytes(); + + let result = parse_metadata(location, bytes); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::Table(table_metadata) = metadata { + // Add specific checks for `table_metadata` fields if needed + assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"); + } else { + panic!("Expected TabularMetadata::Table variant"); + } + } + + #[test] + fn test_parse_metadata_table_gzipped_json() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let json_data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 1, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 1, + "last-partition-id": 1, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [], + "default-sort-order-id": 0 + } + "#; + + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(json_data.as_bytes()).unwrap(); + let compressed_data = encoder.finish().unwrap(); + + let result = parse_metadata(location, &compressed_data); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::Table(table_metadata) = metadata { + // Add specific checks for `table_metadata` fields if needed + assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"); + } else { + panic!("Expected TabularMetadata::Table variant"); + } + } + + #[test] + fn test_parse_metadata_view_plain_json() { + let location = "/path/to/metadata/v1.metadata.json"; + let json_data = r#" + { + "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385", + "format-version" : 1, + "location" : "s3://bucket/warehouse/default.db/event_agg", + "current-version-id" : 1, + "properties" : { + "comment" : "Daily event counts" + }, + "versions" : [ { + "version-id" : 1, + "timestamp-ms" : 1573518431292, + "schema-id" : 1, + "default-catalog" : "prod", + "default-namespace" : [ "default" ], + "summary" : { + "operation" : "create", + "engine-name" : "Spark", + "engineVersion" : "3.3.2" + }, + "representations" : [ { + "type" : "sql", + "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2", + "dialect" : "spark" + } ] + } ], + "schemas": [ { + "schema-id": 1, + "type" : "struct", + "fields" : [ { + "id" : 1, + "name" : "event_count", + "required" : false, + "type" : "int", + "doc" : "Count of events" + }, { + "id" : 2, + "name" : "event_date", + "required" : false, + "type" : "date" + } ] + } ], + "version-log" : [ { + "timestamp-ms" : 1573518431292, + "version-id" : 1 + } ] + } + "#; + let bytes = json_data.as_bytes(); + + let result = parse_metadata(location, bytes); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::View(view_metadata) = metadata { + // Add specific checks for `view_metadata` fields if needed + assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385"); + } else { + panic!("Expected TabularMetadata::View variant"); + } + } + + #[test] + fn test_parse_metadata_view_gzipped_json() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let json_data = r#" + { + "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385", + "format-version" : 1, + "location" : "s3://bucket/warehouse/default.db/event_agg", + "current-version-id" : 1, + "properties" : { + "comment" : "Daily event counts" + }, + "versions" : [ { + "version-id" : 1, + "timestamp-ms" : 1573518431292, + "schema-id" : 1, + "default-catalog" : "prod", + "default-namespace" : [ "default" ], + "summary" : { + "operation" : "create", + "engine-name" : "Spark", + "engineVersion" : "3.3.2" + }, + "representations" : [ { + "type" : "sql", + "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2", + "dialect" : "spark" + } ] + } ], + "schemas": [ { + "schema-id": 1, + "type" : "struct", + "fields" : [ { + "id" : 1, + "name" : "event_count", + "required" : false, + "type" : "int", + "doc" : "Count of events" + }, { + "id" : 2, + "name" : "event_date", + "required" : false, + "type" : "date" + } ] + } ], + "version-log" : [ { + "timestamp-ms" : 1573518431292, + "version-id" : 1 + } ] + } + "#; + + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(json_data.as_bytes()).unwrap(); + let compressed_data = encoder.finish().unwrap(); + + let result = parse_metadata(location, &compressed_data); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::View(view_metadata) = metadata { + // Add specific checks for `view_metadata` fields if needed + assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385"); + } else { + panic!("Expected TabularMetadata::View variant"); + } + } + + #[test] + fn test_parse_metadata_invalid_json() { + let location = "/path/to/metadata/v1.metadata.json"; + let invalid_json_data = r#"{"key": "value""#; // Missing closing brace + let bytes = invalid_json_data.as_bytes(); + + let result = parse_metadata(location, bytes); + assert!(result.is_err()); + } + + #[test] + fn test_parse_metadata_invalid_gzipped_data() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let invalid_gzipped_data = b"not a valid gzip"; + + let result = parse_metadata(location, invalid_gzipped_data); + assert!(result.is_err()); + } + + #[test] + fn test_parse_metadata_empty_bytes() { + let location = "/path/to/metadata/v1.metadata.json"; + let empty_bytes: &[u8] = &[]; + + let result = parse_metadata(location, empty_bytes); + assert!(result.is_err()); + } + + #[test] + fn test_parse_metadata_gzipped_empty_bytes() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let empty_gzipped_bytes: &[u8] = &[]; + + let result = parse_metadata(location, empty_gzipped_bytes); + assert!(result.is_err()); + } } diff --git a/iceberg-rust/src/view/transaction/mod.rs b/iceberg-rust/src/view/transaction/mod.rs index 31b8781e..b90a7913 100644 --- a/iceberg-rust/src/view/transaction/mod.rs +++ b/iceberg-rust/src/view/transaction/mod.rs @@ -28,13 +28,13 @@ impl<'view> Transaction<'view> { } } /// Update the schmema of the view - pub fn update_representation( + pub fn update_representations( mut self, - representation: ViewRepresentation, + representations: Vec, schema: StructType, ) -> Self { - self.operations.push(ViewOperation::UpdateRepresentation { - representation, + self.operations.push(ViewOperation::UpdateRepresentations { + representations, schema, branch: self.branch.clone(), }); diff --git a/iceberg-rust/src/view/transaction/operation.rs b/iceberg-rust/src/view/transaction/operation.rs index 0c6fdf57..a4608cf3 100644 --- a/iceberg-rust/src/view/transaction/operation.rs +++ b/iceberg-rust/src/view/transaction/operation.rs @@ -23,9 +23,9 @@ use crate::{ /// View operation pub enum Operation { /// Update vresion - UpdateRepresentation { + UpdateRepresentations { /// Representation to add - representation: ViewRepresentation, + representations: Vec, /// Schema of the representation schema: StructType, /// Branch where to add the representation @@ -35,6 +35,46 @@ pub enum Operation { UpdateProperties(Vec<(String, String)>), } +// Tries to preserve dialect order +fn upsert_representation( + current_representations: &[ViewRepresentation], + new_representation: ViewRepresentation, +) -> Vec { + let ViewRepresentation::Sql { + dialect: new_dialect, + .. + } = &new_representation; + let mut updated = false; + let mut representations: Vec = current_representations + .iter() + .map( + |current_representation @ ViewRepresentation::Sql { dialect, .. }| { + if dialect == new_dialect { + updated = true; + new_representation.clone() + } else { + current_representation.clone() + } + }, + ) + .collect(); + if !updated { + representations.push(new_representation); + } + representations +} + +fn upsert_representations( + current_representations: &[ViewRepresentation], + new_representations: &[ViewRepresentation], +) -> Vec { + let mut representations: Vec = current_representations.into(); + for r in new_representations { + representations = upsert_representation(&representations, r.clone()); + } + representations +} + impl Operation { /// Execute operation pub async fn execute( @@ -42,12 +82,13 @@ impl Operation { metadata: &GeneralViewMetadata, ) -> Result<(Option, Vec>), Error> { match self { - Operation::UpdateRepresentation { - representation, + Operation::UpdateRepresentations { + representations, schema, branch, } => { - let schema_changed = metadata.current_schema(branch.as_deref()) + let schema_changed = metadata + .current_schema(branch.as_deref()) .map(|s| schema != *s.fields()) .unwrap_or(true); @@ -56,7 +97,10 @@ impl Operation { let schema_id = if schema_changed { metadata.schemas.keys().max().unwrap_or(&0) + 1 } else { - *metadata.current_schema(branch.as_deref()).unwrap().schema_id() + *metadata + .current_schema(branch.as_deref()) + .unwrap() + .schema_id() }; let last_column_id = schema.iter().map(|x| x.id).max().unwrap_or(0); @@ -68,7 +112,10 @@ impl Operation { engine_name: None, engine_version: None, }, - representations: vec![representation], + representations: upsert_representations( + version.representations(), + &representations, + ), default_catalog: version.default_catalog.clone(), default_namespace: version.default_namespace.clone(), timestamp_ms: SystemTime::now() @@ -120,3 +167,48 @@ impl Operation { } } } + +#[cfg(test)] +mod tests { + use iceberg_rust_spec::view_metadata::ViewRepresentation; + + use crate::view::transaction::operation::upsert_representations; + + #[test] + fn test_upsert_representations() { + assert_eq!( + upsert_representations( + &[ + ViewRepresentation::sql("a1", Some("a")), + ViewRepresentation::sql("b1", Some("b")) + ], + &[ + ViewRepresentation::sql("b2", Some("b")), + ViewRepresentation::sql("c2", Some("c")) + ] + ), + vec![ + ViewRepresentation::sql("a1", Some("a")), + ViewRepresentation::sql("b2", Some("b")), + ViewRepresentation::sql("c2", Some("c")), + ] + ); + assert_eq!( + upsert_representations( + &[ + ViewRepresentation::sql("a1", Some("a")), + ViewRepresentation::sql("b1", Some("b")) + ], + &[ + ViewRepresentation::sql("c2", Some("c")), + ViewRepresentation::sql("a2", Some("a")) + ] + ), + vec![ + ViewRepresentation::sql("a2", Some("a")), + ViewRepresentation::sql("b1", Some("b")), + ViewRepresentation::sql("c2", Some("c")), + ] + ); + } +}