Skip to content

Commit 8809e0f

Browse files
authored
feat: retry failed transaction commit (apache#626)
This commit implements the retry for transaction commits. It introduces a generic RetryRunner utility with exponential backoff and error-kind filtering, and integrates it into Transaction::Commit() to automatically refresh table metadata and retry on commit conflicts.
1 parent b94fa7c commit 8809e0f

23 files changed

Lines changed: 857 additions & 15 deletions

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ add_iceberg_test(util_test
119119
location_util_test.cc
120120
roaring_position_bitmap_test.cc
121121
position_delete_index_test.cc
122+
retry_util_test.cc
122123
string_util_test.cc
123124
struct_like_set_test.cc
124125
transform_util_test.cc

src/iceberg/test/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ iceberg_tests = {
9191
'formatter_test.cc',
9292
'location_util_test.cc',
9393
'position_delete_index_test.cc',
94+
'retry_util_test.cc',
9495
'roaring_position_bitmap_test.cc',
9596
'string_util_test.cc',
9697
'struct_like_set_test.cc',
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/retry_util.h"
21+
22+
#include <chrono>
23+
#include <thread>
24+
25+
#include <gtest/gtest.h>
26+
27+
#include "iceberg/result.h"
28+
#include "iceberg/test/matchers.h"
29+
30+
namespace iceberg {
31+
32+
TEST(RetryRunnerTest, SuccessOnFirstAttempt) {
33+
int call_count = 0;
34+
int32_t attempts = 0;
35+
36+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
37+
.min_wait_ms = 1,
38+
.max_wait_ms = 10,
39+
.total_timeout_ms = 5000})
40+
.Run(
41+
[&]() -> Result<int> {
42+
++call_count;
43+
return 42;
44+
},
45+
&attempts);
46+
47+
EXPECT_THAT(result, IsOk());
48+
EXPECT_EQ(*result, 42);
49+
EXPECT_EQ(call_count, 1);
50+
EXPECT_EQ(attempts, 1);
51+
}
52+
53+
TEST(RetryRunnerTest, RetryOnceThenSucceed) {
54+
int call_count = 0;
55+
int32_t attempts = 0;
56+
57+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
58+
.min_wait_ms = 1,
59+
.max_wait_ms = 10,
60+
.total_timeout_ms = 5000})
61+
.Run(
62+
[&]() -> Result<int> {
63+
++call_count;
64+
if (call_count == 1) {
65+
return CommitFailed("transient failure");
66+
}
67+
return 42;
68+
},
69+
&attempts);
70+
71+
EXPECT_THAT(result, IsOk());
72+
EXPECT_EQ(*result, 42);
73+
EXPECT_EQ(call_count, 2);
74+
EXPECT_EQ(attempts, 2);
75+
}
76+
77+
TEST(RetryRunnerTest, MaxAttemptsExhausted) {
78+
int call_count = 0;
79+
int32_t attempts = 0;
80+
81+
auto result = RetryRunner(RetryConfig{.num_retries = 2,
82+
.min_wait_ms = 1,
83+
.max_wait_ms = 10,
84+
.total_timeout_ms = 5000})
85+
.Run(
86+
[&]() -> Result<int> {
87+
++call_count;
88+
return CommitFailed("always fails");
89+
},
90+
&attempts);
91+
92+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
93+
EXPECT_EQ(call_count, 3);
94+
EXPECT_EQ(attempts, 3);
95+
}
96+
97+
TEST(RetryRunnerTest, OnlyRetryOnFilter) {
98+
int call_count = 0;
99+
int32_t attempts = 0;
100+
101+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
102+
.min_wait_ms = 1,
103+
.max_wait_ms = 10,
104+
.total_timeout_ms = 5000})
105+
.OnlyRetryOn(ErrorKind::kCommitFailed)
106+
.Run(
107+
[&]() -> Result<int> {
108+
++call_count;
109+
return ValidationFailed("schema conflict");
110+
},
111+
&attempts);
112+
113+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
114+
EXPECT_EQ(call_count, 1);
115+
EXPECT_EQ(attempts, 1);
116+
}
117+
118+
TEST(RetryRunnerTest, OnlyRetryOnMatchingError) {
119+
int call_count = 0;
120+
int32_t attempts = 0;
121+
122+
auto result = RetryRunner(RetryConfig{.num_retries = 2,
123+
.min_wait_ms = 1,
124+
.max_wait_ms = 10,
125+
.total_timeout_ms = 5000})
126+
.OnlyRetryOn(ErrorKind::kCommitFailed)
127+
.Run(
128+
[&]() -> Result<int> {
129+
++call_count;
130+
if (call_count <= 2) {
131+
return CommitFailed("transient");
132+
}
133+
return 100;
134+
},
135+
&attempts);
136+
137+
EXPECT_THAT(result, IsOk());
138+
EXPECT_EQ(*result, 100);
139+
EXPECT_EQ(call_count, 3);
140+
EXPECT_EQ(attempts, 3);
141+
}
142+
143+
TEST(RetryRunnerTest, StopRetryOnMatchingError) {
144+
int call_count = 0;
145+
int32_t attempts = 0;
146+
147+
auto result = RetryRunner(RetryConfig{.num_retries = 5,
148+
.min_wait_ms = 1,
149+
.max_wait_ms = 10,
150+
.total_timeout_ms = 5000})
151+
.StopRetryOn({ErrorKind::kCommitStateUnknown})
152+
.Run(
153+
[&]() -> Result<int> {
154+
++call_count;
155+
return CommitStateUnknown("datacenter on fire");
156+
},
157+
&attempts);
158+
159+
EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown));
160+
EXPECT_EQ(call_count, 1);
161+
EXPECT_EQ(attempts, 1);
162+
}
163+
164+
TEST(RetryRunnerTest, ZeroRetries) {
165+
int call_count = 0;
166+
int32_t attempts = 0;
167+
168+
auto result = RetryRunner(RetryConfig{.num_retries = 0,
169+
.min_wait_ms = 1,
170+
.max_wait_ms = 10,
171+
.total_timeout_ms = 5000})
172+
.Run(
173+
[&]() -> Result<int> {
174+
++call_count;
175+
return CommitFailed("fail");
176+
},
177+
&attempts);
178+
179+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
180+
EXPECT_EQ(call_count, 1);
181+
EXPECT_EQ(attempts, 1);
182+
}
183+
184+
TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) {
185+
int call_count = 0;
186+
int32_t attempts = 0;
187+
188+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
189+
.min_wait_ms = 20,
190+
.max_wait_ms = 20,
191+
.total_timeout_ms = 15})
192+
.Run(
193+
[&]() -> Result<int> {
194+
++call_count;
195+
// The first failure consumes most of the 15 ms budget, so the
196+
// next 20 ms backoff should prevent another attempt from
197+
// starting.
198+
if (call_count == 1) {
199+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
200+
}
201+
return CommitFailed("retry budget exhausted");
202+
},
203+
&attempts);
204+
205+
EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed));
206+
EXPECT_EQ(call_count, 1);
207+
EXPECT_EQ(attempts, 1);
208+
}
209+
210+
TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) {
211+
int call_count = 0;
212+
int32_t attempts = 0;
213+
214+
auto result = MakeCommitRetryRunner(2, 1, 10, 5000)
215+
.Run(
216+
[&]() -> Result<int> {
217+
++call_count;
218+
return ValidationFailed("not retryable");
219+
},
220+
&attempts);
221+
222+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
223+
EXPECT_EQ(call_count, 1);
224+
EXPECT_EQ(attempts, 1);
225+
}
226+
227+
TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) {
228+
int call_count = 0;
229+
int32_t attempts = 0;
230+
231+
auto result = MakeCommitRetryRunner(3, 1, 10, 5000)
232+
.Run(
233+
[&]() -> Result<int> {
234+
++call_count;
235+
if (call_count <= 2) {
236+
return CommitFailed("transient");
237+
}
238+
return 99;
239+
},
240+
&attempts);
241+
242+
EXPECT_THAT(result, IsOk());
243+
EXPECT_EQ(*result, 99);
244+
EXPECT_EQ(call_count, 3);
245+
EXPECT_EQ(attempts, 3);
246+
}
247+
248+
TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
249+
int call_count = 0;
250+
int32_t attempts = 0;
251+
252+
auto result =
253+
RetryRunner(RetryConfig{.num_retries = 5,
254+
.min_wait_ms = 1,
255+
.max_wait_ms = 10,
256+
.total_timeout_ms = 5000})
257+
.OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable})
258+
.Run(
259+
[&]() -> Result<int> {
260+
++call_count;
261+
if (call_count == 1) {
262+
return CommitFailed("conflict");
263+
}
264+
if (call_count == 2) {
265+
return ServiceUnavailable("server busy");
266+
}
267+
return 77;
268+
},
269+
&attempts);
270+
271+
EXPECT_THAT(result, IsOk());
272+
EXPECT_EQ(*result, 77);
273+
EXPECT_EQ(call_count, 3);
274+
EXPECT_EQ(attempts, 3);
275+
}
276+
277+
TEST(RetryRunnerTest, DefaultRetryAllErrors) {
278+
int call_count = 0;
279+
int32_t attempts = 0;
280+
281+
auto result = RetryRunner(RetryConfig{.num_retries = 3,
282+
.min_wait_ms = 1,
283+
.max_wait_ms = 10,
284+
.total_timeout_ms = 5000})
285+
.Run(
286+
[&]() -> Result<int> {
287+
++call_count;
288+
if (call_count == 1) {
289+
return IOError("disk full");
290+
}
291+
if (call_count == 2) {
292+
return ValidationFailed("bad schema");
293+
}
294+
return 55;
295+
},
296+
&attempts);
297+
298+
EXPECT_THAT(result, IsOk());
299+
EXPECT_EQ(*result, 55);
300+
EXPECT_EQ(call_count, 3);
301+
EXPECT_EQ(attempts, 3);
302+
}
303+
304+
} // namespace iceberg

src/iceberg/test/table_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ TYPED_TEST(TypedTableTest, Refresh) {
128128
.WillOnce(::testing::Return(refreshed));
129129
}
130130
EXPECT_THAT(table->Refresh(), IsOk());
131+
if constexpr (std::is_same_v<TypeParam, Table>) {
132+
EXPECT_EQ(table->metadata_file_location(), "s3://bucket/meta2.json");
133+
}
131134
} else {
132135
EXPECT_THAT(table->Refresh(), IsError(ErrorKind::kNotSupported));
133136
}

0 commit comments

Comments
 (0)