Skip to content

Commit 7108f53

Browse files
committed
fix
1 parent 5748808 commit 7108f53

File tree

10 files changed

+220
-57
lines changed

10 files changed

+220
-57
lines changed

src/iceberg/table.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ Status Table::Refresh() {
8787
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
8888
if (metadata_location_ != refreshed_table->metadata_file_location()) {
8989
metadata_ = std::move(refreshed_table->metadata_);
90-
metadata_location_ = std::string(refreshed_table->metadata_file_location());
9190
io_ = std::move(refreshed_table->io_);
9291
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
9392
}

src/iceberg/test/transaction_test.cc

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
#include "iceberg/expression/term.h"
2424
#include "iceberg/sort_order.h"
2525
#include "iceberg/test/matchers.h"
26+
#include "iceberg/test/mock_catalog.h"
2627
#include "iceberg/test/update_test_base.h"
2728
#include "iceberg/transform.h"
29+
#include "iceberg/type.h"
2830
#include "iceberg/update/update_properties.h"
31+
#include "iceberg/update/update_schema.h"
2932
#include "iceberg/update/update_sort_order.h"
3033

3134
namespace iceberg {
@@ -94,4 +97,148 @@ TEST_F(TransactionTest, MultipleUpdatesInTransaction) {
9497
EXPECT_EQ(*sort_order, *expected_sort_order);
9598
}
9699

100+
101+
class TransactionRetryTest : public UpdateTestBase {
102+
protected:
103+
void SetUp() override {
104+
UpdateTestBase::SetUp();
105+
106+
// Create a MockCatalog and wire it to the existing table
107+
mock_catalog_ = std::make_shared<::testing::NiceMock<MockCatalog>>();
108+
109+
ON_CALL(*mock_catalog_, LoadTable(::testing::_))
110+
.WillByDefault([this](const TableIdentifier&) -> Result<std::shared_ptr<Table>> {
111+
return Table::Make(table_->name(), table_->metadata(),
112+
std::string(table_->metadata_file_location()), table_->io(),
113+
mock_catalog_);
114+
});
115+
116+
// Create a table instance bound to the mock catalog
117+
auto result = Table::Make(table_->name(), table_->metadata(),
118+
std::string(table_->metadata_file_location()), table_->io(),
119+
mock_catalog_);
120+
ASSERT_THAT(result, IsOk());
121+
mock_table_ = std::move(result.value());
122+
}
123+
124+
std::shared_ptr<::testing::NiceMock<MockCatalog>> mock_catalog_;
125+
std::shared_ptr<Table> mock_table_;
126+
};
127+
128+
TEST_F(TransactionRetryTest, CommitRetrySucceedsAfterConflict) {
129+
int update_call_count = 0;
130+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
131+
.WillByDefault([this, &update_call_count](
132+
const TableIdentifier&,
133+
const std::vector<std::unique_ptr<TableRequirement>>&,
134+
const std::vector<std::unique_ptr<TableUpdate>>&)
135+
-> Result<std::shared_ptr<Table>> {
136+
++update_call_count;
137+
if (update_call_count == 1) {
138+
return CommitFailed("conflict on first attempt");
139+
}
140+
return Table::Make(mock_table_->name(), mock_table_->metadata(),
141+
std::string(mock_table_->metadata_file_location()),
142+
mock_table_->io(), mock_catalog_);
143+
});
144+
145+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
146+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
147+
update->Set("retry.test", "value");
148+
EXPECT_THAT(update->Commit(), IsOk());
149+
150+
auto result = txn->Commit();
151+
EXPECT_THAT(result, IsOk());
152+
EXPECT_EQ(update_call_count, 2);
153+
}
154+
155+
TEST_F(TransactionRetryTest, CommitRetryExhausted) {
156+
int update_call_count = 0;
157+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
158+
.WillByDefault(
159+
[&update_call_count](const TableIdentifier&,
160+
const std::vector<std::unique_ptr<TableRequirement>>&,
161+
const std::vector<std::unique_ptr<TableUpdate>>&)
162+
-> Result<std::shared_ptr<Table>> {
163+
++update_call_count;
164+
return CommitFailed("always conflicts");
165+
});
166+
167+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
168+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
169+
update->Set("retry.test", "value");
170+
EXPECT_THAT(update->Commit(), IsOk());
171+
172+
auto result = txn->Commit();
173+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
174+
EXPECT_EQ(update_call_count, 5);
175+
}
176+
177+
TEST_F(TransactionRetryTest, CommitNonRetryableErrorStopsImmediately) {
178+
int update_call_count = 0;
179+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
180+
.WillByDefault(
181+
[&update_call_count](const TableIdentifier&,
182+
const std::vector<std::unique_ptr<TableRequirement>>&,
183+
const std::vector<std::unique_ptr<TableUpdate>>&)
184+
-> Result<std::shared_ptr<Table>> {
185+
++update_call_count;
186+
return CommitStateUnknown("unknown state");
187+
});
188+
189+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
190+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
191+
update->Set("retry.test", "value");
192+
EXPECT_THAT(update->Commit(), IsOk());
193+
194+
auto result = txn->Commit();
195+
EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown));
196+
EXPECT_EQ(update_call_count, 1); // Should not retry
197+
}
198+
199+
TEST_F(TransactionRetryTest, CreateTransactionDoesNotRetry) {
200+
int update_call_count = 0;
201+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
202+
.WillByDefault(
203+
[&update_call_count](const TableIdentifier&,
204+
const std::vector<std::unique_ptr<TableRequirement>>&,
205+
const std::vector<std::unique_ptr<TableUpdate>>&)
206+
-> Result<std::shared_ptr<Table>> {
207+
++update_call_count;
208+
return CommitFailed("conflict");
209+
});
210+
211+
ICEBERG_UNWRAP_OR_FAIL(auto txn,
212+
Transaction::Make(mock_table_, TransactionKind::kCreate));
213+
ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties());
214+
update->Set("create.test", "value");
215+
EXPECT_THAT(update->Commit(), IsOk());
216+
217+
auto result = txn->Commit();
218+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
219+
EXPECT_EQ(update_call_count, 1); // No retry for kCreate
220+
}
221+
222+
TEST_F(TransactionRetryTest, NonRetryableUpdatePreventsRetry) {
223+
int update_call_count = 0;
224+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
225+
.WillByDefault(
226+
[&update_call_count](const TableIdentifier&,
227+
const std::vector<std::unique_ptr<TableRequirement>>&,
228+
const std::vector<std::unique_ptr<TableUpdate>>&)
229+
-> Result<std::shared_ptr<Table>> {
230+
++update_call_count;
231+
return CommitFailed("conflict");
232+
});
233+
234+
ICEBERG_UNWRAP_OR_FAIL(auto txn, mock_table_->NewTransaction());
235+
ICEBERG_UNWRAP_OR_FAIL(auto schema_update, txn->NewUpdateSchema());
236+
schema_update->AddColumn("new_col", int64());
237+
EXPECT_THAT(schema_update->Commit(), IsOk());
238+
239+
auto result = txn->Commit();
240+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
241+
EXPECT_EQ(update_call_count, 1);
242+
}
243+
97244
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -347,32 +347,22 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
347347
return ctx_->table;
348348
}
349349

350-
Result<std::shared_ptr<Table>> commit_result;
351-
if (!CanRetry()) {
352-
std::vector<std::unique_ptr<TableRequirement>> requirements;
353-
switch (ctx_->kind) {
354-
case TransactionKind::kCreate: {
355-
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates));
356-
} break;
357-
case TransactionKind::kUpdate: {
358-
ICEBERG_ASSIGN_OR_RAISE(
359-
requirements,
360-
TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), updates));
361-
} break;
362-
}
363-
commit_result =
364-
ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates);
365-
} else {
366-
const auto& props = ctx_->table->properties();
367-
int32_t num_retries = props.Get(TableProperties::kCommitNumRetries);
368-
int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs);
369-
int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs);
370-
int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs);
371-
372-
commit_result =
373-
MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms)
374-
.Run([this]() -> Result<std::shared_ptr<Table>> { return CommitOnce(); });
375-
}
350+
const auto& props = ctx_->table->properties();
351+
int32_t num_retries =
352+
CanRetry() ? static_cast<int32_t>(props.Get(TableProperties::kCommitNumRetries))
353+
: 0;
354+
int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs);
355+
int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs);
356+
int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs);
357+
358+
bool is_first_attempt = true;
359+
auto commit_result =
360+
MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms)
361+
.Run([this, &is_first_attempt]() -> Result<std::shared_ptr<Table>> {
362+
auto result = CommitOnce(is_first_attempt);
363+
is_first_attempt = false;
364+
return result;
365+
});
376366

377367
for (const auto& update : pending_updates_) {
378368
std::ignore = update->Finalize(commit_result.has_value()
@@ -389,27 +379,31 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
389379
return ctx_->table;
390380
}
391381

392-
Result<std::shared_ptr<Table>> Transaction::CommitOnce() {
393-
auto refresh_result = ctx_->table->Refresh();
394-
if (!refresh_result.has_value()) {
395-
return std::unexpected(refresh_result.error());
396-
}
382+
Result<std::shared_ptr<Table>> Transaction::CommitOnce(bool is_first_attempt) {
383+
std::vector<std::unique_ptr<TableRequirement>> requirements;
397384

398-
if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) {
399-
ctx_->metadata_builder =
400-
TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get());
401-
for (const auto& update : pending_updates_) {
402-
auto commit_status = update->Commit();
403-
if (!commit_status.has_value()) {
404-
return std::unexpected(commit_status.error());
385+
switch (ctx_->kind) {
386+
case TransactionKind::kCreate: {
387+
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(
388+
ctx_->metadata_builder->changes()));
389+
} break;
390+
case TransactionKind::kUpdate: {
391+
if (!is_first_attempt) {
392+
ICEBERG_RETURN_UNEXPECTED(ctx_->table->Refresh());
405393
}
406-
}
394+
if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) {
395+
ctx_->metadata_builder =
396+
TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get());
397+
for (const auto& update : pending_updates_) {
398+
ICEBERG_RETURN_UNEXPECTED(Apply(*update));
399+
}
400+
}
401+
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable(
402+
*ctx_->metadata_builder->base(),
403+
ctx_->metadata_builder->changes()));
404+
} break;
407405
}
408406

409-
ICEBERG_ASSIGN_OR_RAISE(auto requirements, TableRequirements::ForUpdateTable(
410-
*ctx_->metadata_builder->base(),
411-
ctx_->metadata_builder->changes()));
412-
413407
return ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements,
414408
ctx_->metadata_builder->changes());
415409
}

src/iceberg/transaction.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
138138
Status ApplyUpdateSortOrder(UpdateSortOrder& update);
139139
Status ApplyUpdateStatistics(UpdateStatistics& update);
140140

141-
/// \brief Perform a single commit attempt for UPDATE transactions
142-
Result<std::shared_ptr<Table>> CommitOnce();
141+
/// \brief Perform a single commit attempt
142+
Result<std::shared_ptr<Table>> CommitOnce(bool is_first_attempt);
143143

144144
/// \brief Whether this transaction can retry after a commit conflict.
145145
bool CanRetry() const;

src/iceberg/update/snapshot_update.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,7 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
218218
}
219219

220220
int64_t SnapshotUpdate::SnapshotId() {
221-
while (!snapshot_id_.has_value() ||
222-
base().SnapshotById(snapshot_id_.value()).has_value()) {
221+
if (!snapshot_id_.has_value()) {
223222
snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base());
224223
}
225224
return snapshot_id_.value();

src/iceberg/update/update_partition_spec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
100100

101101
Kind kind() const final { return Kind::kUpdatePartitionSpec; }
102102

103+
bool IsRetryable() const override { return false; }
104+
103105
struct ApplyResult {
104106
std::shared_ptr<PartitionSpec> spec;
105107
bool set_as_default;

src/iceberg/update/update_partition_statistics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
6363

6464
Kind kind() const final { return Kind::kUpdatePartitionStatistics; }
6565

66+
bool IsRetryable() const override { return false; }
67+
6668
struct ApplyResult {
6769
std::vector<std::pair<int64_t, std::shared_ptr<PartitionStatisticsFile>>> to_set;
6870
std::vector<int64_t> to_remove;

src/iceberg/update/update_schema.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
334334

335335
Kind kind() const final { return Kind::kUpdateSchema; }
336336

337+
bool IsRetryable() const override { return false; }
338+
337339
struct ApplyResult {
338340
std::shared_ptr<Schema> schema;
339341
int32_t new_last_column_id;

src/iceberg/update/update_statistics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
6262

6363
Kind kind() const final { return Kind::kUpdateStatistics; }
6464

65+
bool IsRetryable() const override { return false; }
66+
6567
struct ApplyResult {
6668
std::vector<std::pair<int64_t, std::shared_ptr<StatisticsFile>>> to_set;
6769
std::vector<int64_t> to_remove;

0 commit comments

Comments
 (0)