diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc
index d13df0eb8..73807b009 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -19,9 +19,14 @@
 
 #include "iceberg/parquet/parquet_reader.h"
 
+#include <charconv>
+#include <cstdint>
 #include <memory>
+#include <optional>
+#include <string>
 #include <utility>
+#include <vector>
 #include <arrow/c/bridge.h>
 #include <arrow/record_batch.h>
 #include <parquet/arrow/reader.h>
@@ -35,12 +40,15 @@
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
 #include "iceberg/arrow/arrow_status_internal.h"
 #include "iceberg/arrow/metadata_column_util_internal.h"
+#include "iceberg/constants.h"
 #include "iceberg/parquet/parquet_data_util_internal.h"
 #include "iceberg/parquet/parquet_register.h"
 #include "iceberg/parquet/parquet_schema_util_internal.h"
 #include "iceberg/result.h"
+#include "iceberg/schema.h"
 #include "iceberg/schema_internal.h"
 #include "iceberg/schema_util.h"
+#include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
@@ -48,6 +56,132 @@
 namespace iceberg::parquet {
 
 namespace {
 
+constexpr int32_t kUnknownFieldId = -1;
+
+int32_t GetFieldId(const std::shared_ptr<::arrow::Field>& field) {
+  if (!field->metadata()) {
+    return kUnknownFieldId;
+  }
+
+  int idx = field->metadata()->FindKey(kParquetFieldIdKey);
+  if (idx == -1) {
+    return kUnknownFieldId;
+  }
+
+  std::string value = field->metadata()->value(idx);
+  int32_t field_id = kUnknownFieldId;
+  std::from_chars(value.data(), value.data() + value.size(), field_id);
+
+  return field_id;
+}
+
+// Forward declaration
+Result<std::shared_ptr<Type>> ConvertArrowType(
+    const std::shared_ptr<::arrow::DataType>& type);
+
+Result<std::unique_ptr<SchemaField>> ToSchemaField(
+    const std::shared_ptr<::arrow::Field>& field) {
+  ICEBERG_ASSIGN_OR_RAISE(auto field_type, ConvertArrowType(field->type()));
+
+  auto field_id = GetFieldId(field);
+  return std::make_unique<SchemaField>(field_id, field->name(), std::move(field_type),
+                                       field->nullable());
+}
+
+Result<std::shared_ptr<Type>> ConvertArrowType(
+    const std::shared_ptr<::arrow::DataType>& type) {
+  switch (type->id()) {
+    case ::arrow::Type::BOOL:
+      return iceberg::boolean();
+    case ::arrow::Type::INT32:
+      return iceberg::int32();
+    case ::arrow::Type::INT64:
+      return iceberg::int64();
+    case ::arrow::Type::FLOAT:
+      return iceberg::float32();
+    case ::arrow::Type::DOUBLE:
+      return iceberg::float64();
+    case ::arrow::Type::DECIMAL128: {
+      const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+      return iceberg::decimal(decimal_type.precision(), decimal_type.scale());
+    }
+    case ::arrow::Type::DATE32:
+      return iceberg::date();
+    case ::arrow::Type::TIME64: {
+      const auto& time_type = static_cast<const ::arrow::Time64Type&>(*type);
+      if (time_type.unit() != ::arrow::TimeUnit::MICRO) {
+        return InvalidSchema("Unsupported time unit for Arrow time type: {}",
+                             static_cast<int>(time_type.unit()));
+      }
+      return iceberg::time();
+    }
+    case ::arrow::Type::TIMESTAMP: {
+      const auto& timestamp_type = static_cast<const ::arrow::TimestampType&>(*type);
+      if (timestamp_type.unit() != ::arrow::TimeUnit::MICRO) {
+        return InvalidSchema("Unsupported time unit for Arrow timestamp type: {}",
+                             static_cast<int>(timestamp_type.unit()));
+      }
+      if (timestamp_type.timezone().empty()) {
+        return iceberg::timestamp();
+      } else {
+        return iceberg::timestamp_tz();
+      }
+    }
+    case ::arrow::Type::STRING:
+    case ::arrow::Type::LARGE_STRING:
+      return iceberg::string();
+    case ::arrow::Type::BINARY:
+    case ::arrow::Type::LARGE_BINARY:
+      return iceberg::binary();
+    case ::arrow::Type::FIXED_SIZE_BINARY: {
+      const auto& fixed_type = static_cast<const ::arrow::FixedSizeBinaryType&>(*type);
+      return iceberg::fixed(fixed_type.byte_width());
+    }
+    case ::arrow::Type::EXTENSION: {
+      const auto& ext_type = static_cast<const ::arrow::ExtensionType&>(*type);
+      if (ext_type.extension_name() == "arrow.uuid") {
+        return iceberg::uuid();
+      }
+      return ConvertArrowType(ext_type.storage_type());
+    }
+    case ::arrow::Type::STRUCT: {
+      const auto& struct_type = static_cast<const ::arrow::StructType&>(*type);
+      std::vector<SchemaField> fields;
+      fields.reserve(struct_type.num_fields());
+      for (const auto& field : struct_type.fields()) {
+        ICEBERG_ASSIGN_OR_RAISE(auto schema_field, ToSchemaField(field));
+        fields.emplace_back(std::move(*schema_field));
+      }
+      return std::make_shared<StructType>(std::move(fields));
+    }
+    case ::arrow::Type::LIST: {
+      const auto& list_type = static_cast<const ::arrow::ListType&>(*type);
+      ICEBERG_ASSIGN_OR_RAISE(auto element_field, ToSchemaField(list_type.value_field()));
+      return std::make_shared<ListType>(std::move(*element_field));
+    }
+    case ::arrow::Type::MAP: {
+      const auto& map_type = static_cast<const ::arrow::MapType&>(*type);
+      ICEBERG_ASSIGN_OR_RAISE(auto key_field, ToSchemaField(map_type.key_field()));
+      ICEBERG_ASSIGN_OR_RAISE(auto value_field, ToSchemaField(map_type.item_field()));
+      return std::make_shared<MapType>(std::move(*key_field), std::move(*value_field));
+    }
+    default:
+      return InvalidSchema("Unsupported Arrow type: {}", type->ToString());
+  }
+}
+
+Result<std::unique_ptr<Schema>> InferIcebergSchema(
+    const std::shared_ptr<::arrow::Schema>& schema, std::optional<int32_t> schema_id) {
+  std::vector<SchemaField> fields;
+  fields.reserve(schema->num_fields());
+  for (const auto& field : schema->fields()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_field, ToSchemaField(field));
+    fields.emplace_back(std::move(*schema_field));
+  }
+  auto id = schema_id.value_or(Schema::kInitialSchemaId);
+  return std::make_unique<Schema>(std::move(fields), id);
+}
+
 Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputStream(
     const ReaderOptions& options) {
   ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
@@ -113,12 +247,7 @@ class ParquetReader::Impl {
  public:
   // Open the Parquet reader with the given options
   Status Open(const ReaderOptions& options) {
-    if (options.projection == nullptr) {
-      return InvalidArgument("Projected schema is required by Parquet reader");
-    }
-
     split_ = options.split;
-    read_schema_ = options.projection;
 
     // Prepare reader properties
     ::parquet::ReaderProperties reader_properties(pool_);
@@ -134,6 +263,16 @@ class ParquetReader::Impl {
     ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
         pool_, std::move(file_reader), arrow_reader_properties, &reader_));
 
+    if (options.projection != nullptr) {
+      read_schema_ = options.projection;
+    } else {
+      std::shared_ptr<::arrow::Schema> arrow_schema;
+      ICEBERG_ARROW_RETURN_NOT_OK(reader_->GetSchema(&arrow_schema));
+      ICEBERG_ASSIGN_OR_RAISE(auto schema,
+                              InferIcebergSchema(arrow_schema, std::nullopt));
+      read_schema_ = std::move(schema);
+    }
+
     // Project read schema onto the Parquet file schema
     ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_));
     metadata_context_ = {.file_path = options.path, .next_file_pos = 0};
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 215d883b0..125fe0895 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -165,6 +165,7 @@ if(ICEBERG_BUILD_BUNDLE)
                USE_BUNDLE
                SOURCES
                parquet_data_test.cc
+               parquet_reader_no_projection_test.cc
                parquet_schema_test.cc
                parquet_test.cc)
 
diff --git a/src/iceberg/test/parquet_reader_no_projection_test.cc b/src/iceberg/test/parquet_reader_no_projection_test.cc
new file mode 100644
index 000000000..0e253839a
--- /dev/null
+++ b/src/iceberg/test/parquet_reader_no_projection_test.cc
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <string_view>
+#include <utility>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+Status WriteArray(std::shared_ptr<::arrow::Array> data,
+                  const WriterOptions& writer_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, writer_options));
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer->Write(&arr));
+  return writer->Close();
+}
+
+}  // namespace
+
+class ParquetReaderNoProjectionTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+  void SetUp() override {
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    temp_parquet_file_ = "parquet_reader_no_projection_test.parquet";
+  }
+
+  void CreateSimpleParquetFile() {
+    auto schema = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", string())});
+
+    ArrowSchema arrow_c_schema;
+    ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+    auto array =
+        ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
+                                           R"([[1, "Foo"],[2, "Bar"],[3, "Baz"]])")
+            .ValueOrDie();
+
+    WriterProperties writer_properties;
+    writer_properties.Set(WriterProperties::kParquetCompression,
+                          std::string("uncompressed"));
+
+    ASSERT_TRUE(WriteArray(array, {.path = temp_parquet_file_,
+                                   .schema = schema,
+                                   .io = file_io_,
+                                   .properties = std::move(writer_properties)}));
+  }
+
+  void VerifyNextBatch(Reader& reader, std::string_view expected_json) {
+    auto schema_result = reader.Schema();
+    ASSERT_THAT(schema_result, IsOk());
+    auto arrow_c_schema = std::move(schema_result.value());
+    auto import_schema_result = ::arrow::ImportType(&arrow_c_schema);
+    auto arrow_schema = import_schema_result.ValueOrDie();
+
+    auto data = reader.Next();
+    ASSERT_THAT(data, IsOk()) << "Reader.Next() failed: " << data.error().message;
+    ASSERT_TRUE(data.value().has_value()) << "Reader.Next() returned no data";
+    auto arrow_c_array = data.value().value();
+    auto data_result = ::arrow::ImportArray(&arrow_c_array, arrow_schema);
+    auto arrow_array = data_result.ValueOrDie();
+
+    auto expected_array =
+        ::arrow::json::ArrayFromJSONString(arrow_schema, expected_json).ValueOrDie();
+    ASSERT_TRUE(arrow_array->Equals(*expected_array));
+  }
+
+  void VerifyExhausted(Reader& reader) {
+    auto data = reader.Next();
+    ASSERT_THAT(data, IsOk());
+    ASSERT_FALSE(data.value().has_value());
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::string temp_parquet_file_;
+};
+
+TEST_F(ParquetReaderNoProjectionTest, ReadWithoutProjection) {
+  CreateSimpleParquetFile();
+
+  // No projection passed
+  auto reader_result = ReaderFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = temp_parquet_file_, .io = file_io_});
+
+  ASSERT_THAT(reader_result, IsOk())
+      << "Failed to create reader: " << reader_result.error().message;
+  auto reader = std::move(reader_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyNextBatch(*reader, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
+  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
+}
+
+}  // namespace iceberg::parquet