diff --git a/catalogs/iceberg-rest-catalog/src/catalog.rs b/catalogs/iceberg-rest-catalog/src/catalog.rs index e4f589f7..84691ac4 100644 --- a/catalogs/iceberg-rest-catalog/src/catalog.rs +++ b/catalogs/iceberg-rest-catalog/src/catalog.rs @@ -393,7 +393,7 @@ impl Catalog for RestCatalog { self.name.as_deref(), &identifier.namespace().to_string(), create_table, - None, + Some("vended-credentials"), ) .map_err(Into::::into) .await?; diff --git a/iceberg-rust-spec/src/spec/values.rs b/iceberg-rust-spec/src/spec/values.rs index 81f4f7d1..f633c520 100644 --- a/iceberg-rust-spec/src/spec/values.rs +++ b/iceberg-rust-spec/src/spec/values.rs @@ -121,12 +121,7 @@ impl From for ByteBuf { Value::UUID(val) => ByteBuf::from(val.as_u128().to_be_bytes()), Value::Fixed(_, val) => ByteBuf::from(val), Value::Binary(val) => ByteBuf::from(val), - Value::Decimal(val) => { - // rust_decimal mantissa is 96 bits - // so we can remove the first 32 bits of the i128 representation - let bytes = val.mantissa().to_be_bytes()[4..].to_vec(); - ByteBuf::from(bytes) - } + Value::Decimal(val) => ByteBuf::from(i128_to_minimal_signed_be_bytes(val.mantissa())), _ => todo!(), } } @@ -755,6 +750,24 @@ pub fn sign_extend_be(b: &[u8]) -> [u8; N] { result } +/// Encodes a signed integer using the minimal big-endian two's-complement +/// representation required for Iceberg decimal values. +pub fn i128_to_minimal_signed_be_bytes(value: i128) -> Vec { + let bytes = value.to_be_bytes(); + let sign_extension = if value < 0 { 0xff } else { 0x00 }; + let sign_bit = sign_extension & 0x80; + let mut start = 0; + + while start < bytes.len() - 1 + && bytes[start] == sign_extension + && (bytes[start + 1] & 0x80) == sign_bit + { + start += 1; + } + + bytes[start..].to_vec() +} + impl From<&Value> for JsonValue { fn from(value: &Value) -> Self { match value { @@ -1364,10 +1377,7 @@ mod tests { // Test serialization let byte_buf: ByteBuf = value.clone().into(); let bytes: Vec = byte_buf.into_vec(); - assert_eq!( - bytes, - vec![0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 160u8, 16u8, 94u8] - ); + assert_eq!(bytes, vec![0u8, 160u8, 16u8, 94u8]); // Test deserialization check_avro_bytes_serde( @@ -1380,6 +1390,16 @@ mod tests { ); } + #[test] + fn decimal_bytes_are_minimal_signed_big_endian() { + assert_eq!(i128_to_minimal_signed_be_bytes(0), vec![0]); + assert_eq!(i128_to_minimal_signed_be_bytes(2), vec![2]); + assert_eq!(i128_to_minimal_signed_be_bytes(127), vec![127]); + assert_eq!(i128_to_minimal_signed_be_bytes(128), vec![0, 128]); + assert_eq!(i128_to_minimal_signed_be_bytes(-1), vec![255]); + assert_eq!(i128_to_minimal_signed_be_bytes(-129), vec![255, 127]); + } + #[test] fn test_transform_identity() { let value = Value::Int(42); diff --git a/iceberg-rust/src/file_format/parquet.rs b/iceberg-rust/src/file_format/parquet.rs index d1db572c..a6c028eb 100644 --- a/iceberg-rust/src/file_format/parquet.rs +++ b/iceberg-rust/src/file_format/parquet.rs @@ -13,12 +13,12 @@ use iceberg_rust_spec::{ manifest::{AvroMap, Content, DataFile, FileFormat}, partition::PartitionField, schema::Schema, - types::Type, - values::{Struct, Value}, + types::{PrimitiveType, Type}, + values::{i128_to_minimal_signed_be_bytes, Struct, Value}, }, table_metadata::WRITE_METADATA_METRICS_DISTINCT_COUNTS_ENABLED, }; -use parquet::file::{metadata::ParquetMetaData, writer::TrackedWrite}; +use parquet::file::{metadata::ParquetMetaData, statistics::Statistics, writer::TrackedWrite}; use thrift::protocol::{TCompactOutputProtocol, TSerializable}; use tracing::instrument; @@ -68,10 +68,12 @@ pub fn parquet_to_datafile( for row_group in file_metadata.row_groups() { for column in row_group.columns() { let column_name = column.column_descr().name(); - let id = schema + let field = schema .get_name(&column.column_path().parts().join(".")) - .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))? - .id; + .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?; + let id = field.id; + let data_type = &field.field_type; + column_sizes .entry(id) .and_modify(|x| *x += column.compressed_size()) @@ -89,20 +91,12 @@ pub fn parquet_to_datafile( .or_insert(null_count as i64); } - let data_type = &schema - .fields() - .get(id as usize) - .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))? - .field_type; - if let Some(distinct_counts) = distinct_counts.as_mut() { - if let (Some(distinct_count), Some(min_bytes), Some(max_bytes)) = ( + if let (Some(distinct_count), Some(min), Some(max)) = ( statistics.distinct_count_opt(), - statistics.min_bytes_opt(), - statistics.max_bytes_opt(), + statistic_value(statistics, StatisticBound::Min, data_type)?, + statistic_value(statistics, StatisticBound::Max, data_type)?, ) { - let min = Value::try_from_bytes(min_bytes, data_type)?; - let max = Value::try_from_bytes(max_bytes, data_type)?; let current_min = lower_bounds.get(&id); let current_max = upper_bounds.get(&id); match (min, max, current_min, current_max) { @@ -150,115 +144,119 @@ pub fn parquet_to_datafile( } } - if let Some(min_bytes) = statistics.min_bytes_opt() { - if let Type::Primitive(_) = &data_type { - let new = Value::try_from_bytes(min_bytes, data_type)?; - match lower_bounds.entry(id) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - match (&entry, &new) { - (Value::Int(current), Value::Int(new_val)) - if *current > *new_val => - { + if let Some(new) = statistic_value(statistics, StatisticBound::Min, data_type)? { + match lower_bounds.entry(id) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + match (&entry, &new) { + (Value::Int(current), Value::Int(new_val)) => { + if *current > *new_val { *entry = new } - (Value::LongInt(current), Value::LongInt(new_val)) - if *current > *new_val => - { + } + (Value::LongInt(current), Value::LongInt(new_val)) => { + if *current > *new_val { *entry = new } - (Value::Float(current), Value::Float(new_val)) - if *current > *new_val => - { + } + (Value::Float(current), Value::Float(new_val)) => { + if *current > *new_val { *entry = new } - (Value::Double(current), Value::Double(new_val)) - if *current > *new_val => - { + } + (Value::Double(current), Value::Double(new_val)) => { + if *current > *new_val { *entry = new } - (Value::Date(current), Value::Date(new_val)) - if *current > *new_val => - { + } + (Value::Decimal(current), Value::Decimal(new_val)) => { + if *current > *new_val { *entry = new } - (Value::Time(current), Value::Time(new_val)) - if *current > *new_val => - { + } + (Value::Date(current), Value::Date(new_val)) => { + if *current > *new_val { *entry = new } - (Value::Timestamp(current), Value::Timestamp(new_val)) - if *current > *new_val => - { + } + (Value::Time(current), Value::Time(new_val)) => { + if *current > *new_val { *entry = new } - (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) - if *current > *new_val => - { + } + (Value::Timestamp(current), Value::Timestamp(new_val)) => { + if *current > *new_val { *entry = new } - _ => (), } + (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => { + if *current > *new_val { + *entry = new + } + } + _ => (), } - Entry::Vacant(entry) => { - entry.insert(new); - } + } + Entry::Vacant(entry) => { + entry.insert(new); } } } - if let Some(max_bytes) = statistics.max_bytes_opt() { - if let Type::Primitive(_) = &data_type { - let new = Value::try_from_bytes(max_bytes, data_type)?; - match upper_bounds.entry(id) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - match (&entry, &new) { - (Value::Int(current), Value::Int(new_val)) - if *current < *new_val => - { + if let Some(new) = statistic_value(statistics, StatisticBound::Max, data_type)? { + match upper_bounds.entry(id) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + match (&entry, &new) { + (Value::Int(current), Value::Int(new_val)) => { + if *current < *new_val { *entry = new } - (Value::LongInt(current), Value::LongInt(new_val)) - if *current < *new_val => - { + } + (Value::LongInt(current), Value::LongInt(new_val)) => { + if *current < *new_val { *entry = new } - (Value::Float(current), Value::Float(new_val)) - if *current < *new_val => - { + } + (Value::Float(current), Value::Float(new_val)) => { + if *current < *new_val { *entry = new } - (Value::Double(current), Value::Double(new_val)) - if *current < *new_val => - { + } + (Value::Double(current), Value::Double(new_val)) => { + if *current < *new_val { *entry = new } - (Value::Date(current), Value::Date(new_val)) - if *current < *new_val => - { + } + (Value::Decimal(current), Value::Decimal(new_val)) => { + if *current < *new_val { *entry = new } - (Value::Time(current), Value::Time(new_val)) - if *current < *new_val => - { + } + (Value::Date(current), Value::Date(new_val)) => { + if *current < *new_val { *entry = new } - (Value::Timestamp(current), Value::Timestamp(new_val)) - if *current < *new_val => - { + } + (Value::Time(current), Value::Time(new_val)) => { + if *current < *new_val { *entry = new } - (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) - if *current < *new_val => - { + } + (Value::Timestamp(current), Value::Timestamp(new_val)) => { + if *current < *new_val { *entry = new } - _ => (), } + (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => { + if *current < *new_val { + *entry = new + } + } + _ => (), } - Entry::Vacant(entry) => { - entry.insert(new); - } + } + Entry::Vacant(entry) => { + entry.insert(new); } } } @@ -269,13 +267,12 @@ pub fn parquet_to_datafile( let partition_field = partition_fields .get(column_name) .ok_or_else(|| Error::InvalidFormat("transform".to_string()))?; - if let (Some(min_bytes), Some(max_bytes)) = - (statistics.min_bytes_opt(), statistics.max_bytes_opt()) - { - let min = Value::try_from_bytes(min_bytes, data_type)? - .transform(partition_field.transform())?; - let max = Value::try_from_bytes(max_bytes, data_type)? - .transform(partition_field.transform())?; + if let (Some(min), Some(max)) = ( + statistic_value(statistics, StatisticBound::Min, data_type)?, + statistic_value(statistics, StatisticBound::Max, data_type)?, + ) { + let min = min.transform(partition_field.transform())?; + let max = max.transform(partition_field.transform())?; if min == max { *partition_value = Some(min) } else { @@ -318,6 +315,66 @@ pub fn parquet_to_datafile( Ok(content) } +#[derive(Clone, Copy)] +enum StatisticBound { + Min, + Max, +} + +fn statistic_value( + statistics: &Statistics, + bound: StatisticBound, + data_type: &Type, +) -> Result, Error> { + match data_type { + Type::Primitive(PrimitiveType::Decimal { .. }) => match statistics { + Statistics::Int32(stats) => { + let value = match bound { + StatisticBound::Min => stats.min_opt(), + StatisticBound::Max => stats.max_opt(), + }; + Ok(value + .map(|value| { + let bytes = i128_to_minimal_signed_be_bytes(i128::from(*value)); + Value::try_from_bytes(&bytes, data_type) + }) + .transpose()?) + } + Statistics::Int64(stats) => { + let value = match bound { + StatisticBound::Min => stats.min_opt(), + StatisticBound::Max => stats.max_opt(), + }; + Ok(value + .map(|value| { + let bytes = i128_to_minimal_signed_be_bytes(i128::from(*value)); + Value::try_from_bytes(&bytes, data_type) + }) + .transpose()?) + } + _ => { + let bytes = match bound { + StatisticBound::Min => statistics.min_bytes_opt(), + StatisticBound::Max => statistics.max_bytes_opt(), + }; + Ok(bytes + .map(|bytes| Value::try_from_bytes(bytes, data_type)) + .transpose()?) + } + }, + Type::Primitive(_) => { + let bytes = match bound { + StatisticBound::Min => statistics.min_bytes_opt(), + StatisticBound::Max => statistics.max_bytes_opt(), + }; + Ok(bytes + .map(|bytes| Value::try_from_bytes(bytes, data_type)) + .transpose()?) + } + _ => Ok(None), + } +} + /// Get parquet metadata size pub fn thrift_size(metadata: &T) -> Result { let mut buffer = TrackedWrite::new(Vec::::new()); @@ -435,3 +492,54 @@ where (outside_overlap + new_in_overlap).round() as i64 } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn decimal_int32_statistics_are_decoded_as_logical_values() { + let statistics = Statistics::int32(Some(2), Some(128), None, None, false); + let data_type = Type::Primitive(PrimitiveType::Decimal { + precision: 5, + scale: 0, + }); + + let min = statistic_value(&statistics, StatisticBound::Min, &data_type) + .unwrap() + .unwrap(); + let max = statistic_value(&statistics, StatisticBound::Max, &data_type) + .unwrap() + .unwrap(); + + assert_decimal(min, 2, 0); + assert_decimal(max, 128, 0); + } + + #[test] + fn decimal_int64_statistics_are_decoded_as_logical_values() { + let statistics = Statistics::int64(Some(-129), Some(10_489_950), None, None, false); + let data_type = Type::Primitive(PrimitiveType::Decimal { + precision: 18, + scale: 2, + }); + + let min = statistic_value(&statistics, StatisticBound::Min, &data_type) + .unwrap() + .unwrap(); + let max = statistic_value(&statistics, StatisticBound::Max, &data_type) + .unwrap() + .unwrap(); + + assert_decimal(min, -129, 2); + assert_decimal(max, 10_489_950, 2); + } + + fn assert_decimal(value: Value, mantissa: i128, scale: u32) { + let Value::Decimal(decimal) = value else { + panic!("expected decimal value"); + }; + assert_eq!(decimal.mantissa(), mantissa); + assert_eq!(decimal.scale(), scale); + } +}