Skip to content

Commit d43455d

Browse files
authored
feat: implement UpdateMapping and apply meta change to UpdateSchema (#561)
1 parent baf139c commit d43455d

File tree

9 files changed

+465
-16
lines changed

9 files changed

+465
-16
lines changed

src/iceberg/json_serde.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,6 +1270,17 @@ Result<std::unique_ptr<NameMapping>> NameMappingFromJson(const nlohmann::json& j
12701270
return NameMapping::Make(std::move(mapped_fields));
12711271
}
12721272

1273+
/// \brief Apply schema changes to a JSON-encoded name mapping and re-encode it.
///
/// The steps are: parse `mapping_json`, build a NameMapping from the parsed JSON,
/// run UpdateMapping with `updates` and `adds`, then convert the result back to a
/// JSON string. A failure in any macro-guarded step is returned to the caller.
///
/// \param mapping_json JSON text of the current name mapping
/// \param updates map from field ID to field definition, forwarded to UpdateMapping
/// \param adds map from parent field ID to added field IDs, forwarded to UpdateMapping
/// \return the JSON text of the updated mapping, or the first error encountered
Result<std::string> UpdateMappingFromJsonString(
    std::string_view mapping_json,
    const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
    const std::multimap<int32_t, int32_t>& adds) {
  // Text -> JSON document.
  ICEBERG_ASSIGN_OR_RAISE(auto parsed, FromJsonString(std::string(mapping_json)));

  // JSON document -> in-memory mapping.
  ICEBERG_ASSIGN_OR_RAISE(auto existing, NameMappingFromJson(parsed));

  // Fold the schema changes into the mapping.
  ICEBERG_ASSIGN_OR_RAISE(auto revised, UpdateMapping(*existing, updates, adds));

  // In-memory mapping -> text.
  return ToJsonString(ToJson(*revised));
}
1283+
12731284
nlohmann::json ToJson(const TableIdentifier& identifier) {
12741285
nlohmann::json json;
12751286
json[kNamespace] = identifier.ns.levels;

src/iceberg/json_serde_internal.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919

2020
#pragma once
2121

22+
#include <map>
2223
#include <memory>
24+
#include <optional>
25+
#include <string>
26+
#include <string_view>
27+
#include <unordered_map>
2328

2429
#include <nlohmann/json_fwd.hpp>
2530

@@ -347,6 +352,15 @@ ICEBERG_EXPORT nlohmann::json ToJson(const NameMapping& name_mapping);
347352
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> NameMappingFromJson(
348353
const nlohmann::json& json);
349354

355+
/// \brief Update a name mapping from its JSON string and return updated JSON.
356+
///
357+
/// Parses the JSON, calls UpdateMapping, and serializes the result.
358+
/// Returns an error if parsing, mapping update, or serialization fails.
359+
ICEBERG_EXPORT Result<std::string> UpdateMappingFromJsonString(
360+
std::string_view mapping_json,
361+
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
362+
const std::multimap<int32_t, int32_t>& adds);
363+
350364
/// \brief Serializes a `TableIdentifier` object to JSON.
351365
///
352366
/// \param identifier The `TableIdentifier` object to be serialized.

src/iceberg/name_mapping.cc

Lines changed: 169 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -310,29 +310,188 @@ class CreateMappingVisitor {
310310
private:
311311
Status AddMappedField(std::vector<MappedField>& fields, const std::string& name,
312312
const SchemaField& field) const {
313-
auto visit_result =
314-
VisitType(*field.type(), [this](const auto& type) { return this->Visit(type); });
315-
ICEBERG_RETURN_UNEXPECTED(visit_result);
313+
ICEBERG_ASSIGN_OR_RAISE(
314+
auto visit_result,
315+
VisitType(*field.type(), [this](const auto& type) { return this->Visit(type); }));
316316

317317
fields.emplace_back(MappedField{
318318
.names = {name},
319319
.field_id = field.field_id(),
320-
.nested_mapping = std::move(visit_result.value()),
320+
.nested_mapping = std::move(visit_result),
321321
});
322322
return {};
323323
}
324324
};
325325

326+
// Visitor class for updating name mappings with schema changes
327+
class UpdateMappingVisitor {
328+
public:
329+
UpdateMappingVisitor(
330+
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
331+
const std::multimap<int32_t, int32_t>& adds)
332+
: updates_(updates), adds_(adds) {}
333+
334+
Result<std::unique_ptr<MappedFields>> VisitMapping(const NameMapping& mapping) {
335+
ICEBERG_ASSIGN_OR_RAISE(auto fields_result, VisitFields(mapping.AsMappedFields()));
336+
return AddNewFields(std::move(fields_result), kRootId);
337+
}
338+
339+
private:
340+
static constexpr int32_t kRootId = -1;
341+
342+
Result<std::unique_ptr<MappedFields>> VisitFields(const MappedFields& fields) {
343+
// Recursively visit all fields
344+
std::vector<MappedField> field_results;
345+
field_results.reserve(fields.Size());
346+
347+
for (const auto& field : fields.fields()) {
348+
ICEBERG_ASSIGN_OR_RAISE(auto field_result, VisitField(field));
349+
field_results.push_back(std::move(field_result));
350+
}
351+
352+
// Build update assignments map for removing reassigned names
353+
std::unordered_map<std::string, int32_t> update_assignments;
354+
for (const auto& field : field_results) {
355+
if (field.field_id.has_value()) {
356+
auto update_it = updates_.find(field.field_id.value());
357+
if (update_it != updates_.end()) {
358+
update_assignments.emplace(std::string(update_it->second->name()),
359+
field.field_id.value());
360+
}
361+
}
362+
}
363+
364+
// Remove reassigned names from all fields
365+
for (auto& field : field_results) {
366+
field = RemoveReassignedNames(field, update_assignments);
367+
}
368+
369+
return MappedFields::Make(std::move(field_results));
370+
}
371+
372+
Result<MappedField> VisitField(const MappedField& field) {
373+
// Update this field's names
374+
std::unordered_set<std::string> field_names = field.names;
375+
if (field.field_id.has_value()) {
376+
auto update_it = updates_.find(field.field_id.value());
377+
if (update_it != updates_.end()) {
378+
field_names.insert(std::string(update_it->second->name()));
379+
}
380+
}
381+
382+
std::unique_ptr<MappedFields> nested_mapping = nullptr;
383+
if (field.nested_mapping != nullptr) {
384+
ICEBERG_ASSIGN_OR_RAISE(nested_mapping, VisitFields(*field.nested_mapping));
385+
}
386+
387+
// Add a new mapping for any new nested fields
388+
if (field.field_id.has_value()) {
389+
ICEBERG_ASSIGN_OR_RAISE(nested_mapping, AddNewFields(std::move(nested_mapping),
390+
field.field_id.value()));
391+
}
392+
393+
return MappedField{
394+
.names = std::move(field_names),
395+
.field_id = field.field_id,
396+
.nested_mapping = std::move(nested_mapping),
397+
};
398+
}
399+
400+
Result<std::unique_ptr<MappedFields>> AddNewFields(
401+
std::unique_ptr<MappedFields> mapping, int32_t parent_id) {
402+
auto range = adds_.equal_range(parent_id);
403+
std::vector<const SchemaField*> fields_to_add;
404+
for (auto it = range.first; it != range.second; ++it) {
405+
auto update_it = updates_.find(it->second);
406+
if (update_it != updates_.end()) {
407+
fields_to_add.push_back(update_it->second.get());
408+
}
409+
}
410+
411+
if (fields_to_add.empty()) {
412+
return std::move(mapping);
413+
}
414+
415+
std::vector<MappedField> new_fields;
416+
CreateMappingVisitor create_visitor;
417+
for (const auto* field_to_add : fields_to_add) {
418+
ICEBERG_ASSIGN_OR_RAISE(
419+
auto nested_result,
420+
VisitType(*field_to_add->type(), [&create_visitor](const auto& type) {
421+
return create_visitor.Visit(type);
422+
}));
423+
424+
new_fields.emplace_back(MappedField{
425+
.names = {std::string(field_to_add->name())},
426+
.field_id = field_to_add->field_id(),
427+
.nested_mapping = std::move(nested_result),
428+
});
429+
}
430+
431+
if (mapping == nullptr || mapping->Size() == 0) {
432+
return MappedFields::Make(std::move(new_fields));
433+
}
434+
435+
// Build assignments map for removing reassigned names
436+
std::unordered_map<std::string, int32_t> assignments;
437+
for (const auto* field_to_add : fields_to_add) {
438+
assignments.emplace(std::string(field_to_add->name()), field_to_add->field_id());
439+
}
440+
441+
// create a copy of fields that can be updated (append new fields, replace existing
442+
// for reassignment)
443+
std::vector<MappedField> fields;
444+
fields.reserve(mapping->Size() + new_fields.size());
445+
for (const auto& field : mapping->fields()) {
446+
fields.push_back(RemoveReassignedNames(field, assignments));
447+
}
448+
449+
fields.insert(fields.end(), std::make_move_iterator(new_fields.begin()),
450+
std::make_move_iterator(new_fields.end()));
451+
452+
return MappedFields::Make(std::move(fields));
453+
}
454+
455+
static MappedField RemoveReassignedNames(
456+
const MappedField& field,
457+
const std::unordered_map<std::string, int32_t>& assignments) {
458+
std::unordered_set<std::string> updated_names = field.names;
459+
std::erase_if(updated_names, [&](const std::string& name) {
460+
auto assign_it = assignments.find(name);
461+
return assign_it != assignments.end() &&
462+
(!field.field_id.has_value() || assign_it->second != field.field_id.value());
463+
});
464+
return MappedField{
465+
.names = std::move(updated_names),
466+
.field_id = field.field_id,
467+
.nested_mapping = field.nested_mapping,
468+
};
469+
}
470+
471+
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates_;
472+
const std::multimap<int32_t, int32_t>& adds_;
473+
};
474+
326475
} // namespace
327476

328477
Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema) {
329478
CreateMappingVisitor visitor;
330-
auto result = VisitType(
331-
schema, [&visitor](const auto& type) -> Result<std::unique_ptr<MappedFields>> {
332-
return visitor.Visit(type);
333-
});
334-
ICEBERG_RETURN_UNEXPECTED(result);
335-
return NameMapping::Make(std::move(*result));
479+
ICEBERG_ASSIGN_OR_RAISE(
480+
auto result,
481+
VisitType(schema,
482+
[&visitor](const auto& type) -> Result<std::unique_ptr<MappedFields>> {
483+
return visitor.Visit(type);
484+
}));
485+
return NameMapping::Make(std::move(result));
486+
}
487+
488+
// Builds a new NameMapping from `mapping` with the given schema changes applied.
// The rewrite itself is done by UpdateMappingVisitor; this function wraps the
// resulting fields in a NameMapping. Any error from the visitor is returned as is.
Result<std::unique_ptr<NameMapping>> UpdateMapping(
    const NameMapping& mapping,
    const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
    const std::multimap<int32_t, int32_t>& adds) {
  ICEBERG_ASSIGN_OR_RAISE(
      auto updated_fields,
      UpdateMappingVisitor(updates, adds).VisitMapping(mapping));
  return NameMapping::Make(std::move(updated_fields));
}
337496

338497
} // namespace iceberg

src/iceberg/name_mapping.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#pragma once
2121

2222
#include <functional>
23+
#include <map>
2324
#include <memory>
2425
#include <optional>
2526
#include <span>
@@ -143,16 +144,15 @@ ICEBERG_EXPORT std::string ToString(const NameMapping& mapping);
143144
/// \return A new NameMapping instance initialized with the schema's fields and names.
144145
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema);
145146

146-
/// TODO(gangwu): implement this function once SchemaUpdate is supported
147-
///
148147
/// \brief Update a name-based mapping using changes to a schema.
149148
/// \param mapping a name-based mapping
150149
/// \param updates a map from field ID to updated field definitions
151150
/// \param adds a map from parent field ID to nested fields to be added
152151
/// \return an updated mapping with names added to renamed fields and the mapping extended
153152
/// for new fields
154-
// ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> UpdateMapping(
155-
// const NameMapping& mapping, const std::map<int32_t, SchemaField>& updates,
156-
// const std::multimap<int32_t, int32_t>& adds);
153+
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> UpdateMapping(
154+
const NameMapping& mapping,
155+
const std::unordered_map<int32_t, std::shared_ptr<SchemaField>>& updates,
156+
const std::multimap<int32_t, int32_t>& adds);
157157

158158
} // namespace iceberg

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ if(ICEBERG_BUILD_BUNDLE)
179179
SOURCES
180180
expire_snapshots_test.cc
181181
fast_append_test.cc
182+
name_mapping_update_test.cc
182183
snapshot_manager_test.cc
183184
transaction_test.cc
184185
update_location_test.cc

0 commit comments

Comments
 (0)