diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 4f047d9e9d..b7e6d6580f 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -44,6 +44,7 @@ import cats.effect.std.{ Backpressure, Console, Env, + Retry, SecureRandom, Supervisor, SystemProperties, @@ -646,6 +647,45 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { (FiberIO[A @uncheckedVariance], OutcomeIO[B])]] = IO.racePair(this, that) + /** + * Evaluates the current IO with the given retry `policy`. + * + * @example + * {{{ + * val policy = Retry.exponentialBackoff[IO, Throwable](1.second).withMaxRetries(10) + * io.retry(policy) + * }}} + * + * @param policy + * the policy to use + */ + def retry(policy: Retry[IO, Throwable]): IO[A] = + Retry.retry(policy)(this) + + /** + * Evaluates the current IO with the given retry `policy`. + * + * @example + * {{{ + * val policy = Retry.exponentialBackoff[IO, Throwable](1.second).withMaxRetries(10) + * io.retry( + * policy, + * (status, err, decision) => IO.println(s"Attempt $${status.retriesTotal}, error: $${err.getMessage}, next: $$decision") + * ) + * }}} + * + * @param policy + * the policy to use + * + * @param onError + * the effect to invoke on every retry decision + */ + def retry( + policy: Retry[IO, Throwable], + onError: (Retry.Status, Throwable, Retry.Decision) => IO[Unit] + ): IO[A] = + Retry.retry(policy, onError)(this) + /** * Inverse of `attempt` * diff --git a/core/shared/src/main/scala/cats/effect/syntax/package.scala b/core/shared/src/main/scala/cats/effect/syntax/package.scala index de4d679df5..3beaaf56f4 100644 --- a/core/shared/src/main/scala/cats/effect/syntax/package.scala +++ b/core/shared/src/main/scala/cats/effect/syntax/package.scala @@ -29,5 +29,6 @@ package object syntax { object clock extends kernel.syntax.ClockSyntax object supervisor extends std.syntax.SupervisorSyntax + object retry extends std.syntax.RetrySyntax object dispatcher extends DispatcherSyntax } diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala new file mode 100644 index 0000000000..542df4299d --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -0,0 +1,769 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.{~>, Applicative, Monad, Semigroup, Show} +import cats.effect.kernel.GenTemporal +import cats.syntax.apply._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import cats.syntax.monadError._ + +import scala.concurrent.duration._ +import scala.reflect.{classTag, ClassTag} + +/** + * Glossary: + * - individual delay - the delay between retries + * - cumulative delay - the total delay accumulated across all retries + * + * ==Usage== + * + * ===Retry on all errors=== + * + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * + * // retries 10 times at most using an exponential backoff strategy + * IO.raiseError(new RuntimeException("oops")).retry(policy) + * }}} + * + * ===Retry on some errors (e.g. TimeoutException)=== + * + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[TimeoutException]) + * + * // retries 10 times at most using an exponential backoff strategy + * IO.raiseError(new TimeoutException("timeout")).retry(policy) + * + * // gives up immediately + * IO.raiseError(new RuntimeException("oops")).retry(policy) + * }}} + * + * ===Retry on all errors except the TimeoutException=== + * + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[TimeoutException]) + * + * // retries 10 times at most using an exponential backoff strategy + * IO.raiseError(new RuntimeException("oops")).retry(policy) + * + * // gives up immediately + * IO.raiseError(new TimeoutException("timeout")).retry(policy) + * }}} + */ +sealed trait Retry[F[_], E] { + import Retry.{Decision, ErrorMatcher, Status} + + /** + * The name of the policy. The name is used for informative purposes. + */ + def name: String + + /** + * Decides whether the action should be retried or not. + * + * @param status + * the current status + * + * @param error + * the current error + */ + def decide(status: Status, error: E): F[Decision] + + /** + * Applies the `transform` function to the result of the [[decide]] invocation. + * + * @param transform + * the transformation to apply + */ + def withTransformedDecision(transform: (Status, E, Decision) => Decision): Retry[F, E] + + /** + * The policy will retry exclusively when a caught error satisfies the `matcher`. + * + * @example + * the policy will retry on `TimeoutException` and give up : + * {{{ + * val timeoutExceptionOnly = Retry + * .exponential[IO, Throwable](1.second) + * .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[TimeoutException]) + * + * // will retry using exponential backoff strategy + * Retry.retry(timeoutExceptionOnly)(IO.raiseError(new TimeoutException("oops"))) + * + * // will never retry and give up immediately + * Retry.retry(timeoutExceptionOnly)(IO.raiseError(new RuntimeException("oops"))) + * }}} + * + * @param matcher + * the matcher to use + */ + def withErrorMatcher(matcher: ErrorMatcher[F, E]): Retry[F, E] + + /** + * Sets the name for the policy. The name is used for informative purposes. + * + * @param name + * the name to use + */ + def withName(name: String): Retry[F, E] + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when '''both''' policies decides to retry + * - If both policies decide to retry, the '''maximum''' of the two delays will be chosen + * + * @example + * + * {{{ + * val delay1 = Retry.constantDelay[IO, Throwable](1.second) + * val delay2 = Retry.constantDelay[IO, Throwable](2.second) + * + * // will always retry after 2 second + * val policy = alwaysGiveUp.and(alwaysRetry) + * }}} + * + * @example + * + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * // will never retry + * val policy = alwaysGiveUp.and(alwaysRetry) + * }}} + * + * @param other + * the `other` policy to combine `this` policy with + */ + def and(other: Retry[F, E]): Retry[F, E] + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when either policy decides to retry + * - If both policies decide to retry, the '''minimum''' of the two delays will be chosen + * + * @example + * + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * // will always retry after 1 second + * val policy = alwaysGiveUp.or(alwaysRetry) + * }}} + * + * @param other + * the `other` policy to combine `this` policy with + */ + def or(other: Retry[F, E]): Retry[F, E] + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when '''both''' policies decides to retry + * - If both policies decide to retry, the '''maximum''' of the two delays will be chosen + * + * @example + * + * {{{ + * val delay1 = Retry.constantDelay[IO, Throwable](1.second) + * val delay2 = Retry.constantDelay[IO, Throwable](2.second) + * + * // will always retry after 2 second + * val policy = alwaysGiveUp && alwaysRetry + * }}} + * + * @example + * the policy will never retry: + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * val policy = alwaysGiveUp && alwaysRetry + * }}} + * + * @see + * [[and]] + * + * @param other + * the `other` policy to combine `this` policy with + */ + final def &&(other: Retry[F, E]): Retry[F, E] = and(other) + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when either policy decides to retry + * - If both policies decide to retry, the '''minimum''' of the two delays will be chosen + * + * @example + * the policy will always retry after 1 second + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * val policy = alwaysGiveUp || alwaysRetry + * }}} + * + * @see + * [[or]] + * + * @param other + * the `other` policy to combine `this` policy with + */ + final def ||(other: Retry[F, E]): Retry[F, E] = or(other) + + /** + * Caps the '''individual''' delay between attempts by the given value. + * + * @example + * the policy will retry '''indefinitely''' with 2 seconds delay between attempts: + * {{{ + * val policy = Retry + * .constantDelay[IO, Throwable](3.second) + * .withCappedDelay(2.seconds) + * }}} + * + * @param cap + * the cap of the '''individual''' delay + */ + final def withCappedDelay(cap: FiniteDuration): Retry[F, E] = + withTransformedDecision { + case (_, _, decision: Decision.Retry) => + Decision.retry(decision.delay.min(cap)) + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"CapDelay($name, cap=$cap)") + + /** + * Limits the number of maximum retries. + * + * @example + * the policy will retry 5 times at most: + * {{{ + * val policy = Retry + * .constantDelay[IO, Throwable](1.second) + * .withMaxRetries(5) + * }}} + * + * @param max + * the maximum number of retries + */ + final def withMaxRetries(max: Int): Retry[F, E] = + withTransformedDecision { + case (status, _, decision: Decision.Retry) => + if (status.retriesTotal >= max) Decision.giveUp else decision + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"MaxRetries($name, max=$max)") + + /** + * Gives up once the '''individual''' delay is greater than the given `threshold`. + * + * @example + * the policy will give up once the individual delay between attempts will be more than 5 + * seconds: + * {{{ + * val policy = Retry + * .exponential[IO, Throwable](1.second) + * .withMaxDelay(5.seconds) + * }}} + * + * @example + * the policy will retry '''indefinitely''' because individual delay is never greater than + * the threshold: + * {{{ + * val policy = Retry + * .constantDelay(1.second) + * .withMaxDelay(2.seconds) + * }}} + * + * @param threshold + * the maximum '''individual''' delay + */ + final def withMaxDelay(threshold: FiniteDuration): Retry[F, E] = + withTransformedDecision { + case (_, _, decision: Decision.Retry) => + if (decision.delay >= threshold) Decision.giveUp else decision + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"MaxDelay($name, threshold=$threshold)") + + /** + * Gives up once the '''cumulative''' delay is greater than the given `threshold`. + * + * @example + * the policy will give up once the cumulative delay between attempts will be more than 5 + * seconds: + * {{{ + * val policy = Retry + * .exponential[IO, Throwable](1.second) + * .withCumulativeDelay(5.seconds) + * }}} + * + * @param threshold + * the maximum '''cumulative''' delay + */ + final def withMaxCumulativeDelay(threshold: FiniteDuration): Retry[F, E] = + withTransformedDecision { + case (status, _, decision: Decision.Retry) => + if (status.cumulativeDelay + decision.delay >= threshold) Decision.giveUp + else decision + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"MaxCumulativeDelay($name, threshold=$threshold)") + + final def mapK[G[_]: Monad](f: F ~> G): Retry[G, E] = + Retry.named(s"MapK($name)")((s, e) => f(decide(s, e))) + + override def toString: String = + name +} + +object Retry { + + /* Represents the status of a retry operation. */ + sealed trait Status { + /* The total number of retries that have occurred. */ + def retriesTotal: Int + + /* The total delay accumulated across all retries. */ + def cumulativeDelay: FiniteDuration + + def withRetry(delay: FiniteDuration): Status + } + + object Status { + private[std] val initial: Status = Impl(0, Duration.Zero) + + private[std] def apply( + retriesTotal: Int, + cumulativeDelay: FiniteDuration + ): Status = + Impl(retriesTotal, cumulativeDelay) + + private final case class Impl( + retriesTotal: Int, + cumulativeDelay: FiniteDuration + ) extends Status { + def withRetry(delay: FiniteDuration): Status = + copy( + retriesTotal = retriesTotal + 1, + cumulativeDelay = cumulativeDelay + delay + ) + } + } + + /* Represents the retry decision. */ + sealed trait Decision extends Product with Serializable + object Decision { + + /* No more retries. */ + sealed trait GiveUp extends Decision + + /* The operation must be retried after the delay. */ + sealed trait Retry extends Decision { + /* How long to wait before the next attempt. */ + def delay: FiniteDuration + } + + def giveUp: Decision = GiveUp + + def retry(delay: FiniteDuration): Decision = RetryImpl(delay) + + private case object GiveUp extends GiveUp + private final case class RetryImpl(delay: FiniteDuration) extends Retry + } + + /** + * The error matcher decides whether the retry decision should be calculated for the raised + * error or not. + */ + sealed trait ErrorMatcher[F[_], -E] { + def matches(e: E): F[Boolean] + } + + object ErrorMatcher { + + /** + * Creates an error matcher that matches all errors. + */ + def all[F[_]: Applicative, E]: ErrorMatcher[F, E] = + new Impl[F, E]({ case _ => Applicative[F].pure(true) }) + + /** + * Creates an error matcher using the given `matcher` under the hood. + * + * @param matcher + * the matcher to use + */ + def matches[F[_]: Applicative, E]( + matcher: PartialFunction[E, Boolean] + ): ErrorMatcher[F, E] = + new Impl[F, E](matcher.andThen(b => Applicative[F].pure(b))) + + /** + * Creates an error matcher using the given `matcher` under the hood. + * + * @param matcher + * the matcher to use + */ + def when[F[_]: Applicative, E]( + matcher: PartialFunction[E, F[Boolean]] + ): ErrorMatcher[F, E] = + new Impl[F, E](matcher) + + /** + * A partially-applied constructor. + * + * @example + * {{{ + * val onlyTimeoutException = ErrorMatcher[IO, Throwable].only[TimeoutException] + * val allButTimeoutException = ErrorMatcher[IO, Throwable].except[TimeoutException] + * }}} + */ + def apply[F[_], E]: ApplyPartiallyApplied[F, E] = + new ApplyPartiallyApplied(dummy = true) + + final class ApplyPartiallyApplied[F[_], E](private val dummy: Boolean) extends AnyVal { + + /** + * Creates a new error matcher the matches only errors of type `E1`. + * + * @example + * matches only `TimeoutException` errors: + * {{{ + * val matcher = ErrorMatcher[IO, Throwable].only[TimeoutException] + * }}} + */ + def only[E1 <: E: ClassTag](implicit F: Applicative[F]): ErrorMatcher[F, E] = + matches { case _: E1 => true } + + /** + * Creates a new error matcher the matches all errors except `E1`. + * + * @example + * matches all errors except the `TimeoutException` error: + * {{{ + * val matcher = ErrorMatcher[IO, Throwable].except[TimeoutException] + * }}} + */ + def except[E1 <: E: ClassTag](implicit F: Applicative[F]): ErrorMatcher[F, E] = + matches { case e => !classTag[E1].runtimeClass.isInstance(e) } + } + + private final class Impl[F[_]: Applicative, E]( + matcher: PartialFunction[E, F[Boolean]] + ) extends ErrorMatcher[F, E] { + def matches(e: E): F[Boolean] = + matcher.applyOrElse(e, (_: E) => Applicative[F].pure(false)) + } + } + + sealed trait BackoffMultiplier extends Product with Serializable + object BackoffMultiplier { + + def const(multiplier: Double): BackoffMultiplier = + Const(multiplier) + + def randomized(min: Double, max: Double): BackoffMultiplier = + Randomized(min, max) + + private[Retry] final case class Const(multiplier: Double) extends BackoffMultiplier + private[Retry] final case class Randomized( + min: Double, + max: Double + ) extends BackoffMultiplier + } + + /** + * Evaluates `fa` with the given retry `policy`. + * + * @param policy + * the policy to use + * + * @param fa + * the effect + */ + def retry[F[_], A, E](policy: Retry[F, E])(fa: F[A])(implicit F: GenTemporal[F, E]): F[A] = + doRetry(policy, None)(fa) + + /** + * Evaluates `fa` with the given retry `policy`. + * + * @param policy + * the policy to use + * + * @param onError + * the effect to invoke on every retry decision + * + * @param fa + * the effect + */ + def retry[F[_], A, E]( + policy: Retry[F, E], + onError: (Status, E, Decision) => F[Unit] + )(fa: F[A])(implicit F: GenTemporal[F, E]): F[A] = + doRetry(policy, Some(onError))(fa) + + /** + * The return policy that always gives up. + */ + def alwaysGiveUp[F[_]: Monad, E]: Retry[F, E] = + const("AlwaysGiveUp", Decision.giveUp) + + /** + * The policy that uses the given `delay` between attempts. + * + * @param delay + * the delay to use between retries + */ + def constantDelay[F[_]: Monad, E](delay: FiniteDuration): Retry[F, E] = + const(s"ConstantDelay($delay)", Decision.retry(delay)) + + /** + * The policy with a fixed number of retries. + * + * @example + * {{{ + * val policy = Retry.constantDelay[IO, Throwable](1.second) && Retry.maxRetries[IO, Throwable](10) + * }}} + * + * @param max + * the maximum number of retries + */ + def maxRetries[F[_]: Monad, E](max: Int): Retry[F, E] = + named(s"MaxRetries($max)") { (status, _) => + Monad[F].pure( + if (status.retriesTotal >= max) Decision.giveUp else Decision.retry(Duration.Zero) + ) + } + + /** + * The policy that uses exponential backoff strategy with the given `baseDelay`. The backoff + * multiplier is `2.0` and randomization factor `0.5`. The delays are always jittered. + * + * The simplified formula is: + * {{{ + * retry_delay = randomization_factor * random_double[0:1] * base_delay * (backoff_multiplier ^ retry_attempt) + * }}} + * + * @example + * an exponential backoff with maximum of 10 attempts: + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * }}} + * + * @see + * [[https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/]] + * + * @param baseDelay + * the base delay to use + */ + def exponentialBackoff[F[_]: Monad: Random, E](baseDelay: FiniteDuration): Retry[F, E] = + exponentialBackoff(baseDelay, Some(BackoffMultiplier.const(2.0)), Some(0.5)) + + /** + * The policy that uses exponential backoff strategy with the given configuration. The delays + * are always jittered. + * + * The simplified formula is: + * {{{ + * retry_delay = randomization_factor * random_double[0:1] * base_delay * (backoff_multiplier ^ retry_attempt) + * }}} + * + * @example + * an exponential backoff with maximum of 10 attempts: + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second, Some(Retry.BackoffMultiplier.const(1.5)), Some(0.75)) + * .withMaxRetries(10) + * }}} + * + * @see + * [[https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/]] + * + * @param baseDelay + * the base delay to use + * + * @param backoffMultiplier + * the backoff multiplier to use. A random double between `0.0` and `1.0` will be used if + * undefined + * + * @param randomizationFactor + * the randomization factor to use. The `1.0` will be used if undefined + */ + def exponentialBackoff[F[_]: Monad: Random, E]( + baseDelay: FiniteDuration, + backoffMultiplier: Option[BackoffMultiplier], + randomizationFactor: Option[Double] + ): Retry[F, E] = { + val name = (backoffMultiplier, randomizationFactor) match { + case (Some(multiplier), Some(factor)) => + s"Backoff(baseDelay=$baseDelay, multiplier=$multiplier, randomizationFactor=$factor)" + + case (Some(multiplier), None) => + s"Backoff(baseDelay=$baseDelay, multiplier=$multiplier)" + + case (None, Some(factor)) => + s"Backoff(baseDelay=$baseDelay, randomizationFactor=$factor)" + + case (None, None) => + s"Backoff(baseDelay=$baseDelay)" + } + + named[F, E](name) { (status, _) => + val multiplier = backoffMultiplier match { + case Some(BackoffMultiplier.Const(multiplier)) => Monad[F].pure(multiplier) + case Some(BackoffMultiplier.Randomized(min, max)) => Random[F].betweenDouble(min, max) + case None => Random[F].nextDouble + } + + (multiplier, Random[F].nextDouble).mapN { (multiplier, random) => + val factor = randomizationFactor.getOrElse(1.0) + val e = math.pow(multiplier, status.retriesTotal.toDouble).toLong + val backoff = safeMultiply(baseDelay, e) + val delay = factor * random * backoff.toNanos + + Decision.retry(delay.toLong.nanos) + } + } + } + + /** + * Creates a policy that uses the given decider function. + * + * @param decider + * the function to decide whether to continue + */ + def apply[F[_]: Monad, E](decider: (Status, E) => F[Decision]): Retry[F, E] = + named("f()")(decider) + + /** + * Creates a policy with the given name and decider function. + * + * @param name + * the name of the policy. The name is used for informative purposes + * + * @param decider + * the function to decide whether to continue + */ + def named[F[_]: Monad, E](name: String)(decider: (Status, E) => F[Decision]): Retry[F, E] = + RetryImpl(name, decider, ErrorMatcher.all) + + implicit def retrySemigroup[F[_], E]: Semigroup[Retry[F, E]] = + (x, y) => x && y + + implicit def retryShow[F[_], E]: Show[Retry[F, E]] = + retry => retry.name + + private def doRetry[F[_], A, E]( + retry: Retry[F, E], + onRetry: Option[(Status, E, Decision) => F[Unit]] + )(fa: F[A])(implicit F: GenTemporal[F, E]): F[A] = { + def onError(status: Status, error: E): F[Either[Status, A]] = + retry + .decide(status, error) + .flatTap(decision => onRetry.traverse_(_(status, error, decision))) + .flatMap { + case retry: Decision.Retry => + F.delayBy(F.pure(Left(status.withRetry(retry.delay))), retry.delay) + case _: Decision.GiveUp => + F.raiseError(error) + } + + F.tailRecM(Status.initial) { status => + fa.redeemWith(error => onError(status, error), success => F.pure(Right(success))) + } + } + + private def const[F[_]: Monad, E](name: String, decision: Decision): Retry[F, E] = { + val d = Monad[F].pure(decision) + named(name)((_, _) => d) + } + + private final case class RetryImpl[F[_]: Monad, E]( + name: String, + decider: (Status, E) => F[Decision], + errorMatcher: ErrorMatcher[F, E] + ) extends Retry[F, E] { + + def decide(status: Status, error: E): F[Decision] = + errorMatcher.matches(error).ifM(decider(status, error), Monad[F].pure(Decision.giveUp)) + + def and(other: Retry[F, E]): Retry[F, E] = + Retry.named(s"($name && ${other.name})") { (status, error) => + (decide(status, error), other.decide(status, error)).mapN { + case (a: Decision.Retry, b: Decision.Retry) => Decision.retry(a.delay.max(b.delay)) + case (_, _) => Decision.giveUp + } + } + + def or(other: Retry[F, E]): Retry[F, E] = + Retry.named(s"($name || ${other.name})") { (status, error) => + (decide(status, error), other.decide(status, error)).mapN { + case (a: Decision.Retry, b: Decision.Retry) => Decision.retry(a.delay.min(b.delay)) + case (_: Decision.GiveUp, other: Decision.Retry) => other + case (that: Decision.Retry, _: Decision.GiveUp) => that + case (_: Decision.GiveUp, _: Decision.GiveUp) => Decision.giveUp + } + } + + def withErrorMatcher(matcher: ErrorMatcher[F, E]): Retry[F, E] = + copy(errorMatcher = matcher) + + def withName(name: String): Retry[F, E] = + copy(name = name) + + def withTransformedDecision(transform: (Status, E, Decision) => Decision): Retry[F, E] = + copy(decider = { (status, error) => + decide(status, error).map(decision => transform(status, error, decision)) + }) + } + + private val LongMax: BigInt = BigInt(Long.MaxValue) + private def safeMultiply(duration: FiniteDuration, multiplier: Long): FiniteDuration = { + val durationNanos = BigInt(duration.toNanos) + val resultNanos = durationNanos * BigInt(multiplier) + val safeResultNanos = resultNanos.min(LongMax) + safeResultNanos.toLong.nanos + } + +} diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala index 367d4f5bee..0de080029e 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala @@ -17,4 +17,4 @@ package cats.effect package std.syntax -trait AllSyntax extends BackpressureSyntax with SupervisorSyntax +trait AllSyntax extends BackpressureSyntax with SupervisorSyntax with RetrySyntax diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala new file mode 100644 index 0000000000..431fe4b9c3 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std.syntax + +import cats.effect.kernel.GenTemporal +import cats.effect.std.Retry + +trait RetrySyntax { + implicit def retryOps[F[_], A](wrapped: F[A]): RetryOps[F, A] = + new RetryOps(wrapped) +} + +final class RetryOps[F[_], A] private[syntax] (private val fa: F[A]) extends AnyVal { + + def retry[E](policy: Retry[F, E])(implicit F: GenTemporal[F, E]): F[A] = + Retry.retry(policy)(fa) + + def retry[E]( + policy: Retry[F, E], + onError: (Retry.Status, E, Retry.Decision) => F[Unit] + )(implicit F: GenTemporal[F, E]): F[A] = + Retry.retry(policy, onError)(fa) + +} diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/package.scala b/std/shared/src/main/scala/cats/effect/std/syntax/package.scala index 62168d7672..9fa7618c08 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/package.scala @@ -22,4 +22,6 @@ package object syntax { object supervisor extends SupervisorSyntax + object retry extends RetrySyntax + } diff --git a/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala b/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala index 776d6ef5d0..6db23f6d80 100644 --- a/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala +++ b/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala index 4bd1d94e7f..8bb8a84fbf 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala b/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala new file mode 100644 index 0000000000..af85836dc9 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala @@ -0,0 +1,132 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.{Hash, Show} +import cats.data.EitherT +import cats.effect.{BaseSuite, IO, Temporal} +import cats.mtl.Handle +import cats.syntax.all._ + +import scala.concurrent.duration._ + +class RetryMTLSuite extends BaseSuite { + import Retry.{Decision, Status} + + sealed trait Errors + final class Error1 extends Errors + final class Error2 extends Errors + + type RetryAttempt = (Status, Decision, Errors) + + ticked("give up on mismatched errors") { implicit ticker => + type F[A] = EitherT[IO, Errors, A] + + val maxRetries = 2 + val delay = 1.second + + val error = new Error2 + val policy = Retry + .constantDelay[F, Errors](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) + + val expected: List[RetryAttempt] = List( + (Status(0, Duration.Zero), Decision.giveUp, error) + ) + + val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) + + val run = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + result <- mtlRetry[F, Errors, Unit]( + io, + policy, + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) + ).value + attempts <- ref.get + } yield (result, attempts) + + assertCompleteAs(run, (Left(error), expected)) + } + + ticked("retry only on matching errors") { implicit ticker => + type F[A] = EitherT[IO, Errors, A] + + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[F, Errors](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) + + val expected: List[RetryAttempt] = List( + (Status(0, Duration.Zero), Decision.retry(delay), error), + (Status(1, 1.second), Decision.retry(delay), error), + (Status(2, 2.seconds), Decision.giveUp, error) + ) + + val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) + + val run = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + result <- mtlRetry[F, Errors, Unit]( + io, + policy, + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) + ).value + attempts <- ref.get + } yield (result, attempts) + + assertCompleteAs(run, (Left(error), expected)) + } + + private def mtlRetry[F[_], E, A]( + action: F[A], + policy: Retry[F, E], + onRetry: (Status, E, Decision) => F[Unit] + )(implicit F: Temporal[F], H: Handle[F, E]): F[A] = + F.tailRecM(Status.initial) { status => + H.attempt(action).flatMap { + case Left(error) => + policy + .decide(status, error) + .flatTap(decision => onRetry(status, error, decision)) + .flatMap { + case retry: Decision.Retry => + F.delayBy(F.pure(Left(status.withRetry(retry.delay))), retry.delay) + + case _: Decision.GiveUp => + H.raise(error) + } + + case Right(success) => + F.pure(Right(success)) + } + } + + private implicit val outputHash: Hash[(Either[Errors, Unit], List[RetryAttempt])] = + Hash.fromUniversalHashCode + + private implicit val outputShow: Show[(Either[Errors, Unit], List[RetryAttempt])] = + Show.fromToString + +} diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala new file mode 100644 index 0000000000..1c9d0da301 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala @@ -0,0 +1,469 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.{Hash, Show} +import cats.effect.{BaseSuite, IO, Ref} +import cats.syntax.applicative._ +import cats.syntax.functor._ +import cats.syntax.semigroup._ + +import scala.concurrent.duration._ + +class RetrySuite extends BaseSuite { + import Retry.{Decision, Status} + + private class Error1 extends RuntimeException + private class Error2 extends RuntimeException + + private val error: Throwable = new RuntimeException("oops") + private val errorIO: IO[Unit] = IO.raiseError(error) + + ticked("retry until the action succeeds") { implicit ticker => + val delay = 1.second + val policy = Retry.constantDelay[IO, Throwable](delay) + + def action(attempts: Ref[IO, Int]) = + attempts.getAndUpdate(_ + 1).flatMap(total => errorIO.whenA(total < 3)) + + val io = + for { + counter <- IO.ref(0) + result <- action(counter).retry(policy).timed + counterValue <- counter.get + } yield (result._1, counterValue) + + val expectedRetries = 3 + + assertCompleteAs(io, (delay * expectedRetries.toLong, expectedRetries + 1)) + } + + ticked("retry until the policy chooses to give up") { implicit ticker => + val maxRetries = 2 + val policy = Retry.maxRetries[IO, Throwable](maxRetries) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(Duration.Zero)), + RetryAttempt(Status(1, Duration.Zero), Decision.retry(Duration.Zero)), + RetryAttempt(Status(2, Duration.Zero), Decision.giveUp) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withCappedDelay - cap the individual delay") { implicit ticker => + val maxRetries = 5 + val delay = 2.second + val capDelay = 1.second + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withCappedDelay(capDelay) + .withMaxRetries(maxRetries) + + val expected = { + val retries = List.tabulate(maxRetries) { i => + RetryAttempt(Status(i, capDelay * i.toLong), Decision.retry(capDelay)) + } + + retries :+ RetryAttempt(Status(maxRetries, maxRetries * capDelay), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withMaxRetries - give up once max retries are reached") { implicit ticker => + val maxRetries = 5 + val delay = 2.second + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries) + + val expected = { + val retries = List.tabulate(maxRetries) { i => + RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) + } + + retries :+ RetryAttempt(Status(maxRetries, maxRetries * delay), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withMaxDelay - give up once max individual delay is reached") { implicit ticker => + val delay = 1.second + val maxDelay = 5.second + + val policy = + Retry[IO, Throwable] { (status, _) => + IO.pure(Decision.retry(status.retriesTotal * delay)) + }.withMaxDelay(maxDelay) + + val expected = { + val numRetries = (maxDelay.toSeconds / delay.toSeconds).toInt + + val retries = List.tabulate(numRetries) { i => + val cumulativeDelay = + Range(0, i).map(_ * delay).reduceOption(_ + _).getOrElse(Duration.Zero) + + RetryAttempt(Status(i, cumulativeDelay), Decision.retry(i * delay)) + } + + val cumulativeDelay = retries.map(_.status.cumulativeDelay).reduce(_ + _) + retries :+ RetryAttempt(Status(numRetries, cumulativeDelay), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withMaxCumulativeDelay - cap the max cumulative (total) delay") { implicit t => + val delay = 1.second + val maxDelay = 5.second + val maxRetries = (maxDelay.toSeconds / delay.toSeconds).toInt - 1 + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxCumulativeDelay(maxDelay) + + val expected = { + val retries = List.tabulate(maxRetries) { i => + RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) + } + + retries :+ RetryAttempt(Status(maxRetries, delay * maxRetries.toLong), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("&& (and) - give up when one policy is decided to give up") { implicit ticker => + val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + + val expected = Decision.giveUp + + assertCompleteAs((alwaysGiveUp |+| alwaysRetry).decide(Status.initial, error), expected) + } + + ticked("&& (and) - choose the maximum delay if both policies decided to retry") { + implicit ticker => + val delay1 = Retry.constantDelay[IO, Throwable](1.second) + val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) + + val expected = Decision.retry(2.seconds) + + assertCompleteAs((delay1 |+| delay2).decide(Status.initial, error), expected) + } + + ticked("|| (or) - retry when one policy decided to retry") { implicit ticker => + val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + + val expected = Decision.retry(1.second) + + assertCompleteAs((alwaysGiveUp || alwaysRetry).decide(Status.initial, error), expected) + } + + ticked("|| (or) - choose the minimum delay if both policies decided to retry") { + implicit ticker => + val delay1 = Retry.constantDelay[IO, Throwable](1.second) + val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) + + val expected = Decision.retry(1.second) + + assertCompleteAs((delay1 || delay2).decide(Status.initial, error), expected) + } + + ticked("withErrorMatcher - retry only on matched errors") { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("withErrorMatcher - give up on mismatched errors") { implicit ticker => + val maxRetries = 5 + val delay = 1.second + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withErrorMatcher - keep the last matcher - give up on mismatched") { + implicit ticker => + val delay = 1.second + val maxRetries = 1 + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + + val error = new Error1 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("withErrorMatcher - keep the last matcher - retry on matching errors") { + implicit ticker => + val delay = 1.second + val maxRetries = 2 + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + + val error = new Error2 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("ErrorMatcher.except - give up on 'excepted' errors") { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("ErrorMatcher.except - give up on subtypes") { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[RuntimeException]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("ErrorMatcher.except - recover on all errors but the 'excepted' one") { + implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error2 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("exponentialBackoff - use default multiplier (2.0) and randomization factor (0.5)") { + implicit ticker => + val baseDelay = 1.second + val maxRetries = 3 + val policy = + Retry.exponentialBackoff[IO, Throwable](baseDelay).withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5 + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("exponentialBackoff - use the given multiplier (1.0) and randomization factor (1.0)") { + implicit ticker => + val baseDelay = 1.second + val maxRetries = 3 + val policy = Retry + .exponentialBackoff[IO, Throwable]( + baseDelay, + Some(Retry.BackoffMultiplier.const(1.0)), + Some(1.0) + ) + .withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 1.0, + factor = 1.0 + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("exponentialBackoff - cap the delay at the specific max") { implicit ticker => + val baseDelay = 1.second + val maxRetries = 5 + val maxDelay = 2.seconds + val policy = Retry + .exponentialBackoff[IO, Throwable](baseDelay) + .withCappedDelay(maxDelay) + .withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5, + maxDelay = Some(maxDelay) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("exponentialBackoff - respect the max number of retries") { implicit ticker => + val baseDelay = 1.second + val maxRetries = 5 + val maxDelay = 2.seconds + val policy = Retry + .exponentialBackoff[IO, Throwable](baseDelay) + .withCappedDelay(maxDelay) + .withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5, + maxDelay = Some(maxDelay) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + // it's not random :) + private val RandomNextDouble = 1.0 + private implicit val random: Random[IO] = + new Random.ScalaRandom[IO](IO.pure(scala.util.Random)) { + override def nextDouble: IO[Double] = IO.pure(RandomNextDouble) + } + + private def calculateExpectedBackoff( + baseDelay: FiniteDuration, + maxRetries: Int, + multiplier: Double, + factor: Double, + maxDelay: Option[FiniteDuration] = None + ): List[RetryAttempt] = { + + // per step delay = rand_factor * random_double [0:1] * (base_delay * multiplier ^ retry) + def stepDelay(retry: Int) = + factor * RandomNextDouble * baseDelay * math.pow(multiplier, retry.toDouble) match { + case f: FiniteDuration => + maxDelay.fold(f)(f.min) + case _ => + sys.error("result is not finite") + } + + @annotation.tailrec + def loop(retry: Int, output: List[RetryAttempt]): List[RetryAttempt] = { + val cumulative = output + .collect { case RetryAttempt(_, r: Decision.Retry, _) => r.delay } + .reduceOption(_ + _) + .getOrElse(Duration.Zero) + + val status = Status(retry, cumulative) + + if (retry < maxRetries) { + val next = RetryAttempt(status, Decision.retry(stepDelay(retry))) + loop(retry + 1, output :+ next) + } else { + output :+ RetryAttempt(status, Decision.giveUp) + } + } + + loop(0, Nil) + } + + private def run[A](policy: Retry[IO, Throwable])(io: IO[A]): IO[List[RetryAttempt]] = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + time <- io + .retry(policy, (s, e: Throwable, d) => ref.update(_ :+ RetryAttempt(s, d, e))) + .attempt + .timed + ._1F + attempts <- ref.get + } yield { + if (time > Duration.Zero && attempts.nonEmpty) { // ensure sleep time == cumulative delay + assert(attempts.last.status.cumulativeDelay == time) + } + + attempts + } + + private case class RetryAttempt( + status: Status, + decision: Decision, + error: Throwable = error + ) + + private implicit val retryAttemptHash: Hash[RetryAttempt] = Hash.fromUniversalHashCode + private implicit val retryAttemptShow: Show[RetryAttempt] = Show.fromToString + private implicit val decisionHash: Hash[Decision] = Hash.fromUniversalHashCode + private implicit val decisionShow: Show[Decision] = Show.fromToString +}