Skip to content

Commit 38e3561

Browse files
committed
feat: retry failed transaction commit
1 parent 133742d commit 38e3561

File tree

9 files changed

+563
-16
lines changed

9 files changed

+563
-16
lines changed

src/iceberg/table.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ Status Table::Refresh() {
8787
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
8888
if (metadata_location_ != refreshed_table->metadata_file_location()) {
8989
metadata_ = std::move(refreshed_table->metadata_);
90+
metadata_location_ = std::string(refreshed_table->metadata_file_location());
9091
io_ = std::move(refreshed_table->io_);
9192
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
9293
}

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ add_iceberg_test(util_test
116116
formatter_test.cc
117117
location_util_test.cc
118118
roaring_position_bitmap_test.cc
119+
retry_util_test.cc
119120
string_util_test.cc
120121
transform_util_test.cc
121122
truncate_util_test.cc
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/retry_util.h"
21+
22+
#include <gtest/gtest.h>
23+
24+
#include "iceberg/result.h"
25+
#include "iceberg/test/matchers.h"
26+
27+
namespace iceberg {
28+
29+
// --------------------------------------------------------------------------
30+
// Test: Successful on first attempt — no retries
31+
// --------------------------------------------------------------------------
32+
TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
33+
int call_count = 0;
34+
int32_t attempts = 0;
35+
36+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
37+
.min_wait_ms = 1,
38+
.max_wait_ms = 10,
39+
.total_timeout_ms = 5000})
40+
.Run(
41+
[&]() -> Result<int> {
42+
++call_count;
43+
return 42;
44+
},
45+
&attempts);
46+
47+
EXPECT_THAT(result, IsOk());
48+
EXPECT_EQ(*result, 42);
49+
EXPECT_EQ(call_count, 1);
50+
EXPECT_EQ(attempts, 1);
51+
}
52+
53+
// --------------------------------------------------------------------------
54+
// Test: Retry once then succeed
55+
// --------------------------------------------------------------------------
56+
TEST(RetryRunnerTest, RetryOnceThenSucceed) {
57+
int call_count = 0;
58+
int32_t attempts = 0;
59+
60+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
61+
.min_wait_ms = 1,
62+
.max_wait_ms = 10,
63+
.total_timeout_ms = 5000})
64+
.Run(
65+
[&]() -> Result<int> {
66+
++call_count;
67+
if (call_count == 1) {
68+
return CommitFailed("transient failure");
69+
}
70+
return 42;
71+
},
72+
&attempts);
73+
74+
EXPECT_THAT(result, IsOk());
75+
EXPECT_EQ(*result, 42);
76+
EXPECT_EQ(call_count, 2);
77+
EXPECT_EQ(attempts, 2);
78+
}
79+
80+
// --------------------------------------------------------------------------
81+
// Test: Max attempts exhausted
82+
// --------------------------------------------------------------------------
83+
TEST(RetryRunnerTest, MaxAttemptsExhausted) {
84+
int call_count = 0;
85+
int32_t attempts = 0;
86+
87+
auto result = RetryRunner(RetryConfig{.num_retries = 2,
88+
.min_wait_ms = 1,
89+
.max_wait_ms = 10,
90+
.total_timeout_ms = 5000})
91+
.Run(
92+
[&]() -> Result<int> {
93+
++call_count;
94+
return CommitFailed("always fails");
95+
},
96+
&attempts);
97+
98+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
99+
EXPECT_EQ(call_count, 3); // 1 initial + 2 retries
100+
EXPECT_EQ(attempts, 3);
101+
}
102+
103+
// --------------------------------------------------------------------------
104+
// Test: OnlyRetryOn filters correctly
105+
// --------------------------------------------------------------------------
106+
TEST(RetryRunnerTest, OnlyRetryOnFilter) {
107+
int call_count = 0;
108+
int32_t attempts = 0;
109+
110+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
111+
.min_wait_ms = 1,
112+
.max_wait_ms = 10,
113+
.total_timeout_ms = 5000})
114+
.OnlyRetryOn(ErrorKind::kCommitFailed)
115+
.Run(
116+
[&]() -> Result<int> {
117+
++call_count;
118+
// Return a non-retryable error
119+
return ValidationFailed("schema conflict");
120+
},
121+
&attempts);
122+
123+
// Should NOT retry because ValidationFailed is not in the retry list
124+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
125+
EXPECT_EQ(call_count, 1);
126+
EXPECT_EQ(attempts, 1);
127+
}
128+
129+
// --------------------------------------------------------------------------
130+
// Test: OnlyRetryOn retries matching error
131+
// --------------------------------------------------------------------------
132+
TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
133+
int call_count = 0;
134+
int32_t attempts = 0;
135+
136+
auto result = RetryRunner(RetryConfig{.num_retries = 2,
137+
.min_wait_ms = 1,
138+
.max_wait_ms = 10,
139+
.total_timeout_ms = 5000})
140+
.OnlyRetryOn(ErrorKind::kCommitFailed)
141+
.Run(
142+
[&]() -> Result<int> {
143+
++call_count;
144+
if (call_count <= 2) {
145+
return CommitFailed("transient");
146+
}
147+
return 100;
148+
},
149+
&attempts);
150+
151+
EXPECT_THAT(result, IsOk());
152+
EXPECT_EQ(*result, 100);
153+
EXPECT_EQ(call_count, 3); // 2 failures + 1 success
154+
EXPECT_EQ(attempts, 3);
155+
}
156+
157+
// --------------------------------------------------------------------------
158+
// Test: StopRetryOn stops on matching error
159+
// --------------------------------------------------------------------------
160+
TEST(RetryRunnerTest, StopRetryOnMatchingError) {
161+
int call_count = 0;
162+
int32_t attempts = 0;
163+
164+
auto result = RetryRunner(RetryConfig{.num_retries = 5,
165+
.min_wait_ms = 1,
166+
.max_wait_ms = 10,
167+
.total_timeout_ms = 5000})
168+
.StopRetryOn({ErrorKind::kCommitStateUnknown})
169+
.Run(
170+
[&]() -> Result<int> {
171+
++call_count;
172+
return CommitStateUnknown("datacenter on fire");
173+
},
174+
&attempts);
175+
176+
EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown));
177+
EXPECT_EQ(call_count, 1);
178+
EXPECT_EQ(attempts, 1);
179+
}
180+
181+
// --------------------------------------------------------------------------
182+
// Test: Zero retries means only one attempt
183+
// --------------------------------------------------------------------------
184+
TEST(RetryRunnerTest, ZeroRetries) {
185+
int call_count = 0;
186+
int32_t attempts = 0;
187+
188+
auto result = RetryRunner(RetryConfig{.num_retries = 0,
189+
.min_wait_ms = 1,
190+
.max_wait_ms = 10,
191+
.total_timeout_ms = 5000})
192+
.Run(
193+
[&]() -> Result<int> {
194+
++call_count;
195+
return CommitFailed("fail");
196+
},
197+
&attempts);
198+
199+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
200+
EXPECT_EQ(call_count, 1);
201+
EXPECT_EQ(attempts, 1);
202+
}
203+
204+
// --------------------------------------------------------------------------
205+
// Test: MakeCommitRetryRunner has correct configuration
206+
// --------------------------------------------------------------------------
207+
TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
208+
int call_count = 0;
209+
int32_t attempts = 0;
210+
211+
// MakeCommitRetryRunner should only retry on kCommitFailed
212+
auto result = MakeCommitRetryRunner(2, 1, 10, 5000)
213+
.Run(
214+
[&]() -> Result<int> {
215+
++call_count;
216+
// ValidationFailed should not be retried
217+
return ValidationFailed("not retryable");
218+
},
219+
&attempts);
220+
221+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
222+
EXPECT_EQ(call_count, 1);
223+
EXPECT_EQ(attempts, 1);
224+
}
225+
226+
// --------------------------------------------------------------------------
227+
// Test: MakeCommitRetryRunner retries CommitFailed
228+
// --------------------------------------------------------------------------
229+
TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) {
230+
int call_count = 0;
231+
int32_t attempts = 0;
232+
233+
auto result = MakeCommitRetryRunner(3, 1, 10, 5000)
234+
.Run(
235+
[&]() -> Result<int> {
236+
++call_count;
237+
if (call_count <= 2) {
238+
return CommitFailed("transient");
239+
}
240+
return 99;
241+
},
242+
&attempts);
243+
244+
EXPECT_THAT(result, IsOk());
245+
EXPECT_EQ(*result, 99);
246+
EXPECT_EQ(call_count, 3);
247+
EXPECT_EQ(attempts, 3);
248+
}
249+
250+
// --------------------------------------------------------------------------
251+
// Test: OnlyRetryOn with multiple error kinds
252+
// --------------------------------------------------------------------------
253+
TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
254+
int call_count = 0;
255+
int32_t attempts = 0;
256+
257+
auto result =
258+
RetryRunner(RetryConfig{.num_retries = 5,
259+
.min_wait_ms = 1,
260+
.max_wait_ms = 10,
261+
.total_timeout_ms = 5000})
262+
.OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable})
263+
.Run(
264+
[&]() -> Result<int> {
265+
++call_count;
266+
if (call_count == 1) {
267+
return CommitFailed("conflict");
268+
}
269+
if (call_count == 2) {
270+
return ServiceUnavailable("server busy");
271+
}
272+
return 77;
273+
},
274+
&attempts);
275+
276+
EXPECT_THAT(result, IsOk());
277+
EXPECT_EQ(*result, 77);
278+
EXPECT_EQ(call_count, 3);
279+
EXPECT_EQ(attempts, 3);
280+
}
281+
282+
// --------------------------------------------------------------------------
283+
// Test: Default retry (no filter) retries all errors
284+
// --------------------------------------------------------------------------
285+
TEST(RetryRunnerTest, DefaultRetryAllErrors) {
286+
int call_count = 0;
287+
int32_t attempts = 0;
288+
289+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
290+
.min_wait_ms = 1,
291+
.max_wait_ms = 10,
292+
.total_timeout_ms = 5000})
293+
.Run(
294+
[&]() -> Result<int> {
295+
++call_count;
296+
if (call_count == 1) {
297+
return IOError("disk full");
298+
}
299+
if (call_count == 2) {
300+
return ValidationFailed("bad schema");
301+
}
302+
return 55;
303+
},
304+
&attempts);
305+
306+
EXPECT_THAT(result, IsOk());
307+
EXPECT_EQ(*result, 55);
308+
EXPECT_EQ(call_count, 3);
309+
EXPECT_EQ(attempts, 3);
310+
}
311+
312+
} // namespace iceberg

0 commit comments

Comments
 (0)