Skip to content

Commit c0fdd63

Browse files
fix applying field-ids on name mapping code
1 parent 7ddeb06 commit c0fdd63

6 files changed

Lines changed: 579 additions & 272 deletions

File tree

src/iceberg/avro/avro_reader.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,12 @@ class AvroBatchReader::Impl {
101101
if (has_id_visitor.HasNoIds()) {
102102
// Apply field IDs based on name mapping if available
103103
if (options.name_mapping) {
104+
MappedField mapped_field;
105+
// Convert NameMapping to MappedFields for nested mapping
106+
mapped_field.nested_mapping = std::make_shared<MappedFields>(options.name_mapping->AsMappedFields());
104107
ICEBERG_ASSIGN_OR_RAISE(
105108
auto new_root_node,
106-
CreateAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
109+
CreateAvroNodeWithFieldIds(file_schema.root(), mapped_field));
107110

108111
// Create a new schema with the updated root node
109112
auto new_schema = ::avro::ValidSchema(new_root_node);

src/iceberg/avro/avro_schema_util.cc

Lines changed: 141 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -785,168 +785,171 @@ Result<SchemaProjection> Project(const Schema& expected_schema,
785785
return SchemaProjection{std::move(field_projection.children)};
786786
}
787787

788-
// Helper function to create a new Avro node with field IDs from name mapping
789-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
790-
const NameMapping& name_mapping) {
791-
switch (original_node->type()) {
792-
case ::avro::AVRO_RECORD:
793-
return CreateRecordNodeWithFieldIds(original_node, name_mapping);
794-
case ::avro::AVRO_ARRAY:
795-
return CreateArrayNodeWithFieldIds(original_node, name_mapping);
796-
case ::avro::AVRO_MAP:
797-
return CreateMapNodeWithFieldIds(original_node, name_mapping);
798-
case ::avro::AVRO_UNION:
799-
return CreateUnionNodeWithFieldIds(original_node, name_mapping);
800-
case ::avro::AVRO_BOOL:
801-
case ::avro::AVRO_INT:
802-
case ::avro::AVRO_LONG:
803-
case ::avro::AVRO_FLOAT:
804-
case ::avro::AVRO_DOUBLE:
805-
case ::avro::AVRO_STRING:
806-
case ::avro::AVRO_BYTES:
807-
case ::avro::AVRO_FIXED:
808-
// For primitive types, just return a copy
809-
return original_node;
810-
case ::avro::AVRO_NULL:
811-
case ::avro::AVRO_ENUM:
812-
default:
813-
return InvalidSchema("Unsupported Avro type for field ID application: {}",
814-
ToString(original_node));
815-
}
816-
}
788+
namespace {
817789

818790
Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
819-
const NameMapping& name_mapping) {
791+
const MappedField& field) {
820792
auto new_record_node = std::make_shared<::avro::NodeRecord>();
821793
new_record_node->setName(original_node->name());
822794

795+
if (original_node->leaves() > original_node->names()) {
796+
return InvalidSchema("Node has {} leaves but only {} names", original_node->leaves(), original_node->names());
797+
}
798+
823799
for (size_t i = 0; i < original_node->leaves(); ++i) {
824800
const std::string& field_name = original_node->nameAt(i);
825801
::avro::NodePtr field_node = original_node->leafAt(i);
826802

827-
// Try to find field ID by name in the name mapping
828-
if (auto field_ref = name_mapping.Find(field_name)) {
829-
if (field_ref->get().field_id.has_value()) {
830-
// Add field ID attribute to the new node
831-
::avro::CustomAttributes attributes;
832-
attributes.addAttribute(std::string(kFieldIdProp),
833-
std::to_string(field_ref->get().field_id.value()), false);
834-
new_record_node->addCustomAttributesForField(attributes);
803+
// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
804+
// Try to find nested field by name
805+
const MappedField* nested_field = nullptr;
806+
if (field.nested_mapping) {
807+
auto fields_span = field.nested_mapping->fields();
808+
for (const auto& f : fields_span) {
809+
if (f.names.find(field_name) != f.names.end()) {
810+
nested_field = &f;
811+
break;
812+
}
835813
}
814+
}
836815

837-
// Recursively apply field IDs to nested fields if they exist
838-
if (field_ref->get().nested_mapping && field_node->type() == ::avro::AVRO_RECORD) {
839-
const auto& nested_mapping = field_ref->get().nested_mapping;
840-
auto fields_span = nested_mapping->fields();
841-
std::vector<MappedField> fields_vector(fields_span.begin(), fields_span.end());
842-
auto nested_name_mapping = NameMapping::Make(std::move(fields_vector));
816+
if (nested_field) {
817+
// Check if field_id is present
818+
if (!nested_field->field_id.has_value()) {
819+
return InvalidSchema("Field ID is missing for field '{}' in nested mapping", field_name);
820+
}
843821

844-
ICEBERG_ASSIGN_OR_RAISE(
845-
auto new_nested_node,
846-
CreateAvroNodeWithFieldIds(field_node, *nested_name_mapping));
847-
new_record_node->addName(field_name);
848-
new_record_node->addLeaf(new_nested_node);
849-
} else {
850-
// Recursively apply field IDs to child nodes
851-
ICEBERG_ASSIGN_OR_RAISE(auto new_field_node,
852-
CreateAvroNodeWithFieldIds(field_node, name_mapping));
853-
new_record_node->addName(field_name);
854-
new_record_node->addLeaf(new_field_node);
822+
// Preserve existing custom attributes for this field
823+
::avro::CustomAttributes attributes;
824+
if (i < original_node->customAttributes()) {
825+
// Copy all existing attributes from the original node
826+
const auto& original_attrs = original_node->customAttributesAt(i);
827+
const auto& existing_attrs = original_attrs.attributes();
828+
for (const auto& attr_pair : existing_attrs) {
829+
attributes.addAttribute(attr_pair.first, attr_pair.second, false);
830+
}
855831
}
856-
} else {
857-
// Recursively apply field IDs to child nodes even if no mapping found
858-
ICEBERG_ASSIGN_OR_RAISE(auto new_field_node,
859-
CreateAvroNodeWithFieldIds(field_node, name_mapping));
832+
833+
// Add field ID attribute to the new node (preserving existing attributes)
834+
attributes.addAttribute(std::string(kFieldIdProp),
835+
std::to_string(nested_field->field_id.value()), false);
836+
837+
if (!attributes.attributes().empty()) {
838+
new_record_node->addCustomAttributesForField(attributes);
839+
}
840+
841+
// Recursively apply field IDs to nested fields
842+
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
843+
CreateAvroNodeWithFieldIds(field_node, *nested_field));
860844
new_record_node->addName(field_name);
861-
new_record_node->addLeaf(new_field_node);
845+
new_record_node->addLeaf(new_nested_node);
846+
} else {
847+
// If no nested field found, this is an error
848+
return InvalidSchema("Field '{}' not found in nested mapping", field_name);
862849
}
863850
}
864851

865852
return new_record_node;
866853
}
867854

868855
Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
869-
const NameMapping& name_mapping) {
856+
const MappedField& field) {
870857
if (original_node->leaves() != 1) {
871858
return InvalidSchema("Array type must have exactly one leaf");
872859
}
873860

874861
auto new_array_node = std::make_shared<::avro::NodeArray>();
875-
new_array_node->setName(original_node->name());
876-
new_array_node->setLogicalType(original_node->logicalType());
877-
862+
878863
// Check if this is a map represented as array
879-
if (original_node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
880-
original_node->logicalType().customLogicalType() != nullptr &&
881-
original_node->logicalType().customLogicalType()->name() == kMapLogicalType) {
864+
if (HasMapLogicalType(original_node)) {
882865
ICEBERG_ASSIGN_OR_RAISE(
883866
auto new_element_node,
884-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), name_mapping));
867+
CreateAvroNodeWithFieldIds(original_node->leafAt(0), field));
885868
new_array_node->addLeaf(new_element_node);
886869
return new_array_node;
887870
}
888871

889-
// For regular arrays, try to find element field ID
890-
if (auto element_field = name_mapping.Find(std::string(kElement))) {
891-
if (element_field->get().field_id.has_value()) {
892-
::avro::CustomAttributes attributes;
893-
attributes.addAttribute(std::string(kElementIdProp),
894-
std::to_string(element_field->get().field_id.value()),
895-
false);
896-
new_array_node->addCustomAttributesForField(attributes);
872+
// For regular arrays, try to find element field ID from nested mapping
873+
const MappedField* element_field = nullptr;
874+
if (field.nested_mapping) {
875+
auto fields_span = field.nested_mapping->fields();
876+
for (const auto& f : fields_span) {
877+
if (f.names.find(std::string(kElement)) != f.names.end()) {
878+
element_field = &f;
879+
break;
880+
}
897881
}
898882
}
899883

900-
ICEBERG_ASSIGN_OR_RAISE(
901-
auto new_element_node,
902-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), name_mapping));
903-
new_array_node->addLeaf(new_element_node);
884+
if (element_field) {
885+
// Check if field_id is present
886+
if (!element_field->field_id.has_value()) {
887+
return InvalidSchema("Field ID is missing for element field in array");
888+
}
889+
890+
ICEBERG_ASSIGN_OR_RAISE(
891+
auto new_element_node,
892+
CreateAvroNodeWithFieldIds(original_node->leafAt(0), *element_field));
893+
new_array_node->addLeaf(new_element_node);
894+
} else {
895+
// If no element field found, this is an error
896+
return InvalidSchema("Element field not found in nested mapping for array");
897+
}
898+
904899
return new_array_node;
905900
}
906901

907902
Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
908-
const NameMapping& name_mapping) {
903+
const MappedField& field) {
909904
if (original_node->leaves() != 2) {
910905
return InvalidSchema("Map type must have exactly two leaves");
911906
}
912907

913908
auto new_map_node = std::make_shared<::avro::NodeMap>();
914-
new_map_node->setName(original_node->name());
915-
new_map_node->setLogicalType(original_node->logicalType());
916909

917-
// Try to find key and value field IDs
918-
if (auto key_field = name_mapping.Find(std::string(kKey))) {
919-
if (key_field->get().field_id.has_value()) {
920-
::avro::CustomAttributes attributes;
921-
attributes.addAttribute(std::string(kKeyIdProp),
922-
std::to_string(key_field->get().field_id.value()), false);
923-
new_map_node->addCustomAttributesForField(attributes);
910+
// Try to find key and value fields from nested mapping
911+
const MappedField* key_field = nullptr;
912+
const MappedField* value_field = nullptr;
913+
if (field.nested_mapping) {
914+
auto fields_span = field.nested_mapping->fields();
915+
for (const auto& f : fields_span) {
916+
if (f.names.find(std::string(kKey)) != f.names.end()) {
917+
key_field = &f;
918+
} else if (f.names.find(std::string(kValue)) != f.names.end()) {
919+
value_field = &f;
920+
}
924921
}
925922
}
926923

927-
if (auto value_field = name_mapping.Find(std::string(kValue))) {
928-
if (value_field->get().field_id.has_value()) {
929-
::avro::CustomAttributes attributes;
930-
attributes.addAttribute(std::string(kValueIdProp),
931-
std::to_string(value_field->get().field_id.value()), false);
932-
new_map_node->addCustomAttributesForField(attributes);
933-
}
924+
// Check if both key and value fields are found
925+
if (!key_field) {
926+
return InvalidSchema("Key field not found in nested mapping for map");
927+
}
928+
if (!value_field) {
929+
return InvalidSchema("Value field not found in nested mapping for map");
930+
}
931+
932+
// Check if field_ids are present
933+
if (!key_field->field_id.has_value()) {
934+
return InvalidSchema("Field ID is missing for key field in map");
935+
}
936+
if (!value_field->field_id.has_value()) {
937+
return InvalidSchema("Field ID is missing for value field in map");
934938
}
935939

936940
// Add key and value nodes
937-
ICEBERG_ASSIGN_OR_RAISE(auto new_key_node, CreateAvroNodeWithFieldIds(
938-
original_node->leafAt(0), name_mapping));
939-
ICEBERG_ASSIGN_OR_RAISE(
940-
auto new_value_node,
941-
CreateAvroNodeWithFieldIds(original_node->leafAt(1), name_mapping));
941+
ICEBERG_ASSIGN_OR_RAISE(auto new_key_node,
942+
CreateAvroNodeWithFieldIds(original_node->leafAt(0), *key_field));
943+
ICEBERG_ASSIGN_OR_RAISE(auto new_value_node,
944+
CreateAvroNodeWithFieldIds(original_node->leafAt(1), *value_field));
942945
new_map_node->addLeaf(new_key_node);
943946
new_map_node->addLeaf(new_value_node);
944947

945948
return new_map_node;
946949
}
947950

948951
Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
949-
const NameMapping& name_mapping) {
952+
const MappedField& field) {
950953
if (original_node->leaves() != 2) {
951954
return InvalidSchema("Union type must have exactly two branches");
952955
}
@@ -960,15 +963,15 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
960963
if (branch_0_is_null && !branch_1_is_null) {
961964
// branch_0 is null, branch_1 is not null
962965
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1,
963-
CreateAvroNodeWithFieldIds(branch_1, name_mapping));
966+
CreateAvroNodeWithFieldIds(branch_1, field));
964967
auto new_union_node = std::make_shared<::avro::NodeUnion>();
965968
new_union_node->addLeaf(branch_0); // null branch
966969
new_union_node->addLeaf(new_branch_1);
967970
return new_union_node;
968971
} else if (!branch_0_is_null && branch_1_is_null) {
969972
// branch_0 is not null, branch_1 is null
970973
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0,
971-
CreateAvroNodeWithFieldIds(branch_0, name_mapping));
974+
CreateAvroNodeWithFieldIds(branch_0, field));
972975
auto new_union_node = std::make_shared<::avro::NodeUnion>();
973976
new_union_node->addLeaf(new_branch_0);
974977
new_union_node->addLeaf(branch_1); // null branch
@@ -982,4 +985,36 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
982985
}
983986
}
984987

985-
} // namespace iceberg::avro
988+
} // namespace
989+
990+
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
991+
const MappedField& mapped_field) {
992+
993+
switch (original_node->type()) {
994+
case ::avro::AVRO_RECORD:
995+
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
996+
case ::avro::AVRO_ARRAY:
997+
return CreateArrayNodeWithFieldIds(original_node, mapped_field);
998+
case ::avro::AVRO_MAP:
999+
return CreateMapNodeWithFieldIds(original_node, mapped_field);
1000+
case ::avro::AVRO_UNION:
1001+
return CreateUnionNodeWithFieldIds(original_node, mapped_field);
1002+
case ::avro::AVRO_BOOL:
1003+
case ::avro::AVRO_INT:
1004+
case ::avro::AVRO_LONG:
1005+
case ::avro::AVRO_FLOAT:
1006+
case ::avro::AVRO_DOUBLE:
1007+
case ::avro::AVRO_STRING:
1008+
case ::avro::AVRO_BYTES:
1009+
case ::avro::AVRO_FIXED:
1010+
// For primitive types, just return a copy
1011+
return original_node;
1012+
case ::avro::AVRO_NULL:
1013+
case ::avro::AVRO_ENUM:
1014+
default:
1015+
return InvalidSchema("Unsupported Avro type for field ID application: {}",
1016+
ToString(original_node));
1017+
}
1018+
}
1019+
1020+
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -147,37 +147,11 @@ bool HasMapLogicalType(const ::avro::NodePtr& node);
147147

148148
/// \brief Create a new Avro node with field IDs from name mapping.
149149
/// \param original_node The original Avro node to copy.
150-
/// \param name_mapping The name mapping to apply field IDs from.
150+
/// \param mapped_field The mapped field to apply field IDs from.
151151
/// \return A new Avro node with field IDs applied, or an error.
152152
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
153-
const NameMapping& name_mapping);
154-
155-
/// \brief Create a new Avro record node with field IDs from name mapping.
156-
/// \param original_node The original Avro record node to copy.
157-
/// \param name_mapping The name mapping to apply field IDs from.
158-
/// \return A new Avro record node with field IDs applied, or an error.
159-
Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
160-
const NameMapping& name_mapping);
161-
162-
/// \brief Create a new Avro array node with field IDs from name mapping.
163-
/// \param original_node The original Avro array node to copy.
164-
/// \param name_mapping The name mapping to apply field IDs from.
165-
/// \return A new Avro array node with field IDs applied, or an error.
166-
Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
167-
const NameMapping& name_mapping);
168-
169-
/// \brief Create a new Avro map node with field IDs from name mapping.
170-
/// \param original_node The original Avro map node to copy.
171-
/// \param name_mapping The name mapping to apply field IDs from.
172-
/// \return A new Avro map node with field IDs applied, or an error.
173-
Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
174-
const NameMapping& name_mapping);
175-
176-
/// \brief Create a new Avro union node with field IDs from name mapping.
177-
/// \param original_node The original Avro union node to copy.
178-
/// \param name_mapping The name mapping to apply field IDs from.
179-
/// \return A new Avro union node with field IDs applied, or an error.
180-
Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
181-
const NameMapping& name_mapping);
153+
const MappedField& mapped_field);
154+
155+
182156

183157
} // namespace iceberg::avro

0 commit comments

Comments
 (0)