Skip to content
Open
69 changes: 47 additions & 22 deletions crates/catalog/loader/tests/schema_update_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ mod common;
use std::collections::HashMap;

use common::{CatalogKind, cleanup_namespace_dyn, load_catalog};
use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
use iceberg::transaction::{AddColumn, ApplyTransactionAction, Transaction};
use iceberg::spec::{
AddColumn, DeleteColumn, NestedField, PrimitiveType, Schema, StructType, Type,
};
use iceberg::transaction::Transaction;
use iceberg::{ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};
use iceberg_test_utils::normalize_test_name_with_parts;
use rstest::rstest;
Expand All @@ -52,6 +54,8 @@ fn base_schema() -> Schema {
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -79,10 +83,12 @@ async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()>
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(AddColumn::optional(
"a",
Type::Primitive(PrimitiveType::Int),
))
.add(
AddColumn::builder()
.name("a")
.r#type(PrimitiveType::Int.into())
.build(),
)
.apply(tx)?;
let updated = tx.commit(catalog.as_ref()).await?;

Expand All @@ -105,6 +111,8 @@ async fn test_catalog_schema_add_column(#[case] kind: CatalogKind) -> Result<()>
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogKind) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -136,27 +144,32 @@ async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogK
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(AddColumn::optional(
"info",
Type::Struct(StructType::new(vec![
NestedField::optional(0, "city", Type::Primitive(PrimitiveType::String)).into(),
])),
))
.add(
AddColumn::builder()
.name("info")
.r#type(
StructType::new(vec![
NestedField::optional(0, "city", PrimitiveType::String.into()).into(),
])
.into(),
)
.build(),
)
.apply(tx)?;
let table = tx.commit(catalog.as_ref()).await?;

// Second transaction: add a sub-field to the nested struct and delete a top-level column.
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(
.add(
AddColumn::builder()
.name("zip")
.field_type(Type::Primitive(PrimitiveType::String))
.parent("info")
.r#type(PrimitiveType::String.into())
.parent("info".into())
.build(),
)
.delete_column("baz")
.delete(DeleteColumn::new("baz"))
.apply(tx)?;
let table = tx.commit(catalog.as_ref()).await?;

Expand All @@ -179,6 +192,8 @@ async fn test_catalog_schema_add_nested_and_delete_column(#[case] kind: CatalogK
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogKind) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -208,13 +223,19 @@ async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogK

// Deleting an identifier field must fail.
let tx = Transaction::new(&table);
let tx = tx.update_schema().delete_column("bar").apply(tx)?;
let tx = tx
.update_schema()
.delete(DeleteColumn::new("bar"))
.apply(tx)?;
let err = tx.commit(catalog.as_ref()).await.unwrap_err();
assert_eq!(err.kind(), ErrorKind::PreconditionFailed);

// Deleting a nonexistent field must fail.
let tx = Transaction::new(&table);
let tx = tx.update_schema().delete_column("nonexistent").apply(tx)?;
let tx = tx
.update_schema()
.delete(DeleteColumn::new("nonexistent"))
.apply(tx)?;
let err = tx.commit(catalog.as_ref()).await.unwrap_err();
assert_eq!(err.kind(), ErrorKind::PreconditionFailed);

Expand All @@ -233,6 +254,8 @@ async fn test_catalog_schema_delete_invalid_column_errors(#[case] kind: CatalogK
async fn test_catalog_schema_update_persisted_after_reload(
#[case] kind: CatalogKind,
) -> Result<()> {
use iceberg::transaction::ApplyTransactionAction;

let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
Expand Down Expand Up @@ -264,10 +287,12 @@ async fn test_catalog_schema_update_persisted_after_reload(
let tx = Transaction::new(&table);
let tx = tx
.update_schema()
.add_column(AddColumn::optional(
"new_field",
Type::Primitive(PrimitiveType::Long),
))
.add(
AddColumn::builder()
.name("new_field")
.r#type(PrimitiveType::Long.into())
.build(),
)
.apply(tx)?;
tx.commit(catalog.as_ref()).await?;

Expand Down
59 changes: 53 additions & 6 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,23 @@ impl<'de> iceberg::spec::PrimitiveType
pub fn iceberg::spec::PrimitiveType::deserialize<__D>(__deserializer: __D) -> core::result::Result<iceberg::spec::PrimitiveType, <__D as serde_core::de::Deserializer>::Error> where __D: serde_core::de::Deserializer<'de>
impl<'de> serde_core::de::Deserialize<'de> for iceberg::spec::PrimitiveType
pub fn iceberg::spec::PrimitiveType::deserialize<D>(deserializer: D) -> core::result::Result<Self, <D as serde_core::de::Deserializer>::Error> where D: serde_core::de::Deserializer<'de>
pub enum iceberg::spec::SchemaOperation
pub iceberg::spec::SchemaOperation::Add(iceberg::spec::AddColumn)
pub iceberg::spec::SchemaOperation::AllowIncompatibleChanges
pub iceberg::spec::SchemaOperation::Delete(iceberg::spec::DeleteColumn)
pub iceberg::spec::SchemaOperation::Move(iceberg::spec::MoveColumn)
pub iceberg::spec::SchemaOperation::Rename(iceberg::spec::RenameColumn)
pub iceberg::spec::SchemaOperation::Update(iceberg::spec::UpdateColumn)
impl core::convert::From<iceberg::spec::AddColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(add: iceberg::spec::AddColumn) -> Self
impl core::convert::From<iceberg::spec::DeleteColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(delete: iceberg::spec::DeleteColumn) -> Self
impl core::convert::From<iceberg::spec::MoveColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(move_col: iceberg::spec::MoveColumn) -> Self
impl core::convert::From<iceberg::spec::RenameColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(rename: iceberg::spec::RenameColumn) -> Self
impl core::convert::From<iceberg::spec::UpdateColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(update: iceberg::spec::UpdateColumn) -> Self
pub enum iceberg::spec::SnapshotRetention
pub iceberg::spec::SnapshotRetention::Branch
pub iceberg::spec::SnapshotRetention::Branch::max_ref_age_ms: core::option::Option<i64>
Expand Down Expand Up @@ -1689,6 +1706,8 @@ pub fn iceberg::spec::Type::decimal(precision: u32, scale: u32) -> iceberg::Resu
pub fn iceberg::spec::Type::decimal_max_precision(num_bytes: u32) -> iceberg::Result<u32>
pub fn iceberg::spec::Type::decimal_required_bytes(precision: u32) -> iceberg::Result<u32>
pub fn iceberg::spec::Type::is_floating_type(&self) -> bool
pub fn iceberg::spec::Type::is_list(&self) -> bool
pub fn iceberg::spec::Type::is_map(&self) -> bool
pub fn iceberg::spec::Type::is_nested(&self) -> bool
pub fn iceberg::spec::Type::is_primitive(&self) -> bool
pub fn iceberg::spec::Type::is_struct(&self) -> bool
Expand Down Expand Up @@ -1752,6 +1771,11 @@ impl serde_core::ser::Serialize for iceberg::spec::ViewRepresentation
pub fn iceberg::spec::ViewRepresentation::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg::spec::ViewRepresentation
pub fn iceberg::spec::ViewRepresentation::deserialize<__D>(__deserializer: __D) -> core::result::Result<Self, <__D as serde_core::de::Deserializer>::Error> where __D: serde_core::de::Deserializer<'de>
pub struct iceberg::spec::AddColumn
impl core::convert::From<iceberg::spec::AddColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(add: iceberg::spec::AddColumn) -> Self
impl iceberg::spec::AddColumn
pub fn iceberg::spec::AddColumn::builder() -> AddColumnBuilder<((), (), (), (), (), ())>
pub struct iceberg::spec::BlobMetadata
pub iceberg::spec::BlobMetadata::fields: alloc::vec::Vec<i32>
pub iceberg::spec::BlobMetadata::properties: std::collections::hash::map::HashMap<alloc::string::String, alloc::string::String>
Expand Down Expand Up @@ -1887,6 +1911,11 @@ impl serde_core::ser::Serialize for iceberg::spec::Datum
pub fn iceberg::spec::Datum::serialize<S: serde_core::ser::Serializer>(&self, serializer: S) -> core::result::Result<<S as serde_core::ser::Serializer>::Ok, <S as serde_core::ser::Serializer>::Error>
impl<'de> serde_core::de::Deserialize<'de> for iceberg::spec::Datum
pub fn iceberg::spec::Datum::deserialize<D: serde_core::de::Deserializer<'de>>(deserializer: D) -> core::result::Result<Self, <D as serde_core::de::Deserializer>::Error>
pub struct iceberg::spec::DeleteColumn
impl iceberg::spec::DeleteColumn
pub fn iceberg::spec::DeleteColumn::new(name: impl core::convert::Into<alloc::string::String>) -> Self
impl core::convert::From<iceberg::spec::DeleteColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(delete: iceberg::spec::DeleteColumn) -> Self
pub struct iceberg::spec::EncryptedKey
impl iceberg::spec::EncryptedKey
pub fn iceberg::spec::EncryptedKey::encrypted_by_id(&self) -> core::option::Option<&str>
Expand Down Expand Up @@ -1932,6 +1961,8 @@ pub struct iceberg::spec::ListType
pub iceberg::spec::ListType::element_field: iceberg::spec::NestedFieldRef
impl iceberg::spec::ListType
pub fn iceberg::spec::ListType::new(element_field: iceberg::spec::NestedFieldRef) -> Self
pub fn iceberg::spec::ListType::optional(element_id: i32, element_type: iceberg::spec::Type) -> Self
pub fn iceberg::spec::ListType::required(element_id: i32, element_type: iceberg::spec::Type) -> Self
impl core::clone::Clone for iceberg::spec::ListType
pub fn iceberg::spec::ListType::clone(&self) -> iceberg::spec::ListType
impl core::cmp::Eq for iceberg::spec::ListType
Expand Down Expand Up @@ -2157,6 +2188,13 @@ impl serde_core::ser::Serialize for iceberg::spec::MetadataLog
pub fn iceberg::spec::MetadataLog::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg::spec::MetadataLog
pub fn iceberg::spec::MetadataLog::deserialize<__D>(__deserializer: __D) -> core::result::Result<Self, <__D as serde_core::de::Deserializer>::Error> where __D: serde_core::de::Deserializer<'de>
pub struct iceberg::spec::MoveColumn
impl iceberg::spec::MoveColumn
pub fn iceberg::spec::MoveColumn::after(name: impl core::convert::Into<alloc::string::String>, reference: impl core::convert::Into<alloc::string::String>) -> Self
pub fn iceberg::spec::MoveColumn::before(name: impl core::convert::Into<alloc::string::String>, reference: impl core::convert::Into<alloc::string::String>) -> Self
pub fn iceberg::spec::MoveColumn::first(name: impl core::convert::Into<alloc::string::String>) -> Self
impl core::convert::From<iceberg::spec::MoveColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(move_col: iceberg::spec::MoveColumn) -> Self
pub struct iceberg::spec::NameMapping
impl iceberg::spec::NameMapping
pub fn iceberg::spec::NameMapping::fields(&self) -> &[iceberg::spec::MappedField]
Expand Down Expand Up @@ -2297,6 +2335,11 @@ impl serde_core::ser::Serialize for iceberg::spec::PartitionStatisticsFile
pub fn iceberg::spec::PartitionStatisticsFile::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg::spec::PartitionStatisticsFile
pub fn iceberg::spec::PartitionStatisticsFile::deserialize<__D>(__deserializer: __D) -> core::result::Result<Self, <__D as serde_core::de::Deserializer>::Error> where __D: serde_core::de::Deserializer<'de>
pub struct iceberg::spec::RenameColumn
impl core::convert::From<iceberg::spec::RenameColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(rename: iceberg::spec::RenameColumn) -> Self
impl iceberg::spec::RenameColumn
pub fn iceberg::spec::RenameColumn::builder() -> RenameColumnBuilder<((), ())>
pub struct iceberg::spec::Schema
impl iceberg::spec::Schema
pub fn iceberg::spec::Schema::accessor_by_field_id(&self, field_id: i32) -> core::option::Option<alloc::sync::Arc<iceberg::expr::accessor::StructAccessor>>
Expand Down Expand Up @@ -2808,6 +2851,14 @@ impl core::default::Default for iceberg::spec::UnboundPartitionSpecBuilder
pub fn iceberg::spec::UnboundPartitionSpecBuilder::default() -> iceberg::spec::UnboundPartitionSpecBuilder
impl core::fmt::Debug for iceberg::spec::UnboundPartitionSpecBuilder
pub fn iceberg::spec::UnboundPartitionSpecBuilder::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
pub struct iceberg::spec::UpdateColumn
impl iceberg::spec::UpdateColumn
pub fn iceberg::spec::UpdateColumn::new_default_value(name: impl core::convert::Into<alloc::string::String>, new_default_value: core::option::Option<iceberg::spec::Literal>) -> Self
pub fn iceberg::spec::UpdateColumn::new_doc(name: impl core::convert::Into<alloc::string::String>, new_doc: core::option::Option<alloc::string::String>) -> Self
pub fn iceberg::spec::UpdateColumn::new_required(name: impl core::convert::Into<alloc::string::String>, is_required: bool) -> Self
pub fn iceberg::spec::UpdateColumn::new_type(name: impl core::convert::Into<alloc::string::String>, new_type: iceberg::spec::PrimitiveType) -> Self
impl core::convert::From<iceberg::spec::UpdateColumn> for iceberg::spec::SchemaOperation
pub fn iceberg::spec::SchemaOperation::from(update: iceberg::spec::UpdateColumn) -> Self
pub struct iceberg::spec::ViewMetadata
impl iceberg::spec::ViewMetadata
pub fn iceberg::spec::ViewMetadata::current_schema(&self) -> &iceberg::spec::SchemaRef
Expand Down Expand Up @@ -2981,6 +3032,7 @@ pub fn iceberg::spec::SchemaWithPartnerVisitor::struct(&mut self, struct: &icebe
pub fn iceberg::spec::deserialize_data_file_from_json(json: &str, partition_spec_id: i32, partition_type: &iceberg::spec::StructType, schema: &iceberg::spec::Schema) -> iceberg::Result<iceberg::spec::DataFile>
pub fn iceberg::spec::prune_columns(schema: &iceberg::spec::Schema, selected: impl core::iter::traits::collect::IntoIterator<Item = i32>, select_full_types: bool) -> iceberg::Result<iceberg::spec::Type>
pub fn iceberg::spec::read_data_files_from_avro<R: std::io::Read>(reader: &mut R, schema: &iceberg::spec::Schema, partition_spec_id: i32, partition_type: &iceberg::spec::StructType, version: iceberg::spec::FormatVersion) -> iceberg::Result<alloc::vec::Vec<iceberg::spec::DataFile>>
pub fn iceberg::spec::schema_update(schema: iceberg::spec::SchemaRef, operations: &[iceberg::spec::SchemaOperation]) -> iceberg::Result<iceberg::spec::SchemaRef>
pub fn iceberg::spec::serialize_data_file_to_json(data_file: iceberg::spec::DataFile, partition_type: &iceberg::spec::StructType, format_version: iceberg::spec::FormatVersion) -> iceberg::Result<alloc::string::String>
pub fn iceberg::spec::visit_schema<V: iceberg::spec::SchemaVisitor>(schema: &iceberg::spec::Schema, visitor: &mut V) -> iceberg::Result<<V as iceberg::spec::SchemaVisitor>::T>
pub fn iceberg::spec::visit_schema_with_partner<P, V: iceberg::spec::SchemaWithPartnerVisitor<P>, A: iceberg::spec::PartnerAccessor<P>>(schema: &iceberg::spec::Schema, partner: &P, visitor: &mut V, accessor: &A) -> iceberg::Result<<V as iceberg::spec::SchemaWithPartnerVisitor>::T>
Expand Down Expand Up @@ -3052,12 +3104,6 @@ impl iceberg::transaction::ActionCommit
pub fn iceberg::transaction::ActionCommit::new(updates: alloc::vec::Vec<iceberg::TableUpdate>, requirements: alloc::vec::Vec<iceberg::TableRequirement>) -> Self
pub fn iceberg::transaction::ActionCommit::take_requirements(&mut self) -> alloc::vec::Vec<iceberg::TableRequirement>
pub fn iceberg::transaction::ActionCommit::take_updates(&mut self) -> alloc::vec::Vec<iceberg::TableUpdate>
pub struct iceberg::transaction::AddColumn
impl iceberg::transaction::AddColumn
pub fn iceberg::transaction::AddColumn::optional(name: impl alloc::string::ToString, field_type: iceberg::spec::Type) -> Self
pub fn iceberg::transaction::AddColumn::required(name: impl alloc::string::ToString, field_type: iceberg::spec::Type, initial_default: iceberg::spec::Literal) -> Self
impl iceberg::transaction::AddColumn
pub fn iceberg::transaction::AddColumn::builder() -> AddColumnBuilder<((), (), (), (), (), (), ())>
pub struct iceberg::transaction::Transaction
impl iceberg::transaction::Transaction
pub async fn iceberg::transaction::Transaction::commit(self, catalog: &dyn iceberg::Catalog) -> iceberg::Result<iceberg::table::Table>
Expand Down Expand Up @@ -3273,6 +3319,7 @@ impl<B, L, F> iceberg::writer::IcebergWriterBuilder for iceberg::writer::base_wr
pub type iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder<B, L, F>::R = iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriter<B, L, F>
pub fn iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder<B, L, F>::build<'life0, 'async_trait>(&'life0 self, partition_key: core::option::Option<iceberg::spec::PartitionKey>) -> core::pin::Pin<alloc::boxed::Box<(dyn core::future::future::Future<Output = iceberg::Result<Self::R>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait
pub macro iceberg::ensure_data_valid!
pub macro iceberg::ensure_precondition!
#[non_exhaustive] pub enum iceberg::ErrorKind
pub iceberg::ErrorKind::CatalogCommitConflicts
pub iceberg::ErrorKind::DataInvalid
Expand Down
10 changes: 10 additions & 0 deletions crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,16 @@ macro_rules! ensure_data_valid {
};
}

/// Helper macro to check preconditions.
#[macro_export]
macro_rules! ensure_precondition {
($cond: expr, $fmt: literal, $($arg:tt)*) => {
if !$cond {
return Err($crate::error::Error::new($crate::error::ErrorKind::PreconditionFailed, format!($fmt, $($arg)*)))
}
};
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
Expand Down
Loading
Loading