Skip to content

Commit 9e96946

Browse files
Refactor the code that applies field IDs from the name mapping.
1 parent 1f8eef7 commit 9e96946

File tree

6 files changed

+392
-357
lines changed

6 files changed

+392
-357
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 20 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
#include <avro/GenericDatum.hh>
3333

3434
#include "iceberg/arrow/arrow_fs_file_io.h"
35-
#include "iceberg/avro/avro_constants.h"
3635
#include "iceberg/avro/avro_data_util_internal.h"
3736
#include "iceberg/avro/avro_schema_util_internal.h"
3837
#include "iceberg/avro/avro_stream_internal.h"
@@ -98,7 +97,7 @@ class AvroReader::Impl {
9897
// Create a base reader without setting reader schema to enable projection.
9998
auto base_reader =
10099
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
101-
const ::avro::ValidSchema& file_schema = base_reader->dataSchema();
100+
::avro::ValidSchema file_schema = base_reader->dataSchema();
102101

103102
// Validate field ids in the file schema.
104103
HasIdVisitor has_id_visitor;
@@ -107,8 +106,25 @@ class AvroReader::Impl {
107106
if (has_id_visitor.HasNoIds()) {
108107
// Apply field IDs based on name mapping if available
109108
if (options.name_mapping) {
110-
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(*options.name_mapping,
111-
file_schema.root().get()));
109+
ICEBERG_ASSIGN_OR_RAISE(
110+
auto new_root_node,
111+
CreateAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
112+
113+
// Create a new schema with the updated root node
114+
auto new_schema = ::avro::ValidSchema(new_root_node);
115+
116+
// Verify that all fields now have IDs after applying the name mapping
117+
HasIdVisitor verify_visitor;
118+
ICEBERG_RETURN_UNEXPECTED(verify_visitor.Visit(new_schema));
119+
if (!verify_visitor.AllHaveIds()) {
120+
// TODO(liuxiaoyu): Print detailed error message with missing field IDs
121+
// information in the future
122+
return InvalidSchema(
123+
"Not all fields have field IDs after applying name mapping.");
124+
}
125+
126+
// Update the file schema to use the new schema with field IDs
127+
file_schema = new_schema;
112128
} else {
113129
return InvalidSchema(
114130
"Avro file schema has no field IDs and no name mapping provided");
@@ -220,156 +236,6 @@ class AvroReader::Impl {
220236
return arrow_array;
221237
}
222238

223-
// Apply field IDs to Avro schema nodes based on name mapping
224-
Status ApplyFieldIdsFromNameMapping(const NameMapping& name_mapping,
225-
::avro::Node* node) {
226-
switch (node->type()) {
227-
case ::avro::AVRO_RECORD:
228-
return ApplyFieldIdsToRecord(node, name_mapping);
229-
case ::avro::AVRO_ARRAY:
230-
return ApplyFieldIdsToArray(node, name_mapping);
231-
case ::avro::AVRO_MAP:
232-
return ApplyFieldIdsToMap(node, name_mapping);
233-
case ::avro::AVRO_UNION:
234-
return ApplyFieldIdsToUnion(node, name_mapping);
235-
case ::avro::AVRO_BOOL:
236-
case ::avro::AVRO_INT:
237-
case ::avro::AVRO_LONG:
238-
case ::avro::AVRO_FLOAT:
239-
case ::avro::AVRO_DOUBLE:
240-
case ::avro::AVRO_STRING:
241-
case ::avro::AVRO_BYTES:
242-
case ::avro::AVRO_FIXED:
243-
return {};
244-
case ::avro::AVRO_NULL:
245-
case ::avro::AVRO_ENUM:
246-
default:
247-
return InvalidSchema("Unsupported Avro type for field ID application: {}",
248-
static_cast<int>(node->type()));
249-
}
250-
}
251-
252-
Status ApplyFieldIdsToRecord(::avro::Node* node, const NameMapping& name_mapping) {
253-
for (size_t i = 0; i < node->leaves(); ++i) {
254-
const std::string& field_name = node->nameAt(i);
255-
::avro::Node* field_node = node->leafAt(i).get();
256-
257-
// Try to find field ID by name in the name mapping
258-
if (auto field_ref = name_mapping.Find(field_name)) {
259-
if (field_ref->get().field_id.has_value()) {
260-
// Add field ID attribute to the node
261-
::avro::CustomAttributes attributes;
262-
attributes.addAttribute(std::string(kFieldId),
263-
std::to_string(field_ref->get().field_id.value()),
264-
false);
265-
node->addCustomAttributesForField(attributes);
266-
}
267-
268-
// Recursively apply field IDs to nested fields if they exist
269-
if (field_ref->get().nested_mapping &&
270-
field_node->type() == ::avro::AVRO_RECORD) {
271-
const auto& nested_mapping = field_ref->get().nested_mapping;
272-
auto fields_span = nested_mapping->fields();
273-
std::vector<MappedField> fields_vector(fields_span.begin(), fields_span.end());
274-
auto nested_name_mapping = NameMapping::Make(std::move(fields_vector));
275-
ICEBERG_RETURN_UNEXPECTED(
276-
ApplyFieldIdsFromNameMapping(*nested_name_mapping, field_node));
277-
} else {
278-
// Recursively apply field IDs to child nodes (only if not already handled by
279-
// nested mapping)
280-
ICEBERG_RETURN_UNEXPECTED(
281-
ApplyFieldIdsFromNameMapping(name_mapping, field_node));
282-
}
283-
} else {
284-
// Recursively apply field IDs to child nodes even if no mapping found
285-
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(name_mapping, field_node));
286-
}
287-
}
288-
return {};
289-
}
290-
291-
Status ApplyFieldIdsToArray(::avro::Node* node, const NameMapping& name_mapping) {
292-
// TODO(liuxiaoyu): Add debug logging to print node information for troubleshooting
293-
// when array type validation fails
294-
if (node->leaves() != 1) {
295-
return InvalidSchema("Array type must have exactly one leaf");
296-
}
297-
298-
// Check if this is a map represented as array
299-
if (node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
300-
node->logicalType().customLogicalType() != nullptr &&
301-
node->logicalType().customLogicalType()->name() == kMapLogicalType) {
302-
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(0).get());
303-
}
304-
305-
// For regular arrays, try to find element field ID
306-
if (auto element_field = name_mapping.Find(std::string(kElement))) {
307-
if (element_field->get().field_id.has_value()) {
308-
::avro::CustomAttributes attributes;
309-
attributes.addAttribute(std::string(kElementId),
310-
std::to_string(element_field->get().field_id.value()),
311-
false);
312-
node->addCustomAttributesForField(attributes);
313-
}
314-
}
315-
316-
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(0).get());
317-
}
318-
319-
Status ApplyFieldIdsToMap(::avro::Node* node, const NameMapping& name_mapping) {
320-
if (node->leaves() != 2) {
321-
return InvalidSchema("Map type must have exactly two leaves");
322-
}
323-
324-
// Try to find key and value field IDs
325-
if (auto key_field = name_mapping.Find(std::string(kKey))) {
326-
if (key_field->get().field_id.has_value()) {
327-
::avro::CustomAttributes attributes;
328-
attributes.addAttribute(std::string(kKeyId),
329-
std::to_string(key_field->get().field_id.value()), false);
330-
node->addCustomAttributesForField(attributes);
331-
}
332-
}
333-
334-
if (auto value_field = name_mapping.Find(std::string(kValue))) {
335-
if (value_field->get().field_id.has_value()) {
336-
::avro::CustomAttributes attributes;
337-
attributes.addAttribute(std::string(kValueId),
338-
std::to_string(value_field->get().field_id.value()),
339-
false);
340-
node->addCustomAttributesForField(attributes);
341-
}
342-
}
343-
344-
return ApplyFieldIdsFromNameMapping(name_mapping, node->leafAt(1).get());
345-
}
346-
347-
Status ApplyFieldIdsToUnion(::avro::Node* node, const NameMapping& name_mapping) {
348-
if (node->leaves() != 2) {
349-
return InvalidSchema("Union type must have exactly two branches");
350-
}
351-
352-
const auto& branch_0 = node->leafAt(0);
353-
const auto& branch_1 = node->leafAt(1);
354-
355-
bool branch_0_is_null = (branch_0->type() == ::avro::AVRO_NULL);
356-
bool branch_1_is_null = (branch_1->type() == ::avro::AVRO_NULL);
357-
358-
if (branch_0_is_null && !branch_1_is_null) {
359-
// branch_0 is null, branch_1 is not null
360-
return ApplyFieldIdsFromNameMapping(name_mapping, branch_1.get());
361-
} else if (!branch_0_is_null && branch_1_is_null) {
362-
// branch_0 is not null, branch_1 is null
363-
return ApplyFieldIdsFromNameMapping(name_mapping, branch_0.get());
364-
} else if (branch_0_is_null && branch_1_is_null) {
365-
// Both branches are null - this is invalid
366-
return InvalidSchema("Union type cannot have two null branches");
367-
} else {
368-
// Neither branch is null - this is invalid
369-
return InvalidSchema("Union type must have exactly one null branch");
370-
}
371-
}
372-
373239
private:
374240
// Max number of rows in the record batch to read.
375241
int64_t batch_size_{};

0 commit comments

Comments
 (0)