Skip to content

Commit 1001097

Browse files
gty404 authored and wgtmac committed
Align merging snapshot validation with Java
1 parent 6ebf1d4 commit 1001097

8 files changed

Lines changed: 337 additions & 118 deletions

src/iceberg/test/merging_snapshot_update_test.cc

Lines changed: 173 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -85,7 +85,7 @@ class TestMergeAppend : public MergingSnapshotUpdate {
8585
int64_t GeneratedSnapshotId() { return SnapshotId(); }
8686
void SetDataSeqNumber(int64_t seq) { SetNewDataFilesDataSequenceNumber(seq); }
8787
static Status ValidateAddedDataFilesForTest(const TableMetadata& metadata,
88-
int64_t starting_snapshot_id,
88+
std::optional<int64_t> starting_snapshot_id,
8989
const std::shared_ptr<Snapshot>& parent,
9090
std::shared_ptr<FileIO> io) {
9191
return MergingSnapshotUpdate::ValidateAddedDataFiles(metadata, starting_snapshot_id,
@@ -234,6 +234,10 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
234234
return TestOverwriteUpdate::Make(TableName(), table_);
235235
}
236236

237+
void SetTableFormatVersion(int8_t format_version) {
238+
table_->metadata()->format_version = format_version;
239+
}
240+
237241
// Commit file_a_ with FastAppend and refresh the table.
238242
void CommitFileA() {
239243
ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend());
@@ -277,19 +281,37 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
277281
return writer->ToManifestFile();
278282
}
279283

284+
Result<ManifestFile> WriteDataManifest(
285+
const TableMetadata& metadata, const std::string& path,
286+
const std::vector<std::shared_ptr<DataFile>>& files, int64_t snapshot_id,
287+
int64_t sequence_number) {
288+
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
289+
ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_->spec_id()));
290+
ICEBERG_ASSIGN_OR_RAISE(
291+
auto writer,
292+
ManifestWriter::MakeWriter(metadata.format_version, snapshot_id, path, file_io_,
293+
spec, schema, ManifestContent::kData));
294+
for (const auto& f : files) {
295+
ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(f, sequence_number));
296+
}
297+
ICEBERG_RETURN_UNEXPECTED(writer->Close());
298+
return writer->ToManifestFile();
299+
}
300+
280301
Result<ManifestFile> WriteDeleteManifest(
281302
const TableMetadata& metadata, const std::string& path,
282-
const std::vector<std::shared_ptr<DataFile>>& files, int64_t sequence_number) {
303+
const std::vector<std::shared_ptr<DataFile>>& files, int64_t snapshot_id,
304+
int64_t sequence_number) {
283305
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
284306
ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_->spec_id()));
285307
ICEBERG_ASSIGN_OR_RAISE(
286308
auto writer,
287-
ManifestWriter::MakeWriter(metadata.format_version, /*snapshot_id=*/1L, path,
288-
file_io_, spec, schema, ManifestContent::kDeletes));
309+
ManifestWriter::MakeWriter(metadata.format_version, snapshot_id, path, file_io_,
310+
spec, schema, ManifestContent::kDeletes));
289311
for (const auto& f : files) {
290312
ManifestEntry entry;
291313
entry.status = ManifestStatus::kAdded;
292-
entry.snapshot_id = 1L;
314+
entry.snapshot_id = snapshot_id;
293315
entry.sequence_number = sequence_number;
294316
entry.data_file = f;
295317
ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
@@ -623,6 +645,39 @@ TEST_F(MergingSnapshotUpdateTest, ValidateNewDeleteFileV2AllowsEqualityDelete) {
623645
EXPECT_THAT(op->AddDelete(eq_del), IsOk());
624646
}
625647

648+
TEST_F(MergingSnapshotUpdateTest, ValidateNewDeleteFileV3RejectsNonDVPositionDelete) {
649+
SetTableFormatVersion(3);
650+
651+
auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L);
652+
653+
ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
654+
EXPECT_THAT(op->AddDelete(del_file), IsError(ErrorKind::kInvalidArgument));
655+
}
656+
657+
TEST_F(MergingSnapshotUpdateTest, ValidateNewDeleteFileV3AllowsDeletionVector) {
658+
SetTableFormatVersion(3);
659+
660+
auto del_file = MakeDeleteFile("/delete/dv_a.puffin", 1L);
661+
del_file->file_format = FileFormatType::kPuffin;
662+
del_file->referenced_data_file = file_a_->file_path;
663+
del_file->content_offset = 0;
664+
del_file->content_size_in_bytes = 10;
665+
666+
ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
667+
EXPECT_THAT(op->AddDelete(del_file), IsOk());
668+
}
669+
670+
TEST_F(MergingSnapshotUpdateTest, ApplyRejectsV2StagedPositionDeleteAfterV3Upgrade) {
671+
auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L);
672+
673+
ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
674+
EXPECT_THAT(op->AddDelete(del_file), IsOk());
675+
676+
auto metadata = std::make_shared<TableMetadata>(*table_->metadata());
677+
metadata->format_version = 3;
678+
EXPECT_THAT(op->Apply(*metadata, nullptr), IsError(ErrorKind::kInvalidArgument));
679+
}
680+
626681
// -------------------------------------------------------------------------
627682
// AddManifest — invalid manifest rejection
628683
// -------------------------------------------------------------------------
@@ -970,6 +1025,42 @@ TEST_F(MergingSnapshotUpdateTest, ValidateAddedDataFilesFailsForTruncatedHistory
9701025
IsError(ErrorKind::kInvalidArgument));
9711026
}
9721027

1028+
TEST_F(MergingSnapshotUpdateTest,
1029+
ValidateAddedDataFilesWithNoStartingSnapshotFailsForTruncatedHistory) {
1030+
auto metadata = std::make_shared<TableMetadata>();
1031+
metadata->format_version = 2;
1032+
metadata->location = table_location_;
1033+
metadata->current_schema_id = 0;
1034+
metadata->schemas.push_back(schema_);
1035+
1036+
auto snapshot = std::make_shared<Snapshot>(Snapshot{
1037+
.snapshot_id = 2,
1038+
.parent_snapshot_id = 1,
1039+
.sequence_number = 2,
1040+
.timestamp_ms = TimePointMs{},
1041+
.manifest_list = "",
1042+
.summary = {},
1043+
.schema_id = 0,
1044+
});
1045+
metadata->snapshots = {snapshot};
1046+
1047+
EXPECT_THAT(TestMergeAppend::ValidateAddedDataFilesForTest(*metadata, std::nullopt,
1048+
snapshot, file_io_),
1049+
IsError(ErrorKind::kInvalidArgument));
1050+
}
1051+
1052+
TEST_F(MergingSnapshotUpdateTest, ValidateAddedDataFilesWithNoStartingSnapshotChecksAll) {
1053+
CommitFileA();
1054+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
1055+
1056+
EXPECT_THAT(TestMergeAppend::ValidateAddedDataFilesForTest(
1057+
*table_->metadata(), std::nullopt, snapshot, file_io_),
1058+
IsError(ErrorKind::kInvalidArgument));
1059+
EXPECT_THAT(TestMergeAppend::ValidateAddedDataFilesForTest(
1060+
*table_->metadata(), snapshot->snapshot_id, snapshot, file_io_),
1061+
IsOk());
1062+
}
1063+
9731064
TEST_F(MergingSnapshotUpdateTest, ValidateAddedDataFilesWithPartitionSetDetectsConflict) {
9741065
CommitFileA();
9751066
ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot());
@@ -988,6 +1079,36 @@ TEST_F(MergingSnapshotUpdateTest, ValidateAddedDataFilesWithPartitionSetDetectsC
9881079
IsError(ErrorKind::kInvalidArgument));
9891080
}
9901081

1082+
TEST_F(MergingSnapshotUpdateTest, ValidateAddedDataFilesIgnoresOldEntrySnapshotId) {
1083+
CommitFileA();
1084+
ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot());
1085+
1086+
auto metadata = std::make_shared<TableMetadata>(*table_->metadata());
1087+
1088+
constexpr int64_t kSecondSnapshotId = 123456;
1089+
auto manifest_path = table_location_ + "/metadata/old-entry-data.avro";
1090+
ICEBERG_UNWRAP_OR_FAIL(
1091+
auto manifest,
1092+
WriteDataManifest(*metadata, manifest_path, {file_b_}, first_snapshot->snapshot_id,
1093+
first_snapshot->sequence_number));
1094+
manifest.added_snapshot_id = kSecondSnapshotId;
1095+
manifest.sequence_number = first_snapshot->sequence_number + 1;
1096+
manifest.min_sequence_number = first_snapshot->sequence_number;
1097+
ICEBERG_UNWRAP_OR_FAIL(
1098+
auto second_snapshot,
1099+
MakeSyntheticSnapshot(DataOperation::kAppend, kSecondSnapshotId,
1100+
first_snapshot->snapshot_id,
1101+
first_snapshot->sequence_number + 1, {manifest}));
1102+
1103+
metadata->snapshots.push_back(second_snapshot);
1104+
metadata->current_snapshot_id = second_snapshot->snapshot_id;
1105+
metadata->last_sequence_number = second_snapshot->sequence_number;
1106+
1107+
EXPECT_THAT(TestMergeAppend::ValidateAddedDataFilesForTest(
1108+
*metadata, first_snapshot->snapshot_id, second_snapshot, file_io_),
1109+
IsOk());
1110+
}
1111+
9911112
TEST_F(MergingSnapshotUpdateTest,
9921113
ValidateNoNewDeletesForDataFilesWithFilterDetectsConflict) {
9931114
CommitFileA();
@@ -1143,9 +1264,10 @@ TEST_F(MergingSnapshotUpdateTest, ValidateAddedDVsDetectsConflict) {
11431264

11441265
constexpr int64_t kSecondSnapshotId = 123456;
11451266
auto manifest_path = table_location_ + "/metadata/dv-conflict.avro";
1146-
ICEBERG_UNWRAP_OR_FAIL(auto manifest,
1147-
WriteDeleteManifest(*metadata, manifest_path, {dv_file},
1148-
first_snapshot->sequence_number + 1));
1267+
ICEBERG_UNWRAP_OR_FAIL(
1268+
auto manifest,
1269+
WriteDeleteManifest(*metadata, manifest_path, {dv_file}, kSecondSnapshotId,
1270+
first_snapshot->sequence_number + 1));
11491271
manifest.added_snapshot_id = kSecondSnapshotId;
11501272
manifest.sequence_number = first_snapshot->sequence_number + 1;
11511273
manifest.min_sequence_number = first_snapshot->sequence_number + 1;
@@ -1181,9 +1303,10 @@ TEST_F(MergingSnapshotUpdateTest, ValidateAddedDVsIgnoresUnrelatedDVs) {
11811303

11821304
constexpr int64_t kSecondSnapshotId = 123456;
11831305
auto manifest_path = table_location_ + "/metadata/dv-unrelated.avro";
1184-
ICEBERG_UNWRAP_OR_FAIL(auto manifest,
1185-
WriteDeleteManifest(*metadata, manifest_path, {dv_file},
1186-
first_snapshot->sequence_number + 1));
1306+
ICEBERG_UNWRAP_OR_FAIL(
1307+
auto manifest,
1308+
WriteDeleteManifest(*metadata, manifest_path, {dv_file}, kSecondSnapshotId,
1309+
first_snapshot->sequence_number + 1));
11871310
manifest.added_snapshot_id = kSecondSnapshotId;
11881311
manifest.sequence_number = first_snapshot->sequence_number + 1;
11891312
manifest.min_sequence_number = first_snapshot->sequence_number + 1;
@@ -1204,6 +1327,45 @@ TEST_F(MergingSnapshotUpdateTest, ValidateAddedDVsIgnoresUnrelatedDVs) {
12041327
IsOk());
12051328
}
12061329

1330+
TEST_F(MergingSnapshotUpdateTest, ValidateAddedDVsIgnoresOldEntrySnapshotId) {
1331+
CommitFileA();
1332+
ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot());
1333+
1334+
auto metadata = std::make_shared<TableMetadata>(*table_->metadata());
1335+
metadata->format_version = 3;
1336+
1337+
auto dv_file = MakeDeleteFile("/delete/dv_a.puffin", 1L);
1338+
dv_file->file_format = FileFormatType::kPuffin;
1339+
dv_file->referenced_data_file = file_a_->file_path;
1340+
dv_file->content_offset = 0;
1341+
dv_file->content_size_in_bytes = 10;
1342+
1343+
constexpr int64_t kSecondSnapshotId = 123456;
1344+
auto manifest_path = table_location_ + "/metadata/old-entry-dv.avro";
1345+
ICEBERG_UNWRAP_OR_FAIL(
1346+
auto manifest,
1347+
WriteDeleteManifest(*metadata, manifest_path, {dv_file},
1348+
first_snapshot->snapshot_id, first_snapshot->sequence_number));
1349+
manifest.added_snapshot_id = kSecondSnapshotId;
1350+
manifest.sequence_number = first_snapshot->sequence_number + 1;
1351+
manifest.min_sequence_number = first_snapshot->sequence_number;
1352+
ICEBERG_UNWRAP_OR_FAIL(
1353+
auto second_snapshot,
1354+
MakeSyntheticSnapshot(DataOperation::kOverwrite, kSecondSnapshotId,
1355+
first_snapshot->snapshot_id,
1356+
first_snapshot->sequence_number + 1, {manifest}));
1357+
1358+
metadata->snapshots.push_back(second_snapshot);
1359+
metadata->current_snapshot_id = second_snapshot->snapshot_id;
1360+
metadata->last_sequence_number = second_snapshot->sequence_number;
1361+
1362+
const std::unordered_set<std::string> referenced_data_files{file_a_->file_path};
1363+
EXPECT_THAT(TestMergeAppend::ValidateAddedDVsForTest(
1364+
*metadata, first_snapshot->snapshot_id, Expressions::AlwaysTrue(),
1365+
referenced_data_files, second_snapshot, file_io_),
1366+
IsOk());
1367+
}
1368+
12071369
TEST_F(MergingSnapshotUpdateTest, ValidateDeletedDataFilesWithExpressionDetectsConflict) {
12081370
CommitFileA();
12091371
ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot());

src/iceberg/test/snapshot_util_test.cc

Lines changed: 24 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -312,14 +312,27 @@ TEST_F(SnapshotUtilTest, SchemaForBranch) {
312312

313313
std::string branch = "b1";
314314
ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, branch));
315-
EXPECT_EQ(schema->schema_id(), branch_schema->schema_id());
316-
EXPECT_EQ(schema->fields().size(), branch_schema->fields().size());
317-
EXPECT_NE(schema->fields().size(), initial_schema->fields().size());
315+
EXPECT_EQ(schema->schema_id(), initial_schema->schema_id());
316+
EXPECT_EQ(schema->fields().size(), initial_schema->fields().size());
317+
318+
ICEBERG_UNWRAP_OR_FAIL(auto metadata_schema,
319+
SnapshotUtil::SchemaFor(*table_->metadata(), branch));
320+
EXPECT_EQ(metadata_schema->schema_id(), initial_schema->schema_id());
321+
EXPECT_EQ(metadata_schema->fields().size(), initial_schema->fields().size());
318322
}
319323

320324
TEST_F(SnapshotUtilTest, SchemaForTag) {
321325
// Create a tag pointing to base snapshot
322326
auto metadata = table_->metadata();
327+
auto tag_schema = std::make_shared<Schema>(
328+
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
329+
SchemaField::MakeRequired(2, "data", string()),
330+
SchemaField::MakeOptional(3, "tag_only", string())},
331+
1);
332+
metadata->schemas.push_back(tag_schema);
333+
ICEBERG_UNWRAP_OR_FAIL(auto base_snapshot, table_->SnapshotById(base_snapshot_id_));
334+
base_snapshot->schema_id = tag_schema->schema_id();
335+
323336
std::string tag = "tag1";
324337
metadata->refs[tag] = std::make_shared<SnapshotRef>(
325338
SnapshotRef{.snapshot_id = base_snapshot_id_, .retention = SnapshotRef::Tag{}});
@@ -328,9 +341,14 @@ TEST_F(SnapshotUtilTest, SchemaForTag) {
328341
ASSERT_NE(initial_schema, nullptr);
329342

330343
ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, tag));
331-
// Tag should return the schema of the snapshot it points to
332-
// Since base snapshot has schema_id = 0, it should return the same schema
333-
EXPECT_EQ(schema->fields().size(), initial_schema->fields().size());
344+
EXPECT_EQ(schema->schema_id(), tag_schema->schema_id());
345+
EXPECT_EQ(schema->fields().size(), tag_schema->fields().size());
346+
EXPECT_NE(schema->fields().size(), initial_schema->fields().size());
347+
348+
ICEBERG_UNWRAP_OR_FAIL(auto metadata_schema,
349+
SnapshotUtil::SchemaFor(*table_->metadata(), tag));
350+
EXPECT_EQ(metadata_schema->schema_id(), tag_schema->schema_id());
351+
EXPECT_EQ(metadata_schema->fields().size(), tag_schema->fields().size());
334352
}
335353

336354
TEST_F(SnapshotUtilTest, SnapshotAfter) {

0 commit comments

Comments
 (0)