Skip to content

Commit 87d721e

Browse files
committed
refactor: change DataFile.partition_spec_id to be optional
1 parent 80f7408 commit 87d721e

File tree

4 files changed

+38
-21
lines changed

4 files changed

+38
-21
lines changed

src/iceberg/delete_file_index.cc

Lines changed: 26 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -168,7 +168,7 @@ Result<bool> CanContainEqDeletesForFile(const DataFile& data_file,
168168

169169
Status PositionDeletes::Add(ManifestEntry&& entry) {
170170
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
171-
"Missing sequence number for position delete: {}",
171+
"Missing sequence number from position delete: {}",
172172
entry.data_file->file_path);
173173
files_.emplace_back(std::move(entry));
174174
indexed_ = false;
@@ -213,7 +213,7 @@ void PositionDeletes::IndexIfNeeded() {
213213

214214
Status EqualityDeletes::Add(ManifestEntry&& entry) {
215215
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
216-
"Missing sequence number for equality delete: {}",
216+
"Missing sequence number from equality delete: {}",
217217
entry.data_file->file_path);
218218
files_.emplace_back(&schema_, std::move(entry));
219219
indexed_ = false;
@@ -343,7 +343,7 @@ Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForEntry(
343343
const ManifestEntry& entry) const {
344344
ICEBERG_PRECHECK(entry.data_file != nullptr, "Manifest entry has null data file");
345345
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
346-
"Missing sequence number for data file: {}",
346+
"Missing sequence number from data file: {}",
347347
entry.data_file->file_path);
348348
return ForDataFile(entry.sequence_number.value(), *entry.data_file);
349349
}
@@ -396,8 +396,11 @@ Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindEqPartitionD
396396
return {};
397397
}
398398

399-
auto deletes =
400-
eq_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition);
399+
ICEBERG_PRECHECK(data_file.partition_spec_id.has_value(),
400+
"Missing partition spec id from data file {}", data_file.file_path);
401+
402+
auto deletes = eq_deletes_by_partition_->get(data_file.partition_spec_id.value(),
403+
data_file.partition);
401404
if (!deletes.has_value()) {
402405
return {};
403406
}
@@ -410,8 +413,11 @@ Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindPosPartition
410413
return {};
411414
}
412415

413-
auto deletes =
414-
pos_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition);
416+
ICEBERG_PRECHECK(data_file.partition_spec_id.has_value(),
417+
"Missing partition spec id from data file {}", data_file.file_path);
418+
419+
auto deletes = pos_deletes_by_partition_->get(data_file.partition_spec_id.value(),
420+
data_file.partition);
415421
if (!deletes.has_value()) {
416422
return {};
417423
}
@@ -606,7 +612,7 @@ Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
606612
for (auto& entry : entries) {
607613
ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
608614
ICEBERG_CHECK(entry.sequence_number.has_value(),
609-
"Missing sequence number for delete file: {}",
615+
"Missing sequence number from delete file: {}",
610616
entry.data_file->file_path);
611617
if (entry.sequence_number.value() > min_sequence_number_) {
612618
auto& file = *entry.data_file;
@@ -628,8 +634,8 @@ Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
628634
Status DeleteFileIndex::Builder::AddDV(
629635
std::unordered_map<std::string, ManifestEntry>& dv_by_path, ManifestEntry&& entry) {
630636
ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
631-
ICEBERG_PRECHECK(entry.sequence_number.has_value(), "Missing sequence number for DV {}",
632-
entry.data_file->file_path);
637+
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
638+
"Missing sequence number from DV {}", entry.data_file->file_path);
633639

634640
const auto& path = entry.data_file->referenced_data_file;
635641
ICEBERG_PRECHECK(path.has_value(), "DV must have a referenced data file");
@@ -649,7 +655,7 @@ Status DeleteFileIndex::Builder::AddPositionDelete(
649655
ManifestEntry&& entry) {
650656
ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
651657
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
652-
"Missing sequence number for position delete {}",
658+
"Missing sequence number from position delete {}",
653659
entry.data_file->file_path);
654660

655661
ICEBERG_ASSIGN_OR_RAISE(auto referenced_path,
@@ -664,7 +670,10 @@ Status DeleteFileIndex::Builder::AddPositionDelete(
664670
ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
665671
} else {
666672
// Partition-scoped position delete
667-
int32_t spec_id = entry.data_file->partition_spec_id;
673+
ICEBERG_PRECHECK(entry.data_file->partition_spec_id.has_value(),
674+
"Missing partition spec id from position delete {}",
675+
entry.data_file->file_path);
676+
int32_t spec_id = entry.data_file->partition_spec_id.value();
668677
const auto& partition = entry.data_file->partition;
669678

670679
auto existing = deletes_by_partition.get(spec_id, partition);
@@ -686,10 +695,13 @@ Status DeleteFileIndex::Builder::AddEqualityDelete(
686695
ManifestEntry&& entry) {
687696
ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
688697
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
689-
"Missing sequence number for equality delete {}",
698+
"Missing sequence number from equality delete {}",
699+
entry.data_file->file_path);
700+
ICEBERG_PRECHECK(entry.data_file->partition_spec_id.has_value(),
701+
"Missing partition spec id from equality delete {}",
690702
entry.data_file->file_path);
691703

692-
int32_t spec_id = entry.data_file->partition_spec_id;
704+
int32_t spec_id = entry.data_file->partition_spec_id.value();
693705

694706
auto spec_it = specs_by_id_.find(spec_id);
695707
if (spec_it == specs_by_id_.end()) {

src/iceberg/manifest/manifest_entry.h

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -147,9 +147,6 @@ struct ICEBERG_EXPORT DataFile {
147147
/// order, and should set sort order id to null. Readers must ignore sort order id for
148148
/// position delete files.
149149
std::optional<int32_t> sort_order_id;
150-
/// This field is not included in spec, so it is not serialized into the manifest file.
151-
/// It is just store in memory representation used in process.
152-
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;
153150
/// Field id: 142
154151
/// The _row_id for the first row in the data file.
155152
///
@@ -178,6 +175,11 @@ struct ICEBERG_EXPORT DataFile {
178175
/// present
179176
std::optional<int64_t> content_size_in_bytes;
180177

178+
/// \brief Partition spec id for this data file.
179+
/// \note This field is for internal use only and will not be persisted to manifest
180+
/// entry.
181+
std::optional<int32_t> partition_spec_id;
182+
181183
static constexpr int32_t kContentFieldId = 134;
182184
inline static const SchemaField kContent = SchemaField::MakeOptional(
183185
kContentFieldId, "content", int32(),

src/iceberg/manifest/manifest_reader.cc

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -814,11 +814,13 @@ Result<InclusiveMetricsEvaluator*> ManifestReaderImpl::GetMetricsEvaluator() {
814814
return metrics_evaluator_.get();
815815
}
816816

817-
bool ManifestReaderImpl::InPartitionSet(const DataFile& file) const {
817+
Result<bool> ManifestReaderImpl::InPartitionSet(const DataFile& file) const {
818818
if (!partition_set_) {
819819
return true;
820820
}
821-
return partition_set_->contains(file.partition_spec_id, file.partition);
821+
ICEBERG_PRECHECK(file.partition_spec_id.has_value(),
822+
"Missing partition spec id from data file {}", file.file_path);
823+
return partition_set_->contains(file.partition_spec_id.value(), file.partition);
822824
}
823825

824826
Status ManifestReaderImpl::OpenReader(std::shared_ptr<Schema> projection) {
@@ -943,7 +945,8 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool only_liv
943945
continue;
944946
}
945947
}
946-
if (!InPartitionSet(*entry.data_file)) {
948+
ICEBERG_ASSIGN_OR_RAISE(bool in_partition_set, InPartitionSet(*entry.data_file));
949+
if (!in_partition_set) {
947950
continue;
948951
}
949952
}

src/iceberg/manifest/manifest_reader_internal.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -96,7 +96,7 @@ class ManifestReaderImpl : public ManifestReader {
9696
Result<InclusiveMetricsEvaluator*> GetMetricsEvaluator();
9797

9898
/// \brief Check if a partition is in the partition set.
99-
bool InPartitionSet(const DataFile& file) const;
99+
Result<bool> InPartitionSet(const DataFile& file) const;
100100

101101
// Fields set at construction
102102
const std::string manifest_path_;

0 commit comments

Comments
 (0)