Skip to content

Commit 4ed7e13

Browse files
committed
feat: add UpdateSnapshotReference
test cases will be added in the SnapshotManager PR.
1 parent 3994b5d commit 4ed7e13

File tree

12 files changed

+441
-1
lines changed

12 files changed

+441
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ set(ICEBERG_SOURCES
9595
update/update_properties.cc
9696
update/update_schema.cc
9797
update/update_sort_order.cc
98+
update/update_snapshot_reference.cc
9899
util/bucket_util.cc
99100
util/content_file_util.cc
100101
util/conversions.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ iceberg_sources = files(
112112
'update/update_partition_spec.cc',
113113
'update/update_properties.cc',
114114
'update/update_schema.cc',
115+
'update/update_snapshot_reference.cc',
115116
'update/update_sort_order.cc',
116117
'util/bucket_util.cc',
117118
'util/content_file_util.cc',

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
148148
return output_array;
149149
}
150150

151-
/// Templated implementation for projecting list arrays.
151+
/// \brief Templated implementation for projecting list arrays.
152152
/// Works with both ListArray/ListType (32-bit offsets) and
153153
/// LargeListArray/LargeListType (64-bit offsets).
154154
template <typename ArrowListArrayType, typename ArrowListType>

src/iceberg/transaction.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "iceberg/update/update_partition_spec.h"
4141
#include "iceberg/update/update_properties.h"
4242
#include "iceberg/update/update_schema.h"
43+
#include "iceberg/update/update_snapshot_reference.h"
4344
#include "iceberg/update/update_sort_order.h"
4445
#include "iceberg/util/checked_cast.h"
4546
#include "iceberg/util/location_util.h"
@@ -198,6 +199,24 @@ Status Transaction::Apply(PendingUpdate& update) {
198199
metadata_builder_->AssignUUID();
199200
}
200201
} break;
202+
case PendingUpdate::Kind::kUpdateSnapshotReference: {
203+
auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
204+
ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply());
205+
const auto& current_refs = current().refs;
206+
// Identify references which have been removed
207+
for (const auto& [name, ref] : current_refs) {
208+
if (updated_refs.find(name) == updated_refs.end()) {
209+
metadata_builder_->RemoveRef(name);
210+
}
211+
}
212+
// Identify references which have been created or updated
213+
for (const auto& [name, ref] : updated_refs) {
214+
auto current_it = current_refs.find(name);
215+
if (current_it == current_refs.end() || *current_it->second != *ref) {
216+
metadata_builder_->SetRef(name, ref);
217+
}
218+
}
219+
} break;
201220
default:
202221
return NotSupported("Unsupported pending update: {}",
203222
static_cast<int32_t>(update.kind()));
@@ -316,4 +335,12 @@ Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
316335
return fast_append;
317336
}
318337

338+
// Creates an UpdateSnapshotReference bound to this transaction, registers it
// through AddUpdate(), and hands it back to the caller. Any failure from
// Make() or AddUpdate() is propagated as the returned error.
Result<std::shared_ptr<UpdateSnapshotReference>>
Transaction::NewUpdateSnapshotReference() {
  ICEBERG_ASSIGN_OR_RAISE(auto pending_ref_update,
                          UpdateSnapshotReference::Make(shared_from_this()));
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending_ref_update));
  return pending_ref_update;
}
345+
319346
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
9393
/// \brief Create a new FastAppend to append data files and commit the changes.
9494
Result<std::shared_ptr<FastAppend>> NewFastAppend();
9595

96+
/// \brief Create a new UpdateSnapshotReference to update snapshot references (branches
97+
/// and tags) and commit the changes.
98+
Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();
99+
96100
private:
97101
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
98102
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ class UpdatePartitionSpec;
199199
class UpdateProperties;
200200
class UpdateSchema;
201201
class UpdateSortOrder;
202+
class UpdateSnapshotReference;
202203

203204
/// ----------------------------------------------------------------------------
204205
/// TODO: Forward declarations below are not added yet.

src/iceberg/update/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ install_headers(
2525
'update_location.h',
2626
'update_partition_spec.h',
2727
'update_schema.h',
28+
'update_snapshot_reference.h',
2829
'update_sort_order.h',
2930
'update_properties.h',
3031
],

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
5050
kUpdateSchema,
5151
kUpdateSnapshot,
5252
kUpdateSortOrder,
53+
kUpdateSnapshotReference,
5354
};
5455

5556
/// \brief Return the kind of this pending update.
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_snapshot_reference.h"
21+
22+
#include <memory>
23+
#include <optional>
24+
#include <string>
25+
#include <unordered_map>
26+
27+
#include "iceberg/result.h"
28+
#include "iceberg/snapshot.h"
29+
#include "iceberg/table_metadata.h"
30+
#include "iceberg/transaction.h"
31+
#include "iceberg/util/error_collector.h"
32+
#include "iceberg/util/macros.h"
33+
#include "iceberg/util/snapshot_util_internal.h"
34+
35+
namespace iceberg {
36+
37+
// Factory for UpdateSnapshotReference.
//
// Fails the ICEBERG_PRECHECK when `transaction` is null; otherwise returns a
// new instance that takes ownership of the given transaction handle.
Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
    std::shared_ptr<Transaction> transaction) {
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create UpdateSnapshotReference without a transaction");
  // Constructed with `new` rather than std::make_shared; presumably the
  // constructor is not publicly accessible -- confirm against the header.
  std::shared_ptr<UpdateSnapshotReference> created(
      new UpdateSnapshotReference(std::move(transaction)));
  return created;
}
44+
45+
// Seeds the working set of references (`updated_refs_`) from the refs of the
// base metadata.
//
// Each SnapshotRef is deep-copied instead of sharing the base metadata's
// pointer: the builder methods of this class (ReplaceBranch, ReplaceTag,
// SetMaxRefAgeMs, ...) modify the referenced object in place, so sharing the
// pointer would write pending edits straight through to the base metadata and
// make a later "did this ref change?" comparison against it compare an object
// with itself.
UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction> transaction)
    : PendingUpdate(std::move(transaction)) {
  const auto& base_refs = base().refs;
  updated_refs_.reserve(base_refs.size());
  for (const auto& [name, ref] : base_refs) {
    // A null entry is carried over unchanged so construction never
    // dereferences a null pointer.
    updated_refs_[name] = ref ? std::make_shared<SnapshotRef>(*ref) : nullptr;
  }
}
52+
53+
// Destructor is defaulted out of line in this translation unit; it performs no
// custom teardown.
UpdateSnapshotReference::~UpdateSnapshotReference() = default;
54+
55+
// Stages a new branch called `name` that points at `snapshot_id`.
//
// Rejected when `name` is empty, when SnapshotRef::MakeBranch fails, or when a
// ref (branch or tag) with the same name is already staged. Returns *this so
// calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name,
                                                               int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_branch, SnapshotRef::MakeBranch(snapshot_id));
  // emplace() leaves an existing entry untouched and reports that via `.second`.
  const bool was_added = updated_refs_.emplace(name, std::move(new_branch)).second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}
63+
64+
// Stages a new tag called `name` that points at `snapshot_id`.
//
// Rejected when `name` is empty, when SnapshotRef::MakeTag fails, or when a
// ref (branch or tag) with the same name is already staged. Returns *this so
// calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name,
                                                            int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_tag, SnapshotRef::MakeTag(snapshot_id));
  // emplace() leaves an existing entry untouched and reports that via `.second`.
  const bool was_added = updated_refs_.emplace(name, std::move(new_tag)).second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}
72+
73+
// Drops the branch called `name` from the staged refs.
//
// Rejected when `name` is empty, is the main branch, is not staged, or names a
// tag instead of a branch. Returns *this so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  updated_refs_.erase(entry);
  return *this;
}
83+
84+
// Drops the tag called `name` from the staged refs.
//
// Rejected when `name` is empty, is not staged, or names a branch instead of a
// tag. Returns *this so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(entry->second->type() == SnapshotRefType::kTag,
                        "Ref '{}' is a branch not a tag", name);

  updated_refs_.erase(entry);
  return *this;
}
93+
94+
// Renames the staged branch `name` to `new_name`, keeping the same SnapshotRef.
//
// Rejected when either name is empty, when `name` is the main branch, when
// `name` is not staged or is a tag, or when `new_name` is already in use
// (which includes renaming a branch to its own name). Returns *this so calls
// can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch(
    const std::string& name, const std::string& new_name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty");
  ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch");
  auto it = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);
  // Check for a name collision with a lookup instead of a speculative insert:
  // inserting into an unordered_map may rehash and invalidate `it`, so the old
  // entry must be erased before the new one is inserted.
  ICEBERG_BUILDER_CHECK(updated_refs_.find(new_name) == updated_refs_.end(),
                        "Ref '{}' already exists", new_name);
  auto ref = std::move(it->second);
  updated_refs_.erase(it);
  updated_refs_.emplace(new_name, std::move(ref));
  return *this;
}
108+
109+
// Points the staged branch `name` at `snapshot_id`.
//
// Rejected when `name` is empty, is not staged, or names a tag instead of a
// branch. Returns *this so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name,
                                                                int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  SnapshotRef& branch = *entry->second;
  ICEBERG_BUILDER_CHECK(branch.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  branch.snapshot_id = snapshot_id;
  return *this;
}
119+
120+
// Moves branch `from` to the snapshot currently referenced by `to`, without
// requesting the fast-forward ancestry check. Delegates to
// ReplaceBranchInternal.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from,
                                                                const std::string& to) {
  constexpr bool kRequireFastForward = false;
  return ReplaceBranchInternal(from, to, kRequireFastForward);
}
124+
125+
// Moves branch `from` to the snapshot currently referenced by `to`, requesting
// the fast-forward ancestry check. Delegates to ReplaceBranchInternal.
UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from,
                                                              const std::string& to) {
  constexpr bool kRequireFastForward = true;
  return ReplaceBranchInternal(from, to, kRequireFastForward);
}
129+
130+
// Shared implementation of ReplaceBranch(from, to) and FastForward(from, to).
//
// Makes branch `from` point at the snapshot that ref `to` points at:
//   - `to` must already be staged (it may be a branch or a tag);
//   - when `from` is not staged, it is created as a new branch at that snapshot;
//   - when `from` is staged, it must be a branch;
//   - when both already reference the same snapshot, nothing changes;
//   - when `fast_forward` is set, SnapshotUtil::IsAncestorOf must confirm the
//     ancestry relation between the two snapshots, otherwise the request is
//     rejected.
// Returns *this so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
    const std::string& from, const std::string& to, bool fast_forward) {
  ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty");
  ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty");

  const auto target_entry = updated_refs_.find(to);
  ICEBERG_BUILDER_CHECK(target_entry != updated_refs_.end(), "Ref does not exist: {}",
                        to);
  const auto target_snapshot_id = target_entry->second->snapshot_id;

  const auto branch_entry = updated_refs_.find(from);
  if (branch_entry == updated_refs_.end()) {
    // Unknown source branch: create it directly at the target snapshot.
    return CreateBranch(from, target_snapshot_id);
  }

  SnapshotRef& branch = *branch_entry->second;
  ICEBERG_BUILDER_CHECK(branch.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", from);

  // Already pointing at the target snapshot: nothing to replace.
  if (branch.snapshot_id == target_snapshot_id) {
    return *this;
  }

  if (fast_forward) {
    // A fast-forward is only accepted when the branch's current snapshot is an
    // ancestor of the target snapshot, i.e. the branch moves forward in history.
    const auto& metadata = transaction_->current();
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(
        auto branch_is_ancestor_of_target,
        SnapshotUtil::IsAncestorOf(
            target_snapshot_id, branch.snapshot_id,
            [&metadata](int64_t id) { return metadata.SnapshotById(id); }));

    ICEBERG_BUILDER_CHECK(branch_is_ancestor_of_target,
                          "Cannot fast-forward: {} is not an ancestor of {}", from, to);
  }

  branch.snapshot_id = target_snapshot_id;
  return *this;
}
167+
168+
// Points the staged tag `name` at `snapshot_id`.
//
// Rejected when `name` is empty, is not staged, or names a branch instead of a
// tag. Returns *this so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name,
                                                             int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);

  SnapshotRef& tag = *entry->second;
  ICEBERG_BUILDER_CHECK(tag.type() == SnapshotRefType::kTag,
                        "Ref '{}' is a branch not a tag", name);

  tag.snapshot_id = snapshot_id;
  return *this;
}
178+
179+
// Sets `min_snapshots_to_keep` in the retention settings of branch `name`.
//
// Rejected when `name` is empty, is not staged, or names a tag, and when the
// ref fails SnapshotRef::Validate() after the new value has been written.
// Returns *this so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep(
    const std::string& name, int32_t min_snapshots_to_keep) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  SnapshotRef& branch = *entry->second;
  ICEBERG_BUILDER_CHECK(branch.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  auto& branch_retention = std::get<SnapshotRef::Branch>(branch.retention);
  branch_retention.min_snapshots_to_keep = min_snapshots_to_keep;

  // Validation runs after the write, so the new value stays stored even when
  // the check below reports it as invalid.
  ICEBERG_BUILDER_CHECK(branch.Validate(),
                        "Invalid min_snapshots_to_keep {} for branch '{}'",
                        min_snapshots_to_keep, name);
  return *this;
}
193+
194+
// Sets `max_snapshot_age_ms` in the retention settings of branch `name`.
//
// Rejected when `name` is empty, is not staged, or names a tag, and when the
// ref fails SnapshotRef::Validate() after the new value has been written.
// Returns *this so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs(
    const std::string& name, int64_t max_snapshot_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);

  SnapshotRef& branch = *entry->second;
  ICEBERG_BUILDER_CHECK(branch.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);

  auto& branch_retention = std::get<SnapshotRef::Branch>(branch.retention);
  branch_retention.max_snapshot_age_ms = max_snapshot_age_ms;

  // Validation runs after the write, so the new value stays stored even when
  // the check below reports it as invalid.
  ICEBERG_BUILDER_CHECK(branch.Validate(),
                        "Invalid max_snapshot_age_ms {} for branch '{}'",
                        max_snapshot_age_ms, name);
  return *this;
}
208+
209+
// Sets `max_ref_age_ms` in the retention settings of ref `name`, which may be
// either a branch or a tag.
//
// Rejected when `name` is empty or not staged, and when the ref fails
// SnapshotRef::Validate() after the new value has been written. Returns *this
// so calls can be chained.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name,
                                                                 int64_t max_ref_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty");

  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Ref does not exist: {}", name);

  SnapshotRef& ref = *entry->second;
  // Both retention alternatives carry a max_ref_age_ms field; pick the one that
  // matches the ref's type.
  const bool is_branch = ref.type() == SnapshotRefType::kBranch;
  if (is_branch) {
    auto& branch_retention = std::get<SnapshotRef::Branch>(ref.retention);
    branch_retention.max_ref_age_ms = max_ref_age_ms;
  } else {
    auto& tag_retention = std::get<SnapshotRef::Tag>(ref.retention);
    tag_retention.max_ref_age_ms = max_ref_age_ms;
  }

  ICEBERG_BUILDER_CHECK(ref.Validate(), "Invalid max_ref_age_ms {} for ref '{}'",
                        max_ref_age_ms, name);
  return *this;
}
223+
224+
// Produces the staged set of references.
//
// Returns the error reported by CheckErrors() if there is one; otherwise
// returns a copy of the name -> SnapshotRef map held by this update.
Result<std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>>
UpdateSnapshotReference::Apply() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
  std::unordered_map<std::string, std::shared_ptr<SnapshotRef>> staged_refs =
      updated_refs_;
  return staged_refs;
}
229+
230+
} // namespace iceberg

0 commit comments

Comments
 (0)