Skip to content

Commit ff6c292

Browse files
committed
feat: retry failed transaction commit
1 parent f955a55 commit ff6c292

File tree

9 files changed

+581
-16
lines changed

9 files changed

+581
-16
lines changed

src/iceberg/table.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ Status Table::Refresh() {
8282
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
8383
if (metadata_location_ != refreshed_table->metadata_file_location()) {
8484
metadata_ = std::move(refreshed_table->metadata_);
85+
metadata_location_ = std::string(refreshed_table->metadata_file_location());
8586
io_ = std::move(refreshed_table->io_);
8687
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
8788
}

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ add_iceberg_test(util_test
115115
endian_test.cc
116116
formatter_test.cc
117117
location_util_test.cc
118+
retry_util_test.cc
118119
string_util_test.cc
119120
transform_util_test.cc
120121
truncate_util_test.cc
src/iceberg/test/retry_util_test.cc

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/retry_util.h"
21+
22+
#include <gtest/gtest.h>
23+
24+
#include "iceberg/result.h"
25+
#include "iceberg/test/matchers.h"
26+
27+
namespace iceberg {
28+
29+
// --------------------------------------------------------------------------
30+
// Test: Successful on first attempt — no retries
31+
// --------------------------------------------------------------------------
32+
TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
33+
int call_count = 0;
34+
int32_t attempts = 0;
35+
36+
auto result = RetryRunner()
37+
.WithRetries(3)
38+
.WithExponentialBackoff(1, 10, 5000, 2.0)
39+
.Run(
40+
[&]() -> Result<int> {
41+
++call_count;
42+
return 42;
43+
},
44+
&attempts);
45+
46+
EXPECT_THAT(result, IsOk());
47+
EXPECT_EQ(*result, 42);
48+
EXPECT_EQ(call_count, 1);
49+
EXPECT_EQ(attempts, 1);
50+
}
51+
52+
// --------------------------------------------------------------------------
53+
// Test: Retry once then succeed
54+
// --------------------------------------------------------------------------
55+
TEST(RetryRunnerTest, RetryOnceThenSucceed) {
56+
int call_count = 0;
57+
int32_t attempts = 0;
58+
59+
auto result = RetryRunner()
60+
.WithRetries(3)
61+
.WithExponentialBackoff(1, 10, 5000, 2.0)
62+
.Run(
63+
[&]() -> Result<int> {
64+
++call_count;
65+
if (call_count == 1) {
66+
return CommitFailed("transient failure");
67+
}
68+
return 42;
69+
},
70+
&attempts);
71+
72+
EXPECT_THAT(result, IsOk());
73+
EXPECT_EQ(*result, 42);
74+
EXPECT_EQ(call_count, 2);
75+
EXPECT_EQ(attempts, 2);
76+
}
77+
78+
// --------------------------------------------------------------------------
79+
// Test: Max attempts exhausted
80+
// --------------------------------------------------------------------------
81+
TEST(RetryRunnerTest, MaxAttemptsExhausted) {
82+
int call_count = 0;
83+
int32_t attempts = 0;
84+
85+
auto result = RetryRunner()
86+
.WithRetries(2)
87+
.WithExponentialBackoff(1, 10, 5000, 2.0)
88+
.Run(
89+
[&]() -> Result<int> {
90+
++call_count;
91+
return CommitFailed("always fails");
92+
},
93+
&attempts);
94+
95+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
96+
EXPECT_EQ(call_count, 3); // 1 initial + 2 retries
97+
EXPECT_EQ(attempts, 3);
98+
}
99+
100+
// --------------------------------------------------------------------------
101+
// Test: OnlyRetryOn filters correctly
102+
// --------------------------------------------------------------------------
103+
TEST(RetryRunnerTest, OnlyRetryOnFilter) {
104+
int call_count = 0;
105+
int32_t attempts = 0;
106+
107+
auto result = RetryRunner()
108+
.WithRetries(3)
109+
.WithExponentialBackoff(1, 10, 5000, 2.0)
110+
.OnlyRetryOn(ErrorKind::kCommitFailed)
111+
.Run(
112+
[&]() -> Result<int> {
113+
++call_count;
114+
// Return a non-retryable error
115+
return ValidationFailed("schema conflict");
116+
},
117+
&attempts);
118+
119+
// Should NOT retry because ValidationFailed is not in the retry list
120+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
121+
EXPECT_EQ(call_count, 1);
122+
EXPECT_EQ(attempts, 1);
123+
}
124+
125+
// --------------------------------------------------------------------------
126+
// Test: OnlyRetryOn retries matching error
127+
// --------------------------------------------------------------------------
128+
TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
129+
int call_count = 0;
130+
int32_t attempts = 0;
131+
132+
auto result = RetryRunner()
133+
.WithRetries(2)
134+
.WithExponentialBackoff(1, 10, 5000, 2.0)
135+
.OnlyRetryOn(ErrorKind::kCommitFailed)
136+
.Run(
137+
[&]() -> Result<int> {
138+
++call_count;
139+
if (call_count <= 2) {
140+
return CommitFailed("transient");
141+
}
142+
return 100;
143+
},
144+
&attempts);
145+
146+
EXPECT_THAT(result, IsOk());
147+
EXPECT_EQ(*result, 100);
148+
EXPECT_EQ(call_count, 3); // 2 failures + 1 success
149+
EXPECT_EQ(attempts, 3);
150+
}
151+
152+
// --------------------------------------------------------------------------
153+
// Test: StopRetryOn stops on matching error
154+
// --------------------------------------------------------------------------
155+
TEST(RetryRunnerTest, StopRetryOnMatchingError) {
156+
int call_count = 0;
157+
int32_t attempts = 0;
158+
159+
auto result = RetryRunner()
160+
.WithRetries(5)
161+
.WithExponentialBackoff(1, 10, 5000, 2.0)
162+
.StopRetryOn({ErrorKind::kCommitStateUnknown})
163+
.Run(
164+
[&]() -> Result<int> {
165+
++call_count;
166+
return CommitStateUnknown("datacenter on fire");
167+
},
168+
&attempts);
169+
170+
EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown));
171+
EXPECT_EQ(call_count, 1);
172+
EXPECT_EQ(attempts, 1);
173+
}
174+
175+
// --------------------------------------------------------------------------
176+
// Test: Zero retries means only one attempt
177+
// --------------------------------------------------------------------------
178+
TEST(RetryRunnerTest, ZeroRetries) {
179+
int call_count = 0;
180+
int32_t attempts = 0;
181+
182+
auto result = RetryRunner()
183+
.WithRetries(0)
184+
.WithExponentialBackoff(1, 10, 5000, 2.0)
185+
.Run(
186+
[&]() -> Result<int> {
187+
++call_count;
188+
return CommitFailed("fail");
189+
},
190+
&attempts);
191+
192+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
193+
EXPECT_EQ(call_count, 1);
194+
EXPECT_EQ(attempts, 1);
195+
}
196+
197+
// --------------------------------------------------------------------------
198+
// Test: MakeCommitRetryRunner has correct configuration
199+
// --------------------------------------------------------------------------
200+
TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
201+
int call_count = 0;
202+
int32_t attempts = 0;
203+
204+
// MakeCommitRetryRunner should only retry on kCommitFailed
205+
auto result = MakeCommitRetryRunner(2, 1, 10, 5000)
206+
.Run(
207+
[&]() -> Result<int> {
208+
++call_count;
209+
// ValidationFailed should not be retried
210+
return ValidationFailed("not retryable");
211+
},
212+
&attempts);
213+
214+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
215+
EXPECT_EQ(call_count, 1);
216+
EXPECT_EQ(attempts, 1);
217+
}
218+
219+
// --------------------------------------------------------------------------
220+
// Test: MakeCommitRetryRunner retries CommitFailed
221+
// --------------------------------------------------------------------------
222+
TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) {
223+
int call_count = 0;
224+
int32_t attempts = 0;
225+
226+
auto result = MakeCommitRetryRunner(3, 1, 10, 5000)
227+
.Run(
228+
[&]() -> Result<int> {
229+
++call_count;
230+
if (call_count <= 2) {
231+
return CommitFailed("transient");
232+
}
233+
return 99;
234+
},
235+
&attempts);
236+
237+
EXPECT_THAT(result, IsOk());
238+
EXPECT_EQ(*result, 99);
239+
EXPECT_EQ(call_count, 3);
240+
EXPECT_EQ(attempts, 3);
241+
}
242+
243+
// --------------------------------------------------------------------------
244+
// Test: OnlyRetryOn with multiple error kinds
245+
// --------------------------------------------------------------------------
246+
TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
247+
int call_count = 0;
248+
int32_t attempts = 0;
249+
250+
auto result =
251+
RetryRunner()
252+
.WithRetries(5)
253+
.WithExponentialBackoff(1, 10, 5000, 2.0)
254+
.OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable})
255+
.Run(
256+
[&]() -> Result<int> {
257+
++call_count;
258+
if (call_count == 1) {
259+
return CommitFailed("conflict");
260+
}
261+
if (call_count == 2) {
262+
return ServiceUnavailable("server busy");
263+
}
264+
return 77;
265+
},
266+
&attempts);
267+
268+
EXPECT_THAT(result, IsOk());
269+
EXPECT_EQ(*result, 77);
270+
EXPECT_EQ(call_count, 3);
271+
EXPECT_EQ(attempts, 3);
272+
}
273+
274+
// --------------------------------------------------------------------------
275+
// Test: Default retry (no filter) retries all errors
276+
// --------------------------------------------------------------------------
277+
TEST(RetryRunnerTest, DefaultRetryAllErrors) {
278+
int call_count = 0;
279+
int32_t attempts = 0;
280+
281+
auto result = RetryRunner()
282+
.WithRetries(3)
283+
.WithExponentialBackoff(1, 10, 5000, 2.0)
284+
.Run(
285+
[&]() -> Result<int> {
286+
++call_count;
287+
if (call_count == 1) {
288+
return IOError("disk full");
289+
}
290+
if (call_count == 2) {
291+
return ValidationFailed("bad schema");
292+
}
293+
return 55;
294+
},
295+
&attempts);
296+
297+
EXPECT_THAT(result, IsOk());
298+
EXPECT_EQ(*result, 55);
299+
EXPECT_EQ(call_count, 3);
300+
EXPECT_EQ(attempts, 3);
301+
}
302+
303+
} // namespace iceberg

0 commit comments

Comments
 (0)