Skip to content

Commit 4ff7967

Browse files
support applying field-ids based on name mapping
1 parent 3cf5963 commit 4ff7967

6 files changed

Lines changed: 372 additions & 11 deletions

File tree

src/iceberg/avro/avro_reader.cc

Lines changed: 140 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030
#include <avro/DataFile.hh>
3131
#include <avro/Generic.hh>
3232
#include <avro/GenericDatum.hh>
33+
#include <nlohmann/json.hpp>
3334

3435
#include "iceberg/arrow/arrow_fs_file_io.h"
3536
#include "iceberg/avro/avro_data_util_internal.h"
3637
#include "iceberg/avro/avro_schema_util_internal.h"
3738
#include "iceberg/avro/avro_stream_internal.h"
39+
#include "iceberg/json_internal.h"
40+
#include "iceberg/name_mapping.h"
3841
#include "iceberg/schema_internal.h"
3942
#include "iceberg/util/checked_cast.h"
4043
#include "iceberg/util/macros.h"
@@ -84,6 +87,7 @@ class AvroBatchReader::Impl {
8487

8588
// Open the input stream and adapt to the avro interface.
8689
// TODO(gangwu): make this configurable
90+
// maybe TODO
8791
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
8892
ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
8993
CreateInputStream(options, kDefaultBufferSize));
@@ -96,11 +100,19 @@ class AvroBatchReader::Impl {
96100
// Validate field ids in the file schema.
97101
HasIdVisitor has_id_visitor;
98102
ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
103+
99104
if (has_id_visitor.HasNoIds()) {
100-
// TODO(gangwu): support applying field-ids based on name mapping
101-
return NotImplemented("Avro file schema has no field IDs");
102-
}
103-
if (!has_id_visitor.AllHaveIds()) {
105+
// Apply field IDs based on name mapping if available
106+
auto name_mapping_iter = options.properties.find("name_mapping");
107+
if (name_mapping_iter != options.properties.end()) {
108+
// Parse name mapping from JSON string
109+
ICEBERG_ASSIGN_OR_RAISE(auto name_mapping,
110+
iceberg::NameMappingFromJson(nlohmann::json::parse(name_mapping_iter->second)));
111+
ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(file_schema.root(), *name_mapping));
112+
} else {
113+
return NotImplemented("Avro file schema has no field IDs and no name mapping provided");
114+
}
115+
} else if (!has_id_visitor.AllHaveIds()) {
104116
return InvalidSchema("Not all fields in the Avro file schema have field IDs");
105117
}
106118

@@ -195,6 +207,130 @@ class AvroBatchReader::Impl {
195207
return arrow_array;
196208
}
197209

210+
// Apply field IDs to Avro schema nodes based on name mapping
211+
Status ApplyFieldIdsFromNameMapping(const ::avro::NodePtr& node, const NameMapping& name_mapping) {
212+
switch (node->type()) {
213+
case ::avro::AVRO_RECORD:
214+
return ApplyFieldIdsToRecord(node, name_mapping);
215+
case ::avro::AVRO_ARRAY:
216+
return ApplyFieldIdsToArray(node, name_mapping);
217+
case ::avro::AVRO_MAP:
218+
return ApplyFieldIdsToMap(node, name_mapping);
219+
case ::avro::AVRO_UNION:
220+
return ApplyFieldIdsToUnion(node, name_mapping);
221+
case ::avro::AVRO_BOOL:
222+
case ::avro::AVRO_INT:
223+
case ::avro::AVRO_LONG:
224+
case ::avro::AVRO_FLOAT:
225+
case ::avro::AVRO_DOUBLE:
226+
case ::avro::AVRO_STRING:
227+
case ::avro::AVRO_BYTES:
228+
case ::avro::AVRO_FIXED:
229+
return {};
230+
case ::avro::AVRO_NULL:
231+
case ::avro::AVRO_ENUM:
232+
default:
233+
return InvalidSchema("Unsupported Avro type for field ID application: {}", static_cast<int>(node->type()));
234+
}
235+
}
236+
237+
// Attach "field-id" attributes to the fields of the Avro record `node`.
//
// Each record field is looked up by name in `name_mapping`. When the mapped
// field carries an ID, it is stored as the "field-id" custom attribute of that
// record field. The field's schema node is then processed recursively:
//   - with the mapped field's nested mapping when one exists, regardless of
//     whether the child node is a record, union, array or map, so that nested
//     names are resolved in their own scope;
//   - otherwise with the current `name_mapping` (the previous fallback, kept so
//     that unsupported child types are still reported).
//
// Each child is visited exactly once; previously a child with a nested mapping
// was visited a second time with the parent mapping.
//
// Returns the first error produced while processing a child, or success.
Status ApplyFieldIdsToRecord(const ::avro::NodePtr& node, const NameMapping& name_mapping) {
  const size_t num_fields = node->leaves();
  for (size_t i = 0; i < num_fields; ++i) {
    const std::string& field_name = node->nameAt(i);
    const ::avro::NodePtr field_node = node->leafAt(i);
    const auto field_ref = name_mapping.Find(field_name);

    ::avro::CustomAttributes attributes;
    if (field_ref && field_ref->get().field_id.has_value()) {
      attributes.addAttribute("field-id", std::to_string(field_ref->get().field_id.value()),
                              false);
    }
    // One attribute entry is appended per record field, even when it is empty,
    // so that the i-th entry keeps describing the i-th field when some fields
    // have no mapping.
    // NOTE(review): presumably addCustomAttributesForField() appends positional
    // entries and rejects locked nodes - confirm against the Avro C++ Node API,
    // and confirm `node` carries no per-field attribute entries beforehand.
    node->addCustomAttributesForField(attributes);

    if (field_ref && field_ref->get().nested_mapping) {
      // Resolve the children of this field against its own nested mapping.
      const auto nested_fields = field_ref->get().nested_mapping->fields();
      auto nested_name_mapping = NameMapping::Make(
          std::vector<MappedField>(nested_fields.begin(), nested_fields.end()));
      ICEBERG_RETURN_UNEXPECTED(
          ApplyFieldIdsFromNameMapping(field_node, *nested_name_mapping));
    } else {
      // No nested mapping available: keep descending with the current mapping.
      ICEBERG_RETURN_UNEXPECTED(ApplyFieldIdsFromNameMapping(field_node, name_mapping));
    }
  }
  return {};
}
266+
267+
// Attach the "element-id" attribute to the Avro array `node` and process its
// element node.
//
// - An array must have exactly one leaf (the element); otherwise InvalidSchema
//   is returned.
// - An array whose custom logical type is named "map" is treated as a map
//   stored as an array: its element node is processed with `name_mapping`
//   unchanged and no "element-id" is attached.
// - Otherwise the "element" entry of `name_mapping` supplies the "element-id"
//   attribute (when it has an ID), and the element node is processed with that
//   entry's nested mapping when present, so the element's own children are
//   resolved in the element's scope rather than the list's. Without a nested
//   mapping the current `name_mapping` is used as before.
Status ApplyFieldIdsToArray(const ::avro::NodePtr& node, const NameMapping& name_mapping) {
  if (node->leaves() != 1) {
    return InvalidSchema("Array type must have exactly one leaf");
  }
  const ::avro::NodePtr element_node = node->leafAt(0);

  // Check if this is a map represented as array.
  const auto logical_type = node->logicalType();
  if (logical_type.type() == ::avro::LogicalType::CUSTOM &&
      logical_type.customLogicalType() != nullptr &&
      logical_type.customLogicalType()->name() == "map") {
    return ApplyFieldIdsFromNameMapping(element_node, name_mapping);
  }

  // For regular arrays, the element is described by the "element" entry.
  const auto element_field = name_mapping.Find("element");
  if (element_field && element_field->get().field_id.has_value()) {
    ::avro::CustomAttributes attributes;
    attributes.addAttribute("element-id",
                            std::to_string(element_field->get().field_id.value()), false);
    node->addCustomAttributesForField(attributes);
  }

  if (element_field && element_field->get().nested_mapping) {
    // Descend into the element using the element's own nested mapping.
    const auto nested_fields = element_field->get().nested_mapping->fields();
    auto nested_name_mapping = NameMapping::Make(
        std::vector<MappedField>(nested_fields.begin(), nested_fields.end()));
    return ApplyFieldIdsFromNameMapping(element_node, *nested_name_mapping);
  }

  return ApplyFieldIdsFromNameMapping(element_node, name_mapping);
}
290+
291+
// Attach "key-id" and "value-id" attributes to the Avro map `node` and process
// its value node.
//
// - A map must have exactly two leaves; otherwise InvalidSchema is returned.
// - The "key" and "value" entries of `name_mapping` supply the "key-id" and
//   "value-id" attributes when they carry an ID.
// - The value node (leaf 1) is processed with the "value" entry's nested
//   mapping when present, so the value's own children are resolved in the
//   value's scope rather than the map's. Without a nested mapping the current
//   `name_mapping` is used as before. The key node (leaf 0) is not descended
//   into.
Status ApplyFieldIdsToMap(const ::avro::NodePtr& node, const NameMapping& name_mapping) {
  if (node->leaves() != 2) {
    return InvalidSchema("Map type must have exactly two leaves");
  }
  const ::avro::NodePtr value_node = node->leafAt(1);

  // NOTE(review): "key-id" and "value-id" are appended as two separate
  // attribute entries (key first); confirm the schema reader looks them up at
  // matching positions.
  const auto key_field = name_mapping.Find("key");
  if (key_field && key_field->get().field_id.has_value()) {
    ::avro::CustomAttributes attributes;
    attributes.addAttribute("key-id", std::to_string(key_field->get().field_id.value()),
                            false);
    node->addCustomAttributesForField(attributes);
  }

  const auto value_field = name_mapping.Find("value");
  if (value_field && value_field->get().field_id.has_value()) {
    ::avro::CustomAttributes attributes;
    attributes.addAttribute("value-id",
                            std::to_string(value_field->get().field_id.value()), false);
    node->addCustomAttributesForField(attributes);
  }

  if (value_field && value_field->get().nested_mapping) {
    // Descend into the value using the value's own nested mapping.
    const auto nested_fields = value_field->get().nested_mapping->fields();
    auto nested_name_mapping = NameMapping::Make(
        std::vector<MappedField>(nested_fields.begin(), nested_fields.end()));
    return ApplyFieldIdsFromNameMapping(value_node, *nested_name_mapping);
  }

  return ApplyFieldIdsFromNameMapping(value_node, name_mapping);
}
315+
316+
// Process the non-null branch of a two-branch Avro union.
//
// The union must have exactly two branches, otherwise InvalidSchema is
// returned. If the first branch is null the second branch is processed; else if
// the second branch is null the first branch is processed; if neither branch is
// null, InvalidSchema is returned.
Status ApplyFieldIdsToUnion(const ::avro::NodePtr& node, const NameMapping& name_mapping) {
  if (node->leaves() != 2) {
    return InvalidSchema("Union type must have exactly two branches");
  }

  const ::avro::NodePtr& first = node->leafAt(0);
  const ::avro::NodePtr& second = node->leafAt(1);

  const bool first_is_null = first->type() == ::avro::AVRO_NULL;
  // The second branch is only inspected when the first one is not null.
  if (!first_is_null && second->type() != ::avro::AVRO_NULL) {
    return InvalidSchema("Union type must have exactly one null branch");
  }

  return ApplyFieldIdsFromNameMapping(first_is_null ? second : first, name_mapping);
}
333+
198334
private:
199335
// Max number of rows in the record batch to read.
200336
int64_t batch_size_{};

src/iceberg/name_mapping.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,22 +152,22 @@ const std::unordered_map<int32_t, MappedFieldConstRef>& MappedFields::LazyIdToFi
152152
NameMapping::NameMapping(std::unique_ptr<MappedFields> mapping)
153153
: mapping_(std::move(mapping)) {}
154154

155-
std::optional<MappedFieldConstRef> NameMapping::Find(int32_t id) {
155+
std::optional<MappedFieldConstRef> NameMapping::Find(int32_t id) const {
156156
const auto& fields_by_id = LazyFieldsById();
157157
if (auto iter = fields_by_id.find(id); iter != fields_by_id.cend()) {
158158
return iter->second;
159159
}
160160
return std::nullopt;
161161
}
162162

163-
std::optional<MappedFieldConstRef> NameMapping::Find(std::span<const std::string> names) {
163+
// Look up a mapped field by its name parts: the parts are joined with
// JoinByDot() and the result is passed to the name-based Find overload.
// Returns std::nullopt for an empty list of parts.
std::optional<MappedFieldConstRef> NameMapping::Find(std::span<const std::string> names) const {
  if (!names.empty()) {
    return Find(JoinByDot(names));
  }
  return std::nullopt;
}
169169

170-
std::optional<MappedFieldConstRef> NameMapping::Find(const std::string& name) {
170+
std::optional<MappedFieldConstRef> NameMapping::Find(const std::string& name) const {
171171
const auto& fields_by_name = LazyFieldsByName();
172172
if (auto iter = fields_by_name.find(name); iter != fields_by_name.cend()) {
173173
return iter->second;

src/iceberg/name_mapping.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,13 @@ class ICEBERG_EXPORT NameMapping {
104104
static std::unique_ptr<NameMapping> MakeEmpty();
105105

106106
/// \brief Find a field by its ID.
107-
std::optional<MappedFieldConstRef> Find(int32_t id);
107+
std::optional<MappedFieldConstRef> Find(int32_t id) const;
108108

109109
/// \brief Find a field by its unconcatenated names.
110-
std::optional<MappedFieldConstRef> Find(std::span<const std::string> names);
110+
std::optional<MappedFieldConstRef> Find(std::span<const std::string> names) const;
111111

112112
/// \brief Find a field by its (concatenated) name.
113-
std::optional<MappedFieldConstRef> Find(const std::string& name);
113+
std::optional<MappedFieldConstRef> Find(const std::string& name) const;
114114

115115
/// \brief Get the underlying MappedFields instance.
116116
const MappedFields& AsMappedFields() const;

test/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ add_test(NAME util_test COMMAND util_test)
7171

7272
if(ICEBERG_BUILD_BUNDLE)
7373
add_executable(avro_test)
74-
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc)
74+
target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc avro_reader_test.cc)
7575
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
7676
GTest::gmock)
7777
add_test(NAME avro_test COMMAND avro_test)

0 commit comments

Comments
 (0)