Skip to content

Commit c7b6980

Browse files
committed
feat: add SnapshotManager
1 parent c03f043 commit c7b6980

6 files changed

Lines changed: 532 additions & 0 deletions

File tree

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ set(ICEBERG_SOURCES
6666
schema_util.cc
6767
snapshot.cc
6868
sort_field.cc
69+
update/snapshot_manager.cc
6970
sort_order.cc
7071
statistics_file.cc
7172
table.cc

src/iceberg/table.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "iceberg/table_properties.h"
3131
#include "iceberg/table_scan.h"
3232
#include "iceberg/transaction.h"
33+
#include "iceberg/update/snapshot_manager.h"
3334
#include "iceberg/update/update_partition_spec.h"
3435
#include "iceberg/update/update_properties.h"
3536
#include "iceberg/update/update_schema.h"
@@ -179,6 +180,10 @@ Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
179180
return transaction->NewUpdateSchema();
180181
}
181182

183+
Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
184+
return SnapshotManager::Make(name().ToString(), shared_from_this());
185+
}
186+
182187
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
183188
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
184189
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
144144
/// changes.
145145
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
146146

147+
/// \brief Create a new SnapshotManager to manage snapshots and snapshot references.
148+
virtual Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
149+
147150
protected:
148151
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
149152
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ class TableProperties;
182182
/// \brief Table update.
183183
class TableMetadataBuilder;
184184
class TableUpdate;
185+
class SnapshotManager;
185186
class TableRequirement;
186187
class TableUpdateContext;
187188
class Transaction;
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/snapshot_manager.h"
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include "iceberg/result.h"
26+
#include "iceberg/snapshot.h"
27+
#include "iceberg/table.h"
28+
#include "iceberg/table_metadata.h"
29+
#include "iceberg/transaction.h"
30+
#include "iceberg/update/update_snapshot_reference.h"
31+
#include "iceberg/util/macros.h"
32+
33+
namespace iceberg {
34+
35+
Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
36+
const std::string& table_name, std::shared_ptr<Table> table) {
37+
if (table == nullptr) {
38+
return InvalidArgument("Table cannot be null");
39+
}
40+
if (table->metadata() == nullptr) {
41+
return InvalidArgument("Cannot manage snapshots: table {} does not exist",
42+
table_name);
43+
}
44+
// Create a transaction first
45+
ICEBERG_ASSIGN_OR_RAISE(auto transaction,
46+
Transaction::Make(table, Transaction::Kind::kUpdate,
47+
/*auto_commit=*/false));
48+
// Create SnapshotManager with the transaction (not external)
49+
auto manager = std::shared_ptr<SnapshotManager>(
50+
new SnapshotManager(std::move(transaction), false));
51+
return manager;
52+
}
53+
54+
Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
55+
std::shared_ptr<Transaction> transaction) {
56+
if (transaction == nullptr) {
57+
return InvalidArgument("Invalid input transaction: null");
58+
}
59+
return std::shared_ptr<SnapshotManager>(
60+
new SnapshotManager(std::move(transaction), true));
61+
}
62+
63+
SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction,
64+
bool is_external)
65+
: PendingUpdate(transaction), is_external_transaction_(is_external) {}
66+
67+
SnapshotManager::~SnapshotManager() = default;
68+
69+
SnapshotManager& SnapshotManager::Cherrypick(int64_t snapshot_id) {
70+
if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) {
71+
return AddError(status.error());
72+
}
73+
// TODO(anyone): Implement cherrypick operation
74+
// This should create a new snapshot by applying changes from the given snapshot
75+
// For now, throw NotImplemented
76+
ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented");
77+
return *this;
78+
}
79+
80+
SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) {
81+
if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) {
82+
return AddError(status.error());
83+
}
84+
// Verify snapshot exists
85+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot,
86+
transaction_->current().SnapshotById(snapshot_id));
87+
// Set the main branch to point to this snapshot
88+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref,
89+
transaction_->NewUpdateSnapshotReference());
90+
update_ref->ReplaceBranch(std::string(SnapshotRef::kMainBranch), snapshot_id);
91+
if (auto status = update_ref->Commit(); !status.has_value()) {
92+
return AddError(status.error());
93+
}
94+
return *this;
95+
}
96+
97+
SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) {
98+
if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) {
99+
return AddError(status.error());
100+
}
101+
// Find the last snapshot before the given timestamp
102+
const auto& snapshots = transaction_->current().snapshots;
103+
std::shared_ptr<Snapshot> target_snapshot = nullptr;
104+
for (const auto& snapshot : snapshots) {
105+
if (snapshot != nullptr && snapshot->timestamp_ms < timestamp_ms) {
106+
if (target_snapshot == nullptr ||
107+
snapshot->timestamp_ms > target_snapshot->timestamp_ms) {
108+
target_snapshot = snapshot;
109+
}
110+
}
111+
}
112+
ICEBERG_BUILDER_CHECK(target_snapshot != nullptr,
113+
"Table has no old snapshot before timestamp {}", timestamp_ms);
114+
return SetCurrentSnapshot(target_snapshot->snapshot_id);
115+
}
116+
117+
SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) {
118+
if (auto status = CommitIfRefUpdatesExist(); !status.has_value()) {
119+
return AddError(status.error());
120+
}
121+
// Verify snapshot exists
122+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot,
123+
transaction_->current().SnapshotById(snapshot_id));
124+
// Get current snapshot
125+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot,
126+
transaction_->current().Snapshot());
127+
// Verify that the target snapshot is an ancestor of the current snapshot
128+
// TODO(anyone): Use SnapshotUtil::IsAncestorOf once we have access to Table
129+
// For now, we'll do a simple check by traversing parent_snapshot_id
130+
int64_t current_id = current_snapshot->snapshot_id;
131+
bool found = false;
132+
while (current_id != -1) {
133+
if (current_id == snapshot_id) {
134+
found = true;
135+
break;
136+
}
137+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snap,
138+
transaction_->current().SnapshotById(current_id));
139+
if (current_snap->parent_snapshot_id.has_value()) {
140+
current_id = current_snap->parent_snapshot_id.value();
141+
} else {
142+
break;
143+
}
144+
}
145+
ICEBERG_BUILDER_CHECK(found,
146+
"Cannot rollback to {}: it is not an ancestor of the current "
147+
"snapshot",
148+
snapshot_id);
149+
return SetCurrentSnapshot(snapshot_id);
150+
}
151+
152+
SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) {
153+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot,
154+
transaction_->current().Snapshot());
155+
if (current_snapshot != nullptr) {
156+
return CreateBranch(name, current_snapshot->snapshot_id);
157+
}
158+
// Check if branch already exists
159+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
160+
const auto& current_refs = transaction_->current().refs;
161+
ICEBERG_BUILDER_CHECK(current_refs.find(name) == current_refs.end(),
162+
"Ref {} already exists", name);
163+
// Create an empty snapshot for the branch
164+
// TODO(anyone): Implement creating empty snapshot
165+
// For now, throw NotImplemented
166+
ICEBERG_BUILDER_CHECK(false, "Creating branch with empty snapshot not yet implemented");
167+
return *this;
168+
}
169+
170+
SnapshotManager& SnapshotManager::CreateBranch(const std::string& name,
171+
int64_t snapshot_id) {
172+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
173+
update_ref->CreateBranch(name, snapshot_id);
174+
return *this;
175+
}
176+
177+
SnapshotManager& SnapshotManager::CreateTag(const std::string& name,
178+
int64_t snapshot_id) {
179+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
180+
update_ref->CreateTag(name, snapshot_id);
181+
return *this;
182+
}
183+
184+
SnapshotManager& SnapshotManager::RemoveBranch(const std::string& name) {
185+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
186+
update_ref->RemoveBranch(name);
187+
return *this;
188+
}
189+
190+
SnapshotManager& SnapshotManager::RemoveTag(const std::string& name) {
191+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
192+
update_ref->RemoveTag(name);
193+
return *this;
194+
}
195+
196+
SnapshotManager& SnapshotManager::ReplaceTag(const std::string& name,
197+
int64_t snapshot_id) {
198+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
199+
update_ref->ReplaceTag(name, snapshot_id);
200+
return *this;
201+
}
202+
203+
SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& name,
204+
int64_t snapshot_id) {
205+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
206+
update_ref->ReplaceBranch(name, snapshot_id);
207+
return *this;
208+
}
209+
210+
SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& from,
211+
const std::string& to) {
212+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
213+
update_ref->ReplaceBranch(from, to);
214+
return *this;
215+
}
216+
217+
SnapshotManager& SnapshotManager::FastForwardBranch(const std::string& from,
218+
const std::string& to) {
219+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
220+
update_ref->FastForward(from, to);
221+
return *this;
222+
}
223+
224+
SnapshotManager& SnapshotManager::RenameBranch(const std::string& name,
225+
const std::string& new_name) {
226+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
227+
update_ref->RenameBranch(name, new_name);
228+
return *this;
229+
}
230+
231+
SnapshotManager& SnapshotManager::SetMinSnapshotsToKeep(const std::string& branch_name,
232+
int32_t min_snapshots_to_keep) {
233+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
234+
update_ref->SetMinSnapshotsToKeep(branch_name, min_snapshots_to_keep);
235+
return *this;
236+
}
237+
238+
SnapshotManager& SnapshotManager::SetMaxSnapshotAgeMs(const std::string& branch_name,
239+
int64_t max_snapshot_age_ms) {
240+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
241+
update_ref->SetMaxSnapshotAgeMs(branch_name, max_snapshot_age_ms);
242+
return *this;
243+
}
244+
245+
SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name,
246+
int64_t max_ref_age_ms) {
247+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation());
248+
update_ref->SetMaxRefAgeMs(name, max_ref_age_ms);
249+
return *this;
250+
}
251+
252+
Result<std::shared_ptr<Snapshot>> SnapshotManager::Apply() {
253+
ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());
254+
return transaction_->table()->current_snapshot();
255+
}
256+
257+
Status SnapshotManager::Commit() {
258+
ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());
259+
if (!is_external_transaction_) {
260+
ICEBERG_ASSIGN_OR_RAISE(auto updated_table, transaction_->Commit());
261+
// Create a new transaction with the updated table
262+
ICEBERG_ASSIGN_OR_RAISE(transaction_,
263+
Transaction::Make(updated_table, Transaction::Kind::kUpdate,
264+
/*auto_commit=*/false));
265+
// Note: The base class transaction_ member is protected, so we can't update it
266+
// directly However, since we always use transaction_ through the base class member,
267+
// we need to ensure consistency. For now, we'll rely on the fact that all methods use
268+
// transaction_ from the base class.
269+
}
270+
return {};
271+
}
272+
273+
Result<std::shared_ptr<UpdateSnapshotReference>>
274+
SnapshotManager::UpdateSnapshotReferencesOperation() {
275+
if (update_snapshot_references_operation_ == nullptr) {
276+
ICEBERG_ASSIGN_OR_RAISE(update_snapshot_references_operation_,
277+
transaction_->NewUpdateSnapshotReference());
278+
}
279+
return update_snapshot_references_operation_;
280+
}
281+
282+
Status SnapshotManager::CommitIfRefUpdatesExist() {
283+
if (update_snapshot_references_operation_ != nullptr) {
284+
ICEBERG_RETURN_UNEXPECTED(update_snapshot_references_operation_->Commit());
285+
update_snapshot_references_operation_ = nullptr;
286+
}
287+
return {};
288+
}
289+
290+
} // namespace iceberg

0 commit comments

Comments
 (0)