Skip to content

Commit ce30233

Browse files
author
xiao.dong
committed
feat: support manifest reader
- add manifest reader - refactor of arrow array parser - add basic case
1 parent 5bffdf6 commit ce30233

10 files changed

Lines changed: 587 additions & 69 deletions

src/iceberg/avro/avro_schema_util.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ ::avro::LogicalType GetMapLogicalType() {
5858
std::call_once(flag, []() {
5959
// Register the map logical type with the avro custom logical type registry.
6060
// See https://github.com/apache/avro/pull/3326 for details.
61-
::avro::CustomLogicalTypeRegistry::instance().registerType(
62-
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
61+
RegisterLogicalTypes();
6362
});
6463
return ::avro::LogicalType(std::make_shared<MapLogicalType>());
6564
}
@@ -73,6 +72,11 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
7372

7473
} // namespace
7574

75+
/// \brief Register the "map" custom logical type with the Avro custom logical type
/// registry.
void RegisterLogicalTypes() {
  // Factory handed to the registry; its string argument is ignored and every call
  // yields a fresh MapLogicalType instance.
  auto make_map_logical_type = [](const std::string&) {
    return std::make_shared<MapLogicalType>();
  };
  ::avro::CustomLogicalTypeRegistry::instance().registerType("map",
                                                             make_map_logical_type);
}
79+
7680
std::string ToString(const ::avro::NodePtr& node) {
7781
std::stringstream ss;
7882
ss << *node;

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,6 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
144144
/// \return True if the node has a map logical type, false otherwise.
145145
bool HasMapLogicalType(const ::avro::NodePtr& node);
146146

147+
/// \brief Register the custom "map" logical type with the Avro custom logical type
/// registry.
void RegisterLogicalTypes();
148+
147149
} // namespace iceberg::avro

src/iceberg/file_format.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include "iceberg/iceberg_export.h"
2929
#include "iceberg/result.h"
30+
#include "iceberg/util/string_utils.h"
3031

3132
namespace iceberg {
3233

@@ -56,10 +57,11 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) {
5657
/// \brief Convert a string to a FileFormatType
5758
ICEBERG_EXPORT constexpr Result<FileFormatType> FileFormatTypeFromString(
5859
std::string_view str) noexcept {
59-
if (str == "parquet") return FileFormatType::kParquet;
60-
if (str == "avro") return FileFormatType::kAvro;
61-
if (str == "orc") return FileFormatType::kOrc;
62-
if (str == "puffin") return FileFormatType::kPuffin;
60+
auto lower = internal::StringUtils::to_lower(str);
61+
if (lower == "parquet") return FileFormatType::kParquet;
62+
if (lower == "avro") return FileFormatType::kAvro;
63+
if (lower == "orc") return FileFormatType::kOrc;
64+
if (lower == "puffin") return FileFormatType::kPuffin;
6365
return InvalidArgument("Invalid file format type: {}", str);
6466
}
6567

src/iceberg/manifest_entry.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition
4040
kContent,
4141
kFilePath,
4242
kFileFormat,
43-
SchemaField::MakeRequired(102, "partition", std::move(partition_type)),
43+
SchemaField::MakeRequired(102, kPartitionField, std::move(partition_type)),
4444
kRecordCount,
4545
kFileSize,
4646
kColumnSizes,
@@ -68,7 +68,7 @@ std::shared_ptr<StructType> ManifestEntry::TypeFromDataFileType(
6868
std::shared_ptr<StructType> datafile_type) {
6969
return std::make_shared<StructType>(std::vector<SchemaField>{
7070
kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber,
71-
SchemaField::MakeRequired(2, "data_file", std::move(datafile_type))});
71+
SchemaField::MakeRequired(2, kDataFileField, std::move(datafile_type))});
7272
}
7373

7474
} // namespace iceberg

src/iceberg/manifest_entry.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ struct ICEBERG_EXPORT DataFile {
182182
inline static const SchemaField kFilePath = SchemaField::MakeRequired(
183183
100, "file_path", iceberg::string(), "Location URI with FS scheme");
184184
inline static const SchemaField kFileFormat = SchemaField::MakeRequired(
185-
101, "file_format", iceberg::int32(), "File format name: avro, orc, or parquet");
185+
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
186+
inline static const std::string kPartitionField = "partition";
186187
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
187188
103, "record_count", iceberg::int64(), "Number of records in the file");
188189
inline static const SchemaField kFileSize = SchemaField::MakeRequired(
@@ -299,6 +300,7 @@ struct ICEBERG_EXPORT ManifestEntry {
299300
SchemaField::MakeOptional(3, "sequence_number", iceberg::int64());
300301
inline static const SchemaField kFileSequenceNumber =
301302
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
303+
inline static const std::string kDataFileField = "data_file";
302304

303305
bool operator==(const ManifestEntry& other) const;
304306

src/iceberg/manifest_reader_internal.cc

Lines changed: 402 additions & 60 deletions
Large diffs are not rendered by default.

src/iceberg/util/string_utils.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <algorithm>
#include <cctype>
#include <string>
#include <string_view>
24+
25+
namespace iceberg::internal {
26+
27+
/// \brief Helpers for case conversion of strings.
class StringUtils {
 public:
  /// \brief Return a copy of `str` with every character converted by std::tolower.
  /// \param str Input text; it is not modified.
  /// \return A new string of the same length as `str`.
  static std::string to_lower(std::string_view str) {
    std::string input(str);
    // std::tolower has undefined behaviour for negative arguments other than EOF, so
    // each character is taken as unsigned char before the call.
    std::transform(input.begin(), input.end(), input.begin(), [](unsigned char c) {
      return static_cast<char>(std::tolower(c));
    });
    return input;
  }

  /// \brief Return a copy of `str` with every character converted by std::toupper.
  /// \param str Input text; it is not modified.
  /// \return A new string of the same length as `str`.
  static std::string to_upper(std::string_view str) {
    std::string input(str);
    // Same unsigned char conversion as in to_lower, for the same reason.
    std::transform(input.begin(), input.end(), input.begin(), [](unsigned char c) {
      return static_cast<char>(std::toupper(c));
    });
    return input;
  }
};
43+
44+
} // namespace iceberg::internal

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ if(ICEBERG_BUILD_BUNDLE)
7878
avro_schema_test.cc
7979
avro_stream_test.cc
8080
manifest_list_reader_test.cc
81+
manifest_reader_test.cc
8182
test_common.cc)
8283
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
8384
GTest::gmock)

test/manifest_reader_test.cc

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest_reader.h"
21+
22+
#include <arrow/filesystem/localfs.h>
23+
#include <gtest/gtest.h>
24+
25+
#include "iceberg/arrow/arrow_fs_file_io.h"
26+
#include "iceberg/avro/avro_reader.h"
27+
#include "iceberg/avro/avro_schema_util_internal.h"
28+
#include "iceberg/manifest_entry.h"
29+
#include "iceberg/schema.h"
30+
#include "temp_file_test_base.h"
31+
#include "test_common.h"
32+
33+
namespace iceberg {
34+
35+
class ManifestReaderTest : public TempFileTestBase {
36+
protected:
37+
static void SetUpTestSuite() { avro::AvroReader::Register(); }
38+
39+
void SetUp() override {
40+
TempFileTestBase::SetUp();
41+
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
42+
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
43+
44+
avro::RegisterLogicalTypes();
45+
}
46+
47+
std::vector<ManifestEntry> prepare_manifest_entries() {
48+
std::vector<ManifestEntry> manifest_entries;
49+
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/data/";
50+
std::vector<std::string> paths = {
51+
"order_ts_hour=2021-01-27-00/"
52+
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00001.parquet",
53+
"order_ts_hour=2024-01-27-00/"
54+
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00002.parquet",
55+
"order_ts_hour=2023-01-26-00/"
56+
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00003.parquet",
57+
"order_ts_hour=2021-01-26-00/"
58+
"00000-2-d5ae78b7-4449-45ec-adb7-c0e9c0bdb714-0-00004.parquet"};
59+
std::vector<int64_t> partitions = {447696, 473976, 465192, 447672};
60+
std::vector<std::map<int32_t, std::vector<uint8_t>>> bounds = {
61+
{{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
62+
{2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
63+
{3, {0x12, 0xe2}},
64+
{4, {0xc0, 'y', 0xe7, 0x98, 0xd6, 0xb9, 0x05, 0x00}}},
65+
{{1, {0xd2, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
66+
{2, {'.', 0x16, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
67+
{3, {0x12, 0xe3}},
68+
{4, {0xc0, 0x19, '#', '=', 0xe2, 0x0f, 0x06, 0x00}}},
69+
{{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
70+
{2, {0xc8, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
71+
{3, {0x0e, '"'}},
72+
{4, {0xc0, 0xd9, '7', 0x93, 0x1f, 0xf3, 0x05, 0x00}}},
73+
{{1, {'{', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
74+
{2, {0xc8, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}},
75+
{3, {0x0e, '!'}},
76+
{4, {0xc0, 0x19, 0x10, '{', 0xc2, 0xb9, 0x05, 0x00}}},
77+
};
78+
for (int i = 0; i < 4; ++i) {
79+
ManifestEntry entry;
80+
entry.status = ManifestStatus::kAdded;
81+
entry.snapshot_id = 6387266376565973956;
82+
entry.data_file = std::make_shared<DataFile>();
83+
entry.data_file->file_path = test_dir_prefix + paths[i];
84+
entry.data_file->file_format = FileFormatType::kParquet;
85+
entry.data_file->partition.emplace_back(Literal::Int(partitions[i]));
86+
entry.data_file->record_count = 1;
87+
entry.data_file->file_size_in_bytes = 1375;
88+
entry.data_file->column_sizes = {{1, 49}, {2, 49}, {3, 49}, {4, 49}};
89+
entry.data_file->value_counts = {{1, 1}, {2, 1}, {3, 1}, {4, 1}};
90+
entry.data_file->null_value_counts = {{1, 0}, {2, 0}, {3, 0}, {4, 0}};
91+
entry.data_file->split_offsets = {4};
92+
entry.data_file->sort_order_id = 0;
93+
entry.data_file->upper_bounds = bounds[i];
94+
entry.data_file->lower_bounds = bounds[i];
95+
manifest_entries.emplace_back(entry);
96+
}
97+
return manifest_entries;
98+
}
99+
100+
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
101+
std::shared_ptr<FileIO> file_io_;
102+
};
103+
104+
// Reads the checked-in manifest resource through ManifestReader and checks that the
// decoded entries equal the ones built by prepare_manifest_entries().
TEST_F(ManifestReaderTest, BasicTest) {
  iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true);
  auto partition_schema =
      std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
  std::string path = GetResourcePath("56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro");

  auto reader_result = ManifestReader::MakeReader(path, file_io_, partition_schema);
  // The streamed message is only evaluated when the assertion fails, so error() is
  // never touched on a successful result.
  ASSERT_TRUE(reader_result.has_value()) << reader_result.error().message;
  auto manifest_reader = std::move(reader_result.value());

  auto entries_result = manifest_reader->Entries();
  ASSERT_TRUE(entries_result.has_value()) << entries_result.error().message;

  auto expected_entries = prepare_manifest_entries();
  ASSERT_EQ(entries_result.value(), expected_entries);
}
120+
121+
} // namespace iceberg
7.36 KB
Binary file not shown.

0 commit comments

Comments
 (0)