Skip to content

Commit a8110b1

Browse files
committed
feat: retry failed transaction commit
1 parent f955a55 commit a8110b1

9 files changed

Lines changed: 584 additions & 15 deletions

File tree

src/iceberg/table.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ Status Table::Refresh() {
8282
ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_));
8383
if (metadata_location_ != refreshed_table->metadata_file_location()) {
8484
metadata_ = std::move(refreshed_table->metadata_);
85+
metadata_location_ = std::string(refreshed_table->metadata_file_location());
8586
io_ = std::move(refreshed_table->io_);
8687
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
8788
}

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ add_iceberg_test(util_test
115115
endian_test.cc
116116
formatter_test.cc
117117
location_util_test.cc
118+
retry_util_test.cc
118119
string_util_test.cc
119120
transform_util_test.cc
120121
truncate_util_test.cc
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/retry_util.h"
21+
22+
#include <gtest/gtest.h>
23+
24+
#include "iceberg/result.h"
25+
#include "iceberg/test/matchers.h"
26+
27+
namespace iceberg {
28+
29+
// --------------------------------------------------------------------------
30+
// Test: Successful on first attempt — no retries
31+
// --------------------------------------------------------------------------
32+
TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
33+
int call_count = 0;
34+
int32_t attempts = 0;
35+
36+
auto result = RetryRunner()
37+
.WithRetries(3)
38+
.WithExponentialBackoff(1, 10, 5000, 2.0)
39+
.Run(
40+
[&]() -> Result<int> {
41+
++call_count;
42+
return 42;
43+
},
44+
&attempts);
45+
46+
EXPECT_THAT(result, IsOk());
47+
EXPECT_EQ(*result, 42);
48+
EXPECT_EQ(call_count, 1);
49+
EXPECT_EQ(attempts, 1);
50+
}
51+
52+
// --------------------------------------------------------------------------
53+
// Test: Retry once then succeed
54+
// --------------------------------------------------------------------------
55+
TEST(RetryRunnerTest, RetryOnceThenSucceed) {
56+
int call_count = 0;
57+
int32_t attempts = 0;
58+
59+
auto result = RetryRunner()
60+
.WithRetries(3)
61+
.WithExponentialBackoff(1, 10, 5000, 2.0)
62+
.Run(
63+
[&]() -> Result<int> {
64+
++call_count;
65+
if (call_count == 1) {
66+
return CommitFailed("transient failure");
67+
}
68+
return 42;
69+
},
70+
&attempts);
71+
72+
EXPECT_THAT(result, IsOk());
73+
EXPECT_EQ(*result, 42);
74+
EXPECT_EQ(call_count, 2);
75+
EXPECT_EQ(attempts, 2);
76+
}
77+
78+
// --------------------------------------------------------------------------
79+
// Test: Max attempts exhausted
80+
// --------------------------------------------------------------------------
81+
TEST(RetryRunnerTest, MaxAttemptsExhausted) {
82+
int call_count = 0;
83+
int32_t attempts = 0;
84+
85+
auto result = RetryRunner()
86+
.WithRetries(2)
87+
.WithExponentialBackoff(1, 10, 5000, 2.0)
88+
.Run(
89+
[&]() -> Result<int> {
90+
++call_count;
91+
return CommitFailed("always fails");
92+
},
93+
&attempts);
94+
95+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
96+
EXPECT_EQ(call_count, 3); // 1 initial + 2 retries
97+
EXPECT_EQ(attempts, 3);
98+
}
99+
100+
// --------------------------------------------------------------------------
101+
// Test: OnlyRetryOn filters correctly
102+
// --------------------------------------------------------------------------
103+
TEST(RetryRunnerTest, OnlyRetryOnFilter) {
104+
int call_count = 0;
105+
int32_t attempts = 0;
106+
107+
auto result = RetryRunner()
108+
.WithRetries(3)
109+
.WithExponentialBackoff(1, 10, 5000, 2.0)
110+
.OnlyRetryOn(ErrorKind::kCommitFailed)
111+
.Run(
112+
[&]() -> Result<int> {
113+
++call_count;
114+
// Return a non-retryable error
115+
return ValidationFailed("schema conflict");
116+
},
117+
&attempts);
118+
119+
// Should NOT retry because ValidationFailed is not in the retry list
120+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
121+
EXPECT_EQ(call_count, 1);
122+
EXPECT_EQ(attempts, 1);
123+
}
124+
125+
// --------------------------------------------------------------------------
126+
// Test: OnlyRetryOn retries matching error
127+
// --------------------------------------------------------------------------
128+
TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
129+
int call_count = 0;
130+
int32_t attempts = 0;
131+
132+
auto result = RetryRunner()
133+
.WithRetries(2)
134+
.WithExponentialBackoff(1, 10, 5000, 2.0)
135+
.OnlyRetryOn(ErrorKind::kCommitFailed)
136+
.Run(
137+
[&]() -> Result<int> {
138+
++call_count;
139+
if (call_count <= 2) {
140+
return CommitFailed("transient");
141+
}
142+
return 100;
143+
},
144+
&attempts);
145+
146+
EXPECT_THAT(result, IsOk());
147+
EXPECT_EQ(*result, 100);
148+
EXPECT_EQ(call_count, 3); // 2 failures + 1 success
149+
EXPECT_EQ(attempts, 3);
150+
}
151+
152+
// --------------------------------------------------------------------------
153+
// Test: StopRetryOn stops on matching error
154+
// --------------------------------------------------------------------------
155+
TEST(RetryRunnerTest, StopRetryOnMatchingError) {
156+
int call_count = 0;
157+
int32_t attempts = 0;
158+
159+
auto result = RetryRunner()
160+
.WithRetries(5)
161+
.WithExponentialBackoff(1, 10, 5000, 2.0)
162+
.StopRetryOn({ErrorKind::kCommitStateUnknown})
163+
.Run(
164+
[&]() -> Result<int> {
165+
++call_count;
166+
return CommitStateUnknown("datacenter on fire");
167+
},
168+
&attempts);
169+
170+
EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown));
171+
EXPECT_EQ(call_count, 1);
172+
EXPECT_EQ(attempts, 1);
173+
}
174+
175+
// --------------------------------------------------------------------------
176+
// Test: Zero retries means only one attempt
177+
// --------------------------------------------------------------------------
178+
TEST(RetryRunnerTest, ZeroRetries) {
179+
int call_count = 0;
180+
int32_t attempts = 0;
181+
182+
auto result = RetryRunner()
183+
.WithRetries(0)
184+
.WithExponentialBackoff(1, 10, 5000, 2.0)
185+
.Run(
186+
[&]() -> Result<int> {
187+
++call_count;
188+
return CommitFailed("fail");
189+
},
190+
&attempts);
191+
192+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
193+
EXPECT_EQ(call_count, 1);
194+
EXPECT_EQ(attempts, 1);
195+
}
196+
197+
198+
// --------------------------------------------------------------------------
199+
// Test: MakeCommitRetryRunner has correct configuration
200+
// --------------------------------------------------------------------------
201+
TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
202+
int call_count = 0;
203+
int32_t attempts = 0;
204+
205+
// MakeCommitRetryRunner should only retry on kCommitFailed
206+
auto result = MakeCommitRetryRunner(2, 1, 10, 5000)
207+
.Run(
208+
[&]() -> Result<int> {
209+
++call_count;
210+
// ValidationFailed should not be retried
211+
return ValidationFailed("not retryable");
212+
},
213+
&attempts);
214+
215+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
216+
EXPECT_EQ(call_count, 1);
217+
EXPECT_EQ(attempts, 1);
218+
}
219+
220+
// --------------------------------------------------------------------------
221+
// Test: MakeCommitRetryRunner retries CommitFailed
222+
// --------------------------------------------------------------------------
223+
TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) {
224+
int call_count = 0;
225+
int32_t attempts = 0;
226+
227+
auto result = MakeCommitRetryRunner(3, 1, 10, 5000)
228+
.Run(
229+
[&]() -> Result<int> {
230+
++call_count;
231+
if (call_count <= 2) {
232+
return CommitFailed("transient");
233+
}
234+
return 99;
235+
},
236+
&attempts);
237+
238+
EXPECT_THAT(result, IsOk());
239+
EXPECT_EQ(*result, 99);
240+
EXPECT_EQ(call_count, 3);
241+
EXPECT_EQ(attempts, 3);
242+
}
243+
244+
// --------------------------------------------------------------------------
245+
// Test: OnlyRetryOn with multiple error kinds
246+
// --------------------------------------------------------------------------
247+
TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
248+
int call_count = 0;
249+
int32_t attempts = 0;
250+
251+
auto result =
252+
RetryRunner()
253+
.WithRetries(5)
254+
.WithExponentialBackoff(1, 10, 5000, 2.0)
255+
.OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable})
256+
.Run(
257+
[&]() -> Result<int> {
258+
++call_count;
259+
if (call_count == 1) {
260+
return CommitFailed("conflict");
261+
}
262+
if (call_count == 2) {
263+
return ServiceUnavailable("server busy");
264+
}
265+
return 77;
266+
},
267+
&attempts);
268+
269+
EXPECT_THAT(result, IsOk());
270+
EXPECT_EQ(*result, 77);
271+
EXPECT_EQ(call_count, 3);
272+
EXPECT_EQ(attempts, 3);
273+
}
274+
275+
// --------------------------------------------------------------------------
276+
// Test: Default retry (no filter) retries all errors
277+
// --------------------------------------------------------------------------
278+
TEST(RetryRunnerTest, DefaultRetryAllErrors) {
279+
int call_count = 0;
280+
int32_t attempts = 0;
281+
282+
auto result = RetryRunner()
283+
.WithRetries(3)
284+
.WithExponentialBackoff(1, 10, 5000, 2.0)
285+
.Run(
286+
[&]() -> Result<int> {
287+
++call_count;
288+
if (call_count == 1) {
289+
return IOError("disk full");
290+
}
291+
if (call_count == 2) {
292+
return ValidationFailed("bad schema");
293+
}
294+
return 55;
295+
},
296+
&attempts);
297+
298+
EXPECT_THAT(result, IsOk());
299+
EXPECT_EQ(*result, 55);
300+
EXPECT_EQ(call_count, 3);
301+
EXPECT_EQ(attempts, 3);
302+
}
303+
304+
} // namespace iceberg

0 commit comments

Comments
 (0)