Skip to content

Commit f88ce47

Browse files
committed
impl(v3): add attempt predicate to AsyncRetryLoop
1 parent 9429a07 commit f88ce47

2 files changed

Lines changed: 71 additions & 24 deletions

File tree

google/cloud/internal/async_retry_loop.h

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,23 @@ struct FutureValueType<future<T>> {
170170
* functions. If the value is visible, the retry loop will stop on the next
171171
* callback and/or before the next request or timer is issued.
172172
*/
173-
template <typename Functor, typename Request, typename RetryPolicyType>
173+
template <
174+
typename Functor, typename Request, typename RetryPolicyType,
175+
typename ReturnType = google::cloud::internal::invoke_result_t<
176+
Functor, google::cloud::CompletionQueue&,
177+
std::shared_ptr<grpc::ClientContext>, ImmutableOptions, Request const&>>
174178
class AsyncRetryLoopImpl
175179
: public std::enable_shared_from_this<
176180
AsyncRetryLoopImpl<Functor, Request, RetryPolicyType>> {
177181
public:
178-
AsyncRetryLoopImpl(std::unique_ptr<RetryPolicyType> retry_policy,
179-
std::unique_ptr<BackoffPolicy> backoff_policy,
180-
Idempotency idempotency, google::cloud::CompletionQueue cq,
181-
Functor&& functor, ImmutableOptions options,
182-
Request request, char const* location)
182+
AsyncRetryLoopImpl(
183+
std::unique_ptr<RetryPolicyType> retry_policy,
184+
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotency,
185+
google::cloud::CompletionQueue cq, Functor&& functor,
186+
ImmutableOptions options, Request request, char const* location,
187+
std::function<
188+
bool(typename FutureValueType<ReturnType>::value_type const&)>
189+
attempt_predicate = {})
183190
: retry_policy_(std::move(retry_policy)),
184191
backoff_policy_(std::move(backoff_policy)),
185192
idempotency_(idempotency),
@@ -188,11 +195,9 @@ class AsyncRetryLoopImpl
188195
functor_(std::forward<Functor>(functor)),
189196
request_(std::move(request)),
190197
location_(location),
191-
call_context_(std::move(options)) {}
198+
call_context_(std::move(options)),
199+
attempt_predicate_(std::move(attempt_predicate)) {}
192200

193-
using ReturnType = ::google::cloud::internal::invoke_result_t<
194-
Functor, google::cloud::CompletionQueue&,
195-
std::shared_ptr<grpc::ClientContext>, ImmutableOptions, Request const&>;
196201
using T = typename FutureValueType<ReturnType>::value_type;
197202

198203
future<T> Start() {
@@ -256,8 +261,16 @@ class AsyncRetryLoopImpl
256261
}
257262

258263
void OnAttempt(T result) {
259-
// A successful attempt, set the value and finish the loop.
260-
if (result.ok()) return SetDone(std::move(result));
264+
if (attempt_predicate_) {
265+
// A successful attempt that satisfies the predicate, set the value and
266+
// finish the loop.
267+
if (result.ok() && attempt_predicate_(result)) {
268+
return SetDone(std::move(result));
269+
}
270+
} else {
271+
// A successful attempt, set the value and finish the loop.
272+
if (result.ok()) return SetDone(std::move(result));
273+
}
261274
// Some kind of failure, first verify that it is retryable.
262275
last_status_ = GetResultStatus(std::move(result));
263276
auto delay =
@@ -325,6 +338,8 @@ class AsyncRetryLoopImpl
325338
CallContext call_context_;
326339
Status last_status_;
327340
promise<T> result_;
341+
std::function<bool(typename FutureValueType<ReturnType>::value_type const&)>
342+
attempt_predicate_;
328343

329344
// Only the following variables require synchronization, as they coordinate
330345
// the work between the retry loop (which would be lock-free) and the cancel
@@ -339,17 +354,23 @@ class AsyncRetryLoopImpl
339354
/**
340355
* Create the right AsyncRetryLoopImpl object and start the retry loop on it.
341356
*/
342-
template <typename Functor, typename Request, typename RetryPolicyType,
343-
std::enable_if_t<google::cloud::internal::is_invocable<
344-
Functor, google::cloud::CompletionQueue&,
345-
std::shared_ptr<grpc::ClientContext>,
346-
ImmutableOptions, Request const&>::value,
347-
int> = 0>
348-
auto AsyncRetryLoop(std::unique_ptr<RetryPolicyType> retry_policy,
349-
std::unique_ptr<BackoffPolicy> backoff_policy,
350-
Idempotency idempotency, google::cloud::CompletionQueue cq,
351-
Functor&& functor, ImmutableOptions options,
352-
Request request, char const* location)
357+
template <
358+
typename Functor, typename Request, typename RetryPolicyType,
359+
std::enable_if_t<google::cloud::internal::is_invocable<
360+
Functor, google::cloud::CompletionQueue&,
361+
std::shared_ptr<grpc::ClientContext>, ImmutableOptions,
362+
Request const&>::value,
363+
int> = 0,
364+
typename ReturnType = google::cloud::internal::invoke_result_t<
365+
Functor, google::cloud::CompletionQueue&,
366+
std::shared_ptr<grpc::ClientContext>, ImmutableOptions, Request const&>>
367+
auto AsyncRetryLoop(
368+
std::unique_ptr<RetryPolicyType> retry_policy,
369+
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotency,
370+
google::cloud::CompletionQueue cq, Functor&& functor,
371+
ImmutableOptions options, Request request, char const* location,
372+
std::function<bool(typename FutureValueType<ReturnType>::value_type const&)>
373+
attempt_predicate = {})
353374
-> google::cloud::internal::invoke_result_t<
354375
Functor, google::cloud::CompletionQueue&,
355376
std::shared_ptr<grpc::ClientContext>, ImmutableOptions,
@@ -358,7 +379,7 @@ auto AsyncRetryLoop(std::unique_ptr<RetryPolicyType> retry_policy,
358379
std::make_shared<AsyncRetryLoopImpl<Functor, Request, RetryPolicyType>>(
359380
std::move(retry_policy), std::move(backoff_policy), idempotency,
360381
std::move(cq), std::forward<Functor>(functor), options,
361-
std::move(request), location);
382+
std::move(request), location, std::move(attempt_predicate));
362383
return loop->Start();
363384
}
364385

google/cloud/internal/async_retry_loop_test.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,32 @@ TEST(AsyncRetryLoopTest, TransientThenSuccess) {
160160
EXPECT_EQ(84, *actual);
161161
}
162162

163+
TEST(AsyncRetryLoopTest, TransientPredicateThenSuccess) {
164+
AutomaticallyCreatedBackgroundThreads background;
165+
::testing::MockFunction<bool(StatusOr<int>)> mock_predicate;
166+
EXPECT_CALL(mock_predicate, Call)
167+
.WillOnce([](StatusOr<int> const&) { return false; })
168+
.WillOnce([](StatusOr<int> const&) { return false; })
169+
.WillOnce([](StatusOr<int> const&) { return true; });
170+
171+
auto pending = AsyncRetryLoop(
172+
TestRetryPolicy(), TestBackoffPolicy(), Idempotency::kIdempotent,
173+
background.cq(),
174+
[&](google::cloud::CompletionQueue&, auto,
175+
ImmutableOptions const& options, int request) {
176+
EXPECT_EQ(options->get<TestOption>(), "TransientPredicateThenSuccess");
177+
return make_ready_future(StatusOr<int>(2 * request));
178+
},
179+
MakeImmutableOptions(
180+
Options{}.set<TestOption>("TransientPredicateThenSuccess")),
181+
42, "error message", mock_predicate.AsStdFunction());
182+
183+
OptionsSpan overlay(Options{}.set<TestOption>("uh-oh"));
184+
StatusOr<int> actual = pending.get();
185+
ASSERT_THAT(actual.status(), IsOk());
186+
EXPECT_EQ(84, *actual);
187+
}
188+
163189
TEST(AsyncRetryLoopTest, ReturnJustStatus) {
164190
int counter = 0;
165191
AutomaticallyCreatedBackgroundThreads background;

0 commit comments

Comments
 (0)