Skip to content

Commit a9eaabc

Browse files
committed
ver1
1 parent f869003 commit a9eaabc

File tree

10 files changed

+539
-5
lines changed

10 files changed

+539
-5
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ set(ICEBERG_SOURCES
8383
type.cc
8484
update/pending_update.cc
8585
update/snapshot_update.cc
86+
update/set_snapshot.cc
8687
update/update_partition_spec.cc
8788
update/update_properties.cc
8889
update/update_schema.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ iceberg_sources = files(
103103
'transform_function.cc',
104104
'type.cc',
105105
'update/pending_update.cc',
106+
'update/set_snapshot.cc',
106107
'update/snapshot_update.cc',
107108
'update/update_partition_spec.cc',
108109
'update/update_properties.cc',

src/iceberg/table_metadata.cc

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -665,8 +665,23 @@ class TableMetadataBuilder::Impl {
665665
Result<std::vector<SnapshotLogEntry>> UpdateSnapshotLog(
666666
int64_t current_snapshot_id) const;
667667

668+
/// \brief Internal method to set a branch snapshot
669+
/// \param snapshot The snapshot to set
670+
/// \param branch The branch name
668671
Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch);
669672

673+
/// \brief Helper to create a SnapshotRef from an existing ref with a new snapshot ID
674+
/// \param ref The existing SnapshotRef
675+
/// \param snapshot_id The new snapshot ID
676+
/// \return A new SnapshotRef with the same properties but different snapshot ID
677+
static std::shared_ptr<SnapshotRef> BuildRefFrom(const SnapshotRef& ref,
678+
int64_t snapshot_id);
679+
680+
/// \brief Helper to create a new branch SnapshotRef
681+
/// \param snapshot_id The snapshot ID for the branch
682+
/// \return A new branch SnapshotRef
683+
static std::shared_ptr<SnapshotRef> BuildBranchRef(int64_t snapshot_id);
684+
670685
private:
671686
// Base metadata (nullptr for new tables)
672687
const TableMetadata* base_;
@@ -1087,11 +1102,8 @@ Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id,
10871102
// change is a noop
10881103
return {};
10891104
}
1090-
1091-
auto snapshot_it = snapshots_by_id_.find(snapshot_id);
1092-
ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(),
1093-
"Cannot set {} to unknown snapshot: {}", branch, snapshot_id);
1094-
return SetBranchSnapshotInternal(*snapshot_it->second, branch);
1105+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata_.SnapshotById(snapshot_id));
1106+
return SetBranchSnapshotInternal(*snapshot, branch);
10951107
}
10961108

10971109
Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot,
@@ -1334,6 +1346,36 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
13341346
return new_schema_id;
13351347
}
13361348

1349+
std::shared_ptr<SnapshotRef> TableMetadataBuilder::Impl::BuildRefFrom(
1350+
const SnapshotRef& ref, int64_t snapshot_id) {
1351+
auto new_ref = std::make_shared<SnapshotRef>();
1352+
new_ref->snapshot_id = snapshot_id;
1353+
1354+
if (std::holds_alternative<SnapshotRef::Branch>(ref.retention)) {
1355+
const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
1356+
new_ref->retention = SnapshotRef::Branch{
1357+
.min_snapshots_to_keep = branch.min_snapshots_to_keep,
1358+
.max_snapshot_age_ms = branch.max_snapshot_age_ms,
1359+
.max_ref_age_ms = branch.max_ref_age_ms,
1360+
};
1361+
} else {
1362+
const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
1363+
new_ref->retention = SnapshotRef::Tag{
1364+
.max_ref_age_ms = tag.max_ref_age_ms,
1365+
};
1366+
}
1367+
1368+
return new_ref;
1369+
}
1370+
1371+
std::shared_ptr<SnapshotRef> TableMetadataBuilder::Impl::BuildBranchRef(
1372+
int64_t snapshot_id) {
1373+
auto new_ref = std::make_shared<SnapshotRef>();
1374+
new_ref->snapshot_id = snapshot_id;
1375+
new_ref->retention = SnapshotRef::Branch{};
1376+
return new_ref;
1377+
}
1378+
13371379
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
13381380
: impl_(std::make_unique<Impl>(format_version)) {}
13391381

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ if(ICEBERG_BUILD_BUNDLE)
169169
add_iceberg_test(table_update_test
170170
USE_BUNDLE
171171
SOURCES
172+
set_snapshot_test.cc
172173
transaction_test.cc
173174
update_partition_spec_test.cc
174175
update_properties_test.cc
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/set_snapshot.h"
21+
22+
#include <memory>
23+
24+
#include <gmock/gmock.h>
25+
#include <gtest/gtest.h>
26+
27+
#include "iceberg/result.h"
28+
#include "iceberg/snapshot.h"
29+
#include "iceberg/test/matchers.h"
30+
#include "iceberg/test/update_test_base.h"
31+
#include "iceberg/transaction.h"
32+
33+
namespace iceberg {
34+
35+
// Test fixture for SetSnapshot tests
36+
class SetSnapshotTest : public UpdateTestBase {
37+
protected:
38+
// Snapshot IDs from TableMetadataV2Valid.json
39+
static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
40+
static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
41+
42+
// Timestamps from TableMetadataV2Valid.json
43+
static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
44+
static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
45+
};
46+
47+
TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
48+
// Create transaction and SetSnapshot
49+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
50+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
51+
52+
// Set current snapshot to the older snapshot
53+
set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
54+
55+
// Apply and verify
56+
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
57+
EXPECT_NE(result, nullptr);
58+
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
59+
}
60+
61+
TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
62+
// Create transaction and SetSnapshot
63+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
64+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
65+
66+
// Set current snapshot to the current snapshot (no-op)
67+
set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);
68+
69+
// Apply and verify
70+
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
71+
EXPECT_NE(result, nullptr);
72+
EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId);
73+
}
74+
75+
TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
76+
// Create transaction and SetSnapshot
77+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
78+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
79+
80+
// Try to set to a non-existent snapshot
81+
int64_t invalid_snapshot_id = 9999999999999999;
82+
set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);
83+
84+
// Should fail during Apply
85+
auto result = set_snapshot->Apply();
86+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
87+
EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
88+
}
89+
90+
TEST_F(SetSnapshotTest, RollbackToValid) {
91+
// Create transaction and SetSnapshot
92+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
93+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
94+
95+
// Rollback to the oldest snapshot (which is an ancestor)
96+
set_snapshot->RollbackTo(kOldestSnapshotId);
97+
98+
// Apply and verify
99+
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
100+
EXPECT_NE(result, nullptr);
101+
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
102+
}
103+
104+
TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
105+
// Create transaction and SetSnapshot
106+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
107+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
108+
109+
// Try to rollback to a non-existent snapshot
110+
int64_t invalid_snapshot_id = 9999999999999999;
111+
set_snapshot->RollbackTo(invalid_snapshot_id);
112+
113+
// Should fail during Apply
114+
auto result = set_snapshot->Apply();
115+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
116+
EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
117+
}
118+
119+
TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
120+
// Create transaction and SetSnapshot
121+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
122+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
123+
124+
// Rollback to a time between the two snapshots
125+
// This should select the oldest snapshot
126+
int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
127+
set_snapshot->RollbackToTime(time_between);
128+
129+
// Apply and verify
130+
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
131+
EXPECT_NE(result, nullptr);
132+
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
133+
}
134+
135+
TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
136+
// Create transaction and SetSnapshot
137+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
138+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
139+
140+
// Try to rollback to a time before any snapshot
141+
int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
142+
set_snapshot->RollbackToTime(time_before_all);
143+
144+
// Should fail - no snapshot older than the specified time
145+
auto result = set_snapshot->Apply();
146+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
147+
EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than"));
148+
}
149+
150+
TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
151+
// Create transaction and SetSnapshot
152+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
153+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
154+
155+
// Rollback to a timestamp just after the oldest snapshot
156+
// This should return the oldest snapshot (the latest one before this time)
157+
int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
158+
set_snapshot->RollbackToTime(time_just_after_oldest);
159+
160+
// Apply and verify - should return the oldest snapshot
161+
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
162+
EXPECT_NE(result, nullptr);
163+
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
164+
}
165+
166+
TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
167+
// Create transaction and SetSnapshot
168+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
169+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
170+
171+
// Apply without making any changes (NOOP)
172+
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
173+
174+
// Should return current snapshot
175+
EXPECT_NE(result, nullptr);
176+
EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId);
177+
}
178+
179+
TEST_F(SetSnapshotTest, MethodChaining) {
180+
// Create transaction and SetSnapshot
181+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
182+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
183+
184+
// Test that methods return reference for chaining
185+
// Note: Only the last operation should take effect
186+
auto& result1 = set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
187+
EXPECT_EQ(&result1, set_snapshot.get());
188+
}
189+
190+
TEST_F(SetSnapshotTest, CommitSuccess) {
191+
// Create transaction and SetSnapshot
192+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
193+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
194+
195+
// Set to oldest snapshot
196+
set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
197+
198+
// Commit the change
199+
EXPECT_THAT(set_snapshot->Commit(), IsOk());
200+
201+
// Commit the transaction
202+
ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());
203+
204+
// Verify the current snapshot was changed
205+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
206+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
207+
EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
208+
}
209+
210+
TEST_F(SetSnapshotTest, CommitEmptyUpdate) {
211+
// Create transaction and SetSnapshot
212+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
213+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
214+
215+
// Commit without making any changes (NOOP)
216+
EXPECT_THAT(set_snapshot->Commit(), IsOk());
217+
218+
// Commit the transaction
219+
ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());
220+
221+
// Verify the current snapshot remained the same
222+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
223+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
224+
EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
225+
}
226+
227+
TEST_F(SetSnapshotTest, KindReturnsSetSnapshot) {
228+
// Create transaction and SetSnapshot
229+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
230+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
231+
232+
// Verify the kind is correct
233+
EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
234+
}
235+
236+
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "iceberg/catalog.h"
2626
#include "iceberg/schema.h"
27+
#include "iceberg/snapshot.h"
2728
#include "iceberg/table.h"
2829
#include "iceberg/table_metadata.h"
2930
#include "iceberg/table_properties.h"
@@ -32,6 +33,7 @@
3233
#include "iceberg/table_update.h"
3334
#include "iceberg/update/pending_update.h"
3435
#include "iceberg/update/snapshot_update.h"
36+
#include "iceberg/update/set_snapshot.h"
3537
#include "iceberg/update/update_partition_spec.h"
3638
#include "iceberg/update/update_properties.h"
3739
#include "iceberg/update/update_schema.h"
@@ -163,6 +165,12 @@ Status Transaction::Apply(PendingUpdate& update) {
163165
metadata_builder_->AssignUUID();
164166
}
165167
} break;
168+
case PendingUpdate::Kind::kSetSnapshot: {
169+
auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
170+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, set_snapshot.Apply());
171+
metadata_builder_->SetBranchSnapshot(snapshot->snapshot_id,
172+
std::string(SnapshotRef::kMainBranch));
173+
} break;
166174
default:
167175
return NotSupported("Unsupported pending update: {}",
168176
static_cast<int32_t>(update.kind()));

src/iceberg/update/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
install_headers(
1919
[
2020
'pending_update.h',
21+
'set_snapshot.h',
2122
'snapshot_update.h',
2223
'update_partition_spec.h',
2324
'update_schema.h',

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4747
kUpdateSchema,
4848
kUpdateSnapshot,
4949
kUpdateSortOrder,
50+
kSetSnapshot,
5051
};
5152

5253
/// \brief Return the kind of this pending update.

0 commit comments

Comments
 (0)