-
Notifications
You must be signed in to change notification settings - Fork 446
feat: Add iceberg.schema to footer for compatibility
#2249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d069194
1c20f0c
0eaca25
6692efe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ use itertools::Itertools; | |
| use parquet::arrow::AsyncArrowWriter; | ||
| use parquet::arrow::async_reader::AsyncFileReader; | ||
| use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; | ||
| use parquet::file::metadata::ParquetMetaData; | ||
| use parquet::file::metadata::{KeyValue, ParquetMetaData}; | ||
| use parquet::file::properties::WriterProperties; | ||
| use parquet::file::statistics::Statistics; | ||
|
|
||
|
|
@@ -46,6 +46,9 @@ use crate::transform::create_transform_function; | |
| use crate::writer::{CurrentFileStatus, DataFile}; | ||
| use crate::{Error, ErrorKind, Result}; | ||
|
|
||
/// Footer key-value metadata key under which the serialized Iceberg schema
/// JSON is embedded in every written Parquet file.
const ICEBERG_SCHEMA_KEY: &str = "iceberg.schema";
|
|
||
| /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] | ||
| #[derive(Clone, Debug)] | ||
| pub struct ParquetWriterBuilder { | ||
|
|
@@ -67,12 +70,33 @@ impl ParquetWriterBuilder { | |
| schema: SchemaRef, | ||
| match_mode: FieldMatchMode, | ||
| ) -> Self { | ||
| let props = Self::add_iceberg_schema_metadata(props, &schema); | ||
| Self { | ||
| props, | ||
| schema, | ||
| match_mode, | ||
| } | ||
| } | ||
|
|
||
| /// Adds the `iceberg.schema` key-value metadata to the Parquet writer properties. | ||
| /// | ||
| /// This embeds the full Iceberg schema JSON (including field IDs, types, and | ||
| /// schema ID) into the Parquet file footer. | ||
| fn add_iceberg_schema_metadata(props: WriterProperties, schema: &Schema) -> WriterProperties { | ||
| let schema_json = serde_json::to_string(schema) | ||
| .expect("Iceberg schema serialization to JSON should not fail"); | ||
|
|
||
| let iceberg_kv = KeyValue::new(ICEBERG_SCHEMA_KEY.to_string(), schema_json); | ||
|
|
||
| // Preserve any existing key-value metadata from the caller | ||
| let mut kv_metadata = props.key_value_metadata().cloned().unwrap_or_default(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible we're better propagating this error upstream also rather than silently ignoring it? |
||
| kv_metadata.push(iceberg_kv); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a caller has already set |
||
|
|
||
| props | ||
| .into_builder() | ||
| .set_key_value_metadata(Some(kv_metadata)) | ||
| .build() | ||
| } | ||
| } | ||
|
|
||
| impl FileWriterBuilder for ParquetWriterBuilder { | ||
|
|
@@ -2279,4 +2303,304 @@ mod tests { | |
| assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))])); | ||
| assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))])); | ||
| } | ||
|
|
||
| /// Helper to read the Parquet file footer key-value metadata from a written data file. | ||
| async fn read_parquet_kv_metadata( | ||
| file_io: &FileIO, | ||
| data_file: &DataFile, | ||
| ) -> Option<Vec<parquet::file::metadata::KeyValue>> { | ||
| use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; | ||
|
|
||
| let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); | ||
| let input_content = input_file.read().await.unwrap(); | ||
| let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap(); | ||
| reader_builder | ||
| .metadata() | ||
| .file_metadata() | ||
| .key_value_metadata() | ||
| .cloned() | ||
| } | ||
|
|
||
| /// Helper to write a simple parquet file and return the data file, file_io, | ||
| /// and TempDir (must be kept alive so the file remains on disk). | ||
| async fn write_simple_parquet_file( | ||
| props: WriterProperties, | ||
| ) -> Result<(DataFile, FileIO, TempDir)> { | ||
| let temp_dir = TempDir::new().unwrap(); | ||
| let file_io = FileIO::new_with_fs(); | ||
| let location_gen = DefaultLocationGenerator::with_data_location( | ||
| temp_dir.path().to_str().unwrap().to_string(), | ||
| ); | ||
| let file_name_gen = | ||
| DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); | ||
|
|
||
| let iceberg_schema = Arc::new( | ||
| Schema::builder() | ||
| .with_schema_id(1) | ||
| .with_fields(vec![ | ||
| NestedField::required(0, "id", Type::Primitive(PrimitiveType::Long)).into(), | ||
| NestedField::optional(1, "name", Type::Primitive(PrimitiveType::String)).into(), | ||
| ]) | ||
| .build() | ||
| .unwrap(), | ||
| ); | ||
|
|
||
| let arrow_schema: ArrowSchemaRef = Arc::new(iceberg_schema.as_ref().try_into().unwrap()); | ||
| let col0 = Arc::new(Int64Array::from_iter_values(0..10)) as ArrayRef; | ||
| let col1 = Arc::new(arrow_array::StringArray::from(vec![ | ||
| Some("a"), | ||
| Some("b"), | ||
| Some("c"), | ||
| None, | ||
| Some("e"), | ||
| Some("f"), | ||
| Some("g"), | ||
| Some("h"), | ||
| Some("i"), | ||
| Some("j"), | ||
| ])) as ArrayRef; | ||
| let batch = RecordBatch::try_new(arrow_schema.clone(), vec![col0, col1]).unwrap(); | ||
|
|
||
| let output_file = file_io.new_output( | ||
| location_gen.generate_location(None, &file_name_gen.generate_file_name()), | ||
| )?; | ||
|
|
||
| let mut pw = ParquetWriterBuilder::new(props, iceberg_schema) | ||
| .build(output_file) | ||
| .await?; | ||
| pw.write(&batch).await?; | ||
| let res = pw.close().await?; | ||
| assert_eq!(res.len(), 1); | ||
| let data_file = res | ||
| .into_iter() | ||
| .next() | ||
| .unwrap() | ||
| .content(DataContentType::Data) | ||
| .partition(Struct::empty()) | ||
| .partition_spec_id(0) | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| Ok((data_file, file_io, temp_dir)) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_iceberg_schema_metadata_present_in_parquet_footer() -> Result<()> { | ||
| let (data_file, file_io, _temp_dir) = | ||
| write_simple_parquet_file(WriterProperties::builder().build()).await?; | ||
|
|
||
| let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file).await; | ||
| let kv_metadata = kv_metadata.expect("key_value_metadata should be present"); | ||
|
|
||
| let iceberg_schema_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY); | ||
| assert!( | ||
| iceberg_schema_entry.is_some(), | ||
| "Parquet footer must contain '{ICEBERG_SCHEMA_KEY}' key-value metadata" | ||
| ); | ||
|
|
||
| let schema_json = iceberg_schema_entry | ||
| .unwrap() | ||
| .value | ||
| .as_ref() | ||
| .expect("iceberg.schema value should not be None"); | ||
| assert!( | ||
| !schema_json.is_empty(), | ||
| "iceberg.schema JSON should not be empty" | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_iceberg_schema_metadata_roundtrip() -> Result<()> { | ||
| let (data_file, file_io, _temp_dir) = | ||
| write_simple_parquet_file(WriterProperties::builder().build()).await?; | ||
|
|
||
| let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) | ||
| .await | ||
| .expect("key_value_metadata should be present"); | ||
|
|
||
| let schema_json = kv_metadata | ||
| .iter() | ||
| .find(|kv| kv.key == ICEBERG_SCHEMA_KEY) | ||
| .expect("iceberg.schema key must be present") | ||
| .value | ||
| .as_ref() | ||
| .expect("iceberg.schema value must not be None"); | ||
|
|
||
| // Deserialize back to an Iceberg Schema | ||
| let deserialized: Schema = serde_json::from_str(schema_json) | ||
| .expect("iceberg.schema JSON should deserialize to a valid Schema"); | ||
|
|
||
| // Verify schema ID | ||
| assert_eq!(deserialized.schema_id(), 1); | ||
|
|
||
| // Verify field count | ||
| assert_eq!(deserialized.as_struct().fields().len(), 2); | ||
|
|
||
| // Verify field IDs and names | ||
| let id_field = deserialized.field_by_id(0).expect("field 0 should exist"); | ||
| assert_eq!(id_field.name, "id"); | ||
|
|
||
| let name_field = deserialized.field_by_id(1).expect("field 1 should exist"); | ||
| assert_eq!(name_field.name, "name"); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_iceberg_schema_metadata_with_nested_schema() -> Result<()> { | ||
| // Use a nested schema with Struct and List types to verify the iceberg.schema | ||
| // JSON correctly roundtrips complex types including field IDs. | ||
| let schema = Schema::builder() | ||
| .with_schema_id(2) | ||
| .with_fields(vec![ | ||
| NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), | ||
| NestedField::optional( | ||
| 2, | ||
| "location", | ||
| Type::Struct(StructType::new(vec![ | ||
| NestedField::required(3, "lat", Type::Primitive(PrimitiveType::Double)) | ||
| .into(), | ||
| NestedField::required(4, "lon", Type::Primitive(PrimitiveType::Double)) | ||
| .into(), | ||
| ])), | ||
| ) | ||
| .into(), | ||
| NestedField::optional( | ||
| 5, | ||
| "tags", | ||
| Type::List(ListType::new( | ||
| NestedField::required(6, "element", Type::Primitive(PrimitiveType::String)) | ||
| .into(), | ||
| )), | ||
| ) | ||
| .into(), | ||
| ]) | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| let temp_dir = TempDir::new().unwrap(); | ||
| let file_io = FileIO::new_with_fs(); | ||
| let location_gen = DefaultLocationGenerator::with_data_location( | ||
| temp_dir.path().to_str().unwrap().to_string(), | ||
| ); | ||
| let file_name_gen = | ||
| DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); | ||
|
|
||
| let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap()); | ||
|
|
||
| // Build columns | ||
| let col_id = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef; | ||
| let col_location = Arc::new(StructArray::new( | ||
| { | ||
| if let DataType::Struct(fields) = arrow_schema.field(1).data_type() { | ||
| fields.clone() | ||
| } else { | ||
| unreachable!() | ||
| } | ||
| }, | ||
| vec![ | ||
| Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, | ||
| Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0])) as ArrayRef, | ||
| ], | ||
| None, | ||
| )) as ArrayRef; | ||
| let col_tags = Arc::new({ | ||
| let values = arrow_array::StringArray::from(vec!["a", "b", "c", "d", "e", "f"]); | ||
| let offsets = arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![ | ||
| 0i32, 2, 4, 6, | ||
| ])); | ||
| let field = arrow_schema.field(2).clone(); | ||
| if let DataType::List(inner) = field.data_type() { | ||
| ListArray::new(inner.clone(), offsets, Arc::new(values), None) | ||
| } else { | ||
| unreachable!() | ||
| } | ||
| }) as ArrayRef; | ||
|
|
||
| let batch = | ||
| RecordBatch::try_new(arrow_schema.clone(), vec![col_id, col_location, col_tags]) | ||
| .unwrap(); | ||
|
|
||
| let output_file = file_io.new_output( | ||
| location_gen.generate_location(None, &file_name_gen.generate_file_name()), | ||
| )?; | ||
|
|
||
| let mut pw = ParquetWriterBuilder::new( | ||
| WriterProperties::builder().build(), | ||
| Arc::new(schema.clone()), | ||
| ) | ||
| .build(output_file) | ||
| .await?; | ||
| pw.write(&batch).await?; | ||
| let res = pw.close().await?; | ||
| assert_eq!(res.len(), 1); | ||
| let data_file = res | ||
| .into_iter() | ||
| .next() | ||
| .unwrap() | ||
| .content(DataContentType::Data) | ||
| .partition(Struct::empty()) | ||
| .partition_spec_id(0) | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) | ||
| .await | ||
| .expect("key_value_metadata should be present"); | ||
|
|
||
| let schema_json = kv_metadata | ||
| .iter() | ||
| .find(|kv| kv.key == ICEBERG_SCHEMA_KEY) | ||
| .expect("iceberg.schema key must be present for nested schema") | ||
| .value | ||
| .as_ref() | ||
| .expect("iceberg.schema value must not be None"); | ||
|
|
||
| let deserialized: Schema = serde_json::from_str(schema_json) | ||
| .expect("nested iceberg.schema JSON should deserialize"); | ||
|
|
||
| // Verify the nested schema round-trips correctly | ||
| assert_eq!(deserialized, schema); | ||
|
|
||
| // Spot-check nested field IDs survived the roundtrip | ||
| assert_eq!(deserialized.field_by_id(3).unwrap().name, "lat"); | ||
| assert_eq!(deserialized.field_by_id(6).unwrap().name, "element"); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_iceberg_schema_preserves_existing_kv_metadata() -> Result<()> { | ||
| let props = WriterProperties::builder() | ||
| .set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue::new( | ||
| "custom.key".to_string(), | ||
| "custom_value".to_string(), | ||
| )])) | ||
| .build(); | ||
|
|
||
| let (data_file, file_io, _temp_dir) = write_simple_parquet_file(props).await?; | ||
|
|
||
| let kv_metadata = read_parquet_kv_metadata(&file_io, &data_file) | ||
| .await | ||
| .expect("key_value_metadata should be present"); | ||
|
|
||
| // The caller's custom metadata should still be present | ||
| let custom_entry = kv_metadata.iter().find(|kv| kv.key == "custom.key"); | ||
| assert!( | ||
| custom_entry.is_some(), | ||
| "Caller's custom key-value metadata must be preserved" | ||
| ); | ||
| assert_eq!(custom_entry.unwrap().value.as_deref(), Some("custom_value")); | ||
|
|
||
| // iceberg.schema should also be present | ||
| let iceberg_entry = kv_metadata.iter().find(|kv| kv.key == ICEBERG_SCHEMA_KEY); | ||
| assert!( | ||
| iceberg_entry.is_some(), | ||
| "iceberg.schema must be present alongside custom metadata" | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we propagate this error up rather than having it panic?