Skip to content

Commit b6fa58e

Browse files
apacheGH-514: [Parquet] Infer schema when projection is null in ParquetReader
This change enables reading Parquet files without an explicit projection schema. If `options.projection` is not provided, the reader now infers the Iceberg schema from the Parquet file's Arrow schema using the Arrow C++ API.

* Modified `src/iceberg/parquet/parquet_reader.cc`:
  * Removed null check for `projection` in `Open`.
  * Implemented `InferIcebergSchema` and `ConvertArrowType` to convert `arrow::Schema` to `iceberg::Schema` directly, avoiding complex C-ABI/nanoarrow dependencies.
  * Used inferred schema when `projection` is null.
* Added `src/iceberg/test/parquet_reader_no_projection_test.cc` to verify the fix.
* Updated `src/iceberg/test/CMakeLists.txt` to register the new test file.

Co-authored-by: wgtmac <4684607+wgtmac@users.noreply.github.com>
1 parent 373fbaa commit b6fa58e

3 files changed

Lines changed: 137 additions & 8 deletions

File tree

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ if(ICEBERG_BUILD_BUNDLE)
173173
set(ICEBERG_BUNDLE_SOURCES
174174
arrow/arrow_fs_file_io.cc
175175
arrow/metadata_column_util.cc
176-
arrow_c_data_guard_internal.cc
177176
avro/avro_data_util.cc
178177
avro/avro_direct_decoder.cc
179178
avro/avro_direct_encoder.cc
@@ -186,8 +185,7 @@ if(ICEBERG_BUILD_BUNDLE)
186185
parquet/parquet_reader.cc
187186
parquet/parquet_register.cc
188187
parquet/parquet_schema_util.cc
189-
parquet/parquet_writer.cc
190-
schema_internal.cc)
188+
parquet/parquet_writer.cc)
191189

192190
# Libraries to link with exported libiceberg_bundle.{so,a}.
193191
set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/parquet/parquet_reader.cc

Lines changed: 135 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@
1919

2020
#include "iceberg/parquet/parquet_reader.h"
2121

22+
#include <charconv>
23+
#include <cstring>
2224
#include <numeric>
25+
#include <optional>
26+
#include <string>
2327

2428
#include <arrow/c/bridge.h>
2529
#include <arrow/memory_pool.h>
@@ -35,20 +39,148 @@
3539
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3640
#include "iceberg/arrow/arrow_status_internal.h"
3741
#include "iceberg/arrow/metadata_column_util_internal.h"
38-
#include "iceberg/arrow_c_data_guard_internal.h"
42+
#include "iceberg/constants.h"
3943
#include "iceberg/parquet/parquet_data_util_internal.h"
4044
#include "iceberg/parquet/parquet_register.h"
4145
#include "iceberg/parquet/parquet_schema_util_internal.h"
4246
#include "iceberg/result.h"
47+
#include "iceberg/schema.h"
4348
#include "iceberg/schema_internal.h"
4449
#include "iceberg/schema_util.h"
50+
#include "iceberg/type.h"
4551
#include "iceberg/util/checked_cast.h"
4652
#include "iceberg/util/macros.h"
4753

4854
namespace iceberg::parquet {
4955

5056
namespace {
5157

58+
constexpr int32_t kUnknownFieldId = -1;
59+
60+
int32_t GetFieldId(const std::shared_ptr<arrow::Field>& field) {
61+
if (!field->metadata()) {
62+
return kUnknownFieldId;
63+
}
64+
65+
int idx = field->metadata()->FindKey(kParquetFieldIdKey);
66+
if (idx == -1) {
67+
return kUnknownFieldId;
68+
}
69+
70+
std::string value = field->metadata()->value(idx);
71+
int32_t field_id = kUnknownFieldId;
72+
std::from_chars(value.data(), value.data() + value.size(), field_id);
73+
74+
return field_id;
75+
}
76+
77+
// Forward declaration
78+
Result<std::shared_ptr<Type>> ConvertArrowType(
79+
const std::shared_ptr<arrow::DataType>& type);
80+
81+
Result<std::unique_ptr<SchemaField>> ToSchemaField(
82+
const std::shared_ptr<arrow::Field>& field) {
83+
ICEBERG_ASSIGN_OR_RAISE(auto field_type, ConvertArrowType(field->type()));
84+
85+
auto field_id = GetFieldId(field);
86+
return std::make_unique<SchemaField>(field_id, field->name(), std::move(field_type),
87+
field->nullable());
88+
}
89+
90+
/// \brief Convert an Arrow data type into the corresponding Iceberg type.
///
/// Primitive types map one-to-one. Struct, list and map types are converted
/// recursively through ToSchemaField(). Extension types named "arrow.uuid"
/// become uuid; any other extension type is converted via its storage type.
///
/// \param type Arrow data type to convert; must not be null.
/// \return The Iceberg type, or an InvalidSchema error when the Arrow type (or
///         the unit of a time/timestamp type) is not supported.
Result<std::shared_ptr<Type>> ConvertArrowType(
    const std::shared_ptr<arrow::DataType>& type) {
  switch (type->id()) {
    case arrow::Type::BOOL:
      return iceberg::boolean();
    case arrow::Type::INT32:
      return iceberg::int32();
    case arrow::Type::INT64:
      return iceberg::int64();
    case arrow::Type::FLOAT:
      return iceberg::float32();
    case arrow::Type::DOUBLE:
      return iceberg::float64();
    case arrow::Type::DECIMAL128: {
      const auto& decimal_type = static_cast<const arrow::Decimal128Type&>(*type);
      return iceberg::decimal(decimal_type.precision(), decimal_type.scale());
    }
    case arrow::Type::DATE32:
      return iceberg::date();
    case arrow::Type::TIME64: {
      const auto& time_type = static_cast<const arrow::Time64Type&>(*type);
      // Only microsecond precision is accepted here.
      if (time_type.unit() != arrow::TimeUnit::MICRO) {
        // Format the type's string form: the raw TimeUnit enum has no
        // formatter, and ToString() already names the unit (e.g. time64[ns]).
        return InvalidSchema("Unsupported time unit for Arrow time type: {}",
                             type->ToString());
      }
      return iceberg::time();
    }
    case arrow::Type::TIMESTAMP: {
      const auto& timestamp_type = static_cast<const arrow::TimestampType&>(*type);
      // Only microsecond precision is accepted here.
      if (timestamp_type.unit() != arrow::TimeUnit::MICRO) {
        // See TIME64 above for why the type string is formatted, not the enum.
        return InvalidSchema("Unsupported time unit for Arrow timestamp type: {}",
                             type->ToString());
      }
      // A timestamp without a timezone maps to timestamp, otherwise timestamp_tz.
      if (timestamp_type.timezone().empty()) {
        return iceberg::timestamp();
      }
      return iceberg::timestamp_tz();
    }
    case arrow::Type::STRING:
    case arrow::Type::LARGE_STRING:
      return iceberg::string();
    case arrow::Type::BINARY:
    case arrow::Type::LARGE_BINARY:
      return iceberg::binary();
    case arrow::Type::FIXED_SIZE_BINARY: {
      const auto& fixed_type = static_cast<const arrow::FixedSizeBinaryType&>(*type);
      return iceberg::fixed(fixed_type.byte_width());
    }
    case arrow::Type::EXTENSION: {
      const auto& ext_type = static_cast<const arrow::ExtensionType&>(*type);
      if (ext_type.extension_name() == "arrow.uuid") {
        return iceberg::uuid();
      }
      // Other extension types are converted through their storage type.
      return ConvertArrowType(ext_type.storage_type());
    }
    case arrow::Type::STRUCT: {
      const auto& struct_type = static_cast<const arrow::StructType&>(*type);
      std::vector<SchemaField> fields;
      fields.reserve(struct_type.num_fields());
      for (const auto& field : struct_type.fields()) {
        ICEBERG_ASSIGN_OR_RAISE(auto schema_field, ToSchemaField(field));
        fields.emplace_back(std::move(*schema_field));
      }
      return std::make_shared<StructType>(std::move(fields));
    }
    case arrow::Type::LIST: {
      const auto& list_type = static_cast<const arrow::ListType&>(*type);
      ICEBERG_ASSIGN_OR_RAISE(auto element_field, ToSchemaField(list_type.value_field()));
      return std::make_shared<ListType>(std::move(*element_field));
    }
    case arrow::Type::MAP: {
      const auto& map_type = static_cast<const arrow::MapType&>(*type);
      ICEBERG_ASSIGN_OR_RAISE(auto key_field, ToSchemaField(map_type.key_field()));
      ICEBERG_ASSIGN_OR_RAISE(auto value_field, ToSchemaField(map_type.item_field()));
      return std::make_shared<MapType>(std::move(*key_field), std::move(*value_field));
    }
    default:
      return InvalidSchema("Unsupported Arrow type: {}", type->ToString());
  }
}
171+
172+
/// \brief Build an Iceberg Schema from an Arrow schema.
///
/// Each top-level Arrow field is converted with ToSchemaField(), preserving
/// field order. The first conversion failure is returned to the caller.
///
/// \param schema Arrow schema to convert; must not be null.
/// \param schema_id Id for the resulting schema; `Schema::kInitialSchemaId` is
///        used when not provided.
/// \return The inferred Iceberg schema, or the first conversion error.
Result<std::unique_ptr<Schema>> InferIcebergSchema(
    const std::shared_ptr<arrow::Schema>& schema, std::optional<int32_t> schema_id) {
  const int num_columns = schema->num_fields();

  std::vector<SchemaField> columns;
  columns.reserve(num_columns);
  for (int i = 0; i < num_columns; ++i) {
    ICEBERG_ASSIGN_OR_RAISE(auto column, ToSchemaField(schema->field(i)));
    columns.push_back(std::move(*column));
  }

  return std::make_unique<Schema>(std::move(columns),
                                  schema_id.value_or(Schema::kInitialSchemaId));
}
183+
52184
Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputStream(
53185
const ReaderOptions& options) {
54186
::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
@@ -135,10 +267,8 @@ class ParquetReader::Impl {
135267
} else {
136268
std::shared_ptr<::arrow::Schema> arrow_schema;
137269
ICEBERG_ARROW_RETURN_NOT_OK(reader_->GetSchema(&arrow_schema));
138-
ArrowSchema c_arrow_schema;
139-
ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportSchema(*arrow_schema, &c_arrow_schema));
140-
internal::ArrowSchemaGuard guard(&c_arrow_schema);
141-
ICEBERG_ASSIGN_OR_RAISE(auto schema, FromArrowSchema(c_arrow_schema, std::nullopt));
270+
ICEBERG_ASSIGN_OR_RAISE(auto schema,
271+
InferIcebergSchema(arrow_schema, std::nullopt));
142272
read_schema_ = std::move(schema);
143273
}
144274

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ if(ICEBERG_BUILD_BUNDLE)
165165
USE_BUNDLE
166166
SOURCES
167167
parquet_data_test.cc
168+
parquet_reader_no_projection_test.cc
168169
parquet_schema_test.cc
169170
parquet_test.cc)
170171

0 commit comments

Comments
 (0)