Skip to content

Commit 7aad062

Browse files
committed
feat: implement manifest group
1 parent ff8eea9 commit 7aad062

20 files changed

Lines changed: 1169 additions & 119 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ set(ICEBERG_SOURCES
4141
json_internal.cc
4242
manifest/manifest_adapter.cc
4343
manifest/manifest_entry.cc
44+
manifest/manifest_group.cc
4445
manifest/manifest_list.cc
4546
manifest/manifest_reader.cc
4647
manifest/manifest_writer.cc

src/iceberg/delete_file_index.cc

Lines changed: 19 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -342,10 +342,10 @@ std::vector<std::shared_ptr<DataFile>> DeleteFileIndex::ReferencedDeleteFiles()
342342
Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForEntry(
343343
const ManifestEntry& entry) const {
344344
ICEBERG_PRECHECK(entry.data_file != nullptr, "Manifest entry has null data file");
345-
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
346-
"Missing sequence number for data file: {}",
347-
entry.data_file->file_path);
348-
return ForDataFile(entry.sequence_number.value(), *entry.data_file);
345+
ICEBERG_PRECHECK(
346+
entry.sequence_number.has_value() || entry.status != ManifestStatus::kAdded,
347+
"Missing sequence number for data file: {}", entry.data_file->file_path);
348+
return ForDataFile(entry.sequence_number.value_or(0), *entry.data_file);
349349
}
350350

351351
Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForDataFile(
@@ -453,34 +453,32 @@ Result<std::shared_ptr<DataFile>> DeleteFileIndex::FindDV(
453453
}
454454

455455
Result<DeleteFileIndex::Builder> DeleteFileIndex::BuilderFor(
456-
std::shared_ptr<FileIO> io, std::vector<ManifestFile> delete_manifests) {
456+
std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
457+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
458+
std::vector<ManifestFile> delete_manifests) {
457459
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
458-
return Builder(std::move(io), std::move(delete_manifests));
460+
ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
461+
ICEBERG_PRECHECK(!specs_by_id.empty(), "Partition specs cannot be empty");
462+
return Builder(std::move(io), std::move(schema), std::move(specs_by_id),
463+
std::move(delete_manifests));
459464
}
460465

461466
// Builder implementation
462467

463-
DeleteFileIndex::Builder::Builder(std::shared_ptr<FileIO> io,
464-
std::vector<ManifestFile> delete_manifests)
465-
: io_(std::move(io)), delete_manifests_(std::move(delete_manifests)) {}
468+
DeleteFileIndex::Builder::Builder(
469+
std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
470+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
471+
std::vector<ManifestFile> delete_manifests)
472+
: io_(std::move(io)),
473+
schema_(std::move(schema)),
474+
specs_by_id_(std::move(specs_by_id)),
475+
delete_manifests_(std::move(delete_manifests)) {}
466476

467477
DeleteFileIndex::Builder::~Builder() = default;
468478
DeleteFileIndex::Builder::Builder(Builder&&) noexcept = default;
469479
DeleteFileIndex::Builder& DeleteFileIndex::Builder::operator=(Builder&&) noexcept =
470480
default;
471481

472-
DeleteFileIndex::Builder& DeleteFileIndex::Builder::SpecsById(
473-
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id) {
474-
specs_by_id_ = std::move(specs_by_id);
475-
return *this;
476-
}
477-
478-
DeleteFileIndex::Builder& DeleteFileIndex::Builder::WithSchema(
479-
std::shared_ptr<Schema> schema) {
480-
schema_ = std::move(schema);
481-
return *this;
482-
}
483-
484482
DeleteFileIndex::Builder& DeleteFileIndex::Builder::AfterSequenceNumber(int64_t seq) {
485483
min_sequence_number_ = seq;
486484
return *this;
@@ -721,10 +719,6 @@ Status DeleteFileIndex::Builder::AddEqualityDelete(
721719

722720
Result<std::unique_ptr<DeleteFileIndex>> DeleteFileIndex::Builder::Build() {
723721
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
724-
ICEBERG_PRECHECK(io_ != nullptr, "FileIO is required to load delete files");
725-
ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to load delete files");
726-
ICEBERG_PRECHECK(!specs_by_id_.empty(),
727-
"Partition specs are required to load delete files");
728722

729723
std::vector<ManifestEntry> entries;
730724
if (!delete_manifests_.empty()) {

src/iceberg/delete_file_index.h

Lines changed: 12 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -268,10 +268,14 @@ class ICEBERG_EXPORT DeleteFileIndex {
268268
/// \brief Create a builder for constructing a DeleteFileIndex from manifest files.
269269
///
270270
/// \param io The FileIO to use for reading manifests
271+
/// \param schema Current table schema
272+
/// \param specs_by_id Partition specs by their IDs
271273
/// \param delete_manifests The delete manifests to index
272274
/// \return A Builder instance
273-
static Result<Builder> BuilderFor(std::shared_ptr<FileIO> io,
274-
std::vector<ManifestFile> delete_manifests);
275+
static Result<Builder> BuilderFor(
276+
std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
277+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
278+
std::vector<ManifestFile> delete_manifests);
275279

276280
private:
277281
friend class Builder;
@@ -318,24 +322,17 @@ class ICEBERG_EXPORT DeleteFileIndex {
318322
class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
319323
public:
320324
/// \brief Construct a builder from manifest files.
321-
Builder(std::shared_ptr<FileIO> io, std::vector<ManifestFile> delete_manifests);
325+
Builder(std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
326+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
327+
std::vector<ManifestFile> delete_manifests);
322328

323-
~Builder() override;
329+
~Builder();
324330

325331
Builder(Builder&&) noexcept;
326332
Builder& operator=(Builder&&) noexcept;
327333
Builder(const Builder&) = delete;
328334
Builder& operator=(const Builder&) = delete;
329335

330-
/// \brief Set the partition specs by ID.
331-
Builder& SpecsById(
332-
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id);
333-
334-
/// \brief Set the table schema.
335-
///
336-
/// Required for filtering and expression evaluation.
337-
Builder& WithSchema(std::shared_ptr<Schema> schema);
338-
339336
/// \brief Set the minimum sequence number for delete files.
340337
///
341338
/// Only delete files with sequence number > min_sequence_number will be included.
@@ -384,10 +381,10 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
384381
ManifestEntry&& entry);
385382

386383
std::shared_ptr<FileIO> io_;
384+
std::shared_ptr<Schema> schema_;
385+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
387386
std::vector<ManifestFile> delete_manifests_;
388387
int64_t min_sequence_number_ = 0;
389-
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
390-
std::shared_ptr<Schema> schema_;
391388
std::shared_ptr<Expression> data_filter_;
392389
std::shared_ptr<Expression> partition_filter_;
393390
std::shared_ptr<PartitionSet> partition_set_;

0 commit comments

Comments
 (0)