Skip to content

Commit b62411b

Browse files
authored
impl(v3): add attempt predicate to AsyncRetryLoop (#15915)
1 parent 9429a07 commit b62411b

2 files changed

Lines changed: 66 additions & 24 deletions

File tree

google/cloud/internal/async_retry_loop.h

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,23 @@ struct FutureValueType<future<T>> {
170170
* functions. If the value is visible, the retry loop will stop on the next
171171
* callback and/or before the next request or timer is issued.
172172
*/
173-
template <typename Functor, typename Request, typename RetryPolicyType>
173+
template <
174+
typename Functor, typename Request, typename RetryPolicyType,
175+
typename ReturnType = google::cloud::internal::invoke_result_t<
176+
Functor, google::cloud::CompletionQueue&,
177+
std::shared_ptr<grpc::ClientContext>, ImmutableOptions, Request const&>>
174178
class AsyncRetryLoopImpl
175179
: public std::enable_shared_from_this<
176180
AsyncRetryLoopImpl<Functor, Request, RetryPolicyType>> {
177181
public:
178-
AsyncRetryLoopImpl(std::unique_ptr<RetryPolicyType> retry_policy,
179-
std::unique_ptr<BackoffPolicy> backoff_policy,
180-
Idempotency idempotency, google::cloud::CompletionQueue cq,
181-
Functor&& functor, ImmutableOptions options,
182-
Request request, char const* location)
182+
AsyncRetryLoopImpl(
183+
std::unique_ptr<RetryPolicyType> retry_policy,
184+
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotency,
185+
google::cloud::CompletionQueue cq, Functor&& functor,
186+
ImmutableOptions options, Request request, char const* location,
187+
std::function<
188+
bool(typename FutureValueType<ReturnType>::value_type const&)>
189+
attempt_predicate = {})
183190
: retry_policy_(std::move(retry_policy)),
184191
backoff_policy_(std::move(backoff_policy)),
185192
idempotency_(idempotency),
@@ -188,11 +195,9 @@ class AsyncRetryLoopImpl
188195
functor_(std::forward<Functor>(functor)),
189196
request_(std::move(request)),
190197
location_(location),
191-
call_context_(std::move(options)) {}
198+
call_context_(std::move(options)),
199+
attempt_predicate_(std::move(attempt_predicate)) {}
192200

193-
using ReturnType = ::google::cloud::internal::invoke_result_t<
194-
Functor, google::cloud::CompletionQueue&,
195-
std::shared_ptr<grpc::ClientContext>, ImmutableOptions, Request const&>;
196201
using T = typename FutureValueType<ReturnType>::value_type;
197202

198203
future<T> Start() {
@@ -256,8 +261,11 @@ class AsyncRetryLoopImpl
256261
}
257262

258263
void OnAttempt(T result) {
259-
// A successful attempt, set the value and finish the loop.
260-
if (result.ok()) return SetDone(std::move(result));
264+
// If the attempt is successful and satisfies the attempt predicate, if
265+
// provided, set the value and finish the loop.
266+
if (result.ok() && (!attempt_predicate_ || attempt_predicate_(result))) {
267+
return SetDone(std::move(result));
268+
}
261269
// Some kind of failure, first verify that it is retryable.
262270
last_status_ = GetResultStatus(std::move(result));
263271
auto delay =
@@ -325,6 +333,8 @@ class AsyncRetryLoopImpl
325333
CallContext call_context_;
326334
Status last_status_;
327335
promise<T> result_;
336+
std::function<bool(typename FutureValueType<ReturnType>::value_type const&)>
337+
attempt_predicate_;
328338

329339
// Only the following variables require synchronization, as they coordinate
330340
// the work between the retry loop (which would be lock-free) and the cancel
@@ -339,17 +349,23 @@ class AsyncRetryLoopImpl
339349
/**
340350
* Create the right AsyncRetryLoopImpl object and start the retry loop on it.
341351
*/
342-
template <typename Functor, typename Request, typename RetryPolicyType,
343-
std::enable_if_t<google::cloud::internal::is_invocable<
344-
Functor, google::cloud::CompletionQueue&,
345-
std::shared_ptr<grpc::ClientContext>,
346-
ImmutableOptions, Request const&>::value,
347-
int> = 0>
348-
auto AsyncRetryLoop(std::unique_ptr<RetryPolicyType> retry_policy,
349-
std::unique_ptr<BackoffPolicy> backoff_policy,
350-
Idempotency idempotency, google::cloud::CompletionQueue cq,
351-
Functor&& functor, ImmutableOptions options,
352-
Request request, char const* location)
352+
template <
353+
typename Functor, typename Request, typename RetryPolicyType,
354+
std::enable_if_t<google::cloud::internal::is_invocable<
355+
Functor, google::cloud::CompletionQueue&,
356+
std::shared_ptr<grpc::ClientContext>, ImmutableOptions,
357+
Request const&>::value,
358+
int> = 0,
359+
typename ReturnType = google::cloud::internal::invoke_result_t<
360+
Functor, google::cloud::CompletionQueue&,
361+
std::shared_ptr<grpc::ClientContext>, ImmutableOptions, Request const&>>
362+
auto AsyncRetryLoop(
363+
std::unique_ptr<RetryPolicyType> retry_policy,
364+
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotency,
365+
google::cloud::CompletionQueue cq, Functor&& functor,
366+
ImmutableOptions options, Request request, char const* location,
367+
std::function<bool(typename FutureValueType<ReturnType>::value_type const&)>
368+
attempt_predicate = {})
353369
-> google::cloud::internal::invoke_result_t<
354370
Functor, google::cloud::CompletionQueue&,
355371
std::shared_ptr<grpc::ClientContext>, ImmutableOptions,
@@ -358,7 +374,7 @@ auto AsyncRetryLoop(std::unique_ptr<RetryPolicyType> retry_policy,
358374
std::make_shared<AsyncRetryLoopImpl<Functor, Request, RetryPolicyType>>(
359375
std::move(retry_policy), std::move(backoff_policy), idempotency,
360376
std::move(cq), std::forward<Functor>(functor), options,
361-
std::move(request), location);
377+
std::move(request), location, std::move(attempt_predicate));
362378
return loop->Start();
363379
}
364380

google/cloud/internal/async_retry_loop_test.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,32 @@ TEST(AsyncRetryLoopTest, TransientThenSuccess) {
160160
EXPECT_EQ(84, *actual);
161161
}
162162

163+
TEST(AsyncRetryLoopTest, TransientPredicateThenSuccess) {
164+
AutomaticallyCreatedBackgroundThreads background;
165+
::testing::MockFunction<bool(StatusOr<int>)> mock_predicate;
166+
EXPECT_CALL(mock_predicate, Call)
167+
.WillOnce([](StatusOr<int> const&) { return false; })
168+
.WillOnce([](StatusOr<int> const&) { return false; })
169+
.WillOnce([](StatusOr<int> const&) { return true; });
170+
171+
auto pending = AsyncRetryLoop(
172+
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent,
173+
background.cq(),
174+
[&](google::cloud::CompletionQueue&, auto,
175+
ImmutableOptions const& options, int request) {
176+
EXPECT_EQ(options->get<TestOption>(), "TransientPredicateThenSuccess");
177+
return make_ready_future(StatusOr<int>(2 * request));
178+
},
179+
MakeImmutableOptions(
180+
Options{}.set<TestOption>("TransientPredicateThenSuccess")),
181+
42, "error message", mock_predicate.AsStdFunction());
182+
183+
OptionsSpan overlay(Options{}.set<TestOption>("uh-oh"));
184+
StatusOr<int> actual = pending.get();
185+
ASSERT_THAT(actual.status(), IsOk());
186+
EXPECT_EQ(84, *actual);
187+
}
188+
163189
TEST(AsyncRetryLoopTest, ReturnJustStatus) {
164190
int counter = 0;
165191
AutomaticallyCreatedBackgroundThreads background;

0 commit comments

Comments
 (0)