Skip to content

Commit 5f6afb2

Browse files
committed
fix(scan): use current_schema for default-snapshot column validation
A default table scan (no explicit snapshot_id) currently validates caller-supplied column names against the snapshot's schema_id, not the table's current schema. After an UpdateSchemaAction commit changes the current schema (rename/add/delete column), pre-existing snapshots still point at the old schema_id, so the validation loop in TableScanBuilder::build rejects names that are valid against the post-evolution schema with: DataInvalid => Column <new_name> not found in table. Schema: <old> The downstream Parquet projection (arrow/reader/projection.rs::get_arrow_projection_mask_with_field_ids) already maps field IDs to on-disk column names via PARQUET:field_id metadata, so resolving names against the current schema is safe end-to-end — field IDs are stable across schema versions, and the file's original column names live in the parquet metadata until the file is rewritten. Fix: branch on whether the caller asked for a specific snapshot. Explicit snapshot_id (time-travel) keeps the snapshot-time vocabulary; default scan uses the table's current schema. Tests: three regression tests on a fixture with current-schema-id=1 (id, value) and a sole snapshot at schema-id=0 (id, tmp): * test_default_scan_uses_current_schema_after_evolution — select(['id','value']) succeeds in the default scan * test_default_scan_rejects_old_name_after_rename — select(['id','tmp']) fails with DataInvalid in the default scan * test_snapshot_id_scan_uses_snapshot_schema — snapshot_id(1).select(['id','tmp']) succeeds (time-travel), and snapshot_id(1).select(['id','value']) fails All 1299 iceberg lib tests pass (37 in scan::tests = 34 existing + 3 new). Clippy + rustfmt clean.
1 parent eef9b42 commit 5f6afb2

1 file changed

Lines changed: 145 additions & 2 deletions

File tree

crates/iceberg/src/scan/mod.rs

Lines changed: 145 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,25 @@ impl<'a> TableScanBuilder<'a> {
218218
}
219219
};
220220

221-
let schema = snapshot.schema(self.table.metadata())?;
221+
// Pick the schema that defines this scan's column vocabulary.
222+
//
223+
// - Explicit `snapshot_id` (time-travel): use the schema that snapshot
224+
// was written under, so caller column names match what existed at
225+
// that point in history.
226+
// - Default scan (no `snapshot_id`): use the table's current schema.
227+
// After an `UpdateSchemaAction` commit, the current schema may
228+
// differ from any existing snapshot's schema (rename, add, delete);
229+
// `snapshot.schema()` would still report the snapshot-time vocabulary
230+
// and reject names that are valid against the post-evolution schema.
231+
// Field IDs are stable across schemas, and the downstream Parquet
232+
// projection already maps field IDs to on-disk columns via
233+
// `PARQUET:field_id` metadata, so resolving names against the current
234+
// schema is safe end-to-end.
235+
let schema = if self.snapshot_id.is_some() {
236+
snapshot.schema(self.table.metadata())?
237+
} else {
238+
self.table.metadata().current_schema().clone()
239+
};
222240

223241
// Check that all column names exist in the schema (skip reserved columns).
224242
if let Some(column_names) = self.column_names.as_ref() {
@@ -618,7 +636,6 @@ pub mod tests {
618636
use tempfile::TempDir;
619637
use uuid::Uuid;
620638

621-
use crate::TableIdent;
622639
use crate::arrow::ArrowReaderBuilder;
623640
use crate::expr::{BoundPredicate, Reference};
624641
use crate::io::{FileIO, OutputFile};
@@ -631,6 +648,7 @@ pub mod tests {
631648
};
632649
use crate::table::Table;
633650
use crate::test_utils::test_runtime;
651+
use crate::{ErrorKind, TableIdent};
634652

635653
fn render_template(template: &str, ctx: Value) -> String {
636654
let mut env = Environment::new();
@@ -1305,6 +1323,131 @@ pub mod tests {
13051323
);
13061324
}
13071325

1326+
// -----------------------------------------------------------------------
1327+
// Schema-evolution regression: scan vocabulary follows current_schema
1328+
// when no explicit snapshot_id is given, but stays pinned to the
1329+
// snapshot's schema_id when time-travelling.
1330+
// -----------------------------------------------------------------------
1331+
1332+
/// Build a minimal V2 table whose current schema differs from the
1333+
/// schema its sole snapshot was written under.
1334+
///
1335+
/// Schema 0 (snapshot vocabulary): `id, tmp`
1336+
/// Schema 1 (current schema): `id, value` <-- tmp renamed to value
1337+
///
1338+
/// Field IDs are preserved across the rename, matching what
1339+
/// `UpdateSchemaAction::rename_column` produces at commit time.
1340+
fn make_schema_evolved_table() -> Table {
1341+
let json = r#"{
1342+
"format-version": 2,
1343+
"table-uuid": "00000000-0000-0000-0000-000000000001",
1344+
"location": "s3://bucket/test/location",
1345+
"last-sequence-number": 1,
1346+
"last-updated-ms": 1602638573590,
1347+
"last-column-id": 2,
1348+
"current-schema-id": 1,
1349+
"schemas": [
1350+
{
1351+
"type": "struct",
1352+
"schema-id": 0,
1353+
"fields": [
1354+
{"id": 1, "name": "id", "required": false, "type": "long"},
1355+
{"id": 2, "name": "tmp", "required": false, "type": "double"}
1356+
]
1357+
},
1358+
{
1359+
"type": "struct",
1360+
"schema-id": 1,
1361+
"fields": [
1362+
{"id": 1, "name": "id", "required": false, "type": "long"},
1363+
{"id": 2, "name": "value", "required": false, "type": "double"}
1364+
]
1365+
}
1366+
],
1367+
"default-spec-id": 0,
1368+
"partition-specs": [{"spec-id": 0, "fields": []}],
1369+
"last-partition-id": 999,
1370+
"default-sort-order-id": 0,
1371+
"sort-orders": [{"order-id": 0, "fields": []}],
1372+
"properties": {},
1373+
"current-snapshot-id": 1,
1374+
"snapshots": [
1375+
{
1376+
"snapshot-id": 1,
1377+
"sequence-number": 1,
1378+
"timestamp-ms": 1602638573590,
1379+
"manifest-list": "s3://bucket/test/location/metadata/manifests.avro",
1380+
"summary": {"operation": "append"},
1381+
"schema-id": 0
1382+
}
1383+
]
1384+
}"#;
1385+
let metadata = serde_json::from_str::<TableMetadata>(json).unwrap();
1386+
Table::builder()
1387+
.metadata(metadata)
1388+
.metadata_location("s3://bucket/test/location/metadata/v1.json")
1389+
.identifier(TableIdent::from_strs(["ns1", "evolved"]).unwrap())
1390+
.file_io(FileIO::new_with_memory())
1391+
.runtime(test_runtime())
1392+
.build()
1393+
.unwrap()
1394+
}
1395+
1396+
#[test]
1397+
fn test_default_scan_uses_current_schema_after_evolution() {
1398+
// A default scan (no explicit snapshot_id) must validate against
1399+
// the table's current schema, not the snapshot's schema. The
1400+
// current schema has `value`; the snapshot's schema still has `tmp`.
1401+
let table = make_schema_evolved_table();
1402+
assert!(
1403+
table.scan().select(["id", "value"]).build().is_ok(),
1404+
"current-schema column name should be accepted in default scan"
1405+
);
1406+
}
1407+
1408+
#[test]
1409+
fn test_default_scan_rejects_old_name_after_rename() {
1410+
// The old name is no longer part of the table's column vocabulary
1411+
// from the caller's perspective once the rename has been committed.
1412+
let table = make_schema_evolved_table();
1413+
let err = table
1414+
.scan()
1415+
.select(["id", "tmp"])
1416+
.build()
1417+
.expect_err("old column name must not resolve against current schema");
1418+
assert_eq!(err.kind(), ErrorKind::DataInvalid);
1419+
assert!(
1420+
err.message().contains("Column tmp not found"),
1421+
"error should name the column, got: {}",
1422+
err.message()
1423+
);
1424+
}
1425+
1426+
#[test]
1427+
fn test_snapshot_id_scan_uses_snapshot_schema() {
1428+
// Time-travel scans keep the snapshot's vocabulary: the snapshot
1429+
// was written under schema 0 (`id, tmp`), so `tmp` resolves and
1430+
// `value` does not, even though the current schema has been
1431+
// evolved.
1432+
let table = make_schema_evolved_table();
1433+
assert!(
1434+
table
1435+
.scan()
1436+
.snapshot_id(1)
1437+
.select(["id", "tmp"])
1438+
.build()
1439+
.is_ok(),
1440+
"snapshot-time column name should be accepted in time-travel scan"
1441+
);
1442+
let err = table
1443+
.scan()
1444+
.snapshot_id(1)
1445+
.select(["id", "value"])
1446+
.build()
1447+
.expect_err("post-evolution column name must not resolve against snapshot schema");
1448+
assert_eq!(err.kind(), ErrorKind::DataInvalid);
1449+
}
1450+
13081451
#[tokio::test]
13091452
async fn test_plan_files_on_table_without_any_snapshots() {
13101453
let table = TableTestFixture::new_empty().table;

0 commit comments

Comments
 (0)