diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 370bbfed336e..a2c886324007 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -502,7 +502,16 @@ public static Schema schemaFor(TableMetadata metadata, String ref) {
     }
 
     Snapshot snapshot = metadata.snapshot(snapshotRef.snapshotId());
-    return metadata.schemas().get(snapshot.schemaId());
+    Integer schemaId = snapshot.schemaId();
+    // schemaId may be null when the snapshot predates Iceberg recording a schema id per snapshot
+    if (schemaId != null) {
+      Schema schema = metadata.schemasById().get(schemaId);
+      Preconditions.checkState(schema != null, "Cannot find schema with schema id %s", schemaId);
+      return schema;
+    }
+
+    // TODO: recover the schema by reading previous metadata files
+    return metadata.schema();
   }
 
   /**
diff --git a/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java b/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
index 96b56e5ffb67..24fb285b7340 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
@@ -29,6 +29,7 @@
 import java.util.stream.StreamSupport;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionSpec;
@@ -36,6 +37,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.TestTables;
 import org.apache.iceberg.types.Types;
@@ -253,6 +255,74 @@ public void schemaForTag() {
     assertThat(SnapshotUtil.schemaFor(table, tag).asStruct()).isEqualTo(initialSchema.asStruct());
   }
 
+  @Test
+  public void schemaForTagWithMetadata() {
+    // Evolve the schema twice and tag the snapshot written under the middle schema.
+    table.updateSchema().addColumn("col1", Types.IntegerType.get()).commit();
+    appendFileToMain();
+    long taggedSnapshotId = table.currentSnapshot().snapshotId();
+    table.updateSchema().addColumn("col2", Types.IntegerType.get()).commit();
+    appendFileToMain();
+
+    String tag = "tag";
+    table.manageSnapshots().createTag(tag, taggedSnapshotId).commit();
+
+    // Drop the branches set up in before() so the oldest snapshots become unreferenced.
+    table.manageSnapshots().removeBranch("b1").removeBranch("fork").commit();
+
+    // Drop the unreferenced oldest schema so schema ids no longer match list positions:
+    // the tagged snapshot keeps schema id 1, which now sits at list index 0.
+    table
+        .expireSnapshots()
+        .cleanExpiredMetadata(true)
+        .expireOlderThan(table.snapshot(taggedSnapshotId).timestampMillis())
+        .retainLast(2)
+        .commit();
+
+    TableMetadata metadata = ((HasTableOperations) table).operations().current();
+    Schema expected =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()),
+            optional(3, "col1", Types.IntegerType.get()));
+
+    assertThat(metadata.snapshot(taggedSnapshotId).schemaId()).isEqualTo(1);
+    assertThat(SnapshotUtil.schemaFor(metadata, tag).asStruct()).isEqualTo(expected.asStruct());
+  }
+
+  @Test
+  public void schemaForTagWithMetadataSchemaIdBeyondListSize() {
+    // Tag the latest schema, then drop the oldest unreferenced schema so the schemas list shrinks
+    // while the tagged snapshot keeps the highest schema id, which is now beyond the list size.
+    table.updateSchema().addColumn("col1", Types.IntegerType.get()).commit();
+    appendFileToMain();
+    table.updateSchema().addColumn("col2", Types.IntegerType.get()).commit();
+    appendFileToMain();
+    long taggedSnapshotId = table.currentSnapshot().snapshotId();
+
+    String tag = "tag";
+    table.manageSnapshots().createTag(tag, taggedSnapshotId).commit();
+
+    table
+        .expireSnapshots()
+        .cleanExpiredMetadata(true)
+        .expireOlderThan(table.snapshot(taggedSnapshotId).timestampMillis())
+        .retainLast(1)
+        .commit();
+
+    TableMetadata metadata = ((HasTableOperations) table).operations().current();
+    Schema expected =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()),
+            optional(3, "col1", Types.IntegerType.get()),
+            optional(4, "col2", Types.IntegerType.get()));
+
+    assertThat(metadata.snapshot(taggedSnapshotId).schemaId())
+        .isGreaterThanOrEqualTo(metadata.schemas().size());
+    assertThat(SnapshotUtil.schemaFor(metadata, tag).asStruct()).isEqualTo(expected.asStruct());
+  }
+
   @Test
   public void schemaForSnapshotId() {
     Schema initialSchema =