Skip to content

Commit 6061357

Browse files
committed
fix: review comments
1 parent f5007cc commit 6061357

7 files changed

Lines changed: 144 additions & 166 deletions

File tree

src/iceberg/table.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,10 @@ Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStat
224224
}
225225

226226
Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
227-
return SnapshotManager::Make(name().ToString(), shared_from_this());
227+
ICEBERG_ASSIGN_OR_RAISE(
228+
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
229+
/*auto_commit=*/true));
230+
return SnapshotManager::Make(std::move(transaction));
228231
}
229232

230233
Result<std::shared_ptr<StagedTable>> StagedTable::Make(

src/iceberg/test/snapshot_manager_test.cc

Lines changed: 118 additions & 100 deletions
Large diffs are not rendered by default.

src/iceberg/transaction.cc

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@ Transaction::~Transaction() = default;
6363

6464
Result<std::shared_ptr<Transaction>> Transaction::Make(std::shared_ptr<Table> table,
6565
Kind kind, bool auto_commit) {
66-
if (!table || !table->catalog()) [[unlikely]] {
67-
return InvalidArgument("Table and catalog cannot be null");
68-
}
66+
ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be null");
6967

7068
std::unique_ptr<TableMetadataBuilder> metadata_builder;
7169
if (kind == Kind::kCreate) {
@@ -94,9 +92,11 @@ std::string Transaction::MetadataFileLocation(std::string_view filename) const {
9492
}
9593

9694
Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
97-
if (!last_update_committed_) {
98-
return InvalidArgument("Cannot add update when previous update is not committed");
99-
}
95+
ICEBERG_PRECHECK(update->kind() != PendingUpdate::Kind::kSnapshotManager,
96+
"SnapshotManager updates should not be added to the transaction");
97+
ICEBERG_CHECK(last_update_committed_,
98+
"Cannot add update when previous update is not committed");
99+
100100
pending_updates_.emplace_back(std::weak_ptr<PendingUpdate>(update));
101101
last_update_committed_ = false;
102102
return {};
@@ -302,13 +302,9 @@ Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& up
302302
}
303303

304304
Result<std::shared_ptr<Table>> Transaction::Commit() {
305-
if (committed_) {
306-
return Invalid("Transaction already committed");
307-
}
308-
if (!last_update_committed_) {
309-
return InvalidArgument(
310-
"Cannot commit transaction when previous update is not committed");
311-
}
305+
ICEBERG_CHECK(!committed_, "Transaction already committed");
306+
ICEBERG_CHECK(last_update_committed_,
307+
"Cannot commit transaction when previous update is not committed");
312308

313309
const auto& updates = metadata_builder_->changes();
314310
if (updates.empty()) {
@@ -432,7 +428,8 @@ Transaction::NewUpdateSnapshotReference() {
432428
Result<std::shared_ptr<SnapshotManager>> Transaction::NewSnapshotManager() {
433429
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SnapshotManager> snapshot_manager,
434430
SnapshotManager::Make(shared_from_this()));
435-
ICEBERG_RETURN_UNEXPECTED(AddUpdate(snapshot_manager));
431+
// SnapshotManager has its own commit logic, so it is not added to the pending updates.
432+
// This differs from the Java implementation.
436433
return snapshot_manager;
437434
}
438435

src/iceberg/type_fwd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ class TableProperties;
184184
/// \brief Table update.
185185
class TableMetadataBuilder;
186186
class TableUpdate;
187-
class SnapshotManager;
188187
class TableRequirement;
189188
class TableUpdateContext;
190189
class Transaction;
@@ -194,6 +193,7 @@ class ExpireSnapshots;
194193
class FastAppend;
195194
class PendingUpdate;
196195
class SetSnapshot;
196+
class SnapshotManager;
197197
class SnapshotUpdate;
198198
class UpdateLocation;
199199
class UpdatePartitionSpec;

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4444
enum class Kind : uint8_t {
4545
kExpireSnapshots,
4646
kSetSnapshot,
47+
kSnapshotManager,
4748
kUpdateLocation,
4849
kUpdatePartitionSpec,
4950
kUpdatePartitionStatistics,

src/iceberg/update/snapshot_manager.cc

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -34,64 +34,38 @@
3434

3535
namespace iceberg {
3636

37-
Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
38-
const std::string& table_name, std::shared_ptr<Table> table) {
39-
if (table == nullptr) {
40-
return InvalidArgument("Table cannot be null");
41-
}
42-
if (table->metadata() == nullptr) {
43-
return InvalidArgument("Cannot manage snapshots: table {} does not exist",
44-
table_name);
45-
}
46-
// Create a transaction first
47-
ICEBERG_ASSIGN_OR_RAISE(auto transaction,
48-
Transaction::Make(table, Transaction::Kind::kUpdate,
49-
/*auto_commit=*/false));
50-
auto manager = std::shared_ptr<SnapshotManager>(
51-
new SnapshotManager(std::move(transaction), /*is_external=*/false));
52-
return manager;
53-
}
54-
5537
Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
5638
std::shared_ptr<Transaction> transaction) {
57-
if (transaction == nullptr) {
58-
return InvalidArgument("Invalid input transaction: null");
59-
}
60-
return std::shared_ptr<SnapshotManager>(
61-
new SnapshotManager(std::move(transaction), /*is_external=*/true));
39+
ICEBERG_PRECHECK(transaction != nullptr, "Invalid input transaction: null");
40+
return std::shared_ptr<SnapshotManager>(new SnapshotManager(std::move(transaction)));
6241
}
6342

64-
SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction,
65-
bool is_external)
66-
: PendingUpdate(transaction), is_external_transaction_(is_external) {}
43+
SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction)
44+
: PendingUpdate(transaction) {}
6745

6846
SnapshotManager::~SnapshotManager() = default;
6947

7048
SnapshotManager& SnapshotManager::Cherrypick(int64_t snapshot_id) {
71-
ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
7249
// TODO(anyone): Implement cherrypick operation
7350
ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented");
7451
return *this;
7552
}
7653

7754
SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) {
78-
ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
7955
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot());
8056
set_snapshot->SetCurrentSnapshot(snapshot_id);
8157
ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
8258
return *this;
8359
}
8460

8561
SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) {
86-
ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
8762
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot());
8863
set_snapshot->RollbackToTime(UnixMsFromTimePointMs(timestamp_ms));
8964
ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
9065
return *this;
9166
}
9267

9368
SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) {
94-
ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
9569
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot());
9670
set_snapshot->RollbackTo(snapshot_id);
9771
ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
@@ -101,9 +75,8 @@ SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) {
10175
SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) {
10276
if (base().current_snapshot_id != kInvalidSnapshotId) {
10377
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base().Snapshot());
104-
if (current_snapshot != nullptr) {
105-
return CreateBranch(name, current_snapshot->snapshot_id);
106-
}
78+
ICEBERG_DCHECK(current_snapshot != nullptr, "Current snapshot should not be null");
79+
return CreateBranch(name, current_snapshot->snapshot_id);
10780
}
10881
const auto& current_refs = base().refs;
10982
ICEBERG_BUILDER_CHECK(!base().refs.contains(name), "Ref {} already exists", name);
@@ -197,11 +170,8 @@ SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name,
197170
Result<std::shared_ptr<Snapshot>> SnapshotManager::Apply() { return base().Snapshot(); }
198171

199172
Status SnapshotManager::Commit() {
200-
ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());
201-
if (!is_external_transaction_) {
202-
ICEBERG_RETURN_UNEXPECTED(transaction_->Commit());
203-
}
204-
return {};
173+
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
174+
return CommitIfRefUpdatesExist();
205175
}
206176

207177
Result<std::shared_ptr<UpdateSnapshotReference>>

src/iceberg/update/snapshot_manager.h

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,6 @@ namespace iceberg {
3737
/// snapshots, and managing branches and tags.
3838
class ICEBERG_EXPORT SnapshotManager : public PendingUpdate {
3939
public:
40-
/// \brief Create a SnapshotManager for a table.
41-
///
42-
/// \param table_name The name of the table
43-
/// \param table The table to manage snapshots for
44-
/// \return A new SnapshotManager instance, or an error if the table doesn't exist
45-
static Result<std::shared_ptr<SnapshotManager>> Make(const std::string& table_name,
46-
std::shared_ptr<Table> table);
47-
4840
/// \brief Create a SnapshotManager from an existing transaction.
4941
///
5042
/// \param transaction The transaction to use
@@ -194,17 +186,14 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate {
194186
/// \brief Constructor for creating a SnapshotManager with a transaction.
195187
///
196188
/// \param transaction The transaction to use
197-
/// \param is_external Whether this is an external transaction (true) or created
198-
/// internally (false)
199-
SnapshotManager(std::shared_ptr<Transaction> transaction, bool is_external);
189+
explicit SnapshotManager(std::shared_ptr<Transaction> transaction);
200190

201191
/// \brief Get or create the UpdateSnapshotReference operation.
202192
Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReferencesOperation();
203193

204194
/// \brief Commit any pending reference updates if they exist.
205195
Status CommitIfRefUpdatesExist();
206196

207-
bool is_external_transaction_;
208197
std::shared_ptr<UpdateSnapshotReference> update_snapshot_references_operation_;
209198
};
210199

0 commit comments

Comments
 (0)