Skip to content

Commit 2e2f49c

Browse files
committed
various fixes:
- unify partition summary collection - fix manifest reader to inherit metadata - fix first row id round trip
1 parent 77c86cf commit 2e2f49c

16 files changed

Lines changed: 159 additions & 155 deletions

src/iceberg/manifest_adapter.cc

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include "iceberg/arrow/nanoarrow_status_internal.h"
2828
#include "iceberg/manifest_entry.h"
2929
#include "iceberg/manifest_list.h"
30-
#include "iceberg/partition_summary_internal.h"
3130
#include "iceberg/result.h"
3231
#include "iceberg/schema.h"
3332
#include "iceberg/util/checked_cast.h"
@@ -321,16 +320,13 @@ Status ManifestEntryAdapter::AppendDataFile(
321320
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
322321
}
323322
break;
324-
case 142: {
325-
// first_row_id (optional int64)
326-
ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file));
327-
if (first_row_id.has_value()) {
328-
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, first_row_id.value()));
323+
case 142: // first_row_id (optional int64)
324+
if (file.first_row_id.has_value()) {
325+
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.first_row_id.value()));
329326
} else {
330327
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
331328
}
332329
break;
333-
}
334330
case 143: {
335331
// referenced_data_file (optional string)
336332
ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, GetReferenceDataFile(file));
@@ -396,8 +392,6 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
396392
return InvalidManifest("Missing required data_file field from manifest entry.");
397393
}
398394

399-
ICEBERG_RETURN_UNEXPECTED(partition_summary_->Update(entry.data_file->partition));
400-
401395
const auto& fields = manifest_schema_->fields();
402396
for (size_t i = 0; i < fields.size(); i++) {
403397
const auto& field = fields[i];

src/iceberg/manifest_adapter.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
7676

7777
const std::shared_ptr<PartitionSpec>& partition_spec() const { return partition_spec_; }
7878

79-
const std::unique_ptr<PartitionSummary>& partition_summary() const {
80-
return partition_summary_;
81-
}
82-
83-
/// \brief Create a ManifestFile object without setting file metadata, such as
84-
/// location, file size, key metadata, etc.
85-
Result<ManifestFile> ToManifestFile() const;
79+
const std::shared_ptr<StructType>& partition_type() const { return partition_type_; }
8680

8781
protected:
8882
Status AppendInternal(const ManifestEntry& entry);
@@ -106,9 +100,9 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
106100
std::optional<int64_t> snapshot_id_;
107101
std::shared_ptr<PartitionSpec> partition_spec_;
108102
std::shared_ptr<Schema> current_schema_;
103+
std::shared_ptr<StructType> partition_type_;
109104
std::shared_ptr<Schema> manifest_schema_;
110105
const ManifestContent content_;
111-
std::unique_ptr<PartitionSummary> partition_summary_;
112106
};
113107

114108
/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.

src/iceberg/manifest_entry.cc

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,11 @@
2222
#include <memory>
2323
#include <vector>
2424

25-
#include "iceberg/schema.h"
2625
#include "iceberg/schema_field.h"
2726
#include "iceberg/type.h"
2827

2928
namespace iceberg {
3029

31-
std::shared_ptr<DataFile> DataFile::Clone() const {
32-
auto copy = std::make_shared<DataFile>();
33-
copy->content = content;
34-
copy->file_path = file_path;
35-
copy->file_format = file_format;
36-
copy->partition = partition;
37-
copy->record_count = record_count;
38-
copy->file_size_in_bytes = file_size_in_bytes;
39-
copy->column_sizes = column_sizes;
40-
copy->value_counts = value_counts;
41-
copy->null_value_counts = null_value_counts;
42-
copy->nan_value_counts = nan_value_counts;
43-
copy->lower_bounds = lower_bounds;
44-
copy->upper_bounds = upper_bounds;
45-
copy->key_metadata = key_metadata;
46-
copy->split_offsets = split_offsets;
47-
copy->equality_ids = equality_ids;
48-
copy->sort_order_id = sort_order_id;
49-
copy->first_row_id = first_row_id;
50-
copy->referenced_data_file = referenced_data_file;
51-
copy->content_offset = content_offset;
52-
copy->content_size_in_bytes = content_size_in_bytes;
53-
return copy;
54-
}
55-
5630
bool ManifestEntry::operator==(const ManifestEntry& other) const {
5731
return status == other.status && snapshot_id == other.snapshot_id &&
5832
sequence_number == other.sequence_number &&

src/iceberg/manifest_entry.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,7 @@ struct ICEBERG_EXPORT DataFile {
268268

269269
bool operator==(const DataFile& other) const = default;
270270

271-
std::shared_ptr<DataFile> Clone() const;
272-
271+
/// \brief Get the schema of the data file with the given partition type.
273272
static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
274273
};
275274

@@ -316,6 +315,10 @@ struct ICEBERG_EXPORT ManifestEntry {
316315
ManifestEntry AsAdded() const {
317316
ManifestEntry copy = *this;
318317
copy.status = ManifestStatus::kAdded;
318+
if (copy.data_file->first_row_id.has_value()) {
319+
copy.data_file = std::make_unique<DataFile>(*copy.data_file);
320+
copy.data_file->first_row_id = std::nullopt;
321+
}
319322
return copy;
320323
}
321324

src/iceberg/manifest_reader.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
4848
InheritableMetadataFactory::FromManifest(manifest));
4949

5050
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema),
51-
std::move(inheritable_metadata));
51+
std::move(inheritable_metadata),
52+
manifest.first_row_id);
5253
}
5354

5455
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
@@ -66,7 +67,8 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
6667
.projection = schema}));
6768
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty());
6869
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema),
69-
std::move(inheritable_metadata));
70+
std::move(inheritable_metadata),
71+
std::nullopt);
7072
}
7173

7274
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(

src/iceberg/manifest_reader_internal.cc

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
328328
}
329329

330330
Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
331-
ArrowArrayView* view_of_column,
331+
ArrowArrayView* view_of_column, std::optional<int64_t>& first_row_id,
332332
std::vector<ManifestEntry>& manifest_entries) {
333333
if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
334334
return InvalidManifest("DataFile field should be a struct.");
@@ -432,10 +432,25 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
432432
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
433433
view_of_file_field, int32_t);
434434
break;
435-
case 16:
435+
case 16: {
436436
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
437437
view_of_file_field, int64_t);
438+
if (first_row_id.has_value()) {
439+
std::ranges::for_each(manifest_entries, [&first_row_id](ManifestEntry& entry) {
440+
if (entry.status != ManifestStatus::kDeleted &&
441+
!entry.data_file->first_row_id.has_value()) {
442+
entry.data_file->first_row_id = first_row_id.value();
443+
first_row_id = first_row_id.value() + entry.data_file->record_count;
444+
}
445+
});
446+
} else {
447+
// data file's first_row_id is null when the manifest's first_row_id is null
448+
std::ranges::for_each(manifest_entries, [](ManifestEntry& entry) {
449+
entry.data_file->first_row_id = std::nullopt;
450+
});
451+
}
438452
break;
453+
}
439454
case 17:
440455
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
441456
view_of_file_field);
@@ -455,9 +470,9 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
455470
return {};
456471
}
457472

458-
Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
459-
ArrowArray* array_in,
460-
const Schema& iceberg_schema) {
473+
Result<std::vector<ManifestEntry>> ParseManifestEntry(
474+
ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema,
475+
std::optional<int64_t>& first_row_id) {
461476
if (schema->n_children != array_in->n_children) {
462477
return InvalidManifest("Columns size not match between schema:{} and array:{}",
463478
schema->n_children, array_in->n_children);
@@ -512,8 +527,8 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
512527
case 4: {
513528
auto data_file_schema =
514529
internal::checked_pointer_cast<StructType>(field.value()->get().type());
515-
ICEBERG_RETURN_UNEXPECTED(
516-
ParseDataFile(data_file_schema, view_of_column, manifest_entries));
530+
ICEBERG_RETURN_UNEXPECTED(ParseDataFile(data_file_schema, view_of_column,
531+
first_row_id, manifest_entries));
517532
break;
518533
}
519534
default:
@@ -533,7 +548,7 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
533548
internal::ArrowArrayGuard array_guard(&result.value());
534549
ICEBERG_ASSIGN_OR_RAISE(
535550
auto parse_result,
536-
ParseManifestEntry(&arrow_schema, &result.value(), *schema_));
551+
ParseManifestEntry(&arrow_schema, &result.value(), *schema_, first_row_id_));
537552
manifest_entries.insert(manifest_entries.end(),
538553
std::make_move_iterator(parse_result.begin()),
539554
std::make_move_iterator(parse_result.end()));

src/iceberg/manifest_reader_internal.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ class ManifestReaderImpl : public ManifestReader {
3333
public:
3434
explicit ManifestReaderImpl(std::unique_ptr<Reader> reader,
3535
std::shared_ptr<Schema> schema,
36-
std::unique_ptr<InheritableMetadata> inheritable_metadata)
36+
std::unique_ptr<InheritableMetadata> inheritable_metadata,
37+
std::optional<int64_t> first_row_id)
3738
: schema_(std::move(schema)),
3839
reader_(std::move(reader)),
39-
inheritable_metadata_(std::move(inheritable_metadata)) {}
40+
inheritable_metadata_(std::move(inheritable_metadata)),
41+
first_row_id_(first_row_id) {}
4042

4143
Result<std::vector<ManifestEntry>> Entries() const override;
4244

@@ -46,6 +48,7 @@ class ManifestReaderImpl : public ManifestReader {
4648
std::shared_ptr<Schema> schema_;
4749
std::unique_ptr<Reader> reader_;
4850
std::unique_ptr<InheritableMetadata> inheritable_metadata_;
51+
mutable std::optional<int64_t> first_row_id_;
4952
};
5053

5154
/// \brief Read manifest files from a manifest list file.

0 commit comments

Comments
 (0)