|
37 | 37 | #include "iceberg/exception.h" |
38 | 38 | #include "iceberg/file_io.h" |
39 | 39 | #include "iceberg/json_internal.h" |
| 40 | +#include "iceberg/partition_field.h" |
40 | 41 | #include "iceberg/partition_spec.h" |
41 | 42 | #include "iceberg/result.h" |
42 | 43 | #include "iceberg/schema.h" |
43 | 44 | #include "iceberg/snapshot.h" |
44 | 45 | #include "iceberg/sort_order.h" |
45 | 46 | #include "iceberg/table_properties.h" |
46 | 47 | #include "iceberg/table_update.h" |
| 48 | +#include "iceberg/util/checked_cast.h" |
47 | 49 | #include "iceberg/util/error_collector.h" |
48 | 50 | #include "iceberg/util/gzip_internal.h" |
49 | 51 | #include "iceberg/util/location_util.h" |
@@ -547,9 +549,10 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order) |
547 | 549 | bool is_new_order = |
548 | 550 | last_added_order_id_.has_value() && |
549 | 551 | std::ranges::find_if(changes_, [new_order_id](const auto& change) { |
550 | | - auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get()); |
551 | | - return add_sort_order && |
552 | | - add_sort_order->sort_order()->order_id() == new_order_id; |
| 552 | + return change->kind() == TableUpdate::Kind::kAddSortOrder && |
| 553 | + internal::checked_cast<const table::AddSortOrder&>(*change) |
| 554 | + .sort_order() |
| 555 | + ->order_id() == new_order_id; |
553 | 556 | }) != changes_.cend(); |
554 | 557 | last_added_order_id_ = is_new_order ? std::make_optional(new_order_id) : std::nullopt; |
555 | 558 | return new_order_id; |
@@ -582,52 +585,63 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order) |
// Diff view of SetDefaultPartitionSpec (columns: old line | new line | +/- | code).
// Makes `spec_id` the default partition spec: writes metadata_.default_spec_id and
// appends a table::SetDefaultPartitionSpec entry to changes_.
// - spec_id == -1 means "use last_added_spec_id_"; it is an error if that is unset.
// - If spec_id already equals metadata_.default_spec_id, nothing is written or recorded.
// Returns an empty Status ({}) on success, including the no-op case.
582 | 585 | Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) { |
583 | 586 | if (spec_id == -1) { |
584 | 587 | if (!last_added_spec_id_.has_value()) { |
// This change swaps the error constructor from InvalidArgument to ValidationFailed;
// the message text is unchanged.
585 | | - return InvalidArgument( |
| 588 | + return ValidationFailed( |
586 | 589 | "Cannot set last added partition spec: no partition spec has been added"); |
587 | 590 | } |
// Re-enter with the concrete id so the rest of the function handles a single case.
588 | 591 | return SetDefaultPartitionSpec(last_added_spec_id_.value()); |
589 | 592 | } |
590 | 593 |

591 | 594 | if (spec_id == metadata_.default_spec_id) { |
| 595 | + // the new spec is already current and no change is needed |
592 | 596 | return {}; |
593 | 597 | } |
594 | 598 |

595 | 599 | metadata_.default_spec_id = spec_id; |
// Before this change the concrete spec_id was always recorded. After it, a spec_id
// equal to last_added_spec_id_ is recorded as kLastAdded instead.
// NOTE(review): kLastAdded is defined outside this view; presumably it is the same
// -1 sentinel tested at the top of this function - confirm.
596 | | - |
597 | | - changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(spec_id)); |
| 600 | + if (last_added_spec_id_ == std::make_optional(spec_id)) { |
| 601 | + changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(kLastAdded)); |
| 602 | + } else { |
| 603 | + changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(spec_id)); |
| 604 | + } |
598 | 605 | return {}; |
599 | 606 | } |
600 | 607 |
|
// Diff view of AddPartitionSpec (columns: old line | new line | +/- | code).
// Registers `spec` with the builder and returns the spec id it ends up with.
// The id comes from ReuseOrCreateNewPartitionSpecId(spec).
// - If that id is already in specs_by_id_, no spec is added; only last_added_spec_id_
//   is updated (see below) and the id is returned.
// - Otherwise the spec is validated against the current schema, rebuilt under the new
//   id with PartitionSpec::Make, stored in metadata_.partition_specs and specs_by_id_,
//   recorded as a table::AddPartitionSpec change, and made last_added_spec_id_.
// Failures from metadata_.Schema(), spec.Validate(...) and PartitionSpec::Make are
// passed through the ICEBERG_* macros.
// NOTE(review): the macros are defined outside this view; presumably they return the
// error to the caller - confirm.
601 | 608 | Result<int32_t> TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec& spec) { |
602 | 609 | int32_t new_spec_id = ReuseOrCreateNewPartitionSpecId(spec); |
603 | 610 |

// This change replaces find() != end() with contains() for the same membership test.
604 | | - if (specs_by_id_.find(new_spec_id) != specs_by_id_.end()) { |
| 611 | + if (specs_by_id_.contains(new_spec_id)) { |
605 | 612 | // update last_added_spec_id if the spec was added in this set of changes (since it |
606 | 613 | // is now the last) |
// is_new_spec is true only when last_added_spec_id_ already has a value AND changes_
// contains an AddPartitionSpec entry whose spec id equals new_spec_id.
// This change replaces the dynamic_cast probe (removed rows) with a kind() comparison
// followed by internal::checked_cast (added rows).
607 | | - bool is_new_spec = last_added_spec_id_.has_value() && |
608 | | - std::ranges::find_if(changes_, [new_spec_id](const auto& change) { |
609 | | - auto* add_spec = |
610 | | - dynamic_cast<table::AddPartitionSpec*>(change.get()); |
611 | | - return add_spec && add_spec->spec()->spec_id() == new_spec_id; |
612 | | - }) != changes_.cend(); |
| 614 | + bool is_new_spec = |
| 615 | + last_added_spec_id_.has_value() && |
| 616 | + std::ranges::find_if(changes_, [new_spec_id](const auto& change) { |
| 617 | + return change->kind() == TableUpdate::Kind::kAddPartitionSpec && |
| 618 | + internal::checked_cast<const table::AddPartitionSpec&>(*change) |
| 619 | + .spec() |
| 620 | + ->spec_id() == new_spec_id; |
| 621 | + }) != changes_.cend(); |
// If the spec was not added by this set of changes, last_added_spec_id_ is cleared.
613 | 622 | last_added_spec_id_ = is_new_spec ? std::make_optional(new_spec_id) : std::nullopt; |
614 | 623 | return new_spec_id; |
615 | 624 | } |
616 | 625 |

617 | 626 | // Get current schema and validate the partition spec against it |
618 | 627 | ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema()); |
619 | 628 | ICEBERG_RETURN_UNEXPECTED(spec.Validate(*schema, /*allow_missing_fields=*/false)); |
// New in this change: when metadata_.format_version is 1 or lower, the spec must also
// satisfy PartitionSpec::HasSequentialFieldIds; higher versions skip that test.
| 629 | + ICEBERG_CHECK( |
| 630 | + metadata_.format_version > 1 || PartitionSpec::HasSequentialFieldIds(spec), |
| 631 | + "Spec does not use sequential IDs that are required in v1: {}", spec.ToString()); |
620 | 632 |

// The spec is rebuilt from a copy of spec.fields() under new_spec_id. This change
// moves the new_spec declaration into the ICEBERG_ASSIGN_OR_RAISE call.
621 | | - std::shared_ptr<PartitionSpec> new_spec; |
622 | 633 | ICEBERG_ASSIGN_OR_RAISE( |
623 | | - new_spec, |
| 634 | + std::shared_ptr<PartitionSpec> new_spec, |
624 | 635 | PartitionSpec::Make(new_spec_id, std::vector<PartitionField>(spec.fields().begin(), |
625 | 636 | spec.fields().end()))); |
// New in this change: last_partition_id is raised to the new spec's
// last_assigned_field_id() when that is larger; std::max keeps it from decreasing.
| 637 | + metadata_.last_partition_id = |
| 638 | + std::max(metadata_.last_partition_id, new_spec->last_assigned_field_id()); |
626 | 639 | metadata_.partition_specs.push_back(new_spec); |
627 | 640 | specs_by_id_.emplace(new_spec_id, new_spec); |
628 | 641 |

629 | 642 | changes_.push_back(std::make_unique<table::AddPartitionSpec>(new_spec)); |
630 | 643 | last_added_spec_id_ = new_spec_id; |
| 644 | + |
631 | 645 | return new_spec_id; |
632 | 646 | } |
633 | 647 |
|
@@ -714,10 +728,10 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId( |
714 | 728 |
|
715 | 729 | int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId( |
716 | 730 | const PartitionSpec& new_spec) { |
717 | | - // determine the next spec id |
| 731 | + // if the spec already exists, use the same ID. otherwise, use the highest ID + 1. |
718 | 732 | int32_t new_spec_id = PartitionSpec::kInitialSpecId; |
719 | 733 | for (const auto& spec : metadata_.partition_specs) { |
720 | | - if (spec->SameSpec(new_spec)) { |
| 734 | + if (new_spec.CompatibleWith(*spec)) { |
721 | 735 | return spec->spec_id(); |
722 | 736 | } else if (new_spec_id <= spec->spec_id()) { |
723 | 737 | new_spec_id = spec->spec_id() + 1; |
|
0 commit comments