Skip to content

Commit 2f3ed21

Browse files
committed
implement map conversion
1 parent b355f6f commit 2f3ed21

4 files changed

Lines changed: 80 additions & 18 deletions

File tree

src/iceberg/avro/avro_data_util.cc

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <arrow/util/decimal.h>
2727
#include <avro/Generic.hh>
2828
#include <avro/Node.hh>
29+
#include <avro/NodeImpl.hh>
2930
#include <avro/Types.hh>
3031

3132
#include "iceberg/arrow/arrow_error_transform_internal.h"
@@ -102,8 +103,7 @@ Status AppendListToBuilder(const ::avro::NodePtr& avro_node,
102103
const auto& avro_array = avro_datum.value<::avro::GenericArray>();
103104

104105
auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder);
105-
ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append(
106-
/*is_valid=*/true, /*length=*/static_cast<int64_t>(avro_array.value().size())));
106+
ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append());
107107

108108
const auto& element_projection = projection.children[0];
109109
auto* value_builder = list_builder->value_builder();
@@ -123,8 +123,65 @@ Status AppendMapToBuilder(const ::avro::NodePtr& avro_node,
123123
const FieldProjection& key_projection,
124124
const FieldProjection& value_projection,
125125
const MapType& map_type, ::arrow::ArrayBuilder* array_builder) {
126-
// TODO(gangwu): support both regular map and array-based map.
127-
return NotImplemented("AppendMapToBuilder is not implemented");
126+
auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder);
127+
128+
if (avro_node->type() == ::avro::AVRO_MAP) {
129+
// Handle regular Avro map: map<string, value>
130+
const auto& avro_map = avro_datum.value<::avro::GenericMap>();
131+
const auto& map_entries = avro_map.value();
132+
133+
const auto& key_node = avro_node->leafAt(0);
134+
const auto& value_node = avro_node->leafAt(1);
135+
136+
const auto& key_field = map_type.key();
137+
const auto& value_field = map_type.value();
138+
139+
ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append());
140+
auto* key_builder = map_builder->key_builder();
141+
auto* item_builder = map_builder->item_builder();
142+
143+
for (const auto& entry : map_entries) {
144+
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
145+
key_node, entry.first, key_projection, key_field, key_builder));
146+
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
147+
value_node, entry.second, value_projection, value_field, item_builder));
148+
}
149+
150+
return {};
151+
} else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) {
152+
// Handle array-based map: list<struct<key, value>>
153+
const auto& avro_array = avro_datum.value<::avro::GenericArray>();
154+
const auto& array_entries = avro_array.value();
155+
156+
const auto& key_field = map_type.key();
157+
const auto& value_field = map_type.value();
158+
159+
ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append());
160+
auto* key_builder = map_builder->key_builder();
161+
auto* item_builder = map_builder->item_builder();
162+
163+
const auto& record_node = avro_node->leafAt(0);
164+
if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) {
165+
return InvalidArgument(
166+
"Array-based map must contain records with exactly 2 fields, got: {}",
167+
ToString(record_node));
168+
}
169+
const auto& key_node = record_node->leafAt(0);
170+
const auto& value_node = record_node->leafAt(1);
171+
172+
for (const auto& entry : array_entries) {
173+
const auto& record = entry.value<::avro::GenericRecord>();
174+
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
175+
key_node, record.fieldAt(0), key_projection, key_field, key_builder));
176+
ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(
177+
value_node, record.fieldAt(1), value_projection, value_field, item_builder));
178+
}
179+
180+
return {};
181+
} else {
182+
return InvalidArgument("Expected Avro map or array with map logical type, got: {}",
183+
ToString(avro_node));
184+
}
128185
}
129186

130187
/// \brief Append nested Avro data to Arrow array builder based on type.

src/iceberg/avro/avro_schema_util.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -404,12 +404,6 @@ bool HasLogicalType(const ::avro::NodePtr& node,
404404
return node->logicalType().type() == expected_type;
405405
}
406406

407-
bool HasMapLogicalType(const ::avro::NodePtr& node) {
408-
return node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
409-
node->logicalType().customLogicalType() != nullptr &&
410-
node->logicalType().customLogicalType()->name() == "map";
411-
}
412-
413407
std::optional<std::string> GetAdjustToUtc(const ::avro::NodePtr& node) {
414408
if (node->customAttributes() == 0) {
415409
return std::nullopt;
@@ -771,6 +765,12 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
771765

772766
} // namespace
773767

768+
// Returns true only when the node's logical type is CUSTOM, its custom logical
// type pointer is non-null, and that custom logical type is named "map".
bool HasMapLogicalType(const ::avro::NodePtr& node) {
769+
// The checks short-circuit left to right, so the null check on
// customLogicalType() is evaluated before name() is called on it.
return node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
770+
node->logicalType().customLogicalType() != nullptr &&
771+
node->logicalType().customLogicalType()->name() == "map";
772+
}
773+
774774
Result<SchemaProjection> Project(const Schema& expected_schema,
775775
const ::avro::NodePtr& avro_node, bool prune_source) {
776776
ICEBERG_ASSIGN_OR_RAISE(

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,9 @@ std::string ToString(const ::avro::NodePtr& node);
139139
std::string ToString(const ::avro::LogicalType& logical_type);
140140
std::string ToString(const ::avro::LogicalType::Type& logical_type);
141141

142+
/// \brief Check whether an Avro node carries the custom logical type named "map".
143+
/// \param node The Avro node to check.
144+
/// \return True if the node's logical type is CUSTOM with a non-null custom logical
/// type whose name is "map", false otherwise.
145+
bool HasMapLogicalType(const ::avro::NodePtr& node);
146+
142147
} // namespace iceberg::avro

test/avro_data_test.cc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@
2727
#include <avro/Types.hh>
2828
#include <gmock/gmock.h>
2929
#include <gtest/gtest.h>
30-
#include <iceberg/arrow/arrow_error_transform_internal.h>
31-
#include <iceberg/avro/avro_data_util_internal.h>
32-
#include <iceberg/avro/avro_schema_util_internal.h>
33-
#include <iceberg/schema.h>
34-
#include <iceberg/schema_internal.h>
35-
#include <iceberg/schema_util.h>
36-
#include <iceberg/type.h>
37-
#include <iceberg/util/macros.h>
3830

31+
#include "iceberg/arrow/arrow_error_transform_internal.h"
32+
#include "iceberg/avro/avro_data_util_internal.h"
33+
#include "iceberg/avro/avro_schema_util_internal.h"
34+
#include "iceberg/schema.h"
35+
#include "iceberg/schema_internal.h"
36+
#include "iceberg/schema_util.h"
37+
#include "iceberg/type.h"
38+
#include "iceberg/util/macros.h"
3939
#include "matchers.h"
4040

4141
namespace iceberg::avro {

0 commit comments

Comments
 (0)