Skip to content

Commit e950bed

Browse files
committed
feat(update): add MergingSnapshotUpdate
Abstract base for merge-based snapshot operations (MergeAppend, OverwriteFiles, RowDelta, etc.), implementing the filter → write → merge pipeline consistent with Java's MergingSnapshotProducer. Also fixes SnapshotSummaryBuilder manifest count fields and a use-after-free bug in SnapshotUpdate::DeleteFile.
1 parent 6c781c7 commit e950bed

15 files changed

Lines changed: 2262 additions & 10 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
8888
type.cc
8989
update/expire_snapshots.cc
9090
update/fast_append.cc
91+
update/merging_snapshot_update.cc
9192
update/pending_update.cc
9293
update/set_snapshot.cc
9394
update/snapshot_manager.cc

src/iceberg/manifest/manifest_filter_manager.cc

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,24 @@ bool ManifestFilterManager::ContainsDeletes() const {
117117
!drop_partitions_.empty();
118118
}
119119

120+
// Records the minimum sequence number used to prune delete-manifest entries.
// The pruning checks in ShouldDelete and CanContainDroppedFiles require the
// stored value to be > 0, so passing 0 or a negative value leaves pruning off.
void ManifestFilterManager::DropDeleteFilesOlderThan(int64_t sequence_number) {
121+
min_sequence_number_ = sequence_number;
122+
}
123+
124+
// Remembers the path of every file in deleted_files so that ShouldDelete can
// drop deletion vectors whose referenced_data_file matches one of them.
// Paths accumulate across calls; nothing in this function clears the set.
void ManifestFilterManager::RemoveDanglingDeletesFor(const DataFileSet& deleted_files) {
125+
for (const auto& file : deleted_files) {
126+
// Only the path string is recorded, not the file object itself.
removed_data_file_paths_.insert(file->file_path);
127+
}
128+
}
129+
120130
Result<bool> ManifestFilterManager::CanContainDroppedFiles(const ManifestFile&) const {
121131
// TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete
122132
// manifests once object-delete partitions are tracked separately.
123133
// Currently, DeleteFile(std::shared_ptr<DataFile>) degrades to a path-based delete,
124134
// which forces scanning all manifests.
125-
return !delete_paths_.empty();
135+
// Also open manifests when removed data files are registered (dangling-DV cleanup), or, for delete manifests, when a minimum sequence number is set.
136+
return !delete_paths_.empty() || !removed_data_file_paths_.empty() ||
137+
(manifest_content_ == ManifestContent::kDeletes && min_sequence_number_ > 0);
126138
}
127139

128140
Result<bool> ManifestFilterManager::CanContainDroppedPartitions(
@@ -219,6 +231,25 @@ Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
219231
return true;
220232
}
221233

234+
// Delete-manifest-specific cleanup (only for ManifestContent::kDeletes).
235+
if (manifest_content_ == ManifestContent::kDeletes) {
236+
// Drop delete files whose data sequence number is older than the minimum
237+
// retained by the table (they can no longer match any live data rows).
238+
// seq == 0 (kInitialSequenceNumber / nullopt) is intentionally excluded:
239+
// those entries predate sequence number assignment and must not be pruned.
240+
int64_t seq = entry.sequence_number.value_or(0);
241+
if (min_sequence_number_ > 0 && seq > 0 && seq < min_sequence_number_) {
242+
return true;
243+
}
244+
245+
// Drop DVs that reference a data file that has been removed (dangling DV).
246+
if (!removed_data_file_paths_.empty() && file.IsDeletionVector() &&
247+
file.referenced_data_file.has_value() &&
248+
removed_data_file_paths_.count(*file.referenced_data_file)) {
249+
return true;
250+
}
251+
}
252+
222253
if (HasRowFilterExpression(delete_expr_)) {
223254
ICEBERG_ASSIGN_OR_RAISE(auto* residual_eval,
224255
GetResidualEvaluator(schema, specs_by_id, spec_id));
@@ -403,6 +434,7 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
403434
bool trust_manifest_references = CanTrustManifestReferences(manifests);
404435
manifest_evaluator_cache_.clear();
405436
residual_evaluator_cache_.clear();
437+
replaced_manifests_count_ = 0;
406438

407439
// TODO(Guotao): Parallelize manifest filtering with per-manifest results, then
408440
// merge found paths and deleted files after the loop.
@@ -413,6 +445,9 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
413445
auto filtered_manifest,
414446
FilterManifest(schema, specs_by_id, *manifest_ptr, trust_manifest_references,
415447
writer_factory, found_paths));
448+
if (filtered_manifest.manifest_path != manifest_ptr->manifest_path) {
449+
++replaced_manifests_count_;
450+
}
416451
filtered.push_back(std::move(filtered_manifest));
417452
}
418453

src/iceberg/manifest/manifest_filter_manager.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,36 @@ class ICEBERG_EXPORT ManifestFilterManager {
116116
/// manifest entry matches a delete condition.
117117
void FailAnyDelete();
118118

119+
/// \brief Returns the number of manifests rewritten (replaced) by the last
120+
/// FilterManifests() call. A manifest is replaced when it contained deleted entries
121+
/// and was rewritten with those entries marked DELETED.
122+
int32_t ReplacedManifestsCount() const { return replaced_manifests_count_; }
123+
119124
/// \brief Returns true if any delete condition has been registered.
120125
bool ContainsDeletes() const;
121126

127+
/// \brief Set the minimum data sequence number for delete files to retain.
128+
///
129+
/// Only valid for ManifestContent::kDeletes managers. Delete entries whose
130+
/// data_sequence_number is positive and less than \p sequence_number will be
131+
/// marked DELETED. This continuously removes delete files that cannot match
132+
/// any remaining data rows (i.e. all data written before that sequence number
133+
/// has itself been deleted).
134+
///
135+
/// \param sequence_number lowest sequence number to retain; delete files with a
136+
/// positive sequence number strictly below this value are dropped
137+
void DropDeleteFilesOlderThan(int64_t sequence_number);
138+
139+
/// \brief Register data files that have been removed so their dangling DVs
140+
/// can be cleaned up.
141+
///
142+
/// Only valid for ManifestContent::kDeletes managers. For each DV whose
143+
/// referenced_data_file path appears in \p deleted_files, the DV entry is
144+
/// marked DELETED because the data file it targets no longer exists.
145+
///
146+
/// \param deleted_files set of data files that have been marked for deletion
147+
void RemoveDanglingDeletesFor(const DataFileSet& deleted_files);
148+
122149
/// \brief Apply all accumulated delete conditions to the base snapshot's manifests.
123150
///
124151
/// Manifests that cannot possibly contain deleted files are returned unchanged.
@@ -220,6 +247,13 @@ class ICEBERG_EXPORT ManifestFilterManager {
220247
bool fail_any_delete_{false};
221248
bool case_sensitive_{true};
222249

250+
int32_t replaced_manifests_count_{0};
251+
252+
// minimum data sequence number; delete entries older than this are dropped
253+
int64_t min_sequence_number_{0};
254+
// paths of data files that were removed; DVs referencing these are dangling
255+
std::unordered_set<std::string> removed_data_file_paths_;
256+
223257
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>>
224258
manifest_evaluator_cache_;
225259
std::unordered_map<int32_t, std::unique_ptr<ResidualEvaluator>>

src/iceberg/manifest/manifest_merge_manager.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
5757
std::ranges::copy(manifest_ranges | std::views::join, std::back_inserter(all));
5858

5959
if (all.empty() || !merge_enabled_) {
60+
replaced_manifests_count_ = 0;
6061
return all |
6162
std::views::transform([](const ManifestFile* manifest) { return *manifest; }) |
6263
std::ranges::to<std::vector<ManifestFile>>();
@@ -82,6 +83,7 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
8283

8384
std::vector<ManifestFile> result;
8485
result.reserve(all.size());
86+
replaced_manifests_count_ = 0;
8587
for (auto& [key, group] : by_spec) {
8688
const auto* first = first_by_content.at(key.second);
8789
ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, first, snapshot_id, metadata,
@@ -140,6 +142,8 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
140142
} else {
141143
ICEBERG_ASSIGN_OR_RAISE(
142144
auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory));
145+
// A bin of N manifests collapses into one merged manifest, so count N - 1 as replaced.
146+
replaced_manifests_count_ += static_cast<int32_t>(bin.size()) - 1;
143147
result.push_back(std::move(merged));
144148
}
145149
}

src/iceberg/manifest/manifest_merge_manager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ class ICEBERG_EXPORT ManifestMergeManager {
8484
const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
8585
const ManifestWriterFactory& writer_factory);
8686

87+
/// \brief Returns the number of manifests replaced (consumed into merged outputs)
88+
/// by the last MergeManifests() call.
89+
int32_t ReplacedManifestsCount() const { return replaced_manifests_count_; }
90+
8791
private:
8892
/// \brief Merge a group of manifests sharing the same spec_id.
8993
///
@@ -109,6 +113,7 @@ class ICEBERG_EXPORT ManifestMergeManager {
109113
const int64_t target_size_bytes_;
110114
const int32_t min_count_to_merge_;
111115
const bool merge_enabled_;
116+
int32_t replaced_manifests_count_{0};
112117
};
113118

114119
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ iceberg_sources = files(
110110
'type.cc',
111111
'update/expire_snapshots.cc',
112112
'update/fast_append.cc',
113+
'update/merging_snapshot_update.cc',
113114
'update/pending_update.cc',
114115
'update/set_snapshot.cc',
115116
'update/snapshot_manager.cc',

src/iceberg/snapshot.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,10 @@ void SnapshotSummaryBuilder::Clear() {
441441
metrics_.Clear();
442442
deleted_duplicate_files_ = 0;
443443
trust_partition_metrics_ = true;
444+
manifests_counts_set_ = false;
445+
manifests_created_ = 0;
446+
manifests_kept_ = 0;
447+
manifests_replaced_ = 0;
444448
}
445449

446450
void SnapshotSummaryBuilder::SetPartitionSummaryLimit(int32_t max) {
@@ -475,6 +479,14 @@ void SnapshotSummaryBuilder::Set(const std::string& property, const std::string&
475479
properties_[property] = value;
476480
}
477481

482+
// Stores the created/kept/replaced manifest counts and flags them as set.
// Build() emits the three manifest count fields only while that flag is true,
// and Clear() resets both the flag and the counts.
void SnapshotSummaryBuilder::SetManifestCounts(int32_t created, int32_t kept,
483+
int32_t replaced) {
484+
// The flag (not the values) gates emission, so zero counts are still written.
manifests_counts_set_ = true;
485+
manifests_created_ = created;
486+
manifests_kept_ = kept;
487+
manifests_replaced_ = replaced;
488+
}
489+
478490
void SnapshotSummaryBuilder::Merge(const SnapshotSummaryBuilder& other) {
479491
for (const auto& [key, value] : other.properties_) {
480492
properties_[key] = value;
@@ -491,6 +503,10 @@ void SnapshotSummaryBuilder::Merge(const SnapshotSummaryBuilder& other) {
491503
}
492504

493505
deleted_duplicate_files_ += other.deleted_duplicate_files_;
506+
// Manifest counts (manifests_counts_set_ / manifests_created_ / manifests_kept_ /
507+
// manifests_replaced_) are intentionally not merged here. They are set directly
508+
// on the root summary builder by Apply() after all manifests are finalized, and
509+
// are never populated on sub-builders that get Merge()d in.
494510
}
495511

496512
std::unordered_map<std::string, std::string> SnapshotSummaryBuilder::Build() const {
@@ -504,6 +520,14 @@ std::unordered_map<std::string, std::string> SnapshotSummaryBuilder::Build() con
504520
SetIf(deleted_duplicate_files_ > 0, builder,
505521
SnapshotSummaryFields::kDeletedDuplicatedFiles, deleted_duplicate_files_);
506522

523+
// Always emit all three manifest count fields together when they have been set.
524+
SetIf(manifests_counts_set_, builder, SnapshotSummaryFields::kManifestsCreated,
525+
manifests_created_);
526+
SetIf(manifests_counts_set_, builder, SnapshotSummaryFields::kManifestsKept,
527+
manifests_kept_);
528+
SetIf(manifests_counts_set_, builder, SnapshotSummaryFields::kManifestsReplaced,
529+
manifests_replaced_);
530+
507531
SetIf(trust_partition_metrics_, builder,
508532
SnapshotSummaryFields::kChangedPartitionCountProp, partition_metrics_.size());
509533

src/iceberg/snapshot.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,15 @@ class ICEBERG_EXPORT SnapshotSummaryBuilder {
338338
/// \param value Property value
339339
void Set(const std::string& property, const std::string& value);
340340

341+
/// \brief Set manifest count summary fields.
342+
///
343+
/// Records how many manifests were created, kept, and replaced in this snapshot.
344+
///
345+
/// \param created Manifests written by this snapshot
346+
/// \param kept Manifests carried over unchanged from the previous snapshot
347+
/// \param replaced Manifests rewritten or merged away
348+
void SetManifestCounts(int32_t created, int32_t kept, int32_t replaced);
349+
341350
/// \brief Merge another builder's metrics into this one
342351
///
343352
/// \param other The builder to merge from
@@ -359,6 +368,10 @@ class ICEBERG_EXPORT SnapshotSummaryBuilder {
359368
int32_t max_changed_partitions_for_summaries_{0};
360369
int64_t deleted_duplicate_files_{0};
361370
bool trust_partition_metrics_{true};
371+
bool manifests_counts_set_{false};
372+
int32_t manifests_created_{0};
373+
int32_t manifests_kept_{0};
374+
int32_t manifests_replaced_{0};
362375
};
363376

364377
/// \brief Data operation that produce snapshots.

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ if(ICEBERG_BUILD_BUNDLE)
208208
expire_snapshots_test.cc
209209
fast_append_test.cc
210210
manifest_filter_manager_test.cc
211+
merging_snapshot_update_test.cc
211212
name_mapping_update_test.cc
212213
snapshot_manager_test.cc
213214
transaction_test.cc

0 commit comments

Comments
 (0)