Skip to content

Commit 4044a87

Browse files
committed
1
1 parent 6541d5e commit 4044a87

4 files changed

Lines changed: 90 additions & 17 deletions

File tree

src/iceberg/table_requirements.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ void TableUpdateContext::RequireNoBranchesChanged() {
9797
}
9898
}
9999

100+
bool TableUpdateContext::AddChangedRef(const std::string& ref_name) {
101+
auto [it, inserted] = changed_refs_.insert(ref_name);
102+
return inserted;
103+
}
104+
100105
Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForCreateTable(
101106
const std::vector<std::unique_ptr<TableUpdate>>& table_updates) {
102107
TableUpdateContext context(nullptr, false);

src/iceberg/table_requirements.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
/// for optimistic concurrency control when committing table changes.
2828

2929
#include <memory>
30+
#include <string>
31+
#include <unordered_set>
3032
#include <vector>
3133

3234
#include "iceberg/iceberg_export.h"
@@ -82,6 +84,11 @@ class ICEBERG_EXPORT TableUpdateContext {
8284
/// \brief Require that no branches have been changed
8385
void RequireNoBranchesChanged();
8486

87+
/// \brief Track a changed ref and return whether it was newly added
88+
/// \param ref_name The name of the ref being changed
89+
/// \return true if this is the first time the ref is being changed
90+
bool AddChangedRef(const std::string& ref_name);
91+
8592
private:
8693
const TableMetadata* base_;
8794
const bool is_replace_;
@@ -94,6 +101,9 @@ class ICEBERG_EXPORT TableUpdateContext {
94101
bool added_last_assigned_partition_id_ = false;
95102
bool added_default_spec_id_ = false;
96103
bool added_default_sort_order_id_ = false;
104+
105+
// Track refs that have been changed to avoid duplicate requirements
106+
std::unordered_set<std::string> changed_refs_;
97107
};
98108

99109
/// \brief Factory class for generating table requirements

src/iceberg/table_update.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
142142
void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}
143143

144144
void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
145-
throw IcebergError("RemoveTableSnapshots::GenerateRequirements not implemented");
145+
// RemoveSnapshots doesn't generate any requirements
146146
}
147147

148148
// RemoveSnapshotRef
@@ -162,7 +162,16 @@ void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
162162
}
163163

164164
void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
165-
throw NotImplemented("SetTableSnapshotRef::GenerateRequirements not implemented");
165+
bool added = context.AddChangedRef(ref_name_);
166+
if (added && context.base() != nullptr && !context.is_replace()) {
167+
const auto& refs = context.base()->refs;
168+
auto it = refs.find(ref_name_);
169+
// Require that the ref does not exist (nullopt) or is the same as the base snapshot
170+
std::optional<int64_t> base_snapshot_id =
171+
(it != refs.end()) ? std::make_optional(it->second->snapshot_id) : std::nullopt;
172+
context.AddRequirement(
173+
std::make_unique<table::AssertRefSnapshotID>(ref_name_, base_snapshot_id));
174+
}
166175
}
167176

168177
// SetProperties

src/iceberg/test/table_requirements_test.cc

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "iceberg/table_requirements.h"
2121

2222
#include <memory>
23+
#include <ranges>
2324
#include <string>
2425
#include <vector>
2526

@@ -33,6 +34,7 @@
3334
#include "iceberg/table_requirement.h"
3435
#include "iceberg/table_update.h"
3536
#include "iceberg/test/matchers.h"
37+
#include "iceberg/type.h"
3638

3739
namespace iceberg {
3840

@@ -60,21 +62,17 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata(
6062
// Helper function to create a simple schema for tests
6163
std::shared_ptr<Schema> CreateTestSchema(int32_t schema_id = 0) {
6264
std::vector<SchemaField> fields;
63-
fields.emplace_back(SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()));
65+
fields.emplace_back(SchemaField::MakeRequired(1, "id", int32()));
6466
return std::make_shared<Schema>(std::move(fields), schema_id);
6567
}
6668

6769
// Helper function to count requirements of a specific type
6870
template <typename T>
6971
int CountRequirementsOfType(
7072
const std::vector<std::unique_ptr<TableRequirement>>& requirements) {
71-
int count = 0;
72-
for (const auto& req : requirements) {
73-
if (dynamic_cast<T*>(req.get()) != nullptr) {
74-
count++;
75-
}
76-
}
77-
return count;
73+
return std::ranges::count_if(requirements, [](const auto& req) {
74+
return dynamic_cast<T*>(req.get()) != nullptr;
75+
});
7876
}
7977

8078
// Helper function to add a branch to metadata
@@ -815,47 +813,98 @@ TEST(TableRequirementsTest, SetDefaultSortOrderFailure) {
815813

816814
TEST(TableRequirementsTest, AddSnapshot) {
817815
auto metadata = CreateBaseMetadata();
818-
std::vector<std::unique_ptr<TableUpdate>> updates;
819816

817+
std::vector<std::unique_ptr<TableUpdate>> updates;
820818
auto snapshot = std::make_shared<Snapshot>();
821819
snapshot->snapshot_id = 1;
822820
snapshot->sequence_number = 1;
823821
snapshot->timestamp_ms = TimePointMs{std::chrono::milliseconds(1000)};
824822
snapshot->manifest_list = "s3://bucket/manifest_list";
825-
826823
updates.push_back(std::make_unique<table::AddSnapshot>(snapshot));
827824

828825
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
829826
ASSERT_THAT(result, IsOk());
830827

831828
auto& requirements = result.value();
832-
// AddSnapshot doesn't add additional requirements
833829
ASSERT_EQ(requirements.size(), 1);
834830
EXPECT_EQ(CountRequirementsOfType<table::AssertUUID>(requirements), 1);
835831

836-
// Validate against base metadata
837832
for (const auto& req : requirements) {
838833
EXPECT_THAT(req->Validate(metadata.get()), IsOk());
839834
}
840835
}
841836

837+
// RemoveSnapshots Tests
838+
839+
TEST(TableRequirementsTest, RemoveSnapshots) {
840+
auto metadata = CreateBaseMetadata();
841+
842+
std::vector<std::unique_ptr<TableUpdate>> updates;
843+
updates.push_back(std::make_unique<table::RemoveSnapshots>(std::vector<int64_t>{0}));
844+
845+
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
846+
ASSERT_THAT(result, IsOk());
847+
848+
auto& requirements = result.value();
849+
ASSERT_EQ(requirements.size(), 1);
850+
EXPECT_EQ(CountRequirementsOfType<table::AssertUUID>(requirements), 1);
851+
852+
for (const auto& req : requirements) {
853+
EXPECT_THAT(req->Validate(metadata.get()), IsOk());
854+
}
855+
}
856+
857+
// SetSnapshotRef Tests
858+
859+
TEST(TableRequirementsTest, SetSnapshotRef) {
860+
constexpr int64_t kSnapshotId = 14;
861+
const std::string kRefName = "branch";
862+
863+
auto metadata = CreateBaseMetadata();
864+
AddBranch(*metadata, kRefName, kSnapshotId);
865+
866+
// Multiple updates to same ref should deduplicate
867+
std::vector<std::unique_ptr<TableUpdate>> updates;
868+
updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, kSnapshotId,
869+
SnapshotRefType::kBranch));
870+
updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, kSnapshotId + 1,
871+
SnapshotRefType::kBranch));
872+
updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, kSnapshotId + 2,
873+
SnapshotRefType::kBranch));
874+
875+
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
876+
ASSERT_THAT(result, IsOk());
877+
878+
auto& requirements = result.value();
879+
for (const auto& req : requirements) {
880+
EXPECT_THAT(req->Validate(metadata.get()), IsOk());
881+
}
882+
883+
ASSERT_EQ(requirements.size(), 2);
884+
EXPECT_EQ(CountRequirementsOfType<table::AssertUUID>(requirements), 1);
885+
EXPECT_EQ(CountRequirementsOfType<table::AssertRefSnapshotID>(requirements), 1);
886+
887+
auto* assert_ref = dynamic_cast<table::AssertRefSnapshotID*>(requirements[1].get());
888+
ASSERT_NE(assert_ref, nullptr);
889+
EXPECT_EQ(assert_ref->snapshot_id(), kSnapshotId);
890+
EXPECT_EQ(assert_ref->ref_name(), kRefName);
891+
}
892+
842893
// RemoveSnapshotRef Tests
843894

844895
TEST(TableRequirementsTest, RemoveSnapshotRef) {
845896
auto metadata = CreateBaseMetadata();
846-
std::vector<std::unique_ptr<TableUpdate>> updates;
847897

898+
std::vector<std::unique_ptr<TableUpdate>> updates;
848899
updates.push_back(std::make_unique<table::RemoveSnapshotRef>("branch"));
849900

850901
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
851902
ASSERT_THAT(result, IsOk());
852903

853904
auto& requirements = result.value();
854-
// RemoveSnapshotRef doesn't add additional requirements
855905
ASSERT_EQ(requirements.size(), 1);
856906
EXPECT_EQ(CountRequirementsOfType<table::AssertUUID>(requirements), 1);
857907

858-
// Validate against base metadata
859908
for (const auto& req : requirements) {
860909
EXPECT_THAT(req->Validate(metadata.get()), IsOk());
861910
}

0 commit comments

Comments
 (0)