Skip to content

Commit e868407

Browse files
committed
polish code to align with Java parity
1 parent 1001097 commit e868407

22 files changed

Lines changed: 2670 additions & 1058 deletions

src/iceberg/manifest/manifest_filter_manager.cc

Lines changed: 193 additions & 77 deletions
Large diffs are not rendered by default.

src/iceberg/manifest/manifest_filter_manager.h

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
/// or EXISTING based on row-filter expressions, exact path deletes, and partition drops.
2525

2626
#include <cstdint>
27+
#include <functional>
2728
#include <memory>
28-
#include <optional>
2929
#include <string>
3030
#include <unordered_map>
3131
#include <unordered_set>
@@ -35,6 +35,7 @@
3535
#include "iceberg/manifest/manifest_list.h"
3636
#include "iceberg/manifest/manifest_writer.h"
3737
#include "iceberg/result.h"
38+
#include "iceberg/snapshot.h"
3839
#include "iceberg/type_fwd.h"
3940
#include "iceberg/util/data_file_set.h"
4041
#include "iceberg/util/partition_value_util.h"
@@ -48,9 +49,6 @@ namespace iceberg {
4849
/// entries are returned unchanged (no I/O). Manifests that do contain deleted
4950
/// entries are rewritten with those entries marked DELETED.
5051
///
51-
/// The manager is content-agnostic: pass ManifestContent::kData to process data
52-
/// manifests, or ManifestContent::kDeletes to process delete manifests.
53-
///
5452
/// TODO(Guotao): For ManifestContent::kDeletes, implement cleanup for orphan delete files
5553
/// and dangling deletion vectors.
5654
///
@@ -59,7 +57,9 @@ class ICEBERG_EXPORT ManifestFilterManager {
5957
public:
6058
using PartitionSpecsById = std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>;
6159

62-
ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> file_io);
60+
static Result<std::unique_ptr<ManifestFilterManager>> Make(
61+
ManifestContent content, std::shared_ptr<FileIO> file_io,
62+
std::function<Status(const std::string&)> delete_file = {});
6363
~ManifestFilterManager();
6464

6565
ManifestFilterManager(const ManifestFilterManager&) = delete;
@@ -93,9 +93,8 @@ class ICEBERG_EXPORT ManifestFilterManager {
9393

9494
/// \brief Returns the set of file objects marked for deletion by this manager.
9595
///
96-
/// This is populated by the most recent FilterManifests() call and contains only
97-
/// files that were actually deleted from filtered manifests. Used by higher-level
98-
/// operations (e.g. RowDelta) to enumerate deleted data files for follow-up cleanup.
96+
/// Includes file objects explicitly registered for deletion plus files deleted while
97+
/// filtering manifests.
9998
const DataFileSet& FilesToBeDeleted() const;
10099

101100
/// \brief Returns content-file objects deleted by the most recent
@@ -106,6 +105,12 @@ class ICEBERG_EXPORT ManifestFilterManager {
106105
/// FilterManifests() call.
107106
int32_t DuplicateDeletesCount() const { return duplicate_deletes_count_; }
108107

108+
/// \brief Build a snapshot-summary fragment from filtered manifests.
109+
///
110+
Result<SnapshotSummaryBuilder> BuildSummary(
111+
const std::vector<ManifestFile>& manifests,
112+
const PartitionSpecsById& specs_by_id) const;
113+
109114
/// \brief Register a partition for dropping.
110115
///
111116
/// Any manifest entry whose (spec_id, partition) pair matches will be marked DELETED.
@@ -140,7 +145,7 @@ class ICEBERG_EXPORT ManifestFilterManager {
140145
///
141146
/// \param sequence_number the inclusive lower bound; delete files older than
142147
/// this value are dropped
143-
void DropDeleteFilesOlderThan(int64_t sequence_number);
148+
Status DropDeleteFilesOlderThan(int64_t sequence_number);
144149

145150
/// \brief Register data files that have been removed so their dangling DVs
146151
/// can be cleaned up.
@@ -189,7 +194,14 @@ class ICEBERG_EXPORT ManifestFilterManager {
189194
const std::vector<const ManifestFile*>& manifests,
190195
const ManifestWriterFactory& writer_factory);
191196

197+
/// \brief Delete cached filtered manifests that were not committed and roll back
198+
/// replaced-manifest accounting.
199+
Status CleanUncommitted(const std::unordered_set<std::string>& committed);
200+
192201
private:
202+
ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> file_io,
203+
std::function<Status(const std::string&)> delete_file);
204+
193205
/// \brief Returns true if the manifest might contain files matching any expression.
194206
Result<bool> CanContainExpressionDeletes(const ManifestFile& manifest,
195207
const std::shared_ptr<Schema>& schema,
@@ -214,24 +226,15 @@ class ICEBERG_EXPORT ManifestFilterManager {
214226
bool CanTrustManifestReferences(
215227
const std::vector<const ManifestFile*>& manifests) const;
216228

217-
struct DeleteFileKey {
218-
std::string path;
219-
std::optional<int64_t> content_offset;
220-
std::optional<int64_t> content_size_in_bytes;
221-
222-
bool operator==(const DeleteFileKey& other) const = default;
223-
};
224-
225-
struct DeleteFileKeyHash {
226-
size_t operator()(const DeleteFileKey& key) const;
227-
};
228-
229229
struct FoundDeletes {
230230
std::unordered_set<std::string> paths;
231-
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> files;
231+
DeleteFileSet files;
232232
};
233233

234-
static DeleteFileKey MakeDeleteFileKey(const DataFile& file);
234+
struct FilteredManifestDeletes {
235+
std::vector<std::shared_ptr<DataFile>> files;
236+
int32_t duplicate_deletes_count = 0;
237+
};
235238

236239
Result<ManifestFile> FilterManifest(const std::shared_ptr<Schema>& schema,
237240
const PartitionSpecsById& specs_by_id,
@@ -252,6 +255,11 @@ class ICEBERG_EXPORT ManifestFilterManager {
252255

253256
Status ValidateRequiredDeletes(const FoundDeletes& found_deletes) const;
254257

258+
void AddFoundDelete(FoundDeletes& found_deletes,
259+
const std::shared_ptr<DataFile>& file) const;
260+
Status InvalidateFilteredCache();
261+
void ResetDeletedFiles();
262+
255263
/// \brief Get or create a ManifestEvaluator for the given spec.
256264
Result<ManifestEvaluator*> GetManifestEvaluator(const std::shared_ptr<Schema>& schema,
257265
const PartitionSpecsById& specs_by_id,
@@ -270,26 +278,30 @@ class ICEBERG_EXPORT ManifestFilterManager {
270278

271279
const ManifestContent manifest_content_;
272280
std::shared_ptr<FileIO> file_io_;
281+
std::function<Status(const std::string&)> delete_file_;
273282

274283
std::shared_ptr<Expression> delete_expr_;
275284
std::unordered_set<std::string> delete_paths_;
276-
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> delete_file_keys_;
285+
DeleteFileSet delete_files_to_delete_;
277286
DataFileSet delete_files_;
287+
std::unordered_map<ManifestFile, FilteredManifestDeletes>
288+
filtered_manifest_to_deleted_files_;
278289
std::vector<std::shared_ptr<DataFile>> deleted_files_;
279-
std::unordered_set<DeleteFileKey, DeleteFileKeyHash> deleted_file_keys_;
290+
DeleteFileSet deleted_file_set_;
280291
PartitionSet drop_partitions_;
281292
bool fail_missing_delete_paths_{false};
282293
bool fail_any_delete_{false};
283294
bool case_sensitive_{true};
284295
int32_t duplicate_deletes_count_{0};
285-
286296
int32_t replaced_manifests_count_{0};
287297

288298
// minimum data sequence number; delete entries older than this are dropped
289299
int64_t min_sequence_number_{0};
290300
// paths of data files that were removed; DVs referencing these are dangling
291301
std::unordered_set<std::string> removed_data_file_paths_;
292302

303+
std::unordered_map<ManifestFile, ManifestFile> filtered_manifests_;
304+
293305
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>>
294306
manifest_evaluator_cache_;
295307
std::unordered_map<int32_t, std::unique_ptr<ResidualEvaluator>>

0 commit comments

Comments
 (0)