
Commit 3eb74b3

callmepandey and claude committed
feat: Enable LargeListArray support in Parquet reader schema validation
Update ValidateParquetSchemaEvolution to accept LARGE_LIST as compatible with Iceberg's ListType. This enables reading Parquet files containing LargeListArray (64-bit offsets) through the Iceberg reader. The data conversion layer already supports LargeListArray via the templated ProjectListArrayImpl<> added in apache#510. This change removes the schema validation blocker that was rejecting LARGE_LIST types. Closes apache#513 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent: deec370 · commit: 3eb74b3

2 files changed

Lines changed: 189 additions & 1 deletion
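
The commit message credits the conversion layer's templated ProjectListArrayImpl<> (apache#510) for already handling both offset widths, which is why only the validator needed to change. As a rough, hypothetical sketch of that templating pattern (the helper below is invented for illustration; the real ProjectListArrayImpl<> is not reproduced here and may differ):

// Hypothetical sketch of the templating pattern the commit message credits;
// not the actual ProjectListArrayImpl<> from apache#510.
#include <arrow/api.h>
#include <memory>

// Instantiated with ::arrow::ListArray (32-bit offsets) or
// ::arrow::LargeListArray (64-bit offsets).
template <typename ListArrayT>
std::shared_ptr<::arrow::Array> ProjectListValues(const ListArrayT& list_array) {
  // Both array classes expose values(); only the offset width differs,
  // so one templated body covers both Parquet-decoded representations.
  return list_array.values();
}

A call site would typically switch on type_id() to pick the instantiation, which is the dispatch the schema validator below now permits to occur.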


src/iceberg/parquet/parquet_schema_util.cc

Lines changed: 2 additions & 1 deletion
@@ -175,7 +175,8 @@ Status ValidateParquetSchemaEvolution(
       }
       break;
     case TypeId::kList:
-      if (arrow_type->id() == ::arrow::Type::LIST) {
+      if (arrow_type->id() == ::arrow::Type::LIST ||
+          arrow_type->id() == ::arrow::Type::LARGE_LIST) {
         return {};
       }
       break;
src/iceberg/test/parquet_test.cc

Lines changed: 187 additions & 0 deletions
@@ -462,4 +462,191 @@ TEST_F(ParquetReadWrite, SimpleTypeRoundTrip) {
   ASSERT_TRUE(out->Equals(*array));
 }
 
+// Test that the reader can handle LargeListArray (64-bit offsets) from parquet files
+class LargeListArrayReaderTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+  void SetUp() override {
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    temp_parquet_file_ = "large_list_test.parquet";
+  }
+
+  // Write a parquet file with LargeListArray using Arrow's writer directly
+  void WriteLargeListParquetFile(const std::shared_ptr<::arrow::Schema>& arrow_schema,
+                                 const std::shared_ptr<::arrow::Array>& data) {
+    auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
+    auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie();
+
+    auto record_batch =
+        ::arrow::RecordBatch::FromStructArray(
+            std::static_pointer_cast<::arrow::StructArray>(data))
+            .ValueOrDie();
+    auto table = ::arrow::Table::FromRecordBatches({record_batch}).ValueOrDie();
+
+    ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
+                                             outfile, /*chunk_size=*/1024)
+                    .ok());
+    ASSERT_TRUE(outfile->Close().ok());
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::string temp_parquet_file_;
+};
+
+TEST_F(LargeListArrayReaderTest, ReadLargeListOfIntegers) {
+  const std::string kParquetFieldIdKey = "PARQUET:field_id";
+
+  // Create Arrow schema with LargeListArray (64-bit offsets)
+  // Need to add field id to the element field within the list
+  auto element_field =
+      ::arrow::field("element", ::arrow::int32(), /*nullable=*/true,
+                     ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}));
+  auto arrow_schema = ::arrow::schema({::arrow::field(
+      "values", ::arrow::large_list(element_field), /*nullable=*/true,
+      ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"}))});
+
+  // Create data with LargeListArray
+  auto data =
+      ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
+                                         R"([[[1, 2, 3]], [[4, 5]], [null], [[]]])")
+          .ValueOrDie();
+
+  WriteLargeListParquetFile(arrow_schema, data);
+
+  // Create Iceberg schema - uses ListType (no distinction between LIST and LARGE_LIST)
+  auto iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(
+          1, "values",
+          list(SchemaField::MakeOptional(2, "element", int32()))),
+  });
+
+  // Read using Iceberg reader
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto reader,
+      ReaderFactoryRegistry::Open(
+          FileFormatType::kParquet,
+          {.path = temp_parquet_file_, .io = file_io_, .projection = iceberg_schema}));
+
+  // Get Arrow schema from reader
+  auto schema_result = reader->Schema();
+  ASSERT_THAT(schema_result, IsOk());
+  auto arrow_c_schema = std::move(schema_result.value());
+  auto import_schema_result = ::arrow::ImportType(&arrow_c_schema);
+  auto read_arrow_schema = import_schema_result.ValueOrDie();
+
+  // Get data from reader
+  auto read_result = reader->Next();
+  ASSERT_THAT(read_result, IsOk());
+  ASSERT_TRUE(read_result.value().has_value());
+  auto arrow_c_array = read_result.value().value();
+  auto read_array =
+      ::arrow::ImportArray(&arrow_c_array, read_arrow_schema).ValueOrDie();
+
+  // The output is ListArray (not LargeListArray) because Iceberg schema maps to
+  // Arrow LIST. The important thing is that we can read LargeListArray from Parquet.
+  auto struct_array = std::static_pointer_cast<::arrow::StructArray>(read_array);
+  ASSERT_EQ(struct_array->num_fields(), 1);
+  ASSERT_EQ(struct_array->field(0)->type_id(), ::arrow::Type::LIST);
+
+  // Verify data content - create expected with regular list type
+  auto expected_element_field =
+      ::arrow::field("element", ::arrow::int32(), /*nullable=*/true);
+  auto expected_schema = ::arrow::schema({::arrow::field(
+      "values", ::arrow::list(expected_element_field), /*nullable=*/true)});
+  auto expected =
+      ::arrow::json::ArrayFromJSONString(::arrow::struct_(expected_schema->fields()),
+                                         R"([[[1, 2, 3]], [[4, 5]], [null], [[]]])")
+          .ValueOrDie();
+  ASSERT_TRUE(read_array->Equals(*expected));
+}
+
+TEST_F(LargeListArrayReaderTest, ReadLargeListOfStructs) {
+  const std::string kParquetFieldIdKey = "PARQUET:field_id";
+
+  // Create Arrow schema with LargeListArray containing structs
+  auto struct_type = ::arrow::struct_({
+      ::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
+                     ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"3"})),
+      ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true,
+                     ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"4"})),
+  });
+
+  // Create the element field with field_id for the struct (element of the list)
+  auto element_field =
+      ::arrow::field("element", struct_type, /*nullable=*/true,
+                     ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}));
+
+  auto arrow_schema = ::arrow::schema({::arrow::field(
+      "items", ::arrow::large_list(element_field), /*nullable=*/true,
+      ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"}))});
+
+  // Create data with LargeListArray of structs
+  auto data = ::arrow::json::ArrayFromJSONString(
+                  ::arrow::struct_(arrow_schema->fields()),
+                  R"([[[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]],
+                      [[{"id": 3, "name": null}]],
+                      [null],
+                      [[]]])")
+                  .ValueOrDie();
+
+  WriteLargeListParquetFile(arrow_schema, data);
+
+  // Create Iceberg schema
+  auto iceberg_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(
+          1, "items",
+          list(SchemaField::MakeOptional(
+              2, "element",
+              struct_({
+                  SchemaField::MakeRequired(3, "id", int32()),
+                  SchemaField::MakeOptional(4, "name", string()),
+              })))),
+  });
+
+  // Read using Iceberg reader
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto reader,
+      ReaderFactoryRegistry::Open(
+          FileFormatType::kParquet,
+          {.path = temp_parquet_file_, .io = file_io_, .projection = iceberg_schema}));
+
+  // Get data from reader
+  auto schema_result = reader->Schema();
+  ASSERT_THAT(schema_result, IsOk());
+  auto arrow_c_schema = std::move(schema_result.value());
+  auto read_arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+  auto read_result = reader->Next();
+  ASSERT_THAT(read_result, IsOk());
+  ASSERT_TRUE(read_result.value().has_value());
+  auto arrow_c_array = read_result.value().value();
+  auto read_array =
+      ::arrow::ImportArray(&arrow_c_array, read_arrow_schema).ValueOrDie();
+
+  // The output is ListArray (not LargeListArray) because Iceberg schema maps to
+  // Arrow LIST. The important thing is that we can read LargeListArray from Parquet.
+  auto struct_array = std::static_pointer_cast<::arrow::StructArray>(read_array);
+  ASSERT_EQ(struct_array->num_fields(), 1);
+  ASSERT_EQ(struct_array->field(0)->type_id(), ::arrow::Type::LIST);
+
+  // Verify data content - create expected with regular list type
+  auto expected_struct_type = ::arrow::struct_({
+      ::arrow::field("id", ::arrow::int32(), /*nullable=*/false),
+      ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true),
+  });
+  auto expected_element_field =
+      ::arrow::field("element", expected_struct_type, /*nullable=*/true);
+  auto expected_schema = ::arrow::schema({::arrow::field(
+      "items", ::arrow::list(expected_element_field), /*nullable=*/true)});
+  auto expected = ::arrow::json::ArrayFromJSONString(
+                      ::arrow::struct_(expected_schema->fields()),
+                      R"([[[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]],
+                          [[{"id": 3, "name": null}]],
+                          [null],
+                          [[]]])")
+                      .ValueOrDie();
+  ASSERT_TRUE(read_array->Equals(*expected));
+}
+
 } // namespace iceberg::parquet
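
Both tests assert that the reader's output comes back as 32-bit-offset LIST even though the file held LARGE_LIST data. A standalone sketch of that narrowing in plain Arrow C++ (illustrative only, assuming a recent Arrow build where the LARGE_LIST-to-LIST cast kernel is available; this is not how the Iceberg reader is implemented internally):

#include <arrow/api.h>
#include <arrow/compute/cast.h>
#include <iostream>
#include <memory>

::arrow::Status RunExample() {
  // Build [[1, 2, 3], [4, 5]] with 64-bit offsets, the shape a Parquet
  // decode producing LargeListArray would yield.
  auto pool = ::arrow::default_memory_pool();
  auto value_builder = std::make_shared<::arrow::Int32Builder>(pool);
  ::arrow::LargeListBuilder list_builder(pool, value_builder);
  ARROW_RETURN_NOT_OK(list_builder.Append());
  ARROW_RETURN_NOT_OK(value_builder->AppendValues({1, 2, 3}));
  ARROW_RETURN_NOT_OK(list_builder.Append());
  ARROW_RETURN_NOT_OK(value_builder->AppendValues({4, 5}));
  ARROW_ASSIGN_OR_RAISE(auto large, list_builder.Finish());
  std::cout << large->type()->ToString() << std::endl;  // large_list<item: int32>

  // Narrow to 32-bit offsets, matching the LIST type the Iceberg projection
  // reports; this is lossless whenever the offsets fit in int32.
  ARROW_ASSIGN_OR_RAISE(
      auto small, ::arrow::compute::Cast(*large, ::arrow::list(::arrow::int32())));
  std::cout << small->type()->ToString() << std::endl;  // list<item: int32>
  return ::arrow::Status::OK();
}

int main() { return RunExample().ok() ? 0 : 1; }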
