Skip to content

Commit 3e7b20a

Browse files
shangxinli and claude authored
feat: add IncrementalFileCleanup strategy and dispatch in ExpireSnapshots::Finalize (#648)
Mirrors Java's IncrementalFileCleanup for the linear-ancestry case: each manifest is attributed to its writer snapshot, so two passes are enough instead of the full reachability scan. Cherry-pick protection via SnapshotSummaryFields::kSourceSnapshotId is preserved. Finalize() now picks IncrementalFileCleanup when the expiration is "simple" (no explicit snapshot IDs, no removed snapshots outside the current main ancestry, and no retained snapshots outside the current main ancestry), and falls back to ReachableFileCleanup otherwise. The dispatch matches Java RemoveSnapshots.cleanExpiredSnapshots. Two existing cleanup tests (DeletesExpiredFiles, IgnoresExpiredDeleteManifestReadFailures) used an empty current manifest list, which is an unreachable-orphan scenario that only ReachableFileCleanup can resolve. They now call ExpireSnapshotId() to force the reachable path, which keeps their original intent and matches Java behavior. New tests cover both dispatch branches. --------- Co-authored-by: shangxinli <shangxinli@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bf26218 commit 3e7b20a

6 files changed

Lines changed: 636 additions & 32 deletions

File tree

src/iceberg/manifest/manifest_group.cc

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,8 @@ Result<std::vector<ManifestEntry>> ManifestGroup::Entries() {
262262

263263
Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
264264
const ManifestFile& manifest) {
265-
auto spec_it = specs_by_id_.find(manifest.partition_spec_id);
266-
if (spec_it == specs_by_id_.end()) {
267-
return InvalidArgument("Partition spec {} not found for manifest {}",
268-
manifest.partition_spec_id, manifest.manifest_path);
269-
}
270-
271265
ICEBERG_ASSIGN_OR_RAISE(auto reader,
272-
ManifestReader::Make(manifest, io_, schema_, spec_it->second));
266+
ManifestReader::Make(manifest, io_, schema_, specs_by_id_));
273267

274268
reader->FilterRows(data_filter_)
275269
.FilterPartitions(partition_filter_)

src/iceberg/manifest/manifest_reader.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <algorithm>
2323
#include <memory>
24+
#include <optional>
2425
#include <ranges>
2526
#include <type_traits>
2627
#include <unordered_set>
@@ -998,6 +999,19 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
998999
manifest.first_row_id);
9991000
}
10001001

1002+
// Overload that resolves the manifest's partition spec from `specs_by_id`
// (keyed by manifest.partition_spec_id) and then delegates to the overload
// that takes a single PartitionSpec. Returns InvalidArgument when the spec ID
// is absent from the map or maps to a null pointer.
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
1003+
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
1004+
std::shared_ptr<Schema> schema,
1005+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs_by_id) {
1006+
auto spec_it = specs_by_id.find(manifest.partition_spec_id);
1007+
// A null mapped spec is reported with the same error as a missing ID.
if (spec_it == specs_by_id.end() || spec_it->second == nullptr) {
1008+
return InvalidArgument("Partition spec {} not found for manifest {}",
1009+
manifest.partition_spec_id, manifest.manifest_path);
1010+
}
1011+
// Copy the shared_ptr out of the const map so it can be moved into the
// delegated call below.
auto spec = spec_it->second;
1012+
return Make(manifest, std::move(file_io), std::move(schema), std::move(spec));
1013+
}
1014+
10011015
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
10021016
std::string_view manifest_location, std::optional<int64_t> manifest_length,
10031017
std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,

src/iceberg/manifest/manifest_reader.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,17 @@ class ICEBERG_EXPORT ManifestReader {
9292
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
9393
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
9494

95+
/// \brief Creates a reader for a manifest file using specs keyed by ID.
96+
/// \param manifest A ManifestFile object containing metadata about the manifest.
97+
/// \param file_io File IO implementation to use.
98+
/// \param schema Schema used to bind the partition type.
99+
/// \param specs_by_id Mapping of partition spec ID to PartitionSpec; the entry for
///        the manifest's partition_spec_id is the one used.
100+
/// \return A Result containing the reader or an error. An InvalidArgument error is
///         returned when specs_by_id has no entry, or a null entry, for the
///         manifest's partition_spec_id.
101+
static Result<std::unique_ptr<ManifestReader>> Make(
102+
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
103+
std::shared_ptr<Schema> schema,
104+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs_by_id);
105+
95106
/// \brief Creates a reader for a manifest file.
96107
/// \param manifest_location Path to the manifest file.
97108
/// \param manifest_length Length of the manifest file.

src/iceberg/test/expire_snapshots_test.cc

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
#include "iceberg/avro/avro_register.h"
2929
#include "iceberg/manifest/manifest_entry.h"
3030
#include "iceberg/manifest/manifest_writer.h"
31+
#include "iceberg/partition_spec.h"
32+
#include "iceberg/schema.h"
33+
#include "iceberg/snapshot.h"
3134
#include "iceberg/statistics_file.h"
3235
#include "iceberg/table_metadata.h"
3336
#include "iceberg/test/matchers.h"
@@ -135,6 +138,13 @@ class ExpireSnapshotsCleanupTest : public UpdateTestBase {
135138
return manifest_result.value();
136139
}
137140

141+
// Test helper: returns the given manifest (taken by value) with both
// sequence_number and min_sequence_number set to `sequence_number`.
ManifestFile AssignManifestSequenceNumber(ManifestFile manifest,
142+
int64_t sequence_number) const {
143+
manifest.sequence_number = sequence_number;
144+
manifest.min_sequence_number = sequence_number;
145+
return manifest;
146+
}
147+
138148
ManifestFile WriteDeleteManifest(const std::string& path, int64_t snapshot_id,
139149
std::vector<ManifestEntry> entries) {
140150
auto writer_result = ManifestWriter::MakeWriter(
@@ -227,6 +237,15 @@ TEST_F(ExpireSnapshotsTest, ExpireById) {
227237
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
228238
}
229239

240+
// Combines RetainLast(2) with an explicit ExpireSnapshotId and expects Apply()
// to list exactly that snapshot ID for removal.
TEST_F(ExpireSnapshotsTest, ExpireByIdOverridesRetainLast) {
241+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
242+
update->RetainLast(2);
243+
update->ExpireSnapshotId(3051729675574597004);
244+
245+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
246+
EXPECT_THAT(result.snapshot_ids_to_remove, testing::ElementsAre(3051729675574597004));
247+
}
248+
230249
TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
231250
struct TestCase {
232251
int64_t expire_older_than;
@@ -243,6 +262,30 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
243262
}
244263
}
245264

265+
// Appends a parentless snapshot whose timestamp equals the ExpireOlderThan
// cutoff and expects Apply() not to list it for removal.
// NOTE(review): the snapshot is called "unreferenced" because this test adds
// no ref for it; whether the fixture's refs already cover it is not visible
// here -- confirm against the base fixture.
TEST_F(ExpireSnapshotsCleanupTest, RetainsUnreferencedSnapshotAtExpireThreshold) {
266+
const int64_t unreferenced_snapshot_id = 4055729675574597004;
267+
const int64_t expire_at_ms = 1515100955770;
268+
269+
auto metadata = ReloadMetadata();
270+
metadata->snapshots.push_back(std::make_shared<Snapshot>(Snapshot{
271+
.snapshot_id = unreferenced_snapshot_id,
272+
.parent_snapshot_id = std::nullopt,
273+
.sequence_number = 2,
274+
.timestamp_ms = TimePointMsFromUnixMs(expire_at_ms),
275+
.manifest_list = table_location_ + "/metadata/unreferenced.avro",
276+
.summary = {{SnapshotSummaryFields::kOperation, "append"}},
277+
.schema_id = metadata->current_schema_id,
278+
}));
279+
RewriteTable(std::move(metadata));
280+
281+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
282+
// The cutoff is exactly the new snapshot's timestamp.
update->ExpireOlderThan(expire_at_ms);
283+
284+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
285+
EXPECT_THAT(result.snapshot_ids_to_remove,
286+
testing::Not(testing::Contains(unreferenced_snapshot_id)));
287+
}
288+
246289
TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
247290
std::vector<std::string> deleted_files;
248291
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
@@ -350,6 +393,8 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {
350393

351394
std::vector<std::string> deleted_files;
352395
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
396+
// Force the reachable path.
397+
update->ExpireSnapshotId(kExpiredSnapshotId);
353398
update->DeleteWith(
354399
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
355400

@@ -388,6 +433,7 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
388433

389434
std::vector<std::string> deleted_files;
390435
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
436+
update->ExpireSnapshotId(kExpiredSnapshotId);
391437
update->DeleteWith(
392438
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
393439

@@ -573,4 +619,216 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {
573619
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
574620
}
575621

622+
// The expired snapshot's manifest list holds one manifest that ADDED a data
// file; the current snapshot's manifest list is empty. No explicit snapshot ID
// is requested. Expects the expired manifest and manifest list to be deleted
// while the added data file is not.
// NOTE(review): per the test name this is meant to exercise the incremental
// cleanup dispatch -- the dispatch itself is not visible in this file.
TEST_F(ExpireSnapshotsCleanupTest, IncrementalDispatchPreservesAncestorAddedFiles) {
623+
const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet";
624+
const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro";
625+
const auto expired_manifest_list_path =
626+
table_location_ + "/metadata/expired-manifest-list.avro";
627+
const auto current_manifest_list_path =
628+
table_location_ + "/metadata/current-manifest-list.avro";
629+
630+
auto expired_data_manifest = WriteDataManifest(
631+
expired_data_manifest_path, kExpiredSnapshotId,
632+
{MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
633+
MakeDataFile(expired_data_file_path))});
634+
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
635+
/*parent_snapshot_id=*/0, kExpiredSequenceNumber,
636+
{expired_data_manifest});
637+
WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
638+
kCurrentSequenceNumber, {});
639+
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
640+
641+
std::vector<std::string> deleted_files;
642+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
643+
// Record every path handed to the delete callback instead of deleting it.
update->DeleteWith(
644+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
645+
646+
EXPECT_THAT(update->Commit(), IsOk());
647+
EXPECT_THAT(deleted_files, testing::Contains(expired_data_manifest_path));
648+
EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path));
649+
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path)));
650+
}
651+
652+
// A manifest written by the expired snapshot carries a DELETED entry for a
// data file and is listed by both the expired and the current manifest list.
// No explicit snapshot ID is requested. Expects the data file and the expired
// manifest list to be deleted while the still-listed manifest is kept.
TEST_F(ExpireSnapshotsCleanupTest, IncrementalDeletesExpiredDeletedEntries) {
653+
const auto deleted_data_file_path =
654+
table_location_ + "/data/deleted-by-expired.parquet";
655+
const auto delete_manifest_path =
656+
table_location_ + "/metadata/expired-delete-entry.avro";
657+
const auto expired_manifest_list_path =
658+
table_location_ + "/metadata/expired-deleted-entry-ml.avro";
659+
const auto current_manifest_list_path =
660+
table_location_ + "/metadata/current-deleted-entry-ml.avro";
661+
662+
auto delete_manifest = WriteDataManifest(
663+
delete_manifest_path, kExpiredSnapshotId,
664+
{MakeEntry(ManifestStatus::kDeleted, kExpiredSnapshotId, kExpiredSequenceNumber,
665+
MakeDataFile(deleted_data_file_path))});
666+
// Stamp the manifest with the expired snapshot's sequence number before it
// is written into the manifest lists.
delete_manifest =
667+
AssignManifestSequenceNumber(std::move(delete_manifest), kExpiredSequenceNumber);
668+
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
669+
/*parent_snapshot_id=*/0, kExpiredSequenceNumber, {delete_manifest});
670+
WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
671+
kCurrentSequenceNumber, {delete_manifest});
672+
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
673+
674+
std::vector<std::string> deleted_files;
675+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
676+
update->DeleteWith(
677+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
678+
679+
EXPECT_THAT(update->Commit(), IsOk());
680+
EXPECT_THAT(deleted_files, testing::Contains(deleted_data_file_path));
681+
EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path));
682+
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(delete_manifest_path)));
683+
}
684+
685+
// Same layout as the incremental-dispatch case (expired manifest ADDS a data
// file, current manifest list is empty), but the expired snapshot is requested
// by explicit ID. Expects exactly the data file, its manifest, and the expired
// manifest list to be deleted.
TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) {
686+
const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet";
687+
const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro";
688+
const auto expired_manifest_list_path =
689+
table_location_ + "/metadata/expired-manifest-list.avro";
690+
const auto current_manifest_list_path =
691+
table_location_ + "/metadata/current-manifest-list.avro";
692+
693+
auto expired_data_manifest = WriteDataManifest(
694+
expired_data_manifest_path, kExpiredSnapshotId,
695+
{MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
696+
MakeDataFile(expired_data_file_path))});
697+
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
698+
/*parent_snapshot_id=*/0, kExpiredSequenceNumber,
699+
{expired_data_manifest});
700+
WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
701+
kCurrentSequenceNumber, {});
702+
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
703+
704+
std::vector<std::string> deleted_files;
705+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
706+
// The explicit snapshot ID is the only difference from the incremental case.
update->ExpireSnapshotId(kExpiredSnapshotId);
707+
update->DeleteWith(
708+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
709+
710+
EXPECT_THAT(update->Commit(), IsOk());
711+
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
712+
expired_data_manifest_path,
713+
expired_manifest_list_path));
714+
}
715+
716+
// Sets kSourceSnapshotId in the current snapshot's summary to the expired
// snapshot's ID, with both manifest lists referencing the same manifest.
// Expects the commit to succeed, nothing to be passed to the delete callback,
// and only the current snapshot to remain in the committed metadata.
TEST_F(ExpireSnapshotsCleanupTest, IncrementalSkipsCherryPickedSnapshotCleanup) {
717+
const auto picked_data_file_path = table_location_ + "/data/picked-data.parquet";
718+
const auto picked_manifest_path = table_location_ + "/metadata/picked-data.avro";
719+
const auto expired_manifest_list_path =
720+
table_location_ + "/metadata/expired-picked-ml.avro";
721+
const auto current_manifest_list_path =
722+
table_location_ + "/metadata/current-picked-ml.avro";
723+
724+
auto picked_manifest = WriteDataManifest(
725+
picked_manifest_path, kExpiredSnapshotId,
726+
{MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
727+
MakeDataFile(picked_data_file_path))});
728+
picked_manifest =
729+
AssignManifestSequenceNumber(std::move(picked_manifest), kExpiredSequenceNumber);
730+
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
731+
/*parent_snapshot_id=*/0, kExpiredSequenceNumber, {picked_manifest});
732+
WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
733+
kCurrentSequenceNumber, {picked_manifest});
734+
735+
auto metadata = ReloadMetadata();
736+
ASSERT_EQ(metadata->snapshots.size(), 2);
737+
metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path;
738+
metadata->snapshots.at(1)->manifest_list = current_manifest_list_path;
739+
// Record the expired snapshot as the source of the current snapshot.
metadata->snapshots.at(1)->summary[SnapshotSummaryFields::kSourceSnapshotId] =
740+
std::to_string(kExpiredSnapshotId);
741+
RewriteTable(std::move(metadata));
742+
743+
std::vector<std::string> deleted_files;
744+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
745+
update->DeleteWith(
746+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
747+
748+
EXPECT_THAT(update->Commit(), IsOk());
749+
EXPECT_TRUE(deleted_files.empty());
750+
auto committed_metadata = ReloadMetadata();
751+
EXPECT_EQ(committed_metadata->snapshots.size(), 1);
752+
EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, kCurrentSnapshotId);
753+
}
754+
755+
// Expires the old snapshot by explicit ID with CleanExpiredMetadata(true)
// after making a new empty partition spec (ID 1) the default and a new schema
// (ID 2, fields "y" and "z") the current one. Expects the expired manifest and
// manifest list to be deleted while the data file that manifest added is left
// in place.
// NOTE(review): per the test name, the expired manifest's spec is presumably
// no longer bindable after the metadata change, so data-file cleanup is
// skipped rather than guessed -- confirm against the cleanup implementation.
TEST_F(ExpireSnapshotsCleanupTest, ReachableCleanupFailsClosedOnUnbindableExpiredSpec) {
756+
const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet";
757+
const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro";
758+
const auto expired_manifest_list_path =
759+
table_location_ + "/metadata/expired-manifest-list.avro";
760+
const auto current_manifest_list_path =
761+
table_location_ + "/metadata/current-manifest-list.avro";
762+
763+
auto expired_data_manifest = WriteDataManifest(
764+
expired_data_manifest_path, kExpiredSnapshotId,
765+
{MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber,
766+
MakeDataFile(expired_data_file_path))});
767+
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
768+
/*parent_snapshot_id=*/0, kExpiredSequenceNumber,
769+
{expired_data_manifest});
770+
WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
771+
kCurrentSequenceNumber, {});
772+
773+
auto metadata = ReloadMetadata();
774+
ASSERT_EQ(metadata->snapshots.size(), 2);
775+
metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path;
776+
metadata->snapshots.at(1)->manifest_list = current_manifest_list_path;
777+
// Add an empty spec with ID 1 and make it the table default.
ICEBERG_UNWRAP_OR_FAIL(auto retained_spec, PartitionSpec::Make(/*spec_id=*/1, {}));
778+
metadata->partition_specs.push_back(
779+
std::shared_ptr<PartitionSpec>(std::move(retained_spec)));
780+
metadata->default_spec_id = 1;
781+
// Add schema ID 2 and point both the table and the current snapshot at it.
ICEBERG_UNWRAP_OR_FAIL(
782+
auto retained_schema,
783+
Schema::Make(std::vector<SchemaField>{SchemaField::MakeRequired(2, "y", int64()),
784+
SchemaField::MakeRequired(3, "z", int64())},
785+
/*schema_id=*/2, std::vector<int32_t>{}));
786+
metadata->schemas.push_back(std::shared_ptr<Schema>(std::move(retained_schema)));
787+
metadata->current_schema_id = 2;
788+
metadata->snapshots.at(1)->schema_id = 2;
789+
RewriteTable(std::move(metadata));
790+
791+
std::vector<std::string> deleted_files;
792+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
793+
update->ExpireSnapshotId(kExpiredSnapshotId);
794+
update->CleanExpiredMetadata(true);
795+
update->DeleteWith(
796+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
797+
798+
EXPECT_THAT(update->Commit(), IsOk());
799+
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
800+
expired_manifest_list_path));
801+
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(expired_data_file_path)));
802+
}
803+
804+
// Stores a non-numeric kSourceSnapshotId ("not-a-number") in the current
// snapshot's summary, with both manifest lists empty. Expects the commit to
// succeed anyway, nothing to be passed to the delete callback, and only the
// current snapshot to remain in the committed metadata.
TEST_F(ExpireSnapshotsCleanupTest, CommitIgnoresMalformedSourceSnapshotIdCleanup) {
805+
const auto expired_manifest_list_path =
806+
table_location_ + "/metadata/expired-malformed-ml.avro";
807+
const auto current_manifest_list_path =
808+
table_location_ + "/metadata/current-malformed-ml.avro";
809+
WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
810+
/*parent_snapshot_id=*/0, kExpiredSequenceNumber, {});
811+
WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId,
812+
kCurrentSequenceNumber, {});
813+
814+
auto metadata = ReloadMetadata();
815+
ASSERT_EQ(metadata->snapshots.size(), 2);
816+
metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path;
817+
metadata->snapshots.at(1)->manifest_list = current_manifest_list_path;
818+
// Deliberately unparseable source snapshot ID.
metadata->snapshots.at(1)->summary[SnapshotSummaryFields::kSourceSnapshotId] =
819+
"not-a-number";
820+
RewriteTable(std::move(metadata));
821+
822+
std::vector<std::string> deleted_files;
823+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
824+
update->DeleteWith(
825+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
826+
827+
EXPECT_THAT(update->Commit(), IsOk());
828+
EXPECT_TRUE(deleted_files.empty());
829+
auto committed_metadata = ReloadMetadata();
830+
EXPECT_EQ(committed_metadata->snapshots.size(), 1);
831+
EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, kCurrentSnapshotId);
832+
}
833+
576834
} // namespace iceberg

0 commit comments

Comments
 (0)