Skip to content

Commit 6486bd2

Browse files
feat: add AlterTable framework with add_column support (delta-io#2387)
## What changes are proposed in this pull request? Introduces the ALTER TABLE schema evolution framework and the first operation: `add_column` for adding top level columns on non Column Mapping tables. Support for adding column on Column Mapping tables is in PR5 on the stack. - `Snapshot::alter_table()` returns an `AlterTableTransactionBuilder` that uses a type-state pattern (`Ready` -> `Modifying`) to enforce at least one operation before `build()` - `SchemaOperation` enum and `apply_schema_operations()` validate and apply schema changes - `AlterTableTransaction` commits metadata-only changes (`data_change: false`) with the evolved schema - Added columns must be nullable (existing data files return NULL for the new column) ## How was this change tested? - Unit tests in `schema_evolution.rs`: add column success/failure, case-insensitive duplicate detection, `modify_field_at_path` helper (top-level, nested, siblings, error paths) - Integration tests in `kernel/tests/alter_table.rs`: end-to-end add column with schema reload, multiple columns in one commit, duplicate/non-nullable rejection - compile_fail doctests verifying `build()` is unavailable in `Ready` state and data-file operations are unavailable on `AlterTableTransaction`
1 parent 29145bb commit 6486bd2

11 files changed

Lines changed: 801 additions & 9 deletions

File tree

kernel/src/actions/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,11 @@ impl TryFrom<Format> for Scalar {
214214
}
215215

216216
// Serde derives are needed for CRC file deserialization (see `crc::reader`).
217+
//
218+
// TODO(#2446): `Metadata` stores the schema only as a JSON string. Callers that already hold
219+
// a parsed `SchemaRef` (e.g. CREATE TABLE) serialize into `schema_string` and then re-parse
220+
// downstream in `TableConfiguration::try_new` via `parse_schema()`. Caching the parsed schema
221+
// on `Metadata` would eliminate the round-trip.
217222
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
218223
#[serde(rename_all = "camelCase")]
219224
#[internal_api]
@@ -343,6 +348,18 @@ impl Metadata {
343348
TableProperties::from(self.configuration.iter())
344349
}
345350

351+
/// Returns a new Metadata with the schema replaced, preserving all other fields.
352+
///
353+
/// # Errors
354+
///
355+
/// Returns an error if schema serialization fails.
356+
pub(crate) fn with_schema(self, schema: SchemaRef) -> DeltaResult<Self> {
357+
Ok(Self {
358+
schema_string: serde_json::to_string(&schema)?,
359+
..self
360+
})
361+
}
362+
346363
#[cfg(test)]
347364
#[allow(clippy::too_many_arguments)]
348365
pub(crate) fn new_unchecked(

kernel/src/schema/validation.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Schema validation utilities for Delta table creation.
1+
//! Schema validation utilities shared by table creation and schema evolution.
22
//!
33
//! Validates schemas per the Delta protocol specification.
44
@@ -14,15 +14,15 @@ use crate::{DeltaResult, Error};
1414
/// These characters have special meaning in Parquet schema syntax.
1515
const INVALID_PARQUET_CHARS: &[char] = &[' ', ',', ';', '{', '}', '(', ')', '\n', '\t', '='];
1616

17-
/// Validates a schema for table creation.
17+
/// Validates a schema for CREATE TABLE or ALTER TABLE.
1818
///
1919
/// Performs the following checks:
2020
/// 1. Schema is non-empty
2121
/// 2. No duplicate column names (case-insensitive, including nested fields)
2222
/// 3. Column names contain only valid characters
2323
/// 4. Rejects fields with `delta.invariants` metadata (SQL expression invariants are not supported
2424
/// by kernel; see `TableConfiguration::ensure_write_supported`)
25-
pub(crate) fn validate_schema_for_create(
25+
pub(crate) fn validate_schema(
2626
schema: &StructType,
2727
column_mapping_mode: ColumnMappingMode,
2828
) -> DeltaResult<()> {
@@ -371,7 +371,7 @@ mod tests {
371371
#[case::dot_in_name_with_cm(schema_with_dot(), ColumnMappingMode::Name)]
372372
#[case::different_struct_children(schema_different_struct_children(), ColumnMappingMode::None)]
373373
fn valid_schema_accepted(#[case] schema: StructType, #[case] cm: ColumnMappingMode) {
374-
assert!(validate_schema_for_create(&schema, cm).is_ok());
374+
assert!(validate_schema(&schema, cm).is_ok());
375375
}
376376

377377
// === Invalid schemas ===
@@ -393,7 +393,7 @@ mod tests {
393393
#[case] cm: ColumnMappingMode,
394394
#[case] expected_errs: &[&str],
395395
) {
396-
let result = validate_schema_for_create(&schema, cm);
396+
let result = validate_schema(&schema, cm);
397397
assert!(result.is_err());
398398
let err = result.unwrap_err().to_string();
399399
for expected in expected_errs {
@@ -412,7 +412,7 @@ mod tests {
412412
#[case::array_nested(schema_array_nested_invariant(), "arr.child")]
413413
#[case::map_nested(schema_map_nested_invariant(), "map.child")]
414414
fn invariants_metadata_rejected(#[case] schema: StructType, #[case] expected_path: &str) {
415-
let result = validate_schema_for_create(&schema, ColumnMappingMode::None);
415+
let result = validate_schema(&schema, ColumnMappingMode::None);
416416
let err = result.expect_err("expected delta.invariants metadata rejection");
417417
let msg = err.to_string();
418418
assert!(

kernel/src/snapshot/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::schema::SchemaRef;
2727
use crate::table_configuration::{InCommitTimestampEnablement, TableConfiguration};
2828
use crate::table_features::{physical_to_logical_column_name, ColumnMappingMode, TableFeature};
2929
use crate::table_properties::TableProperties;
30+
use crate::transaction::builder::alter_table::AlterTableTransactionBuilder;
3031
use crate::transaction::Transaction;
3132
use crate::utils::require;
3233
use crate::{DeltaResult, Engine, Error, LogCompactionWriter, Version};
@@ -687,6 +688,17 @@ impl Snapshot {
687688
Transaction::try_new_existing_table(self, committer, engine)
688689
}
689690

691+
/// Creates a builder for altering this table's metadata. Currently supports schema change
692+
/// operations.
693+
///
694+
/// The returned builder allows chaining operations before building an
695+
/// [`AlterTableTransaction`] that can be committed.
696+
///
697+
/// [`AlterTableTransaction`]: crate::transaction::AlterTableTransaction
698+
pub fn alter_table(self: Arc<Self>) -> AlterTableTransactionBuilder {
699+
AlterTableTransactionBuilder::new(self)
700+
}
701+
690702
/// Fetch the latest version of the provided `application_id` for this snapshot. Filters the
691703
/// txn based on the delta.setTransactionRetentionDuration property and lastUpdated.
692704
///

kernel/src/table_configuration.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,32 @@ impl TableConfiguration {
150150
version: Version,
151151
) -> DeltaResult<Self> {
152152
let logical_schema = Arc::new(metadata.parse_schema()?);
153+
Self::try_new_inner(metadata, protocol, table_root, version, logical_schema)
154+
}
155+
156+
/// Like [`try_new`](Self::try_new), but reuses `base`'s protocol, table root, and version
157+
/// and takes a pre-parsed `logical_schema`.
158+
pub(crate) fn try_new_with_schema(
159+
base: &Self,
160+
metadata: Metadata,
161+
logical_schema: SchemaRef,
162+
) -> DeltaResult<Self> {
163+
Self::try_new_inner(
164+
metadata,
165+
base.protocol.clone(),
166+
base.table_root.clone(),
167+
base.version,
168+
logical_schema,
169+
)
170+
}
171+
172+
fn try_new_inner(
173+
metadata: Metadata,
174+
protocol: Protocol,
175+
table_root: Url,
176+
version: Version,
177+
logical_schema: SchemaRef,
178+
) -> DeltaResult<Self> {
153179
let table_properties = metadata.parse_table_properties();
154180
let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
155181

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
//! Alter table transaction types and constructor.
2+
//!
3+
//! This module defines the [`AlterTableTransaction`] type alias and the
4+
//! [`try_new_alter_table`](AlterTableTransaction::try_new_alter_table) constructor.
5+
//! The builder logic lives in [`builder::alter_table`](super::builder::alter_table).
6+
7+
#![allow(unreachable_pub)]
8+
9+
use std::marker::PhantomData;
10+
use std::sync::OnceLock;
11+
12+
use crate::committer::Committer;
13+
use crate::snapshot::SnapshotRef;
14+
use crate::table_configuration::TableConfiguration;
15+
use crate::transaction::{AlterTable, Transaction};
16+
use crate::utils::current_time_ms;
17+
use crate::DeltaResult;
18+
19+
/// A type alias for alter-table transactions.
20+
///
21+
/// This provides a restricted API surface that only exposes operations valid during ALTER
22+
/// commands. Data file operations are not available at compile time because `AlterTable`
23+
/// does not implement [`SupportsDataFiles`](super::SupportsDataFiles).
24+
pub type AlterTableTransaction = Transaction<AlterTable>;
25+
26+
impl AlterTableTransaction {
27+
/// Create a new transaction for altering a table's schema. Produces a metadata-only commit
28+
/// that emits an updated Metadata action with the evolved schema.
29+
///
30+
/// The `effective_table_config` is the evolved table configuration (new schema, same
31+
/// protocol). It must be fully validated before calling this constructor (e.g. schema
32+
/// operations applied, protocol feature checks passed). The `read_snapshot` provides the
33+
/// pre-commit table state (version, previous protocol/metadata, ICT timestamps) used for
34+
/// commit versioning and post-commit snapshots.
35+
///
36+
/// This is typically called via `AlterTableTransactionBuilder::build()` rather than directly.
37+
pub(crate) fn try_new_alter_table(
38+
read_snapshot: SnapshotRef,
39+
effective_table_config: TableConfiguration,
40+
committer: Box<dyn Committer>,
41+
) -> DeltaResult<Self> {
42+
let span = tracing::info_span!(
43+
"txn",
44+
path = %read_snapshot.table_root(),
45+
read_version = read_snapshot.version(),
46+
operation = "ALTER TABLE",
47+
);
48+
49+
Ok(Transaction {
50+
span,
51+
read_snapshot_opt: Some(read_snapshot),
52+
effective_table_config,
53+
should_emit_protocol: false,
54+
should_emit_metadata: true,
55+
committer,
56+
operation: Some("ALTER TABLE".to_string()),
57+
engine_info: None,
58+
add_files_metadata: vec![],
59+
remove_files_metadata: vec![],
60+
set_transactions: vec![],
61+
commit_timestamp: current_time_ms()?,
62+
user_domain_metadata_additions: vec![],
63+
system_domain_metadata_additions: vec![],
64+
user_domain_removals: vec![],
65+
data_change: false,
66+
shared_write_state: OnceLock::new(),
67+
engine_commit_info: None,
68+
// TODO(#2446): match delta-spark's per-op isBlindAppend policy
69+
// (ADD/DROP/DROP NOT NULL -> true, SET NOT NULL -> false). Hardcoded false for
70+
// now: safe, but misses the true-case optimization delta-spark applies.
71+
is_blind_append: false,
72+
dv_matched_files: vec![],
73+
physical_clustering_columns: None,
74+
_state: PhantomData,
75+
})
76+
}
77+
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
//! Builder for ALTER TABLE (schema evolution) transactions.
2+
//!
3+
//! This module contains [`AlterTableTransactionBuilder`], which uses a type-state pattern to
4+
//! enforce valid operation chaining at compile time.
5+
//!
6+
//! # Type States
7+
//!
8+
//! - [`Ready`]: Initial state. Operations are available, but `build()` is not (at least one
9+
//! operation is required).
10+
//! - [`Modifying`]: After any chainable schema operation. More ops can be chained, and `build()` is
11+
//! available. See [`AlterTableTransactionBuilder<Modifying>`] for ops.
12+
//!
13+
//! # Transitions
14+
//!
15+
//! Each `impl` block below is gated by a state bound and documents which operations that
16+
//! state enables. Chainable schema operations live on `impl<S: Chainable>` and transition
17+
//! the builder to a chainable state; `build()` lives on states that are buildable.
18+
//!
19+
//! ```ignore
20+
//! // Allowed: at least one op queued before build().
21+
//! snapshot.alter_table().add_column(field).build(engine, committer)?;
22+
//!
23+
//! // Not allowed: build() is not defined on Ready (no ops queued).
24+
//! snapshot.alter_table().build(engine, committer)?; // compile error
25+
//! ```
26+
27+
use std::marker::PhantomData;
28+
use std::sync::Arc;
29+
30+
use crate::committer::Committer;
31+
use crate::schema::StructField;
32+
use crate::snapshot::SnapshotRef;
33+
use crate::table_configuration::TableConfiguration;
34+
use crate::table_features::Operation;
35+
use crate::transaction::alter_table::AlterTableTransaction;
36+
use crate::transaction::schema_evolution::{
37+
apply_schema_operations, SchemaEvolutionResult, SchemaOperation,
38+
};
39+
use crate::{DeltaResult, Engine};
40+
41+
/// Initial state: `build()` is not yet available (at least one operation is required).
42+
/// See [`Chainable`] for the operations available on this state.
43+
pub struct Ready;
44+
45+
/// State after at least one operation has been added. `build()` is available.
46+
/// See [`Chainable`] for the operations available on this state.
47+
pub struct Modifying;
48+
49+
/// Marker trait for builder states that accept chainable schema operations. Grouping states
50+
/// under one bound lets each op (like `add_column`) live on a single `impl<S: Chainable>`
51+
/// block -- chainable states share the body rather than duplicating it per state.
52+
///
53+
/// Sealed: external types cannot implement this, keeping the set of chainable states closed.
54+
pub trait Chainable: sealed::Sealed {}
55+
impl Chainable for Ready {}
56+
impl Chainable for Modifying {}
57+
58+
mod sealed {
59+
pub trait Sealed {}
60+
impl Sealed for super::Ready {}
61+
impl Sealed for super::Modifying {}
62+
}
63+
64+
/// Builder for constructing an [`AlterTableTransaction`] with schema evolution operations.
65+
///
66+
/// Uses a type-state pattern (`S`) to enforce at compile time:
67+
/// - At least one schema operation must be queued before `build()` is callable.
68+
/// - Only operations valid for the current state can be chained. This will disallow incompatibel
69+
/// chaining.
70+
pub struct AlterTableTransactionBuilder<S = Ready> {
71+
snapshot: SnapshotRef,
72+
operations: Vec<SchemaOperation>,
73+
// PhantomData marker for builder state (Ready or Modifying).
74+
// Zero-sized; only affects which methods are available at compile time.
75+
_state: PhantomData<S>,
76+
}
77+
78+
impl<S> AlterTableTransactionBuilder<S> {
79+
// Reconstructs the builder with a different PhantomData marker, changing which methods
80+
// are available at compile time (e.g. Ready -> Modifying enables `build()`). All real
81+
// fields are moved as-is; only the zero-sized type state changes.
82+
//
83+
// `T` (distinct from the struct's `S`) lets the caller pick the target state:
84+
// `self.transition::<Modifying>()` returns `AlterTableTransactionBuilder<Modifying>`.
85+
fn transition<T>(self) -> AlterTableTransactionBuilder<T> {
86+
AlterTableTransactionBuilder {
87+
snapshot: self.snapshot,
88+
operations: self.operations,
89+
_state: PhantomData,
90+
}
91+
}
92+
}
93+
94+
impl AlterTableTransactionBuilder<Ready> {
95+
/// Create a new builder from a snapshot.
96+
pub(crate) fn new(snapshot: SnapshotRef) -> Self {
97+
AlterTableTransactionBuilder {
98+
snapshot,
99+
operations: Vec::new(),
100+
_state: PhantomData,
101+
}
102+
}
103+
}
104+
105+
impl<S: Chainable> AlterTableTransactionBuilder<S> {
106+
/// Add a new top-level column to the table schema.
107+
///
108+
/// The field must not already exist in the schema (case-insensitive). The field must be
109+
/// nullable because existing data files do not contain this column and will read NULL for it.
110+
/// These constraints are validated during [`build()`](AlterTableTransactionBuilder::build).
111+
pub fn add_column(mut self, field: StructField) -> AlterTableTransactionBuilder<Modifying> {
112+
self.operations.push(SchemaOperation::AddColumn { field });
113+
self.transition()
114+
}
115+
}
116+
117+
impl AlterTableTransactionBuilder<Modifying> {
118+
/// Validate and apply schema operations, then build the [`AlterTableTransaction`].
119+
///
120+
/// This method:
121+
/// 1. Validates the table supports writes
122+
/// 2. Applies each operation sequentially against the evolving schema
123+
/// 3. Constructs new Metadata action with evolved schema
124+
/// 4. Builds the evolved table configuration
125+
/// 5. Creates the transaction
126+
///
127+
/// # Errors
128+
///
129+
/// - Any individual operation fails validation (see per-method errors above)
130+
/// - Table does not support writes (unsupported features)
131+
/// - The evolved schema requires protocol features not enabled on the table (e.g. adding a
132+
/// `timestampNtz` column without the `timestampNtz` feature)
133+
pub fn build(
134+
self,
135+
_engine: &dyn Engine,
136+
committer: Box<dyn Committer>,
137+
) -> DeltaResult<AlterTableTransaction> {
138+
let table_config = self.snapshot.table_configuration();
139+
// Rejects writes to tables kernel can't safely commit to: writer version out of
140+
// kernel's supported range, unsupported writer features, or schemas with SQL-expression
141+
// invariants. Runs on the pre-alter snapshot; future ALTER variants that change the
142+
// protocol must also re-check this on the evolved `TableConfiguration`.
143+
table_config.ensure_operation_supported(Operation::Write)?;
144+
145+
let schema = Arc::unwrap_or_clone(table_config.logical_schema());
146+
let SchemaEvolutionResult {
147+
schema: evolved_schema,
148+
} = apply_schema_operations(schema, self.operations, table_config.column_mapping_mode())?;
149+
150+
let evolved_metadata = table_config
151+
.metadata()
152+
.clone()
153+
.with_schema(evolved_schema.clone())?;
154+
155+
// Validates the evolved metadata against the protocol.
156+
let evolved_table_config = TableConfiguration::try_new_with_schema(
157+
table_config,
158+
evolved_metadata,
159+
evolved_schema,
160+
)?;
161+
162+
AlterTableTransaction::try_new_alter_table(self.snapshot, evolved_table_config, committer)
163+
}
164+
}

0 commit comments

Comments
 (0)