Skip to content

Commit cbacf3a

Browse files
authored
feat(format): support avro write (#99)
1 parent 00cb53b commit cbacf3a

95 files changed

Lines changed: 1915 additions & 723 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

LICENSE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ This product includes code from Apache Iceberg C++.
266266
* Avro direct decoder/encoder:
267267
* src/paimon/format/avro/avro_direct_decoder.cpp
268268
* src/paimon/format/avro/avro_direct_decoder.h
269+
* src/paimon/format/avro/avro_direct_encoder.cpp
270+
* src/paimon/format/avro/avro_direct_encoder.h
269271

270272
Copyright: 2024-2025 The Apache Software Foundation.
271273
Home page: https://iceberg.apache.org/

cmake_modules/DefineOptions.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
9292
define_option_string(PAIMON_CXXFLAGS "Compiler flags to append when compiling Paimon"
9393
"")
9494

95-
define_option(PAIMON_BUILD_STATIC "Build static libraries" OFF)
95+
define_option(PAIMON_BUILD_STATIC "Build static libraries" ON)
9696

9797
define_option(PAIMON_BUILD_SHARED "Build shared libraries" ON)
9898
#----------------------------------------------------------------------

include/paimon/defs.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ struct PAIMON_EXPORT Options {
130130
static const char MANIFEST_TARGET_FILE_SIZE[];
131131

132132
/// "manifest.format" - Specify the message format of manifest files.
133-
/// Default value is orc.
133+
/// Default value is avro.
134134
static const char MANIFEST_FORMAT[];
135135

136136
/// "manifest.compression" - File compression for manifest, default value is zstd.

include/paimon/read_context.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include "paimon/result.h"
2828
#include "paimon/type_fwd.h"
2929
#include "paimon/utils/read_ahead_cache.h"
30-
#include "paimon/utils/special_field_ids.h"
3130
#include "paimon/visibility.h"
3231

3332
namespace paimon {
@@ -179,9 +178,9 @@ class PAIMON_EXPORT ReadContextBuilder {
179178
/// @param read_field_ids Vector of field ids to read from the table.
180179
/// @return Reference to this builder for method chaining.
181180
/// @note Currently supports top-level field selection. Future versions may support
182-
/// nested field selection using ArrowSchema for more granular projection,
183-
/// If SetReadFieldIds() call and SetReadSchema() are natually are mutually
184-
/// exclusive. Calling both will ignore the read schema set by SetReadSchema().
181+
/// nested field selection using ArrowSchema for more granular projection.
182+
/// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
183+
/// Calling both will ignore the read schema set by SetReadSchema().
185184
ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);
186185

187186
/// Set a configuration options map to set some option entries which are not defined in the

include/paimon/reader/file_batch_reader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader {
5050
virtual uint64_t GetPreviousBatchFirstRowNumber() const = 0;
5151

5252
/// Get the number of rows in the file.
53-
virtual uint64_t GetNumberOfRows() const = 0;
53+
virtual Result<uint64_t> GetNumberOfRows() const = 0;
5454

5555
/// Returns whether precise reads are supported when a bitmap is pushed down.
5656
virtual bool SupportPreciseBitmapSelection() const = 0;

src/paimon/common/reader/delegating_prefetch_reader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class DelegatingPrefetchReader : public FileBatchReader {
5858
return GetReader()->GetPreviousBatchFirstRowNumber();
5959
}
6060

61-
uint64_t GetNumberOfRows() const override {
61+
Result<uint64_t> GetNumberOfRows() const override {
6262
return GetReader()->GetNumberOfRows();
6363
}
6464

src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,11 @@ Status PrefetchFileBatchReaderImpl::SetReadRanges(
208208
read_ranges_.push_back(read_range);
209209
}
210210
// Note: add a special read range beyond the file's row count to trigger an EOF access.
211-
read_ranges_.push_back(EofRange());
211+
std::pair<uint64_t, uint64_t> eof_range;
212+
PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
213+
read_ranges_.push_back(eof_range);
212214
for (auto& read_ranges : read_ranges_in_group_) {
213-
read_ranges.push_back(EofRange());
215+
read_ranges.push_back(eof_range);
214216
}
215217
return Status::OK();
216218
}
@@ -420,7 +422,9 @@ Status PrefetchFileBatchReaderImpl::HandleReadResult(
420422
}
421423
prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number});
422424
} else {
423-
prefetch_queue->push({EofRange(), std::move(read_batch_with_bitmap), first_row_number});
425+
std::pair<uint64_t, uint64_t> eof_range;
426+
PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
427+
prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number});
424428
readers_pos_[reader_idx]->store(std::numeric_limits<uint64_t>::max());
425429
}
426430
return Status::OK();
@@ -490,7 +494,8 @@ Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchW
490494
}
491495
}
492496
value_count++;
493-
if (IsEofRange(peek_batch->read_range)) {
497+
PAIMON_ASSIGN_OR_RAISE(bool is_eof_range, IsEofRange(peek_batch->read_range));
498+
if (is_eof_range) {
494499
eof_count++;
495500
continue;
496501
}
@@ -550,7 +555,7 @@ uint64_t PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
550555
return previous_batch_first_row_num_;
551556
}
552557

553-
uint64_t PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
558+
Result<uint64_t> PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
554559
assert(!readers_.empty());
555560
return readers_[0]->GetNumberOfRows();
556561
}
@@ -569,13 +574,15 @@ Status PrefetchFileBatchReaderImpl::GetReadStatus() const {
569574
std::shared_lock<std::shared_mutex> lock(rw_mutex_);
570575
return read_status_;
571576
}
572-
bool PrefetchFileBatchReaderImpl::IsEofRange(
577+
Result<bool> PrefetchFileBatchReaderImpl::IsEofRange(
573578
const std::pair<uint64_t, uint64_t>& read_range) const {
574-
return read_range.first >= GetNumberOfRows();
579+
PAIMON_ASSIGN_OR_RAISE(uint64_t num_rows, GetNumberOfRows());
580+
return read_range.first >= num_rows;
575581
}
576582

577-
std::pair<uint64_t, uint64_t> PrefetchFileBatchReaderImpl::EofRange() const {
578-
return {GetNumberOfRows(), GetNumberOfRows() + 1};
583+
Result<std::pair<uint64_t, uint64_t>> PrefetchFileBatchReaderImpl::EofRange() const {
584+
PAIMON_ASSIGN_OR_RAISE(uint64_t num_rows, GetNumberOfRows());
585+
return std::make_pair(num_rows, num_rows + 1);
579586
}
580587

581588
void PrefetchFileBatchReaderImpl::Close() {

src/paimon/common/reader/prefetch_file_batch_reader_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
7777

7878
Status SeekToRow(uint64_t row_number) override;
7979
uint64_t GetPreviousBatchFirstRowNumber() const override;
80-
uint64_t GetNumberOfRows() const override;
80+
Result<uint64_t> GetNumberOfRows() const override;
8181
uint64_t GetNextRowToRead() const override;
8282
void Close() override;
8383
Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override;
@@ -117,7 +117,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
117117
void Workloop();
118118
void SetReadStatus(const Status& status);
119119
Status GetReadStatus() const;
120-
bool IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) const;
120+
Result<bool> IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) const;
121121
Status DoReadBatch(size_t reader_idx);
122122
void ReadBatch(size_t reader_idx);
123123
size_t GetEnabledReaderSize() const;
@@ -128,7 +128,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
128128
static std::vector<std::vector<std::pair<uint64_t, uint64_t>>> DispatchReadRanges(
129129
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t reader_count);
130130

131-
std::pair<uint64_t, uint64_t> EofRange() const;
131+
Result<std::pair<uint64_t, uint64_t>> EofRange() const;
132132
std::optional<std::pair<uint64_t, uint64_t>> GetCurrentReadRange(size_t reader_idx) const;
133133
Status EnsureReaderPosition(size_t reader_idx,
134134
const std::pair<uint64_t, uint64_t>& read_range) const;

src/paimon/common/utils/arrow/arrow_utils.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,17 @@ class ArrowUtils {
3939
return std::make_shared<arrow::Schema>(struct_type->fields());
4040
}
4141

42-
static std::vector<int32_t> CreateProjection(
42+
static Result<std::vector<int32_t>> CreateProjection(
4343
const std::shared_ptr<::arrow::Schema>& file_schema,
4444
const arrow::FieldVector& read_fields) {
4545
std::vector<int32_t> target_to_src_mapping;
4646
target_to_src_mapping.reserve(read_fields.size());
4747
for (const auto& field : read_fields) {
4848
auto src_field_idx = file_schema->GetFieldIndex(field->name());
49-
assert(src_field_idx >= 0);
49+
if (src_field_idx < 0) {
50+
return Status::Invalid(
51+
fmt::format("Field '{}' not found or duplicate in file schema", field->name()));
52+
}
5053
target_to_src_mapping.push_back(src_field_idx);
5154
}
5255
return target_to_src_mapping;

src/paimon/common/utils/arrow/arrow_utils_test.cpp

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,93 @@
1818

1919
#include "arrow/api.h"
2020
#include "gtest/gtest.h"
21-
#include "paimon/common/types/data_field.h"
21+
#include "paimon/testing/utils/testharness.h"
2222

2323
namespace paimon::test {
2424

2525
TEST(ArrowUtilsTest, TestCreateProjection) {
26-
std::vector<DataField> read_fields = {DataField(1, arrow::field("k1", arrow::int32())),
27-
DataField(3, arrow::field("p1", arrow::int32())),
28-
DataField(5, arrow::field("s1", arrow::utf8())),
29-
DataField(6, arrow::field("v0", arrow::float64())),
30-
DataField(7, arrow::field("v1", arrow::boolean()))};
31-
auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_fields);
26+
arrow::FieldVector file_fields = {
27+
arrow::field("k0", arrow::int32()), arrow::field("k1", arrow::int32()),
28+
arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()),
29+
arrow::field("v0", arrow::float64()), arrow::field("v1", arrow::boolean()),
30+
arrow::field("s0", arrow::utf8())};
31+
auto file_schema = arrow::schema(file_fields);
3232

33-
std::vector<DataField> file_fields = {DataField(0, arrow::field("k0", arrow::int32())),
34-
DataField(1, arrow::field("k1", arrow::int32())),
35-
DataField(3, arrow::field("p1", arrow::int32())),
36-
DataField(5, arrow::field("s1", arrow::utf8())),
37-
DataField(6, arrow::field("v0", arrow::float64())),
38-
DataField(7, arrow::field("v1", arrow::boolean())),
39-
DataField(4, arrow::field("s0", arrow::utf8()))};
40-
auto file_schema = DataField::ConvertDataFieldsToArrowSchema(file_fields);
41-
42-
auto projection = ArrowUtils::CreateProjection(file_schema, read_schema->fields());
43-
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 5};
44-
ASSERT_EQ(projection, expected_projection);
33+
{
34+
// normal case
35+
arrow::FieldVector read_fields = {
36+
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
37+
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
38+
arrow::field("v1", arrow::boolean())};
39+
auto read_schema = arrow::schema(read_fields);
40+
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
41+
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
42+
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 5};
43+
ASSERT_EQ(projection, expected_projection);
44+
}
45+
{
46+
// duplicate read field
47+
arrow::FieldVector read_fields = {
48+
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
49+
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
50+
arrow::field("v0", arrow::float64()), arrow::field("v1", arrow::boolean())};
51+
auto read_schema = arrow::schema(read_fields);
52+
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
53+
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
54+
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 4, 5};
55+
ASSERT_EQ(projection, expected_projection);
56+
}
57+
{
58+
// duplicate read field, and read_fields has more entries than file_fields
59+
arrow::FieldVector read_fields = {
60+
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
61+
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
62+
arrow::field("v0", arrow::float64()), arrow::field("v0", arrow::float64()),
63+
arrow::field("v0", arrow::float64()), arrow::field("v0", arrow::float64()),
64+
arrow::field("v1", arrow::boolean())};
65+
auto read_schema = arrow::schema(read_fields);
66+
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
67+
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
68+
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 4, 4, 4, 4, 5};
69+
ASSERT_EQ(projection, expected_projection);
70+
}
71+
{
72+
// read field not found in file schema
73+
arrow::FieldVector read_fields = {
74+
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
75+
arrow::field("s1", arrow::utf8()), arrow::field("v2", arrow::float64()),
76+
arrow::field("v1", arrow::boolean())};
77+
auto read_schema = arrow::schema(read_fields);
78+
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema, read_schema->fields()),
79+
"Field 'v2' not found or duplicate in file schema");
80+
}
81+
{
82+
// duplicate field in file schema
83+
arrow::FieldVector file_fields_dup = {
84+
arrow::field("k0", arrow::int32()), arrow::field("k1", arrow::int32()),
85+
arrow::field("p1", arrow::int32()), arrow::field("s1", arrow::utf8()),
86+
arrow::field("v0", arrow::float64()), arrow::field("v1", arrow::boolean()),
87+
arrow::field("v1", arrow::boolean()), arrow::field("s0", arrow::utf8())};
88+
auto file_schema_dup = arrow::schema(file_fields_dup);
89+
arrow::FieldVector read_fields = {
90+
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
91+
arrow::field("s1", arrow::utf8()), arrow::field("v1", arrow::float64()),
92+
arrow::field("v1", arrow::boolean())};
93+
auto read_schema = arrow::schema(read_fields);
94+
ASSERT_NOK_WITH_MSG(ArrowUtils::CreateProjection(file_schema_dup, read_schema->fields()),
95+
"Field 'v1' not found or duplicate in file schema");
96+
}
97+
{
98+
arrow::FieldVector read_fields = {
99+
arrow::field("k1", arrow::int32()), arrow::field("p1", arrow::int32()),
100+
arrow::field("s1", arrow::utf8()), arrow::field("v0", arrow::float64()),
101+
arrow::field("v1", arrow::boolean())};
102+
auto read_schema = arrow::schema(read_fields);
103+
ASSERT_OK_AND_ASSIGN(std::vector<int32_t> projection,
104+
ArrowUtils::CreateProjection(file_schema, read_schema->fields()));
105+
std::vector<int32_t> expected_projection = {1, 2, 3, 4, 5};
106+
ASSERT_EQ(projection, expected_projection);
107+
}
45108
}
46109

47110
} // namespace paimon::test

0 commit comments

Comments
 (0)