Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iceberg-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
derive-getters = { workspace = true }
derive_builder = { workspace = true }
flate2 = { version = "1.1", features = ["zlib-rs"], default-features = false }
futures = { workspace = true }
getrandom = { workspace = true }
iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.7.0" }
Expand Down
3 changes: 3 additions & 0 deletions iceberg-rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub enum Error {
/// Conversion error
#[error("Failed to convert {0} to {1}.")]
Conversion(String, String),
/// Failed to decompress gzip data
#[error("Failed to decompress gzip data: {0}")]
Decompress(String),
/// Not found
#[error("{0} not found.")]
NotFound(String),
Expand Down
324 changes: 323 additions & 1 deletion iceberg-rust/src/object_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use iceberg_rust_spec::{
use object_store::{Attributes, ObjectStore, PutOptions, TagSet};

use crate::error::Error;
use flate2::read::GzDecoder;
use std::io::Read;

/// Simplify interaction with iceberg files
#[async_trait]
Expand All @@ -32,7 +34,8 @@ impl<T: ObjectStore> IcebergStore for T {
.await?
.bytes()
.await?;
serde_json::from_slice(&bytes).map_err(Error::from)

parse_metadata(location, &bytes)
}

async fn put_metadata(
Expand Down Expand Up @@ -80,9 +83,23 @@ fn version_hint_path(original: &str) -> Option<String> {
)
}

fn parse_metadata(location: &str, bytes: &[u8]) -> Result<TabularMetadata, Error> {
if location.ends_with(".gz.metadata.json") {
let mut decoder = GzDecoder::new(bytes);
let mut decompressed_data = Vec::new();
decoder
.read_to_end(&mut decompressed_data)
.map_err(|e| Error::Decompress(e.to_string()))?;
serde_json::from_slice(&decompressed_data).map_err(Error::from)
} else {
serde_json::from_slice(bytes).map_err(Error::from)
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;

#[test]
fn test_version_hint_path_normal_case() {
Expand Down Expand Up @@ -124,4 +141,309 @@ mod tests {
let expected = "/path/to/version-hint.text";
assert_eq!(version_hint_path(input), Some(expected.to_string()));
}

#[test]
fn test_parse_metadata_table_plain_json() {
let location = "/path/to/metadata/v1.metadata.json";
let json_data = r#"
{
"format-version" : 2,
"table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
"location": "s3://b/wh/data.db/table",
"last-sequence-number" : 1,
"last-updated-ms": 1515100955770,
"last-column-id": 1,
"schemas": [
{
"schema-id" : 1,
"type" : "struct",
"fields" :[
{
"id": 1,
"name": "struct_name",
"required": true,
"type": "fixed[1]"
}
]
}
],
"current-schema-id" : 1,
"partition-specs": [
{
"spec-id": 1,
"fields": [
{
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}
]
}
],
"default-spec-id": 1,
"last-partition-id": 1,
"properties": {
"commit.retry.num-retries": "1"
},
"metadata-log": [
{
"metadata-file": "s3://bucket/.../v1.json",
"timestamp-ms": 1515100
}
],
"sort-orders": [],
"default-sort-order-id": 0
}
"#;
let bytes = json_data.as_bytes();

let result = parse_metadata(location, bytes);
assert!(result.is_ok());
let metadata = result.unwrap();
if let TabularMetadata::Table(table_metadata) = metadata {
// Add specific checks for `table_metadata` fields if needed
assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94");
} else {
panic!("Expected TabularMetadata::Table variant");
}
}

#[test]
fn test_parse_metadata_table_gzipped_json() {
let location = "/path/to/metadata/v1.gz.metadata.json";
let json_data = r#"
{
"format-version" : 2,
"table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
"location": "s3://b/wh/data.db/table",
"last-sequence-number" : 1,
"last-updated-ms": 1515100955770,
"last-column-id": 1,
"schemas": [
{
"schema-id" : 1,
"type" : "struct",
"fields" :[
{
"id": 1,
"name": "struct_name",
"required": true,
"type": "fixed[1]"
}
]
}
],
"current-schema-id" : 1,
"partition-specs": [
{
"spec-id": 1,
"fields": [
{
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}
]
}
],
"default-spec-id": 1,
"last-partition-id": 1,
"properties": {
"commit.retry.num-retries": "1"
},
"metadata-log": [
{
"metadata-file": "s3://bucket/.../v1.json",
"timestamp-ms": 1515100
}
],
"sort-orders": [],
"default-sort-order-id": 0
}
"#;

let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
encoder.write_all(json_data.as_bytes()).unwrap();
let compressed_data = encoder.finish().unwrap();

let result = parse_metadata(location, &compressed_data);
assert!(result.is_ok());
let metadata = result.unwrap();
if let TabularMetadata::Table(table_metadata) = metadata {
// Add specific checks for `table_metadata` fields if needed
assert_eq!(table_metadata.table_uuid.to_string(), "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94");
} else {
panic!("Expected TabularMetadata::Table variant");
}
}

#[test]
fn test_parse_metadata_view_plain_json() {
let location = "/path/to/metadata/v1.metadata.json";
let json_data = r#"
{
"view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
"format-version" : 1,
"location" : "s3://bucket/warehouse/default.db/event_agg",
"current-version-id" : 1,
"properties" : {
"comment" : "Daily event counts"
},
"versions" : [ {
"version-id" : 1,
"timestamp-ms" : 1573518431292,
"schema-id" : 1,
"default-catalog" : "prod",
"default-namespace" : [ "default" ],
"summary" : {
"operation" : "create",
"engine-name" : "Spark",
"engineVersion" : "3.3.2"
},
"representations" : [ {
"type" : "sql",
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
"dialect" : "spark"
} ]
} ],
"schemas": [ {
"schema-id": 1,
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "event_count",
"required" : false,
"type" : "int",
"doc" : "Count of events"
}, {
"id" : 2,
"name" : "event_date",
"required" : false,
"type" : "date"
} ]
} ],
"version-log" : [ {
"timestamp-ms" : 1573518431292,
"version-id" : 1
} ]
}
"#;
let bytes = json_data.as_bytes();

let result = parse_metadata(location, bytes);
assert!(result.is_ok());
let metadata = result.unwrap();
if let TabularMetadata::View(view_metadata) = metadata {
// Add specific checks for `view_metadata` fields if needed
assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385");
} else {
panic!("Expected TabularMetadata::View variant");
}
}

#[test]
fn test_parse_metadata_view_gzipped_json() {
let location = "/path/to/metadata/v1.gz.metadata.json";
let json_data = r#"
{
"view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
"format-version" : 1,
"location" : "s3://bucket/warehouse/default.db/event_agg",
"current-version-id" : 1,
"properties" : {
"comment" : "Daily event counts"
},
"versions" : [ {
"version-id" : 1,
"timestamp-ms" : 1573518431292,
"schema-id" : 1,
"default-catalog" : "prod",
"default-namespace" : [ "default" ],
"summary" : {
"operation" : "create",
"engine-name" : "Spark",
"engineVersion" : "3.3.2"
},
"representations" : [ {
"type" : "sql",
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
"dialect" : "spark"
} ]
} ],
"schemas": [ {
"schema-id": 1,
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "event_count",
"required" : false,
"type" : "int",
"doc" : "Count of events"
}, {
"id" : 2,
"name" : "event_date",
"required" : false,
"type" : "date"
} ]
} ],
"version-log" : [ {
"timestamp-ms" : 1573518431292,
"version-id" : 1
} ]
}
"#;

let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
encoder.write_all(json_data.as_bytes()).unwrap();
let compressed_data = encoder.finish().unwrap();

let result = parse_metadata(location, &compressed_data);
assert!(result.is_ok());
let metadata = result.unwrap();
if let TabularMetadata::View(view_metadata) = metadata {
// Add specific checks for `view_metadata` fields if needed
assert_eq!(view_metadata.view_uuid.to_string(), "fa6506c3-7681-40c8-86dc-e36561f83385");
} else {
panic!("Expected TabularMetadata::View variant");
}
}

#[test]
fn test_parse_metadata_invalid_json() {
let location = "/path/to/metadata/v1.metadata.json";
let invalid_json_data = r#"{"key": "value""#; // Missing closing brace
let bytes = invalid_json_data.as_bytes();

let result = parse_metadata(location, bytes);
assert!(result.is_err());
}

#[test]
fn test_parse_metadata_invalid_gzipped_data() {
let location = "/path/to/metadata/v1.gz.metadata.json";
let invalid_gzipped_data = b"not a valid gzip";

let result = parse_metadata(location, invalid_gzipped_data);
assert!(result.is_err());
}

#[test]
fn test_parse_metadata_empty_bytes() {
let location = "/path/to/metadata/v1.metadata.json";
let empty_bytes: &[u8] = &[];

let result = parse_metadata(location, empty_bytes);
assert!(result.is_err());
}

#[test]
fn test_parse_metadata_gzipped_empty_bytes() {
let location = "/path/to/metadata/v1.gz.metadata.json";
let empty_gzipped_bytes: &[u8] = &[];

let result = parse_metadata(location, empty_gzipped_bytes);
assert!(result.is_err());
}
}