diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index bb8baea616..648e1963d8 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -43,7 +43,7 @@ pub(crate) struct ManifestFileContext { field_ids: Arc>, bound_predicates: Option>, object_cache: Arc, - snapshot_schema: SchemaRef, + scan_schema: SchemaRef, expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, case_sensitive: bool, @@ -58,7 +58,7 @@ pub(crate) struct ManifestEntryContext { pub field_ids: Arc>, pub bound_predicates: Option>, pub partition_spec_id: i32, - pub snapshot_schema: SchemaRef, + pub scan_schema: SchemaRef, pub delete_file_index: DeleteFileIndex, pub case_sensitive: bool, } @@ -71,7 +71,7 @@ impl ManifestFileContext { object_cache, manifest_file, bound_predicates, - snapshot_schema, + scan_schema, field_ids, mut sender, expression_evaluator_cache, @@ -89,7 +89,7 @@ impl ManifestFileContext { field_ids: field_ids.clone(), partition_spec_id: manifest_file.partition_spec_id, bound_predicates: bound_predicates.clone(), - snapshot_schema: snapshot_schema.clone(), + scan_schema: scan_schema.clone(), delete_file_index: delete_file_index.clone(), case_sensitive: self.case_sensitive, }; @@ -123,7 +123,7 @@ impl ManifestEntryContext { .with_record_count(Some(self.manifest_entry.record_count())) .with_data_file_path(self.manifest_entry.file_path().to_string()) .with_data_file_format(self.manifest_entry.file_format()) - .with_schema(self.snapshot_schema) + .with_schema(self.scan_schema) .with_project_field_ids(self.field_ids.to_vec()) .with_predicate( self.bound_predicates @@ -147,7 +147,10 @@ pub(crate) struct PlanContext { pub snapshot: SnapshotRef, pub table_metadata: TableMetadataRef, - pub snapshot_schema: SchemaRef, + /// The schema the scan resolves columns and predicates against. This is the + /// table's current schema for a default scan, or the snapshot's own schema + /// when an explicit `snapshot_id` pins the scan to a point in history. + pub scan_schema: SchemaRef, pub case_sensitive: bool, pub predicate: Option>, pub snapshot_bound_predicate: Option>, @@ -173,7 +176,7 @@ impl PlanContext { let partition_filter = self.partition_filter_cache.get( partition_spec_id, &self.table_metadata, - &self.snapshot_schema, + &self.scan_schema, self.case_sensitive, self.predicate .as_ref() @@ -182,7 +185,7 @@ impl PlanContext { "Expected a predicate but none present", ))? .as_ref() - .bind(self.snapshot_schema.clone(), self.case_sensitive)?, + .bind(self.scan_schema.clone(), self.case_sensitive)?, )?; Ok(partition_filter) @@ -274,7 +277,7 @@ impl PlanContext { bound_predicates, sender, object_cache: self.object_cache.clone(), - snapshot_schema: self.snapshot_schema.clone(), + scan_schema: self.scan_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 368e8143e2..3f65ebb946 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -218,7 +218,25 @@ impl<'a> TableScanBuilder<'a> { } }; - let schema = snapshot.schema(self.table.metadata())?; + // Pick the schema that defines this scan's column vocabulary. + // + // - Explicit `snapshot_id` (time-travel): use the schema that snapshot + // was written under, so caller column names match what existed at + // that point in history. + // - Default scan (no `snapshot_id`): use the table's current schema. + // After an `UpdateSchemaAction` commit, the current schema may + // differ from any existing snapshot's schema (rename, add, delete); + // `snapshot.schema()` would still report the snapshot-time vocabulary + // and reject names that are valid against the post-evolution schema. + // Field IDs are stable across schemas, and the downstream Parquet + // projection already maps field IDs to on-disk columns via + // `PARQUET:field_id` metadata, so resolving names against the current + // schema is safe end-to-end. + let schema = if self.snapshot_id.is_some() { + snapshot.schema(self.table.metadata())? + } else { + self.table.metadata().current_schema().clone() + }; // Check that all column names exist in the schema (skip reserved columns). if let Some(column_names) = self.column_names.as_ref() { @@ -284,7 +302,7 @@ impl<'a> TableScanBuilder<'a> { let plan_context = PlanContext { snapshot, table_metadata: self.table.metadata_ref(), - snapshot_schema: schema, + scan_schema: schema, case_sensitive: self.case_sensitive, predicate: self.filter.map(Arc::new), snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), @@ -618,7 +636,6 @@ pub mod tests { use tempfile::TempDir; use uuid::Uuid; - use crate::TableIdent; use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; @@ -631,6 +648,7 @@ pub mod tests { }; use crate::table::Table; use crate::test_utils::test_runtime; + use crate::{ErrorKind, TableIdent}; fn render_template(template: &str, ctx: Value) -> String { let mut env = Environment::new(); @@ -1320,6 +1338,131 @@ pub mod tests { ); } + // ----------------------------------------------------------------------- + // Schema-evolution regression: scan vocabulary follows current_schema + // when no explicit snapshot_id is given, but stays pinned to the + // snapshot's schema_id when time-travelling. + // ----------------------------------------------------------------------- + + /// Build a minimal V2 table whose current schema differs from the + /// schema its sole snapshot was written under. + /// + /// Schema 0 (snapshot vocabulary): `id, tmp` + /// Schema 1 (current schema): `id, value` <-- tmp renamed to value + /// + /// Field IDs are preserved across the rename, matching what + /// `UpdateSchemaAction::rename_column` produces at commit time. + fn make_schema_evolved_table() -> Table { + let json = r#"{ + "format-version": 2, + "table-uuid": "00000000-0000-0000-0000-000000000001", + "location": "s3://bucket/test/location", + "last-sequence-number": 1, + "last-updated-ms": 1602638573590, + "last-column-id": 2, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": false, "type": "long"}, + {"id": 2, "name": "tmp", "required": false, "type": "double"} + ] + }, + { + "type": "struct", + "schema-id": 1, + "fields": [ + {"id": 1, "name": "id", "required": false, "type": "long"}, + {"id": 2, "name": "value", "required": false, "type": "double"} + ] + } + ], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "current-snapshot-id": 1, + "snapshots": [ + { + "snapshot-id": 1, + "sequence-number": 1, + "timestamp-ms": 1602638573590, + "manifest-list": "s3://bucket/test/location/metadata/manifests.avro", + "summary": {"operation": "append"}, + "schema-id": 0 + } + ] + }"#; + let metadata = serde_json::from_str::(json).unwrap(); + Table::builder() + .metadata(metadata) + .metadata_location("s3://bucket/test/location/metadata/v1.json") + .identifier(TableIdent::from_strs(["ns1", "evolved"]).unwrap()) + .file_io(FileIO::new_with_memory()) + .runtime(test_runtime()) + .build() + .unwrap() + } + + #[test] + fn test_default_scan_uses_current_schema_after_evolution() { + // A default scan (no explicit snapshot_id) must validate against + // the table's current schema, not the snapshot's schema. The + // current schema has `value`; the snapshot's schema still has `tmp`. + let table = make_schema_evolved_table(); + assert!( + table.scan().select(["id", "value"]).build().is_ok(), + "current-schema column name should be accepted in default scan" + ); + } + + #[test] + fn test_default_scan_rejects_old_name_after_rename() { + // The old name is no longer part of the table's column vocabulary + // from the caller's perspective once the rename has been committed. + let table = make_schema_evolved_table(); + let err = table + .scan() + .select(["id", "tmp"]) + .build() + .expect_err("old column name must not resolve against current schema"); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains("Column tmp not found"), + "error should name the column, got: {}", + err.message() + ); + } + + #[test] + fn test_snapshot_id_scan_uses_snapshot_schema() { + // Time-travel scans keep the snapshot's vocabulary: the snapshot + // was written under schema 0 (`id, tmp`), so `tmp` resolves and + // `value` does not, even though the current schema has been + // evolved. + let table = make_schema_evolved_table(); + assert!( + table + .scan() + .snapshot_id(1) + .select(["id", "tmp"]) + .build() + .is_ok(), + "snapshot-time column name should be accepted in time-travel scan" + ); + let err = table + .scan() + .snapshot_id(1) + .select(["id", "value"]) + .build() + .expect_err("post-evolution column name must not resolve against snapshot schema"); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + } + #[tokio::test] async fn test_plan_files_on_table_without_any_snapshots() { let table = TableTestFixture::new_empty().table;