Skip to content

Commit 3994b5d

Browse files
authored
feat: implement set snapshot (#509)
1 parent 93989da commit 3994b5d

File tree

13 files changed

+475
-40
lines changed

13 files changed

+475
-40
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
8888
update/expire_snapshots.cc
8989
update/fast_append.cc
9090
update/pending_update.cc
91+
update/set_snapshot.cc
9192
update/snapshot_update.cc
9293
update/update_location.cc
9394
update/update_partition_spec.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ iceberg_sources = files(
106106
'update/expire_snapshots.cc',
107107
'update/fast_append.cc',
108108
'update/pending_update.cc',
109+
'update/set_snapshot.cc',
109110
'update/snapshot_update.cc',
110111
'update/update_location.cc',
111112
'update/update_partition_spec.cc',

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ if(ICEBERG_BUILD_BUNDLE)
173173
SOURCES
174174
expire_snapshots_test.cc
175175
fast_append_test.cc
176+
set_snapshot_test.cc
176177
transaction_test.cc
177178
update_location_test.cc
178179
update_partition_spec_test.cc
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/set_snapshot.h"
21+
22+
#include <gmock/gmock.h>
23+
#include <gtest/gtest.h>
24+
25+
#include "iceberg/result.h"
26+
#include "iceberg/snapshot.h"
27+
#include "iceberg/test/matchers.h"
28+
#include "iceberg/test/update_test_base.h"
29+
#include "iceberg/transaction.h"
30+
31+
namespace iceberg {
32+
33+
class SetSnapshotTest : public UpdateTestBase {
34+
protected:
35+
// Snapshot IDs from TableMetadataV2Valid.json
36+
static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
37+
static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
38+
39+
// Timestamps from TableMetadataV2Valid.json
40+
static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
41+
static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
42+
};
43+
44+
TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
45+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
46+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
47+
EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
48+
49+
set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
50+
51+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
52+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
53+
54+
// Commit and verify the change was persisted
55+
EXPECT_THAT(set_snapshot->Commit(), IsOk());
56+
EXPECT_THAT(transaction->Commit(), IsOk());
57+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
58+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
59+
EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
60+
}
61+
62+
TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
63+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
64+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
65+
set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);
66+
67+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
68+
EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
69+
}
70+
71+
TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
72+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
73+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
74+
// Try to set to a non-existent snapshot
75+
int64_t invalid_snapshot_id = 9999999999999999;
76+
set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);
77+
78+
auto result = set_snapshot->Apply();
79+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
80+
EXPECT_THAT(result, HasErrorMessage("is not found"));
81+
}
82+
83+
TEST_F(SetSnapshotTest, RollbackToValid) {
84+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
85+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
86+
// Rollback to the oldest snapshot (which is an ancestor)
87+
set_snapshot->RollbackTo(kOldestSnapshotId);
88+
89+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
90+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
91+
}
92+
93+
TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
94+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
95+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
96+
// Try to rollback to a non-existent snapshot
97+
int64_t invalid_snapshot_id = 9999999999999999;
98+
set_snapshot->RollbackTo(invalid_snapshot_id);
99+
100+
auto result = set_snapshot->Apply();
101+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
102+
EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
103+
}
104+
105+
TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
106+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
107+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
108+
// Rollback to a time between the two snapshots
109+
// This should select the oldest snapshot
110+
int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
111+
set_snapshot->RollbackToTime(time_between);
112+
113+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
114+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
115+
}
116+
117+
TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
118+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
119+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
120+
// Try to rollback to a time before any snapshot
121+
int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
122+
set_snapshot->RollbackToTime(time_before_all);
123+
124+
// Should fail - no snapshot older than the specified time
125+
auto result = set_snapshot->Apply();
126+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
127+
EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than"));
128+
}
129+
130+
TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
131+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
132+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
133+
// Rollback to a timestamp just after the oldest snapshot
134+
int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
135+
set_snapshot->RollbackToTime(time_just_after_oldest);
136+
137+
// Apply and verify - should return the oldest snapshot
138+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
139+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
140+
}
141+
142+
TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
143+
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
144+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
145+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
146+
EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
147+
148+
EXPECT_THAT(set_snapshot->Commit(), IsOk());
149+
EXPECT_THAT(transaction->Commit(), IsOk());
150+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
151+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
152+
EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
153+
}
154+
155+
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <optional>
2424

2525
#include "iceberg/catalog.h"
26+
#include "iceberg/schema.h"
27+
#include "iceberg/snapshot.h"
2628
#include "iceberg/table.h"
2729
#include "iceberg/table_metadata.h"
2830
#include "iceberg/table_properties.h"
@@ -32,6 +34,7 @@
3234
#include "iceberg/update/expire_snapshots.h"
3335
#include "iceberg/update/fast_append.h"
3436
#include "iceberg/update/pending_update.h"
37+
#include "iceberg/update/set_snapshot.h"
3538
#include "iceberg/update/snapshot_update.h"
3639
#include "iceberg/update/update_location.h"
3740
#include "iceberg/update/update_partition_spec.h"
@@ -96,23 +99,35 @@ Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
9699

97100
Status Transaction::Apply(PendingUpdate& update) {
98101
switch (update.kind()) {
99-
case PendingUpdate::Kind::kUpdateProperties: {
100-
auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
101-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
102-
if (!result.updates.empty()) {
103-
metadata_builder_->SetProperties(std::move(result.updates));
102+
case PendingUpdate::Kind::kExpireSnapshots: {
103+
auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
104+
ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
105+
if (!result.snapshot_ids_to_remove.empty()) {
106+
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
104107
}
105-
if (!result.removals.empty()) {
106-
metadata_builder_->RemoveProperties(std::move(result.removals));
108+
if (!result.refs_to_remove.empty()) {
109+
for (const auto& ref_name : result.refs_to_remove) {
110+
metadata_builder_->RemoveRef(ref_name);
111+
}
107112
}
108-
if (result.format_version.has_value()) {
109-
metadata_builder_->UpgradeFormatVersion(result.format_version.value());
113+
if (!result.partition_spec_ids_to_remove.empty()) {
114+
metadata_builder_->RemovePartitionSpecs(
115+
std::move(result.partition_spec_ids_to_remove));
116+
}
117+
if (!result.schema_ids_to_remove.empty()) {
118+
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
110119
}
111120
} break;
112-
case PendingUpdate::Kind::kUpdateSortOrder: {
113-
auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
114-
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
115-
metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
121+
case PendingUpdate::Kind::kSetSnapshot: {
122+
auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
123+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
124+
metadata_builder_->SetBranchSnapshot(snapshot_id,
125+
std::string(SnapshotRef::kMainBranch));
126+
} break;
127+
case PendingUpdate::Kind::kUpdateLocation: {
128+
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
129+
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
130+
metadata_builder_->SetLocation(location);
116131
} break;
117132
case PendingUpdate::Kind::kUpdatePartitionSpec: {
118133
auto& update_partition_spec = internal::checked_cast<UpdatePartitionSpec&>(update);
@@ -123,12 +138,30 @@ Status Transaction::Apply(PendingUpdate& update) {
123138
metadata_builder_->AddPartitionSpec(std::move(result.spec));
124139
}
125140
} break;
141+
case PendingUpdate::Kind::kUpdateProperties: {
142+
auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
143+
ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
144+
if (!result.updates.empty()) {
145+
metadata_builder_->SetProperties(std::move(result.updates));
146+
}
147+
if (!result.removals.empty()) {
148+
metadata_builder_->RemoveProperties(std::move(result.removals));
149+
}
150+
if (result.format_version.has_value()) {
151+
metadata_builder_->UpgradeFormatVersion(result.format_version.value());
152+
}
153+
} break;
126154
case PendingUpdate::Kind::kUpdateSchema: {
127155
auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
128156
ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
129157
metadata_builder_->SetCurrentSchema(std::move(result.schema),
130158
result.new_last_column_id);
131159
} break;
160+
case PendingUpdate::Kind::kUpdateSortOrder: {
161+
auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
162+
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
163+
metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
164+
} break;
132165
case PendingUpdate::Kind::kUpdateSnapshot: {
133166
const auto& base = metadata_builder_->current();
134167

@@ -165,30 +198,6 @@ Status Transaction::Apply(PendingUpdate& update) {
165198
metadata_builder_->AssignUUID();
166199
}
167200
} break;
168-
case PendingUpdate::Kind::kExpireSnapshots: {
169-
auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
170-
ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
171-
if (!result.snapshot_ids_to_remove.empty()) {
172-
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
173-
}
174-
if (!result.refs_to_remove.empty()) {
175-
for (const auto& ref_name : result.refs_to_remove) {
176-
metadata_builder_->RemoveRef(ref_name);
177-
}
178-
}
179-
if (!result.partition_spec_ids_to_remove.empty()) {
180-
metadata_builder_->RemovePartitionSpecs(
181-
std::move(result.partition_spec_ids_to_remove));
182-
}
183-
if (!result.schema_ids_to_remove.empty()) {
184-
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
185-
}
186-
} break;
187-
case PendingUpdate::Kind::kUpdateLocation: {
188-
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
189-
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
190-
metadata_builder_->SetLocation(location);
191-
} break;
192201
default:
193202
return NotSupported("Unsupported pending update: {}",
194203
static_cast<int32_t>(update.kind()));
@@ -293,6 +302,13 @@ Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
293302
return update_location;
294303
}
295304

305+
Result<std::shared_ptr<SetSnapshot>> Transaction::NewSetSnapshot() {
306+
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SetSnapshot> set_snapshot,
307+
SetSnapshot::Make(shared_from_this()));
308+
ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot));
309+
return set_snapshot;
310+
}
311+
296312
Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
297313
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<FastAppend> fast_append,
298314
FastAppend::Make(table_->name().name, shared_from_this()));

src/iceberg/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
8686
/// changes.
8787
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
8888

89+
/// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
90+
/// previous snapshot and commit the changes.
91+
Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
92+
8993
/// \brief Create a new FastAppend to append data files and commit the changes.
9094
Result<std::shared_ptr<FastAppend>> NewFastAppend();
9195

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ class Transaction;
192192
class ExpireSnapshots;
193193
class FastAppend;
194194
class PendingUpdate;
195+
class SetSnapshot;
195196
class SnapshotUpdate;
196197
class UpdateLocation;
197198
class UpdatePartitionSpec;

src/iceberg/update/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ install_headers(
2020
'expire_snapshots.h',
2121
'fast_append.h',
2222
'pending_update.h',
23+
'set_snapshot.h',
2324
'snapshot_update.h',
2425
'update_location.h',
2526
'update_partition_spec.h',

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4343
public:
4444
enum class Kind : uint8_t {
4545
kExpireSnapshots,
46+
kSetSnapshot,
4647
kUpdateLocation,
4748
kUpdatePartitionSpec,
4849
kUpdateProperties,

0 commit comments

Comments
 (0)