Skip to content

Commit 596c5d2

Browse files
committed
feat: add file cleanup for expire snapshots
Implement the file cleanup logic that was missing from the expire snapshots feature (the original PR noted "TODO: File recycling will be added in a followup PR"). Port the "reachable file cleanup" strategy from Java's ReachableFileCleanup, following the same phased approach: Phase 1: Collect manifest paths from expired and retained snapshots Phase 2: Prune manifests still referenced by retained snapshots Phase 3: Find data files only in manifests being deleted, subtract files still reachable from retained manifests (kAll only) Phase 4: Delete orphaned manifest files Phase 5: Delete manifest lists from expired snapshots Phase 6: Delete expired statistics and partition statistics files Key design decisions matching Java parity: - Best-effort deletion: suppress errors on individual file deletions to avoid blocking metadata updates (Java suppressFailureWhenFinished) - Branch/tag awareness: retained snapshot set includes all snapshots reachable from any ref (branch or tag), preventing false-positive deletions of files still referenced by non-main branches - Data file safety: only delete data files from manifests that are themselves being deleted, then subtract any files still reachable from retained manifests (two-pass approach from ReachableFileCleanup) - Respect CleanupLevel: kNone skips all, kMetadataOnly skips data files, kAll cleans everything - FileIO abstraction: uses FileIO::DeleteFile for filesystem compatibility (S3, HDFS, local), with custom DeleteWith() override - Statistics cleanup via snapshot ID membership in retained set TODOs for follow-up: - Multi-threaded file deletion (Java uses Tasks.foreach with executor) - IncrementalFileCleanup strategy for linear ancestry optimization (Java uses this when no branches/cherry-picks involved)
1 parent 1afe65c commit 596c5d2

File tree

3 files changed

+382
-0
lines changed

3 files changed

+382
-0
lines changed

src/iceberg/test/expire_snapshots_test.cc

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
#include "iceberg/update/expire_snapshots.h"
2121

22+
#include <string>
23+
#include <vector>
24+
2225
#include "iceberg/test/matchers.h"
2326
#include "iceberg/test/update_test_base.h"
2427

@@ -65,4 +68,88 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
6568
}
6669
}
6770

71+
TEST_F(ExpireSnapshotsTest, DeleteWithCustomFunction) {
72+
std::vector<std::string> deleted_files;
73+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
74+
update->DeleteWith(
75+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
76+
77+
// Apply first so apply_result_ is cached
78+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
79+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
80+
81+
// Call Finalize directly to simulate successful commit
82+
// Note: Finalize tries to read manifests from the expired snapshot's manifest list,
83+
// which will fail on mock FS since "s3://a/b/1.avro" doesn't contain real avro data.
84+
// The error is returned from Finalize but in the real commit flow it's ignored.
85+
auto finalize_status = update->Finalize(std::nullopt);
86+
// Finalize may fail because manifest list files don't exist on mock FS,
87+
// but it should not crash
88+
if (finalize_status.has_value()) {
89+
// If it succeeded (e.g., if manifest reading was skipped), verify deletions
90+
EXPECT_FALSE(deleted_files.empty());
91+
}
92+
}
93+
94+
TEST_F(ExpireSnapshotsTest, CleanupLevelNoneSkipsFileDeletion) {
95+
std::vector<std::string> deleted_files;
96+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
97+
update->CleanupLevel(CleanupLevel::kNone);
98+
update->DeleteWith(
99+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
100+
101+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
102+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
103+
104+
// With kNone cleanup level, Finalize should skip all file deletion
105+
auto finalize_status = update->Finalize(std::nullopt);
106+
EXPECT_THAT(finalize_status, IsOk());
107+
EXPECT_TRUE(deleted_files.empty());
108+
}
109+
110+
TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
111+
std::vector<std::string> deleted_files;
112+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
113+
update->DeleteWith(
114+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
115+
116+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
117+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
118+
119+
// Simulate a commit failure - Finalize should not delete any files
120+
auto finalize_status = update->Finalize(
121+
Error{.kind = ErrorKind::kCommitFailed, .message = "simulated failure"});
122+
EXPECT_THAT(finalize_status, IsOk());
123+
EXPECT_TRUE(deleted_files.empty());
124+
}
125+
126+
TEST_F(ExpireSnapshotsTest, FinalizeSkippedWhenNoSnapshotsExpired) {
127+
std::vector<std::string> deleted_files;
128+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
129+
update->RetainLast(2);
130+
update->DeleteWith(
131+
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
132+
133+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
134+
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
135+
136+
// No snapshots expired, so Finalize should not delete any files
137+
auto finalize_status = update->Finalize(std::nullopt);
138+
EXPECT_THAT(finalize_status, IsOk());
139+
EXPECT_TRUE(deleted_files.empty());
140+
}
141+
142+
TEST_F(ExpireSnapshotsTest, CommitWithCleanupLevelNone) {
143+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
144+
update->CleanupLevel(CleanupLevel::kNone);
145+
146+
// Commit should succeed - Finalize is called internally but skips cleanup
147+
EXPECT_THAT(update->Commit(), IsOk());
148+
149+
// Verify snapshot was removed from metadata
150+
auto metadata = ReloadMetadata();
151+
EXPECT_EQ(metadata->snapshots.size(), 1);
152+
EXPECT_EQ(metadata->snapshots.at(0)->snapshot_id, 3055729675574597004);
153+
}
154+
68155
} // namespace iceberg

src/iceberg/update/expire_snapshots.cc

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,17 @@
2323
#include <cstdint>
2424
#include <iterator>
2525
#include <memory>
26+
#include <optional>
27+
#include <string>
2628
#include <unordered_set>
2729
#include <vector>
2830

31+
#include "iceberg/file_io.h"
32+
#include "iceberg/manifest/manifest_entry.h"
33+
#include "iceberg/manifest/manifest_reader.h"
2934
#include "iceberg/schema.h"
3035
#include "iceberg/snapshot.h"
36+
#include "iceberg/statistics_file.h"
3137
#include "iceberg/table.h"
3238
#include "iceberg/table_metadata.h"
3339
#include "iceberg/transaction.h"
@@ -286,7 +292,247 @@ Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() {
286292
});
287293
}
288294

295+
// Cache the result for use during Finalize()
296+
apply_result_ = result;
297+
289298
return result;
290299
}
291300

301+
Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
302+
if (commit_error.has_value()) {
303+
return {};
304+
}
305+
306+
if (cleanup_level_ == CleanupLevel::kNone) {
307+
return {};
308+
}
309+
310+
if (!apply_result_.has_value() || apply_result_->snapshot_ids_to_remove.empty()) {
311+
return {};
312+
}
313+
314+
// File cleanup is best-effort: log and continue on individual file deletion failures
315+
// to avoid blocking metadata updates (matching Java behavior).
316+
return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
317+
}
318+
319+
void ExpireSnapshots::DeleteFilePath(const std::string& path) {
320+
try {
321+
if (delete_func_) {
322+
delete_func_(path);
323+
} else {
324+
auto status = transaction_->table()->io()->DeleteFile(path);
325+
// Best-effort: ignore NotFound (file already deleted) and other errors.
326+
// Java uses suppressFailureWhenFinished + onFailure logging.
327+
std::ignore = status;
328+
}
329+
} catch (...) {
330+
// Suppress all exceptions during file cleanup to match Java's
331+
// suppressFailureWhenFinished behavior.
332+
}
333+
}
334+
335+
Status ExpireSnapshots::ReadManifestsForSnapshot(
336+
int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
337+
const TableMetadata& metadata = base();
338+
auto file_io = transaction_->table()->io();
339+
340+
auto snapshot_result = metadata.SnapshotById(snapshot_id);
341+
if (!snapshot_result.has_value()) {
342+
return {};
343+
}
344+
auto& snapshot = snapshot_result.value();
345+
346+
SnapshotCache snapshot_cache(snapshot.get());
347+
auto manifests_result = snapshot_cache.Manifests(file_io);
348+
if (!manifests_result.has_value()) {
349+
// Best-effort: skip this snapshot if we can't read its manifests
350+
return {};
351+
}
352+
353+
for (const auto& manifest : manifests_result.value()) {
354+
manifest_paths.insert(manifest.manifest_path);
355+
}
356+
357+
return {};
358+
}
359+
360+
Status ExpireSnapshots::FindDataFilesToDelete(
361+
const std::unordered_set<std::string>& manifests_to_delete,
362+
const std::unordered_set<std::string>& retained_manifests,
363+
std::unordered_set<std::string>& data_files_to_delete) {
364+
const TableMetadata& metadata = base();
365+
auto file_io = transaction_->table()->io();
366+
367+
// Step 1: Collect all file paths from manifests being deleted
368+
for (const auto& manifest_path : manifests_to_delete) {
369+
// Find the ManifestFile for this path by scanning expired snapshots
370+
for (const auto& snapshot : metadata.snapshots) {
371+
if (!snapshot) continue;
372+
SnapshotCache snapshot_cache(snapshot.get());
373+
auto manifests_result = snapshot_cache.Manifests(file_io);
374+
if (!manifests_result.has_value()) continue;
375+
376+
for (const auto& manifest : manifests_result.value()) {
377+
if (manifest.manifest_path != manifest_path) continue;
378+
379+
auto schema_result = metadata.Schema();
380+
if (!schema_result.has_value()) continue;
381+
auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
382+
if (!spec_result.has_value()) continue;
383+
384+
auto reader_result = ManifestReader::Make(
385+
manifest, file_io, schema_result.value(), spec_result.value());
386+
if (!reader_result.has_value()) continue;
387+
388+
auto entries_result = reader_result.value()->Entries();
389+
if (!entries_result.has_value()) continue;
390+
391+
for (const auto& entry : entries_result.value()) {
392+
if (entry.data_file) {
393+
data_files_to_delete.insert(entry.data_file->file_path);
394+
}
395+
}
396+
goto next_manifest; // Found and processed this manifest, move to next
397+
}
398+
}
399+
next_manifest:;
400+
}
401+
402+
if (data_files_to_delete.empty()) {
403+
return {};
404+
}
405+
406+
// Step 2: Remove any files that are still referenced by retained manifests.
407+
// This ensures we don't delete files that are shared across manifests.
408+
for (const auto& manifest_path : retained_manifests) {
409+
if (data_files_to_delete.empty()) break;
410+
411+
for (const auto& snapshot : metadata.snapshots) {
412+
if (!snapshot) continue;
413+
SnapshotCache snapshot_cache(snapshot.get());
414+
auto manifests_result = snapshot_cache.Manifests(file_io);
415+
if (!manifests_result.has_value()) continue;
416+
417+
for (const auto& manifest : manifests_result.value()) {
418+
if (manifest.manifest_path != manifest_path) continue;
419+
420+
auto schema_result = metadata.Schema();
421+
if (!schema_result.has_value()) continue;
422+
auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
423+
if (!spec_result.has_value()) continue;
424+
425+
auto reader_result = ManifestReader::Make(
426+
manifest, file_io, schema_result.value(), spec_result.value());
427+
if (!reader_result.has_value()) continue;
428+
429+
auto entries_result = reader_result.value()->Entries();
430+
if (!entries_result.has_value()) continue;
431+
432+
for (const auto& entry : entries_result.value()) {
433+
if (entry.data_file) {
434+
data_files_to_delete.erase(entry.data_file->file_path);
435+
}
436+
}
437+
goto next_retained;
438+
}
439+
}
440+
next_retained:;
441+
}
442+
443+
return {};
444+
}
445+
446+
Status ExpireSnapshots::CleanExpiredFiles(
447+
const std::vector<int64_t>& expired_snapshot_ids) {
448+
const TableMetadata& metadata = base();
449+
450+
// Build expired and retained snapshot ID sets.
451+
// The retained set includes ALL snapshots referenced by any branch or tag,
452+
// since Apply() already computed retention across all refs.
453+
std::unordered_set<int64_t> expired_id_set(expired_snapshot_ids.begin(),
454+
expired_snapshot_ids.end());
455+
std::unordered_set<int64_t> retained_snapshot_ids;
456+
for (const auto& snapshot : metadata.snapshots) {
457+
if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) {
458+
retained_snapshot_ids.insert(snapshot->snapshot_id);
459+
}
460+
}
461+
462+
// Phase 1: Collect manifest paths from expired and retained snapshots.
463+
// TODO(shangxinli): Parallelize manifest collection with a thread pool.
464+
std::unordered_set<std::string> expired_manifest_paths;
465+
for (int64_t snapshot_id : expired_snapshot_ids) {
466+
std::ignore = ReadManifestsForSnapshot(snapshot_id, expired_manifest_paths);
467+
}
468+
469+
std::unordered_set<std::string> retained_manifest_paths;
470+
for (int64_t snapshot_id : retained_snapshot_ids) {
471+
std::ignore = ReadManifestsForSnapshot(snapshot_id, retained_manifest_paths);
472+
}
473+
474+
// Phase 2: Prune manifests still referenced by retained snapshots.
475+
// Only manifests exclusively in expired snapshots should be deleted.
476+
std::unordered_set<std::string> manifests_to_delete;
477+
for (const auto& path : expired_manifest_paths) {
478+
if (!retained_manifest_paths.contains(path)) {
479+
manifests_to_delete.insert(path);
480+
}
481+
}
482+
483+
// Phase 3: If cleanup level is kAll, find data files to delete.
484+
// Only read entries from manifests being deleted (not all expired manifests),
485+
// then subtract any files still reachable from retained manifests.
486+
if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) {
487+
std::unordered_set<std::string> data_files_to_delete;
488+
std::ignore = FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths,
489+
data_files_to_delete);
490+
491+
// TODO(shangxinli): Parallelize file deletion with a thread pool.
492+
for (const auto& path : data_files_to_delete) {
493+
DeleteFilePath(path);
494+
}
495+
}
496+
497+
// Phase 4: Delete orphaned manifest files.
498+
for (const auto& path : manifests_to_delete) {
499+
DeleteFilePath(path);
500+
}
501+
502+
// Phase 5: Delete manifest lists from expired snapshots.
503+
for (int64_t snapshot_id : expired_snapshot_ids) {
504+
auto snapshot_result = metadata.SnapshotById(snapshot_id);
505+
if (!snapshot_result.has_value()) continue;
506+
auto& snapshot = snapshot_result.value();
507+
if (!snapshot->manifest_list.empty()) {
508+
DeleteFilePath(snapshot->manifest_list);
509+
}
510+
}
511+
512+
// Phase 6: Delete expired statistics files.
513+
// Use set difference between before and after states (matching Java behavior).
514+
// Since Finalize runs before table_ is updated, "after" is base() minus expired.
515+
std::unordered_set<int64_t> retained_stats_snapshots(retained_snapshot_ids);
516+
for (const auto& stat_file : metadata.statistics) {
517+
if (stat_file && !retained_stats_snapshots.contains(stat_file->snapshot_id)) {
518+
DeleteFilePath(stat_file->path);
519+
}
520+
}
521+
for (const auto& part_stat : metadata.partition_statistics) {
522+
if (part_stat && !retained_stats_snapshots.contains(part_stat->snapshot_id)) {
523+
DeleteFilePath(part_stat->path);
524+
}
525+
}
526+
527+
return {};
528+
}
529+
530+
// TODO(shangxinli): Implement IncrementalFileCleanup strategy for linear ancestry
531+
// optimization. Java uses this when: !specifiedSnapshotId && simple linear main branch
532+
// ancestry (no non-main snapshots removed, no non-main snapshots remain).
533+
// The incremental strategy is more efficient because it only needs to scan
534+
// manifests written by expired snapshots (checking added_snapshot_id), avoiding
535+
// the full reachability analysis. It also handles cherry-pick protection via
536+
// SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP.
537+
292538
} // namespace iceberg

0 commit comments

Comments
 (0)