Skip to content

Commit aa78c0e

Browse files
committed
polish design and fix some issues
1 parent a0eda1a commit aa78c0e

19 files changed

Lines changed: 196 additions & 61 deletions

src/iceberg/table.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ Status Table::Refresh() {
8787
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
8888
if (metadata_location_ != refreshed_table->metadata_file_location()) {
8989
metadata_ = std::move(refreshed_table->metadata_);
90+
metadata_location_ = std::string(refreshed_table->metadata_file_location());
9091
io_ = std::move(refreshed_table->io_);
9192
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
9293
}

src/iceberg/test/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ iceberg_tests = {
9191
'formatter_test.cc',
9292
'location_util_test.cc',
9393
'position_delete_index_test.cc',
94+
'retry_util_test.cc',
9495
'roaring_position_bitmap_test.cc',
9596
'string_util_test.cc',
9697
'struct_like_set_test.cc',

src/iceberg/test/retry_util_test.cc

Lines changed: 31 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@
1919

2020
#include "iceberg/util/retry_util.h"
2121

22+
#include <chrono>
23+
#include <thread>
24+
2225
#include <gtest/gtest.h>
2326

2427
#include "iceberg/result.h"
2528
#include "iceberg/test/matchers.h"
2629

2730
namespace iceberg {
2831

29-
// --------------------------------------------------------------------------
30-
// Test: Successful on first attempt — no retries
31-
// --------------------------------------------------------------------------
3232
TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
3333
int call_count = 0;
3434
int32_t attempts = 0;
@@ -50,9 +50,6 @@ TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
5050
EXPECT_EQ(attempts, 1);
5151
}
5252

53-
// --------------------------------------------------------------------------
54-
// Test: Retry once then succeed
55-
// --------------------------------------------------------------------------
5653
TEST(RetryRunnerTest, RetryOnceThenSucceed) {
5754
int call_count = 0;
5855
int32_t attempts = 0;
@@ -77,9 +74,6 @@ TEST(RetryRunnerTest, RetryOnceThenSucceed) {
7774
EXPECT_EQ(attempts, 2);
7875
}
7976

80-
// --------------------------------------------------------------------------
81-
// Test: Max attempts exhausted
82-
// --------------------------------------------------------------------------
8377
TEST(RetryRunnerTest, MaxAttemptsExhausted) {
8478
int call_count = 0;
8579
int32_t attempts = 0;
@@ -96,13 +90,10 @@ TEST(RetryRunnerTest, MaxAttemptsExhausted) {
9690
&attempts);
9791

9892
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
99-
EXPECT_EQ(call_count, 3); // 1 initial + 2 retries
93+
EXPECT_EQ(call_count, 3);
10094
EXPECT_EQ(attempts, 3);
10195
}
10296

103-
// --------------------------------------------------------------------------
104-
// Test: OnlyRetryOn filters correctly
105-
// --------------------------------------------------------------------------
10697
TEST(RetryRunnerTest, OnlyRetryOnFilter) {
10798
int call_count = 0;
10899
int32_t attempts = 0;
@@ -115,20 +106,15 @@ TEST(RetryRunnerTest, OnlyRetryOnFilter) {
115106
.Run(
116107
[&]() -> Result<int> {
117108
++call_count;
118-
// Return a non-retryable error
119109
return ValidationFailed("schema conflict");
120110
},
121111
&attempts);
122112

123-
// Should NOT retry because ValidationFailed is not in the retry list
124113
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
125114
EXPECT_EQ(call_count, 1);
126115
EXPECT_EQ(attempts, 1);
127116
}
128117

129-
// --------------------------------------------------------------------------
130-
// Test: OnlyRetryOn retries matching error
131-
// --------------------------------------------------------------------------
132118
TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
133119
int call_count = 0;
134120
int32_t attempts = 0;
@@ -150,13 +136,10 @@ TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
150136

151137
EXPECT_THAT(result, IsOk());
152138
EXPECT_EQ(*result, 100);
153-
EXPECT_EQ(call_count, 3); // 2 failures + 1 success
139+
EXPECT_EQ(call_count, 3);
154140
EXPECT_EQ(attempts, 3);
155141
}
156142

157-
// --------------------------------------------------------------------------
158-
// Test: StopRetryOn stops on matching error
159-
// --------------------------------------------------------------------------
160143
TEST(RetryRunnerTest, StopRetryOnMatchingError) {
161144
int call_count = 0;
162145
int32_t attempts = 0;
@@ -178,9 +161,6 @@ TEST(RetryRunnerTest, StopRetryOnMatchingError) {
178161
EXPECT_EQ(attempts, 1);
179162
}
180163

181-
// --------------------------------------------------------------------------
182-
// Test: Zero retries means only one attempt
183-
// --------------------------------------------------------------------------
184164
TEST(RetryRunnerTest, ZeroRetries) {
185165
int call_count = 0;
186166
int32_t attempts = 0;
@@ -201,19 +181,40 @@ TEST(RetryRunnerTest, ZeroRetries) {
201181
EXPECT_EQ(attempts, 1);
202182
}
203183

204-
// --------------------------------------------------------------------------
205-
// Test: MakeCommitRetryRunner has correct configuration
206-
// --------------------------------------------------------------------------
184+
TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
185+
int call_count = 0;
186+
int32_t attempts = 0;
187+
188+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
189+
.min_wait_ms = 20,
190+
.max_wait_ms = 20,
191+
.total_timeout_ms = 15})
192+
.Run(
193+
[&]() -> Result<int> {
194+
++call_count;
195+
// The first failure consumes most of the 15 ms budget, so the
196+
// next 20 ms backoff should prevent another attempt from
197+
// starting.
198+
if (call_count == 1) {
199+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
200+
}
201+
return CommitFailed("retry budget exhausted");
202+
},
203+
&attempts);
204+
205+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
206+
EXPECT_EQ(call_count, 1);
207+
EXPECT_EQ(attempts, 1);
208+
}
209+
207210
TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
208211
int call_count = 0;
209212
int32_t attempts = 0;
210213

211-
// MakeCommitRetryRunner should only retry on kCommitFailed
212214
auto result = MakeCommitRetryRunner(2, 1, 10, 5000)
213215
.Run(
214216
[&]() -> Result<int> {
215217
++call_count;
216-
// ValidationFailed should not be retried
217218
return ValidationFailed("not retryable");
218219
},
219220
&attempts);
@@ -223,9 +224,6 @@ TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
223224
EXPECT_EQ(attempts, 1);
224225
}
225226

226-
// --------------------------------------------------------------------------
227-
// Test: MakeCommitRetryRunner retries CommitFailed
228-
// --------------------------------------------------------------------------
229227
TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) {
230228
int call_count = 0;
231229
int32_t attempts = 0;
@@ -247,9 +245,6 @@ TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) {
247245
EXPECT_EQ(attempts, 3);
248246
}
249247

250-
// --------------------------------------------------------------------------
251-
// Test: OnlyRetryOn with multiple error kinds
252-
// --------------------------------------------------------------------------
253248
TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
254249
int call_count = 0;
255250
int32_t attempts = 0;
@@ -279,9 +274,6 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
279274
EXPECT_EQ(attempts, 3);
280275
}
281276

282-
// --------------------------------------------------------------------------
283-
// Test: Default retry (no filter) retries all errors
284-
// --------------------------------------------------------------------------
285277
TEST(RetryRunnerTest, DefaultRetryAllErrors) {
286278
int call_count = 0;
287279
int32_t attempts = 0;

src/iceberg/test/table_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ TYPED_TEST(TypedTableTest, Refresh) {
128128
.WillOnce(::testing::Return(refreshed));
129129
}
130130
EXPECT_THAT(table->Refresh(), IsOk());
131+
if constexpr (std::is_same_v<TypeParam, Table>) {
132+
EXPECT_EQ(table->metadata_file_location(), "s3://bucket/meta2.json");
133+
}
131134
} else {
132135
EXPECT_THAT(table->Refresh(), IsError(ErrorKind::kNotSupported));
133136
}

src/iceberg/test/update_statistics_test.cc

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121

2222
#include <algorithm>
2323
#include <memory>
24+
#include <string>
2425

2526
#include <gmock/gmock.h>
2627
#include <gtest/gtest.h>
2728

2829
#include "iceberg/result.h"
2930
#include "iceberg/statistics_file.h"
3031
#include "iceberg/test/matchers.h"
32+
#include "iceberg/test/mock_catalog.h"
3133
#include "iceberg/test/update_test_base.h"
3234

3335
namespace iceberg {
@@ -66,6 +68,35 @@ class UpdateStatisticsTest : public UpdateTestBase {
6668
}
6769
};
6870

71+
class UpdateStatisticsRetryTest : public UpdateStatisticsTest {
72+
protected:
73+
void SetUp() override {
74+
UpdateStatisticsTest::SetUp();
75+
76+
mock_catalog_ = std::make_shared<::testing::NiceMock<MockCatalog>>();
77+
78+
ON_CALL(*mock_catalog_, LoadTable(::testing::_))
79+
.WillByDefault([this](const TableIdentifier&) -> Result<std::shared_ptr<Table>> {
80+
++load_table_count_;
81+
auto refreshed_metadata = std::make_shared<TableMetadata>(*table_->metadata());
82+
auto refreshed_location = table_location_ + "/metadata/reload-" +
83+
std::to_string(load_table_count_) + ".metadata.json";
84+
return Table::Make(table_->name(), std::move(refreshed_metadata),
85+
std::move(refreshed_location), table_->io(), mock_catalog_);
86+
});
87+
88+
auto result = Table::Make(table_->name(), table_->metadata(),
89+
std::string(table_->metadata_file_location()), table_->io(),
90+
mock_catalog_);
91+
ASSERT_THAT(result, IsOk());
92+
mock_table_ = std::move(result.value());
93+
}
94+
95+
int load_table_count_ = 0;
96+
std::shared_ptr<::testing::NiceMock<MockCatalog>> mock_catalog_;
97+
std::shared_ptr<Table> mock_table_;
98+
};
99+
69100
TEST_F(UpdateStatisticsTest, EmptyUpdate) {
70101
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
71102
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
@@ -218,4 +249,36 @@ TEST_F(UpdateStatisticsTest, CommitSuccess) {
218249
EXPECT_EQ(*statistics[0], *stats_file);
219250
}
220251

252+
TEST_F(UpdateStatisticsRetryTest, StandaloneCommitRetriesAfterConflict) {
253+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, mock_table_->current_snapshot());
254+
auto stats_file = MakeStatisticsFile(current_snapshot->snapshot_id,
255+
"/warehouse/test_table/metadata/stats-1.puffin");
256+
257+
int update_call_count = 0;
258+
ON_CALL(*mock_catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_))
259+
.WillByDefault([this, &update_call_count, stats_file](
260+
const TableIdentifier&,
261+
const std::vector<std::unique_ptr<TableRequirement>>&,
262+
const std::vector<std::unique_ptr<TableUpdate>>&)
263+
-> Result<std::shared_ptr<Table>> {
264+
++update_call_count;
265+
if (update_call_count == 1) {
266+
return CommitFailed("conflict on first attempt");
267+
}
268+
auto committed_metadata =
269+
std::make_shared<TableMetadata>(*mock_table_->metadata());
270+
committed_metadata->statistics = {stats_file};
271+
return Table::Make(mock_table_->name(), std::move(committed_metadata),
272+
table_location_ + "/metadata/committed.metadata.json",
273+
mock_table_->io(), mock_catalog_);
274+
});
275+
276+
ICEBERG_UNWRAP_OR_FAIL(auto update, mock_table_->NewUpdateStatistics());
277+
update->SetStatistics(stats_file);
278+
279+
EXPECT_THAT(update->Commit(), IsOk());
280+
EXPECT_EQ(update_call_count, 2);
281+
EXPECT_EQ(load_table_count_, 1);
282+
}
283+
221284
} // namespace iceberg

src/iceberg/update/expire_snapshots.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
137137
ExpireSnapshots& CleanExpiredMetadata(bool clean);
138138

139139
Kind kind() const final { return Kind::kExpireSnapshots; }
140+
bool IsRetryable() const override { return true; }
140141

141142
/// \brief Apply the pending changes and return the results
142143
/// \return The results of changes

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
5959
virtual Kind kind() const = 0;
6060

6161
/// \brief Whether this update can be retried after a commit conflict.
62-
virtual bool IsRetryable() const { return true; }
62+
virtual bool IsRetryable() const = 0;
6363

6464
/// \brief Apply the pending changes and commit.
6565
///

src/iceberg/update/set_snapshot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class ICEBERG_EXPORT SetSnapshot : public PendingUpdate {
5151
SetSnapshot& RollbackTo(int64_t snapshot_id);
5252

5353
Kind kind() const final { return Kind::kSetSnapshot; }
54+
bool IsRetryable() const override { return true; }
5455

5556
/// \brief Apply the pending changes and return the target snapshot ID.
5657
Result<int64_t> Apply();

src/iceberg/update/snapshot_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
5252
~SnapshotUpdate() override;
5353

5454
Kind kind() const override { return Kind::kUpdateSnapshot; }
55+
bool IsRetryable() const override { return true; }
5556

5657
/// \brief Set a callback to delete files instead of the table's default.
5758
///

src/iceberg/update/update_location.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class ICEBERG_EXPORT UpdateLocation : public PendingUpdate {
4545
UpdateLocation& SetLocation(std::string_view location);
4646

4747
Kind kind() const final { return Kind::kUpdateLocation; }
48+
bool IsRetryable() const override { return true; }
4849

4950
/// \brief Apply the pending changes and return the new location.
5051
Result<std::string> Apply();

0 commit comments

Comments
 (0)