Skip to content

Commit deec370

Browse files
authored
feat: Add support for Arrow LargeListArray in Parquet data projection (#510)
## Summary

Add support for projecting Arrow `LargeListArray` (64-bit offsets) in addition to the existing `ListArray` (32-bit offsets) support. This enables handling of lists with more than 2^31-1 total child elements.

## Changes

- Add templated `ProjectListArrayImpl<>` function for code reuse between list types
- Add `ProjectLargeListArray` wrapper function for 64-bit offset lists
- Update `ProjectNestedArray` to dispatch to appropriate handler based on Arrow type
- Add test cases for `LargeListArray` projection

## Test Results

```
[ RUN ] ProjectRecordBatchTest.LargeListOfIntegers
[ OK ] ProjectRecordBatchTest.LargeListOfIntegers (0 ms)
[ RUN ] ProjectRecordBatchTest.LargeListOfStructs
[ OK ] ProjectRecordBatchTest.LargeListOfStructs (0 ms)
```

All 59 tests pass.

## Test Plan

- [x] Added `LargeListOfIntegers` test - verifies projection of `LargeListArray<int32>`
- [x] Added `LargeListOfStructs` test - verifies projection of `LargeListArray` with nested struct elements
- [x] Verified all existing tests still pass

Closes #502
1 parent b937acb commit deec370

File tree

2 files changed

+203
-14
lines changed

2 files changed

+203
-14
lines changed

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,13 @@ Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
148148
return output_array;
149149
}
150150

151-
/// FIXME: Support ::arrow::LargeListArray.
152-
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
153-
const std::shared_ptr<::arrow::ListArray>& list_array,
154-
const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
151+
/// Templated implementation for projecting list arrays.
152+
/// Works with both ListArray/ListType (32-bit offsets) and
153+
/// LargeListArray/LargeListType (64-bit offsets).
154+
template <typename ArrowListArrayType, typename ArrowListType>
155+
Result<std::shared_ptr<::arrow::Array>> ProjectListArrayImpl(
156+
const std::shared_ptr<ArrowListArrayType>& list_array,
157+
const std::shared_ptr<ArrowListType>& output_list_type, const ListType& list_type,
155158
std::span<const FieldProjection> projections,
156159
const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) {
157160
if (projections.size() != 1) {
@@ -176,12 +179,30 @@ Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
176179
ProjectPrimitiveArray(list_array->values(), output_element_type, pool));
177180
}
178181

179-
return std::make_shared<::arrow::ListArray>(
182+
return std::make_shared<ArrowListArrayType>(
180183
output_list_type, list_array->length(), list_array->value_offsets(),
181184
std::move(projected_values), list_array->null_bitmap(), list_array->null_count(),
182185
list_array->offset());
183186
}
184187

188+
/// Projects a ::arrow::ListArray (32-bit offsets).
/// Thin wrapper: every argument is forwarded unchanged to ProjectListArrayImpl,
/// whose template parameters are deduced as ::arrow::ListArray / ::arrow::ListType.
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
189+
const std::shared_ptr<::arrow::ListArray>& list_array,
190+
const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
191+
std::span<const FieldProjection> projections,
192+
const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) {
193+
return ProjectListArrayImpl(list_array, output_list_type, list_type, projections,
194+
metadata_context, pool);
195+
}
196+
197+
/// Projects a ::arrow::LargeListArray (64-bit offsets).
/// Thin wrapper: every argument is forwarded unchanged to ProjectListArrayImpl,
/// whose template parameters are deduced as ::arrow::LargeListArray /
/// ::arrow::LargeListType.
Result<std::shared_ptr<::arrow::Array>> ProjectLargeListArray(
198+
const std::shared_ptr<::arrow::LargeListArray>& list_array,
199+
const std::shared_ptr<::arrow::LargeListType>& output_list_type,
200+
const ListType& list_type, std::span<const FieldProjection> projections,
201+
const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) {
202+
return ProjectListArrayImpl(list_array, output_list_type, list_type, projections,
203+
metadata_context, pool);
204+
}
205+
185206
Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
186207
const std::shared_ptr<::arrow::MapArray>& map_array,
187208
const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type,
@@ -249,17 +270,26 @@ Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
249270
projections, metadata_context, pool);
250271
}
251272
case TypeId::kList: {
252-
if (output_arrow_type->id() != ::arrow::Type::LIST) {
253-
return InvalidSchema("Expected list type, got: {}",
254-
output_arrow_type->ToString());
273+
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
274+
275+
if (output_arrow_type->id() == ::arrow::Type::LIST) {
276+
auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
277+
auto output_list_type =
278+
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
279+
return ProjectListArray(list_array, output_list_type, list_type, projections,
280+
metadata_context, pool);
255281
}
256282

257-
auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
258-
auto output_list_type =
259-
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
260-
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
261-
return ProjectListArray(list_array, output_list_type, list_type, projections,
262-
metadata_context, pool);
283+
if (output_arrow_type->id() == ::arrow::Type::LARGE_LIST) {
284+
auto list_array = internal::checked_pointer_cast<::arrow::LargeListArray>(array);
285+
auto output_list_type =
286+
internal::checked_pointer_cast<::arrow::LargeListType>(output_arrow_type);
287+
return ProjectLargeListArray(list_array, output_list_type, list_type, projections,
288+
metadata_context, pool);
289+
}
290+
291+
return InvalidSchema("Expected list or large_list type, got: {}",
292+
output_arrow_type->ToString());
263293
}
264294
case TypeId::kMap: {
265295
if (output_arrow_type->id() != ::arrow::Type::MAP) {

src/iceberg/test/parquet_data_test.cc

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919

2020
#include <arrow/array.h>
21+
#include <arrow/array/builder_binary.h>
22+
#include <arrow/array/builder_nested.h>
23+
#include <arrow/array/builder_primitive.h>
2124
#include <arrow/c/bridge.h>
2225
#include <arrow/json/from_string.h>
2326
#include <arrow/record_batch.h>
@@ -503,4 +506,160 @@ TEST(ProjectRecordBatchTest, EmptyRecordBatch) {
503506
VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
504507
}
505508

509+
// Checks that ProjectRecordBatch handles a large_list<int32> column: the Iceberg
// schema is projected onto itself, and the output column must come back as
// LARGE_LIST holding [[1, 2, 3], [4, 5]].
TEST(ProjectRecordBatchTest, LargeListOfIntegers) {
510+
// Create a LargeListArray manually (JSON parsing creates regular ListArray)
511+
auto value_builder = std::make_shared<::arrow::Int32Builder>();
512+
::arrow::LargeListBuilder list_builder(::arrow::default_memory_pool(), value_builder,
513+
::arrow::large_list(::arrow::int32()));
514+
515+
// Build: [[1, 2, 3], [4, 5]]
// Each list_builder.Append() starts a new list; the value_builder appends that
// follow supply its elements.
516+
ASSERT_TRUE(list_builder.Append().ok());
517+
ASSERT_TRUE(value_builder->Append(1).ok());
518+
ASSERT_TRUE(value_builder->Append(2).ok());
519+
ASSERT_TRUE(value_builder->Append(3).ok());
520+
521+
ASSERT_TRUE(list_builder.Append().ok());
522+
ASSERT_TRUE(value_builder->Append(4).ok());
523+
ASSERT_TRUE(value_builder->Append(5).ok());
524+
525+
auto large_list_array_result = list_builder.Finish();
526+
ASSERT_TRUE(large_list_array_result.ok());
527+
auto large_list_array = large_list_array_result.ValueOrDie();
528+
529+
// Create input record batch with the LargeListArray
530+
auto input_arrow_schema =
531+
::arrow::schema({::arrow::field("numbers", ::arrow::large_list(::arrow::int32()))});
532+
auto input_record_batch =
533+
::arrow::RecordBatch::Make(input_arrow_schema, 2, {large_list_array});
534+
535+
// Create Iceberg schema (uses ListType which maps to both LIST and LARGE_LIST)
536+
Schema iceberg_schema({
537+
SchemaField::MakeRequired(
538+
1, "numbers",
539+
std::make_shared<ListType>(SchemaField::MakeRequired(2, "element", int32()))),
540+
});
541+
542+
// Create schema projection
543+
auto schema_projection_result =
544+
Project(iceberg_schema, iceberg_schema, /*prune_source=*/false);
545+
ASSERT_THAT(schema_projection_result, IsOk());
546+
auto schema_projection = std::move(schema_projection_result.value());
547+
548+
// Create output Arrow schema with LargeListType
549+
auto output_arrow_schema =
550+
::arrow::schema({::arrow::field("numbers", ::arrow::large_list(::arrow::int32()))});
551+
552+
// Project the record batch
553+
auto project_result = ProjectRecordBatch(
554+
input_record_batch, output_arrow_schema, iceberg_schema, schema_projection,
555+
/*metadata_context=*/{}, ::arrow::default_memory_pool());
556+
ASSERT_THAT(project_result, IsOk());
557+
auto projected_record_batch = std::move(project_result.value());
558+
559+
// Verify the result is a LargeListArray
560+
ASSERT_EQ(projected_record_batch->num_columns(), 1);
561+
ASSERT_EQ(projected_record_batch->column(0)->type()->id(), ::arrow::Type::LARGE_LIST);
562+
563+
// Verify the values
// static_pointer_cast is unchecked; the LARGE_LIST type id was asserted above.
564+
auto projected_array = std::static_pointer_cast<::arrow::LargeListArray>(
565+
projected_record_batch->column(0));
566+
ASSERT_EQ(projected_array->length(), 2);
567+
568+
// First list: [1, 2, 3]
569+
auto first_list = projected_array->value_slice(0);
570+
ASSERT_EQ(first_list->length(), 3);
571+
auto first_values = std::static_pointer_cast<::arrow::Int32Array>(first_list);
572+
EXPECT_EQ(first_values->Value(0), 1);
573+
EXPECT_EQ(first_values->Value(1), 2);
574+
EXPECT_EQ(first_values->Value(2), 3);
575+
576+
// Second list: [4, 5]
577+
auto second_list = projected_array->value_slice(1);
578+
ASSERT_EQ(second_list->length(), 2);
579+
auto second_values = std::static_pointer_cast<::arrow::Int32Array>(second_list);
580+
EXPECT_EQ(second_values->Value(0), 4);
581+
EXPECT_EQ(second_values->Value(1), 5);
582+
}
583+
584+
// Checks that ProjectRecordBatch handles a large_list<struct<name: utf8, age: int32>>
// column: the Iceberg schema is projected onto itself, and the output column must
// come back as LARGE_LIST. Only the outer length and the per-list element counts
// are verified; the struct field values are not inspected.
TEST(ProjectRecordBatchTest, LargeListOfStructs) {
585+
// Create a LargeListArray with struct elements
586+
auto name_builder = std::make_shared<::arrow::StringBuilder>();
587+
auto age_builder = std::make_shared<::arrow::Int32Builder>();
588+
std::vector<std::shared_ptr<::arrow::ArrayBuilder>> field_builders = {name_builder,
589+
age_builder};
590+
auto struct_type = ::arrow::struct_(
591+
{::arrow::field("name", ::arrow::utf8()), ::arrow::field("age", ::arrow::int32())});
592+
auto struct_builder = std::make_shared<::arrow::StructBuilder>(
593+
struct_type, ::arrow::default_memory_pool(), field_builders);
594+
595+
::arrow::LargeListBuilder list_builder(::arrow::default_memory_pool(), struct_builder,
596+
::arrow::large_list(struct_type));
597+
598+
// Build: [[{name: "Alice", age: 30}], [{name: "Bob", age: 25}, {name: "Carol", age:
599+
// 35}]]
// Each list_builder.Append() starts a new list, each struct_builder->Append() adds
// one struct to it, and the field builders are then appended once per struct.
600+
ASSERT_TRUE(list_builder.Append().ok());
601+
ASSERT_TRUE(struct_builder->Append().ok());
602+
ASSERT_TRUE(name_builder->Append("Alice").ok());
603+
ASSERT_TRUE(age_builder->Append(30).ok());
604+
605+
ASSERT_TRUE(list_builder.Append().ok());
606+
ASSERT_TRUE(struct_builder->Append().ok());
607+
ASSERT_TRUE(name_builder->Append("Bob").ok());
608+
ASSERT_TRUE(age_builder->Append(25).ok());
609+
ASSERT_TRUE(struct_builder->Append().ok());
610+
ASSERT_TRUE(name_builder->Append("Carol").ok());
611+
ASSERT_TRUE(age_builder->Append(35).ok());
612+
613+
auto large_list_array_result = list_builder.Finish();
614+
ASSERT_TRUE(large_list_array_result.ok());
615+
auto large_list_array = large_list_array_result.ValueOrDie();
616+
617+
// Create input record batch
618+
auto input_arrow_schema =
619+
::arrow::schema({::arrow::field("people", ::arrow::large_list(struct_type))});
620+
auto input_record_batch =
621+
::arrow::RecordBatch::Make(input_arrow_schema, 2, {large_list_array});
622+
623+
// Create Iceberg schema
624+
Schema iceberg_schema({
625+
SchemaField::MakeRequired(1, "people",
626+
std::make_shared<ListType>(SchemaField::MakeRequired(
627+
2, "element",
628+
std::make_shared<StructType>(std::vector<SchemaField>{
629+
SchemaField::MakeRequired(3, "name", string()),
630+
SchemaField::MakeRequired(4, "age", int32()),
631+
})))),
632+
});
633+
634+
// Create schema projection
635+
auto schema_projection_result =
636+
Project(iceberg_schema, iceberg_schema, /*prune_source=*/false);
637+
ASSERT_THAT(schema_projection_result, IsOk());
638+
auto schema_projection = std::move(schema_projection_result.value());
639+
640+
// Create output Arrow schema with LargeListType
641+
auto output_arrow_schema =
642+
::arrow::schema({::arrow::field("people", ::arrow::large_list(struct_type))});
643+
644+
// Project the record batch
645+
auto project_result = ProjectRecordBatch(
646+
input_record_batch, output_arrow_schema, iceberg_schema, schema_projection,
647+
/*metadata_context=*/{}, ::arrow::default_memory_pool());
648+
ASSERT_THAT(project_result, IsOk());
649+
auto projected_record_batch = std::move(project_result.value());
650+
651+
// Verify the result is a LargeListArray
652+
ASSERT_EQ(projected_record_batch->num_columns(), 1);
653+
ASSERT_EQ(projected_record_batch->column(0)->type()->id(), ::arrow::Type::LARGE_LIST);
654+
655+
// static_pointer_cast is unchecked; the LARGE_LIST type id was asserted above.
auto projected_array = std::static_pointer_cast<::arrow::LargeListArray>(
656+
projected_record_batch->column(0));
657+
ASSERT_EQ(projected_array->length(), 2);
658+
659+
// Verify first list has 1 element
660+
EXPECT_EQ(projected_array->value_length(0), 1);
661+
// Verify second list has 2 elements
662+
EXPECT_EQ(projected_array->value_length(1), 2);
663+
}
664+
506665
} // namespace iceberg::parquet

0 commit comments

Comments
 (0)