Skip to content

Commit a6b776f

Browse files
committed
feat:implement AddSortOrder for table metadata
1 parent 4401ceb commit a6b776f

File tree

5 files changed

+269
-13
lines changed

5 files changed

+269
-13
lines changed

src/iceberg/table_metadata.cc

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121

2222
#include <algorithm>
2323
#include <chrono>
24+
#include <cstdint>
2425
#include <format>
2526
#include <ranges>
27+
#include <optional>
2628
#include <string>
29+
#include <unordered_map>
2730

2831
#include <nlohmann/json.hpp>
2932

@@ -274,11 +277,19 @@ struct TableMetadataBuilder::Impl {
274277

275278
// Change tracking
276279
std::vector<std::unique_ptr<TableUpdate>> changes;
280+
std::optional<int32_t> last_added_schema_id;
281+
std::optional<int32_t> last_added_order_id;
282+
std::optional<int32_t> last_added_spec_id;
277283

278284
// Metadata location tracking
279285
std::optional<std::string> metadata_location;
280286
std::optional<std::string> previous_metadata_location;
281287

288+
// indexes for convenience
289+
std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_by_id;
290+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id;
291+
std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_by_id;
292+
282293
// Constructor for new table
283294
explicit Impl(int8_t format_version) : base(nullptr), metadata{} {
284295
metadata.format_version = format_version;
@@ -294,7 +305,38 @@ struct TableMetadataBuilder::Impl {
294305

295306
// Constructor from existing metadata
296307
explicit Impl(const TableMetadata* base_metadata)
297-
: base(base_metadata), metadata(*base_metadata) {}
308+
: base(base_metadata), metadata(*base_metadata) {
309+
// Initialize index maps from base metadata
310+
for (const auto& schema : metadata.schemas) {
311+
if (schema->schema_id().has_value()) {
312+
schemas_by_id.emplace(schema->schema_id().value(), schema);
313+
}
314+
}
315+
316+
for (const auto& spec : metadata.partition_specs) {
317+
specs_by_id.emplace(spec->spec_id(), spec);
318+
}
319+
320+
for (const auto& order : metadata.sort_orders) {
321+
sort_orders_by_id.emplace(order->order_id(), order);
322+
}
323+
}
324+
325+
int32_t reuseOrCreateNewSortOrderId(const SortOrder& new_order) {
326+
if (new_order.is_unsorted()) {
327+
return SortOrder::kUnsortedOrderId;
328+
}
329+
// determine the next order id
330+
int32_t new_order_id = SortOrder::kInitialSortOrderId;
331+
for (const auto& order : metadata.sort_orders) {
332+
if (order->SameOrder(new_order)) {
333+
return order->order_id();
334+
} else if (new_order_id <= order->order_id()) {
335+
new_order_id = order->order_id() + 1;
336+
}
337+
}
338+
return new_order_id;
339+
}
298340
};
299341

300342
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
@@ -415,7 +457,68 @@ TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(int32_t order_id
415457

416458
TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
417459
std::shared_ptr<SortOrder> order) {
418-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
460+
int32_t new_order_id = impl_->reuseOrCreateNewSortOrderId(*order);
461+
462+
if (impl_->sort_orders_by_id.find(new_order_id) != impl_->sort_orders_by_id.end()) {
463+
// update last_added_order_id if the order was added in this set of changes (since it
464+
// is now the last)
465+
bool is_new_order = false;
466+
for (const auto& change : impl_->changes) {
467+
auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get());
468+
if (add_sort_order && add_sort_order->sort_order()->order_id() == new_order_id) {
469+
is_new_order = true;
470+
break;
471+
}
472+
}
473+
impl_->last_added_order_id =
474+
is_new_order ? std::make_optional(new_order_id) : std::nullopt;
475+
return *this;
476+
}
477+
478+
// Get current schema for validation
479+
auto schema_result = impl_->metadata.Schema();
480+
if (!schema_result) {
481+
impl_->errors.emplace_back(
482+
ErrorKind::kInvalidArgument,
483+
std::format("Cannot find current schema: {}", schema_result.error().message));
484+
return *this;
485+
}
486+
487+
auto schema = schema_result.value();
488+
489+
// Validate the sort order against the schema
490+
auto validate_status = order->Validate(*schema);
491+
if (!validate_status) {
492+
impl_->errors.emplace_back(
493+
ErrorKind::kInvalidArgument,
494+
std::format("Sort order validation failed: {}", validate_status.error().message));
495+
return *this;
496+
}
497+
498+
std::shared_ptr<SortOrder> new_order;
499+
if (order->is_unsorted()) {
500+
new_order = SortOrder::Unsorted();
501+
} else {
502+
// TODO(Li Feiyang): rebuild the sort order using new column ids (freshSortOrder)
503+
// For now, create a new sort order with the provided fields
504+
auto sort_order_result = SortOrder::Make(
505+
*schema, new_order_id,
506+
std::vector<SortField>(order->fields().begin(), order->fields().end()));
507+
if (!sort_order_result) {
508+
impl_->errors.emplace_back(ErrorKind::kInvalidArgument,
509+
std::format("Failed to create sort order: {}",
510+
sort_order_result.error().message));
511+
return *this;
512+
}
513+
new_order = std::move(sort_order_result.value());
514+
}
515+
516+
impl_->metadata.sort_orders.push_back(new_order);
517+
impl_->sort_orders_by_id.emplace(new_order_id, new_order);
518+
519+
impl_->changes.push_back(std::make_unique<table::AddSortOrder>(new_order));
520+
impl_->last_added_order_id = new_order_id;
521+
return *this;
419522
}
420523

421524
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(

src/iceberg/table_update.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,12 @@ Status RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
112112
// AddSortOrder
113113

114114
void AddSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
115-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
115+
builder.AddSortOrder(sort_order_);
116116
}
117117

118118
Status AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
119-
return NotImplemented("AddTableSortOrder::GenerateRequirements not implemented");
119+
// AddSortOrder doesn't generate any requirements
120+
return {};
120121
}
121122

122123
// SetDefaultSortOrder

src/iceberg/test/table_metadata_builder_test.cc

Lines changed: 126 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,33 @@
1919

2020
#include <memory>
2121
#include <string>
22+
#include <vector>
2223

2324
#include <gtest/gtest.h>
2425

2526
#include "iceberg/partition_spec.h"
27+
#include "iceberg/schema.h"
2628
#include "iceberg/snapshot.h"
29+
#include "iceberg/sort_field.h"
2730
#include "iceberg/sort_order.h"
2831
#include "iceberg/table_metadata.h"
2932
#include "iceberg/table_update.h"
3033
#include "iceberg/test/matchers.h"
34+
#include "iceberg/transform.h"
35+
#include "iceberg/type.h"
3136

3237
namespace iceberg {
3338

3439
namespace {
3540

41+
// Helper function to create a simple schema for testing
42+
std::shared_ptr<Schema> CreateTestSchema() {
43+
auto field1 = SchemaField::MakeRequired(1, "id", int32());
44+
auto field2 = SchemaField::MakeRequired(2, "data", string());
45+
auto field3 = SchemaField::MakeRequired(3, "ts", timestamp());
46+
return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2, field3}, 0);
47+
}
48+
3649
// Helper function to create base metadata for tests
3750
std::unique_ptr<TableMetadata> CreateBaseMetadata() {
3851
auto metadata = std::make_unique<TableMetadata>();
@@ -41,11 +54,14 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
4154
metadata->location = "s3://bucket/test";
4255
metadata->last_sequence_number = 0;
4356
metadata->last_updated_ms = TimePointMs{std::chrono::milliseconds(1000)};
44-
metadata->last_column_id = 0;
57+
metadata->last_column_id = 3;
58+
metadata->current_schema_id = 0;
59+
metadata->schemas.push_back(CreateTestSchema());
4560
metadata->default_spec_id = PartitionSpec::kInitialSpecId;
4661
metadata->last_partition_id = 0;
4762
metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId;
4863
metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
64+
metadata->sort_orders.push_back(SortOrder::Unsorted());
4965
metadata->next_row_id = TableMetadata::kInitialRowId;
5066
return metadata;
5167
}
@@ -124,17 +140,121 @@ TEST(TableMetadataBuilderTest, AssignUUID) {
124140
EXPECT_EQ(metadata->table_uuid, "TEST-UUID-ABCD"); // Original case preserved
125141
}
126142

143+
// Test AddSortOrder
144+
TEST(TableMetadataBuilderTest, AddSortOrderBasic) {
145+
auto base = CreateBaseMetadata();
146+
auto builder = TableMetadataBuilder::BuildFrom(base.get());
147+
auto schema = CreateTestSchema();
148+
149+
// 1. Add unsorted - should reuse existing unsorted order
150+
builder->AddSortOrder(SortOrder::Unsorted());
151+
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
152+
ASSERT_EQ(metadata->sort_orders.size(), 1);
153+
EXPECT_TRUE(metadata->sort_orders[0]->is_unsorted());
154+
155+
// 2. Add basic sort order
156+
builder = TableMetadataBuilder::BuildFrom(base.get());
157+
SortField field1(1, Transform::Identity(), SortDirection::kAscending,
158+
NullOrder::kFirst);
159+
ICEBERG_UNWRAP_OR_FAIL(auto order1,
160+
SortOrder::Make(*schema, 1, std::vector<SortField>{field1}));
161+
builder->AddSortOrder(std::move(order1));
162+
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
163+
ASSERT_EQ(metadata->sort_orders.size(), 2);
164+
EXPECT_EQ(metadata->sort_orders[1]->order_id(), 1);
165+
166+
// 3. Add duplicate - should be idempotent
167+
builder = TableMetadataBuilder::BuildFrom(base.get());
168+
ICEBERG_UNWRAP_OR_FAIL(auto order2,
169+
SortOrder::Make(*schema, 1, std::vector<SortField>{field1}));
170+
ICEBERG_UNWRAP_OR_FAIL(auto order3,
171+
SortOrder::Make(*schema, 1, std::vector<SortField>{field1}));
172+
builder->AddSortOrder(std::move(order2));
173+
builder->AddSortOrder(std::move(order3)); // Duplicate
174+
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
175+
ASSERT_EQ(metadata->sort_orders.size(), 2); // Only one added
176+
177+
// 4. Add multiple different orders + verify ID reassignment
178+
builder = TableMetadataBuilder::BuildFrom(base.get());
179+
SortField field2(2, Transform::Identity(), SortDirection::kDescending,
180+
NullOrder::kLast);
181+
// User provides ID=99, Builder should reassign to ID=1
182+
ICEBERG_UNWRAP_OR_FAIL(auto order4,
183+
SortOrder::Make(*schema, 99, std::vector<SortField>{field1}));
184+
ICEBERG_UNWRAP_OR_FAIL(
185+
auto order5, SortOrder::Make(*schema, 2, std::vector<SortField>{field1, field2}));
186+
builder->AddSortOrder(std::move(order4));
187+
builder->AddSortOrder(std::move(order5));
188+
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
189+
ASSERT_EQ(metadata->sort_orders.size(), 3);
190+
EXPECT_EQ(metadata->sort_orders[1]->order_id(), 1); // Reassigned from 99
191+
EXPECT_EQ(metadata->sort_orders[2]->order_id(), 2);
192+
}
193+
194+
TEST(TableMetadataBuilderTest, AddSortOrderInvalid) {
195+
auto base = CreateBaseMetadata();
196+
auto schema = CreateTestSchema();
197+
198+
// 1. Invalid field ID
199+
auto builder = TableMetadataBuilder::BuildFrom(base.get());
200+
SortField invalid_field(999, Transform::Identity(), SortDirection::kAscending,
201+
NullOrder::kFirst);
202+
ICEBERG_UNWRAP_OR_FAIL(auto order1,
203+
SortOrder::Make(1, std::vector<SortField>{invalid_field}));
204+
builder->AddSortOrder(std::move(order1));
205+
ASSERT_THAT(builder->Build(), IsError(ErrorKind::kCommitFailed));
206+
ASSERT_THAT(builder->Build(),
207+
HasErrorMessage("Cannot find source column for sort field"));
208+
209+
// 2. Invalid transform (Day transform on string type)
210+
builder = TableMetadataBuilder::BuildFrom(base.get());
211+
SortField invalid_transform(2, Transform::Day(), SortDirection::kAscending,
212+
NullOrder::kFirst);
213+
ICEBERG_UNWRAP_OR_FAIL(auto order2,
214+
SortOrder::Make(1, std::vector<SortField>{invalid_transform}));
215+
builder->AddSortOrder(std::move(order2));
216+
ASSERT_THAT(builder->Build(), IsError(ErrorKind::kCommitFailed));
217+
ASSERT_THAT(builder->Build(), HasErrorMessage("Invalid source type"));
218+
219+
// 3. Without schema
220+
builder = TableMetadataBuilder::BuildFromEmpty(2);
221+
builder->AssignUUID("test-uuid");
222+
SortField field(1, Transform::Identity(), SortDirection::kAscending, NullOrder::kFirst);
223+
ICEBERG_UNWRAP_OR_FAIL(auto order3,
224+
SortOrder::Make(*schema, 1, std::vector<SortField>{field}));
225+
builder->AddSortOrder(std::move(order3));
226+
ASSERT_THAT(builder->Build(), IsError(ErrorKind::kCommitFailed));
227+
ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot find current schema"));
228+
}
229+
127230
// Test applying TableUpdate to builder
128231
TEST(TableMetadataBuilderTest, ApplyUpdate) {
232+
// Use base metadata with schema for all update tests
233+
auto base = CreateBaseMetadata();
234+
auto builder = TableMetadataBuilder::BuildFrom(base.get());
235+
129236
// Apply AssignUUID update
130-
auto builder = TableMetadataBuilder::BuildFromEmpty(2);
131-
table::AssignUUID update("apply-uuid");
132-
update.ApplyTo(*builder);
133-
// TODO(Li Feiyang): Add more update and `apply` once other build methods are
134-
// implemented
237+
table::AssignUUID uuid_update("apply-uuid");
238+
uuid_update.ApplyTo(*builder);
239+
240+
// Apply AddSortOrder update
241+
auto schema = CreateTestSchema();
242+
SortField sort_field(1, Transform::Identity(), SortDirection::kAscending,
243+
NullOrder::kFirst);
244+
ICEBERG_UNWRAP_OR_FAIL(auto sort_order_unique,
245+
SortOrder::Make(*schema, 1, std::vector<SortField>{sort_field}));
246+
auto sort_order = std::shared_ptr<SortOrder>(std::move(sort_order_unique));
247+
248+
table::AddSortOrder sort_order_update(sort_order);
249+
sort_order_update.ApplyTo(*builder);
250+
251+
// TODO(Li Feiyang): Apply more build methods once they are implemented
135252

253+
// Verify all updates were applied
136254
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
137255
EXPECT_EQ(metadata->table_uuid, "apply-uuid");
256+
ASSERT_EQ(metadata->sort_orders.size(), 2);
257+
EXPECT_EQ(metadata->sort_orders[1]->order_id(), 1);
138258
}
139259

140260
} // namespace iceberg

0 commit comments

Comments
 (0)