Skip to content

Commit 723f1fd

Browse files
committed
Address delete validation review comments
1 parent 07ee3f2 commit 723f1fd

6 files changed

Lines changed: 367 additions & 45 deletions

src/iceberg/manifest/manifest_filter_manager.cc

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,25 @@ Result<std::string> FormatPartitionPath(const PartitionSpecsById& specs_by_id,
6565

6666
} // namespace
6767

68+
size_t ManifestFilterManager::DeleteFileKeyHash::operator()(
69+
const DeleteFileKey& key) const {
70+
size_t hash = std::hash<std::string>{}(key.path);
71+
auto combine = [&hash](const auto& value) {
72+
size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
73+
hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
74+
};
75+
combine(key.content_offset);
76+
combine(key.content_size_in_bytes);
77+
return hash;
78+
}
79+
80+
ManifestFilterManager::DeleteFileKey ManifestFilterManager::MakeDeleteFileKey(
81+
const DataFile& file) {
82+
return DeleteFileKey{.path = file.file_path,
83+
.content_offset = file.content_offset,
84+
.content_size_in_bytes = file.content_size_in_bytes};
85+
}
86+
6887
ManifestFilterManager::ManifestFilterManager(ManifestContent content,
6988
std::shared_ptr<FileIO> file_io)
7089
: manifest_content_(content),
@@ -93,14 +112,19 @@ void ManifestFilterManager::DeleteFile(std::string_view path) {
93112

94113
Status ManifestFilterManager::DeleteFile(std::shared_ptr<DataFile> file) {
95114
ICEBERG_PRECHECK(file != nullptr, "Cannot delete file: null");
96-
delete_paths_.insert(file->file_path);
115+
delete_file_keys_.insert(MakeDeleteFileKey(*file));
97116
return {};
98117
}
99118

100119
const DataFileSet& ManifestFilterManager::FilesToBeDeleted() const {
101120
return delete_files_;
102121
}
103122

123+
const std::vector<std::shared_ptr<DataFile>>& ManifestFilterManager::DeletedFiles()
124+
const {
125+
return deleted_files_;
126+
}
127+
104128
void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) {
105129
drop_partitions_.add(spec_id, std::move(partition));
106130
}
@@ -113,7 +137,7 @@ void ManifestFilterManager::FailAnyDelete() { fail_any_delete_ = true; }
113137

114138
bool ManifestFilterManager::ContainsDeletes() const {
115139
return HasRowFilterExpression(delete_expr_) || !delete_paths_.empty() ||
116-
!drop_partitions_.empty();
140+
!delete_file_keys_.empty() || !drop_partitions_.empty();
117141
}
118142

119143
void ManifestFilterManager::DropDeleteFilesOlderThan(int64_t sequence_number) {
@@ -129,9 +153,8 @@ void ManifestFilterManager::RemoveDanglingDeletesFor(const DataFileSet& deleted_
129153
Result<bool> ManifestFilterManager::CanContainDroppedFiles(const ManifestFile&) const {
130154
// TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete
131155
// manifests once object-delete partitions are tracked separately.
132-
// Currently, DeleteFile(std::shared_ptr<DataFile>) degrades to a path-based delete,
133-
// which forces scanning all manifests.
134-
return !delete_paths_.empty() || !removed_data_file_paths_.empty();
156+
return !delete_paths_.empty() || !delete_file_keys_.empty() ||
157+
!removed_data_file_paths_.empty();
135158
}
136159

137160
Result<bool> ManifestFilterManager::CanContainDroppedPartitions(
@@ -217,8 +240,9 @@ Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
217240
const DataFile& file = *entry.data_file;
218241
int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
219242

220-
// Path-based and partition-drop checks
243+
// Path/object-based and partition-drop checks.
221244
if (delete_paths_.count(file.file_path) ||
245+
delete_file_keys_.count(MakeDeleteFileKey(file)) ||
222246
drop_partitions_.contains(spec_id, file.partition)) {
223247
if (fail_any_delete_) {
224248
ICEBERG_ASSIGN_OR_RAISE(auto partition_path,
@@ -293,8 +317,7 @@ bool ManifestFilterManager::CanTrustManifestReferences(
293317
Result<ManifestFile> ManifestFilterManager::FilterManifest(
294318
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
295319
const ManifestFile& manifest, bool trust_manifest_references,
296-
const ManifestWriterFactory& writer_factory,
297-
std::unordered_set<std::string>& found_paths) {
320+
const ManifestWriterFactory& writer_factory, FoundDeletes& found_deletes) {
298321
ICEBERG_ASSIGN_OR_RAISE(
299322
auto can_contain_deleted_files,
300323
CanContainDeletedFiles(manifest, schema, specs_by_id, trust_manifest_references));
@@ -315,7 +338,7 @@ Result<ManifestFile> ManifestFilterManager::FilterManifest(
315338
}
316339

317340
return FilterManifestWithDeletedFiles(entries, spec_id, schema, specs_by_id,
318-
writer_factory, found_paths);
341+
writer_factory, found_deletes);
319342
}
320343

321344
Result<bool> ManifestFilterManager::ManifestHasDeletedFiles(
@@ -334,21 +357,30 @@ Result<bool> ManifestFilterManager::ManifestHasDeletedFiles(
334357
Result<ManifestFile> ManifestFilterManager::FilterManifestWithDeletedFiles(
335358
const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
336359
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
337-
const ManifestWriterFactory& writer_factory,
338-
std::unordered_set<std::string>& found_paths) {
360+
const ManifestWriterFactory& writer_factory, FoundDeletes& found_deletes) {
339361
ICEBERG_ASSIGN_OR_RAISE(auto writer,
340362
writer_factory(manifest_spec_id, manifest_content_));
341363
for (const auto& entry : entries) {
342364
ICEBERG_ASSIGN_OR_RAISE(auto should_delete,
343365
ShouldDelete(entry, schema, specs_by_id, manifest_spec_id));
344366
if (should_delete) {
345-
if (entry.data_file && delete_paths_.count(entry.data_file->file_path)) {
346-
found_paths.insert(entry.data_file->file_path);
347-
}
348367
if (entry.data_file) {
349-
// TODO(Guotao): Track duplicate deletes and avoid full DataFile copies when
350-
// summary generation can use lighter records.
351-
delete_files_.insert(std::make_shared<DataFile>(*entry.data_file));
368+
const auto key = MakeDeleteFileKey(*entry.data_file);
369+
if (delete_paths_.count(entry.data_file->file_path)) {
370+
found_deletes.paths.insert(entry.data_file->file_path);
371+
}
372+
if (delete_file_keys_.count(key)) {
373+
found_deletes.files.insert(key);
374+
}
375+
376+
auto file = std::make_shared<DataFile>(*entry.data_file);
377+
delete_files_.insert(file);
378+
auto [_, inserted] = deleted_file_keys_.insert(key);
379+
if (inserted) {
380+
deleted_files_.push_back(std::move(file));
381+
} else {
382+
++duplicate_deletes_count_;
383+
}
352384
}
353385
ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
354386
} else {
@@ -361,18 +393,24 @@ Result<ManifestFile> ManifestFilterManager::FilterManifestWithDeletedFiles(
361393
}
362394

363395
Status ManifestFilterManager::ValidateRequiredDeletes(
364-
const std::unordered_set<std::string>& found_paths) const {
396+
const FoundDeletes& found_deletes) const {
365397
if (!fail_missing_delete_paths_) {
366398
return {};
367399
}
368400

369401
std::string missing;
370402
for (const auto& path : delete_paths_) {
371-
if (!found_paths.count(path)) {
403+
if (!found_deletes.paths.count(path)) {
372404
if (!missing.empty()) missing += ", ";
373405
missing += path;
374406
}
375407
}
408+
for (const auto& key : delete_file_keys_) {
409+
if (!found_deletes.files.count(key)) {
410+
if (!missing.empty()) missing += ", ";
411+
missing += key.path;
412+
}
413+
}
376414
if (!missing.empty()) {
377415
return InvalidArgument("Missing delete paths: {}", missing);
378416
}
@@ -391,9 +429,12 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
391429
const std::shared_ptr<Snapshot>& base_snapshot,
392430
const ManifestWriterFactory& writer_factory) {
393431
delete_files_.clear();
432+
deleted_files_.clear();
433+
deleted_file_keys_.clear();
434+
duplicate_deletes_count_ = 0;
394435
replaced_manifests_count_ = 0;
395436
if (!base_snapshot) {
396-
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes({}));
437+
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(FoundDeletes{}));
397438
return std::vector<ManifestFile>{};
398439
}
399440

@@ -431,11 +472,14 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
431472
}
432473
}
433474

434-
std::unordered_set<std::string> found_paths;
475+
FoundDeletes found_deletes;
435476
delete_files_.clear();
477+
deleted_files_.clear();
478+
deleted_file_keys_.clear();
479+
duplicate_deletes_count_ = 0;
436480
if (manifests.empty()) {
437481
replaced_manifests_count_ = 0;
438-
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));
482+
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_deletes));
439483
return std::vector<ManifestFile>{};
440484
}
441485

@@ -452,14 +496,14 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
452496
ICEBERG_ASSIGN_OR_RAISE(
453497
auto filtered_manifest,
454498
FilterManifest(schema, specs_by_id, *manifest_ptr, trust_manifest_references,
455-
writer_factory, found_paths));
499+
writer_factory, found_deletes));
456500
if (filtered_manifest.manifest_path != manifest_ptr->manifest_path) {
457501
++replaced_manifests_count_;
458502
}
459503
filtered.push_back(std::move(filtered_manifest));
460504
}
461505

462-
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));
506+
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_deletes));
463507
return filtered;
464508
}
465509

src/iceberg/manifest/manifest_filter_manager.h

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include <cstdint>
2727
#include <memory>
28+
#include <optional>
2829
#include <string>
2930
#include <unordered_map>
3031
#include <unordered_set>
@@ -97,6 +98,14 @@ class ICEBERG_EXPORT ManifestFilterManager {
9798
/// operations (e.g. RowDelta) to enumerate deleted data files for follow-up cleanup.
9899
const DataFileSet& FilesToBeDeleted() const;
99100

101+
/// \brief Returns content-file objects deleted by the most recent
102+
/// FilterManifests() call, deduplicated by content-file identity.
103+
const std::vector<std::shared_ptr<DataFile>>& DeletedFiles() const;
104+
105+
/// \brief Returns how many duplicate file deletes were found in the most recent
106+
/// FilterManifests() call.
107+
int32_t DuplicateDeletesCount() const { return duplicate_deletes_count_; }
108+
100109
/// \brief Register a partition for dropping.
101110
///
102111
/// Any manifest entry whose (spec_id, partition) pair matches will be marked DELETED.
@@ -205,12 +214,31 @@ class ICEBERG_EXPORT ManifestFilterManager {
205214
bool CanTrustManifestReferences(
206215
const std::vector<const ManifestFile*>& manifests) const;
207216

217+
struct DeleteFileKey {
218+
std::string path;
219+
std::optional<int64_t> content_offset;
220+
std::optional<int64_t> content_size_in_bytes;
221+
222+
bool operator==(const DeleteFileKey& other) const = default;
223+
};
224+
225+
struct DeleteFileKeyHash {
226+
size_t operator()(const DeleteFileKey& key) const;
227+
};
228+
229+
struct FoundDeletes {
230+
std::unordered_set<std::string> paths;
231+
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> files;
232+
};
233+
234+
static DeleteFileKey MakeDeleteFileKey(const DataFile& file);
235+
208236
Result<ManifestFile> FilterManifest(const std::shared_ptr<Schema>& schema,
209237
const PartitionSpecsById& specs_by_id,
210238
const ManifestFile& manifest,
211239
bool trust_manifest_references,
212240
const ManifestWriterFactory& writer_factory,
213-
std::unordered_set<std::string>& found_paths);
241+
FoundDeletes& found_deletes);
214242

215243
Result<bool> ManifestHasDeletedFiles(const std::vector<ManifestEntry>& entries,
216244
const std::shared_ptr<Schema>& schema,
@@ -220,11 +248,9 @@ class ICEBERG_EXPORT ManifestFilterManager {
220248
Result<ManifestFile> FilterManifestWithDeletedFiles(
221249
const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
222250
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
223-
const ManifestWriterFactory& writer_factory,
224-
std::unordered_set<std::string>& found_paths);
251+
const ManifestWriterFactory& writer_factory, FoundDeletes& found_deletes);
225252

226-
Status ValidateRequiredDeletes(
227-
const std::unordered_set<std::string>& found_paths) const;
253+
Status ValidateRequiredDeletes(const FoundDeletes& found_deletes) const;
228254

229255
/// \brief Get or create a ManifestEvaluator for the given spec.
230256
Result<ManifestEvaluator*> GetManifestEvaluator(const std::shared_ptr<Schema>& schema,
@@ -247,11 +273,15 @@ class ICEBERG_EXPORT ManifestFilterManager {
247273

248274
std::shared_ptr<Expression> delete_expr_;
249275
std::unordered_set<std::string> delete_paths_;
276+
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> delete_file_keys_;
250277
DataFileSet delete_files_;
278+
std::vector<std::shared_ptr<DataFile>> deleted_files_;
279+
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> deleted_file_keys_;
251280
PartitionSet drop_partitions_;
252281
bool fail_missing_delete_paths_{false};
253282
bool fail_any_delete_{false};
254283
bool case_sensitive_{true};
284+
int32_t duplicate_deletes_count_{0};
255285

256286
int32_t replaced_manifests_count_{0};
257287

src/iceberg/test/manifest_filter_manager_test.cc

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,4 +562,84 @@ TEST_F(ManifestFilterManagerTest, RemoveDanglingDeletesForFiltersDanglingDV) {
562562
EXPECT_EQ(entries[0].status, ManifestStatus::kDeleted);
563563
}
564564

565+
TEST_F(ManifestFilterManagerTest, DeleteFileObjectMatchesDeletionVectorByContent) {
566+
auto metadata = *table_->metadata();
567+
metadata.format_version = 3;
568+
auto factory = MakeWriterFactory(metadata);
569+
570+
auto make_dv = [&](int64_t offset) {
571+
auto f = std::make_shared<DataFile>();
572+
f->content = DataFile::Content::kPositionDeletes;
573+
f->file_path = table_location_ + "/delete/dv.puffin";
574+
f->file_format = FileFormatType::kPuffin;
575+
f->referenced_data_file =
576+
std::format("{}/data/referenced-{}.parquet", table_location_, offset);
577+
f->content_offset = offset;
578+
f->content_size_in_bytes = 10;
579+
f->partition = PartitionValues(std::vector<Literal>{Literal::Long(1L)});
580+
f->file_size_in_bytes = 256;
581+
f->record_count = 5;
582+
f->partition_spec_id = spec_->spec_id();
583+
return f;
584+
};
585+
auto dv0 = make_dv(0);
586+
auto dv1 = make_dv(10);
587+
588+
auto manifest_path = std::format("{}/metadata/dv-manifest-{}.avro", table_location_,
589+
manifest_counter_++);
590+
ICEBERG_UNWRAP_OR_FAIL(
591+
auto manifest,
592+
WriteDeleteManifest({{dv0, 3L}, {dv1, 3L}}, file_io_, metadata, manifest_path));
593+
594+
ManifestFilterManager mgr(ManifestContent::kDeletes, file_io_);
595+
EXPECT_THAT(mgr.DeleteFile(dv0), IsOk());
596+
597+
std::vector<const ManifestFile*> manifests{&manifest};
598+
ICEBERG_UNWRAP_OR_FAIL(auto schema, metadata.Schema());
599+
ICEBERG_UNWRAP_OR_FAIL(
600+
auto result, mgr.FilterManifests(schema, SpecsById(metadata), manifests, factory));
601+
602+
ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, metadata));
603+
ASSERT_EQ(entries.size(), 2U);
604+
for (const auto& entry : entries) {
605+
ASSERT_NE(entry.data_file, nullptr);
606+
if (entry.data_file->content_offset == 0) {
607+
EXPECT_EQ(entry.status, ManifestStatus::kDeleted);
608+
} else {
609+
EXPECT_EQ(entry.status, ManifestStatus::kExisting);
610+
}
611+
}
612+
}
613+
614+
TEST_F(ManifestFilterManagerTest, DuplicateDeletesCountRepeatedDeletedFiles) {
615+
auto* metadata = table_->metadata().get();
616+
auto factory = MakeWriterFactory(*metadata);
617+
618+
auto del_file = std::make_shared<DataFile>();
619+
del_file->content = DataFile::Content::kPositionDeletes;
620+
del_file->file_path = table_location_ + "/delete/duplicate.parquet";
621+
del_file->file_format = FileFormatType::kParquet;
622+
del_file->partition = PartitionValues(std::vector<Literal>{Literal::Long(1L)});
623+
del_file->file_size_in_bytes = 512;
624+
del_file->record_count = 10;
625+
del_file->partition_spec_id = spec_->spec_id();
626+
627+
auto manifest_path = std::format("{}/metadata/dup-manifest-{}.avro", table_location_,
628+
manifest_counter_++);
629+
ICEBERG_UNWRAP_OR_FAIL(auto manifest,
630+
WriteDeleteManifest({{del_file, 3L}, {del_file, 3L}}, file_io_,
631+
*metadata, manifest_path));
632+
633+
ManifestFilterManager mgr(ManifestContent::kDeletes, file_io_);
634+
EXPECT_THAT(mgr.DeleteFile(del_file), IsOk());
635+
636+
std::vector<const ManifestFile*> manifests{&manifest};
637+
ICEBERG_UNWRAP_OR_FAIL(auto schema, metadata->Schema());
638+
ICEBERG_UNWRAP_OR_FAIL(
639+
auto result, mgr.FilterManifests(schema, SpecsById(*metadata), manifests, factory));
640+
641+
EXPECT_EQ(mgr.DeletedFiles().size(), 1U);
642+
EXPECT_EQ(mgr.DuplicateDeletesCount(), 1);
643+
}
644+
565645
} // namespace iceberg

0 commit comments

Comments
 (0)