|
23 | 23 | #include <cstdint> |
24 | 24 | #include <iterator> |
25 | 25 | #include <memory> |
| 26 | +#include <optional> |
| 27 | +#include <string> |
26 | 28 | #include <unordered_set> |
27 | 29 | #include <vector> |
28 | 30 |
|
| 31 | +#include "iceberg/file_io.h" |
| 32 | +#include "iceberg/manifest/manifest_entry.h" |
| 33 | +#include "iceberg/manifest/manifest_reader.h" |
29 | 34 | #include "iceberg/schema.h" |
30 | 35 | #include "iceberg/snapshot.h" |
| 36 | +#include "iceberg/statistics_file.h" |
31 | 37 | #include "iceberg/table.h" |
32 | 38 | #include "iceberg/table_metadata.h" |
33 | 39 | #include "iceberg/transaction.h" |
@@ -285,7 +291,247 @@ Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() { |
285 | 291 | }); |
286 | 292 | } |
287 | 293 |
|
| 294 | + // Cache the result for use during Finalize() |
| 295 | + apply_result_ = result; |
| 296 | + |
288 | 297 | return result; |
289 | 298 | } |
290 | 299 |
|
| 300 | +Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) { |
| 301 | + if (commit_error.has_value()) { |
| 302 | + return {}; |
| 303 | + } |
| 304 | + |
| 305 | + if (cleanup_level_ == CleanupLevel::kNone) { |
| 306 | + return {}; |
| 307 | + } |
| 308 | + |
| 309 | + if (!apply_result_.has_value() || apply_result_->snapshot_ids_to_remove.empty()) { |
| 310 | + return {}; |
| 311 | + } |
| 312 | + |
| 313 | + // File cleanup is best-effort: log and continue on individual file deletion failures |
| 314 | + // to avoid blocking metadata updates (matching Java behavior). |
| 315 | + return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove); |
| 316 | +} |
| 317 | + |
| 318 | +void ExpireSnapshots::DeleteFilePath(const std::string& path) { |
| 319 | + try { |
| 320 | + if (delete_func_) { |
| 321 | + delete_func_(path); |
| 322 | + } else { |
| 323 | + auto status = ctx_->table->io()->DeleteFile(path); |
| 324 | + // Best-effort: ignore NotFound (file already deleted) and other errors. |
| 325 | + // Java uses suppressFailureWhenFinished + onFailure logging. |
| 326 | + std::ignore = status; |
| 327 | + } |
| 328 | + } catch (...) { |
| 329 | + // Suppress all exceptions during file cleanup to match Java's |
| 330 | + // suppressFailureWhenFinished behavior. |
| 331 | + } |
| 332 | +} |
| 333 | + |
| 334 | +Status ExpireSnapshots::ReadManifestsForSnapshot( |
| 335 | + int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) { |
| 336 | + const TableMetadata& metadata = base(); |
| 337 | + auto file_io = ctx_->table->io(); |
| 338 | + |
| 339 | + auto snapshot_result = metadata.SnapshotById(snapshot_id); |
| 340 | + if (!snapshot_result.has_value()) { |
| 341 | + return {}; |
| 342 | + } |
| 343 | + auto& snapshot = snapshot_result.value(); |
| 344 | + |
| 345 | + SnapshotCache snapshot_cache(snapshot.get()); |
| 346 | + auto manifests_result = snapshot_cache.Manifests(file_io); |
| 347 | + if (!manifests_result.has_value()) { |
| 348 | + // Best-effort: skip this snapshot if we can't read its manifests |
| 349 | + return {}; |
| 350 | + } |
| 351 | + |
| 352 | + for (const auto& manifest : manifests_result.value()) { |
| 353 | + manifest_paths.insert(manifest.manifest_path); |
| 354 | + } |
| 355 | + |
| 356 | + return {}; |
| 357 | +} |
| 358 | + |
| 359 | +Status ExpireSnapshots::FindDataFilesToDelete( |
| 360 | + const std::unordered_set<std::string>& manifests_to_delete, |
| 361 | + const std::unordered_set<std::string>& retained_manifests, |
| 362 | + std::unordered_set<std::string>& data_files_to_delete) { |
| 363 | + const TableMetadata& metadata = base(); |
| 364 | + auto file_io = ctx_->table->io(); |
| 365 | + |
| 366 | + // Step 1: Collect all file paths from manifests being deleted |
| 367 | + for (const auto& manifest_path : manifests_to_delete) { |
| 368 | + // Find the ManifestFile for this path by scanning expired snapshots |
| 369 | + for (const auto& snapshot : metadata.snapshots) { |
| 370 | + if (!snapshot) continue; |
| 371 | + SnapshotCache snapshot_cache(snapshot.get()); |
| 372 | + auto manifests_result = snapshot_cache.Manifests(file_io); |
| 373 | + if (!manifests_result.has_value()) continue; |
| 374 | + |
| 375 | + for (const auto& manifest : manifests_result.value()) { |
| 376 | + if (manifest.manifest_path != manifest_path) continue; |
| 377 | + |
| 378 | + auto schema_result = metadata.Schema(); |
| 379 | + if (!schema_result.has_value()) continue; |
| 380 | + auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); |
| 381 | + if (!spec_result.has_value()) continue; |
| 382 | + |
| 383 | + auto reader_result = ManifestReader::Make( |
| 384 | + manifest, file_io, schema_result.value(), spec_result.value()); |
| 385 | + if (!reader_result.has_value()) continue; |
| 386 | + |
| 387 | + auto entries_result = reader_result.value()->Entries(); |
| 388 | + if (!entries_result.has_value()) continue; |
| 389 | + |
| 390 | + for (const auto& entry : entries_result.value()) { |
| 391 | + if (entry.data_file) { |
| 392 | + data_files_to_delete.insert(entry.data_file->file_path); |
| 393 | + } |
| 394 | + } |
| 395 | + goto next_manifest; // Found and processed this manifest, move to next |
| 396 | + } |
| 397 | + } |
| 398 | + next_manifest:; |
| 399 | + } |
| 400 | + |
| 401 | + if (data_files_to_delete.empty()) { |
| 402 | + return {}; |
| 403 | + } |
| 404 | + |
| 405 | + // Step 2: Remove any files that are still referenced by retained manifests. |
| 406 | + // This ensures we don't delete files that are shared across manifests. |
| 407 | + for (const auto& manifest_path : retained_manifests) { |
| 408 | + if (data_files_to_delete.empty()) break; |
| 409 | + |
| 410 | + for (const auto& snapshot : metadata.snapshots) { |
| 411 | + if (!snapshot) continue; |
| 412 | + SnapshotCache snapshot_cache(snapshot.get()); |
| 413 | + auto manifests_result = snapshot_cache.Manifests(file_io); |
| 414 | + if (!manifests_result.has_value()) continue; |
| 415 | + |
| 416 | + for (const auto& manifest : manifests_result.value()) { |
| 417 | + if (manifest.manifest_path != manifest_path) continue; |
| 418 | + |
| 419 | + auto schema_result = metadata.Schema(); |
| 420 | + if (!schema_result.has_value()) continue; |
| 421 | + auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id); |
| 422 | + if (!spec_result.has_value()) continue; |
| 423 | + |
| 424 | + auto reader_result = ManifestReader::Make( |
| 425 | + manifest, file_io, schema_result.value(), spec_result.value()); |
| 426 | + if (!reader_result.has_value()) continue; |
| 427 | + |
| 428 | + auto entries_result = reader_result.value()->Entries(); |
| 429 | + if (!entries_result.has_value()) continue; |
| 430 | + |
| 431 | + for (const auto& entry : entries_result.value()) { |
| 432 | + if (entry.data_file) { |
| 433 | + data_files_to_delete.erase(entry.data_file->file_path); |
| 434 | + } |
| 435 | + } |
| 436 | + goto next_retained; |
| 437 | + } |
| 438 | + } |
| 439 | + next_retained:; |
| 440 | + } |
| 441 | + |
| 442 | + return {}; |
| 443 | +} |
| 444 | + |
| 445 | +Status ExpireSnapshots::CleanExpiredFiles( |
| 446 | + const std::vector<int64_t>& expired_snapshot_ids) { |
| 447 | + const TableMetadata& metadata = base(); |
| 448 | + |
| 449 | + // Build expired and retained snapshot ID sets. |
| 450 | + // The retained set includes ALL snapshots referenced by any branch or tag, |
| 451 | + // since Apply() already computed retention across all refs. |
| 452 | + std::unordered_set<int64_t> expired_id_set(expired_snapshot_ids.begin(), |
| 453 | + expired_snapshot_ids.end()); |
| 454 | + std::unordered_set<int64_t> retained_snapshot_ids; |
| 455 | + for (const auto& snapshot : metadata.snapshots) { |
| 456 | + if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) { |
| 457 | + retained_snapshot_ids.insert(snapshot->snapshot_id); |
| 458 | + } |
| 459 | + } |
| 460 | + |
| 461 | + // Phase 1: Collect manifest paths from expired and retained snapshots. |
| 462 | + // TODO(shangxinli): Parallelize manifest collection with a thread pool. |
| 463 | + std::unordered_set<std::string> expired_manifest_paths; |
| 464 | + for (int64_t snapshot_id : expired_snapshot_ids) { |
| 465 | + std::ignore = ReadManifestsForSnapshot(snapshot_id, expired_manifest_paths); |
| 466 | + } |
| 467 | + |
| 468 | + std::unordered_set<std::string> retained_manifest_paths; |
| 469 | + for (int64_t snapshot_id : retained_snapshot_ids) { |
| 470 | + std::ignore = ReadManifestsForSnapshot(snapshot_id, retained_manifest_paths); |
| 471 | + } |
| 472 | + |
| 473 | + // Phase 2: Prune manifests still referenced by retained snapshots. |
| 474 | + // Only manifests exclusively in expired snapshots should be deleted. |
| 475 | + std::unordered_set<std::string> manifests_to_delete; |
| 476 | + for (const auto& path : expired_manifest_paths) { |
| 477 | + if (!retained_manifest_paths.contains(path)) { |
| 478 | + manifests_to_delete.insert(path); |
| 479 | + } |
| 480 | + } |
| 481 | + |
| 482 | + // Phase 3: If cleanup level is kAll, find data files to delete. |
| 483 | + // Only read entries from manifests being deleted (not all expired manifests), |
| 484 | + // then subtract any files still reachable from retained manifests. |
| 485 | + if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) { |
| 486 | + std::unordered_set<std::string> data_files_to_delete; |
| 487 | + std::ignore = FindDataFilesToDelete(manifests_to_delete, retained_manifest_paths, |
| 488 | + data_files_to_delete); |
| 489 | + |
| 490 | + // TODO(shangxinli): Parallelize file deletion with a thread pool. |
| 491 | + for (const auto& path : data_files_to_delete) { |
| 492 | + DeleteFilePath(path); |
| 493 | + } |
| 494 | + } |
| 495 | + |
| 496 | + // Phase 4: Delete orphaned manifest files. |
| 497 | + for (const auto& path : manifests_to_delete) { |
| 498 | + DeleteFilePath(path); |
| 499 | + } |
| 500 | + |
| 501 | + // Phase 5: Delete manifest lists from expired snapshots. |
| 502 | + for (int64_t snapshot_id : expired_snapshot_ids) { |
| 503 | + auto snapshot_result = metadata.SnapshotById(snapshot_id); |
| 504 | + if (!snapshot_result.has_value()) continue; |
| 505 | + auto& snapshot = snapshot_result.value(); |
| 506 | + if (!snapshot->manifest_list.empty()) { |
| 507 | + DeleteFilePath(snapshot->manifest_list); |
| 508 | + } |
| 509 | + } |
| 510 | + |
| 511 | + // Phase 6: Delete expired statistics files. |
| 512 | + // Use set difference between before and after states (matching Java behavior). |
| 513 | + // Since Finalize runs before table_ is updated, "after" is base() minus expired. |
| 514 | + std::unordered_set<int64_t> retained_stats_snapshots(retained_snapshot_ids); |
| 515 | + for (const auto& stat_file : metadata.statistics) { |
| 516 | + if (stat_file && !retained_stats_snapshots.contains(stat_file->snapshot_id)) { |
| 517 | + DeleteFilePath(stat_file->path); |
| 518 | + } |
| 519 | + } |
| 520 | + for (const auto& part_stat : metadata.partition_statistics) { |
| 521 | + if (part_stat && !retained_stats_snapshots.contains(part_stat->snapshot_id)) { |
| 522 | + DeleteFilePath(part_stat->path); |
| 523 | + } |
| 524 | + } |
| 525 | + |
| 526 | + return {}; |
| 527 | +} |
| 528 | + |
| 529 | +// TODO(shangxinli): Implement IncrementalFileCleanup strategy for linear ancestry |
| 530 | +// optimization. Java uses this when: !specifiedSnapshotId && simple linear main branch |
| 531 | +// ancestry (no non-main snapshots removed, no non-main snapshots remain). |
| 532 | +// The incremental strategy is more efficient because it only needs to scan |
| 533 | +// manifests written by expired snapshots (checking added_snapshot_id), avoiding |
| 534 | +// the full reachability analysis. It also handles cherry-pick protection via |
| 535 | +// SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP. |
| 536 | + |
291 | 537 | } // namespace iceberg |