From 86abf7c12ad607ea69cae21cc709a9153eff8a93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Lizardo?= Date: Tue, 13 May 2025 21:29:11 +0200 Subject: [PATCH 1/3] feat: decompress gzip compressed metadata files --- Cargo.lock | 1 + iceberg-rust/Cargo.toml | 1 + iceberg-rust/src/error.rs | 3 +++ iceberg-rust/src/object_store/store.rs | 13 ++++++++++++- 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 447ae14f..708d0067 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3218,6 +3218,7 @@ dependencies = [ "chrono", "derive-getters", "derive_builder", + "flate2", "futures", "getrandom 0.3.2", "iceberg-rust-spec", diff --git a/iceberg-rust/Cargo.toml b/iceberg-rust/Cargo.toml index 65325b4a..f03a05e4 100644 --- a/iceberg-rust/Cargo.toml +++ b/iceberg-rust/Cargo.toml @@ -16,6 +16,7 @@ async-trait = { workspace = true } bytes = { workspace = true } derive-getters = { workspace = true } derive_builder = { workspace = true } +flate2 = { version = "1.1", features = ["zlib-rs"], default-features = false } futures = { workspace = true } getrandom = { workspace = true } iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.7.0" } diff --git a/iceberg-rust/src/error.rs b/iceberg-rust/src/error.rs index df95652c..33a14eaf 100644 --- a/iceberg-rust/src/error.rs +++ b/iceberg-rust/src/error.rs @@ -20,6 +20,9 @@ pub enum Error { /// Conversion error #[error("Failed to convert {0} to {1}.")] Conversion(String, String), + /// Failed to decompress gzip data + #[error("Failed to decompress gzip data: {0}")] + Decompress(String), /// Not found #[error("{0} not found.")] NotFound(String), diff --git a/iceberg-rust/src/object_store/store.rs b/iceberg-rust/src/object_store/store.rs index ca1b23d6..2500ea7e 100644 --- a/iceberg-rust/src/object_store/store.rs +++ b/iceberg-rust/src/object_store/store.rs @@ -8,6 +8,8 @@ use iceberg_rust_spec::{ use object_store::{Attributes, ObjectStore, PutOptions, TagSet}; use crate::error::Error; +use flate2::read::GzDecoder; +use std::io::Read; /// Simplify interaction with iceberg files #[async_trait] @@ -32,7 +34,16 @@ impl IcebergStore for T { .await? .bytes() .await?; - serde_json::from_slice(&bytes).map_err(Error::from) + + if location.ends_with(".gz.metadata.json") { + let mut decoder = GzDecoder::new(&bytes[..]); + let mut decompressed_data = Vec::new(); + decoder.read_to_end(&mut decompressed_data) + .map_err(|e| Error::Decompress(e.to_string()))?; + serde_json::from_slice(&decompressed_data).map_err(Error::from) + } else { + serde_json::from_slice(&bytes).map_err(Error::from) + } } async fn put_metadata( From 514899066a10330d93a44ce7a0b909c2dfd2bcb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Lizardo?= Date: Wed, 14 May 2025 09:09:38 +0200 Subject: [PATCH 2/3] Add unittest --- iceberg-rust/src/object_store/store.rs | 329 ++++++++++++++++++++++++- 1 file changed, 320 insertions(+), 9 deletions(-) diff --git a/iceberg-rust/src/object_store/store.rs b/iceberg-rust/src/object_store/store.rs index 2500ea7e..27f241be 100644 --- a/iceberg-rust/src/object_store/store.rs +++ b/iceberg-rust/src/object_store/store.rs @@ -35,15 +35,7 @@ impl IcebergStore for T { .bytes() .await?; - if location.ends_with(".gz.metadata.json") { - let mut decoder = GzDecoder::new(&bytes[..]); - let mut decompressed_data = Vec::new(); - decoder.read_to_end(&mut decompressed_data) - .map_err(|e| Error::Decompress(e.to_string()))?; - serde_json::from_slice(&decompressed_data).map_err(Error::from) - } else { - serde_json::from_slice(&bytes).map_err(Error::from) - } + parse_metadata(location, &bytes) } async fn put_metadata( @@ -91,9 +83,23 @@ fn version_hint_path(original: &str) -> Option { ) } +fn parse_metadata(location: &str, bytes: &[u8]) -> Result { + if location.ends_with(".gz.metadata.json") { + let mut decoder = GzDecoder::new(bytes); + let mut decompressed_data = Vec::new(); + decoder + .read_to_end(&mut decompressed_data) + .map_err(|e| Error::Decompress(e.to_string()))?; + serde_json::from_slice(&decompressed_data).map_err(Error::from) + } else { + serde_json::from_slice(bytes).map_err(Error::from) + } +} + #[cfg(test)] mod tests { use super::*; + use std::io::Write; #[test] fn test_version_hint_path_normal_case() { @@ -135,4 +141,309 @@ mod tests { let expected = "/path/to/version-hint.text"; assert_eq!(version_hint_path(input), Some(expected.to_string())); } + + #[test] + fn test_parse_metadata_table_plain_json() { + let location = "/path/to/metadata/v1.metadata.json"; + let json_data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 1, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 1, + "last-partition-id": 1, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [], + "default-sort-order-id": 0 + } + "#; + let bytes = json_data.as_bytes(); + + let result = parse_metadata(location, bytes); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::Table(table_metadata) = metadata { + // Add specific checks for `table_metadata` fields if needed + assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"); + } else { + assert!(false, "Expected TabularMetadata::Table variant"); + } + } + + #[test] + fn test_parse_metadata_table_gzipped_json() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let json_data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 1, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 1, + "last-partition-id": 1, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [], + "default-sort-order-id": 0 + } + "#; + + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(json_data.as_bytes()).unwrap(); + let compressed_data = encoder.finish().unwrap(); + + let result = parse_metadata(location, &compressed_data); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::Table(table_metadata) = metadata { + // Add specific checks for `table_metadata` fields if needed + assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"); + } else { + assert!(false, "Expected TabularMetadata::Table variant"); + } + } + + #[test] + fn test_parse_metadata_view_plain_json() { + let location = "/path/to/metadata/v1.metadata.json"; + let json_data = r#" + { + "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385", + "format-version" : 1, + "location" : "s3://bucket/warehouse/default.db/event_agg", + "current-version-id" : 1, + "properties" : { + "comment" : "Daily event counts" + }, + "versions" : [ { + "version-id" : 1, + "timestamp-ms" : 1573518431292, + "schema-id" : 1, + "default-catalog" : "prod", + "default-namespace" : [ "default" ], + "summary" : { + "operation" : "create", + "engine-name" : "Spark", + "engineVersion" : "3.3.2" + }, + "representations" : [ { + "type" : "sql", + "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2", + "dialect" : "spark" + } ] + } ], + "schemas": [ { + "schema-id": 1, + "type" : "struct", + "fields" : [ { + "id" : 1, + "name" : "event_count", + "required" : false, + "type" : "int", + "doc" : "Count of events" + }, { + "id" : 2, + "name" : "event_date", + "required" : false, + "type" : "date" + } ] + } ], + "version-log" : [ { + "timestamp-ms" : 1573518431292, + "version-id" : 1 + } ] + } + "#; + let bytes = json_data.as_bytes(); + + let result = parse_metadata(location, bytes); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::View(view_metadata) = metadata { + // Add specific checks for `view_metadata` fields if needed + assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385"); + } else { + assert!(false, "Expected TabularMetadata::View variant"); + } + } + + #[test] + fn test_parse_metadata_view_gzipped_json() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let json_data = r#" + { + "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385", + "format-version" : 1, + "location" : "s3://bucket/warehouse/default.db/event_agg", + "current-version-id" : 1, + "properties" : { + "comment" : "Daily event counts" + }, + "versions" : [ { + "version-id" : 1, + "timestamp-ms" : 1573518431292, + "schema-id" : 1, + "default-catalog" : "prod", + "default-namespace" : [ "default" ], + "summary" : { + "operation" : "create", + "engine-name" : "Spark", + "engineVersion" : "3.3.2" + }, + "representations" : [ { + "type" : "sql", + "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2", + "dialect" : "spark" + } ] + } ], + "schemas": [ { + "schema-id": 1, + "type" : "struct", + "fields" : [ { + "id" : 1, + "name" : "event_count", + "required" : false, + "type" : "int", + "doc" : "Count of events" + }, { + "id" : 2, + "name" : "event_date", + "required" : false, + "type" : "date" + } ] + } ], + "version-log" : [ { + "timestamp-ms" : 1573518431292, + "version-id" : 1 + } ] + } + "#; + + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(json_data.as_bytes()).unwrap(); + let compressed_data = encoder.finish().unwrap(); + + let result = parse_metadata(location, &compressed_data); + assert!(result.is_ok()); + let metadata = result.unwrap(); + if let TabularMetadata::View(view_metadata) = metadata { + // Add specific checks for `view_metadata` fields if needed + assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385"); + } else { + assert!(false, "Expected TabularMetadata::View variant"); + } + } + + #[test] + fn test_parse_metadata_invalid_json() { + let location = "/path/to/metadata/v1.metadata.json"; + let invalid_json_data = r#"{"key": "value""#; // Missing closing brace + let bytes = invalid_json_data.as_bytes(); + + let result = parse_metadata(location, bytes); + assert!(result.is_err()); + } + + #[test] + fn test_parse_metadata_invalid_gzipped_data() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let invalid_gzipped_data = b"not a valid gzip"; + + let result = parse_metadata(location, invalid_gzipped_data); + assert!(result.is_err()); + } + + #[test] + fn test_parse_metadata_empty_bytes() { + let location = "/path/to/metadata/v1.metadata.json"; + let empty_bytes: &[u8] = &[]; + + let result = parse_metadata(location, empty_bytes); + assert!(result.is_err()); + } + + #[test] + fn test_parse_metadata_gzipped_empty_bytes() { + let location = "/path/to/metadata/v1.gz.metadata.json"; + let empty_gzipped_bytes: &[u8] = &[]; + + let result = parse_metadata(location, empty_gzipped_bytes); + assert!(result.is_err()); + } } From 0c2d54043c10405db7dd5b7e15d49c7534b714ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Lizardo?= Date: Wed, 14 May 2025 09:21:02 +0200 Subject: [PATCH 3/3] Fix clippy warnings --- iceberg-rust/src/object_store/store.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iceberg-rust/src/object_store/store.rs b/iceberg-rust/src/object_store/store.rs index 27f241be..25a3f0b2 100644 --- a/iceberg-rust/src/object_store/store.rs +++ b/iceberg-rust/src/object_store/store.rs @@ -205,7 +205,7 @@ mod tests { // Add specific checks for `table_metadata` fields if needed assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"); } else { - assert!(false, "Expected TabularMetadata::Table variant"); + panic!("Expected TabularMetadata::Table variant"); } } @@ -275,7 +275,7 @@ mod tests { // Add specific checks for `table_metadata` fields if needed assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94"); } else { - assert!(false, "Expected TabularMetadata::Table variant"); + panic!("Expected TabularMetadata::Table variant"); } } @@ -339,7 +339,7 @@ mod tests { // Add specific checks for `view_metadata` fields if needed assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385"); } else { - assert!(false, "Expected TabularMetadata::View variant"); + panic!("Expected TabularMetadata::View variant"); } } @@ -406,7 +406,7 @@ mod tests { // Add specific checks for `view_metadata` fields if needed assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385"); } else { - assert!(false, "Expected TabularMetadata::View variant"); + panic!("Expected TabularMetadata::View variant"); } }