Skip to content

Commit e8abb0c

Browse files
committed
feat(manifest): add ManifestFilterManager and ManifestMergeManager
Implement two manifest management classes for table write operations: - ManifestFilterManager: filters manifest entries by row filter expression, file path, or partition value; supports FailMissingDeletePaths validation. Rewrites manifests that contain matching files, marking entries as DELETED; passes through manifests that cannot contain matching files unchanged. - ManifestMergeManager: merges small manifests using greedy bin-packing, grouping by partition_spec_id (manifests with different specs are never merged). Oversized manifests pass through unchanged. ADDED entries from prior manifests become EXISTING when merged (matching Java semantics). Also fix Schema destructor declaration to resolve a clang compilation issue with incomplete SchemaCache type in unique_ptr (PIMPL pattern fix).
1 parent fc80e4b commit e8abb0c

12 files changed

Lines changed: 1174 additions & 0 deletions

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ set(ICEBERG_SOURCES
4545
location_provider.cc
4646
manifest/manifest_adapter.cc
4747
manifest/manifest_entry.cc
48+
manifest/manifest_filter_manager.cc
4849
manifest/manifest_group.cc
4950
manifest/manifest_list.cc
51+
manifest/manifest_merge_manager.cc
5052
manifest/manifest_reader.cc
5153
manifest/manifest_util.cc
5254
manifest/manifest_writer.cc
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest/manifest_filter_manager.h"
21+
22+
#include <string>
23+
#include <unordered_set>
24+
#include <vector>
25+
26+
#include "iceberg/expression/inclusive_metrics_evaluator.h"
27+
#include "iceberg/expression/manifest_evaluator.h"
28+
#include "iceberg/manifest/manifest_entry.h"
29+
#include "iceberg/manifest/manifest_list.h"
30+
#include "iceberg/manifest/manifest_reader.h"
31+
#include "iceberg/result.h"
32+
#include "iceberg/snapshot.h"
33+
#include "iceberg/table_metadata.h"
34+
#include "iceberg/util/macros.h"
35+
36+
namespace iceberg {
37+
38+
ManifestFilterManager::ManifestFilterManager(ManifestContent content,
39+
std::shared_ptr<FileIO> file_io)
40+
: manifest_content_(content), file_io_(std::move(file_io)) {}
41+
42+
void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
43+
bool case_sensitive) {
44+
delete_exprs_.push_back({std::move(expr), case_sensitive});
45+
}
46+
47+
void ManifestFilterManager::DeleteFile(std::string_view path) {
48+
std::string p(path);
49+
delete_paths_.insert(p);
50+
pending_paths_.insert(std::move(p));
51+
}
52+
53+
void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) {
54+
drop_partitions_.add(spec_id, std::move(partition));
55+
}
56+
57+
void ManifestFilterManager::FailMissingDeletePaths() {
58+
fail_missing_delete_paths_ = true;
59+
}
60+
61+
bool ManifestFilterManager::DeletesFiles() const {
62+
return !delete_exprs_.empty() || !delete_paths_.empty() || !drop_partitions_.empty();
63+
}
64+
65+
bool ManifestFilterManager::CanContainDroppedFiles() const {
66+
return !delete_paths_.empty();
67+
}
68+
69+
bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& manifest) {
70+
if (drop_partitions_.empty()) return false;
71+
// Without a partition filter helper, we conservatively say yes.
72+
// A full implementation would use ManifestFileUtil::canContainAny; for now
73+
// we open the manifest and let per-entry checks decide.
74+
(void)manifest;
75+
return true;
76+
}
77+
78+
bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& manifest,
79+
const TableMetadata& metadata) {
80+
if (delete_exprs_.empty()) return false;
81+
int32_t spec_id = manifest.partition_spec_id;
82+
for (size_t i = 0; i < delete_exprs_.size(); ++i) {
83+
auto* evaluator_ptr = GetManifestEvaluator(metadata, spec_id, delete_exprs_[i])
84+
.value_or(nullptr);
85+
if (evaluator_ptr == nullptr) return true; // conservative on error
86+
auto result = evaluator_ptr->Evaluate(manifest);
87+
if (!result.has_value() || result.value()) return true;
88+
}
89+
return false;
90+
}
91+
92+
bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& manifest,
93+
const TableMetadata& metadata) {
94+
// A manifest with no live files cannot contain files to delete.
95+
bool has_live = (manifest.added_files_count.value_or(0) > 0) ||
96+
(manifest.existing_files_count.value_or(0) > 0);
97+
if (!has_live) return false;
98+
99+
return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, metadata) ||
100+
CanContainDroppedPartitions(manifest);
101+
}
102+
103+
Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
104+
const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
105+
auto& vec = manifest_evaluator_cache_[spec_id];
106+
size_t idx = &de - delete_exprs_.data();
107+
if (idx >= vec.size()) {
108+
vec.resize(delete_exprs_.size());
109+
}
110+
if (!vec[idx]) {
111+
ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
112+
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
113+
ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter(
114+
de.expr, spec, *schema, de.case_sensitive));
115+
}
116+
return vec[idx].get();
117+
}
118+
119+
Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
120+
const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
121+
auto& vec = metrics_evaluator_cache_[spec_id];
122+
size_t idx = &de - delete_exprs_.data();
123+
if (idx >= vec.size()) {
124+
vec.resize(delete_exprs_.size());
125+
}
126+
if (!vec[idx]) {
127+
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
128+
ICEBERG_ASSIGN_OR_RAISE(vec[idx],
129+
InclusiveMetricsEvaluator::Make(de.expr, *schema,
130+
de.case_sensitive));
131+
}
132+
return vec[idx].get();
133+
}
134+
135+
bool ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
136+
const TableMetadata& metadata,
137+
int32_t manifest_spec_id) {
138+
if (!entry.data_file) return false;
139+
const DataFile& file = *entry.data_file;
140+
141+
// Path-based check
142+
if (delete_paths_.count(file.file_path)) {
143+
pending_paths_.erase(file.file_path);
144+
return true;
145+
}
146+
147+
// Partition-drop check
148+
int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
149+
if (drop_partitions_.contains(spec_id, file.partition)) {
150+
return true;
151+
}
152+
153+
// Expression-based check (inclusive metrics)
154+
for (const auto& de : delete_exprs_) {
155+
auto* eval = GetMetricsEvaluator(metadata, spec_id, de).value_or(nullptr);
156+
if (eval == nullptr) return true; // conservative on error
157+
auto result = eval->Evaluate(file);
158+
if (!result.has_value() || result.value()) return true;
159+
}
160+
161+
return false;
162+
}
163+
164+
Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
165+
const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
166+
const ManifestWriterFactory& writer_factory) {
167+
// No base snapshot → nothing to filter
168+
if (!base_snapshot) return std::vector<ManifestFile>{};
169+
170+
// Load the relevant manifests from the manifest list
171+
ICEBERG_ASSIGN_OR_RAISE(auto list_reader,
172+
ManifestListReader::Make(base_snapshot->manifest_list, file_io_));
173+
ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files());
174+
175+
// Keep only manifests for this manager's content type
176+
std::vector<ManifestFile> manifests;
177+
manifests.reserve(all_manifests.size());
178+
for (const auto& m : all_manifests) {
179+
if (m.content == manifest_content_) manifests.push_back(m);
180+
}
181+
182+
// No conditions registered → return unchanged
183+
if (!DeletesFiles()) return manifests;
184+
185+
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
186+
187+
std::vector<ManifestFile> result;
188+
result.reserve(manifests.size());
189+
190+
for (const auto& manifest : manifests) {
191+
// Fast path: metadata skip
192+
if (!CanContainDeletedFiles(manifest, metadata)) {
193+
result.push_back(manifest);
194+
continue;
195+
}
196+
197+
int32_t spec_id = manifest.partition_spec_id;
198+
ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
199+
200+
// Read all live entries from the manifest
201+
ICEBERG_ASSIGN_OR_RAISE(auto reader,
202+
ManifestReader::Make(manifest, file_io_, schema, spec));
203+
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
204+
205+
// Check whether any entry should be deleted
206+
bool has_deletes = false;
207+
for (const auto& entry : entries) {
208+
if (ShouldDelete(entry, metadata, spec_id)) {
209+
has_deletes = true;
210+
break;
211+
}
212+
}
213+
214+
if (!has_deletes) {
215+
result.push_back(manifest);
216+
continue;
217+
}
218+
219+
// Rewrite the manifest with deleted entries marked
220+
ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, manifest_content_));
221+
for (const auto& entry : entries) {
222+
if (ShouldDelete(entry, metadata, spec_id)) {
223+
ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
224+
} else {
225+
ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
226+
}
227+
}
228+
ICEBERG_RETURN_UNEXPECTED(writer->Close());
229+
ICEBERG_ASSIGN_OR_RAISE(auto filtered_manifest, writer->ToManifestFile());
230+
result.push_back(std::move(filtered_manifest));
231+
}
232+
233+
// Validate that all registered delete paths were found
234+
if (fail_missing_delete_paths_ && !pending_paths_.empty()) {
235+
std::string missing;
236+
for (const auto& p : pending_paths_) {
237+
if (!missing.empty()) missing += ", ";
238+
missing += p;
239+
}
240+
return InvalidArgument("Missing delete paths: {}", missing);
241+
}
242+
243+
return result;
244+
}
245+
246+
} // namespace iceberg

0 commit comments

Comments
 (0)