Skip to content

Commit ffde4e8

Browse files
committed
polish code to align with Java parity
1 parent 1001097 commit ffde4e8

23 files changed

Lines changed: 2806 additions & 1101 deletions

src/iceberg/manifest/manifest_filter_manager.cc

Lines changed: 253 additions & 99 deletions
Large diffs are not rendered by default.

src/iceberg/manifest/manifest_filter_manager.h

Lines changed: 50 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -24,8 +24,8 @@
2424
/// or EXISTING based on row-filter expressions, exact path deletes, and partition drops.
2525

2626
#include <cstdint>
27+
#include <functional>
2728
#include <memory>
28-
#include <optional>
2929
#include <string>
3030
#include <unordered_map>
3131
#include <unordered_set>
@@ -35,6 +35,7 @@
3535
#include "iceberg/manifest/manifest_list.h"
3636
#include "iceberg/manifest/manifest_writer.h"
3737
#include "iceberg/result.h"
38+
#include "iceberg/snapshot.h"
3839
#include "iceberg/type_fwd.h"
3940
#include "iceberg/util/data_file_set.h"
4041
#include "iceberg/util/partition_value_util.h"
@@ -48,9 +49,6 @@ namespace iceberg {
4849
/// entries are returned unchanged (no I/O). Manifests that do contain deleted
4950
/// entries are rewritten with those entries marked DELETED.
5051
///
51-
/// The manager is content-agnostic: pass ManifestContent::kData to process data
52-
/// manifests, or ManifestContent::kDeletes to process delete manifests.
53-
///
5452
/// TODO(Guotao): For ManifestContent::kDeletes, implement cleanup for orphan delete files
5553
/// and dangling deletion vectors.
5654
///
@@ -59,7 +57,9 @@ class ICEBERG_EXPORT ManifestFilterManager {
5957
public:
6058
using PartitionSpecsById = std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>;
6159

62-
ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> file_io);
60+
static Result<std::unique_ptr<ManifestFilterManager>> Make(
61+
ManifestContent content, std::shared_ptr<FileIO> file_io,
62+
std::function<Status(const std::string&)> delete_file = {});
6363
~ManifestFilterManager();
6464

6565
ManifestFilterManager(const ManifestFilterManager&) = delete;
@@ -81,7 +81,7 @@ class ICEBERG_EXPORT ManifestFilterManager {
8181
/// Any manifest entry whose file_path matches this path will be marked DELETED.
8282
///
8383
/// \param path The exact file path to delete
84-
void DeleteFile(std::string_view path);
84+
Status DeleteFile(std::string_view path);
8585

8686
/// \brief Register a file object for deletion.
8787
///
@@ -93,9 +93,8 @@ class ICEBERG_EXPORT ManifestFilterManager {
9393

9494
/// \brief Returns the set of file objects marked for deletion by this manager.
9595
///
96-
/// This is populated by the most recent FilterManifests() call and contains only
97-
/// files that were actually deleted from filtered manifests. Used by higher-level
98-
/// operations (e.g. RowDelta) to enumerate deleted data files for follow-up cleanup.
96+
/// Includes file objects explicitly registered for deletion plus files deleted while
97+
/// filtering manifests.
9998
const DataFileSet& FilesToBeDeleted() const;
10099

101100
/// \brief Returns content-file objects deleted by the most recent
@@ -106,13 +105,19 @@ class ICEBERG_EXPORT ManifestFilterManager {
106105
/// FilterManifests() call.
107106
int32_t DuplicateDeletesCount() const { return duplicate_deletes_count_; }
108107

108+
/// \brief Build a snapshot-summary fragment from filtered manifests.
109+
///
110+
Result<SnapshotSummaryBuilder> BuildSummary(
111+
const std::vector<ManifestFile>& manifests,
112+
const PartitionSpecsById& specs_by_id) const;
113+
109114
/// \brief Register a partition for dropping.
110115
///
111116
/// Any manifest entry whose (spec_id, partition) pair matches will be marked DELETED.
112117
///
113118
/// \param spec_id The partition spec ID
114119
/// \param partition The partition values to drop
115-
void DropPartition(int32_t spec_id, PartitionValues partition);
120+
Status DropPartition(int32_t spec_id, PartitionValues partition);
116121

117122
/// \brief Set a flag that makes FilterManifests() fail if any registered
118123
/// delete path was not found in any manifest entry.
@@ -140,7 +145,7 @@ class ICEBERG_EXPORT ManifestFilterManager {
140145
///
141146
/// \param sequence_number the inclusive lower bound; delete files older than
142147
/// this value are dropped
143-
void DropDeleteFilesOlderThan(int64_t sequence_number);
148+
Status DropDeleteFilesOlderThan(int64_t sequence_number);
144149

145150
/// \brief Register data files that have been removed so their dangling DVs
146151
/// can be cleaned up.
@@ -189,7 +194,14 @@ class ICEBERG_EXPORT ManifestFilterManager {
189194
const std::vector<const ManifestFile*>& manifests,
190195
const ManifestWriterFactory& writer_factory);
191196

197+
/// \brief Delete cached filtered manifests that were not committed and roll back
198+
/// replaced-manifest accounting.
199+
Status CleanUncommitted(const std::unordered_set<std::string>& committed);
200+
192201
private:
202+
ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> file_io,
203+
std::function<Status(const std::string&)> delete_file);
204+
193205
/// \brief Returns true if the manifest might contain files matching any expression.
194206
Result<bool> CanContainExpressionDeletes(const ManifestFile& manifest,
195207
const std::shared_ptr<Schema>& schema,
@@ -214,31 +226,16 @@ class ICEBERG_EXPORT ManifestFilterManager {
214226
bool CanTrustManifestReferences(
215227
const std::vector<const ManifestFile*>& manifests) const;
216228

217-
struct DeleteFileKey {
218-
std::string path;
219-
std::optional<int64_t> content_offset;
220-
std::optional<int64_t> content_size_in_bytes;
221-
222-
bool operator==(const DeleteFileKey& other) const = default;
223-
};
224-
225-
struct DeleteFileKeyHash {
226-
size_t operator()(const DeleteFileKey& key) const;
229+
struct FilteredManifestDeletes {
230+
std::vector<std::shared_ptr<DataFile>> files;
231+
int32_t duplicate_deletes_count = 0;
227232
};
228233

229-
struct FoundDeletes {
230-
std::unordered_set<std::string> paths;
231-
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> files;
232-
};
233-
234-
static DeleteFileKey MakeDeleteFileKey(const DataFile& file);
235-
236234
Result<ManifestFile> FilterManifest(const std::shared_ptr<Schema>& schema,
237235
const PartitionSpecsById& specs_by_id,
238236
const ManifestFile& manifest,
239237
bool trust_manifest_references,
240-
const ManifestWriterFactory& writer_factory,
241-
FoundDeletes& found_deletes);
238+
const ManifestWriterFactory& writer_factory);
242239

243240
Result<bool> ManifestHasDeletedFiles(const std::vector<ManifestEntry>& entries,
244241
const std::shared_ptr<Schema>& schema,
@@ -248,9 +245,12 @@ class ICEBERG_EXPORT ManifestFilterManager {
248245
Result<ManifestFile> FilterManifestWithDeletedFiles(
249246
const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
250247
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
251-
const ManifestWriterFactory& writer_factory, FoundDeletes& found_deletes);
248+
const ManifestWriterFactory& writer_factory);
252249

253-
Status ValidateRequiredDeletes(const FoundDeletes& found_deletes) const;
250+
Status ValidateRequiredDeletes() const;
251+
252+
Status InvalidateFilteredCache();
253+
void ResetDeletedFiles();
254254

255255
/// \brief Get or create a ManifestEvaluator for the given spec.
256256
Result<ManifestEvaluator*> GetManifestEvaluator(const std::shared_ptr<Schema>& schema,
@@ -270,26 +270,40 @@ class ICEBERG_EXPORT ManifestFilterManager {
270270

271271
const ManifestContent manifest_content_;
272272
std::shared_ptr<FileIO> file_io_;
273+
std::function<Status(const std::string&)> delete_file_;
273274

274275
std::shared_ptr<Expression> delete_expr_;
275276
std::unordered_set<std::string> delete_paths_;
276-
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> delete_file_keys_;
277-
DataFileSet delete_files_;
277+
// Delete files explicitly registered for deletion by object identity.
278+
DeleteFileSet delete_files_to_delete_;
279+
// Data files explicitly registered for deletion by object identity.
280+
DataFileSet data_files_to_delete_;
281+
// Data files to remove: explicit object deletes plus files found while filtering.
282+
DataFileSet data_files_;
283+
// Delete files to remove: explicit object deletes plus files found while filtering.
284+
DeleteFileSet delete_files_;
285+
std::unordered_map<ManifestFile, FilteredManifestDeletes>
286+
filtered_manifest_to_deleted_files_;
287+
// Ordered files deleted by the latest filter pass, used for summaries.
278288
std::vector<std::shared_ptr<DataFile>> deleted_files_;
279-
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> deleted_file_keys_;
289+
// Data-file identity set for latest-pass dedup and required-delete validation.
290+
DataFileSet deleted_data_file_set_;
291+
// Delete-file identity set for latest-pass dedup and required-delete validation.
292+
DeleteFileSet deleted_delete_file_set_;
280293
PartitionSet drop_partitions_;
281294
bool fail_missing_delete_paths_{false};
282295
bool fail_any_delete_{false};
283296
bool case_sensitive_{true};
284297
int32_t duplicate_deletes_count_{0};
285-
286298
int32_t replaced_manifests_count_{0};
287299

288300
// minimum data sequence number; delete entries older than this are dropped
289301
int64_t min_sequence_number_{0};
290302
// paths of data files that were removed; DVs referencing these are dangling
291303
std::unordered_set<std::string> removed_data_file_paths_;
292304

305+
std::unordered_map<ManifestFile, ManifestFile> filtered_manifests_;
306+
293307
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>>
294308
manifest_evaluator_cache_;
295309
std::unordered_map<int32_t, std::unique_ptr<ResidualEvaluator>>

0 commit comments

Comments
 (0)