Skip to content

Commit 0643070

Browse files
fix applying field-ids on name mapping code
1 parent 9e96946 commit 0643070

File tree

6 files changed

+592
-270
lines changed

6 files changed

+592
-270
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,13 @@ class AvroReader::Impl {
106106
if (has_id_visitor.HasNoIds()) {
107107
// Apply field IDs based on name mapping if available
108108
if (options.name_mapping) {
109+
MappedField mapped_field;
110+
// Convert NameMapping to MappedFields for nested mapping
111+
mapped_field.nested_mapping =
112+
std::make_shared<MappedFields>(options.name_mapping->AsMappedFields());
109113
ICEBERG_ASSIGN_OR_RAISE(
110114
auto new_root_node,
111-
CreateAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
115+
CreateAvroNodeWithFieldIds(file_schema.root(), mapped_field));
112116

113117
// Create a new schema with the updated root node
114118
auto new_schema = ::avro::ValidSchema(new_root_node);

src/iceberg/avro/avro_schema_util.cc

Lines changed: 139 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -785,168 +785,173 @@ Result<SchemaProjection> Project(const Schema& expected_schema,
785785
return SchemaProjection{std::move(field_projection.children)};
786786
}
787787

788-
// Helper function to create a new Avro node with field IDs from name mapping
789-
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
790-
const NameMapping& name_mapping) {
791-
switch (original_node->type()) {
792-
case ::avro::AVRO_RECORD:
793-
return CreateRecordNodeWithFieldIds(original_node, name_mapping);
794-
case ::avro::AVRO_ARRAY:
795-
return CreateArrayNodeWithFieldIds(original_node, name_mapping);
796-
case ::avro::AVRO_MAP:
797-
return CreateMapNodeWithFieldIds(original_node, name_mapping);
798-
case ::avro::AVRO_UNION:
799-
return CreateUnionNodeWithFieldIds(original_node, name_mapping);
800-
case ::avro::AVRO_BOOL:
801-
case ::avro::AVRO_INT:
802-
case ::avro::AVRO_LONG:
803-
case ::avro::AVRO_FLOAT:
804-
case ::avro::AVRO_DOUBLE:
805-
case ::avro::AVRO_STRING:
806-
case ::avro::AVRO_BYTES:
807-
case ::avro::AVRO_FIXED:
808-
// For primitive types, just return a copy
809-
return original_node;
810-
case ::avro::AVRO_NULL:
811-
case ::avro::AVRO_ENUM:
812-
default:
813-
return InvalidSchema("Unsupported Avro type for field ID application: {}",
814-
ToString(original_node));
815-
}
816-
}
788+
namespace {
817789

818790
Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
819-
const NameMapping& name_mapping) {
791+
const MappedField& field) {
820792
auto new_record_node = std::make_shared<::avro::NodeRecord>();
821793
new_record_node->setName(original_node->name());
822794

795+
if (original_node->leaves() > original_node->names()) {
796+
return InvalidSchema("Node has {} leaves but only {} names", original_node->leaves(),
797+
original_node->names());
798+
}
799+
823800
for (size_t i = 0; i < original_node->leaves(); ++i) {
824801
const std::string& field_name = original_node->nameAt(i);
825802
::avro::NodePtr field_node = original_node->leafAt(i);
826803

827-
// Try to find field ID by name in the name mapping
828-
if (auto field_ref = name_mapping.Find(field_name)) {
829-
if (field_ref->get().field_id.has_value()) {
830-
// Add field ID attribute to the new node
831-
::avro::CustomAttributes attributes;
832-
attributes.addAttribute(std::string(kFieldIdProp),
833-
std::to_string(field_ref->get().field_id.value()), false);
834-
new_record_node->addCustomAttributesForField(attributes);
804+
// TODO(liuxiaoyu): Add support for case sensitivity in name matching.
805+
// Try to find nested field by name
806+
const MappedField* nested_field = nullptr;
807+
if (field.nested_mapping) {
808+
auto fields_span = field.nested_mapping->fields();
809+
for (const auto& f : fields_span) {
810+
if (f.names.find(field_name) != f.names.end()) {
811+
nested_field = &f;
812+
break;
813+
}
835814
}
815+
}
836816

837-
// Recursively apply field IDs to nested fields if they exist
838-
if (field_ref->get().nested_mapping && field_node->type() == ::avro::AVRO_RECORD) {
839-
const auto& nested_mapping = field_ref->get().nested_mapping;
840-
auto fields_span = nested_mapping->fields();
841-
std::vector<MappedField> fields_vector(fields_span.begin(), fields_span.end());
842-
auto nested_name_mapping = NameMapping::Make(std::move(fields_vector));
817+
if (nested_field) {
818+
// Check if field_id is present
819+
if (!nested_field->field_id.has_value()) {
820+
return InvalidSchema("Field ID is missing for field '{}' in nested mapping",
821+
field_name);
822+
}
843823

844-
ICEBERG_ASSIGN_OR_RAISE(
845-
auto new_nested_node,
846-
CreateAvroNodeWithFieldIds(field_node, *nested_name_mapping));
847-
new_record_node->addName(field_name);
848-
new_record_node->addLeaf(new_nested_node);
849-
} else {
850-
// Recursively apply field IDs to child nodes
851-
ICEBERG_ASSIGN_OR_RAISE(auto new_field_node,
852-
CreateAvroNodeWithFieldIds(field_node, name_mapping));
853-
new_record_node->addName(field_name);
854-
new_record_node->addLeaf(new_field_node);
824+
// Preserve existing custom attributes for this field
825+
::avro::CustomAttributes attributes;
826+
if (i < original_node->customAttributes()) {
827+
// Copy all existing attributes from the original node
828+
const auto& original_attrs = original_node->customAttributesAt(i);
829+
const auto& existing_attrs = original_attrs.attributes();
830+
for (const auto& attr_pair : existing_attrs) {
831+
attributes.addAttribute(attr_pair.first, attr_pair.second, false);
832+
}
855833
}
856-
} else {
857-
// Recursively apply field IDs to child nodes even if no mapping found
858-
ICEBERG_ASSIGN_OR_RAISE(auto new_field_node,
859-
CreateAvroNodeWithFieldIds(field_node, name_mapping));
834+
835+
// Add field ID attribute to the new node (preserving existing attributes)
836+
attributes.addAttribute(std::string(kFieldIdProp),
837+
std::to_string(nested_field->field_id.value()), false);
838+
839+
if (!attributes.attributes().empty()) {
840+
new_record_node->addCustomAttributesForField(attributes);
841+
}
842+
843+
// Recursively apply field IDs to nested fields
844+
ICEBERG_ASSIGN_OR_RAISE(auto new_nested_node,
845+
CreateAvroNodeWithFieldIds(field_node, *nested_field));
860846
new_record_node->addName(field_name);
861-
new_record_node->addLeaf(new_field_node);
847+
new_record_node->addLeaf(new_nested_node);
848+
} else {
849+
// If no nested field found, this is an error
850+
return InvalidSchema("Field '{}' not found in nested mapping", field_name);
862851
}
863852
}
864853

865854
return new_record_node;
866855
}
867856

868857
Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
869-
const NameMapping& name_mapping) {
858+
const MappedField& field) {
870859
if (original_node->leaves() != 1) {
871860
return InvalidSchema("Array type must have exactly one leaf");
872861
}
873862

874863
auto new_array_node = std::make_shared<::avro::NodeArray>();
875-
new_array_node->setName(original_node->name());
876-
new_array_node->setLogicalType(original_node->logicalType());
877864

878865
// Check if this is a map represented as array
879-
if (original_node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
880-
original_node->logicalType().customLogicalType() != nullptr &&
881-
original_node->logicalType().customLogicalType()->name() == kMapLogicalType) {
882-
ICEBERG_ASSIGN_OR_RAISE(
883-
auto new_element_node,
884-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), name_mapping));
866+
if (HasMapLogicalType(original_node)) {
867+
ICEBERG_ASSIGN_OR_RAISE(auto new_element_node,
868+
CreateAvroNodeWithFieldIds(original_node->leafAt(0), field));
885869
new_array_node->addLeaf(new_element_node);
886870
return new_array_node;
887871
}
888872

889-
// For regular arrays, try to find element field ID
890-
if (auto element_field = name_mapping.Find(std::string(kElement))) {
891-
if (element_field->get().field_id.has_value()) {
892-
::avro::CustomAttributes attributes;
893-
attributes.addAttribute(std::string(kElementIdProp),
894-
std::to_string(element_field->get().field_id.value()),
895-
false);
896-
new_array_node->addCustomAttributesForField(attributes);
873+
// For regular arrays, try to find element field ID from nested mapping
874+
const MappedField* element_field = nullptr;
875+
if (field.nested_mapping) {
876+
auto fields_span = field.nested_mapping->fields();
877+
for (const auto& f : fields_span) {
878+
if (f.names.find(std::string(kElement)) != f.names.end()) {
879+
element_field = &f;
880+
break;
881+
}
897882
}
898883
}
899884

900-
ICEBERG_ASSIGN_OR_RAISE(
901-
auto new_element_node,
902-
CreateAvroNodeWithFieldIds(original_node->leafAt(0), name_mapping));
903-
new_array_node->addLeaf(new_element_node);
885+
if (element_field) {
886+
// Check if field_id is present
887+
if (!element_field->field_id.has_value()) {
888+
return InvalidSchema("Field ID is missing for element field in array");
889+
}
890+
891+
ICEBERG_ASSIGN_OR_RAISE(
892+
auto new_element_node,
893+
CreateAvroNodeWithFieldIds(original_node->leafAt(0), *element_field));
894+
new_array_node->addLeaf(new_element_node);
895+
} else {
896+
// If no element field found, this is an error
897+
return InvalidSchema("Element field not found in nested mapping for array");
898+
}
899+
904900
return new_array_node;
905901
}
906902

907903
Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
908-
const NameMapping& name_mapping) {
904+
const MappedField& field) {
909905
if (original_node->leaves() != 2) {
910906
return InvalidSchema("Map type must have exactly two leaves");
911907
}
912908

913909
auto new_map_node = std::make_shared<::avro::NodeMap>();
914-
new_map_node->setName(original_node->name());
915-
new_map_node->setLogicalType(original_node->logicalType());
916910

917-
// Try to find key and value field IDs
918-
if (auto key_field = name_mapping.Find(std::string(kKey))) {
919-
if (key_field->get().field_id.has_value()) {
920-
::avro::CustomAttributes attributes;
921-
attributes.addAttribute(std::string(kKeyIdProp),
922-
std::to_string(key_field->get().field_id.value()), false);
923-
new_map_node->addCustomAttributesForField(attributes);
911+
// Try to find key and value fields from nested mapping
912+
const MappedField* key_field = nullptr;
913+
const MappedField* value_field = nullptr;
914+
if (field.nested_mapping) {
915+
auto fields_span = field.nested_mapping->fields();
916+
for (const auto& f : fields_span) {
917+
if (f.names.find(std::string(kKey)) != f.names.end()) {
918+
key_field = &f;
919+
} else if (f.names.find(std::string(kValue)) != f.names.end()) {
920+
value_field = &f;
921+
}
924922
}
925923
}
926924

927-
if (auto value_field = name_mapping.Find(std::string(kValue))) {
928-
if (value_field->get().field_id.has_value()) {
929-
::avro::CustomAttributes attributes;
930-
attributes.addAttribute(std::string(kValueIdProp),
931-
std::to_string(value_field->get().field_id.value()), false);
932-
new_map_node->addCustomAttributesForField(attributes);
933-
}
925+
// Check if both key and value fields are found
926+
if (!key_field) {
927+
return InvalidSchema("Key field not found in nested mapping for map");
928+
}
929+
if (!value_field) {
930+
return InvalidSchema("Value field not found in nested mapping for map");
931+
}
932+
933+
// Check if field_ids are present
934+
if (!key_field->field_id.has_value()) {
935+
return InvalidSchema("Field ID is missing for key field in map");
936+
}
937+
if (!value_field->field_id.has_value()) {
938+
return InvalidSchema("Field ID is missing for value field in map");
934939
}
935940

936941
// Add key and value nodes
937942
ICEBERG_ASSIGN_OR_RAISE(auto new_key_node, CreateAvroNodeWithFieldIds(
938-
original_node->leafAt(0), name_mapping));
943+
original_node->leafAt(0), *key_field));
939944
ICEBERG_ASSIGN_OR_RAISE(
940945
auto new_value_node,
941-
CreateAvroNodeWithFieldIds(original_node->leafAt(1), name_mapping));
946+
CreateAvroNodeWithFieldIds(original_node->leafAt(1), *value_field));
942947
new_map_node->addLeaf(new_key_node);
943948
new_map_node->addLeaf(new_value_node);
944949

945950
return new_map_node;
946951
}
947952

948953
Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
949-
const NameMapping& name_mapping) {
954+
const MappedField& field) {
950955
if (original_node->leaves() != 2) {
951956
return InvalidSchema("Union type must have exactly two branches");
952957
}
@@ -960,15 +965,15 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
960965
if (branch_0_is_null && !branch_1_is_null) {
961966
// branch_0 is null, branch_1 is not null
962967
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_1,
963-
CreateAvroNodeWithFieldIds(branch_1, name_mapping));
968+
CreateAvroNodeWithFieldIds(branch_1, field));
964969
auto new_union_node = std::make_shared<::avro::NodeUnion>();
965970
new_union_node->addLeaf(branch_0); // null branch
966971
new_union_node->addLeaf(new_branch_1);
967972
return new_union_node;
968973
} else if (!branch_0_is_null && branch_1_is_null) {
969974
// branch_0 is not null, branch_1 is null
970975
ICEBERG_ASSIGN_OR_RAISE(auto new_branch_0,
971-
CreateAvroNodeWithFieldIds(branch_0, name_mapping));
976+
CreateAvroNodeWithFieldIds(branch_0, field));
972977
auto new_union_node = std::make_shared<::avro::NodeUnion>();
973978
new_union_node->addLeaf(new_branch_0);
974979
new_union_node->addLeaf(branch_1); // null branch
@@ -982,4 +987,35 @@ Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& origi
982987
}
983988
}
984989

990+
} // namespace
991+
992+
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
993+
const MappedField& mapped_field) {
994+
switch (original_node->type()) {
995+
case ::avro::AVRO_RECORD:
996+
return CreateRecordNodeWithFieldIds(original_node, mapped_field);
997+
case ::avro::AVRO_ARRAY:
998+
return CreateArrayNodeWithFieldIds(original_node, mapped_field);
999+
case ::avro::AVRO_MAP:
1000+
return CreateMapNodeWithFieldIds(original_node, mapped_field);
1001+
case ::avro::AVRO_UNION:
1002+
return CreateUnionNodeWithFieldIds(original_node, mapped_field);
1003+
case ::avro::AVRO_BOOL:
1004+
case ::avro::AVRO_INT:
1005+
case ::avro::AVRO_LONG:
1006+
case ::avro::AVRO_FLOAT:
1007+
case ::avro::AVRO_DOUBLE:
1008+
case ::avro::AVRO_STRING:
1009+
case ::avro::AVRO_BYTES:
1010+
case ::avro::AVRO_FIXED:
1011+
// For primitive types, just return a copy
1012+
return original_node;
1013+
case ::avro::AVRO_NULL:
1014+
case ::avro::AVRO_ENUM:
1015+
default:
1016+
return InvalidSchema("Unsupported Avro type for field ID application: {}",
1017+
ToString(original_node));
1018+
}
1019+
}
1020+
9851021
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -147,37 +147,9 @@ bool HasMapLogicalType(const ::avro::NodePtr& node);
147147

148148
/// \brief Create a new Avro node with field IDs from name mapping.
149149
/// \param original_node The original Avro node to copy.
150-
/// \param name_mapping The name mapping to apply field IDs from.
150+
/// \param mapped_field The mapped field to apply field IDs from.
151151
/// \return A new Avro node with field IDs applied, or an error.
152152
Result<::avro::NodePtr> CreateAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
153-
const NameMapping& name_mapping);
154-
155-
/// \brief Create a new Avro record node with field IDs from name mapping.
156-
/// \param original_node The original Avro record node to copy.
157-
/// \param name_mapping The name mapping to apply field IDs from.
158-
/// \return A new Avro record node with field IDs applied, or an error.
159-
Result<::avro::NodePtr> CreateRecordNodeWithFieldIds(const ::avro::NodePtr& original_node,
160-
const NameMapping& name_mapping);
161-
162-
/// \brief Create a new Avro array node with field IDs from name mapping.
163-
/// \param original_node The original Avro array node to copy.
164-
/// \param name_mapping The name mapping to apply field IDs from.
165-
/// \return A new Avro array node with field IDs applied, or an error.
166-
Result<::avro::NodePtr> CreateArrayNodeWithFieldIds(const ::avro::NodePtr& original_node,
167-
const NameMapping& name_mapping);
168-
169-
/// \brief Create a new Avro map node with field IDs from name mapping.
170-
/// \param original_node The original Avro map node to copy.
171-
/// \param name_mapping The name mapping to apply field IDs from.
172-
/// \return A new Avro map node with field IDs applied, or an error.
173-
Result<::avro::NodePtr> CreateMapNodeWithFieldIds(const ::avro::NodePtr& original_node,
174-
const NameMapping& name_mapping);
175-
176-
/// \brief Create a new Avro union node with field IDs from name mapping.
177-
/// \param original_node The original Avro union node to copy.
178-
/// \param name_mapping The name mapping to apply field IDs from.
179-
/// \return A new Avro union node with field IDs applied, or an error.
180-
Result<::avro::NodePtr> CreateUnionNodeWithFieldIds(const ::avro::NodePtr& original_node,
181-
const NameMapping& name_mapping);
153+
const MappedField& mapped_field);
182154

183155
} // namespace iceberg::avro

0 commit comments

Comments
 (0)