Skip to content

Commit 99c85fd

Browse files
committed
fix
1 parent 92588f4 commit 99c85fd

File tree

10 files changed

+219
-57
lines changed

10 files changed

+219
-57
lines changed

src/iceberg/table.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ Status Table::Refresh() {
8787
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
8888
if (metadata_location_ != refreshed_table->metadata_file_location()) {
8989
metadata_ = std::move(refreshed_table->metadata_);
90-
metadata_location_ = std::string(refreshed_table->metadata_file_location());
9190
io_ = std::move(refreshed_table->io_);
9291
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
9392
}

src/iceberg/test/transaction_test.cc

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
#include "iceberg/expression/term.h"
2424
#include "iceberg/sort_order.h"
2525
#include "iceberg/test/matchers.h"
26+
#include "iceberg/test/mock_catalog.h"
2627
#include "iceberg/test/update_test_base.h"
2728
#include "iceberg/transform.h"
29+
#include "iceberg/type.h"
2830
#include "iceberg/update/update_properties.h"
31+
#include "iceberg/update/update_schema.h"
2932
#include "iceberg/update/update_sort_order.h"
3033

3134
namespace iceberg {
@@ -94,4 +97,147 @@ TEST_F(TransactionTest, MultipleUpdatesInTransaction) {
9497
EXPECT_EQ(*sort_order, *expected_sort_order);
9598
}
9699

100+
class TransactionRetryTest : public UpdateTestBase {
101+
protected:
102+
void SetUp() override {
103+
UpdateTestBase::SetUp();
104+
105+
// Create a MockCatalog and wire it to the existing table
106+
mock_catalog_ = std::make_shared<::testing::NiceMock<MockCatalog>>();
107+
108+
ON_CALL(*mock_catalog_, LoadTable(::testing::_))
109+
.WillByDefault([this](const TableIdentifier&) -> Result<std::shared_ptr<Table>> {
110+
return Table::Make(table_->name(), table_->metadata(),
111+
std::string(table_->metadata_file_location()), table_->io(),
112+
mock_catalog_);
113+
});
114+
115+
// Create a table instance bound to the mock catalog
116+
auto result = Table::Make(table_->name(), table_->metadata(),
117+
std::string(table_->metadata_file_location()), table_->io(),
118+
mock_catalog_);
119+
ASSERT_THAT(result, IsOk());
120+
mock_table_ = std::move(result.value());
121+
}
122+
123+
std::shared_ptr<::testing::NiceMock<MockCatalog>> mock_catalog_;
124+
std::shared_ptr<Table> mock_table_;
125+
};
126+
127+
TEST_F(TransactionRetryTest, CommitRetrySucceedsAfterConflict) {
128+
int update_call_count = 0;
129+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
130+
.WillByDefault([this, &update_call_count](
131+
const TableIdentifier&,
132+
const std::vector<std::unique_ptr<TableRequirement>>&,
133+
const std::vector<std::unique_ptr<TableUpdate>>&)
134+
-> Result<std::shared_ptr<Table>> {
135+
++update_call_count;
136+
if (update_call_count == 1) {
137+
return CommitFailed("conflict on first attempt");
138+
}
139+
return Table::Make(mock_table_->name(), mock_table_->metadata(),
140+
std::string(mock_table_->metadata_file_location()),
141+
mock_table_->io(), mock_catalog_);
142+
});
143+
144+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
145+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
146+
update->Set("retry.test", "value");
147+
EXPECT_THAT(update->Commit(), IsOk());
148+
149+
auto result = txn->Commit();
150+
EXPECT_THAT(result, IsOk());
151+
EXPECT_EQ(update_call_count, 2);
152+
}
153+
154+
TEST_F(TransactionRetryTest, CommitRetryExhausted) {
155+
int update_call_count = 0;
156+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
157+
.WillByDefault(
158+
[&update_call_count](const TableIdentifier&,
159+
const std::vector<std::unique_ptr<TableRequirement>>&,
160+
const std::vector<std::unique_ptr<TableUpdate>>&)
161+
-> Result<std::shared_ptr<Table>> {
162+
++update_call_count;
163+
return CommitFailed("always conflicts");
164+
});
165+
166+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
167+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
168+
update->Set("retry.test", "value");
169+
EXPECT_THAT(update->Commit(), IsOk());
170+
171+
auto result = txn->Commit();
172+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
173+
EXPECT_EQ(update_call_count, 5);
174+
}
175+
176+
TEST_F(TransactionRetryTest, CommitNonRetryableErrorStopsImmediately) {
177+
int update_call_count = 0;
178+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
179+
.WillByDefault(
180+
[&update_call_count](const TableIdentifier&,
181+
const std::vector<std::unique_ptr<TableRequirement>>&,
182+
const std::vector<std::unique_ptr<TableUpdate>>&)
183+
-> Result<std::shared_ptr<Table>> {
184+
++update_call_count;
185+
return CommitStateUnknown("unknown state");
186+
});
187+
188+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
189+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
190+
update->Set("retry.test", "value");
191+
EXPECT_THAT(update->Commit(), IsOk());
192+
193+
auto result = txn->Commit();
194+
EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown));
195+
EXPECT_EQ(update_call_count, 1); // Should not retry
196+
}
197+
198+
TEST_F(TransactionRetryTest, CreateTransactionDoesNotRetry) {
199+
int update_call_count = 0;
200+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
201+
.WillByDefault(
202+
[&update_call_count](const TableIdentifier&,
203+
const std::vector<std::unique_ptr<TableRequirement>>&,
204+
const std::vector<std::unique_ptr<TableUpdate>>&)
205+
-> Result<std::shared_ptr<Table>> {
206+
++update_call_count;
207+
return CommitFailed("conflict");
208+
});
209+
210+
ICEBERG_UNWRAP_OR_FAIL(auto txn,
211+
Transaction::Make(mock_table_, TransactionKind::kCreate));
212+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
213+
update->Set("create.test", "value");
214+
EXPECT_THAT(update->Commit(), IsOk());
215+
216+
auto result = txn->Commit();
217+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
218+
EXPECT_EQ(update_call_count, 1); // No retry for kCreate
219+
}
220+
221+
TEST_F(TransactionRetryTest, NonRetryableUpdatePreventsRetry) {
222+
int update_call_count = 0;
223+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
224+
.WillByDefault(
225+
[&update_call_count](const TableIdentifier&,
226+
const std::vector<std::unique_ptr<TableRequirement>>&,
227+
const std::vector<std::unique_ptr<TableUpdate>>&)
228+
-> Result<std::shared_ptr<Table>> {
229+
++update_call_count;
230+
return CommitFailed("conflict");
231+
});
232+
233+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
234+
ICEBERG_UNWRAP_OR_FAIL(auto schema_update, txn->NewUpdateSchema());
235+
schema_update->AddColumn("new_col", int64());
236+
EXPECT_THAT(schema_update->Commit(), IsOk());
237+
238+
auto result = txn->Commit();
239+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
240+
EXPECT_EQ(update_call_count, 1);
241+
}
242+
97243
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -347,32 +347,22 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
347347
return ctx_->table;
348348
}
349349

350-
Result<std::shared_ptr<Table>> commit_result;
351-
if (!CanRetry()) {
352-
std::vector<std::unique_ptr<TableRequirement>> requirements;
353-
switch (ctx_->kind) {
354-
case TransactionKind::kCreate: {
355-
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates));
356-
} break;
357-
case TransactionKind::kUpdate: {
358-
ICEBERG_ASSIGN_OR_RAISE(
359-
requirements,
360-
TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), updates));
361-
} break;
362-
}
363-
commit_result =
364-
ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates);
365-
} else {
366-
const auto& props = ctx_->table->properties();
367-
int32_t num_retries = props.Get(TableProperties::kCommitNumRetries);
368-
int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs);
369-
int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs);
370-
int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs);
371-
372-
commit_result =
373-
MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms)
374-
.Run([this]() -> Result<std::shared_ptr<Table>> { return CommitOnce(); });
375-
}
350+
const auto& props = ctx_->table->properties();
351+
int32_t num_retries =
352+
CanRetry() ? static_cast<int32_t>(props.Get(TableProperties::kCommitNumRetries))
353+
: 0;
354+
int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs);
355+
int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs);
356+
int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs);
357+
358+
bool is_first_attempt = true;
359+
auto commit_result =
360+
MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms)
361+
.Run([this, &is_first_attempt]() -> Result<std::shared_ptr<Table>> {
362+
auto result = CommitOnce(is_first_attempt);
363+
is_first_attempt = false;
364+
return result;
365+
});
376366

377367
for (const auto& update : pending_updates_) {
378368
std::ignore = update->Finalize(commit_result.has_value()
@@ -389,27 +379,31 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
389379
return ctx_->table;
390380
}
391381

392-
Result<std::shared_ptr<Table>> Transaction::CommitOnce() {
393-
auto refresh_result = ctx_->table->Refresh();
394-
if (!refresh_result.has_value()) {
395-
return std::unexpected(refresh_result.error());
396-
}
382+
Result<std::shared_ptr<Table>> Transaction::CommitOnce(bool is_first_attempt) {
383+
std::vector<std::unique_ptr<TableRequirement>> requirements;
397384

398-
if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) {
399-
ctx_->metadata_builder =
400-
TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get());
401-
for (const auto& update : pending_updates_) {
402-
auto commit_status = update->Commit();
403-
if (!commit_status.has_value()) {
404-
return std::unexpected(commit_status.error());
385+
switch (ctx_->kind) {
386+
case TransactionKind::kCreate: {
387+
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(
388+
ctx_->metadata_builder->changes()));
389+
} break;
390+
case TransactionKind::kUpdate: {
391+
if (!is_first_attempt) {
392+
ICEBERG_RETURN_UNEXPECTED(ctx_->table->Refresh());
405393
}
406-
}
394+
if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) {
395+
ctx_->metadata_builder =
396+
TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get());
397+
for (const auto& update : pending_updates_) {
398+
ICEBERG_RETURN_UNEXPECTED(Apply(*update));
399+
}
400+
}
401+
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable(
402+
*ctx_->metadata_builder->base(),
403+
ctx_->metadata_builder->changes()));
404+
} break;
407405
}
408406

409-
ICEBERG_ASSIGN_OR_RAISE(auto requirements, TableRequirements::ForUpdateTable(
410-
*ctx_->metadata_builder->base(),
411-
ctx_->metadata_builder->changes()));
412-
413407
return ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements,
414408
ctx_->metadata_builder->changes());
415409
}

src/iceberg/transaction.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
138138
Status ApplyUpdateSortOrder(UpdateSortOrder& update);
139139
Status ApplyUpdateStatistics(UpdateStatistics& update);
140140

141-
/// \brief Perform a single commit attempt for UPDATE transactions
142-
Result<std::shared_ptr<Table>> CommitOnce();
141+
/// \brief Perform a single commit attempt
142+
Result<std::shared_ptr<Table>> CommitOnce(bool is_first_attempt);
143143

144144
/// \brief Whether this transaction can retry after a commit conflict.
145145
bool CanRetry() const;

src/iceberg/update/snapshot_update.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,7 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
218218
}
219219

220220
int64_t SnapshotUpdate::SnapshotId() {
221-
while (!snapshot_id_.has_value() ||
222-
base().SnapshotById(snapshot_id_.value()).has_value()) {
221+
if (!snapshot_id_.has_value()) {
223222
snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base());
224223
}
225224
return snapshot_id_.value();

src/iceberg/update/update_partition_spec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
100100

101101
Kind kind() const final { return Kind::kUpdatePartitionSpec; }
102102

103+
bool IsRetryable() const override { return false; }
104+
103105
struct ApplyResult {
104106
std::shared_ptr<PartitionSpec> spec;
105107
bool set_as_default;

src/iceberg/update/update_partition_statistics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
6363

6464
Kind kind() const final { return Kind::kUpdatePartitionStatistics; }
6565

66+
bool IsRetryable() const override { return false; }
67+
6668
struct ApplyResult {
6769
std::vector<std::pair<int64_t, std::shared_ptr<PartitionStatisticsFile>>> to_set;
6870
std::vector<int64_t> to_remove;

src/iceberg/update/update_schema.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
334334

335335
Kind kind() const final { return Kind::kUpdateSchema; }
336336

337+
bool IsRetryable() const override { return false; }
338+
337339
struct ApplyResult {
338340
std::shared_ptr<Schema> schema;
339341
int32_t new_last_column_id;

src/iceberg/update/update_statistics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
6262

6363
Kind kind() const final { return Kind::kUpdateStatistics; }
6464

65+
bool IsRetryable() const override { return false; }
66+
6567
struct ApplyResult {
6668
std::vector<std::pair<int64_t, std::shared_ptr<StatisticsFile>>> to_set;
6769
std::vector<int64_t> to_remove;

0 commit comments

Comments
 (0)