Skip to content

Commit 62757c3

Browse files
committed
feat: add UpdateSnapshotReference
1 parent e5eb6e0 commit 62757c3

File tree

9 files changed

+438
-0
lines changed

9 files changed

+438
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ set(ICEBERG_SOURCES
8585
update/update_properties.cc
8686
update/update_schema.cc
8787
update/update_sort_order.cc
88+
update/update_snapshot_reference.cc
8889
util/bucket_util.cc
8990
util/content_file_util.cc
9091
util/conversions.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ iceberg_sources = files(
106106
'update/update_properties.cc',
107107
'update/update_schema.cc',
108108
'update/update_sort_order.cc',
109+
'update/update_snapshot_reference.cc',
109110
'util/bucket_util.cc',
110111
'util/content_file_util.cc',
111112
'util/conversions.cc',

src/iceberg/transaction.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "iceberg/update/update_partition_spec.h"
3333
#include "iceberg/update/update_properties.h"
3434
#include "iceberg/update/update_schema.h"
35+
#include "iceberg/update/update_snapshot_reference.h"
3536
#include "iceberg/update/update_sort_order.h"
3637
#include "iceberg/util/checked_cast.h"
3738
#include "iceberg/util/macros.h"
@@ -113,6 +114,24 @@ Status Transaction::Apply(PendingUpdate& update) {
113114
metadata_builder_->SetCurrentSchema(std::move(result.schema),
114115
result.new_last_column_id);
115116
} break;
117+
case PendingUpdate::Kind::kUpdateSnapshotReference: {
118+
auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
119+
ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply());
120+
const auto& current_refs = current().refs;
121+
// Remove refs that are no longer in updated_refs
122+
for (const auto& [name, ref] : current_refs) {
123+
if (updated_refs.find(name) == updated_refs.end()) {
124+
metadata_builder_->RemoveRef(name);
125+
}
126+
}
127+
// Add or update refs
128+
for (const auto& [name, ref] : updated_refs) {
129+
auto current_it = current_refs.find(name);
130+
if (current_it == current_refs.end() || *current_it->second != *ref) {
131+
metadata_builder_->SetRef(name, ref);
132+
}
133+
}
134+
} break;
116135
default:
117136
return NotSupported("Unsupported pending update: {}",
118137
static_cast<int32_t>(update.kind()));
@@ -193,4 +212,12 @@ Result<std::shared_ptr<UpdateSchema>> Transaction::NewUpdateSchema() {
193212
return update_schema;
194213
}
195214

215+
// Creates an UpdateSnapshotReference bound to this transaction and registers
// it through AddUpdate() before handing it back to the caller. Any failure
// from Make() or AddUpdate() is propagated to the caller unchanged.
Result<std::shared_ptr<UpdateSnapshotReference>>
Transaction::NewUpdateSnapshotReference() {
  ICEBERG_ASSIGN_OR_RAISE(auto pending_ref_update,
                          UpdateSnapshotReference::Make(shared_from_this()));
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending_ref_update));
  return pending_ref_update;
}
222+
196223
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
7272
/// changes.
7373
Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
7474

75+
/// \brief Create a new UpdateSnapshotReference to update snapshot references (branches
76+
/// and tags) and commit the changes.
77+
Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();
78+
7579
private:
7680
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
7781
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ class UpdatePartitionSpec;
192192
class UpdateProperties;
193193
class UpdateSchema;
194194
class UpdateSortOrder;
195+
class UpdateSnapshotReference;
195196

196197
/// ----------------------------------------------------------------------------
197198
/// TODO: Forward declarations below are not added yet.

src/iceberg/update/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ install_headers(
2222
'update_schema.h',
2323
'update_sort_order.h',
2424
'update_properties.h',
25+
'update_snapshot_reference.h',
2526
],
2627
subdir: 'iceberg/update',
2728
)

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4646
kUpdateProperties,
4747
kUpdateSchema,
4848
kUpdateSortOrder,
49+
kUpdateSnapshotReference,
4950
};
5051

5152
/// \brief Return the kind of this pending update.
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_snapshot_reference.h"
21+
22+
#include <memory>
23+
#include <optional>
24+
#include <string>
25+
#include <unordered_map>
26+
27+
#include "iceberg/result.h"
28+
#include "iceberg/snapshot.h"
29+
#include "iceberg/table_metadata.h"
30+
#include "iceberg/transaction.h"
31+
#include "iceberg/util/error_collector.h"
32+
#include "iceberg/util/macros.h"
33+
34+
namespace iceberg {
35+
36+
// Factory for UpdateSnapshotReference.
//
// A null transaction is rejected by the precondition check below, so the
// constructor is only ever invoked with a non-null transaction from here.
Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
    std::shared_ptr<Transaction> transaction) {
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create UpdateSnapshotReference without a transaction");
  // Allocated with `new` rather than std::make_shared -- presumably because the
  // constructor is not publicly accessible (confirm against the header).
  std::shared_ptr<UpdateSnapshotReference> update(
      new UpdateSnapshotReference(std::move(transaction)));
  return update;
}
43+
44+
// Seeds the working ref map (updated_refs_) from the refs of the transaction's
// current metadata.
//
// Every SnapshotRef is deep-copied instead of sharing the base metadata's
// shared_ptr. Several mutators in this class (ReplaceBranch, ReplaceTag, the
// retention setters) modify the SnapshotRef object in place; if the pointers
// were shared, those writes would silently change the base metadata and the
// caller that diffs `current().refs` against Apply()'s result would compare an
// object with itself and never see the modification.
UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction> transaction)
    : PendingUpdate(std::move(transaction)) {
  const auto& base_refs = transaction_->current().refs;
  updated_refs_.reserve(base_refs.size());
  for (const auto& [name, ref] : base_refs) {
    // Preserve a null entry as-is rather than dereferencing it.
    updated_refs_.emplace(name, ref ? std::make_shared<SnapshotRef>(*ref) : nullptr);
  }
}

UpdateSnapshotReference::~UpdateSnapshotReference() = default;
54+
55+
// Registers a new branch called `name` that points at `snapshot_id`, with
// default-constructed branch retention settings.
//
// Records a builder error (and leaves the existing entry untouched) when the
// name is empty or a ref with that name is already present.
UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name,
                                                               int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  SnapshotRef new_ref{.snapshot_id = snapshot_id, .retention = SnapshotRef::Branch{}};
  // emplace() does not overwrite: `.second` is false when the key already exists.
  const bool was_added =
      updated_refs_.emplace(name, std::make_shared<SnapshotRef>(std::move(new_ref)))
          .second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}
64+
65+
// Registers a new tag called `name` that points at `snapshot_id`, with
// default-constructed tag retention settings.
//
// Records a builder error (and leaves the existing entry untouched) when the
// name is empty or a ref with that name is already present.
UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name,
                                                            int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
  SnapshotRef new_ref{.snapshot_id = snapshot_id, .retention = SnapshotRef::Tag{}};
  // emplace() does not overwrite: `.second` is false when the key already exists.
  const bool was_added =
      updated_refs_.emplace(name, std::make_shared<SnapshotRef>(std::move(new_ref)))
          .second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}
74+
75+
// Drops the branch `name` from the working ref map.
//
// Records a builder error when the name is empty, names the main branch, is
// unknown, or refers to a tag.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch");

  const auto entry = updated_refs_.find(name);
  const bool exists = entry != updated_refs_.end();
  ICEBERG_BUILDER_CHECK(exists, "Branch does not exist: {}", name);

  // Only reached when `entry` is valid, so dereferencing it is safe.
  const bool is_branch = entry->second->type() == SnapshotRefType::kBranch;
  ICEBERG_BUILDER_CHECK(is_branch, "Ref '{}' is a tag not a branch", name);

  updated_refs_.erase(entry);
  return *this;
}
85+
86+
// Drops the tag `name` from the working ref map.
//
// Records a builder error when the name is empty, is unknown, or refers to a
// branch.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");

  const auto entry = updated_refs_.find(name);
  const bool exists = entry != updated_refs_.end();
  ICEBERG_BUILDER_CHECK(exists, "Tag does not exist: {}", name);

  // Only reached when `entry` is valid, so dereferencing it is safe.
  const bool is_tag = entry->second->type() == SnapshotRefType::kTag;
  ICEBERG_BUILDER_CHECK(is_tag, "Ref '{}' is a branch not a tag", name);

  updated_refs_.erase(entry);
  return *this;
}
95+
96+
// Renames the branch `name` to `new_name`, keeping the same SnapshotRef object.
//
// Records a builder error when either name is empty, `name` is the main
// branch, `name` is unknown or refers to a tag, or `new_name` is already taken
// (which includes the case `name == new_name`).
UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch(
    const std::string& name, const std::string& new_name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty");
  ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch");
  auto it = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);
  ICEBERG_BUILDER_CHECK(updated_refs_.find(new_name) == updated_refs_.end(),
                        "Ref '{}' already exists", new_name);

  // Erase the old entry *before* inserting the new one. Inserting into an
  // unordered_map may rehash and invalidate every outstanding iterator, so
  // `it` must not be used after the insertion.
  auto ref = std::move(it->second);
  updated_refs_.erase(it);
  updated_refs_.emplace(new_name, std::move(ref));
  return *this;
}
110+
111+
// Points the existing branch `name` at `snapshot_id`.
//
// The SnapshotRef stored in the working ref map is modified in place; its
// retention settings are left as they were. Records a builder error when the
// name is empty, unknown, or refers to a tag.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name,
                                                                int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  SnapshotRef& branch = *entry->second;
  ICEBERG_BUILDER_CHECK(branch.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  branch.snapshot_id = snapshot_id;
  return *this;
}
121+
122+
// Points branch `from` at the snapshot currently referenced by `to`, without
// any ancestry validation. Delegates to ReplaceBranchInternal().
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from,
                                                                const std::string& to) {
  constexpr bool kFastForward = false;
  return ReplaceBranchInternal(from, to, kFastForward);
}
126+
127+
// Points branch `from` at the snapshot currently referenced by `to`, with the
// ancestry validation of ReplaceBranchInternal() enabled.
UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from,
                                                              const std::string& to) {
  constexpr bool kFastForward = true;
  return ReplaceBranchInternal(from, to, kFastForward);
}
131+
132+
// Shared implementation of ReplaceBranch(from, to) and FastForward(from, to).
//
// Moves branch `from` to the snapshot that ref `to` currently points at:
//   - `to` must exist in the working ref map (branch or tag).
//   - If `from` does not exist yet it is created as a new branch at `to`'s
//     snapshot (no ancestry check is performed in that case).
//   - If `from` exists it must be a branch; if it already points at the same
//     snapshot nothing changes.
//   - When `fast_forward` is true, the move is only allowed if `from`'s
//     current snapshot is an ancestor of `to`'s snapshot, i.e. `from` can be
//     reached by following parent links starting at `to`.
//
// Any violated condition is recorded as a builder error and the refs are left
// unchanged.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
    const std::string& from, const std::string& to, bool fast_forward) {
  ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty");
  ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty");
  auto to_it = updated_refs_.find(to);
  ICEBERG_BUILDER_CHECK(to_it != updated_refs_.end(), "Ref does not exist: {}", to);
  const auto target_id = to_it->second->snapshot_id;

  auto from_it = updated_refs_.find(from);
  if (from_it == updated_refs_.end()) {
    // Create branch if it doesn't exist
    return CreateBranch(from, target_id);
  }

  ICEBERG_BUILDER_CHECK(from_it->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", from);

  // Nothing to replace if snapshot IDs are the same
  const auto current_id = from_it->second->snapshot_id;
  if (target_id == current_id) {
    return *this;
  }

  if (fast_forward) {
    // Snapshots are looked up in the transaction's current metadata.
    const auto& base_metadata = transaction_->current();
    auto target_snapshot = base_metadata.SnapshotById(target_id);
    auto current_snapshot = base_metadata.SnapshotById(current_id);

    ICEBERG_BUILDER_CHECK(target_snapshot.has_value(),
                          "Target snapshot {} does not exist", target_id);
    ICEBERG_BUILDER_CHECK(current_snapshot.has_value(),
                          "Current snapshot {} does not exist", current_id);

    // A fast-forward moves `from` *forward* along history, so the branch's
    // current snapshot must appear in the parent chain of the target snapshot.
    // Walk up from the target and look for the branch's current snapshot.
    bool from_is_ancestor = false;
    const auto* snapshot = target_snapshot.value().get();
    while (snapshot != nullptr) {
      if (snapshot->snapshot_id == current_id) {
        from_is_ancestor = true;
        break;
      }
      if (!snapshot->parent_snapshot_id.has_value()) {
        break;  // reached the root of the history
      }
      auto parent_result =
          base_metadata.SnapshotById(snapshot->parent_snapshot_id.value());
      if (!parent_result.has_value()) {
        break;  // parent is not present in the metadata; stop the walk
      }
      snapshot = parent_result.value().get();
    }

    ICEBERG_BUILDER_CHECK(from_is_ancestor,
                          "Cannot fast-forward: {} is not an ancestor of {}", from, to);
  }

  from_it->second->snapshot_id = target_id;
  return *this;
}
194+
195+
// Points the existing tag `name` at `snapshot_id`.
//
// The SnapshotRef stored in the working ref map is modified in place; its
// retention settings are left as they were. Records a builder error when the
// name is empty, unknown, or refers to a branch.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name,
                                                             int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);

  SnapshotRef& tag = *entry->second;
  ICEBERG_BUILDER_CHECK(tag.type() == SnapshotRefType::kTag,
                        "Ref '{}' is a branch not a tag", name);

  tag.snapshot_id = snapshot_id;
  return *this;
}
205+
206+
// Sets `min_snapshots_to_keep` in the retention settings of branch `name`.
//
// The value is stored as given (no range validation is done here). Records a
// builder error when the name is empty, unknown, or refers to a tag.
UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep(
    const std::string& name, int32_t min_snapshots_to_keep) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  SnapshotRef& ref = *entry->second;
  ICEBERG_BUILDER_CHECK(ref.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  auto& branch_retention = std::get<SnapshotRef::Branch>(ref.retention);
  branch_retention.min_snapshots_to_keep = min_snapshots_to_keep;
  return *this;
}
217+
218+
// Sets `max_snapshot_age_ms` in the retention settings of branch `name`.
//
// The value is stored as given (no range validation is done here). Records a
// builder error when the name is empty, unknown, or refers to a tag.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs(
    const std::string& name, int64_t max_snapshot_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  SnapshotRef& ref = *entry->second;
  ICEBERG_BUILDER_CHECK(ref.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  auto& branch_retention = std::get<SnapshotRef::Branch>(ref.retention);
  branch_retention.max_snapshot_age_ms = max_snapshot_age_ms;
  return *this;
}
229+
230+
// Sets `max_ref_age_ms` in the retention settings of ref `name`. Unlike the
// other retention setters this applies to both branches and tags.
//
// The value is stored as given (no range validation is done here). Records a
// builder error when the name is empty or unknown.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name,
                                                                 int64_t max_ref_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Ref does not exist: {}", name);

  SnapshotRef& ref = *entry->second;
  // Pick the retention alternative that matches the ref's reported type.
  if (ref.type() != SnapshotRefType::kBranch) {
    std::get<SnapshotRef::Tag>(ref.retention).max_ref_age_ms = max_ref_age_ms;
  } else {
    std::get<SnapshotRef::Branch>(ref.retention).max_ref_age_ms = max_ref_age_ms;
  }
  return *this;
}
242+
243+
// Produces the ref map that results from all operations recorded so far.
//
// If CheckErrors() reports a failure it is returned instead. Otherwise the
// result is a copy of the working map; the SnapshotRef objects themselves are
// shared (by shared_ptr) with this update, not cloned.
Result<std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>>
UpdateSnapshotReference::Apply() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
  std::unordered_map<std::string, std::shared_ptr<SnapshotRef>> refs_copy = updated_refs_;
  return refs_copy;
}
248+
249+
} // namespace iceberg

0 commit comments

Comments
 (0)