Skip to content

Commit 4f36084

Browse files
shangxinliclaude
andcommitted
Address review feedback from Gang Wu
- Fix O(M*S) I/O: Pre-cache ManifestFile objects in manifest_cache_ during Phase 1 (ReadManifestsForSnapshot), eliminating repeated manifest list reads in FindDataFilesToDelete. - Fix storage leak: Use LiveEntries() instead of Entries() to match Java's ManifestFiles.readPaths behavior (only ADDED/EXISTING entries). - Fix data loss risk: When reading a retained manifest fails, abort data file deletion entirely instead of silently continuing. Java retries and throws on failure here. - Fix statistics file deletion: Use path-based set difference instead of snapshot_id-only check, preventing erroneous deletion of statistics files shared across snapshots. - Remove goto anti-pattern: Extract ManifestFile lookup into MakeManifestReader() helper and use manifest_cache_ for direct lookup. - Improve API: FindDataFilesToDelete now returns Result<unordered_set<string>> instead of using a mutable out-parameter. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d3df6e3 commit 4f36084

File tree

2 files changed

+95
-78
lines changed

2 files changed

+95
-78
lines changed

src/iceberg/update/expire_snapshots.cc

Lines changed: 76 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,17 @@ void ExpireSnapshots::DeleteFilePath(const std::string& path) {
331331
}
332332
}
333333

334+
Result<std::shared_ptr<ManifestReader>> ExpireSnapshots::MakeManifestReader(
335+
const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io) {
336+
const TableMetadata& metadata = base();
337+
auto schema_result = metadata.Schema();
338+
if (!schema_result.has_value()) return Unexpected(schema_result.error());
339+
auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
340+
if (!spec_result.has_value()) return Unexpected(spec_result.error());
341+
return ManifestReader::Make(manifest, file_io, schema_result.value(),
342+
spec_result.value());
343+
}
344+
334345
Status ExpireSnapshots::ReadManifestsForSnapshot(
335346
int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
336347
const TableMetadata& metadata = base();
@@ -351,95 +362,73 @@ Status ExpireSnapshots::ReadManifestsForSnapshot(
351362

352363
for (const auto& manifest : manifests_result.value()) {
353364
manifest_paths.insert(manifest.manifest_path);
365+
// Cache manifest metadata for later use in FindDataFilesToDelete,
366+
// avoiding O(M*S) repeated I/O from re-reading manifest lists.
367+
manifest_cache_.emplace(manifest.manifest_path, manifest);
354368
}
355369

356370
return {};
357371
}
358372

359-
Status ExpireSnapshots::FindDataFilesToDelete(
373+
Result<std::unordered_set<std::string>> ExpireSnapshots::FindDataFilesToDelete(
360374
const std::unordered_set<std::string>& manifests_to_delete,
361-
const std::unordered_set<std::string>& retained_manifests,
362-
std::unordered_set<std::string>& data_files_to_delete) {
363-
const TableMetadata& metadata = base();
375+
const std::unordered_set<std::string>& retained_manifests) {
364376
auto file_io = ctx_->table->io();
377+
std::unordered_set<std::string> data_files_to_delete;
365378

366-
// Step 1: Collect all file paths from manifests being deleted
367-
for (const auto& manifest_path : manifests_to_delete) {
368-
// Find the ManifestFile for this path by scanning expired snapshots
369-
for (const auto& snapshot : metadata.snapshots) {
370-
if (!snapshot) continue;
371-
SnapshotCache snapshot_cache(snapshot.get());
372-
auto manifests_result = snapshot_cache.Manifests(file_io);
373-
if (!manifests_result.has_value()) continue;
374-
375-
for (const auto& manifest : manifests_result.value()) {
376-
if (manifest.manifest_path != manifest_path) continue;
379+
// Step 1: Collect live file paths from manifests being deleted.
380+
// Use LiveEntries() (ADDED/EXISTING only) to match Java's ManifestFiles.readPaths
381+
// which delegates to liveEntries(). Using Entries() would include DELETED entries
382+
// and could cause storage leaks.
383+
for (const auto& [path, manifest] : manifest_cache_) {
384+
if (!manifests_to_delete.contains(path)) continue;
377385

378-
auto schema_result = metadata.Schema();
379-
if (!schema_result.has_value()) continue;
380-
auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
381-
if (!spec_result.has_value()) continue;
386+
auto reader_result = MakeManifestReader(manifest, file_io);
387+
if (!reader_result.has_value()) continue;
382388

383-
auto reader_result = ManifestReader::Make(
384-
manifest, file_io, schema_result.value(), spec_result.value());
385-
if (!reader_result.has_value()) continue;
389+
auto entries_result = reader_result.value()->LiveEntries();
390+
if (!entries_result.has_value()) continue;
386391

387-
auto entries_result = reader_result.value()->Entries();
388-
if (!entries_result.has_value()) continue;
389-
390-
for (const auto& entry : entries_result.value()) {
391-
if (entry.data_file) {
392-
data_files_to_delete.insert(entry.data_file->file_path);
393-
}
394-
}
395-
goto next_manifest; // Found and processed this manifest, move to next
392+
for (const auto& entry : entries_result.value()) {
393+
if (entry.data_file) {
394+
data_files_to_delete.insert(entry.data_file->file_path);
396395
}
397396
}
398-
next_manifest:;
399397
}
400398

401399
if (data_files_to_delete.empty()) {
402-
return {};
400+
return data_files_to_delete;
403401
}
404402

405403
// Step 2: Remove any files that are still referenced by retained manifests.
406-
// This ensures we don't delete files that are shared across manifests.
404+
// If reading a retained manifest fails, we must NOT delete its data files
405+
// to avoid accidental data loss (matching Java's retry + throwFailureWhenFinished).
407406
for (const auto& manifest_path : retained_manifests) {
408407
if (data_files_to_delete.empty()) break;
409408

410-
for (const auto& snapshot : metadata.snapshots) {
411-
if (!snapshot) continue;
412-
SnapshotCache snapshot_cache(snapshot.get());
413-
auto manifests_result = snapshot_cache.Manifests(file_io);
414-
if (!manifests_result.has_value()) continue;
415-
416-
for (const auto& manifest : manifests_result.value()) {
417-
if (manifest.manifest_path != manifest_path) continue;
418-
419-
auto schema_result = metadata.Schema();
420-
if (!schema_result.has_value()) continue;
421-
auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
422-
if (!spec_result.has_value()) continue;
409+
auto it = manifest_cache_.find(manifest_path);
410+
if (it == manifest_cache_.end()) continue;
423411

424-
auto reader_result = ManifestReader::Make(
425-
manifest, file_io, schema_result.value(), spec_result.value());
426-
if (!reader_result.has_value()) continue;
412+
auto reader_result = MakeManifestReader(it->second, file_io);
413+
if (!reader_result.has_value()) {
414+
// Cannot read a retained manifest — abort data file deletion to prevent
415+
// accidental data loss. Java retries and throws on failure here.
416+
return std::unordered_set<std::string>{};
417+
}
427418

428-
auto entries_result = reader_result.value()->Entries();
429-
if (!entries_result.has_value()) continue;
419+
auto entries_result = reader_result.value()->LiveEntries();
420+
if (!entries_result.has_value()) {
421+
return std::unordered_set<std::string>{};
422+
}
430423

431-
for (const auto& entry : entries_result.value()) {
432-
if (entry.data_file) {
433-
data_files_to_delete.erase(entry.data_file->file_path);
434-
}
435-
}
436-
goto next_retained;
424+
for (const auto& entry : entries_result.value()) {
425+
if (entry.data_file) {
426+
data_files_to_delete.erase(entry.data_file->file_path);
437427
}
438428
}
439-
next_retained:;
440429
}
441430

442-
return {};
431+
return data_files_to_delete;
443432
}
444433

445434
Status ExpireSnapshots::CleanExpiredFiles(
@@ -483,13 +472,13 @@ Status ExpireSnapshots::CleanExpiredFiles(
483472
// Only read entries from manifests being deleted (not all expired manifests),
484473
// then subtract any files still reachable from retained manifests.
485474
if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) {
486-
std::unordered_set<std::string> data_files_to_delete;
487-
std::ignore = FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths,
488-
data_files_to_delete);
489-
490-
// TODO(shangxinli): Parallelize file deletion with a thread pool.
491-
for (const auto& path : data_files_to_delete) {
492-
DeleteFilePath(path);
475+
auto data_files_result =
476+
FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths);
477+
if (data_files_result.has_value()) {
478+
// TODO(shangxinli): Parallelize file deletion with a thread pool.
479+
for (const auto& path : data_files_result.value()) {
480+
DeleteFilePath(path);
481+
}
493482
}
494483
}
495484

@@ -508,17 +497,31 @@ Status ExpireSnapshots::CleanExpiredFiles(
508497
}
509498
}
510499

511-
// Phase 6: Delete expired statistics files.
512-
// Use set difference between before and after states (matching Java behavior).
513-
// Since Finalize runs before table_ is updated, "after" is base() minus expired.
514-
std::unordered_set<int64_t> retained_stats_snapshots(retained_snapshot_ids);
500+
// Phase 6: Delete expired statistics files using path-based set difference.
501+
// A statistics file should only be deleted if its path is not referenced by any
502+
// retained snapshot, since the same file path could be shared across snapshots.
503+
// Collect paths from retained snapshots, then delete any not in that set.
504+
std::unordered_set<std::string> retained_stat_paths;
505+
std::unordered_set<std::string> retained_part_stat_paths;
506+
for (const auto& stat_file : metadata.statistics) {
507+
if (stat_file && retained_snapshot_ids.contains(stat_file->snapshot_id)) {
508+
retained_stat_paths.insert(stat_file->path);
509+
}
510+
}
511+
for (const auto& part_stat : metadata.partition_statistics) {
512+
if (part_stat && retained_snapshot_ids.contains(part_stat->snapshot_id)) {
513+
retained_part_stat_paths.insert(part_stat->path);
514+
}
515+
}
515516
for (const auto& stat_file : metadata.statistics) {
516-
if (stat_file && !retained_stats_snapshots.contains(stat_file->snapshot_id)) {
517+
if (stat_file && expired_id_set.contains(stat_file->snapshot_id) &&
518+
!retained_stat_paths.contains(stat_file->path)) {
517519
DeleteFilePath(stat_file->path);
518520
}
519521
}
520522
for (const auto& part_stat : metadata.partition_statistics) {
521-
if (part_stat && !retained_stats_snapshots.contains(part_stat->snapshot_id)) {
523+
if (part_stat && expired_id_set.contains(part_stat->snapshot_id) &&
524+
!retained_part_stat_paths.contains(part_stat->path)) {
522525
DeleteFilePath(part_stat->path);
523526
}
524527
}

src/iceberg/update/expire_snapshots.h

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
#include <functional>
2424
#include <memory>
2525
#include <string>
26+
#include <unordered_map>
2627
#include <unordered_set>
2728
#include <vector>
2829

2930
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/manifest/manifest_list.h"
32+
#include "iceberg/manifest/manifest_reader.h"
3033
#include "iceberg/result.h"
3134
#include "iceberg/type_fwd.h"
3235
#include "iceberg/update/pending_update.h"
@@ -195,11 +198,17 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
195198
Status ReadManifestsForSnapshot(int64_t snapshot_id,
196199
std::unordered_set<std::string>& manifest_paths);
197200

198-
/// \brief Find data files to delete by reading entries from manifests being deleted,
199-
/// then subtracting files still reachable from retained manifests.
200-
Status FindDataFilesToDelete(const std::unordered_set<std::string>& manifests_to_delete,
201-
const std::unordered_set<std::string>& retained_manifests,
202-
std::unordered_set<std::string>& data_files_to_delete);
201+
/// \brief Find data files to delete by reading live entries from manifests being
202+
/// deleted, then subtracting files still reachable from retained manifests.
203+
/// If a retained manifest cannot be read, returns an empty set to prevent
204+
/// accidental data loss.
205+
Result<std::unordered_set<std::string>> FindDataFilesToDelete(
206+
const std::unordered_set<std::string>& manifests_to_delete,
207+
const std::unordered_set<std::string>& retained_manifests);
208+
209+
/// \brief Create a ManifestReader for the given ManifestFile.
210+
Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
211+
const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io);
203212

204213
/// \brief Delete a file, suppressing errors (best-effort).
205214
/// Uses the custom delete function if set, otherwise FileIO::DeleteFile.
@@ -218,6 +227,11 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
218227

219228
/// Cached result from Apply(), used during Finalize() for file cleanup
220229
std::optional<ApplyResult> apply_result_;
230+
231+
/// Cache of manifest path -> ManifestFile, built during ReadManifestsForSnapshot
232+
/// to avoid O(M*S) repeated I/O from re-reading manifest lists in
233+
/// FindDataFilesToDelete.
234+
std::unordered_map<std::string, ManifestFile> manifest_cache_;
221235
};
222236

223237
} // namespace iceberg

0 commit comments

Comments
 (0)