-
Notifications
You must be signed in to change notification settings - Fork 100
feat(avro): extract avro datum from arrow array #166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ | |
| * under the License. | ||
| */ | ||
|
|
||
| #include <ranges> | ||
|
|
||
| #include <arrow/array/builder_binary.h> | ||
| #include <arrow/array/builder_decimal.h> | ||
| #include <arrow/array/builder_nested.h> | ||
|
|
@@ -451,4 +453,217 @@ Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node, | |
| projected_schema, array_builder); | ||
| } | ||
|
|
||
| namespace { | ||
|
|
||
| // ToAvroNodeVisitor uses 0 for null branch and 1 for value branch. | ||
| constexpr int64_t kNullBranch = 0; | ||
| constexpr int64_t kValueBranch = 1; | ||
|
|
||
| } // namespace | ||
|
|
||
| Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index, | ||
| ::avro::GenericDatum* datum) { | ||
| if (index < 0 || index >= array.length()) { | ||
| return InvalidArgument("Cannot extract datum from array at index {} of length {}", | ||
| index, array.length()); | ||
| } | ||
|
|
||
| if (array.IsNull(index)) { | ||
| if (!datum->isUnion()) [[unlikely]] { | ||
| return InvalidSchema("Cannot extract null to non-union type: {}", | ||
| ::avro::toString(datum->type())); | ||
| } | ||
| datum->selectBranch(kNullBranch); | ||
| return {}; | ||
| } | ||
|
|
||
| if (datum->isUnion()) { | ||
| datum->selectBranch(kValueBranch); | ||
| } | ||
|
|
||
| switch (array.type()->id()) { | ||
| case ::arrow::Type::BOOL: { | ||
| const auto& bool_array = | ||
| internal::checked_cast<const ::arrow::BooleanArray&>(array); | ||
| datum->value<bool>() = bool_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::INT32: { | ||
| const auto& int32_array = internal::checked_cast<const ::arrow::Int32Array&>(array); | ||
| datum->value<int32_t>() = int32_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::INT64: { | ||
| const auto& int64_array = internal::checked_cast<const ::arrow::Int64Array&>(array); | ||
| datum->value<int64_t>() = int64_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::FLOAT: { | ||
| const auto& float_array = internal::checked_cast<const ::arrow::FloatArray&>(array); | ||
| datum->value<float>() = float_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::DOUBLE: { | ||
| const auto& double_array = | ||
| internal::checked_cast<const ::arrow::DoubleArray&>(array); | ||
| datum->value<double>() = double_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::STRING: { | ||
|
wgtmac marked this conversation as resolved.
|
||
| const auto& string_array = | ||
| internal::checked_cast<const ::arrow::StringArray&>(array); | ||
| datum->value<std::string>() = string_array.GetString(index); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::BINARY: { | ||
|
wgtmac marked this conversation as resolved.
|
||
| const auto& binary_array = | ||
| internal::checked_cast<const ::arrow::BinaryArray&>(array); | ||
| std::string_view value = binary_array.GetView(index); | ||
| datum->value<std::vector<uint8_t>>().assign( | ||
| reinterpret_cast<const uint8_t*>(value.data()), | ||
| reinterpret_cast<const uint8_t*>(value.data()) + value.size()); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::FIXED_SIZE_BINARY: { | ||
| const auto& fixed_array = | ||
| internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>(array); | ||
| std::string_view value = fixed_array.GetView(index); | ||
| auto& fixed_datum = datum->value<::avro::GenericFixed>(); | ||
| fixed_datum.value().assign( | ||
| reinterpret_cast<const char*>(value.data()), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GenericFixed holds std::vector<uint8_t>, so cast to const uint8_t*?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense. Fixed. |
||
| reinterpret_cast<const char*>(value.data()) + value.size()); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::DECIMAL128: { | ||
| const auto& decimal_array = | ||
| internal::checked_cast<const ::arrow::Decimal128Array&>(array); | ||
| std::string_view decimal_value = decimal_array.GetView(index); | ||
| auto& fixed_datum = datum->value<::avro::GenericFixed>(); | ||
| auto& bytes = fixed_datum.value(); | ||
| bytes.assign(decimal_value.begin(), decimal_value.end()); | ||
| std::ranges::reverse(bytes); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::DATE32: { | ||
| const auto& date_array = internal::checked_cast<const ::arrow::Date32Array&>(array); | ||
| datum->value<int32_t>() = date_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::TIME64: { | ||
| const auto& time_array = internal::checked_cast<const ::arrow::Time64Array&>(array); | ||
| datum->value<int64_t>() = time_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| // For both timestamp and timestamp_tz with time unit as microsecond. | ||
| case ::arrow::Type::TIMESTAMP: { | ||
| const auto& timestamp_array = | ||
| internal::checked_cast<const ::arrow::TimestampArray&>(array); | ||
| datum->value<int64_t>() = timestamp_array.Value(index); | ||
| return {}; | ||
| } | ||
|
|
||
| // TODO(gangwu): support uuid type. | ||
|
|
||
| case ::arrow::Type::STRUCT: { | ||
| const auto& struct_array = | ||
| internal::checked_cast<const ::arrow::StructArray&>(array); | ||
| auto& record = datum->value<::avro::GenericRecord>(); | ||
| for (int i = 0; i < struct_array.num_fields(); ++i) { | ||
| ICEBERG_RETURN_UNEXPECTED( | ||
| ExtractDatumFromArray(*struct_array.field(i), index, &record.fieldAt(i))); | ||
| } | ||
| return {}; | ||
| } | ||
|
|
||
| // TODO(gangwu): support LARGE_LIST. | ||
| case ::arrow::Type::LIST: { | ||
| const auto& list_array = internal::checked_cast<const ::arrow::ListArray&>(array); | ||
| auto& avro_array = datum->value<::avro::GenericArray>(); | ||
| auto& elements = avro_array.value(); | ||
|
|
||
| auto start = list_array.value_offset(index); | ||
| auto end = list_array.value_offset(index + 1); | ||
| auto length = end - start; | ||
|
|
||
| auto values = list_array.values(); | ||
| elements.resize(length, ::avro::GenericDatum(avro_array.schema()->leafAt(0))); | ||
|
|
||
| for (int64_t i = 0; i < length; ++i) { | ||
| ICEBERG_RETURN_UNEXPECTED( | ||
| ExtractDatumFromArray(*values, start + i, &elements[i])); | ||
| } | ||
| return {}; | ||
| } | ||
|
|
||
| case ::arrow::Type::MAP: { | ||
| const auto& map_array = internal::checked_cast<const ::arrow::MapArray&>(array); | ||
| auto start = map_array.value_offset(index); | ||
| auto end = map_array.value_offset(index + 1); | ||
| auto length = end - start; | ||
|
|
||
| auto keys = map_array.keys(); | ||
| auto items = map_array.items(); | ||
|
|
||
| if (datum->type() == ::avro::AVRO_MAP) { | ||
| // Handle regular Avro map | ||
| auto& avro_map = datum->value<::avro::GenericMap>(); | ||
| auto value_node = avro_map.schema()->leafAt(1); | ||
|
|
||
| auto& map_entries = avro_map.value(); | ||
| map_entries.resize( | ||
| length, std::make_pair(std::string(), ::avro::GenericDatum(value_node))); | ||
|
|
||
| const auto& key_array = | ||
| internal::checked_cast<const ::arrow::StringArray&>(*keys); | ||
|
|
||
| for (int64_t i = 0; i < length; ++i) { | ||
| auto& map_entry = map_entries[i]; | ||
| map_entry.first = key_array.GetString(start + i); | ||
| ICEBERG_RETURN_UNEXPECTED( | ||
| ExtractDatumFromArray(*items, start + i, &map_entry.second)); | ||
| } | ||
| } else if (datum->type() == ::avro::AVRO_ARRAY) { | ||
| // Handle array-based map (list<struct<key, value>>) | ||
| auto& avro_array = datum->value<::avro::GenericArray>(); | ||
| auto record_node = avro_array.schema()->leafAt(0); | ||
| if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { | ||
| return InvalidArgument( | ||
| "Expected Avro record with 2 fields for map value, got: {}", | ||
| ToString(record_node)); | ||
| } | ||
|
|
||
| auto& elements = avro_array.value(); | ||
| elements.resize(length, ::avro::GenericDatum(record_node)); | ||
|
|
||
| for (int64_t i = 0; i < length; ++i) { | ||
| auto& record = elements[i].value<::avro::GenericRecord>(); | ||
| ICEBERG_RETURN_UNEXPECTED( | ||
| ExtractDatumFromArray(*keys, start + i, &record.fieldAt(0))); | ||
| ICEBERG_RETURN_UNEXPECTED( | ||
| ExtractDatumFromArray(*items, start + i, &record.fieldAt(1))); | ||
| } | ||
| } else { | ||
| return InvalidArgument("Unsupported Avro type for map: {}", | ||
| static_cast<int>(datum->type())); | ||
| } | ||
| return {}; | ||
| } | ||
|
|
||
| default: | ||
| return InvalidArgument("Unsupported Arrow array type: {}", | ||
| array.type()->ToString()); | ||
| } | ||
| } | ||
|
|
||
| } // namespace iceberg::avro | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check if the GenericDatum type matches the arrow::Array type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is an internal API used by the Avro writer. Schemas of GenericDatum, ArrowArray and Iceberg should be consistent. The GenericDatum is reused so we don't want to check it repeatedly.