Skip to content

Commit f360476

Browse files
committed
feat(manifest): add ManifestFilterManager and ManifestMergeManager
Implement two manifest management classes for table write operations: - ManifestFilterManager: filters manifest entries by row filter expression, file path, or partition value; supports FailMissingDeletePaths validation. Rewrites manifests that contain matching files, marking entries as DELETED; passes through manifests that cannot contain matching files unchanged. - ManifestMergeManager: merges small manifests using greedy bin-packing, grouping by partition_spec_id (manifests with different specs are never merged). Oversized manifests pass through unchanged. ADDED entries from prior manifests become EXISTING when merged (matching Java semantics).
1 parent fc80e4b commit f360476

12 files changed

Lines changed: 1208 additions & 0 deletions

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ set(ICEBERG_SOURCES
4545
location_provider.cc
4646
manifest/manifest_adapter.cc
4747
manifest/manifest_entry.cc
48+
manifest/manifest_filter_manager.cc
4849
manifest/manifest_group.cc
4950
manifest/manifest_list.cc
51+
manifest/manifest_merge_manager.cc
5052
manifest/manifest_reader.cc
5153
manifest/manifest_util.cc
5254
manifest/manifest_writer.cc
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest/manifest_filter_manager.h"
21+
22+
#include <string>
23+
#include <unordered_set>
24+
#include <vector>
25+
26+
#include "iceberg/expression/inclusive_metrics_evaluator.h"
27+
#include "iceberg/expression/manifest_evaluator.h"
28+
#include "iceberg/manifest/manifest_entry.h"
29+
#include "iceberg/manifest/manifest_list.h"
30+
#include "iceberg/manifest/manifest_reader.h"
31+
#include "iceberg/result.h"
32+
#include "iceberg/snapshot.h"
33+
#include "iceberg/table_metadata.h"
34+
#include "iceberg/util/macros.h"
35+
36+
namespace iceberg {
37+
38+
ManifestFilterManager::ManifestFilterManager(ManifestContent content,
39+
std::shared_ptr<FileIO> file_io)
40+
: manifest_content_(content), file_io_(std::move(file_io)) {}
41+
42+
void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
43+
bool case_sensitive) {
44+
delete_exprs_.push_back({.expr = std::move(expr), .case_sensitive = case_sensitive});
45+
}
46+
47+
// Registers a file path for deletion.
//
// The path is recorded twice: in `delete_paths_` (the set consulted when
// matching entries) and in `pending_paths_` (the set of paths that have not
// been matched by any entry yet).
void ManifestFilterManager::DeleteFile(std::string_view path) {
  auto owned_path = std::string(path);
  pending_paths_.insert(owned_path);
  delete_paths_.insert(std::move(owned_path));
}
52+
53+
// Registers a partition to drop, identified by its partition spec id and
// partition values. The values are moved into `drop_partitions_`.
void ManifestFilterManager::DropPartition(int32_t spec_id,
                                          PartitionValues partition) {
  drop_partitions_.add(spec_id, std::move(partition));
}
56+
57+
// Turns on the check that makes FilterManifests() return an error when a
// registered delete path is still pending after all manifests were scanned.
void ManifestFilterManager::FailMissingDeletePaths() {
  fail_missing_delete_paths_ = true;
}
60+
61+
bool ManifestFilterManager::DeletesFiles() const {
62+
return !delete_exprs_.empty() || !delete_paths_.empty() || !drop_partitions_.empty();
63+
}
64+
65+
bool ManifestFilterManager::CanContainDroppedFiles() const {
66+
return !delete_paths_.empty();
67+
}
68+
69+
bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& manifest) {
70+
if (drop_partitions_.empty()) return false;
71+
// Only manifests whose partition spec matches a registered drop can contain
72+
// entries for that partition. PartitionKey is pair<spec_id, values>.
73+
int32_t spec_id = manifest.partition_spec_id;
74+
for (const auto& key : drop_partitions_) {
75+
if (key.first == spec_id) return true;
76+
}
77+
return false;
78+
}
79+
80+
bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& manifest,
81+
const TableMetadata& metadata) {
82+
if (delete_exprs_.empty()) return false;
83+
int32_t spec_id = manifest.partition_spec_id;
84+
for (const auto& delete_expr : delete_exprs_) {
85+
auto* evaluator_ptr =
86+
GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr);
87+
if (evaluator_ptr == nullptr) return true; // conservative on error
88+
auto result = evaluator_ptr->Evaluate(manifest);
89+
if (!result.has_value() || result.value()) return true;
90+
}
91+
return false;
92+
}
93+
94+
bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& manifest,
95+
const TableMetadata& metadata) {
96+
// A manifest with no live files cannot contain files to delete.
97+
bool has_live = (manifest.added_files_count.value_or(0) > 0) ||
98+
(manifest.existing_files_count.value_or(0) > 0);
99+
if (!has_live) return false;
100+
101+
return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, metadata) ||
102+
CanContainDroppedPartitions(manifest);
103+
}
104+
105+
// Returns the cached manifest evaluator for (`spec_id`, `de`), building it on
// first use from the partition spec and the schema found in `metadata`.
//
// Precondition: `de` must be a reference to an element of delete_exprs_; its
// cache slot is computed from its address relative to delete_exprs_.data().
// Returns an error if the spec lookup, the schema lookup, or the evaluator
// construction fails. The returned pointer is non-owning.
Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
  auto& per_spec = manifest_evaluator_cache_[spec_id];
  const size_t slot = &de - delete_exprs_.data();
  // Grow to one slot per registered expression, so expressions added after
  // the cache was first filled still get a slot.
  if (per_spec.size() <= slot) {
    per_spec.resize(delete_exprs_.size());
  }

  auto& cached = per_spec[slot];
  if (cached) {
    return cached.get();
  }

  ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
  ICEBERG_ASSIGN_OR_RAISE(cached, ManifestEvaluator::MakeRowFilter(
                                      de.expr, spec, *schema, de.case_sensitive));
  return cached.get();
}
120+
121+
// Returns the cached inclusive metrics evaluator for (`spec_id`, `de`),
// building it on first use from the schema found in `metadata`.
//
// Precondition: `de` must be a reference to an element of delete_exprs_; its
// cache slot is computed from its address relative to delete_exprs_.data().
// Returns an error if the schema lookup or the evaluator construction fails.
// The returned pointer is non-owning.
Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
    const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
  auto& per_spec = metrics_evaluator_cache_[spec_id];
  const size_t slot = &de - delete_exprs_.data();
  // Grow to one slot per registered expression, so expressions added after
  // the cache was first filled still get a slot.
  if (per_spec.size() <= slot) {
    per_spec.resize(delete_exprs_.size());
  }

  auto& cached = per_spec[slot];
  if (cached) {
    return cached.get();
  }

  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
  ICEBERG_ASSIGN_OR_RAISE(
      cached, InclusiveMetricsEvaluator::Make(de.expr, *schema, de.case_sensitive));
  return cached.get();
}
135+
136+
// Decides whether a manifest entry matches any registered delete condition.
//
// Checks run in this order, and the first match wins:
//   1. the file path is in delete_paths_ (the path is also removed from
//      pending_paths_ to record that it was found);
//   2. the file's (spec id, partition) is in drop_partitions_;
//   3. any delete expression's inclusive metrics evaluator matches the file.
// An entry without a data file never matches. `manifest_spec_id` is used only
// when the data file does not carry its own partition spec id.
// Returns an error if an evaluator cannot be obtained or evaluation fails.
Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
                                                 const TableMetadata& metadata,
                                                 int32_t manifest_spec_id) {
  if (!entry.data_file) {
    return false;
  }
  const DataFile& data_file = *entry.data_file;

  // 1. Explicit path registration.
  const bool path_registered =
      delete_paths_.find(data_file.file_path) != delete_paths_.end();
  if (path_registered) {
    pending_paths_.erase(data_file.file_path);
    return true;
  }

  // 2. Dropped partition.
  const int32_t effective_spec = data_file.partition_spec_id.value_or(manifest_spec_id);
  if (drop_partitions_.contains(effective_spec, data_file.partition)) {
    return true;
  }

  // 3. Row-filter expressions.
  // NOTE(review): only the inclusive metrics evaluator is consulted; presumably
  // a positive result means the file *may* contain matching rows rather than
  // *only* matching rows. Confirm that deleting the whole file is intended.
  for (const auto& registered : delete_exprs_) {
    ICEBERG_ASSIGN_OR_RAISE(auto* metrics_eval,
                            GetMetricsEvaluator(metadata, effective_spec, registered));
    ICEBERG_ASSIGN_OR_RAISE(auto file_matches, metrics_eval->Evaluate(data_file));
    if (file_matches) {
      return true;
    }
  }

  return false;
}
163+
164+
// Applies the registered delete conditions to the manifests of `base_snapshot`.
//
// Only manifests whose content equals this manager's content type are
// considered. A manifest that cannot contain a matching file, or whose live
// entries contain none, is returned unchanged. Otherwise it is rewritten
// through a writer from `writer_factory`: matching live entries are written
// as deleted, the remaining live entries as existing.
//
// Returns the resulting manifest list, or an error when
//   - reading the manifest list, a manifest, or table metadata fails,
//   - evaluating a delete condition or writing a manifest fails,
//   - FailMissingDeletePaths() was requested and a registered delete path was
//     not found. This is also checked when there is no base snapshot, since
//     then no registered path can be found.
Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
    const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
    const ManifestWriterFactory& writer_factory) {
  // Joins the delete paths that no scanned entry has matched so far.
  const auto join_pending_paths = [this]() {
    std::string joined;
    for (const auto& path : pending_paths_) {
      if (!joined.empty()) joined += ", ";
      joined += path;
    }
    return joined;
  };

  // No base snapshot → nothing to filter, but required deletes must still be
  // validated: every registered path is missing in that case.
  if (!base_snapshot) {
    if (fail_missing_delete_paths_ && !pending_paths_.empty()) {
      std::string missing = join_pending_paths();
      return InvalidArgument("Missing delete paths: {}", missing);
    }
    return std::vector<ManifestFile>{};
  }

  // Load the manifests from the manifest list
  ICEBERG_ASSIGN_OR_RAISE(
      auto list_reader, ManifestListReader::Make(base_snapshot->manifest_list, file_io_));
  ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files());

  // Keep only manifests for this manager's content type
  std::vector<ManifestFile> manifests;
  manifests.reserve(all_manifests.size());
  for (const auto& m : all_manifests) {
    if (m.content == manifest_content_) manifests.push_back(m);
  }

  // No conditions registered → return unchanged
  if (!DeletesFiles()) return manifests;

  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());

  std::vector<ManifestFile> result;
  result.reserve(manifests.size());

  for (const auto& manifest : manifests) {
    // Fast path: metadata skip
    if (!CanContainDeletedFiles(manifest, metadata)) {
      result.push_back(manifest);
      continue;
    }

    const int32_t spec_id = manifest.partition_spec_id;
    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));

    // Read all live entries from the manifest
    ICEBERG_ASSIGN_OR_RAISE(auto reader,
                            ManifestReader::Make(manifest, file_io_, schema, spec));
    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());

    // Evaluate every entry exactly once and remember the decision, so the
    // rewrite below does not have to re-run the evaluators.
    std::vector<char> delete_flags;
    delete_flags.reserve(entries.size());
    bool has_deletes = false;
    for (const auto& entry : entries) {
      ICEBERG_ASSIGN_OR_RAISE(auto should_delete, ShouldDelete(entry, metadata, spec_id));
      delete_flags.push_back(should_delete ? 1 : 0);
      if (should_delete) has_deletes = true;
    }

    if (!has_deletes) {
      result.push_back(manifest);
      continue;
    }

    // Rewrite the manifest with deleted entries marked
    ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, manifest_content_));
    size_t flag_index = 0;
    for (const auto& entry : entries) {
      if (delete_flags[flag_index] != 0) {
        ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
      } else {
        ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
      }
      ++flag_index;
    }
    ICEBERG_RETURN_UNEXPECTED(writer->Close());
    ICEBERG_ASSIGN_OR_RAISE(auto filtered_manifest, writer->ToManifestFile());
    result.push_back(std::move(filtered_manifest));
  }

  // Validate that all registered delete paths were found
  if (fail_missing_delete_paths_ && !pending_paths_.empty()) {
    std::string missing = join_pending_paths();
    return InvalidArgument("Missing delete paths: {}", missing);
  }

  return result;
}
247+
248+
} // namespace iceberg

0 commit comments

Comments
 (0)