diff --git a/crates/iceberg/src/arrow/int96.rs b/crates/iceberg/src/arrow/int96.rs new file mode 100644 index 0000000000..63a7a30f1a --- /dev/null +++ b/crates/iceberg/src/arrow/int96.rs @@ -0,0 +1,578 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! INT96 timestamp coercion for Parquet files. + +use std::sync::Arc; + +use arrow_schema::{ + DataType, Field, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, +}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::arrow::schema::{ArrowSchemaVisitor, DEFAULT_MAP_FIELD_NAME, visit_schema}; +use crate::error::Result; +use crate::spec::{PrimitiveType, Schema, Type}; +use crate::{Error, ErrorKind}; + +/// Coerce Arrow schema types for INT96 columns to match the Iceberg table schema. +/// +/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64 for dates outside +/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the resolution +/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns` → nanosecond). +/// +/// Iceberg Java handles this differently: it bypasses parquet-mr with a custom column reader +/// (`GenericParquetReaders.TimestampInt96Reader`). 
We achieve the same result via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types: <https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support: <https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_schema>
+pub(crate) fn coerce_int96_timestamps(
+    arrow_schema: &ArrowSchemaRef,
+    iceberg_schema: &Schema,
+) -> Option<ArrowSchemaRef> {
+    let mut visitor = Int96CoercionVisitor::new(iceberg_schema);
+    let coerced = visit_schema(arrow_schema, &mut visitor).ok()?;
+    if visitor.changed {
+        Some(Arc::new(coerced))
+    } else {
+        None
+    }
+}
+
+/// Visitor that coerces `Timestamp(Nanosecond)` Arrow fields to the resolution
+/// indicated by the Iceberg schema.
+struct Int96CoercionVisitor<'a> {
+    iceberg_schema: &'a Schema,
+    // TODO(#2310): use FieldRef (Arc<Field>) once ArrowSchemaVisitor passes FieldRef.
+    field_stack: Vec<Field>,
+    changed: bool,
+}
+
+impl<'a> Int96CoercionVisitor<'a> {
+    fn new(iceberg_schema: &'a Schema) -> Self {
+        Self {
+            iceberg_schema,
+            field_stack: Vec::new(),
+            changed: false,
+        }
+    }
+
+    /// Determine the target TimeUnit for a Timestamp(Nanosecond) field based on the
+    /// Iceberg schema. Falls back to microsecond when field IDs are unavailable,
+    /// matching Iceberg Java behavior.
+    fn target_unit(&self, field: &Field) -> Option<TimeUnit> {
+        if !matches!(
+            field.data_type(),
+            DataType::Timestamp(TimeUnit::Nanosecond, _)
+        ) {
+            return None;
+        }
+
+        let target = field
+            .metadata()
+            .get(PARQUET_FIELD_ID_META_KEY)
+            .and_then(|id_str| id_str.parse::<i32>().ok())
+            .and_then(|field_id| self.iceberg_schema.field_by_id(field_id))
+            .and_then(|f| match &*f.field_type {
+                Type::Primitive(PrimitiveType::Timestamp | PrimitiveType::Timestamptz) => {
+                    Some(TimeUnit::Microsecond)
+                }
+                Type::Primitive(PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs) => {
+                    Some(TimeUnit::Nanosecond)
+                }
+                _ => None,
+            })
+            // Iceberg Java reads INT96 as microseconds by default
+            .unwrap_or(TimeUnit::Microsecond);
+
+        if target == TimeUnit::Nanosecond {
+            None
+        } else {
+            Some(target)
+        }
+    }
+}
+
+impl ArrowSchemaVisitor for Int96CoercionVisitor<'_> {
+    type T = Field;
+    type U = ArrowSchema;
+
+    fn before_field(&mut self, field: &Field) -> Result<()> {
+        self.field_stack.push(field.as_ref().clone());
+        Ok(())
+    }
+
+    fn after_field(&mut self, _field: &Field) -> Result<()> {
+        self.field_stack.pop();
+        Ok(())
+    }
+
+    fn before_list_element(&mut self, field: &Field) -> Result<()> {
+        self.field_stack.push(field.as_ref().clone());
+        Ok(())
+    }
+
+    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
+        self.field_stack.pop();
+        Ok(())
+    }
+
+    fn before_map_key(&mut self, field: &Field) -> Result<()> {
+        self.field_stack.push(field.as_ref().clone());
+        Ok(())
+    }
+
+    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
+        self.field_stack.pop();
+        Ok(())
+    }
+
+    fn before_map_value(&mut self, field: &Field) -> Result<()> {
+        self.field_stack.push(field.as_ref().clone());
+        Ok(())
+    }
+
+    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
+        self.field_stack.pop();
+        Ok(())
+    }
+
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Field>) -> Result<ArrowSchema> {
+        Ok(ArrowSchema::new_with_metadata(
+            values,
+            schema.metadata().clone(),
+        ))
+    }
+
+    fn r#struct(&mut self, _fields: &Fields, results: Vec<Field>) -> Result<Field> {
+        let field_info = self
+            .field_stack
+            .last()
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in struct"))?;
+        Ok(Field::new(
+            field_info.name(),
+            DataType::Struct(Fields::from(results)),
+            field_info.is_nullable(),
+        )
+        .with_metadata(field_info.metadata().clone()))
+    }
+
+    fn list(&mut self, list: &DataType, value: Field) -> Result<Field> {
+        let field_info = self
+            .field_stack
+            .last()
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in list"))?;
+        let list_type = match list {
+            DataType::List(_) => DataType::List(Arc::new(value)),
+            DataType::LargeList(_) => DataType::LargeList(Arc::new(value)),
+            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(Arc::new(value), *size),
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Expected list type, got {list}"),
+                ));
+            }
+        };
+        Ok(
+            Field::new(field_info.name(), list_type, field_info.is_nullable())
+                .with_metadata(field_info.metadata().clone()),
+        )
+    }
+
+    fn map(&mut self, map: &DataType, key_value: Field, value: Field) -> Result<Field> {
+        let field_info = self
+            .field_stack
+            .last()
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in map"))?;
+        let sorted = match map {
+            DataType::Map(_, sorted) => *sorted,
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Expected map type, got {map}"),
+                ));
+            }
+        };
+        let struct_field = Field::new(
+            DEFAULT_MAP_FIELD_NAME,
+            DataType::Struct(Fields::from(vec![key_value, value])),
+            false,
+        );
+        Ok(Field::new(
+            field_info.name(),
+            DataType::Map(Arc::new(struct_field), sorted),
+            field_info.is_nullable(),
+        )
+        .with_metadata(field_info.metadata().clone()))
+    }
+
+    fn primitive(&mut self, p: &DataType) -> Result<Field> {
+        let field_info = self.field_stack.last().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "Field stack underflow in primitive")
+        })?;
+
+        if let Some(target_unit) =
self.target_unit(field_info) {
+            let tz = match field_info.data_type() {
+                DataType::Timestamp(_, tz) => tz.clone(),
+                _ => None,
+            };
+            self.changed = true;
+            Ok(Field::new(
+                field_info.name(),
+                DataType::Timestamp(target_unit, tz),
+                field_info.is_nullable(),
+            )
+            .with_metadata(field_info.metadata().clone()))
+        } else {
+            Ok(
+                Field::new(field_info.name(), p.clone(), field_info.is_nullable())
+                    .with_metadata(field_info.metadata().clone()),
+            )
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+    use super::coerce_int96_timestamps;
+    use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type};
+
+    fn iceberg_schema_with_timestamp() -> Schema {
+        Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp)).into(),
+                NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap()
+    }
+
+    fn field_id_meta(id: i32) -> HashMap<String, String> {
+        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())])
+    }
+
+    #[test]
+    fn test_coerce_timestamp_ns_to_us() {
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true)
+                .with_metadata(field_id_meta(1)),
+            Field::new("id", DataType::Int32, false).with_metadata(field_id_meta(2)),
+        ]));
+        let iceberg = iceberg_schema_with_timestamp();
+
+        let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap();
+        assert_eq!(
+            coerced.field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Microsecond, None)
+        );
+        // Non-timestamp field unchanged
+        assert_eq!(coerced.field(1).data_type(), &DataType::Int32);
+    }
+
+    #[test]
+    fn test_coerce_timestamptz_ns_to_us() {
+        let iceberg = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamptz)).into(), + ]) + .build() + .unwrap(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + true, + ) + .with_metadata(field_id_meta(1)), + ])); + + let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap(); + assert_eq!( + coerced.field(0).data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) + ); + } + + #[test] + fn test_no_coercion_when_iceberg_is_timestamp_ns() { + let iceberg = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::TimestampNs)).into(), + ]) + .build() + .unwrap(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true) + .with_metadata(field_id_meta(1)), + ])); + + assert!(coerce_int96_timestamps(&arrow_schema, &iceberg).is_none()); + } + + #[test] + fn test_no_coercion_when_iceberg_is_timestamptz_ns() { + let iceberg = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::TimestamptzNs)) + .into(), + ]) + .build() + .unwrap(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + true, + ) + .with_metadata(field_id_meta(1)), + ])); + + assert!(coerce_int96_timestamps(&arrow_schema, &iceberg).is_none()); + } + + #[test] + fn test_no_coercion_when_already_microsecond() { + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, None), true) + .with_metadata(field_id_meta(1)), + Field::new("id", DataType::Int32, false).with_metadata(field_id_meta(2)), + ])); + let iceberg = iceberg_schema_with_timestamp(); + + assert!(coerce_int96_timestamps(&arrow_schema, &iceberg).is_none()); + } + + // 
Without field IDs, the visitor can't look up the Iceberg type and falls back + // to microsecond to match Iceberg Java behavior. + #[test] + fn test_defaults_to_us_without_field_ids() { + let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "ts", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )])); + let iceberg = iceberg_schema_with_timestamp(); + + let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap(); + assert_eq!( + coerced.field(0).data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, None) + ); + } + + // Field ID exists but points to a non-timestamp Iceberg type. The field_by_id + // lookup succeeds but the match arm returns None, so unwrap_or falls back to + // microsecond. + #[test] + fn test_defaults_to_us_when_iceberg_type_is_not_timestamp() { + let iceberg = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true) + .with_metadata(field_id_meta(1)), + ])); + + let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap(); + assert_eq!( + coerced.field(0).data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, None) + ); + } + + #[test] + fn test_coerce_preserves_field_metadata() { + let mut meta = field_id_meta(1); + meta.insert("custom_key".to_string(), "custom_value".to_string()); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true) + .with_metadata(meta.clone()), + ])); + let iceberg = iceberg_schema_with_timestamp(); + + let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap(); + assert_eq!(coerced.field(0).metadata(), &meta); + } + + #[test] + fn test_coerce_timestamp_in_struct() { + let iceberg = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + 
NestedField::required( + 1, + "data", + Type::Struct(StructType::new(vec![ + NestedField::optional(2, "ts", Type::Primitive(PrimitiveType::Timestamp)) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "data", + DataType::Struct( + vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true) + .with_metadata(field_id_meta(2)), + ] + .into(), + ), + false, + ) + .with_metadata(field_id_meta(1)), + ])); + + let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap(); + let inner = match coerced.field(0).data_type() { + DataType::Struct(fields) => fields, + other => panic!("Expected Struct, got {other}"), + }; + assert_eq!( + inner[0].data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, None) + ); + } + + #[test] + fn test_coerce_timestamp_in_list() { + let iceberg = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional( + 1, + "timestamps", + Type::List(ListType { + element_field: NestedField::optional( + 2, + "element", + Type::Primitive(PrimitiveType::Timestamp), + ) + .into(), + }), + ) + .into(), + ]) + .build() + .unwrap(); + + let element_field = Field::new( + "element", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ) + .with_metadata(field_id_meta(2)); + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("timestamps", DataType::List(Arc::new(element_field)), true) + .with_metadata(field_id_meta(1)), + ])); + + let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap(); + let element_dt = match coerced.field(0).data_type() { + DataType::List(f) => f.data_type(), + other => panic!("Expected List, got {other}"), + }; + assert_eq!( + element_dt, + &DataType::Timestamp(TimeUnit::Microsecond, None) + ); + } + + #[test] + fn test_coerce_timestamp_in_map_value() { + let iceberg = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional( + 1, + "ts_map", + 
Type::Map(MapType { + key_field: NestedField::required( + 2, + "key", + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::optional( + 3, + "value", + Type::Primitive(PrimitiveType::Timestamp), + ) + .into(), + }), + ) + .into(), + ]) + .build() + .unwrap(); + + let key_field = Field::new("key", DataType::Utf8, false).with_metadata(field_id_meta(2)); + let value_field = Field::new( + "value", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ) + .with_metadata(field_id_meta(3)); + let entries_field = Field::new( + "key_value", + DataType::Struct(vec![key_field, value_field].into()), + false, + ); + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "ts_map", + DataType::Map(Arc::new(entries_field), false), + true, + ) + .with_metadata(field_id_meta(1)), + ])); + + let coerced = coerce_int96_timestamps(&arrow_schema, &iceberg).unwrap(); + let value_dt = match coerced.field(0).data_type() { + DataType::Map(entries, _) => match entries.data_type() { + DataType::Struct(fields) => fields[1].data_type().clone(), + other => panic!("Expected Struct inside Map, got {other}"), + }, + other => panic!("Expected Map, got {other}"), + }; + assert_eq!(value_dt, DataType::Timestamp(TimeUnit::Microsecond, None)); + } +} diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index c091c45177..7823320452 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -27,6 +27,7 @@ pub(crate) mod caching_delete_file_loader; pub mod delete_file_loader; pub(crate) mod delete_filter; +mod int96; mod reader; /// RecordBatch projection utilities pub mod record_batch_projector; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 9ccf1ac3d7..488f41cf20 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -46,6 +46,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use 
typed_builder::TypedBuilder; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::arrow::int96::coerce_int96_timestamps; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; @@ -386,6 +387,27 @@ impl ArrowReader { arrow_metadata }; + // Coerce INT96 timestamp columns to the resolution specified by the Iceberg schema. + // This must happen before building the stream reader to avoid i64 overflow in arrow-rs. + let arrow_metadata = if let Some(coerced_schema) = + coerce_int96_timestamps(arrow_metadata.schema(), &task.schema) + { + let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema)); + ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err( + |e| { + Error::new( + ErrorKind::Unexpected, + format!( + "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}" + ), + ) + .with_source(e) + }, + )? + } else { + arrow_metadata + }; + // Build the stream reader, reusing the already-opened file reader let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata); @@ -4889,4 +4911,557 @@ message schema { .collect(); assert_eq!(ids, vec![2, 3]); } + + // INT96 encoding: [nanos_low_u32, nanos_high_u32, julian_day_u32] + // Julian day 2_440_588 = Unix epoch (1970-01-01) + const UNIX_EPOCH_JULIAN: i64 = 2_440_588; + const MICROS_PER_DAY: i64 = 86_400_000_000; + // Noon on 3333-01-01 (Julian day 2_953_529) — outside the i64 nanosecond range (~1677-2262). 
+    const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
+    const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;
+
+    fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
+        let mut val = parquet::data_type::Int96::new();
+        val.set_data(
+            (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
+            (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
+            INT96_TEST_JULIAN_DAY,
+        );
+        let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
+            + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
+        (val, expected_micros)
+    }
+
+    async fn read_int96_batches(
+        file_path: &str,
+        schema: SchemaRef,
+        project_field_ids: Vec<i32>,
+    ) -> Vec<RecordBatch> {
+        let file_io = FileIO::new_with_fs();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let file_size = std::fs::metadata(file_path).unwrap().len();
+        let task = FileScanTask {
+            file_size_in_bytes: file_size,
+            start: 0,
+            length: file_size,
+            record_count: None,
+            data_file_path: file_path.to_string(),
+            data_file_format: DataFileFormat::Parquet,
+            schema,
+            project_field_ids,
+            predicate: None,
+            deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+        reader.read(tasks).unwrap().try_collect().await.unwrap()
+    }
+
+    // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly.
+    fn write_int96_parquet_file(
+        table_location: &str,
+        filename: &str,
+        with_field_ids: bool,
+    ) -> (String, Vec<i64>) {
+        use parquet::basic::{Repetition, Type as PhysicalType};
+        use parquet::data_type::{Int32Type, Int96, Int96Type};
+        use parquet::file::writer::SerializedFileWriter;
+        use parquet::schema::types::Type as SchemaType;
+
+        let file_path = format!("{table_location}/{filename}");
+
+        let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
+            .with_repetition(Repetition::OPTIONAL);
+        let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
+            .with_repetition(Repetition::REQUIRED);
+
+        if with_field_ids {
+            ts_builder = ts_builder.with_id(Some(1));
+            id_builder = id_builder.with_id(Some(2));
+        }
+
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(vec![
+                Arc::new(ts_builder.build().unwrap()),
+                Arc::new(id_builder.build().unwrap()),
+            ])
+            .build()
+            .unwrap();
+
+        // Dates outside the i64 nanosecond range (~1677-2262) overflow without coercion.
+        const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
+        const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
+        const JULIAN_2100: u32 = 2_488_070;
+
+        let test_data: Vec<(u32, u32, u32, i64)> = vec![
+            // 3333-01-01 00:00:00
+            (
+                0,
+                0,
+                JULIAN_3333,
+                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
+            ),
+            // 3333-01-01 12:00:00
+            (
+                (NOON_NANOS & 0xFFFFFFFF) as u32,
+                (NOON_NANOS >> 32) as u32,
+                JULIAN_3333,
+                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
+                    + (NOON_NANOS / 1_000) as i64,
+            ),
+            // 2100-01-01 00:00:00
+            (
+                0,
+                0,
+                JULIAN_2100,
+                (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
+            ),
+        ];
+
+        let int96_values: Vec<Int96> = test_data
+            .iter()
+            .map(|(lo, hi, day, _)| {
+                let mut v = Int96::new();
+                v.set_data(*lo, *hi, *day);
+                v
+            })
+            .collect();
+
+        let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
+        let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();
+
+        let file = File::create(&file_path).unwrap();
+        let mut writer =
+            SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();
+
+        let mut row_group = writer.next_row_group().unwrap();
+        {
+            // def=1: ts is OPTIONAL and present. No repetition levels (top-level columns).
+            let mut col = row_group.next_column().unwrap().unwrap();
+            col.typed::<Int96Type>()
+                .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
+                .unwrap();
+            col.close().unwrap();
+        }
+        {
+            let mut col = row_group.next_column().unwrap().unwrap();
+            col.typed::<Int32Type>()
+                .write_batch(&id_values, None, None)
+                .unwrap();
+            col.close().unwrap();
+        }
+        row_group.close().unwrap();
+        writer.close().unwrap();
+
+        (file_path, expected_micros)
+    }
+
+    async fn assert_int96_read_matches(
+        file_path: &str,
+        schema: SchemaRef,
+        project_field_ids: Vec<i32>,
+        expected_micros: &[i64],
+    ) {
+        use arrow_array::TimestampMicrosecondArray;
+
+        let batches = read_int96_batches(file_path, schema, project_field_ids).await;
+
+        assert_eq!(batches.len(), 1);
+        let ts_array = batches[0]
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .expect("Expected TimestampMicrosecondArray");
+
+        for (i, expected) in expected_micros.iter().enumerate() {
+            assert_eq!(
+                ts_array.value(i),
+                *expected,
+                "Row {i}: got {}, expected {expected}",
+                ts_array.value(i)
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_read_int96_timestamps_with_field_ids() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
+                        .into(),
+                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let (file_path, expected_micros) =
+            write_int96_parquet_file(&table_location, "with_ids.parquet", true);
+
+        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
+    }
+
+    #[tokio::test]
+    async fn test_read_int96_timestamps_without_field_ids() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
+                        .into(),
+                    NestedField::required(2,
"id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let (file_path, expected_micros) = + write_int96_parquet_file(&table_location, "no_ids.parquet", false); + + assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await; + } + + #[tokio::test] + async fn test_read_int96_timestamps_in_struct() { + use arrow_array::{StructArray, TimestampMicrosecondArray}; + use parquet::basic::{Repetition, Type as PhysicalType}; + use parquet::data_type::Int96Type; + use parquet::file::writer::SerializedFileWriter; + use parquet::schema::types::Type as SchemaType; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/struct_int96.parquet"); + + let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96) + .with_repetition(Repetition::OPTIONAL) + .with_id(Some(2)) + .build() + .unwrap(); + + let struct_type = SchemaType::group_type_builder("data") + .with_repetition(Repetition::REQUIRED) + .with_id(Some(1)) + .with_fields(vec![Arc::new(ts_type)]) + .build() + .unwrap(); + + let parquet_schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(struct_type)]) + .build() + .unwrap(); + + let (int96_val, expected_micros) = make_int96_test_value(); + + let file = File::create(&file_path).unwrap(); + let mut writer = + SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap(); + + // def=1: struct is REQUIRED so no level, ts is OPTIONAL and present (1). + // No repetition levels needed (no repeated groups). 
+ let mut row_group = writer.next_row_group().unwrap(); + { + let mut col = row_group.next_column().unwrap().unwrap(); + col.typed::() + .write_batch(&[int96_val], Some(&[1]), None) + .unwrap(); + col.close().unwrap(); + } + row_group.close().unwrap(); + writer.close().unwrap(); + + let iceberg_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "data", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::optional( + 2, + "ts", + Type::Primitive(PrimitiveType::Timestamp), + ) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await; + + assert_eq!(batches.len(), 1); + let struct_array = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .expect("Expected StructArray"); + let ts_array = struct_array + .column(0) + .as_any() + .downcast_ref::() + .expect("Expected TimestampMicrosecondArray inside struct"); + + assert_eq!( + ts_array.value(0), + expected_micros, + "INT96 in struct: got {}, expected {expected_micros}", + ts_array.value(0) + ); + } + + #[tokio::test] + async fn test_read_int96_timestamps_in_list() { + use arrow_array::{ListArray, TimestampMicrosecondArray}; + use parquet::basic::{Repetition, Type as PhysicalType}; + use parquet::data_type::Int96Type; + use parquet::file::writer::SerializedFileWriter; + use parquet::schema::types::Type as SchemaType; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/list_int96.parquet"); + + // 3-level LIST encoding: + // optional group timestamps (LIST) { + // repeated group list { + // optional int96 element; + // } + // } + let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96) + .with_repetition(Repetition::OPTIONAL) + .with_id(Some(2)) + .build() + .unwrap(); + + let list_group = 
SchemaType::group_type_builder("list") + .with_repetition(Repetition::REPEATED) + .with_fields(vec![Arc::new(element_type)]) + .build() + .unwrap(); + + let list_type = SchemaType::group_type_builder("timestamps") + .with_repetition(Repetition::OPTIONAL) + .with_id(Some(1)) + .with_logical_type(Some(parquet::basic::LogicalType::List)) + .with_fields(vec![Arc::new(list_group)]) + .build() + .unwrap(); + + let parquet_schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(list_type)]) + .build() + .unwrap(); + + let (int96_val, expected_micros) = make_int96_test_value(); + + let file = File::create(&file_path).unwrap(); + let mut writer = + SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap(); + + // Write a single row with a list containing one INT96 element. + // def=3: list present (1) + repeated group (2) + element present (3) + // rep=0: start of a new list + let mut row_group = writer.next_row_group().unwrap(); + { + let mut col = row_group.next_column().unwrap().unwrap(); + col.typed::() + .write_batch(&[int96_val], Some(&[3]), Some(&[0])) + .unwrap(); + col.close().unwrap(); + } + row_group.close().unwrap(); + writer.close().unwrap(); + + let iceberg_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional( + 1, + "timestamps", + Type::List(crate::spec::ListType { + element_field: NestedField::optional( + 2, + "element", + Type::Primitive(PrimitiveType::Timestamp), + ) + .into(), + }), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await; + + assert_eq!(batches.len(), 1); + let list_array = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .expect("Expected ListArray"); + let ts_array = list_array + .values() + .as_any() + .downcast_ref::() + .expect("Expected TimestampMicrosecondArray inside list"); + + assert_eq!( + ts_array.value(0), + expected_micros, + "INT96 in list: 
got {}, expected {expected_micros}", + ts_array.value(0) + ); + } + + #[tokio::test] + async fn test_read_int96_timestamps_in_map() { + use arrow_array::{MapArray, TimestampMicrosecondArray}; + use parquet::basic::{Repetition, Type as PhysicalType}; + use parquet::data_type::{ByteArrayType, Int96Type}; + use parquet::file::writer::SerializedFileWriter; + use parquet::schema::types::Type as SchemaType; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/map_int96.parquet"); + + // MAP encoding: + // optional group ts_map (MAP) { + // repeated group key_value { + // required binary key (UTF8); + // optional int96 value; + // } + // } + let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY) + .with_repetition(Repetition::REQUIRED) + .with_logical_type(Some(parquet::basic::LogicalType::String)) + .with_id(Some(2)) + .build() + .unwrap(); + + let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96) + .with_repetition(Repetition::OPTIONAL) + .with_id(Some(3)) + .build() + .unwrap(); + + let key_value_group = SchemaType::group_type_builder("key_value") + .with_repetition(Repetition::REPEATED) + .with_fields(vec![Arc::new(key_type), Arc::new(value_type)]) + .build() + .unwrap(); + + let map_type = SchemaType::group_type_builder("ts_map") + .with_repetition(Repetition::OPTIONAL) + .with_id(Some(1)) + .with_logical_type(Some(parquet::basic::LogicalType::Map)) + .with_fields(vec![Arc::new(key_value_group)]) + .build() + .unwrap(); + + let parquet_schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(map_type)]) + .build() + .unwrap(); + + let (int96_val, expected_micros) = make_int96_test_value(); + + let file = File::create(&file_path).unwrap(); + let mut writer = + SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap(); + + // Write a single row with a map 
containing one key-value pair. + // rep=0 for both columns: start of a new map. + // key def=2: map present (1) + key_value entry present (2), key is REQUIRED. + // value def=3: map present (1) + key_value entry present (2) + value present (3). + let mut row_group = writer.next_row_group().unwrap(); + { + let mut col = row_group.next_column().unwrap().unwrap(); + col.typed::() + .write_batch( + &[parquet::data_type::ByteArray::from("event_time")], + Some(&[2]), + Some(&[0]), + ) + .unwrap(); + col.close().unwrap(); + } + { + let mut col = row_group.next_column().unwrap().unwrap(); + col.typed::() + .write_batch(&[int96_val], Some(&[3]), Some(&[0])) + .unwrap(); + col.close().unwrap(); + } + row_group.close().unwrap(); + writer.close().unwrap(); + + let iceberg_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional( + 1, + "ts_map", + Type::Map(crate::spec::MapType { + key_field: NestedField::required( + 2, + "key", + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::optional( + 3, + "value", + Type::Primitive(PrimitiveType::Timestamp), + ) + .into(), + }), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await; + + assert_eq!(batches.len(), 1); + let map_array = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .expect("Expected MapArray"); + let ts_array = map_array + .values() + .as_any() + .downcast_ref::() + .expect("Expected TimestampMicrosecondArray as map values"); + + assert_eq!( + ts_array.value(0), + expected_micros, + "INT96 in map: got {}, expected {expected_micros}", + ts_array.value(0) + ); + } } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index bd9e249f48..f96c29ab4a 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -199,7 +199,10 @@ fn visit_struct(fields: &Fields, visitor: &mut V) -> Resu } /// Visit 
schema in post order.
-fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
+pub(crate) fn visit_schema<V: ArrowSchemaVisitor>(
+    schema: &ArrowSchema,
+    visitor: &mut V,
+) -> Result<V::U> {
     let mut results = Vec::with_capacity(schema.fields().len());
     for field in schema.fields() {
         visitor.before_field(field)?;