Skip to content

Commit 8fdf346

Browse files
authored
refactor: introduce TransactionContext to decouple Transaction and PendingUpdate (#591)
Add TransactionContext to own the shared state (table, metadata_builder, kind) between Transaction and PendingUpdate. Both now hold a shared_ptr<TransactionContext> instead of PendingUpdate holding a weak_ptr<Transaction>. This fixes two issues: - pending_updates_ was weak_ptr, so dropping a PendingUpdate would silently break Finalize/retry; now Transaction holds shared_ptr - Table::New*() no longer creates a temporary Transaction; it creates a TransactionContext directly and passes it to the PendingUpdate, removing the circular dependency Also clean up related redundancy: - Hoist Transaction::Kind to a standalone enum class TransactionKind - Remove Transaction::kind_ (duplicate of ctx_->kind) - Remove auto_commit machinery; PendingUpdate::Commit() now calls txn->Commit() explicitly on the table-created path - TransactionContext::Make returns Result to propagate null table errors
1 parent 1afe65c commit 8fdf346

34 files changed

+314
-265
lines changed

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,7 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
500500
ICEBERG_ASSIGN_OR_RAISE(
501501
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
502502
shared_from_this()));
503-
return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
504-
/* auto_commit */ false);
503+
return Transaction::Make(std::move(table), TransactionKind::kCreate);
505504
}
506505

507506
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,7 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
384384
StagedTable::Make(identifier, std::move(result.metadata),
385385
std::move(result.metadata_location), file_io_,
386386
shared_from_this()));
387-
return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
388-
/*auto_commit=*/false);
387+
return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
389388
}
390389

391390
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {

src/iceberg/table.cc

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,16 @@
3232
#include "iceberg/table_scan.h"
3333
#include "iceberg/transaction.h"
3434
#include "iceberg/update/expire_snapshots.h"
35+
#include "iceberg/update/fast_append.h"
36+
#include "iceberg/update/set_snapshot.h"
3537
#include "iceberg/update/snapshot_manager.h"
38+
#include "iceberg/update/update_location.h"
3639
#include "iceberg/update/update_partition_spec.h"
3740
#include "iceberg/update/update_partition_statistics.h"
3841
#include "iceberg/update/update_properties.h"
3942
#include "iceberg/update/update_schema.h"
43+
#include "iceberg/update/update_snapshot_reference.h"
44+
#include "iceberg/update/update_sort_order.h"
4045
#include "iceberg/update/update_statistics.h"
4146
#include "iceberg/util/macros.h"
4247

@@ -166,71 +171,61 @@ Table::NewIncrementalChangelogScan() const {
166171
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
167172
// Create a brand new transaction object for the table. Users are expected to commit the
168173
// transaction manually.
169-
return Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
170-
/*auto_commit=*/false);
174+
return Transaction::Make(shared_from_this(), TransactionKind::kUpdate);
171175
}
172176

173177
Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
174178
ICEBERG_ASSIGN_OR_RAISE(
175-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
176-
/*auto_commit=*/true));
177-
return transaction->NewUpdatePartitionSpec();
179+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
180+
return UpdatePartitionSpec::Make(std::move(ctx));
178181
}
179182

180183
Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
181184
ICEBERG_ASSIGN_OR_RAISE(
182-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
183-
/*auto_commit=*/true));
184-
return transaction->NewUpdateProperties();
185+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
186+
return UpdateProperties::Make(std::move(ctx));
185187
}
186188

187189
Result<std::shared_ptr<UpdateSortOrder>> Table::NewUpdateSortOrder() {
188190
ICEBERG_ASSIGN_OR_RAISE(
189-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
190-
/*auto_commit=*/true));
191-
return transaction->NewUpdateSortOrder();
191+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
192+
return UpdateSortOrder::Make(std::move(ctx));
192193
}
193194

194195
Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
195196
ICEBERG_ASSIGN_OR_RAISE(
196-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
197-
/*auto_commit=*/true));
198-
return transaction->NewUpdateSchema();
197+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
198+
return UpdateSchema::Make(std::move(ctx));
199199
}
200200

201201
Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
202202
ICEBERG_ASSIGN_OR_RAISE(
203-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
204-
/*auto_commit=*/true));
205-
return transaction->NewExpireSnapshots();
203+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
204+
return ExpireSnapshots::Make(std::move(ctx));
206205
}
207206

208207
Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
209208
ICEBERG_ASSIGN_OR_RAISE(
210-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
211-
/*auto_commit=*/true));
212-
return transaction->NewUpdateLocation();
209+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
210+
return UpdateLocation::Make(std::move(ctx));
213211
}
214212

215213
Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
216214
ICEBERG_ASSIGN_OR_RAISE(
217-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
218-
/*auto_commit=*/true));
219-
return transaction->NewFastAppend();
215+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
216+
return FastAppend::Make(name().name, std::move(ctx));
220217
}
221218

222219
Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
223220
ICEBERG_ASSIGN_OR_RAISE(
224-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
225-
/*auto_commit=*/true));
226-
return transaction->NewUpdateStatistics();
221+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
222+
return UpdateStatistics::Make(std::move(ctx));
227223
}
228224

229225
Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStatistics() {
230226
ICEBERG_ASSIGN_OR_RAISE(
231-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
232-
/*auto_commit=*/true));
233-
return transaction->NewUpdatePartitionStatistics();
227+
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
228+
return UpdatePartitionStatistics::Make(std::move(ctx));
234229
}
235230

236231
Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {

src/iceberg/test/update_partition_spec_test.cc

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -232,14 +232,12 @@ class UpdatePartitionSpecTest : public ::testing::TestWithParam<int8_t> {
232232
// Helper to create UpdatePartitionSpec from a table
233233
std::shared_ptr<UpdatePartitionSpec> CreateUpdateFromTable(
234234
std::shared_ptr<Table> table) {
235-
auto transaction_result =
236-
Transaction::Make(table, Transaction::Kind::kUpdate, /*auto_commit=*/false);
237-
if (!transaction_result.has_value()) {
238-
ADD_FAILURE() << "Failed to create transaction: "
239-
<< transaction_result.error().message;
235+
auto ctx_result = TransactionContext::Make(table, TransactionKind::kUpdate);
236+
if (!ctx_result.has_value()) {
237+
ADD_FAILURE() << "Failed to create context: " << ctx_result.error().message;
240238
return nullptr;
241239
}
242-
auto update_result = UpdatePartitionSpec::Make(transaction_result.value());
240+
auto update_result = UpdatePartitionSpec::Make(std::move(ctx_result.value()));
243241
if (!update_result.has_value()) {
244242
ADD_FAILURE() << "Failed to create UpdatePartitionSpec: "
245243
<< update_result.error().message;

0 commit comments

Comments
 (0)