@@ -331,6 +331,17 @@ void ExpireSnapshots::DeleteFilePath(const std::string& path) {
331331 }
332332}
333333
334+ Result<std::shared_ptr<ManifestReader>> ExpireSnapshots::MakeManifestReader (
335+ const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io) {
336+ const TableMetadata& metadata = base ();
337+ auto schema_result = metadata.Schema ();
338+ if (!schema_result.has_value ()) return std::unexpected<Error>(schema_result.error ());
339+ auto spec_result = metadata.PartitionSpecById (manifest.partition_spec_id );
340+ if (!spec_result.has_value ()) return std::unexpected<Error>(spec_result.error ());
341+ return ManifestReader::Make (manifest, file_io, schema_result.value (),
342+ spec_result.value ());
343+ }
344+
334345Status ExpireSnapshots::ReadManifestsForSnapshot (
335346 int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
336347 const TableMetadata& metadata = base ();
@@ -351,95 +362,73 @@ Status ExpireSnapshots::ReadManifestsForSnapshot(
351362
352363 for (const auto & manifest : manifests_result.value ()) {
353364 manifest_paths.insert (manifest.manifest_path );
365+ // Cache manifest metadata for later use in FindDataFilesToDelete,
366+ // avoiding O(M*S) repeated I/O from re-reading manifest lists.
367+ manifest_cache_.emplace (manifest.manifest_path , manifest);
354368 }
355369
356370 return {};
357371}
358372
359- Status ExpireSnapshots::FindDataFilesToDelete (
373+ Result<std::unordered_set<std::string>> ExpireSnapshots::FindDataFilesToDelete (
360374 const std::unordered_set<std::string>& manifests_to_delete,
361- const std::unordered_set<std::string>& retained_manifests,
362- std::unordered_set<std::string>& data_files_to_delete) {
363- const TableMetadata& metadata = base ();
375+ const std::unordered_set<std::string>& retained_manifests) {
364376 auto file_io = ctx_->table ->io ();
377+ std::unordered_set<std::string> data_files_to_delete;
365378
366- // Step 1: Collect all file paths from manifests being deleted
367- for (const auto & manifest_path : manifests_to_delete) {
368- // Find the ManifestFile for this path by scanning expired snapshots
369- for (const auto & snapshot : metadata.snapshots ) {
370- if (!snapshot) continue ;
371- SnapshotCache snapshot_cache (snapshot.get ());
372- auto manifests_result = snapshot_cache.Manifests (file_io);
373- if (!manifests_result.has_value ()) continue ;
374-
375- for (const auto & manifest : manifests_result.value ()) {
376- if (manifest.manifest_path != manifest_path) continue ;
379+ // Step 1: Collect live file paths from manifests being deleted.
380+ // Use LiveEntries() (ADDED/EXISTING only) to match Java's ManifestFiles.readPaths
381+ // which delegates to liveEntries(). Using Entries() would include DELETED entries
382+ // and could cause storage leaks.
383+ for (const auto & [path, manifest] : manifest_cache_) {
384+ if (!manifests_to_delete.contains (path)) continue ;
377385
378- auto schema_result = metadata.Schema ();
379- if (!schema_result.has_value ()) continue ;
380- auto spec_result = metadata.PartitionSpecById (manifest.partition_spec_id );
381- if (!spec_result.has_value ()) continue ;
386+ auto reader_result = MakeManifestReader (manifest, file_io);
387+ if (!reader_result.has_value ()) continue ;
382388
383- auto reader_result = ManifestReader::Make (
384- manifest, file_io, schema_result.value (), spec_result.value ());
385- if (!reader_result.has_value ()) continue ;
389+ auto entries_result = reader_result.value ()->LiveEntries ();
390+ if (!entries_result.has_value ()) continue ;
386391
387- auto entries_result = reader_result.value ()->Entries ();
388- if (!entries_result.has_value ()) continue ;
389-
390- for (const auto & entry : entries_result.value ()) {
391- if (entry.data_file ) {
392- data_files_to_delete.insert (entry.data_file ->file_path );
393- }
394- }
395- goto next_manifest; // Found and processed this manifest, move to next
392+ for (const auto & entry : entries_result.value ()) {
393+ if (entry.data_file ) {
394+ data_files_to_delete.insert (entry.data_file ->file_path );
396395 }
397396 }
398- next_manifest:;
399397 }
400398
401399 if (data_files_to_delete.empty ()) {
402- return {} ;
400+ return data_files_to_delete ;
403401 }
404402
405403 // Step 2: Remove any files that are still referenced by retained manifests.
406- // This ensures we don't delete files that are shared across manifests.
404+ // If reading a retained manifest fails, we must NOT delete its data files
405+ // to avoid accidental data loss (matching Java's retry + throwFailureWhenFinished).
407406 for (const auto & manifest_path : retained_manifests) {
408407 if (data_files_to_delete.empty ()) break ;
409408
410- for (const auto & snapshot : metadata.snapshots ) {
411- if (!snapshot) continue ;
412- SnapshotCache snapshot_cache (snapshot.get ());
413- auto manifests_result = snapshot_cache.Manifests (file_io);
414- if (!manifests_result.has_value ()) continue ;
415-
416- for (const auto & manifest : manifests_result.value ()) {
417- if (manifest.manifest_path != manifest_path) continue ;
418-
419- auto schema_result = metadata.Schema ();
420- if (!schema_result.has_value ()) continue ;
421- auto spec_result = metadata.PartitionSpecById (manifest.partition_spec_id );
422- if (!spec_result.has_value ()) continue ;
409+ auto it = manifest_cache_.find (manifest_path);
410+ if (it == manifest_cache_.end ()) continue ;
423411
424- auto reader_result = ManifestReader::Make (
425- manifest, file_io, schema_result.value (), spec_result.value ());
426- if (!reader_result.has_value ()) continue ;
412+ auto reader_result = MakeManifestReader (it->second , file_io);
413+ if (!reader_result.has_value ()) {
414+ // Cannot read a retained manifest — abort data file deletion to prevent
415+ // accidental data loss. Java retries and throws on failure here.
416+ return std::unordered_set<std::string>{};
417+ }
427418
428- auto entries_result = reader_result.value ()->Entries ();
429- if (!entries_result.has_value ()) continue ;
419+ auto entries_result = reader_result.value ()->LiveEntries ();
420+ if (!entries_result.has_value ()) {
421+ return std::unordered_set<std::string>{};
422+ }
430423
431- for (const auto & entry : entries_result.value ()) {
432- if (entry.data_file ) {
433- data_files_to_delete.erase (entry.data_file ->file_path );
434- }
435- }
436- goto next_retained;
424+ for (const auto & entry : entries_result.value ()) {
425+ if (entry.data_file ) {
426+ data_files_to_delete.erase (entry.data_file ->file_path );
437427 }
438428 }
439- next_retained:;
440429 }
441430
442- return {} ;
431+ return data_files_to_delete ;
443432}
444433
445434Status ExpireSnapshots::CleanExpiredFiles (
@@ -483,13 +472,13 @@ Status ExpireSnapshots::CleanExpiredFiles(
483472 // Only read entries from manifests being deleted (not all expired manifests),
484473 // then subtract any files still reachable from retained manifests.
485474 if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty ()) {
486- std::unordered_set<std::string> data_files_to_delete;
487- std::ignore = FindDataFilesToDelete (manifests_to_delete, retained_manifest_paths,
488- data_files_to_delete);
489-
490- // TODO(shangxinli): Parallelize file deletion with a thread pool.
491- for ( const auto & path : data_files_to_delete) {
492- DeleteFilePath (path);
475+ auto data_files_result =
476+ FindDataFilesToDelete (manifests_to_delete, retained_manifest_paths);
477+ if (data_files_result. has_value ()) {
478+ // TODO(shangxinli): Parallelize file deletion with a thread pool.
479+ for ( const auto & path : data_files_result. value ()) {
480+ DeleteFilePath (path);
481+ }
493482 }
494483 }
495484
@@ -508,17 +497,31 @@ Status ExpireSnapshots::CleanExpiredFiles(
508497 }
509498 }
510499
511- // Phase 6: Delete expired statistics files.
512- // Use set difference between before and after states (matching Java behavior).
513- // Since Finalize runs before table_ is updated, "after" is base() minus expired.
514- std::unordered_set<int64_t > retained_stats_snapshots (retained_snapshot_ids);
500+ // Phase 6: Delete expired statistics files using path-based set difference.
501+ // A statistics file should only be deleted if its path is not referenced by any
502+ // retained snapshot, since the same file path could be shared across snapshots.
503+ // Collect paths from retained snapshots, then delete any not in that set.
504+ std::unordered_set<std::string> retained_stat_paths;
505+ std::unordered_set<std::string> retained_part_stat_paths;
506+ for (const auto & stat_file : metadata.statistics ) {
507+ if (stat_file && retained_snapshot_ids.contains (stat_file->snapshot_id )) {
508+ retained_stat_paths.insert (stat_file->path );
509+ }
510+ }
511+ for (const auto & part_stat : metadata.partition_statistics ) {
512+ if (part_stat && retained_snapshot_ids.contains (part_stat->snapshot_id )) {
513+ retained_part_stat_paths.insert (part_stat->path );
514+ }
515+ }
515516 for (const auto & stat_file : metadata.statistics ) {
516- if (stat_file && !retained_stats_snapshots.contains (stat_file->snapshot_id )) {
517+ if (stat_file && expired_id_set.contains (stat_file->snapshot_id ) &&
518+ !retained_stat_paths.contains (stat_file->path )) {
517519 DeleteFilePath (stat_file->path );
518520 }
519521 }
520522 for (const auto & part_stat : metadata.partition_statistics ) {
521- if (part_stat && !retained_stats_snapshots.contains (part_stat->snapshot_id )) {
523+ if (part_stat && expired_id_set.contains (part_stat->snapshot_id ) &&
524+ !retained_part_stat_paths.contains (part_stat->path )) {
522525 DeleteFilePath (part_stat->path );
523526 }
524527 }
0 commit comments