Skip to content

Commit 38cc693

Browse files
committed
resolve comments
1 parent 4822613 commit 38cc693

6 files changed

Lines changed: 61 additions & 97 deletions

File tree

src/iceberg/partition_spec.cc

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ bool PartitionSpec::Equals(const PartitionSpec& other) const {
131131
}
132132

133133
Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) const {
134+
ICEBERG_RETURN_UNEXPECTED(ValidatePartitionName(schema, *this));
135+
134136
std::unordered_map<int32_t, int32_t> parents = IndexParents(schema);
135137
for (const auto& partition_field : fields_) {
136138
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
@@ -177,9 +179,10 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields)
177179
return {};
178180
}
179181

180-
Status PartitionSpec::ValidatePartitionName(const Schema& schema) const {
182+
Status PartitionSpec::ValidatePartitionName(const Schema& schema,
183+
const PartitionSpec& spec) {
181184
std::unordered_set<std::string> partition_names;
182-
for (const auto& partition_field : fields_) {
185+
for (const auto& partition_field : spec.fields()) {
183186
auto name = std::string(partition_field.name());
184187
ICEBERG_PRECHECK(!name.empty(), "Cannot use empty partition name: {}", name);
185188

@@ -190,9 +193,10 @@ Status PartitionSpec::ValidatePartitionName(const Schema& schema) const {
190193

191194
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name));
192195
auto transform_type = partition_field.transform()->transform_type();
193-
if (transform_type == TransformType::kIdentity) {
194-
// for identity transform case we allow conflicts between partition and schema field
195-
// name as long as they are sourced from the same schema field
196+
if (transform_type == TransformType::kIdentity ||
197+
transform_type == TransformType::kVoid) {
198+
// for identity/void transform case we allow conflicts between partition and schema
199+
// field name as long as they are sourced from the same schema field
196200
if (schema_field.has_value() &&
197201
schema_field.value().get().field_id() != partition_field.source_id()) {
198202
return InvalidArgument(

src/iceberg/partition_spec.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
7979

8080
/// \brief Validates the partition spec against a schema.
8181
/// \param schema The schema to validate against.
82-
/// \param allowMissingFields Whether to skip validation for partition fields whose
82+
/// \param allow_missing_fields Whether to skip validation for partition fields whose
8383
/// source columns have been dropped from the schema.
8484
/// \return Error status if the partition spec is invalid.
8585
Status Validate(const Schema& schema, bool allow_missing_fields) const;
8686

8787
/// \brief Validates that the partition field names are unique within the partition spec and
8888
/// schema.
89-
Status ValidatePartitionName(const Schema& schema) const;
89+
static Status ValidatePartitionName(const Schema& schema, const PartitionSpec& spec);
9090

9191
/// \brief Get the partition fields by source ID.
9292
/// \param source_id The id of the source field.

src/iceberg/table_metadata.cc

Lines changed: 42 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -457,20 +457,6 @@ class TableMetadataBuilder::Impl {
457457
/// \return The ID to use for this schema (reused if exists, new otherwise
458458
int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const;
459459

460-
/// \brief Update partition spec to use a new schema
461-
/// \param schema The new schema to bind to
462-
/// \param partition_spec The partition spec to update
463-
/// \return The updated partition spec bound to the new schema
464-
static Result<std::shared_ptr<PartitionSpec>> UpdateSpecSchema(
465-
const Schema& schema, const PartitionSpec& partition_spec);
466-
467-
/// \brief Update sort order to use a new schema
468-
/// \param schema The new schema to bind to
469-
/// \param sort_order The sort order to update
470-
/// \return The updated sort order bound to the new schema
471-
static Result<std::unique_ptr<SortOrder>> UpdateSortOrderSchema(
472-
const Schema& schema, const SortOrder& sort_order);
473-
474460
private:
475461
// Base metadata (nullptr for new tables)
476462
const TableMetadata* base_;
@@ -542,7 +528,7 @@ Status TableMetadataBuilder::Impl::UpgradeFormatVersion(int8_t new_format_versio
542528
}
543529

544530
Status TableMetadataBuilder::Impl::SetDefaultSortOrder(int32_t order_id) {
545-
if (order_id == -1) {
531+
if (order_id == kLastAdded) {
546532
if (!last_added_order_id_.has_value()) {
547533
return InvalidArgument(
548534
"Cannot set last added sort order: no sort order has been added");
@@ -607,7 +593,7 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
607593
}
608594

609595
Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) {
610-
if (spec_id == -1) {
596+
if (spec_id == kLastAdded) {
611597
if (!last_added_spec_id_.has_value()) {
612598
return ValidationFailed(
613599
"Cannot set last added partition spec: no partition spec has been added");
@@ -656,8 +642,7 @@ Result<int32_t> TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec
656642

657643
ICEBERG_ASSIGN_OR_RAISE(
658644
std::shared_ptr<PartitionSpec> new_spec,
659-
PartitionSpec::Make(new_spec_id, std::vector<PartitionField>(spec.fields().begin(),
660-
spec.fields().end())));
645+
PartitionSpec::Make(new_spec_id, spec.fields() | std::ranges::to<std::vector>()));
661646
metadata_.last_partition_id =
662647
std::max(metadata_.last_partition_id, new_spec->last_assigned_field_id());
663648
metadata_.partition_specs.push_back(new_spec);
@@ -718,30 +703,41 @@ Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
718703
}
719704

720705
auto it = schemas_by_id_.find(schema_id);
721-
if (it == schemas_by_id_.end()) {
722-
return InvalidArgument("Cannot set current schema to unknown schema: {}", schema_id);
723-
}
706+
ICEBERG_PRECHECK(it != schemas_by_id_.end(),
707+
"Cannot set current schema to unknown schema: {}", schema_id);
724708
const auto& schema = it->second;
725709

726710
// Rebuild all partition specs for the new current schema
727711
std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
728-
for (const auto& spec : metadata_.partition_specs) {
729-
ICEBERG_ASSIGN_OR_RAISE(auto updated_spec, UpdateSpecSchema(*schema, *spec));
712+
for (const auto& partition_spec : metadata_.partition_specs) {
713+
ICEBERG_ASSIGN_OR_RAISE(
714+
auto updated_spec,
715+
PartitionSpec::Make(partition_spec->spec_id(),
716+
partition_spec->fields() | std::ranges::to<std::vector>()));
717+
718+
ICEBERG_RETURN_UNEXPECTED(
719+
PartitionSpec::ValidatePartitionName(*schema, *updated_spec));
720+
730721
updated_specs.push_back(std::move(updated_spec));
731722
}
732723
metadata_.partition_specs = std::move(updated_specs);
724+
733725
specs_by_id_.clear();
734726
for (const auto& spec : metadata_.partition_specs) {
735727
specs_by_id_.emplace(spec->spec_id(), spec);
736728
}
737729

738730
// Rebuild all sort orders for the new current schema
739731
std::vector<std::shared_ptr<SortOrder>> updated_orders;
740-
for (const auto& order : metadata_.sort_orders) {
741-
ICEBERG_ASSIGN_OR_RAISE(auto updated_order, UpdateSortOrderSchema(*schema, *order));
732+
for (const auto& sort_order : metadata_.sort_orders) {
733+
ICEBERG_ASSIGN_OR_RAISE(
734+
auto updated_order,
735+
SortOrder::Make(sort_order->order_id(),
736+
sort_order->fields() | std::ranges::to<std::vector>()));
742737
updated_orders.push_back(std::move(updated_order));
743738
}
744739
metadata_.sort_orders = std::move(updated_orders);
740+
745741
sort_orders_by_id_.clear();
746742
for (const auto& order : metadata_.sort_orders) {
747743
sort_orders_by_id_.emplace(order->order_id(), order);
@@ -768,8 +764,8 @@ Status TableMetadataBuilder::Impl::RemoveSchemas(
768764

769765
if (!schema_ids.empty()) {
770766
metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& schema) {
771-
return !schema_ids.contains(
772-
schema->schema_id().value_or(Schema::kInitialSchemaId));
767+
return schema->schema_id().has_value() &&
768+
!schema_ids.contains(schema->schema_id().value());
773769
}) |
774770
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
775771
changes_.push_back(std::make_unique<table::RemoveSchemas>(schema_ids));
@@ -787,8 +783,8 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
787783
ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
788784

789785
auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
790-
if (schemas_by_id_.find(new_schema_id) != schemas_by_id_.end() &&
791-
new_last_column_id == metadata_.last_column_id) {
786+
auto schema_found = schemas_by_id_.contains(new_schema_id);
787+
if (schema_found && new_last_column_id == metadata_.last_column_id) {
792788
// update last_added_schema_id if the schema was added in this set of changes (since
793789
// it is now the last)
794790
bool is_new_schema =
@@ -797,24 +793,27 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
797793
if (change->kind() != TableUpdate::Kind::kAddSchema) {
798794
return false;
799795
}
800-
auto* add_schema = dynamic_cast<table::AddSchema*>(change.get());
801-
return add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) ==
802-
new_schema_id;
796+
auto* add_schema = internal::checked_cast<table::AddSchema*>(change.get());
797+
return add_schema->schema()->schema_id() == std::make_optional(new_schema_id);
803798
});
804799
last_added_schema_id_ =
805800
is_new_schema ? std::make_optional(new_schema_id) : std::nullopt;
806801
return new_schema_id;
807802
}
808803

804+
metadata_.last_column_id = new_last_column_id;
805+
809806
auto new_schema =
810807
std::make_shared<Schema>(schema.fields() | std::ranges::to<std::vector>(),
811808
new_schema_id, schema.IdentifierFieldIds());
812809

813-
metadata_.schemas.push_back(new_schema);
814-
schemas_by_id_.emplace(new_schema_id, new_schema);
810+
if (!schema_found) {
811+
metadata_.schemas.push_back(new_schema);
812+
schemas_by_id_.emplace(new_schema_id, new_schema);
813+
}
815814

816815
changes_.push_back(std::make_unique<table::AddSchema>(new_schema, new_last_column_id));
817-
metadata_.last_column_id = new_last_column_id;
816+
818817
last_added_schema_id_ = new_schema_id;
819818

820819
return new_schema_id;
@@ -834,14 +833,16 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
834833
auto schema_it = schemas_by_id_.find(current_schema_id);
835834
ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(),
836835
"Current schema ID {} not found in schemas", current_schema_id);
837-
const auto& current_schema = schema_it->second;
838836
{
837+
const auto& current_schema = schema_it->second;
838+
839839
auto spec_it = specs_by_id_.find(metadata_.default_spec_id);
840-
// FIXME(GuoTao.yu): Default spec must exist after we support update partition spec
841-
if (spec_it != specs_by_id_.end()) {
842-
ICEBERG_RETURN_UNEXPECTED(
843-
spec_it->second->Validate(*current_schema, /*allow_missing_fields=*/false));
844-
}
840+
ICEBERG_PRECHECK(spec_it != specs_by_id_.end(),
841+
"Default spec ID {} not found in partition specs",
842+
metadata_.default_spec_id);
843+
ICEBERG_RETURN_UNEXPECTED(
844+
spec_it->second->Validate(*current_schema, /*allow_missing_fields=*/false));
845+
845846
auto sort_order_it = sort_orders_by_id_.find(metadata_.default_sort_order_id);
846847
ICEBERG_PRECHECK(sort_order_it != sort_orders_by_id_.end(),
847848
"Default sort order ID {} not found in sort orders",
@@ -913,42 +914,6 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
913914
return new_schema_id;
914915
}
915916

916-
Result<std::shared_ptr<PartitionSpec>> TableMetadataBuilder::Impl::UpdateSpecSchema(
917-
const Schema& schema, const PartitionSpec& partition_spec) {
918-
// UpdateSpecSchema: Update partition spec to use the new schema
919-
// This preserves the partition spec structure but rebinds it to the new schema
920-
921-
// Copy all fields from the partition spec. IDs should not change.
922-
std::vector<PartitionField> fields;
923-
fields.reserve(partition_spec.fields().size());
924-
int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
925-
for (const auto& field : partition_spec.fields()) {
926-
fields.push_back(field);
927-
last_assigned_field_id = std::max(last_assigned_field_id, field.field_id());
928-
}
929-
930-
// Build without validation because the schema may have changed in a way that
931-
// makes this spec invalid. The spec should still be preserved so that older
932-
// metadata can be interpreted.
933-
ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec,
934-
PartitionSpec::Make(partition_spec.spec_id(), std::move(fields),
935-
last_assigned_field_id));
936-
937-
// Validate the new partition name against the new schema
938-
ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema));
939-
return new_partition_spec;
940-
}
941-
942-
Result<std::unique_ptr<SortOrder>> TableMetadataBuilder::Impl::UpdateSortOrderSchema(
943-
const Schema& schema, const SortOrder& sort_order) {
944-
// Build without validation because the schema may have changed in a way that
945-
// makes this order invalid. The order should still be preserved so that older
946-
// metadata can be interpreted.
947-
auto fields = sort_order.fields();
948-
std::vector<SortField> new_fields{fields.begin(), fields.end()};
949-
return SortOrder::Make(sort_order.order_id(), std::move(new_fields));
950-
}
951-
952917
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
953918
: impl_(std::make_unique<Impl>(format_version)) {}
954919

src/iceberg/table_metadata.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
249249
/// \param schema The schema to set as current
250250
/// \param new_last_column_id The highest column ID in the schema
251251
/// \return Reference to this builder for method chaining
252-
TableMetadataBuilder& SetCurrentSchema(std::shared_ptr<Schema> const& schema,
252+
TableMetadataBuilder& SetCurrentSchema(const std::shared_ptr<Schema>& schema,
253253
int32_t new_last_column_id);
254254

255255
/// \brief Set the current schema by schema ID
@@ -262,7 +262,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
262262
///
263263
/// \param schema The schema to add
264264
/// \return Reference to this builder for method chaining
265-
TableMetadataBuilder& AddSchema(std::shared_ptr<Schema> const& schema);
265+
TableMetadataBuilder& AddSchema(const std::shared_ptr<Schema>& schema);
266266

267267
/// \brief Set the default partition spec for the table
268268
///

src/iceberg/test/metadata_io_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3131
#include "iceberg/file_io.h"
3232
#include "iceberg/json_internal.h"
33+
#include "iceberg/partition_spec.h"
3334
#include "iceberg/result.h"
3435
#include "iceberg/schema.h"
3536
#include "iceberg/snapshot.h"
@@ -65,6 +66,7 @@ class MetadataIOTest : public TempFileTestBase {
6566
.last_sequence_number = 0,
6667
.schemas = {schema},
6768
.current_schema_id = 1,
69+
.partition_specs = {PartitionSpec::Unpartitioned()},
6870
.default_spec_id = 0,
6971
.last_partition_id = 0,
7072
.properties = TableProperties::FromMap({{"key", "value"}}),

src/iceberg/test/table_metadata_builder_test.cc

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) {
8181
auto schema = CreateTestSchema();
8282
builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
8383
builder->SetDefaultSortOrder(SortOrder::Unsorted());
84+
builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
8485
builder->AssignUUID("new-uuid-5678");
8586

8687
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
@@ -160,6 +161,7 @@ TEST(TableMetadataBuilderTest, AssignUUID) {
160161
auto schema = CreateTestSchema();
161162
builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
162163
builder->SetDefaultSortOrder(SortOrder::Unsorted());
164+
builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
163165
builder->AssignUUID("new-uuid-5678");
164166
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
165167
EXPECT_EQ(metadata->table_uuid, "new-uuid-5678");
@@ -187,6 +189,7 @@ TEST(TableMetadataBuilderTest, AssignUUID) {
187189
builder = TableMetadataBuilder::BuildFromEmpty(2);
188190
builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
189191
builder->SetDefaultSortOrder(SortOrder::Unsorted());
192+
builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
190193
builder->AssignUUID();
191194
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
192195
EXPECT_FALSE(metadata->table_uuid.empty());
@@ -238,6 +241,7 @@ TEST(TableMetadataBuilderTest, UpgradeFormatVersion) {
238241
auto schema = CreateTestSchema();
239242
builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
240243
builder->SetDefaultSortOrder(SortOrder::Unsorted());
244+
builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
241245
builder->UpgradeFormatVersion(2);
242246

243247
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
@@ -438,17 +442,6 @@ TEST(TableMetadataBuilderTest, AddSchemaBasic) {
438442
EXPECT_EQ(metadata->last_column_id, 6);
439443
}
440444

441-
TEST(TableMetadataBuilderTest, AddSchemaInvalid) {
442-
auto base = CreateBaseMetadata();
443-
444-
auto builder = TableMetadataBuilder::BuildFrom(base.get());
445-
auto field_low_id = SchemaField::MakeRequired(1, "low_id", int32());
446-
auto schema_low_id =
447-
std::make_shared<Schema>(std::vector<SchemaField>{field_low_id}, 1);
448-
// Manually try to set a lower last_column_id via SetCurrentSchema
449-
// This is tested indirectly through AddSchemaInternal validation
450-
}
451-
452445
// Test SetCurrentSchema
453446
TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) {
454447
auto base = CreateBaseMetadata();

0 commit comments

Comments
 (0)