Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions crates/iceberg/src/scan/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) struct ManifestFileContext {
field_ids: Arc<Vec<i32>>,
bound_predicates: Option<Arc<BoundPredicates>>,
object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
scan_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: DeleteFileIndex,
case_sensitive: bool,
Expand All @@ -58,7 +58,7 @@ pub(crate) struct ManifestEntryContext {
pub field_ids: Arc<Vec<i32>>,
pub bound_predicates: Option<Arc<BoundPredicates>>,
pub partition_spec_id: i32,
pub snapshot_schema: SchemaRef,
pub scan_schema: SchemaRef,
pub delete_file_index: DeleteFileIndex,
pub case_sensitive: bool,
}
Expand All @@ -71,7 +71,7 @@ impl ManifestFileContext {
object_cache,
manifest_file,
bound_predicates,
snapshot_schema,
scan_schema,
field_ids,
mut sender,
expression_evaluator_cache,
Expand All @@ -89,7 +89,7 @@ impl ManifestFileContext {
field_ids: field_ids.clone(),
partition_spec_id: manifest_file.partition_spec_id,
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
scan_schema: scan_schema.clone(),
delete_file_index: delete_file_index.clone(),
case_sensitive: self.case_sensitive,
};
Expand Down Expand Up @@ -123,7 +123,7 @@ impl ManifestEntryContext {
.with_record_count(Some(self.manifest_entry.record_count()))
.with_data_file_path(self.manifest_entry.file_path().to_string())
.with_data_file_format(self.manifest_entry.file_format())
.with_schema(self.snapshot_schema)
.with_schema(self.scan_schema)
.with_project_field_ids(self.field_ids.to_vec())
.with_predicate(
self.bound_predicates
Expand All @@ -147,7 +147,10 @@ pub(crate) struct PlanContext {
pub snapshot: SnapshotRef,

pub table_metadata: TableMetadataRef,
pub snapshot_schema: SchemaRef,
/// The schema the scan resolves columns and predicates against. This is the
/// table's current schema for a default scan, or the snapshot's own schema
/// when an explicit `snapshot_id` pins the scan to a point in history.
pub scan_schema: SchemaRef,
pub case_sensitive: bool,
pub predicate: Option<Arc<Predicate>>,
pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
Expand All @@ -173,7 +176,7 @@ impl PlanContext {
let partition_filter = self.partition_filter_cache.get(
partition_spec_id,
&self.table_metadata,
&self.snapshot_schema,
&self.scan_schema,
self.case_sensitive,
self.predicate
.as_ref()
Expand All @@ -182,7 +185,7 @@ impl PlanContext {
"Expected a predicate but none present",
))?
.as_ref()
.bind(self.snapshot_schema.clone(), self.case_sensitive)?,
.bind(self.scan_schema.clone(), self.case_sensitive)?,
)?;

Ok(partition_filter)
Expand Down Expand Up @@ -274,7 +277,7 @@ impl PlanContext {
bound_predicates,
sender,
object_cache: self.object_cache.clone(),
snapshot_schema: self.snapshot_schema.clone(),
scan_schema: self.scan_schema.clone(),
field_ids: self.field_ids.clone(),
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_index,
Expand Down
149 changes: 146 additions & 3 deletions crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,25 @@ impl<'a> TableScanBuilder<'a> {
}
};

let schema = snapshot.schema(self.table.metadata())?;
// Pick the schema that defines this scan's column vocabulary.
//
// - Explicit `snapshot_id` (time-travel): use the schema that snapshot
// was written under, so caller column names match what existed at
// that point in history.
// - Default scan (no `snapshot_id`): use the table's current schema.
// After an `UpdateSchemaAction` commit, the current schema may
// differ from any existing snapshot's schema (rename, add, delete);
// `snapshot.schema()` would still report the snapshot-time vocabulary
// and reject names that are valid against the post-evolution schema.
// Field IDs are stable across schemas, and the downstream Parquet
// projection already maps field IDs to on-disk columns via
// `PARQUET:field_id` metadata, so resolving names against the current
// schema is safe end-to-end.
let schema = if self.snapshot_id.is_some() {
snapshot.schema(self.table.metadata())?
} else {
self.table.metadata().current_schema().clone()
};

// Check that all column names exist in the schema (skip reserved columns).
if let Some(column_names) = self.column_names.as_ref() {
Expand Down Expand Up @@ -284,7 +302,7 @@ impl<'a> TableScanBuilder<'a> {
let plan_context = PlanContext {
snapshot,
table_metadata: self.table.metadata_ref(),
snapshot_schema: schema,
scan_schema: schema,
case_sensitive: self.case_sensitive,
predicate: self.filter.map(Arc::new),
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
Expand Down Expand Up @@ -618,7 +636,6 @@ pub mod tests {
use tempfile::TempDir;
use uuid::Uuid;

use crate::TableIdent;
use crate::arrow::ArrowReaderBuilder;
use crate::expr::{BoundPredicate, Reference};
use crate::io::{FileIO, OutputFile};
Expand All @@ -631,6 +648,7 @@ pub mod tests {
};
use crate::table::Table;
use crate::test_utils::test_runtime;
use crate::{ErrorKind, TableIdent};

fn render_template(template: &str, ctx: Value) -> String {
let mut env = Environment::new();
Expand Down Expand Up @@ -1320,6 +1338,131 @@ pub mod tests {
);
}

// -----------------------------------------------------------------------
// Schema-evolution regression: scan vocabulary follows current_schema
// when no explicit snapshot_id is given, but stays pinned to the
// snapshot's schema_id when time-travelling.
// -----------------------------------------------------------------------

/// Test fixture: a format-version 2 table whose schema was evolved after its
/// only snapshot was committed.
///
/// The metadata declares two schemas that share field IDs 1 and 2:
///
/// * schema 0 -- `id, tmp` (referenced by snapshot 1 through its `schema-id`)
/// * schema 1 -- `id, value` (selected by `current-schema-id`)
///
/// Only the name of field 2 differs between the two, which is the shape a
/// column rename leaves behind. The returned table is backed by an in-memory
/// `FileIO`.
fn make_schema_evolved_table() -> Table {
    let metadata: TableMetadata = serde_json::from_str(
        r#"{
"format-version": 2,
"table-uuid": "00000000-0000-0000-0000-000000000001",
"location": "s3://bucket/test/location",
"last-sequence-number": 1,
"last-updated-ms": 1602638573590,
"last-column-id": 2,
"current-schema-id": 1,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{"id": 1, "name": "id", "required": false, "type": "long"},
{"id": 2, "name": "tmp", "required": false, "type": "double"}
]
},
{
"type": "struct",
"schema-id": 1,
"fields": [
{"id": 1, "name": "id", "required": false, "type": "long"},
{"id": 2, "name": "value", "required": false, "type": "double"}
]
}
],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"last-partition-id": 999,
"default-sort-order-id": 0,
"sort-orders": [{"order-id": 0, "fields": []}],
"properties": {},
"current-snapshot-id": 1,
"snapshots": [
{
"snapshot-id": 1,
"sequence-number": 1,
"timestamp-ms": 1602638573590,
"manifest-list": "s3://bucket/test/location/metadata/manifests.avro",
"summary": {"operation": "append"},
"schema-id": 0
}
]
}"#,
    )
    .unwrap();

    let identifier = TableIdent::from_strs(["ns1", "evolved"]).unwrap();

    Table::builder()
        .metadata(metadata)
        .metadata_location("s3://bucket/test/location/metadata/v1.json")
        .identifier(identifier)
        .file_io(FileIO::new_with_memory())
        .runtime(test_runtime())
        .build()
        .unwrap()
}

#[test]
fn test_default_scan_uses_current_schema_after_evolution() {
    // With no explicit snapshot_id the builder must check the selected names
    // against the table's current schema (`id, value`) rather than the schema
    // recorded on the snapshot (`id, tmp`).
    let evolved = make_schema_evolved_table();
    let outcome = evolved.scan().select(["id", "value"]).build();
    assert!(
        outcome.is_ok(),
        "current-schema column name should be accepted in default scan"
    );
}

#[test]
fn test_default_scan_rejects_old_name_after_rename() {
    // Once the rename is committed, `tmp` is no longer a name a default scan
    // may select; the build has to fail and say which column was missing.
    let evolved = make_schema_evolved_table();
    let outcome = evolved.scan().select(["id", "tmp"]).build();
    let failure =
        outcome.expect_err("old column name must not resolve against current schema");

    assert_eq!(failure.kind(), ErrorKind::DataInvalid);

    let text = failure.message();
    assert!(
        text.contains("Column tmp not found"),
        "error should name the column, got: {}",
        text
    );
}

#[test]
fn test_snapshot_id_scan_uses_snapshot_schema() {
    // Time-travel scans keep the snapshot's vocabulary: the snapshot
    // was written under schema 0 (`id, tmp`), so `tmp` resolves and
    // `value` does not, even though the current schema has been
    // evolved.
    let table = make_schema_evolved_table();
    assert!(
        table
            .scan()
            .snapshot_id(1)
            .select(["id", "tmp"])
            .build()
            .is_ok(),
        "snapshot-time column name should be accepted in time-travel scan"
    );
    let err = table
        .scan()
        .snapshot_id(1)
        .select(["id", "value"])
        .build()
        .expect_err("post-evolution column name must not resolve against snapshot schema");
    assert_eq!(err.kind(), ErrorKind::DataInvalid);
    // Checking only the error kind would let any other DataInvalid failure
    // pass; also require the message to name the rejected column, as the
    // default-scan rejection test does for `tmp`.
    assert!(
        err.message().contains("Column value not found"),
        "error should name the column, got: {}",
        err.message()
    );
}

#[tokio::test]
async fn test_plan_files_on_table_without_any_snapshots() {
let table = TableTestFixture::new_empty().table;
Expand Down
Loading