Skip to content

Commit 9f9f12b

Browse files
committed
fix
1 parent 5748808 commit 9f9f12b

File tree

9 files changed

+73
-57
lines changed

9 files changed

+73
-57
lines changed

src/iceberg/table.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ Status Table::Refresh() {
8787
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
8888
if (metadata_location_ != refreshed_table->metadata_file_location()) {
8989
metadata_ = std::move(refreshed_table->metadata_);
90-
metadata_location_ = std::string(refreshed_table->metadata_file_location());
9190
io_ = std::move(refreshed_table->io_);
9291
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
9392
}

src/iceberg/transaction.cc

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -347,32 +347,22 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
347347
return ctx_->table;
348348
}
349349

350-
Result<std::shared_ptr<Table>> commit_result;
351-
if (!CanRetry()) {
352-
std::vector<std::unique_ptr<TableRequirement>> requirements;
353-
switch (ctx_->kind) {
354-
case TransactionKind::kCreate: {
355-
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates));
356-
} break;
357-
case TransactionKind::kUpdate: {
358-
ICEBERG_ASSIGN_OR_RAISE(
359-
requirements,
360-
TableRequirements::ForUpdateTable(*ctx_->metadata_builder->base(), updates));
361-
} break;
362-
}
363-
commit_result =
364-
ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements, updates);
365-
} else {
366-
const auto& props = ctx_->table->properties();
367-
int32_t num_retries = props.Get(TableProperties::kCommitNumRetries);
368-
int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs);
369-
int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs);
370-
int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs);
371-
372-
commit_result =
373-
MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms)
374-
.Run([this]() -> Result<std::shared_ptr<Table>> { return CommitOnce(); });
375-
}
350+
const auto& props = ctx_->table->properties();
351+
int32_t num_retries =
352+
CanRetry() ? static_cast<int32_t>(props.Get(TableProperties::kCommitNumRetries))
353+
: 0;
354+
int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs);
355+
int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs);
356+
int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs);
357+
358+
bool is_first_attempt = true;
359+
auto commit_result =
360+
MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms)
361+
.Run([this, &is_first_attempt]() -> Result<std::shared_ptr<Table>> {
362+
auto result = CommitOnce(is_first_attempt);
363+
is_first_attempt = false;
364+
return result;
365+
});
376366

377367
for (const auto& update : pending_updates_) {
378368
std::ignore = update->Finalize(commit_result.has_value()
@@ -389,27 +379,31 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
389379
return ctx_->table;
390380
}
391381

392-
Result<std::shared_ptr<Table>> Transaction::CommitOnce() {
393-
auto refresh_result = ctx_->table->Refresh();
394-
if (!refresh_result.has_value()) {
395-
return std::unexpected(refresh_result.error());
396-
}
382+
Result<std::shared_ptr<Table>> Transaction::CommitOnce(bool is_first_attempt) {
383+
std::vector<std::unique_ptr<TableRequirement>> requirements;
397384

398-
if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) {
399-
ctx_->metadata_builder =
400-
TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get());
401-
for (const auto& update : pending_updates_) {
402-
auto commit_status = update->Commit();
403-
if (!commit_status.has_value()) {
404-
return std::unexpected(commit_status.error());
385+
switch (ctx_->kind) {
386+
case TransactionKind::kCreate: {
387+
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(
388+
ctx_->metadata_builder->changes()));
389+
} break;
390+
case TransactionKind::kUpdate: {
391+
if (!is_first_attempt) {
392+
ICEBERG_RETURN_UNEXPECTED(ctx_->table->Refresh());
405393
}
406-
}
394+
if (ctx_->metadata_builder->base() != ctx_->table->metadata().get()) {
395+
ctx_->metadata_builder =
396+
TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get());
397+
for (const auto& update : pending_updates_) {
398+
ICEBERG_RETURN_UNEXPECTED(Apply(*update));
399+
}
400+
}
401+
ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable(
402+
*ctx_->metadata_builder->base(),
403+
ctx_->metadata_builder->changes()));
404+
} break;
407405
}
408406

409-
ICEBERG_ASSIGN_OR_RAISE(auto requirements, TableRequirements::ForUpdateTable(
410-
*ctx_->metadata_builder->base(),
411-
ctx_->metadata_builder->changes()));
412-
413407
return ctx_->table->catalog()->UpdateTable(ctx_->table->name(), requirements,
414408
ctx_->metadata_builder->changes());
415409
}

src/iceberg/transaction.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
138138
Status ApplyUpdateSortOrder(UpdateSortOrder& update);
139139
Status ApplyUpdateStatistics(UpdateStatistics& update);
140140

141-
/// \brief Perform a single commit attempt for UPDATE transactions
142-
Result<std::shared_ptr<Table>> CommitOnce();
141+
/// \brief Perform a single commit attempt; for UPDATE transactions, the table is refreshed first unless is_first_attempt is true
142+
Result<std::shared_ptr<Table>> CommitOnce(bool is_first_attempt);
143143

144144
/// \brief Whether this transaction can retry after a commit conflict.
145145
bool CanRetry() const;

src/iceberg/update/snapshot_update.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,7 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
218218
}
219219

220220
int64_t SnapshotUpdate::SnapshotId() {
221-
while (!snapshot_id_.has_value() ||
222-
base().SnapshotById(snapshot_id_.value()).has_value()) {
221+
if (!snapshot_id_.has_value()) {
223222
snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base());
224223
}
225224
return snapshot_id_.value();

src/iceberg/update/update_partition_spec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
100100

101101
Kind kind() const final { return Kind::kUpdatePartitionSpec; }
102102

103+
bool IsRetryable() const override { return false; }
104+
103105
struct ApplyResult {
104106
std::shared_ptr<PartitionSpec> spec;
105107
bool set_as_default;

src/iceberg/update/update_partition_statistics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class ICEBERG_EXPORT UpdatePartitionStatistics : public PendingUpdate {
6363

6464
Kind kind() const final { return Kind::kUpdatePartitionStatistics; }
6565

66+
bool IsRetryable() const override { return false; }
67+
6668
struct ApplyResult {
6769
std::vector<std::pair<int64_t, std::shared_ptr<PartitionStatisticsFile>>> to_set;
6870
std::vector<int64_t> to_remove;

src/iceberg/update/update_schema.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
334334

335335
Kind kind() const final { return Kind::kUpdateSchema; }
336336

337+
bool IsRetryable() const override { return false; }
338+
337339
struct ApplyResult {
338340
std::shared_ptr<Schema> schema;
339341
int32_t new_last_column_id;

src/iceberg/update/update_statistics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
6262

6363
Kind kind() const final { return Kind::kUpdateStatistics; }
6464

65+
bool IsRetryable() const override { return false; }
66+
6567
struct ApplyResult {
6668
std::vector<std::pair<int64_t, std::shared_ptr<StatisticsFile>>> to_set;
6769
std::vector<int64_t> to_remove;

src/iceberg/util/retry_util.h

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
#include <algorithm>
2323
#include <chrono>
24-
#include <functional>
2524
#include <random>
2625
#include <thread>
2726
#include <vector>
@@ -51,26 +50,48 @@ class RetryRunner {
5150
explicit RetryRunner(RetryConfig config = {}) : config_(std::move(config)) {}
5251

5352
/// \brief Specify error types that should trigger a retry.
53+
///
54+
/// When set, only errors matching one of these kinds will be retried.
55+
/// All other errors will stop retries immediately.
56+
///
57+
/// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
58+
/// StopRetryOn is ignored.
5459
RetryRunner& OnlyRetryOn(std::initializer_list<ErrorKind> error_kinds) {
5560
only_retry_on_ = std::vector<ErrorKind>(error_kinds);
5661
return *this;
5762
}
5863

59-
/// \brief Specify a single error type that should trigger a retry
64+
/// \brief Specify a single error type that should trigger a retry.
65+
///
66+
/// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
67+
/// StopRetryOn is ignored.
6068
RetryRunner& OnlyRetryOn(ErrorKind error_kind) {
6169
only_retry_on_ = std::vector<ErrorKind>{error_kind};
6270
return *this;
6371
}
6472

65-
/// \brief Specify error types that should stop retries immediately
73+
/// \brief Specify error types that should stop retries immediately.
74+
///
75+
/// When set, errors matching one of these kinds will not be retried.
76+
/// All other errors will be retried.
77+
///
78+
/// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set,
79+
/// StopRetryOn is ignored.
6680
RetryRunner& StopRetryOn(std::initializer_list<ErrorKind> error_kinds) {
6781
stop_retry_on_ = std::vector<ErrorKind>(error_kinds);
6882
return *this;
6983
}
7084

7185
/// \brief Run a task that returns a Result<T>
86+
///
87+
/// TODO: Replace attempt_counter with a metrics reporter once it is available.
7288
template <typename F, typename T = typename std::invoke_result_t<F>::value_type>
7389
Result<T> Run(F&& task, int32_t* attempt_counter = nullptr) {
90+
if (config_.num_retries < 0) {
91+
return InvalidArgument("num_retries must be non-negative, got {}",
92+
config_.num_retries);
93+
}
94+
7495
auto start_time = std::chrono::steady_clock::now();
7596
int32_t attempt = 0;
7697
int32_t max_attempts = config_.num_retries + 1;
@@ -103,7 +124,7 @@ class RetryRunner {
103124
}
104125

105126
int32_t delay_ms = CalculateDelay(attempt);
106-
Sleep(delay_ms);
127+
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
107128
}
108129
}
109130

@@ -137,11 +158,6 @@ class RetryRunner {
137158
return std::max(1, delay_ms);
138159
}
139160

140-
/// \brief Sleep for the specified duration
141-
void Sleep(int32_t ms) const {
142-
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
143-
}
144-
145161
RetryConfig config_;
146162
std::vector<ErrorKind> only_retry_on_;
147163
std::vector<ErrorKind> stop_retry_on_;

0 commit comments

Comments
 (0)