|
30 | 30 | #include <avro/DataFile.hh> |
31 | 31 | #include <avro/Generic.hh> |
32 | 32 | #include <avro/GenericDatum.hh> |
| 33 | +#include <nlohmann/json.hpp> |
33 | 34 |
|
34 | 35 | #include "iceberg/arrow/arrow_fs_file_io.h" |
35 | 36 | #include "iceberg/avro/avro_data_util_internal.h" |
36 | 37 | #include "iceberg/avro/avro_schema_util_internal.h" |
37 | 38 | #include "iceberg/avro/avro_stream_internal.h" |
| 39 | +#include "iceberg/json_internal.h" |
| 40 | +#include "iceberg/name_mapping.h" |
38 | 41 | #include "iceberg/schema_internal.h" |
39 | 42 | #include "iceberg/util/checked_cast.h" |
40 | 43 | #include "iceberg/util/macros.h" |
@@ -96,11 +99,22 @@ class AvroBatchReader::Impl { |
96 | 99 | // Validate field ids in the file schema. |
97 | 100 | HasIdVisitor has_id_visitor; |
98 | 101 | ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema)); |
| 102 | + |
99 | 103 | if (has_id_visitor.HasNoIds()) { |
100 | | - // TODO(gangwu): support applying field-ids based on name mapping |
101 | | - return NotImplemented("Avro file schema has no field IDs"); |
102 | | - } |
103 | | - if (!has_id_visitor.AllHaveIds()) { |
| 104 | + // Apply field IDs based on name mapping if available |
| 105 | + auto name_mapping_iter = options.properties.find("name_mapping"); |
| 106 | + if (name_mapping_iter != options.properties.end()) { |
| 107 | + // Parse name mapping from JSON string |
| 108 | + ICEBERG_ASSIGN_OR_RAISE(auto name_mapping, |
| 109 | + iceberg::NameMappingFromJson( |
| 110 | + nlohmann::json::parse(name_mapping_iter->second))); |
| 111 | + ICEBERG_RETURN_UNEXPECTED( |
| 112 | + ApplyFieldIdsFromNameMapping(file_schema.root(), *name_mapping)); |
| 113 | + } else { |
| 114 | + return NotImplemented( |
| 115 | + "Avro file schema has no field IDs and no name mapping provided"); |
| 116 | + } |
| 117 | + } else if (!has_id_visitor.AllHaveIds()) { |
104 | 118 | return InvalidSchema("Not all fields in the Avro file schema have field IDs"); |
105 | 119 | } |
106 | 120 |
|
@@ -195,6 +209,147 @@ class AvroBatchReader::Impl { |
195 | 209 | return arrow_array; |
196 | 210 | } |
197 | 211 |
|
| 212 | + // Apply field IDs to Avro schema nodes based on name mapping |
| 213 | + Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node, |
| 214 | + const NameMapping& name_mapping) { |
| 215 | + switch (node->type()) { |
| 216 | + case ::avro::AVRO_RECORD: |
| 217 | + return ApplyFieldIdsToRecord(node, name_mapping); |
| 218 | + case ::avro::AVRO_ARRAY: |
| 219 | + return ApplyFieldIdsToArray(node, name_mapping); |
| 220 | + case ::avro::AVRO_MAP: |
| 221 | + return ApplyFieldIdsToMap(node, name_mapping); |
| 222 | + case ::avro::AVRO_UNION: |
| 223 | + return ApplyFieldIdsToUnion(node, name_mapping); |
| 224 | + case ::avro::AVRO_BOOL: |
| 225 | + case ::avro::AVRO_INT: |
| 226 | + case ::avro::AVRO_LONG: |
| 227 | + case ::avro::AVRO_FLOAT: |
| 228 | + case ::avro::AVRO_DOUBLE: |
| 229 | + case ::avro::AVRO_STRING: |
| 230 | + case ::avro::AVRO_BYTES: |
| 231 | + case ::avro::AVRO_FIXED: |
| 232 | + return {}; |
| 233 | + case ::avro::AVRO_NULL: |
| 234 | + case ::avro::AVRO_ENUM: |
| 235 | + default: |
| 236 | + return InvalidSchema("Unsupported Avro type for field ID application: {}", |
| 237 | + static_cast<int>(node->type())); |
| 238 | + } |
| 239 | + } |
| 240 | + |
| 241 | + Status ApplyFieldIdsToRecord(const ::avro::NodePtr& node, |
| 242 | + const NameMapping& name_mapping) { |
| 243 | + for (size_t i = 0; i < node->leaves(); ++i) { |
| 244 | + const std::string& field_name = node->nameAt(i); |
| 245 | + ::avro::NodePtr field_node = node->leafAt(i); |
| 246 | + |
| 247 | + // Try to find field ID by name in the name mapping |
| 248 | + if (auto field_ref = name_mapping.Find(field_name)) { |
| 249 | + if (field_ref->get().field_id.has_value()) { |
| 250 | + // Add field ID attribute to the node |
| 251 | + ::avro::CustomAttributes attributes; |
| 252 | + attributes.addAttribute( |
| 253 | + "field-id", std::to_string(field_ref->get().field_id.value()), false); |
| 254 | + node->addCustomAttributesForField(attributes); |
| 255 | + } |
| 256 | + |
| 257 | + // Recursively apply field IDs to nested fields if they exist |
| 258 | + if (field_ref->get().nested_mapping && |
| 259 | + field_node->type() == ::avro::AVRO_RECORD) { |
| 260 | + const auto& nested_mapping = field_ref->get().nested_mapping; |
| 261 | + auto fields_span = nested_mapping->fields(); |
| 262 | + std::vector<MappedField> fields_vector(fields_span.begin(), fields_span.end()); |
| 263 | + auto nested_name_mapping = NameMapping::Make(std::move(fields_vector)); |
| 264 | + ICEBERG_RETURN_UNEXPECTED( |
| 265 | + ApplyFieldIdsFromNameMapping(field_node, *nested_name_mapping)); |
| 266 | + } else { |
| 267 | + // Recursively apply field IDs to child nodes (only if not already handled by |
| 268 | + // nested mapping) |
| 269 | + ICEBERG_RETURN_UNEXPECTED( |
| 270 | + ApplyFieldIdsFromNameMapping(field_node, name_mapping)); |
| 271 | + } |
| 272 | + } else { |
| 273 | + // Recursively apply field IDs to child nodes even if no mapping found |
| 274 | + ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(field_node, name_mapping)); |
| 275 | + } |
| 276 | + } |
| 277 | + return {}; |
| 278 | + } |
| 279 | + |
| 280 | + Status ApplyFieldIdsToArray(const ::avro::NodePtr& node, |
| 281 | + const NameMapping& name_mapping) { |
| 282 | + if (node->leaves() != 1) { |
| 283 | + return InvalidSchema("Array type must have exactly one leaf"); |
| 284 | + } |
| 285 | + |
| 286 | + // Check if this is a map represented as array |
| 287 | + if (node->logicalType().type() == ::avro::LogicalType::CUSTOM && |
| 288 | + node->logicalType().customLogicalType() != nullptr && |
| 289 | + node->logicalType().customLogicalType()->name() == "map") { |
| 290 | + return ApplyFieldIdsFromNameMapping(node->leafAt(0), name_mapping); |
| 291 | + } |
| 292 | + |
| 293 | + // For regular arrays, try to find element field ID |
| 294 | + if (auto element_field = name_mapping.Find("element")) { |
| 295 | + if (element_field->get().field_id.has_value()) { |
| 296 | + ::avro::CustomAttributes attributes; |
| 297 | + attributes.addAttribute( |
| 298 | + "element-id", std::to_string(element_field->get().field_id.value()), false); |
| 299 | + node->addCustomAttributesForField(attributes); |
| 300 | + } |
| 301 | + } |
| 302 | + |
| 303 | + return ApplyFieldIdsFromNameMapping(node->leafAt(0), name_mapping); |
| 304 | + } |
| 305 | + |
| 306 | + Status ApplyFieldIdsToMap(const ::avro::NodePtr& node, |
| 307 | + const NameMapping& name_mapping) { |
| 308 | + if (node->leaves() != 2) { |
| 309 | + return InvalidSchema("Map type must have exactly two leaves"); |
| 310 | + } |
| 311 | + |
| 312 | + // Try to find key and value field IDs |
| 313 | + if (auto key_field = name_mapping.Find("key")) { |
| 314 | + if (key_field->get().field_id.has_value()) { |
| 315 | + ::avro::CustomAttributes attributes; |
| 316 | + attributes.addAttribute("key-id", |
| 317 | + std::to_string(key_field->get().field_id.value()), false); |
| 318 | + node->addCustomAttributesForField(attributes); |
| 319 | + } |
| 320 | + } |
| 321 | + |
| 322 | + if (auto value_field = name_mapping.Find("value")) { |
| 323 | + if (value_field->get().field_id.has_value()) { |
| 324 | + ::avro::CustomAttributes attributes; |
| 325 | + attributes.addAttribute( |
| 326 | + "value-id", std::to_string(value_field->get().field_id.value()), false); |
| 327 | + node->addCustomAttributesForField(attributes); |
| 328 | + } |
| 329 | + } |
| 330 | + |
| 331 | + return ApplyFieldIdsFromNameMapping(node->leafAt(1), name_mapping); |
| 332 | + } |
| 333 | + |
| 334 | + Status ApplyFieldIdsToUnion(const ::avro::NodePtr& node, |
| 335 | + const NameMapping& name_mapping) { |
| 336 | + if (node->leaves() != 2) { |
| 337 | + return InvalidSchema("Union type must have exactly two branches"); |
| 338 | + } |
| 339 | + |
| 340 | + const auto& branch_0 = node->leafAt(0); |
| 341 | + const auto& branch_1 = node->leafAt(1); |
| 342 | + |
| 343 | + if (branch_0->type() == ::avro::AVRO_NULL) { |
| 344 | + return ApplyFieldIdsFromNameMapping(branch_1, name_mapping); |
| 345 | + } |
| 346 | + if (branch_1->type() == ::avro::AVRO_NULL) { |
| 347 | + return ApplyFieldIdsFromNameMapping(branch_0, name_mapping); |
| 348 | + } |
| 349 | + |
| 350 | + return InvalidSchema("Union type must have exactly one null branch"); |
| 351 | + } |
| 352 | + |
198 | 353 | private: |
199 | 354 | // Max number of rows in the record batch to read. |
200 | 355 | int64_t batch_size_{}; |
|
0 commit comments