@@ -316,15 +316,18 @@ Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
316316 return CleanExpiredFiles (apply_result_->snapshot_ids_to_remove );
317317}
318318
319+ std::shared_ptr<FileIO> ExpireSnapshots::GetFileIO () const {
320+ return transaction_->table ()->io ();
321+ }
322+
319323void ExpireSnapshots::DeleteFilePath (const std::string& path) {
320324 try {
321325 if (delete_func_) {
322326 delete_func_ (path);
323327 } else {
324- auto status = transaction_->table ()->io ()->DeleteFile (path);
325328 // Best-effort: ignore NotFound (file already deleted) and other errors.
326329 // Java uses suppressFailureWhenFinished + onFailure logging.
327- std::ignore = status ;
330+ std::ignore = GetFileIO ()-> DeleteFile (path) ;
328331 }
329332 } catch (...) {
330333 // Suppress all exceptions during file cleanup to match Java's
@@ -334,19 +337,14 @@ void ExpireSnapshots::DeleteFilePath(const std::string& path) {
334337
335338Status ExpireSnapshots::ReadManifestsForSnapshot (
336339 int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
337- const TableMetadata& metadata = base ();
338- auto file_io = transaction_->table ()->io ();
339-
340- auto snapshot_result = metadata.SnapshotById (snapshot_id);
340+ auto snapshot_result = base ().SnapshotById (snapshot_id);
341341 if (!snapshot_result.has_value ()) {
342342 return {};
343343 }
344- auto & snapshot = snapshot_result.value ();
345344
346- SnapshotCache snapshot_cache (snapshot .get ());
347- auto manifests_result = snapshot_cache.Manifests (file_io );
345+ SnapshotCache snapshot_cache (snapshot_result. value () .get ());
346+ auto manifests_result = snapshot_cache.Manifests (GetFileIO () );
348347 if (!manifests_result.has_value ()) {
349- // Best-effort: skip this snapshot if we can't read its manifests
350348 return {};
351349 }
352350
@@ -357,46 +355,66 @@ Status ExpireSnapshots::ReadManifestsForSnapshot(
357355 return {};
358356}
359357
358+ // Helper to read data file paths from a manifest, collecting into or erasing from a set.
359+ void ExpireSnapshots::ReadDataFilesFromManifest (
360+ const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
361+ std::unordered_set<std::string>& file_paths, bool erase) {
362+ const TableMetadata& metadata = base ();
363+ auto schema_result = metadata.Schema ();
364+ if (!schema_result.has_value ()) return ;
365+ auto spec_result = metadata.PartitionSpecById (manifest.partition_spec_id );
366+ if (!spec_result.has_value ()) return ;
367+
368+ auto reader_result =
369+ ManifestReader::Make (manifest, file_io, schema_result.value (), spec_result.value ());
370+ if (!reader_result.has_value ()) return ;
371+
372+ auto entries_result = reader_result.value ()->Entries ();
373+ if (!entries_result.has_value ()) return ;
374+
375+ for (const auto & entry : entries_result.value ()) {
376+ if (entry.data_file ) {
377+ if (erase) {
378+ file_paths.erase (entry.data_file ->file_path );
379+ } else {
380+ file_paths.insert (entry.data_file ->file_path );
381+ }
382+ }
383+ }
384+ }
385+
386+ // Find the ManifestFile object for a given path by scanning all snapshots.
387+ // Returns true if found and invokes the callback with the manifest.
388+ bool ExpireSnapshots::FindManifestByPath (
389+ const std::string& manifest_path, std::shared_ptr<FileIO> file_io,
390+ const std::function<void (const ManifestFile&)>& callback) {
391+ for (const auto & snapshot : base ().snapshots ) {
392+ if (!snapshot) continue ;
393+ SnapshotCache snapshot_cache (snapshot.get ());
394+ auto manifests_result = snapshot_cache.Manifests (file_io);
395+ if (!manifests_result.has_value ()) continue ;
396+
397+ for (const auto & manifest : manifests_result.value ()) {
398+ if (manifest.manifest_path == manifest_path) {
399+ callback (manifest);
400+ return true ;
401+ }
402+ }
403+ }
404+ return false ;
405+ }
406+
360407Status ExpireSnapshots::FindDataFilesToDelete (
361408 const std::unordered_set<std::string>& manifests_to_delete,
362409 const std::unordered_set<std::string>& retained_manifests,
363410 std::unordered_set<std::string>& data_files_to_delete) {
364- const TableMetadata& metadata = base ();
365- auto file_io = transaction_->table ()->io ();
411+ auto file_io = GetFileIO ();
366412
367413 // Step 1: Collect all file paths from manifests being deleted
368414 for (const auto & manifest_path : manifests_to_delete) {
369- // Find the ManifestFile for this path by scanning expired snapshots
370- for (const auto & snapshot : metadata.snapshots ) {
371- if (!snapshot) continue ;
372- SnapshotCache snapshot_cache (snapshot.get ());
373- auto manifests_result = snapshot_cache.Manifests (file_io);
374- if (!manifests_result.has_value ()) continue ;
375-
376- for (const auto & manifest : manifests_result.value ()) {
377- if (manifest.manifest_path != manifest_path) continue ;
378-
379- auto schema_result = metadata.Schema ();
380- if (!schema_result.has_value ()) continue ;
381- auto spec_result = metadata.PartitionSpecById (manifest.partition_spec_id );
382- if (!spec_result.has_value ()) continue ;
383-
384- auto reader_result = ManifestReader::Make (
385- manifest, file_io, schema_result.value (), spec_result.value ());
386- if (!reader_result.has_value ()) continue ;
387-
388- auto entries_result = reader_result.value ()->Entries ();
389- if (!entries_result.has_value ()) continue ;
390-
391- for (const auto & entry : entries_result.value ()) {
392- if (entry.data_file ) {
393- data_files_to_delete.insert (entry.data_file ->file_path );
394- }
395- }
396- goto next_manifest; // Found and processed this manifest, move to next
397- }
398- }
399- next_manifest:;
415+ FindManifestByPath (manifest_path, file_io, [&](const ManifestFile& manifest) {
416+ ReadDataFilesFromManifest (manifest, file_io, data_files_to_delete, /* erase=*/ false );
417+ });
400418 }
401419
402420 if (data_files_to_delete.empty ()) {
@@ -407,37 +425,9 @@ Status ExpireSnapshots::FindDataFilesToDelete(
407425 // This ensures we don't delete files that are shared across manifests.
408426 for (const auto & manifest_path : retained_manifests) {
409427 if (data_files_to_delete.empty ()) break ;
410-
411- for (const auto & snapshot : metadata.snapshots ) {
412- if (!snapshot) continue ;
413- SnapshotCache snapshot_cache (snapshot.get ());
414- auto manifests_result = snapshot_cache.Manifests (file_io);
415- if (!manifests_result.has_value ()) continue ;
416-
417- for (const auto & manifest : manifests_result.value ()) {
418- if (manifest.manifest_path != manifest_path) continue ;
419-
420- auto schema_result = metadata.Schema ();
421- if (!schema_result.has_value ()) continue ;
422- auto spec_result = metadata.PartitionSpecById (manifest.partition_spec_id );
423- if (!spec_result.has_value ()) continue ;
424-
425- auto reader_result = ManifestReader::Make (
426- manifest, file_io, schema_result.value (), spec_result.value ());
427- if (!reader_result.has_value ()) continue ;
428-
429- auto entries_result = reader_result.value ()->Entries ();
430- if (!entries_result.has_value ()) continue ;
431-
432- for (const auto & entry : entries_result.value ()) {
433- if (entry.data_file ) {
434- data_files_to_delete.erase (entry.data_file ->file_path );
435- }
436- }
437- goto next_retained;
438- }
439- }
440- next_retained:;
428+ FindManifestByPath (manifest_path, file_io, [&](const ManifestFile& manifest) {
429+ ReadDataFilesFromManifest (manifest, file_io, data_files_to_delete, /* erase=*/ true );
430+ });
441431 }
442432
443433 return {};
0 commit comments