Skip to content

Commit 0bce749

Browse files
committed
fix review2
1 parent 107cd63 commit 0bce749

File tree

11 files changed

+128
-157
lines changed

11 files changed

+128
-157
lines changed

src/iceberg/table.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,13 @@ Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
192192
return transaction->NewExpireSnapshots();
193193
}
194194

195+
Result<std::shared_ptr<SetSnapshot>> Table::NewSetSnapshot() {
196+
ICEBERG_ASSIGN_OR_RAISE(
197+
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
198+
/*auto_commit=*/true));
199+
return transaction->NewSetSnapshot();
200+
}
201+
195202
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
196203
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
197204
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
151151
/// changes.
152152
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
153153

154+
/// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
155+
/// previous snapshot and commit the changes.
156+
virtual Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
157+
154158
protected:
155159
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
156160
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table_metadata.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -668,9 +668,6 @@ class TableMetadataBuilder::Impl {
668668
Result<std::vector<SnapshotLogEntry>> UpdateSnapshotLog(
669669
int64_t current_snapshot_id) const;
670670

671-
/// \brief Internal method to set a branch snapshot
672-
/// \param snapshot The snapshot to set
673-
/// \param branch The branch name
674671
Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch);
675672

676673
private:

src/iceberg/test/set_snapshot_test.cc

Lines changed: 30 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -45,67 +45,53 @@ class SetSnapshotTest : public UpdateTestBase {
4545
};
4646

4747
TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
48-
// Create transaction and SetSnapshot
49-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
50-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
48+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
49+
EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
5150

52-
// Set current snapshot to the older snapshot
5351
set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
5452

55-
// Apply and verify
56-
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
57-
EXPECT_NE(result, nullptr);
58-
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
53+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
54+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
55+
56+
// Commit and verify the change was persisted
57+
EXPECT_THAT(set_snapshot->Commit(), IsOk());
58+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
59+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
60+
EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
5961
}
6062

6163
TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
62-
// Create transaction and SetSnapshot
63-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
64-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
65-
66-
// Set current snapshot to the current snapshot (no-op)
64+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
6765
set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);
6866

69-
// Apply and verify
70-
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
71-
EXPECT_NE(result, nullptr);
72-
EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId);
67+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
68+
EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
7369
}
7470

7571
TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
76-
// Create transaction and SetSnapshot
77-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
78-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
79-
72+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
8073
// Try to set to a non-existent snapshot
8174
int64_t invalid_snapshot_id = 9999999999999999;
8275
set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);
8376

8477
// Should fail during Apply
8578
auto result = set_snapshot->Apply();
8679
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
87-
EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
80+
EXPECT_THAT(result, HasErrorMessage("is not found"));
8881
}
8982

9083
TEST_F(SetSnapshotTest, RollbackToValid) {
91-
// Create transaction and SetSnapshot
92-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
93-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
94-
84+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
9585
// Rollback to the oldest snapshot (which is an ancestor)
9686
set_snapshot->RollbackTo(kOldestSnapshotId);
9787

9888
// Apply and verify
99-
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
100-
EXPECT_NE(result, nullptr);
101-
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
89+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
90+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
10291
}
10392

10493
TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
105-
// Create transaction and SetSnapshot
106-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
107-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
108-
94+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
10995
// Try to rollback to a non-existent snapshot
11096
int64_t invalid_snapshot_id = 9999999999999999;
11197
set_snapshot->RollbackTo(invalid_snapshot_id);
@@ -117,26 +103,19 @@ TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
117103
}
118104

119105
TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
120-
// Create transaction and SetSnapshot
121-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
122-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
123-
106+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
124107
// Rollback to a time between the two snapshots
125108
// This should select the oldest snapshot
126109
int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
127110
set_snapshot->RollbackToTime(time_between);
128111

129112
// Apply and verify
130-
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
131-
EXPECT_NE(result, nullptr);
132-
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
113+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
114+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
133115
}
134116

135117
TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
136-
// Create transaction and SetSnapshot
137-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
138-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
139-
118+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
140119
// Try to rollback to a time before any snapshot
141120
int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
142121
set_snapshot->RollbackToTime(time_before_all);
@@ -148,89 +127,30 @@ TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
148127
}
149128

150129
TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
151-
// Create transaction and SetSnapshot
152-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
153-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
154-
130+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
155131
// Rollback to a timestamp just after the oldest snapshot
156132
// This should return the oldest snapshot (the latest one before this time)
157133
int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
158134
set_snapshot->RollbackToTime(time_just_after_oldest);
159135

160136
// Apply and verify - should return the oldest snapshot
161-
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
162-
EXPECT_NE(result, nullptr);
163-
EXPECT_EQ(result->snapshot_id, kOldestSnapshotId);
137+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
138+
EXPECT_EQ(snapshot_id, kOldestSnapshotId);
164139
}
165140

166141
TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
167-
// Create transaction and SetSnapshot
168-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
169-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
170-
142+
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot());
171143
// Apply without making any changes (NOOP)
172-
ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply());
144+
ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
173145

174146
// Should return current snapshot
175-
EXPECT_NE(result, nullptr);
176-
EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId);
177-
}
147+
EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
178148

179-
TEST_F(SetSnapshotTest, MethodChaining) {
180-
// Create transaction and SetSnapshot
181-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
182-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
183-
184-
// Test that methods return reference for chaining
185-
// Note: Only the last operation should take effect
186-
auto& result1 = set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
187-
EXPECT_EQ(&result1, set_snapshot.get());
188-
}
189-
190-
TEST_F(SetSnapshotTest, CommitSuccess) {
191-
// Create transaction and SetSnapshot
192-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
193-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
194-
195-
// Set to oldest snapshot
196-
set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
197-
198-
// Commit the change
149+
// Commit NOOP and verify nothing changed
199150
EXPECT_THAT(set_snapshot->Commit(), IsOk());
200-
201-
// Commit the transaction
202-
ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());
203-
204-
// Verify the current snapshot was changed
205-
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
206-
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
207-
EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
208-
}
209-
210-
TEST_F(SetSnapshotTest, CommitEmptyUpdate) {
211-
// Create transaction and SetSnapshot
212-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
213-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
214-
215-
// Commit without making any changes (NOOP)
216-
EXPECT_THAT(set_snapshot->Commit(), IsOk());
217-
218-
// Commit the transaction
219-
ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit());
220-
221-
// Verify the current snapshot remained the same
222151
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
223152
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
224153
EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
225154
}
226155

227-
TEST_F(SetSnapshotTest, KindReturnsSetSnapshot) {
228-
// Create transaction and SetSnapshot
229-
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
230-
ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn));
231-
232-
// Verify the kind is correct
233-
EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
234-
}
235-
236156
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ Status Transaction::Apply(PendingUpdate& update) {
187187
} break;
188188
case PendingUpdate::Kind::kSetSnapshot: {
189189
auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
190-
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, set_snapshot.Apply());
191-
metadata_builder_->SetBranchSnapshot(snapshot->snapshot_id,
190+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
191+
metadata_builder_->SetBranchSnapshot(snapshot_id,
192192
std::string(SnapshotRef::kMainBranch));
193193
} break;
194194
default:
@@ -288,4 +288,11 @@ Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
288288
return expire_snapshots;
289289
}
290290

291+
Result<std::shared_ptr<SetSnapshot>> Transaction::NewSetSnapshot() {
292+
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SetSnapshot> set_snapshot,
293+
SetSnapshot::Make(shared_from_this()));
294+
ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot));
295+
return set_snapshot;
296+
}
297+
291298
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "iceberg/iceberg_export.h"
2727
#include "iceberg/result.h"
2828
#include "iceberg/type_fwd.h"
29+
#include "iceberg/update/set_snapshot.h"
2930

3031
namespace iceberg {
3132

@@ -82,6 +83,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
8283
/// changes.
8384
Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
8485

86+
/// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
87+
/// previous snapshot and commit the changes.
88+
Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
89+
8590
private:
8691
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
8792
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ class UpdateProperties;
194194
class UpdateSchema;
195195
class UpdateSortOrder;
196196
class ExpireSnapshots;
197+
class SetSnapshot;
197198

198199
/// ----------------------------------------------------------------------------
199200
/// TODO: Forward declarations below are not added yet.

src/iceberg/update/set_snapshot.cc

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,10 @@ SetSnapshot::~SetSnapshot() = default;
4747

4848
SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) {
4949
// Validate that the snapshot exists
50-
auto snapshot_result = base().SnapshotById(snapshot_id);
51-
ICEBERG_BUILDER_CHECK(snapshot_result.has_value(),
50+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot, base().SnapshotById(snapshot_id));
51+
ICEBERG_BUILDER_CHECK(snapshot != nullptr,
5252
"Cannot roll back to unknown snapshot id: {}", snapshot_id);
53-
5453
target_snapshot_id_ = snapshot_id;
55-
5654
return *this;
5755
}
5856

@@ -78,8 +76,8 @@ SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) {
7876
"Cannot roll back to unknown snapshot id: {}", snapshot_id);
7977

8078
// Validate that the snapshot is an ancestor of the current state
81-
ICEBERG_BUILDER_ASSIGN_OR_RETURN(
82-
bool is_ancestor, SnapshotUtil::IsAncestorOf(*transaction_->table(), snapshot_id));
79+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(bool is_ancestor,
80+
SnapshotUtil::IsAncestorOf(base(), snapshot_id));
8381
ICEBERG_BUILDER_CHECK(
8482
is_ancestor,
8583
"Cannot roll back to snapshot, not an ancestor of the current state: {}",
@@ -88,47 +86,52 @@ SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) {
8886
return SetCurrentSnapshot(snapshot_id);
8987
}
9088

91-
Result<std::shared_ptr<Snapshot>> SetSnapshot::Apply() {
89+
Result<int64_t> SetSnapshot::Apply() {
9290
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
9391

9492
const TableMetadata& base_metadata = transaction_->current();
9593

9694
// If no target snapshot was configured, return current state (NOOP)
9795
if (!target_snapshot_id_.has_value()) {
98-
return base_metadata.Snapshot();
96+
ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, base_metadata.Snapshot());
97+
return current_snapshot->snapshot_id;
9998
}
10099

100+
// Validate that the snapshot exists
101+
auto snapshot_result = base_metadata.SnapshotById(target_snapshot_id_.value());
102+
ICEBERG_CHECK(snapshot_result.has_value(),
103+
"Cannot roll back to unknown snapshot id: {}",
104+
target_snapshot_id_.value());
105+
101106
// If this is a rollback, validate that the target is still an ancestor
102107
if (is_rollback_) {
103108
ICEBERG_ASSIGN_OR_RAISE(
104109
bool is_ancestor,
105-
SnapshotUtil::IsAncestorOf(*transaction_->table(), target_snapshot_id_.value()));
110+
SnapshotUtil::IsAncestorOf(base_metadata, target_snapshot_id_.value()));
106111
ICEBERG_CHECK(is_ancestor,
107112
"Cannot roll back to {}: not an ancestor of the current table state",
108113
target_snapshot_id_.value());
109114
}
110115

111-
return base_metadata.SnapshotById(target_snapshot_id_.value());
116+
return target_snapshot_id_.value();
112117
}
113118

114119
Result<std::optional<std::shared_ptr<Snapshot>>> SetSnapshot::FindLatestAncestorOlderThan(
115120
int64_t timestamp_ms) const {
116-
ICEBERG_ASSIGN_OR_RAISE(auto ancestors,
117-
SnapshotUtil::CurrentAncestors(*transaction_->table()));
121+
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, SnapshotUtil::CurrentAncestors(base()));
118122

119-
int64_t snapshot_timestamp = 0;
123+
TimePointMs target_timestamp = TimePointMsFromUnixMs(timestamp_ms);
124+
TimePointMs latest_timestamp = TimePointMsFromUnixMs(0);
120125
std::shared_ptr<Snapshot> result = nullptr;
121126

122-
for (const auto& snapshot : ancestors) {
127+
for (auto& snapshot : ancestors) {
123128
if (snapshot == nullptr) {
124129
continue;
125130
}
126-
127-
int64_t snap_timestamp_ms = UnixMsFromTimePointMs(snapshot->timestamp_ms);
128-
129-
if (snap_timestamp_ms < timestamp_ms && snap_timestamp_ms > snapshot_timestamp) {
130-
result = snapshot;
131-
snapshot_timestamp = snap_timestamp_ms;
131+
auto current_timestamp = snapshot->timestamp_ms;
132+
if (current_timestamp < target_timestamp && current_timestamp > latest_timestamp) {
133+
latest_timestamp = current_timestamp; // Save timestamp before move
134+
result = std::move(snapshot);
132135
}
133136
}
134137

0 commit comments

Comments
 (0)