Skip to content

Commit ef7b72f

Browse files
committed
align java model
1 parent 2c2365f commit ef7b72f

7 files changed

Lines changed: 464 additions & 261 deletions

src/iceberg/manifest/manifest_filter_manager.cc

Lines changed: 247 additions & 152 deletions
Large diffs are not rendered by default.

src/iceberg/manifest/manifest_filter_manager.h

Lines changed: 74 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,13 @@
2323
/// Filters an existing snapshot's manifest list, marking data files as DELETED
2424
/// or EXISTING based on row-filter expressions, exact path deletes, and partition drops.
2525

26-
#include <functional>
26+
#include <cstdint>
2727
#include <memory>
2828
#include <string>
2929
#include <unordered_map>
3030
#include <unordered_set>
3131
#include <vector>
3232

33-
#include "iceberg/expression/inclusive_metrics_evaluator.h"
34-
#include "iceberg/expression/manifest_evaluator.h"
35-
#include "iceberg/expression/residual_evaluator.h"
36-
#include "iceberg/expression/strict_metrics_evaluator.h"
3733
#include "iceberg/iceberg_export.h"
3834
#include "iceberg/manifest/manifest_list.h"
3935
#include "iceberg/manifest/manifest_writer.h"
@@ -44,15 +40,6 @@
4440

4541
namespace iceberg {
4642

47-
/// \brief Factory type for creating ManifestWriter instances during filtering/merging.
48-
///
49-
/// The factory receives the partition spec ID (to look up the spec) and the manifest
50-
/// content type, and returns a new ManifestWriter ready for writing. The caller
51-
/// (i.e. MergingSnapshotUpdate in PR2) captures metadata, FileIO, and snapshot ID
52-
/// inside the lambda.
53-
using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>(
54-
int32_t spec_id, ManifestContent content)>;
55-
5643
/// \brief Filters an existing snapshot's manifest list.
5744
///
5845
/// The manager accumulates delete conditions incrementally, then applies them all
@@ -63,15 +50,16 @@ using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWrite
6350
/// The manager is content-agnostic: pass ManifestContent::kData to process data
6451
/// manifests, or ManifestContent::kDeletes to process delete manifests.
6552
///
66-
/// \note For ManifestContent::kDeletes, this implementation applies
67-
/// path/partition/row-filter/object-level deletes. Java's additional cleanup
68-
/// semantics (dropDeleteFilesOlderThan, removeDanglingDeletesFor / DV cleanup)
69-
/// are not yet implemented and are planned for a later layer.
53+
/// TODO(Guotao): For ManifestContent::kDeletes, implement cleanup for orphan delete files
54+
/// and dangling deletion vectors.
7055
///
7156
/// \note This class is non-copyable and non-movable.
7257
class ICEBERG_EXPORT ManifestFilterManager {
7358
public:
59+
using PartitionSpecsById = std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>;
60+
7461
ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> file_io);
62+
~ManifestFilterManager();
7563

7664
ManifestFilterManager(const ManifestFilterManager&) = delete;
7765
ManifestFilterManager& operator=(const ManifestFilterManager&) = delete;
@@ -82,7 +70,7 @@ class ICEBERG_EXPORT ManifestFilterManager {
8270
/// expression will be marked DELETED.
8371
///
8472
/// \param expr The expression to match files against
85-
void DeleteByRowFilter(std::shared_ptr<Expression> expr);
73+
Status DeleteByRowFilter(std::shared_ptr<Expression> expr);
8674

8775
/// \brief Set whether row-filter field binding is case-sensitive.
8876
void CaseSensitive(bool case_sensitive);
@@ -98,11 +86,11 @@ class ICEBERG_EXPORT ManifestFilterManager {
9886
///
9987
/// Any manifest entry whose file_path matches file->file_path will be marked
10088
/// DELETED. The file object is retained in FilesToBeDeleted(), allowing callers
101-
/// (e.g. RowDelta in PR5) to enumerate deleted file objects for DV cleanup.
89+
/// to enumerate deleted file objects for follow-up delete-file cleanup.
10290
/// Duplicate registrations (same path) are silently ignored.
10391
///
10492
/// \param file The data/delete file to delete (must not be null)
105-
void DeleteFile(std::shared_ptr<DataFile> file);
93+
Status DeleteFile(std::shared_ptr<DataFile> file);
10694

10795
/// \brief Returns the set of file objects marked for deletion by this manager.
10896
///
@@ -126,9 +114,6 @@ class ICEBERG_EXPORT ManifestFilterManager {
126114

127115
/// \brief Set a flag that makes FilterManifests() return an error if any
128116
/// manifest entry matches a delete condition.
129-
///
130-
/// Used by conflict-detection logic (e.g. OverwriteFiles in PR4) to detect
131-
/// whether a concurrent delete would invalidate a set of newly written files.
132117
void FailAnyDelete();
133118

134119
/// \brief Returns true if any delete condition has been registered.
@@ -147,59 +132,97 @@ class ICEBERG_EXPORT ManifestFilterManager {
147132
const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
148133
const ManifestWriterFactory& writer_factory);
149134

150-
private:
151-
struct DeleteExpr {
152-
std::shared_ptr<Expression> expr;
153-
};
135+
/// \brief Apply all accumulated delete conditions to the provided manifests.
136+
///
137+
/// This overload accepts only the context needed for filtering. It is intended for
138+
/// callers that already have the active schema, partition specs, and manifest list.
139+
///
140+
/// \param schema Active schema to bind row-filter expressions and metrics evaluators
141+
/// \param specs_by_id All partition specs keyed by spec ID
142+
/// \param manifests Manifest descriptors to filter
143+
/// \param writer_factory Factory to create new ManifestWriter instances
144+
/// \return The filtered manifest list, or an error
145+
Result<std::vector<ManifestFile>> FilterManifests(
146+
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
147+
const std::vector<const ManifestFile*>& manifests,
148+
const ManifestWriterFactory& writer_factory);
154149

150+
private:
155151
/// \brief Returns true if the manifest might contain files matching the row filter.
156-
bool CanContainExpressionDeletes(const ManifestFile& manifest,
157-
const TableMetadata& metadata);
152+
Result<bool> CanContainExpressionDeletes(const ManifestFile& manifest,
153+
const std::shared_ptr<Schema>& schema,
154+
const PartitionSpecsById& specs_by_id);
158155

159156
/// \brief Returns true if the manifest might contain files in a dropped partition.
160157
///
161158
/// Checks whether the manifest's partition_spec_id matches any spec_id registered
162159
/// via DropPartition(). Manifests from a different spec cannot contain the dropped
163-
/// partition values. A more precise implementation could also compare
164-
/// partition_summaries bounds, but that requires decoding binary bounds against the
165-
/// PartitionSpec, which is not yet available at this call site.
166-
bool CanContainDroppedPartitions(const ManifestFile& manifest);
160+
/// partition values.
161+
Result<bool> CanContainDroppedPartitions(const ManifestFile& manifest) const;
167162

168163
/// \brief Returns true if the manifest might contain path-deleted files.
169-
bool CanContainDroppedFiles() const;
164+
Result<bool> CanContainDroppedFiles(const ManifestFile& manifest) const;
170165

171166
/// \brief Returns true if the manifest possibly contains any deleted file.
172-
bool CanContainDeletedFiles(const ManifestFile& manifest,
173-
const TableMetadata& metadata);
174-
175-
/// \brief Get or create a ManifestEvaluator for the given spec and expression.
176-
Result<ManifestEvaluator*> GetManifestEvaluator(const TableMetadata& metadata,
177-
int32_t spec_id, const DeleteExpr& de);
178-
179-
/// \brief Get or create a ResidualEvaluator for the given spec and expression.
180-
Result<ResidualEvaluator*> GetResidualEvaluator(const TableMetadata& metadata,
181-
int32_t spec_id, const DeleteExpr& de);
167+
Result<bool> CanContainDeletedFiles(const ManifestFile& manifest,
168+
const std::shared_ptr<Schema>& schema,
169+
const PartitionSpecsById& specs_by_id,
170+
bool trust_manifest_references);
171+
172+
bool CanTrustManifestReferences(
173+
const std::vector<const ManifestFile*>& manifests) const;
174+
175+
Result<ManifestFile> FilterManifest(const std::shared_ptr<Schema>& schema,
176+
const PartitionSpecsById& specs_by_id,
177+
const ManifestFile& manifest,
178+
bool trust_manifest_references,
179+
const ManifestWriterFactory& writer_factory,
180+
std::unordered_set<std::string>& found_paths);
181+
182+
Result<bool> ManifestHasDeletedFiles(const std::vector<ManifestEntry>& entries,
183+
const std::shared_ptr<Schema>& schema,
184+
const PartitionSpecsById& specs_by_id,
185+
int32_t manifest_spec_id);
186+
187+
Result<ManifestFile> FilterManifestWithDeletedFiles(
188+
const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
189+
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
190+
const ManifestWriterFactory& writer_factory,
191+
std::unordered_set<std::string>& found_paths);
192+
193+
Status ValidateRequiredDeletes(
194+
const std::unordered_set<std::string>& found_paths) const;
195+
196+
/// \brief Get or create a ManifestEvaluator for the given spec.
197+
Result<ManifestEvaluator*> GetManifestEvaluator(const std::shared_ptr<Schema>& schema,
198+
const PartitionSpecsById& specs_by_id,
199+
int32_t spec_id);
200+
201+
/// \brief Get or create a ResidualEvaluator for the given spec.
202+
Result<ResidualEvaluator*> GetResidualEvaluator(const std::shared_ptr<Schema>& schema,
203+
const PartitionSpecsById& specs_by_id,
204+
int32_t spec_id);
182205

183206
/// \brief Check whether a single entry should be deleted.
184-
Result<bool> ShouldDelete(const ManifestEntry& entry, const TableMetadata& metadata,
207+
Result<bool> ShouldDelete(const ManifestEntry& entry,
208+
const std::shared_ptr<Schema>& schema,
209+
const PartitionSpecsById& specs_by_id,
185210
int32_t manifest_spec_id);
186211

187212
const ManifestContent manifest_content_;
188213
std::shared_ptr<FileIO> file_io_;
189214

190-
std::vector<DeleteExpr> delete_exprs_;
215+
std::shared_ptr<Expression> delete_expr_;
191216
std::unordered_set<std::string> delete_paths_;
192217
DataFileSet delete_files_;
193218
PartitionSet drop_partitions_;
194219
bool fail_missing_delete_paths_{false};
195220
bool fail_any_delete_{false};
196221
bool case_sensitive_{true};
197222

198-
// Cache: (spec_id, expr_index) → ManifestEvaluator
199-
std::unordered_map<int32_t, std::vector<std::unique_ptr<ManifestEvaluator>>>
223+
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>>
200224
manifest_evaluator_cache_;
201-
// Cache: (spec_id, expr_index) → ResidualEvaluator
202-
std::unordered_map<int32_t, std::vector<std::unique_ptr<ResidualEvaluator>>>
225+
std::unordered_map<int32_t, std::unique_ptr<ResidualEvaluator>>
203226
residual_evaluator_cache_;
204227
};
205228

src/iceberg/manifest/manifest_merge_manager.cc

Lines changed: 50 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include "iceberg/manifest/manifest_merge_manager.h"
2121

2222
#include <algorithm>
23+
#include <array>
24+
#include <iterator>
2325
#include <map>
2426
#include <ranges>
2527
#include <utility>
@@ -43,70 +45,76 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
4345
const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
4446
const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
4547
const ManifestWriterFactory& writer_factory) {
46-
// Combine new then existing (new-first ordering is preserved in output)
47-
std::vector<ManifestFile> all;
48+
// Combine new then existing (new-first ordering is preserved in output).
49+
auto to_manifest_ptr = [](const ManifestFile& manifest) { return &manifest; };
50+
auto manifest_ranges = std::array{
51+
new_manifests | std::views::transform(to_manifest_ptr),
52+
existing_manifests | std::views::transform(to_manifest_ptr),
53+
};
54+
55+
std::vector<const ManifestFile*> all;
4856
all.reserve(new_manifests.size() + existing_manifests.size());
49-
all.insert(all.end(), new_manifests.begin(), new_manifests.end());
50-
all.insert(all.end(), existing_manifests.begin(), existing_manifests.end());
57+
std::ranges::copy(manifest_ranges | std::views::join, std::back_inserter(all));
5158

5259
if (all.empty() || !merge_enabled_) {
53-
return all;
60+
return all |
61+
std::views::transform([](const ManifestFile* manifest) { return *manifest; }) |
62+
std::ranges::to<std::vector<ManifestFile>>();
5463
}
5564

56-
// Match Java's separate DataFileMergeManager/DeleteFileMergeManager behavior by
57-
// tracking the first (newest) manifest independently per content type.
65+
// Track the first (newest) manifest independently per content type.
5866
std::map<ManifestContent, const ManifestFile*> first_by_content;
59-
for (const auto& manifest : all) {
60-
first_by_content.try_emplace(manifest.content, &manifest);
61-
}
67+
std::ranges::for_each(all, [&first_by_content](const ManifestFile* manifest) {
68+
first_by_content.try_emplace(manifest->content, manifest);
69+
});
6270

63-
// Group manifests by (partition_spec_id, content) — never merge across specs or
64-
// content types. Use reverse spec ordering to match Java's reverse-TreeMap behaviour,
65-
// which is observable in v3 tables where first-row IDs are assigned in output order.
71+
// Group manifests by (partition_spec_id, content), never merging across specs or
72+
// content types. Reverse spec ordering preserves v3 first-row-id assignment order.
6673
using GroupKey = std::pair<int32_t, ManifestContent>;
67-
std::map<GroupKey, std::vector<ManifestFile>, std::greater<>> by_spec;
68-
for (const auto& m : all) {
69-
by_spec[{m.partition_spec_id, m.content}].push_back(m);
70-
}
74+
auto group_key = [](const ManifestFile* manifest) {
75+
return GroupKey{manifest->partition_spec_id, manifest->content};
76+
};
77+
78+
std::map<GroupKey, std::vector<const ManifestFile*>, std::greater<>> by_spec;
79+
std::ranges::for_each(all, [&by_spec, &group_key](const ManifestFile* manifest) {
80+
by_spec[group_key(manifest)].push_back(manifest);
81+
});
7182

7283
std::vector<ManifestFile> result;
7384
result.reserve(all.size());
7485
for (auto& [key, group] : by_spec) {
7586
const auto* first = first_by_content.at(key.second);
76-
ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, *first, snapshot_id, metadata,
87+
ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, first, snapshot_id, metadata,
7788
file_io, writer_factory));
78-
result.insert(result.end(), std::make_move_iterator(merged.begin()),
79-
std::make_move_iterator(merged.end()));
89+
std::ranges::move(merged, std::back_inserter(result));
8090
}
8191
return result;
8292
}
8393

8494
Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
85-
const std::vector<ManifestFile>& group, const ManifestFile& first,
95+
const std::vector<const ManifestFile*>& group, const ManifestFile* first,
8696
int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
8797
const ManifestWriterFactory& writer_factory) {
88-
// Mirror Java's ListPacker.packEnd(group, ManifestFile::length) with lookback 1:
98+
// Match packEnd(group, ManifestFile::length) with lookback 1:
8999
// 1. Process manifests in reverse order (oldest-first).
90100
// 2. Greedy forward-pack with lookback=1: emit the current bin when the next item
91101
// doesn't fit, then start a new bin.
92102
// 3. Reverse each bin (restoring original item order within a bin).
93103
// 4. Reverse the bin list (newest manifest's bin ends up first).
94-
// Effect: the newest manifest is in the first, possibly under-filled, bin — exactly
95-
// what Java's comment describes ("the manifest that gets under-filled is the first one,
96-
// which will be merged the next time").
97-
std::vector<std::vector<ManifestFile>> bins;
98-
std::vector<ManifestFile> current_bin;
104+
// Effect: the newest manifest is in the first, possibly under-filled, bin.
105+
std::vector<std::vector<const ManifestFile*>> bins;
106+
std::vector<const ManifestFile*> current_bin;
99107
int64_t bin_size = 0;
100108

101-
for (const auto& manifest : std::views::reverse(group)) {
109+
for (const auto* manifest : std::views::reverse(group)) {
102110
if (!current_bin.empty() &&
103-
bin_size + manifest.manifest_length > target_size_bytes_) {
111+
bin_size + manifest->manifest_length > target_size_bytes_) {
104112
bins.push_back(std::move(current_bin));
105113
current_bin.clear();
106114
bin_size = 0;
107115
}
108116
current_bin.push_back(manifest);
109-
bin_size += manifest.manifest_length;
117+
bin_size += manifest->manifest_length;
110118
}
111119
if (!current_bin.empty()) {
112120
bins.push_back(std::move(current_bin));
@@ -118,13 +126,17 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
118126
std::ranges::reverse(bins);
119127

120128
// Process each bin: if the bin contains the newest manifest and is too small,
121-
// pass its contents through unchanged (mirrors Java's minCountToMerge logic).
129+
// pass its contents through unchanged.
122130
std::vector<ManifestFile> result;
123131
result.reserve(group.size());
132+
// TODO(Guotao): Flush independent bins in parallel and cache successful merged bins
133+
// for commit retries.
124134
for (auto& bin : bins) {
125135
bool contains_first = std::ranges::find(bin, first) != bin.end();
126136
if (contains_first && std::cmp_less(bin.size(), min_count_to_merge_)) {
127-
result.insert(result.end(), bin.begin(), bin.end());
137+
for (const auto* manifest : bin) {
138+
result.push_back(*manifest);
139+
}
128140
} else {
129141
ICEBERG_ASSIGN_OR_RAISE(
130142
auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory));
@@ -136,23 +148,23 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
136148
}
137149

138150
Result<ManifestFile> ManifestMergeManager::FlushBin(
139-
const std::vector<ManifestFile>& bin, int64_t snapshot_id,
151+
const std::vector<const ManifestFile*>& bin, int64_t snapshot_id,
140152
const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
141153
const ManifestWriterFactory& writer_factory) {
142154
// A single-manifest bin requires no merging.
143-
if (bin.size() == 1) return bin[0];
155+
if (bin.size() == 1) return *bin[0];
144156

145-
const ManifestFile& first = bin[0];
157+
const ManifestFile& first = *bin[0];
146158
int32_t spec_id = first.partition_spec_id;
147159

148160
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
149161
ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
150162

151163
ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, first.content));
152164

153-
for (const auto& manifest : bin) {
165+
for (const auto* manifest : bin) {
154166
ICEBERG_ASSIGN_OR_RAISE(auto reader,
155-
ManifestReader::Make(manifest, file_io, schema, spec));
167+
ManifestReader::Make(*manifest, file_io, schema, spec));
156168
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
157169
for (const auto& entry : entries) {
158170
bool is_current =
@@ -175,4 +187,5 @@ Result<ManifestFile> ManifestMergeManager::FlushBin(
175187
ICEBERG_RETURN_UNEXPECTED(writer->Close());
176188
return writer->ToManifestFile();
177189
}
190+
178191
} // namespace iceberg

0 commit comments

Comments
 (0)