Skip to content

Commit 467d908

Browse files
committed
feat: implement UpdateMapping and apply meta change to UpdateSchema
1 parent 2e00ce0 commit 467d908

File tree

9 files changed

+466
-6
lines changed

9 files changed

+466
-6
lines changed

src/iceberg/json_serde.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,6 +1270,24 @@ Result<std::unique_ptr<NameMapping>> NameMappingFromJson(const nlohmann::json& j
12701270
return NameMapping::Make(std::move(mapped_fields));
12711271
}
12721272

1273+
std::optional<std::string> UpdateMappingFromJsonString(
1274+
std::string_view mapping_json, const std::map<int32_t, SchemaField>& updates,
1275+
const std::multimap<int32_t, int32_t>& adds) {
1276+
auto json_result = FromJsonString(std::string(mapping_json));
1277+
if (!json_result) return std::nullopt;
1278+
1279+
auto current_mapping = NameMappingFromJson(*json_result);
1280+
if (!current_mapping) return std::nullopt;
1281+
1282+
auto updated_mapping = UpdateMapping(**current_mapping, updates, adds);
1283+
if (!updated_mapping) return std::nullopt;
1284+
1285+
auto json_str = ToJsonString(ToJson(**updated_mapping));
1286+
if (!json_str) return std::nullopt;
1287+
1288+
return std::move(*json_str);
1289+
}
1290+
12731291
nlohmann::json ToJson(const TableIdentifier& identifier) {
12741292
nlohmann::json json;
12751293
json[kNamespace] = identifier.ns.levels;

src/iceberg/json_serde_internal.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@
1919

2020
#pragma once
2121

22+
#include <map>
2223
#include <memory>
24+
#include <optional>
25+
#include <string>
26+
#include <string_view>
2327

2428
#include <nlohmann/json_fwd.hpp>
2529

@@ -347,6 +351,14 @@ ICEBERG_EXPORT nlohmann::json ToJson(const NameMapping& name_mapping);
347351
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> NameMappingFromJson(
348352
const nlohmann::json& json);
349353

354+
/// \brief Update a name mapping from its JSON string and return updated JSON.
355+
///
356+
/// Parses the JSON, calls UpdateMapping, and serializes the result.
357+
/// Returns nullopt if any step fails.
358+
ICEBERG_EXPORT std::optional<std::string> UpdateMappingFromJsonString(
359+
std::string_view mapping_json, const std::map<int32_t, SchemaField>& updates,
360+
const std::multimap<int32_t, int32_t>& adds);
361+
350362
/// \brief Serializes a `TableIdentifier` object to JSON.
351363
///
352364
/// \param identifier The `TableIdentifier` object to be serialized.

src/iceberg/name_mapping.cc

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,158 @@ class CreateMappingVisitor {
323323
}
324324
};
325325

326+
// Visitor class for updating name mappings with schema changes
327+
class UpdateMappingVisitor {
328+
public:
329+
UpdateMappingVisitor(const std::map<int32_t, SchemaField>& updates,
330+
const std::multimap<int32_t, int32_t>& adds)
331+
: updates_(updates), adds_(adds) {}
332+
333+
Result<std::unique_ptr<MappedFields>> VisitMapping(const NameMapping& mapping) {
334+
auto fields_result = VisitFields(mapping.AsMappedFields());
335+
ICEBERG_RETURN_UNEXPECTED(fields_result);
336+
return AddNewFields(std::move(*fields_result),
337+
-1 /* parent ID for top-level fields */);
338+
}
339+
340+
private:
341+
Result<std::unique_ptr<MappedFields>> VisitFields(const MappedFields& fields) {
342+
// Recursively visit all fields
343+
std::vector<MappedField> field_results;
344+
field_results.reserve(fields.Size());
345+
346+
for (const auto& field : fields.fields()) {
347+
auto field_result = VisitField(field);
348+
ICEBERG_RETURN_UNEXPECTED(field_result);
349+
field_results.push_back(std::move(*field_result));
350+
}
351+
352+
// Build update assignments map for removing reassigned names
353+
std::unordered_map<std::string, int32_t> update_assignments;
354+
std::ranges::for_each(field_results, [&](const auto& field) {
355+
if (field.field_id.has_value()) {
356+
auto update_it = updates_.find(field.field_id.value());
357+
if (update_it != updates_.end()) {
358+
update_assignments.emplace(std::string(update_it->second.name()),
359+
field.field_id.value());
360+
}
361+
}
362+
});
363+
364+
// Remove reassigned names from all fields
365+
for (auto& field : field_results) {
366+
field = RemoveReassignedNames(field, update_assignments);
367+
}
368+
369+
return MappedFields::Make(std::move(field_results));
370+
}
371+
372+
Result<MappedField> VisitField(const MappedField& field) {
373+
// Update this field's names
374+
std::unordered_set<std::string> field_names = field.names;
375+
if (field.field_id.has_value()) {
376+
auto update_it = updates_.find(field.field_id.value());
377+
if (update_it != updates_.end()) {
378+
field_names.insert(std::string(update_it->second.name()));
379+
}
380+
}
381+
382+
std::unique_ptr<MappedFields> nested_mapping = nullptr;
383+
if (field.nested_mapping != nullptr) {
384+
auto nested_result = VisitFields(*field.nested_mapping);
385+
ICEBERG_RETURN_UNEXPECTED(nested_result);
386+
nested_mapping = std::move(*nested_result);
387+
}
388+
389+
// Add a new mapping for any new nested fields
390+
if (field.field_id.has_value()) {
391+
auto nested_result =
392+
AddNewFields(std::move(nested_mapping), field.field_id.value());
393+
ICEBERG_RETURN_UNEXPECTED(nested_result);
394+
nested_mapping = std::move(*nested_result);
395+
}
396+
397+
return MappedField{
398+
.names = std::move(field_names),
399+
.field_id = field.field_id,
400+
.nested_mapping = std::move(nested_mapping),
401+
};
402+
}
403+
404+
Result<std::unique_ptr<MappedFields>> AddNewFields(
405+
std::unique_ptr<MappedFields> mapping, int32_t parent_id) {
406+
auto range = adds_.equal_range(parent_id);
407+
std::vector<const SchemaField*> fields_to_add;
408+
for (auto it = range.first; it != range.second; ++it) {
409+
auto update_it = updates_.find(it->second);
410+
if (update_it != updates_.end()) {
411+
fields_to_add.push_back(&update_it->second);
412+
}
413+
}
414+
415+
if (fields_to_add.empty()) {
416+
return std::move(mapping);
417+
}
418+
419+
std::vector<MappedField> new_fields;
420+
CreateMappingVisitor create_visitor;
421+
for (const auto* field_to_add : fields_to_add) {
422+
auto nested_result = VisitType(
423+
*field_to_add->type(),
424+
[&create_visitor](const auto& type) { return create_visitor.Visit(type); });
425+
ICEBERG_RETURN_UNEXPECTED(nested_result);
426+
427+
new_fields.emplace_back(MappedField{
428+
.names = {std::string(field_to_add->name())},
429+
.field_id = field_to_add->field_id(),
430+
.nested_mapping = std::move(*nested_result),
431+
});
432+
}
433+
434+
if (mapping == nullptr || mapping->Size() == 0) {
435+
return MappedFields::Make(std::move(new_fields));
436+
}
437+
438+
// Build assignments map for removing reassigned names
439+
std::unordered_map<std::string, int32_t> assignments;
440+
for (const auto* field_to_add : fields_to_add) {
441+
assignments.emplace(std::string(field_to_add->name()), field_to_add->field_id());
442+
}
443+
444+
// create a copy of fields that can be updated (append new fields, replace existing
445+
// for reassignment)
446+
std::vector<MappedField> fields;
447+
fields.reserve(mapping->Size() + new_fields.size());
448+
for (const auto& field : mapping->fields()) {
449+
fields.push_back(RemoveReassignedNames(field, assignments));
450+
}
451+
452+
fields.insert(fields.end(), std::make_move_iterator(new_fields.begin()),
453+
std::make_move_iterator(new_fields.end()));
454+
455+
return MappedFields::Make(std::move(fields));
456+
}
457+
458+
static MappedField RemoveReassignedNames(
459+
const MappedField& field,
460+
const std::unordered_map<std::string, int32_t>& assignments) {
461+
std::unordered_set<std::string> updated_names = field.names;
462+
std::erase_if(updated_names, [&](const std::string& name) {
463+
auto assign_it = assignments.find(name);
464+
return assign_it != assignments.end() &&
465+
(!field.field_id.has_value() || assign_it->second != field.field_id.value());
466+
});
467+
return MappedField{
468+
.names = std::move(updated_names),
469+
.field_id = field.field_id,
470+
.nested_mapping = field.nested_mapping,
471+
};
472+
}
473+
474+
const std::map<int32_t, SchemaField>& updates_;
475+
const std::multimap<int32_t, int32_t>& adds_;
476+
};
477+
326478
} // namespace
327479

328480
Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema) {
@@ -335,4 +487,13 @@ Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema) {
335487
return NameMapping::Make(std::move(*result));
336488
}
337489

490+
// Applies schema changes to `mapping` and wraps the rewritten fields in a new
// NameMapping. Any error reported while rewriting the fields is returned as-is.
Result<std::unique_ptr<NameMapping>> UpdateMapping(
    const NameMapping& mapping, const std::map<int32_t, SchemaField>& updates,
    const std::multimap<int32_t, int32_t>& adds) {
  auto updated_fields = UpdateMappingVisitor(updates, adds).VisitMapping(mapping);
  ICEBERG_RETURN_UNEXPECTED(updated_fields);
  return NameMapping::Make(std::move(*updated_fields));
}
498+
338499
} // namespace iceberg

src/iceberg/name_mapping.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#pragma once
2121

2222
#include <functional>
23+
#include <map>
2324
#include <memory>
2425
#include <optional>
2526
#include <span>
@@ -143,16 +144,14 @@ ICEBERG_EXPORT std::string ToString(const NameMapping& mapping);
143144
/// \return A new NameMapping instance initialized with the schema's fields and names.
144145
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> CreateMapping(const Schema& schema);
145146

146-
/// TODO(gangwu): implement this function once SchemaUpdate is supported
147-
///
148147
/// \brief Update a name-based mapping using changes to a schema.
149148
/// \param mapping a name-based mapping
150149
/// \param updates a map from field ID to updated field definitions
151150
/// \param adds a map from parent field ID to nested fields to be added
152151
/// \return an updated mapping with names added to renamed fields and the mapping extended
153152
/// for new fields
154-
// ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> UpdateMapping(
155-
// const NameMapping& mapping, const std::map<int32_t, SchemaField>& updates,
156-
// const std::multimap<int32_t, int32_t>& adds);
153+
ICEBERG_EXPORT Result<std::unique_ptr<NameMapping>> UpdateMapping(
154+
const NameMapping& mapping, const std::map<int32_t, SchemaField>& updates,
155+
const std::multimap<int32_t, int32_t>& adds);
157156

158157
} // namespace iceberg

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ if(ICEBERG_BUILD_BUNDLE)
179179
SOURCES
180180
expire_snapshots_test.cc
181181
fast_append_test.cc
182+
name_mapping_update_test.cc
182183
set_snapshot_test.cc
183184
transaction_test.cc
184185
update_location_test.cc

0 commit comments

Comments
 (0)