Skip to content

Commit fb7dead

Browse files
authored
chore: refactor Transaction::Apply to use separate smaller functions (#527)
1 parent e32a3ec commit fb7dead

File tree

3 files changed

+186
-125
lines changed

3 files changed

+186
-125
lines changed

src/iceberg/transaction.cc

Lines changed: 168 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -102,125 +102,46 @@ Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
102102

103103
Status Transaction::Apply(PendingUpdate& update) {
104104
switch (update.kind()) {
105-
case PendingUpdate::Kind::kExpireSnapshots: {
106-
auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
107-
ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
108-
if (!result.snapshot_ids_to_remove.empty()) {
109-
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
110-
}
111-
if (!result.refs_to_remove.empty()) {
112-
for (const auto& ref_name : result.refs_to_remove) {
113-
metadata_builder_->RemoveRef(ref_name);
114-
}
115-
}
116-
if (!result.partition_spec_ids_to_remove.empty()) {
117-
metadata_builder_->RemovePartitionSpecs(
118-
std::move(result.partition_spec_ids_to_remove));
119-
}
120-
if (!result.schema_ids_to_remove.empty()) {
121-
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
122-
}
123-
} break;
124-
case PendingUpdate::Kind::kSetSnapshot: {
125-
auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
126-
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
127-
metadata_builder_->SetBranchSnapshot(snapshot_id,
128-
std::string(SnapshotRef::kMainBranch));
129-
} break;
130-
case PendingUpdate::Kind::kUpdateLocation: {
131-
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
132-
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
133-
metadata_builder_->SetLocation(location);
134-
} break;
135-
case PendingUpdate::Kind::kUpdatePartitionSpec: {
136-
auto& update_partition_spec = internal::checked_cast<UpdatePartitionSpec&>(update);
137-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_partition_spec.Apply());
138-
if (result.set_as_default) {
139-
metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
140-
} else {
141-
metadata_builder_->AddPartitionSpec(std::move(result.spec));
142-
}
143-
} break;
144-
case PendingUpdate::Kind::kUpdateProperties: {
145-
auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
146-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
147-
if (!result.updates.empty()) {
148-
metadata_builder_->SetProperties(std::move(result.updates));
149-
}
150-
if (!result.removals.empty()) {
151-
metadata_builder_->RemoveProperties(std::move(result.removals));
152-
}
153-
if (result.format_version.has_value()) {
154-
metadata_builder_->UpgradeFormatVersion(result.format_version.value());
155-
}
156-
} break;
157-
case PendingUpdate::Kind::kUpdateSchema: {
158-
auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
159-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
160-
metadata_builder_->SetCurrentSchema(std::move(result.schema),
161-
result.new_last_column_id);
162-
} break;
163-
case PendingUpdate::Kind::kUpdateSnapshot: {
164-
const auto& base = metadata_builder_->current();
165-
166-
auto& update_snapshot = internal::checked_cast<SnapshotUpdate&>(update);
167-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply());
168-
169-
// Create a temp builder to check if this is an empty update
170-
auto temp_update = TableMetadataBuilder::BuildFrom(&base);
171-
if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {
172-
// This is a rollback operation
173-
temp_update->SetBranchSnapshot(result.snapshot->snapshot_id,
174-
result.target_branch);
175-
} else if (result.stage_only) {
176-
temp_update->AddSnapshot(result.snapshot);
177-
} else {
178-
temp_update->SetBranchSnapshot(std::move(result.snapshot), result.target_branch);
179-
}
180-
181-
if (temp_update->changes().empty()) {
182-
// Do not commit if the metadata has not changed. for example, this may happen
183-
// when setting the current snapshot to an ID that is already current. note that
184-
// this check uses identity.
185-
return {};
186-
}
187-
188-
for (const auto& change : temp_update->changes()) {
189-
change->ApplyTo(*metadata_builder_);
190-
}
191-
192-
// If the table UUID is missing, add it here. the UUID will be re-created each time
193-
// this operation retries to ensure that if a concurrent operation assigns the UUID,
194-
// this operation will not fail.
195-
if (base.table_uuid.empty()) {
196-
metadata_builder_->AssignUUID();
197-
}
198-
} break;
199-
case PendingUpdate::Kind::kUpdateSnapshotReference: {
200-
auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
201-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_ref.Apply());
202-
for (const auto& name : result.to_remove) {
203-
metadata_builder_->RemoveRef(name);
204-
}
205-
for (auto&& [name, ref] : result.to_set) {
206-
metadata_builder_->SetRef(std::move(name), std::move(ref));
207-
}
208-
} break;
209-
case PendingUpdate::Kind::kUpdateSortOrder: {
210-
auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
211-
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
212-
metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
213-
} break;
214-
case PendingUpdate::Kind::kUpdateStatistics: {
215-
auto& update_statistics = internal::checked_cast<UpdateStatistics&>(update);
216-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply());
217-
for (auto&& [_, stat_file] : result.to_set) {
218-
metadata_builder_->SetStatistics(std::move(stat_file));
219-
}
220-
for (const auto& snapshot_id : result.to_remove) {
221-
metadata_builder_->RemoveStatistics(snapshot_id);
222-
}
223-
} break;
105+
case PendingUpdate::Kind::kExpireSnapshots:
106+
ICEBERG_RETURN_UNEXPECTED(
107+
ApplyExpireSnapshots(internal::checked_cast<ExpireSnapshots&>(update)));
108+
break;
109+
case PendingUpdate::Kind::kSetSnapshot:
110+
ICEBERG_RETURN_UNEXPECTED(
111+
ApplySetSnapshot(internal::checked_cast<SetSnapshot&>(update)));
112+
break;
113+
case PendingUpdate::Kind::kUpdateLocation:
114+
ICEBERG_RETURN_UNEXPECTED(
115+
ApplyUpdateLocation(internal::checked_cast<UpdateLocation&>(update)));
116+
break;
117+
case PendingUpdate::Kind::kUpdatePartitionSpec:
118+
ICEBERG_RETURN_UNEXPECTED(
119+
ApplyUpdatePartitionSpec(internal::checked_cast<UpdatePartitionSpec&>(update)));
120+
break;
121+
case PendingUpdate::Kind::kUpdateProperties:
122+
ICEBERG_RETURN_UNEXPECTED(
123+
ApplyUpdateProperties(internal::checked_cast<UpdateProperties&>(update)));
124+
break;
125+
case PendingUpdate::Kind::kUpdateSchema:
126+
ICEBERG_RETURN_UNEXPECTED(
127+
ApplyUpdateSchema(internal::checked_cast<UpdateSchema&>(update)));
128+
break;
129+
case PendingUpdate::Kind::kUpdateSnapshot:
130+
ICEBERG_RETURN_UNEXPECTED(
131+
ApplyUpdateSnapshot(internal::checked_cast<SnapshotUpdate&>(update)));
132+
break;
133+
case PendingUpdate::Kind::kUpdateSnapshotReference:
134+
ICEBERG_RETURN_UNEXPECTED(ApplyUpdateSnapshotReference(
135+
internal::checked_cast<UpdateSnapshotReference&>(update)));
136+
break;
137+
case PendingUpdate::Kind::kUpdateSortOrder:
138+
ICEBERG_RETURN_UNEXPECTED(
139+
ApplyUpdateSortOrder(internal::checked_cast<UpdateSortOrder&>(update)));
140+
break;
141+
case PendingUpdate::Kind::kUpdateStatistics:
142+
ICEBERG_RETURN_UNEXPECTED(
143+
ApplyUpdateStatistics(internal::checked_cast<UpdateStatistics&>(update)));
144+
break;
224145
default:
225146
return NotSupported("Unsupported pending update: {}",
226147
static_cast<int32_t>(update.kind()));
@@ -235,6 +156,134 @@ Status Transaction::Apply(PendingUpdate& update) {
235156
return {};
236157
}
237158

159+
/// \brief Record the outcome of an ExpireSnapshots update on the metadata builder.
///
/// The update's Apply() result is unwrapped with ICEBERG_ASSIGN_OR_RAISE, then each
/// non-empty removal set (snapshots, refs, partition specs, schemas) is forwarded to
/// the builder in that order.
///
/// \param update the expire-snapshots update to apply
/// \return an empty (default-constructed) Status once all removals are recorded
Status Transaction::ApplyExpireSnapshots(ExpireSnapshots& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto expired, update.Apply());

  auto& builder = *metadata_builder_;

  if (!expired.snapshot_ids_to_remove.empty()) {
    builder.RemoveSnapshots(std::move(expired.snapshot_ids_to_remove));
  }

  // Iterating an empty container does nothing, so refs need no emptiness guard.
  for (const auto& expired_ref : expired.refs_to_remove) {
    builder.RemoveRef(expired_ref);
  }

  if (!expired.partition_spec_ids_to_remove.empty()) {
    builder.RemovePartitionSpecs(std::move(expired.partition_spec_ids_to_remove));
  }

  if (!expired.schema_ids_to_remove.empty()) {
    builder.RemoveSchemas(std::move(expired.schema_ids_to_remove));
  }

  return {};
}
178+
179+
/// \brief Point the main branch at the snapshot id produced by a SetSnapshot update.
///
/// \param update the set-snapshot update to apply
/// \return an empty (default-constructed) Status after the branch has been set
Status Transaction::ApplySetSnapshot(SetSnapshot& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto target_snapshot_id, update.Apply());

  // The branch name is materialised as an owning std::string, as the builder call
  // was given before.
  std::string main_branch(SnapshotRef::kMainBranch);
  metadata_builder_->SetBranchSnapshot(target_snapshot_id, std::move(main_branch));
  return {};
}
185+
186+
/// \brief Store the location produced by an UpdateLocation update on the builder.
///
/// \param update the location update to apply
/// \return an empty (default-constructed) Status after the location has been set
Status Transaction::ApplyUpdateLocation(UpdateLocation& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto new_location, update.Apply());

  auto& builder = *metadata_builder_;
  builder.SetLocation(new_location);
  return {};
}
191+
192+
/// \brief Hand the partition spec produced by an UpdatePartitionSpec to the builder.
///
/// When the result's `set_as_default` flag is set the spec is passed to
/// SetDefaultPartitionSpec; otherwise it is passed to AddPartitionSpec.
///
/// \param update the partition-spec update to apply
/// \return an empty (default-constructed) Status after the spec has been recorded
Status Transaction::ApplyUpdatePartitionSpec(UpdatePartitionSpec& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto applied, update.Apply());

  if (!applied.set_as_default) {
    metadata_builder_->AddPartitionSpec(std::move(applied.spec));
    return {};
  }

  metadata_builder_->SetDefaultPartitionSpec(std::move(applied.spec));
  return {};
}
201+
202+
/// \brief Record property changes from an UpdateProperties update on the builder.
///
/// Forwards, in order: the properties to set (if any), the properties to remove
/// (if any), and a format-version upgrade when the result carries one.
///
/// \param update the properties update to apply
/// \return an empty (default-constructed) Status after the changes are recorded
Status Transaction::ApplyUpdateProperties(UpdateProperties& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto applied, update.Apply());

  auto& builder = *metadata_builder_;

  const bool has_updates = !applied.updates.empty();
  if (has_updates) {
    builder.SetProperties(std::move(applied.updates));
  }

  const bool has_removals = !applied.removals.empty();
  if (has_removals) {
    builder.RemoveProperties(std::move(applied.removals));
  }

  // Only request an upgrade when the update actually carries a version.
  if (applied.format_version.has_value()) {
    builder.UpgradeFormatVersion(applied.format_version.value());
  }

  return {};
}
215+
216+
/// \brief Make the schema produced by an UpdateSchema update the current schema.
///
/// The schema is passed to the builder together with the result's
/// `new_last_column_id`.
///
/// \param update the schema update to apply
/// \return an empty (default-constructed) Status after the schema has been set
Status Transaction::ApplyUpdateSchema(UpdateSchema& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto applied, update.Apply());

  auto& builder = *metadata_builder_;
  builder.SetCurrentSchema(std::move(applied.schema), applied.new_last_column_id);
  return {};
}
222+
223+
/// \brief Apply a SnapshotUpdate to the transaction's metadata builder.
///
/// The changes are first staged on a scratch builder derived from the current
/// metadata. If staging yields no changes, nothing is forwarded; otherwise every
/// staged change is replayed onto the real builder.
///
/// \param update the snapshot update to apply
/// \return an empty (default-constructed) Status, whether or not changes were made
Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) {
  const auto& current_metadata = metadata_builder_->current();
  ICEBERG_ASSIGN_OR_RAISE(auto applied, update.Apply());

  // Stage on a throwaway builder first so an empty update can be detected.
  auto scratch_builder = TableMetadataBuilder::BuildFrom(&current_metadata);

  const auto snapshot_id = applied.snapshot->snapshot_id;
  const bool snapshot_already_known =
      current_metadata.SnapshotById(snapshot_id).has_value();
  if (snapshot_already_known) {
    // The snapshot already exists in the table: this is a rollback.
    scratch_builder->SetBranchSnapshot(snapshot_id, applied.target_branch);
  } else if (applied.stage_only) {
    scratch_builder->AddSnapshot(applied.snapshot);
  } else {
    scratch_builder->SetBranchSnapshot(std::move(applied.snapshot),
                                       applied.target_branch);
  }

  const auto& pending_changes = scratch_builder->changes();
  if (pending_changes.empty()) {
    // Nothing changed, so there is nothing to commit. This can happen, for example,
    // when the current snapshot is set to an ID that is already current. Note that
    // this check uses identity.
    return {};
  }

  for (const auto& change : pending_changes) {
    change->ApplyTo(*metadata_builder_);
  }

  // A missing table UUID is assigned here. It is re-created on every retry of this
  // operation so that a concurrent operation assigning the UUID does not make this
  // one fail.
  if (current_metadata.table_uuid.empty()) {
    metadata_builder_->AssignUUID();
  }

  return {};
}
258+
259+
/// \brief Record ref removals and ref assignments from an UpdateSnapshotReference.
///
/// Removals are forwarded to the builder first, then every (name, ref) pair in the
/// result's `to_set` collection is forwarded.
///
/// \param update the snapshot-reference update to apply
/// \return an empty (default-constructed) Status after all refs are recorded
Status Transaction::ApplyUpdateSnapshotReference(UpdateSnapshotReference& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto applied, update.Apply());

  auto& builder = *metadata_builder_;

  for (const auto& removed_ref_name : applied.to_remove) {
    builder.RemoveRef(removed_ref_name);
  }

  for (auto&& [ref_name, snapshot_ref] : applied.to_set) {
    builder.SetRef(std::move(ref_name), std::move(snapshot_ref));
  }

  return {};
}
269+
270+
/// \brief Install the sort order produced by an UpdateSortOrder update as the default.
///
/// \param update the sort-order update to apply
/// \return an empty (default-constructed) Status after the sort order has been set
Status Transaction::ApplyUpdateSortOrder(UpdateSortOrder& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto new_sort_order, update.Apply());

  auto& builder = *metadata_builder_;
  builder.SetDefaultSortOrder(std::move(new_sort_order));
  return {};
}
275+
276+
/// \brief Record statistics additions and removals from an UpdateStatistics update.
///
/// Every statistics file in the result's `to_set` collection is forwarded to the
/// builder first, then every snapshot id in `to_remove`.
///
/// \param update the statistics update to apply
/// \return an empty (default-constructed) Status after all changes are recorded
Status Transaction::ApplyUpdateStatistics(UpdateStatistics& update) {
  ICEBERG_ASSIGN_OR_RAISE(auto applied, update.Apply());

  auto& builder = *metadata_builder_;

  // Only the mapped value is forwarded; the first binding of each entry is unused.
  for (auto&& [_, statistics_file] : applied.to_set) {
    builder.SetStatistics(std::move(statistics_file));
  }

  for (const auto& stale_snapshot_id : applied.to_remove) {
    builder.RemoveStatistics(stale_snapshot_id);
  }

  return {};
}
286+
238287
Result<std::shared_ptr<Table>> Transaction::Commit() {
239288
if (committed_) {
240289
return Invalid("Transaction already committed");

src/iceberg/transaction.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,18 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
110110
/// \brief Apply the pending changes to current table.
111111
Status Apply(PendingUpdate& updates);
112112

113+
// Per-kind helpers called from Apply(PendingUpdate&): each takes one concrete
// PendingUpdate subtype, runs that update's own Apply(), and records the outcome
// on this transaction's metadata builder. Each returns a Status.
114+
Status ApplyExpireSnapshots(ExpireSnapshots& update);
115+
Status ApplySetSnapshot(SetSnapshot& update);
116+
Status ApplyUpdateLocation(UpdateLocation& update);
117+
Status ApplyUpdatePartitionSpec(UpdatePartitionSpec& update);
118+
Status ApplyUpdateProperties(UpdateProperties& update);
119+
Status ApplyUpdateSchema(UpdateSchema& update);
120+
Status ApplyUpdateSnapshot(SnapshotUpdate& update);
121+
Status ApplyUpdateSnapshotReference(UpdateSnapshotReference& update);
122+
Status ApplyUpdateSortOrder(UpdateSortOrder& update);
123+
Status ApplyUpdateStatistics(UpdateStatistics& update);
124+
113125
private:
114126
friend class PendingUpdate;
115127

src/iceberg/util/macros.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
#include "iceberg/exception.h"
2525
#include "iceberg/result.h"
2626

27-
#define ICEBERG_RETURN_UNEXPECTED(result) \
28-
if (auto&& result_name = result; !result_name) [[unlikely]] { \
29-
return std::unexpected<Error>(result_name.error()); \
27+
// Evaluates `expr` once, binding it to a local named `result_name`. If that value
// tests false, returns `std::unexpected<Error>` built from its `.error()` out of
// the enclosing function. Expands to a bare `if` statement (no do/while wrapper),
// so be careful when using it as the body of an unbraced if/else.
#define ICEBERG_RETURN_UNEXPECTED(expr) \
28+
if (auto&& result_name = expr; !result_name) [[unlikely]] { \
29+
return std::unexpected<Error>(result_name.error()); \
3030
}
3131

3232
#define ICEBERG_ASSIGN_OR_RAISE_IMPL(result_name, lhs, rexpr) \
@@ -68,9 +68,9 @@
6868
throw iceberg::IcebergError(error.message); \
6969
}
7070

71-
#define ICEBERG_THROW_NOT_OK(result) \
72-
if (auto&& result_name = result; !result_name) [[unlikely]] { \
73-
ERROR_TO_EXCEPTION(result_name.error()); \
71+
// Evaluates `expr` once, binding it to a local named `result_name`. If that value
// tests false, passes its `.error()` to ERROR_TO_EXCEPTION.
// NOTE(review): ERROR_TO_EXCEPTION presumably throws an exception for the error;
// its full definition is not visible here, so confirm against it.
// Expands to a bare `if` statement (no do/while wrapper).
#define ICEBERG_THROW_NOT_OK(expr) \
72+
if (auto&& result_name = expr; !result_name) [[unlikely]] { \
73+
ERROR_TO_EXCEPTION(result_name.error()); \
7474
}
7575

7676
#define ICEBERG_ASSIGN_OR_THROW_IMPL(result_name, lhs, rexpr) \

0 commit comments

Comments
 (0)