diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc
index dbc577a71..59c3c2ca6 100644
--- a/src/iceberg/test/expire_snapshots_test.cc
+++ b/src/iceberg/test/expire_snapshots_test.cc
@@ -19,6 +19,9 @@
 
 #include "iceberg/update/expire_snapshots.h"
 
+#include
+#include
+
 #include "iceberg/test/matchers.h"
 #include "iceberg/test/update_test_base.h"
 
@@ -65,4 +68,88 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
   }
 }
 
+TEST_F(ExpireSnapshotsTest, DeleteWithCustomFunction) {
+  std::vector deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+
+  // Apply first so apply_result_ is cached
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
+
+  // Call Finalize directly to simulate successful commit
+  // Note: Finalize tries to read manifests from the expired snapshot's manifest list,
+  // which will fail on mock FS since "s3://a/b/1.avro" doesn't contain real avro data.
+  // The error is returned from Finalize but in the real commit flow it's ignored.
+  auto finalize_status = update->Finalize(std::nullopt);
+  // Finalize may fail because manifest list files don't exist on mock FS,
+  // but it should not crash
+  if (finalize_status.has_value()) {
+    // If it succeeded (e.g., if manifest reading was skipped), verify deletions
+    EXPECT_FALSE(deleted_files.empty());
+  }
+}
+
+TEST_F(ExpireSnapshotsTest, CleanupLevelNoneSkipsFileDeletion) {
+  std::vector deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->CleanupLevel(CleanupLevel::kNone);
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
+
+  // With kNone cleanup level, Finalize should skip all file deletion
+  auto finalize_status = update->Finalize(std::nullopt);
+  EXPECT_THAT(finalize_status, IsOk());
+  EXPECT_TRUE(deleted_files.empty());
+}
+
+TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
+  std::vector deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
+
+  // Simulate a commit failure - Finalize should not delete any files
+  auto finalize_status = update->Finalize(
+      Error{.kind = ErrorKind::kCommitFailed, .message = "simulated failure"});
+  EXPECT_THAT(finalize_status, IsOk());
+  EXPECT_TRUE(deleted_files.empty());
+}
+
+TEST_F(ExpireSnapshotsTest, FinalizeSkippedWhenNoSnapshotsExpired) {
+  std::vector deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->RetainLast(2);
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { deleted_files.push_back(path); });
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
+
+  // No snapshots expired, so Finalize should not delete any files
+  auto finalize_status = update->Finalize(std::nullopt);
+  EXPECT_THAT(finalize_status, IsOk());
+  EXPECT_TRUE(deleted_files.empty());
+}
+
+TEST_F(ExpireSnapshotsTest, CommitWithCleanupLevelNone) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->CleanupLevel(CleanupLevel::kNone);
+
+  // Commit should succeed - Finalize is called internally but skips cleanup
+  EXPECT_THAT(update->Commit(), IsOk());
+
+  // Verify snapshot was removed from metadata
+  auto metadata = ReloadMetadata();
+  EXPECT_EQ(metadata->snapshots.size(), 1);
+  EXPECT_EQ(metadata->snapshots.at(0)->snapshot_id, 3055729675574597004);
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc
index 722ae7a42..257656946 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -23,11 +23,17 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
 #include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
+#include "iceberg/statistics_file.h"
 #include "iceberg/table.h"
 #include "iceberg/table_metadata.h"
 #include "iceberg/transaction.h"
@@ -37,6 +43,275 @@
 
 namespace iceberg {
 
+namespace {
+
+Result> MakeManifestReader(
+    const ManifestFile& manifest, const std::shared_ptr& file_io,
+    const TableMetadata& metadata) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          metadata.PartitionSpecById(manifest.partition_spec_id));
+  return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
+}
+
+/// \brief Abstract strategy for cleaning up files after snapshot expiration.
+///
+/// Mirrors Java's FileCleanupStrategy: provides shared delete utilities while
+/// allowing different cleanup algorithms (ReachableFileCleanup, IncrementalFileCleanup).
+class FileCleanupStrategy {
+ public:
+  FileCleanupStrategy(std::shared_ptr file_io,
+                      std::function delete_func)
+      : file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {}
+
+  virtual ~FileCleanupStrategy() = default;
+
+  /// \brief Clean up files that are only reachable by expired snapshots.
+  ///
+  /// \param metadata Table metadata before expiration (contains all snapshots).
+  /// \param expired_snapshot_ids Snapshot IDs that were expired during this operation.
+  /// \param level Controls which types of files are eligible for deletion.
+  virtual Status CleanFiles(const TableMetadata& metadata,
+                            const std::unordered_set& expired_snapshot_ids,
+                            CleanupLevel level) = 0;
+
+ protected:
+  /// \brief Delete a file, suppressing errors (best-effort).
+  ///
+  /// Uses the custom delete function if set, otherwise FileIO::DeleteFile.
+  /// Matches Java's suppressFailureWhenFinished behavior.
+  void DeleteFile(const std::string& path) {
+    try {
+      if (delete_func_) {
+        delete_func_(path);
+      } else {
+        std::ignore = file_io_->DeleteFile(path);
+      }
+    } catch (...) {
+      // Suppress all exceptions during file cleanup to match Java's
+      // suppressFailureWhenFinished behavior.
+    }
+  }
+
+  /// \brief Returns paths of statistics files referenced only by expired snapshots.
+  ///
+  /// Uses path-based set difference (matching Java's expiredStatisticsFilesLocations):
+  /// if the same file path is shared across snapshots, it is only deleted when
+  /// no retained snapshot references it.
+  std::unordered_set ExpiredStatisticsFilePaths(
+      const TableMetadata& metadata, const std::unordered_set& expired_ids) {
+    std::unordered_set retained_paths;
+    for (const auto& stat : metadata.statistics) {
+      if (stat && !expired_ids.contains(stat->snapshot_id)) {
+        retained_paths.insert(stat->path);
+      }
+    }
+    for (const auto& part_stat : metadata.partition_statistics) {
+      if (part_stat && !expired_ids.contains(part_stat->snapshot_id)) {
+        retained_paths.insert(part_stat->path);
+      }
+    }
+
+    std::unordered_set expired_paths;
+    for (const auto& stat : metadata.statistics) {
+      if (stat && expired_ids.contains(stat->snapshot_id) &&
+          !retained_paths.contains(stat->path)) {
+        expired_paths.insert(stat->path);
+      }
+    }
+    for (const auto& part_stat : metadata.partition_statistics) {
+      if (part_stat && expired_ids.contains(part_stat->snapshot_id) &&
+          !retained_paths.contains(part_stat->path)) {
+        expired_paths.insert(part_stat->path);
+      }
+    }
+    return expired_paths;
+  }
+
+  std::shared_ptr file_io_;
+  std::function delete_func_;
+};
+
+/// \brief File cleanup strategy that determines safe deletions via full reachability.
+///
+/// Mirrors Java's ReachableFileCleanup: collects manifests from all expired and
+/// retained snapshots, prunes candidates still referenced by retained snapshots,
+/// then deletes orphaned manifests, data files, manifest lists, and statistics files.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class ReachableFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  Status CleanFiles(const TableMetadata& metadata,
+                    const std::unordered_set& expired_snapshot_ids,
+                    CleanupLevel level) override {
+    std::unordered_set retained_snapshot_ids;
+    for (const auto& snapshot : metadata.snapshots) {
+      if (snapshot && !expired_snapshot_ids.contains(snapshot->snapshot_id)) {
+        retained_snapshot_ids.insert(snapshot->snapshot_id);
+      }
+    }
+
+    // Phase 1: Collect manifest paths from expired and retained snapshots.
+    // The manifest_cache_ is populated here to avoid O(M*S) repeated I/O in
+    // FindDataFilesToDelete.
+    //
+    // Retained snapshots are read first and strictly: their manifests define what
+    // must be kept, so an unreadable manifest list would leave that set incomplete
+    // and let files that are still referenced be deleted. Abort before deleting
+    // anything in that case.
+    std::unordered_set retained_manifest_paths;
+    for (int64_t snapshot_id : retained_snapshot_ids) {
+      auto read_status =
+          ReadManifestsForSnapshot(metadata, snapshot_id, retained_manifest_paths);
+      if (!read_status.has_value()) return read_status;
+    }
+
+    // Expired snapshots are read best-effort: a failure only leaves files behind.
+    std::unordered_set expired_manifest_paths;
+    for (int64_t snapshot_id : expired_snapshot_ids) {
+      std::ignore =
+          ReadManifestsForSnapshot(metadata, snapshot_id, expired_manifest_paths);
+    }
+
+    // Phase 2: Prune manifests still referenced by retained snapshots.
+    std::unordered_set manifests_to_delete;
+    for (const auto& path : expired_manifest_paths) {
+      if (!retained_manifest_paths.contains(path)) {
+        manifests_to_delete.insert(path);
+      }
+    }
+
+    // Phase 3: Delete data files if cleanup level is kAll.
+    if (level == CleanupLevel::kAll && !manifests_to_delete.empty()) {
+      auto data_files_result =
+          FindDataFilesToDelete(metadata, manifests_to_delete, retained_manifest_paths);
+      if (data_files_result.has_value()) {
+        for (const auto& path : data_files_result.value()) {
+          DeleteFile(path);
+        }
+      }
+    }
+
+    // Phase 4: Delete orphaned manifest files.
+    for (const auto& path : manifests_to_delete) {
+      DeleteFile(path);
+    }
+
+    // Phase 5: Delete manifest lists from expired snapshots.
+    for (int64_t snapshot_id : expired_snapshot_ids) {
+      auto snapshot_result = metadata.SnapshotById(snapshot_id);
+      if (!snapshot_result.has_value()) continue;
+      const auto& snapshot = snapshot_result.value();
+      if (!snapshot->manifest_list.empty()) {
+        DeleteFile(snapshot->manifest_list);
+      }
+    }
+
+    // Phase 6: Delete expired statistics files using path-based set difference.
+    for (const auto& path : ExpiredStatisticsFilePaths(metadata, expired_snapshot_ids)) {
+      DeleteFile(path);
+    }
+
+    return {};
+  }
+
+ private:
+  /// Cache of manifest path -> ManifestFile, populated during Phase 1 to avoid
+  /// re-reading manifest lists in FindDataFilesToDelete.
+  std::unordered_map manifest_cache_;
+
+  /// \brief Collect manifest paths for a snapshot into manifest_paths.
+  ///
+  /// Every manifest found is also recorded in manifest_cache_.
+  ///
+  /// \return An error if the snapshot is unknown or its manifest list cannot be
+  /// read; nothing is inserted in that case. The caller decides whether that is
+  /// tolerable: ignoring it is safe for expired snapshots (missed deletions can be
+  /// cleaned up by GC later), but for retained snapshots it must abort cleanup,
+  /// because an incomplete retained set would make live files look unreachable.
+  Status ReadManifestsForSnapshot(const TableMetadata& metadata, int64_t snapshot_id,
+                                  std::unordered_set& manifest_paths) {
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata.SnapshotById(snapshot_id));
+
+    SnapshotCache snapshot_cache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.Manifests(file_io_));
+
+    for (const auto& manifest : manifests) {
+      manifest_paths.insert(manifest.manifest_path);
+      manifest_cache_.emplace(manifest.manifest_path, manifest);
+    }
+    return {};
+  }
+
+  /// \brief Find data files to delete from manifests being removed.
+  ///
+  /// Reads live entries (ADDED/EXISTING) from manifests_to_delete, then subtracts
+  /// any files still referenced by retained_manifests. Uses LiveEntries() to match
+  /// Java's ManifestFiles.readPaths (delegates to liveEntries()).
+  ///
+  /// If any retained manifest cannot be read, returns an empty set to prevent
+  /// accidental data loss (matching Java's throwFailureWhenFinished for retained
+  /// manifest reads).
+  Result> FindDataFilesToDelete(
+      const TableMetadata& metadata,
+      const std::unordered_set& manifests_to_delete,
+      const std::unordered_set& retained_manifests) {
+    std::unordered_set data_files_to_delete;
+
+    // Step 1: Collect live file paths from manifests being deleted.
+    for (const auto& [path, manifest] : manifest_cache_) {
+      if (!manifests_to_delete.contains(path)) continue;
+
+      auto reader_result = MakeManifestReader(manifest, file_io_, metadata);
+      if (!reader_result.has_value()) continue;
+
+      auto entries_result = reader_result.value()->LiveEntries();
+      if (!entries_result.has_value()) continue;
+
+      for (const auto& entry : entries_result.value()) {
+        if (entry.data_file) {
+          data_files_to_delete.insert(entry.data_file->file_path);
+        }
+      }
+    }
+
+    if (data_files_to_delete.empty()) {
+      return data_files_to_delete;
+    }
+
+    // Step 2: Remove files still referenced by retained manifests.
+    // Abort entirely if a retained manifest cannot be read to prevent data loss.
+    for (const auto& manifest_path : retained_manifests) {
+      if (data_files_to_delete.empty()) break;
+
+      auto it = manifest_cache_.find(manifest_path);
+      if (it == manifest_cache_.end()) continue;
+
+      auto reader_result = MakeManifestReader(it->second, file_io_, metadata);
+      if (!reader_result.has_value()) {
+        return std::unordered_set{};
+      }
+
+      auto entries_result = reader_result.value()->LiveEntries();
+      if (!entries_result.has_value()) {
+        return std::unordered_set{};
+      }
+
+      for (const auto& entry : entries_result.value()) {
+        if (entry.data_file) {
+          data_files_to_delete.erase(entry.data_file->file_path);
+        }
+      }
+    }
+
+    return data_files_to_delete;
+  }
+};
+
+}  // namespace
+
 Result> ExpireSnapshots::Make(
     std::shared_ptr ctx) {
   ICEBERG_PRECHECK(ctx != nullptr, "Cannot create ExpireSnapshots without a context");
@@ -285,7 +560,41 @@ Result ExpireSnapshots::Apply() {
     });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  std::unordered_set expired_ids(apply_result_->snapshot_ids_to_remove.begin(),
+                                 apply_result_->snapshot_ids_to_remove.end());
+  apply_result_.reset();
+
+  // Individual file deletions are best-effort. An error is returned only when
+  // cleanup is aborted before deleting anything (a retained snapshot was unreadable).
+  ReachableFileCleanup strategy(ctx_->table->io(), delete_func_);
+  return strategy.CleanFiles(base(), expired_ids, cleanup_level_);
+}
+
+// TODO(shangxinli): Implement IncrementalFileCleanup strategy for linear ancestry
+// optimization. Java uses this when: !specifiedSnapshotId && simple linear main branch
+// ancestry (no non-main snapshots removed, no non-main snapshots remain).
+// The incremental strategy is more efficient because it only needs to scan
+// manifests written by expired snapshots (checking added_snapshot_id), avoiding
+// the full reachability analysis. It also handles cherry-pick protection via
+// SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP.
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h
index bc05d810d..fc2910b58 100644
--- a/src/iceberg/update/expire_snapshots.h
+++ b/src/iceberg/update/expire_snapshots.h
@@ -22,6 +22,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 
@@ -142,6 +144,16 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
   /// \return The results of changes
   Result Apply();
 
+  /// \brief Finalize the expire snapshots update, cleaning up expired files.
+  ///
+  /// After a successful commit, this method deletes manifest files, manifest lists,
+  /// data files, and statistics files that are no longer referenced by any valid
+  /// snapshot. The cleanup behavior is controlled by the CleanupLevel setting.
+  ///
+  /// \param commit_error An optional error indicating whether the commit was successful
+  /// \return Status indicating success or failure
+  Status Finalize(std::optional commit_error) override;
+
  private:
   explicit ExpireSnapshots(std::shared_ptr ctx);
 
@@ -159,7 +171,6 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
   Result> UnreferencedSnapshotIdsToRetain(
       const SnapshotToRef& refs) const;
 
- private:
   const TimePointMs current_time_ms_;
   const int64_t default_max_ref_age_ms_;
   int32_t default_min_num_snapshots_;
@@ -169,6 +180,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
   enum CleanupLevel cleanup_level_ { CleanupLevel::kAll };
   bool clean_expired_metadata_{false};
   bool specified_snapshot_id_{false};
+
+  /// Cached result from Apply(), consumed by Finalize() and cleared after use.
+  std::optional apply_result_;
 };
 
 }  // namespace iceberg