Skip to content

Commit d3479a2

Browse files
committed
feat: add update_schema action (cherry-picked from apache/iceberg-rust PR apache#2120)
Adds Transaction::update_schema() for programmatic schema evolution. Cherry-picked onto v0.9.0 tag from tomighita's upstream PR.
1 parent 7ef4063 commit d3479a2

4 files changed

Lines changed: 1328 additions & 0 deletions

File tree

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Catalog tests for schema evolution with `MemoryCatalog`.
19+
20+
use std::collections::HashMap;
21+
22+
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
23+
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
24+
use iceberg::transaction::{AddColumn, ApplyTransactionAction, Transaction};
25+
use iceberg::{Catalog, CatalogBuilder, ErrorKind, NamespaceIdent, TableCreation, TableIdent};
26+
use tempfile::TempDir;
27+
28+
fn base_schema() -> Schema {
29+
Schema::builder()
30+
.with_fields(vec![
31+
NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
32+
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
33+
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
34+
])
35+
.with_identifier_field_ids(vec![2])
36+
.build()
37+
.unwrap()
38+
}
39+
40+
async fn new_catalog() -> (iceberg::MemoryCatalog, TempDir) {
41+
let warehouse = TempDir::new().unwrap();
42+
let catalog = MemoryCatalogBuilder::default()
43+
.load(
44+
"memory",
45+
HashMap::from([(
46+
MEMORY_CATALOG_WAREHOUSE.to_string(),
47+
warehouse.path().to_string_lossy().to_string(),
48+
)]),
49+
)
50+
.await
51+
.unwrap();
52+
53+
(catalog, warehouse)
54+
}
55+
56+
async fn create_table(catalog: &iceberg::MemoryCatalog, table_name: &str) -> TableIdent {
57+
let ns = NamespaceIdent::new("schema_evolution".to_string());
58+
if catalog.get_namespace(&ns).await.is_err() {
59+
catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
60+
}
61+
62+
let table_ident = TableIdent::new(ns.clone(), table_name.to_string());
63+
let _ = catalog.drop_table(&table_ident).await;
64+
65+
catalog
66+
.create_table(
67+
&ns,
68+
TableCreation::builder()
69+
.name(table_name.to_string())
70+
.schema(base_schema())
71+
.build(),
72+
)
73+
.await
74+
.unwrap();
75+
76+
table_ident
77+
}
78+
79+
#[tokio::test]
80+
async fn test_add_field_with_memory_catalog() {
81+
let (catalog, _warehouse) = new_catalog().await;
82+
let table_ident = create_table(&catalog, "t_add_field").await;
83+
let table = catalog.load_table(&table_ident).await.unwrap();
84+
85+
let tx = Transaction::new(&table);
86+
let tx = tx
87+
.update_schema()
88+
.add_column(AddColumn::optional(
89+
"a",
90+
Type::Primitive(PrimitiveType::Int),
91+
))
92+
.apply(tx)
93+
.unwrap();
94+
95+
let updated_table = tx.commit(&catalog).await.unwrap();
96+
let schema = updated_table.metadata().current_schema();
97+
98+
let field_a = schema.field_by_name("a").expect("a should exist");
99+
assert_eq!(field_a.id, 4);
100+
assert_eq!(*field_a.field_type, Type::Primitive(PrimitiveType::Int));
101+
}
102+
103+
#[tokio::test]
104+
async fn test_add_nested_and_delete_field_with_memory_catalog() {
105+
let (catalog, _warehouse) = new_catalog().await;
106+
let table_ident = create_table(&catalog, "t_add_nested_delete").await;
107+
let table = catalog.load_table(&table_ident).await.unwrap();
108+
109+
let tx = Transaction::new(&table);
110+
let tx = tx
111+
.update_schema()
112+
.add_column(AddColumn::optional(
113+
"info",
114+
Type::Struct(iceberg::spec::StructType::new(vec![
115+
NestedField::optional(0, "city", Type::Primitive(PrimitiveType::String)).into(),
116+
])),
117+
))
118+
.apply(tx)
119+
.unwrap();
120+
let table = tx.commit(&catalog).await.unwrap();
121+
122+
let tx = Transaction::new(&table);
123+
let tx = tx
124+
.update_schema()
125+
.add_column(
126+
AddColumn::builder()
127+
.name("zip")
128+
.field_type(Type::Primitive(PrimitiveType::String))
129+
.parent("info")
130+
.build(),
131+
)
132+
.delete_column("baz")
133+
.apply(tx)
134+
.unwrap();
135+
let table = tx.commit(&catalog).await.unwrap();
136+
137+
let schema = table.metadata().current_schema();
138+
assert!(schema.field_by_name("info").is_some());
139+
assert!(schema.field_by_name("info.city").is_some());
140+
assert!(schema.field_by_name("info.zip").is_some());
141+
assert!(schema.field_by_name("baz").is_none());
142+
}
143+
144+
#[tokio::test]
145+
async fn test_delete_identifier_and_missing_field_fail_with_memory_catalog() {
146+
let (catalog, _warehouse) = new_catalog().await;
147+
let table_ident = create_table(&catalog, "t_delete_failures").await;
148+
let table = catalog.load_table(&table_ident).await.unwrap();
149+
150+
let tx = Transaction::new(&table);
151+
let tx = tx.update_schema().delete_column("bar").apply(tx).unwrap();
152+
let err = tx.commit(&catalog).await.unwrap_err();
153+
assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
154+
155+
let tx = Transaction::new(&table);
156+
let tx = tx
157+
.update_schema()
158+
.delete_column("nonexistent")
159+
.apply(tx)
160+
.unwrap();
161+
let err = tx.commit(&catalog).await.unwrap_err();
162+
assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
163+
}

crates/iceberg/src/spec/schema/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pub type SchemaId = i32;
5151
pub type SchemaRef = Arc<Schema>;
5252
/// Default schema id.
5353
pub const DEFAULT_SCHEMA_ID: SchemaId = 0;
54+
/// Delimiter for schema name, which denotes a nested struct.
55+
pub const SCHEMA_NAME_DELIMITER: &str = ".";
5456

5557
/// Defines schema in iceberg.
5658
#[derive(Debug, Serialize, Deserialize, Clone)]

crates/iceberg/src/transaction/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,15 @@ mod snapshot;
5858
mod sort_order;
5959
mod update_location;
6060
mod update_properties;
61+
mod update_schema;
6162
mod update_statistics;
6263
mod upgrade_format_version;
6364

6465
use std::sync::Arc;
6566
use std::time::Duration;
6667

6768
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
69+
pub use update_schema::AddColumn;
6870

6971
use crate::error::Result;
7072
use crate::spec::TableProperties;
@@ -74,6 +76,7 @@ use crate::transaction::append::FastAppendAction;
7476
use crate::transaction::sort_order::ReplaceSortOrderAction;
7577
use crate::transaction::update_location::UpdateLocationAction;
7678
use crate::transaction::update_properties::UpdatePropertiesAction;
79+
use crate::transaction::update_schema::UpdateSchemaAction;
7780
use crate::transaction::update_statistics::UpdateStatisticsAction;
7881
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
7982
use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
@@ -136,6 +139,11 @@ impl Transaction {
136139
UpdatePropertiesAction::new()
137140
}
138141

142+
/// Creates an update schema action.
143+
pub fn update_schema(&self) -> UpdateSchemaAction {
144+
UpdateSchemaAction::new()
145+
}
146+
139147
/// Creates a fast append action.
140148
pub fn fast_append(&self) -> FastAppendAction {
141149
FastAppendAction::new()

0 commit comments

Comments
 (0)