Skip to content

Commit 0f36ff0

Browse files
committed
fix: review comments
1 parent 1b633cc commit 0f36ff0

3 files changed

Lines changed: 65 additions & 42 deletions

File tree

src/iceberg/transaction.cc

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ Status Transaction::Apply(PendingUpdate& update) {
160160
metadata_builder_->SetCurrentSchema(std::move(result.schema),
161161
result.new_last_column_id);
162162
} break;
163-
164163
case PendingUpdate::Kind::kUpdateSnapshot: {
165164
const auto& base = metadata_builder_->current();
166165

@@ -199,20 +198,12 @@ Status Transaction::Apply(PendingUpdate& update) {
199198
} break;
200199
case PendingUpdate::Kind::kUpdateSnapshotReference: {
201200
auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
202-
ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply());
203-
const auto& current_refs = current().refs;
204-
// Identify references which have been removed
205-
for (const auto& [name, ref] : current_refs) {
206-
if (updated_refs.find(name) == updated_refs.end()) {
207-
metadata_builder_->RemoveRef(name);
208-
}
201+
ICEBERG_ASSIGN_OR_RAISE(auto result, update_ref.Apply());
202+
for (const auto& name : result.to_remove) {
203+
metadata_builder_->RemoveRef(name);
209204
}
210-
// Identify references which have been created or updated
211-
for (const auto& [name, ref] : updated_refs) {
212-
auto current_it = current_refs.find(name);
213-
if (current_it == current_refs.end() || *current_it->second != *ref) {
214-
metadata_builder_->SetRef(name, ref);
215-
}
205+
for (auto&& [name, ref] : result.to_set) {
206+
metadata_builder_->SetRef(std::move(name), std::move(ref));
216207
}
217208
} break;
218209
case PendingUpdate::Kind::kUpdateSortOrder: {
@@ -230,7 +221,6 @@ Status Transaction::Apply(PendingUpdate& update) {
230221
metadata_builder_->RemoveStatistics(snapshot_id);
231222
}
232223
} break;
233-
234224
default:
235225
return NotSupported("Unsupported pending update: {}",
236226
static_cast<int32_t>(update.kind()));

src/iceberg/update/update_snapshot_reference.cc

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@
2424
#include <string>
2525
#include <unordered_map>
2626

27-
#include "iceberg/result.h"
2827
#include "iceberg/snapshot.h"
2928
#include "iceberg/table_metadata.h"
3029
#include "iceberg/transaction.h"
31-
#include "iceberg/util/error_collector.h"
3230
#include "iceberg/util/macros.h"
3331
#include "iceberg/util/snapshot_util_internal.h"
3432

@@ -43,12 +41,8 @@ Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
4341
}
4442

4543
UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction> transaction)
46-
: PendingUpdate(std::move(transaction)) {
47-
// Initialize updated_refs_ with current refs from base metadata
48-
for (const auto& [name, ref] : base().refs) {
49-
updated_refs_[name] = ref;
50-
}
51-
}
44+
: PendingUpdate(std::move(transaction)),
45+
updated_refs_(base().refs.begin(), base().refs.end()) {}
5246

5347
UpdateSnapshotReference::~UpdateSnapshotReference() = default;
5448

@@ -113,7 +107,9 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::strin
113107
ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
114108
ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
115109
"Ref '{}' is a tag not a branch", name);
116-
it->second->snapshot_id = snapshot_id;
110+
// Clone the ref before modifying to avoid affecting base metadata
111+
auto cloned = it->second->Clone(snapshot_id);
112+
it->second = std::shared_ptr<SnapshotRef>(cloned.release());
117113
return *this;
118114
}
119115

@@ -161,7 +157,9 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
161157
"Cannot fast-forward: {} is not an ancestor of {}", from, to);
162158
}
163159

164-
from_it->second->snapshot_id = to_it->second->snapshot_id;
160+
// Clone the ref before modifying to avoid affecting base metadata
161+
auto cloned = from_it->second->Clone(to_it->second->snapshot_id);
162+
from_it->second = std::shared_ptr<SnapshotRef>(cloned.release());
165163
return *this;
166164
}
167165

@@ -172,7 +170,9 @@ UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string&
172170
ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name);
173171
ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag,
174172
"Ref '{}' is a branch not a tag", name);
175-
it->second->snapshot_id = snapshot_id;
173+
// Clone the ref before modifying to avoid affecting base metadata
174+
auto cloned = it->second->Clone(snapshot_id);
175+
it->second = std::shared_ptr<SnapshotRef>(cloned.release());
176176
return *this;
177177
}
178178

@@ -183,11 +183,14 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep(
183183
ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
184184
ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
185185
"Ref '{}' is a tag not a branch", name);
186-
std::get<SnapshotRef::Branch>(it->second->retention).min_snapshots_to_keep =
186+
// Clone the ref before modifying to avoid affecting base metadata
187+
auto cloned = it->second->Clone();
188+
std::get<SnapshotRef::Branch>(cloned->retention).min_snapshots_to_keep =
187189
min_snapshots_to_keep;
188-
ICEBERG_BUILDER_CHECK(it->second->Validate(),
190+
ICEBERG_BUILDER_CHECK(cloned->Validate(),
189191
"Invalid min_snapshots_to_keep {} for branch '{}'",
190192
min_snapshots_to_keep, name);
193+
it->second = std::shared_ptr<SnapshotRef>(cloned.release());
191194
return *this;
192195
}
193196

@@ -198,11 +201,14 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs(
198201
ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
199202
ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
200203
"Ref '{}' is a tag not a branch", name);
201-
std::get<SnapshotRef::Branch>(it->second->retention).max_snapshot_age_ms =
204+
// Clone the ref before modifying to avoid affecting base metadata
205+
auto cloned = it->second->Clone();
206+
std::get<SnapshotRef::Branch>(cloned->retention).max_snapshot_age_ms =
202207
max_snapshot_age_ms;
203-
ICEBERG_BUILDER_CHECK(it->second->Validate(),
208+
ICEBERG_BUILDER_CHECK(cloned->Validate(),
204209
"Invalid max_snapshot_age_ms {} for branch '{}'",
205210
max_snapshot_age_ms, name);
211+
it->second = std::shared_ptr<SnapshotRef>(cloned.release());
206212
return *this;
207213
}
208214

@@ -211,20 +217,41 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::stri
211217
ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty");
212218
auto it = updated_refs_.find(name);
213219
ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Ref does not exist: {}", name);
214-
if (it->second->type() == SnapshotRefType::kBranch) {
215-
std::get<SnapshotRef::Branch>(it->second->retention).max_ref_age_ms = max_ref_age_ms;
220+
// Clone the ref before modifying to avoid affecting base metadata
221+
auto cloned = it->second->Clone();
222+
if (cloned->type() == SnapshotRefType::kBranch) {
223+
std::get<SnapshotRef::Branch>(cloned->retention).max_ref_age_ms = max_ref_age_ms;
216224
} else {
217-
std::get<SnapshotRef::Tag>(it->second->retention).max_ref_age_ms = max_ref_age_ms;
225+
std::get<SnapshotRef::Tag>(cloned->retention).max_ref_age_ms = max_ref_age_ms;
218226
}
219-
ICEBERG_BUILDER_CHECK(it->second->Validate(), "Invalid max_ref_age_ms {} for ref '{}'",
227+
ICEBERG_BUILDER_CHECK(cloned->Validate(), "Invalid max_ref_age_ms {} for ref '{}'",
220228
max_ref_age_ms, name);
229+
it->second = std::shared_ptr<SnapshotRef>(cloned.release());
221230
return *this;
222231
}
223232

224-
Result<std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>>
225-
UpdateSnapshotReference::Apply() {
233+
Result<UpdateSnapshotReference::ApplyResult> UpdateSnapshotReference::Apply() {
226234
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
227-
return updated_refs_;
235+
236+
ApplyResult result;
237+
const auto& current_refs = base().refs;
238+
239+
// Identify references which have been removed
240+
for (const auto& [name, ref] : current_refs) {
241+
if (updated_refs_.find(name) == updated_refs_.end()) {
242+
result.to_remove.push_back(name);
243+
}
244+
}
245+
246+
// Identify references which have been created or updated
247+
for (const auto& [name, ref] : updated_refs_) {
248+
auto current_it = current_refs.find(name);
249+
if (current_it == current_refs.end() || *current_it->second != *ref) {
250+
result.to_set.emplace_back(name, ref);
251+
}
252+
}
253+
254+
return result;
228255
}
229256

230257
} // namespace iceberg

src/iceberg/update/update_snapshot_reference.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ namespace iceberg {
3535

3636
/// \brief Updates snapshot references.
3737
///
38-
/// TODO(xxx): Add SetSnapshotOperation operations such as setCurrentSnapshot,
39-
/// rollBackTime, rollbackTo to this class so that we can support those operations for
40-
/// refs.
38+
/// TODO(xxx): Add SetSnapshot operations such as SetCurrentSnapshot, RollBackTime,
39+
/// RollbackTo to this class so that we can support those operations for refs.
4140
class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate {
4241
public:
4342
static Result<std::shared_ptr<UpdateSnapshotReference>> Make(
@@ -136,8 +135,15 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate {
136135

137136
Kind kind() const final { return Kind::kUpdateSnapshotReference; }
138137

139-
/// \brief Apply the pending changes and return the updated references.
140-
Result<std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>> Apply();
138+
/// \brief Outcome of Apply(): the reference changes to make relative to the base
/// metadata, split into references to set and reference names to remove.
struct ApplyResult {
139+
/// References to create or update, as (name, ref) pairs. Apply() includes a ref
/// only when its name is absent from the base refs or it compares unequal to the
/// base ref of the same name.
140+
std::vector<std::pair<std::string, std::shared_ptr<SnapshotRef>>> to_set;
141+
/// Names of references that exist in the base refs but are no longer present in
/// the updated refs.
142+
std::vector<std::string> to_remove;
143+
};
144+
145+
/// \brief Apply the pending changes and return the updated and removed references.
///
/// \return An ApplyResult holding the refs to set and the ref names to remove
/// relative to the base metadata, or an error if CheckErrors() reports one.
146+
Result<ApplyResult> Apply();
141147

142148
private:
143149
explicit UpdateSnapshotReference(std::shared_ptr<Transaction> transaction);

0 commit comments

Comments
 (0)