From f759ea3b080c2bd3dbb1a445c96be777dbcb86d4 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Fri, 26 Jul 2024 18:15:27 +0300 Subject: [PATCH 1/7] Implement `Retry` functionality --- .../src/main/scala/cats/effect/IO.scala | 40 ++ .../scala/cats/effect/syntax/package.scala | 1 + .../main/scala/cats/effect/std/Retry.scala | 643 ++++++++++++++++++ .../cats/effect/std/syntax/AllSyntax.scala | 2 +- .../cats/effect/std/syntax/RetrySyntax.scala | 38 ++ .../cats/effect/std/syntax/package.scala | 2 + .../scala/cats/effect/std/RetrySpec.scala | 369 ++++++++++ 7 files changed, 1094 insertions(+), 1 deletion(-) create mode 100644 std/shared/src/main/scala/cats/effect/std/Retry.scala create mode 100644 std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala create mode 100644 tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 4f047d9e9d..0e0c432858 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -44,6 +44,7 @@ import cats.effect.std.{ Backpressure, Console, Env, + Retry, SecureRandom, Supervisor, SystemProperties, @@ -646,6 +647,45 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { (FiberIO[A @uncheckedVariance], OutcomeIO[B])]] = IO.racePair(this, that) + /** + * Evaluates the current IO with the given retry `policy`. + * + * @example + * {{{ + * val policy = Retry.exponentialBackoff[IO, Throwable](1.second).withMaxRetries(10) + * io.retry(policy) + * }}} + * + * @param policy + * the policy to use + */ + def retry(policy: Retry[IO, Throwable]): IO[A] = + Retry.retry(policy)(this) + + /** + * Evaluates the current IO with the given retry `policy`. + * + * @example + * {{{ + * val policy = Retry.exponentialBackoff[IO, Throwable](1.second).withMaxRetries(10) + * io.retry( + * policy, + * (status, err, decision) => IO.println(s"Attempt $${status.retriesTotal}, error: $${err.getMessage}, next: $$decision") + * ) + * }}} + * + * @param policy + * the policy to use + * + * @param onRetry + * the effect to invoke on every retry decision + */ + def retry( + policy: Retry[IO, Throwable], + onRetry: (Retry.Status, Throwable, Retry.Decision) => IO[Unit] + ): IO[A] = + Retry.retry(policy, onRetry)(this) + /** * Inverse of `attempt` * diff --git a/core/shared/src/main/scala/cats/effect/syntax/package.scala b/core/shared/src/main/scala/cats/effect/syntax/package.scala index de4d679df5..3beaaf56f4 100644 --- a/core/shared/src/main/scala/cats/effect/syntax/package.scala +++ b/core/shared/src/main/scala/cats/effect/syntax/package.scala @@ -29,5 +29,6 @@ package object syntax { object clock extends kernel.syntax.ClockSyntax object supervisor extends std.syntax.SupervisorSyntax + object retry extends std.syntax.RetrySyntax object dispatcher extends DispatcherSyntax } diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala new file mode 100644 index 0000000000..58c2b16b2e --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -0,0 +1,643 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.{~>, Monad, Semigroup, Show} +import cats.effect.kernel.GenTemporal +import cats.syntax.apply._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import cats.syntax.monadError._ + +import scala.concurrent.duration._ + +/** + * Glossary: + * - individual delay - the delay between retries + * - cumulative delay - the total delay accumulated across all retries + */ +sealed trait Retry[F[_], E] { + import Retry.{Decision, Status} + + /** + * The name of the policy. The name is used for informative purposes. + */ + def name: String + + /** + * Decides whether the action should be retried or not. + * + * @param status + * the current status + * + * @param error + * the current error + */ + def decide(status: Status, error: E): F[Decision] + + /** + * Applies the `transform` function to the result of the [[decide]] invocation. + * + * @param transform + * the transformation to apply + */ + def withTransformedDecision(transform: (Status, E, Decision) => Decision): Retry[F, E] + + /** + * The policy will retry exclusively when a caught error satisfies the `matcher`. + * + * @example + * the policy will retry on `TimeoutException` and give up : + * {{{ + * val timeoutExceptionOnly = Retry + * .exponential[IO, Throwable](1.second) + * .withErrorMatcher { case e: TimeoutException => IO.pure(true) } + * + * // will retry using exponential backoff strategy + * Retry.retry(timeoutExceptionOnly)(IO.raiseError(new TimeoutException("oops"))) + * + * // will never retry and give up immediately + * Retry.retry(timeoutExceptionOnly)(IO.raiseError(new RuntimeException("oops"))) + * }}} + * + * @param matcher + * the matcher to use + */ + def withErrorMatcher(matcher: PartialFunction[E, F[Boolean]]): Retry[F, E] + + /** + * Sets the name for the policy. The name is used for informative purposes. + * + * @param name + * the name to use + */ + def withName(name: String): Retry[F, E] + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when '''both''' policies decides to retry + * - If both policies decide to retry, the '''maximum''' of the two delays will be chosen + * + * @example + * + * {{{ + * val delay1 = Retry.constantDelay[IO, Throwable](1.second) + * val delay2 = Retry.constantDelay[IO, Throwable](2.second) + * + * // will always retry after 2 second + * val policy = alwaysGiveUp.and(alwaysRetry) + * }}} + * + * @example + * + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * // will never retry + * val policy = alwaysGiveUp.and(alwaysRetry) + * }}} + * + * @param other + * the `other` policy to combine `this` policy with + */ + def and(other: Retry[F, E]): Retry[F, E] + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when either policy decides to retry + * - If both policies decide to retry, the '''minimum''' of the two delays will be chosen + * + * @example + * + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * // will always retry after 1 second + * val policy = alwaysGiveUp.or(alwaysRetry) + * }}} + * + * @param other + * the `other` policy to combine `this` policy with + */ + def or(other: Retry[F, E]): Retry[F, E] + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when '''both''' policies decides to retry + * - If both policies decide to retry, the '''maximum''' of the two delays will be chosen + * + * @example + * + * {{{ + * val delay1 = Retry.constantDelay[IO, Throwable](1.second) + * val delay2 = Retry.constantDelay[IO, Throwable](2.second) + * + * // will always retry after 2 second + * val policy = alwaysGiveUp && alwaysRetry + * }}} + * + * @example + * the policy will never retry: + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * val policy = alwaysGiveUp && alwaysRetry + * }}} + * + * @see + * [[and]] + * + * @param other + * the `other` policy to combine `this` policy with + */ + final def &&(other: Retry[F, E]): Retry[F, E] = and(other) + + /** + * Combines `this` policy with the `other` policy. + * + * The rules: + * - Retry will happen when either policy decides to retry + * - If both policies decide to retry, the '''minimum''' of the two delays will be chosen + * + * @example + * the policy will always retry after 1 second + * {{{ + * val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + * val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + * + * val policy = alwaysGiveUp || alwaysRetry + * }}} + * + * @see + * [[or]] + * + * @param other + * the `other` policy to combine `this` policy with + */ + final def ||(other: Retry[F, E]): Retry[F, E] = or(other) + + /** + * Caps the '''individual''' delay between attempts by the given value. + * + * @example + * the policy will retry '''indefinitely''' with 2 seconds delay between attempts: + * {{{ + * val policy = Retry + * .constantDelay[IO, Throwable](3.second) + * .withCappedDelay(2.seconds) + * }}} + * + * @param cap + * the cap of the '''individual''' delay + */ + final def withCappedDelay(cap: FiniteDuration): Retry[F, E] = + withTransformedDecision { + case (_, _, decision: Decision.Retry) => + Decision.retry(decision.delay.min(cap)) + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"CapDelay($name, cap=$cap)") + + /** + * Limits the number of maximum retries. + * + * @example + * the policy will retry 5 times at most: + * {{{ + * val policy = Retry + * .constantDelay[IO, Throwable](1.second) + * .withMaxRetries(5) + * }}} + * + * @param max + * the maximum number of retries + */ + final def withMaxRetries(max: Int): Retry[F, E] = + withTransformedDecision { + case (status, _, decision: Decision.Retry) => + if (status.retriesTotal >= max) Decision.giveUp else decision + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"MaxRetries($name, max=$max)") + + /** + * Gives up once the '''individual''' delay is greater than the given `threshold`. + * + * @example + * the policy will give up once the individual delay between attempts will be more than 5 + * seconds: + * {{{ + * val policy = Retry + * .exponential[IO, Throwable](1.second) + * .withMaxDelay(5.seconds) + * }}} + * + * @example + * the policy will retry '''indefinitely''' because individual delay is never greater than + * the threshold: + * {{{ + * val policy = Retry + * .constantDelay(1.second) + * .withMaxDelay(2.seconds) + * }}} + * + * @param threshold + * the maximum '''individual''' delay + */ + final def withMaxDelay(threshold: FiniteDuration): Retry[F, E] = + withTransformedDecision { + case (_, _, decision: Decision.Retry) => + if (decision.delay >= threshold) Decision.giveUp else decision + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"MaxDelay($name, threshold=$threshold)") + + /** + * Gives up once the '''cumulative''' delay is greater than the given `threshold`. + * + * @example + * the policy will give up once the cumulative delay between attempts will be more than 5 + * seconds: + * {{{ + * val policy = Retry + * .exponential[IO, Throwable](1.second) + * .withCumulativeDelay(5.seconds) + * }}} + * + * @param threshold + * the maximum '''cumulative''' delay + */ + final def withMaxCumulativeDelay(threshold: FiniteDuration): Retry[F, E] = + withTransformedDecision { + case (status, _, decision: Decision.Retry) => + if (status.cumulativeDelay + decision.delay >= threshold) Decision.giveUp + else decision + + case (_, _, _: Decision.GiveUp) => + Decision.giveUp + }.withName(s"MaxCumulativeDelay($name, threshold=$threshold)") + + final def mapK[G[_]: Monad](f: F ~> G): Retry[G, E] = + Retry.named(s"MapK($name)")((s, e) => f(decide(s, e))) + + override def toString: String = + name +} + +object Retry { + + /* Represents the status of a retry operation. */ + sealed trait Status { + /* The total number of retries that have occurred. */ + def retriesTotal: Int + + /* The total delay accumulated across all retries. */ + def cumulativeDelay: FiniteDuration + + def withRetry(delay: FiniteDuration): Status + } + + object Status { + private[std] val initial: Status = Impl(0, Duration.Zero) + + private[std] def apply( + retriesTotal: Int, + cumulativeDelay: FiniteDuration + ): Status = + Impl(retriesTotal, cumulativeDelay) + + private final case class Impl( + retriesTotal: Int, + cumulativeDelay: FiniteDuration + ) extends Status { + def withRetry(delay: FiniteDuration): Status = + copy( + retriesTotal = retriesTotal + 1, + cumulativeDelay = cumulativeDelay + delay + ) + } + } + + /* Represents the retry decision. */ + sealed trait Decision extends Product with Serializable + object Decision { + + /* No more retries. */ + sealed trait GiveUp extends Decision + + /* The operation must be retried after the delay. */ + sealed trait Retry extends Decision { + /* How long to wait before the next attempt. */ + def delay: FiniteDuration + } + + def giveUp: Decision = GiveUp + + def retry(delay: FiniteDuration): Decision = RetryImpl(delay) + + private case object GiveUp extends GiveUp + private final case class RetryImpl(delay: FiniteDuration) extends Retry + } + + sealed trait BackoffMultiplier extends Product with Serializable + object BackoffMultiplier { + + def const(multiplier: Double): BackoffMultiplier = + Const(multiplier) + + def randomized(min: Double, max: Double): BackoffMultiplier = + Randomized(min, max) + + private[Retry] final case class Const(multiplier: Double) extends BackoffMultiplier + private[Retry] final case class Randomized( + min: Double, + max: Double + ) extends BackoffMultiplier + } + + /** + * Evaluates `fa` with the given retry `policy`. + * + * @param policy + * the policy to use + * + * @param fa + * the effect + */ + def retry[F[_], A, E](policy: Retry[F, E])(fa: F[A])(implicit F: GenTemporal[F, E]): F[A] = + doRetry(policy, None)(fa) + + /** + * Evaluates `fa` with the given retry `policy`. + * + * @param policy + * the policy to use + * + * @param onRetry + * the effect to invoke on every retry decision + * + * @param fa + * the effect + */ + def retry[F[_], A, E]( + policy: Retry[F, E], + onRetry: (Status, E, Decision) => F[Unit] + )(fa: F[A])(implicit F: GenTemporal[F, E]): F[A] = + doRetry(policy, Some(onRetry))(fa) + + /** + * The return policy that always gives up. + */ + def alwaysGiveUp[F[_]: Monad, E]: Retry[F, E] = + const("AlwaysGiveUp", Decision.giveUp) + + /** + * The policy that uses the given `delay` between attempts. + * + * @param delay + * the delay to use between retries + */ + def constantDelay[F[_]: Monad, E](delay: FiniteDuration): Retry[F, E] = + const(s"ConstantDelay($delay)", Decision.retry(delay)) + + /** + * The policy with a fixed number of retries. + * + * @example + * {{{ + * val policy = Retry.constantDelay[IO, Throwable](1.second) && Retry.maxRetries[IO, Throwable](10) + * }}} + * + * @param max + * the maximum number of retries + */ + def maxRetries[F[_]: Monad, E](max: Int): Retry[F, E] = + named(s"MaxRetries($max)") { (status, _) => + Monad[F].pure( + if (status.retriesTotal >= max) Decision.giveUp else Decision.retry(Duration.Zero) + ) + } + + /** + * The policy that uses exponential backoff strategy with the given `baseDelay`. The backoff + * multiplier is `2.0` and randomization factor `0.5`. The delays are always jittered. + * + * The simplified formula is: + * {{{ + * retry_delay = randomization_factor * random_double[0:1] * base_delay * (backoff_multiplier ^ retry_attempt) + * }}} + * + * @example + * an exponential backoff with maximum of 10 attempts: + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * }}} + * + * @see + * [[https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/]] + * + * @param baseDelay + * the base delay to use + */ + def exponentialBackoff[F[_]: Monad: Random, E](baseDelay: FiniteDuration): Retry[F, E] = + exponentialBackoff(baseDelay, Some(BackoffMultiplier.const(2.0)), Some(0.5)) + + /** + * The policy that uses exponential backoff strategy with the given configuration. The delays + * are always jittered. + * + * The simplified formula is: + * {{{ + * retry_delay = randomization_factor * random_double[0:1] * base_delay * (backoff_multiplier ^ retry_attempt) + * }}} + * + * @example + * an exponential backoff with maximum of 10 attempts: + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second, Some(Retry.BackoffMultiplier.const(1.5)), Some(0.75)) + * .withMaxRetries(10) + * }}} + * + * @see + * [[https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/]] + * + * @param baseDelay + * the base delay to use + * + * @param backoffMultiplier + * the backoff multiplier to use. A random double between `0.0` and `1.0` will be used if + * undefined + * + * @param randomizationFactor + * the randomization factor to use. The `1.0` will be used if undefined + */ + def exponentialBackoff[F[_]: Monad: Random, E]( + baseDelay: FiniteDuration, + backoffMultiplier: Option[BackoffMultiplier], + randomizationFactor: Option[Double] + ): Retry[F, E] = { + val name = (backoffMultiplier, randomizationFactor) match { + case (Some(multiplier), Some(factor)) => + s"Backoff(baseDelay=$baseDelay, multiplier=$multiplier, randomizationFactor=$factor)" + + case (Some(multiplier), None) => + s"Backoff(baseDelay=$baseDelay, multiplier=$multiplier)" + + case (None, Some(factor)) => + s"Backoff(baseDelay=$baseDelay, randomizationFactor=$factor)" + + case (None, None) => + s"Backoff(baseDelay=$baseDelay)" + } + + named[F, E](name) { (status, _) => + val multiplier = backoffMultiplier match { + case Some(BackoffMultiplier.Const(multiplier)) => Monad[F].pure(multiplier) + case Some(BackoffMultiplier.Randomized(min, max)) => Random[F].betweenDouble(min, max) + case None => Random[F].nextDouble + } + + (multiplier, Random[F].nextDouble).mapN { (multiplier, random) => + val factor = randomizationFactor.getOrElse(1.0) + val e = math.pow(multiplier, status.retriesTotal.toDouble).toLong + val backoff = safeMultiply(baseDelay, e) + val delay = factor * random * backoff.toNanos + + Decision.retry(delay.toLong.nanos) + } + } + } + + /** + * Creates a policy that uses the given decider function. + * + * @param decider + * the function to decide whether to continue + */ + def apply[F[_]: Monad, E](decider: (Status, E) => F[Decision]): Retry[F, E] = + named("f()")(decider) + + /** + * Creates a policy with the given name and decider function. + * + * @param name + * the name of the policy. The name is used for informative purposes + * + * @param decider + * the function to decide whether to continue + */ + def named[F[_]: Monad, E](name: String)(decider: (Status, E) => F[Decision]): Retry[F, E] = + RetryImpl(name, decider) + + implicit def retrySemigroup[F[_], E]: Semigroup[Retry[F, E]] = + (x, y) => x && y + + implicit def retryShow[F[_], E]: Show[Retry[F, E]] = + retry => retry.name + + private def doRetry[F[_], A, E]( + retry: Retry[F, E], + onRetry: Option[(Status, E, Decision) => F[Unit]] + )(fa: F[A])(implicit F: GenTemporal[F, E]): F[A] = { + def onError(status: Status, error: E): F[Either[Status, A]] = + retry + .decide(status, error) + .flatTap(decision => onRetry.traverse_(_(status, error, decision))) + .flatMap { + case retry: Decision.Retry => + F.delayBy(F.pure(Left(status.withRetry(retry.delay))), retry.delay) + case _: Decision.GiveUp => + F.raiseError(error) + } + + F.tailRecM(Status.initial) { status => + fa.redeemWith(error => onError(status, error), success => F.pure(Right(success))) + } + } + + private def const[F[_]: Monad, E](name: String, decision: Decision): Retry[F, E] = { + val d = Monad[F].pure(decision) + named(name)((_, _) => d) + } + + private final case class RetryImpl[F[_]: Monad, E]( + name: String, + decider: (Status, E) => F[Decision] + ) extends Retry[F, E] { + + def decide(status: Status, error: E): F[Decision] = + decider(status, error) + + def and(other: Retry[F, E]): Retry[F, E] = + Retry.named(s"($name && ${other.name})") { (status, error) => + (decide(status, error), other.decide(status, error)).mapN { + case (a: Decision.Retry, b: Decision.Retry) => Decision.retry(a.delay.max(b.delay)) + case (_, _) => Decision.giveUp + } + } + + def or(other: Retry[F, E]): Retry[F, E] = + Retry.named(s"($name || ${other.name})") { (status, error) => + (decide(status, error), other.decide(status, error)).mapN { + case (a: Decision.Retry, b: Decision.Retry) => Decision.retry(a.delay.min(b.delay)) + case (_: Decision.GiveUp, other: Decision.Retry) => other + case (that: Decision.Retry, _: Decision.GiveUp) => that + case (_: Decision.GiveUp, _: Decision.GiveUp) => Decision.giveUp + } + } + + def withErrorMatcher(matcher: PartialFunction[E, F[Boolean]]): Retry[F, E] = + copy(decider = { (status, error) => + matcher + .applyOrElse(error, (_: E) => Monad[F].pure(false)) + .ifM(decider(status, error), Monad[F].pure(Decision.giveUp)) + }) + + def withName(name: String): Retry[F, E] = + copy(name = name) + + def withTransformedDecision(transform: (Status, E, Decision) => Decision): Retry[F, E] = + copy(decider = { (status, error) => + decide(status, error).map(decision => transform(status, error, decision)) + }) + } + + private val LongMax: BigInt = BigInt(Long.MaxValue) + private def safeMultiply(duration: FiniteDuration, multiplier: Long): FiniteDuration = { + val durationNanos = BigInt(duration.toNanos) + val resultNanos = durationNanos * BigInt(multiplier) + val safeResultNanos = resultNanos.min(LongMax) + safeResultNanos.toLong.nanos + } + +} diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala index 367d4f5bee..0de080029e 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala @@ -17,4 +17,4 @@ package cats.effect package std.syntax -trait AllSyntax extends BackpressureSyntax with SupervisorSyntax +trait AllSyntax extends BackpressureSyntax with SupervisorSyntax with RetrySyntax diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala new file mode 100644 index 0000000000..2c3ee4bf2f --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std.syntax + +import cats.effect.kernel.GenTemporal +import cats.effect.std.Retry + +trait RetrySyntax { + implicit def retryOps[F[_], A](wrapped: F[A]): RetryOps[F, A] = + new RetryOps(wrapped) +} + +final class RetryOps[F[_], A] private[syntax] (private val fa: F[A]) extends AnyVal { + + def retry[E](policy: Retry[F, E])(implicit F: GenTemporal[F, E]): F[A] = + Retry.retry(policy)(fa) + + def retry[E]( + policy: Retry[F, E], + onRetry: (Retry.Status, E, Retry.Decision) => F[Unit] + )(implicit F: GenTemporal[F, E]): F[A] = + Retry.retry(policy, onRetry)(fa) + +} diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/package.scala b/std/shared/src/main/scala/cats/effect/std/syntax/package.scala index 62168d7672..9fa7618c08 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/package.scala @@ -22,4 +22,6 @@ package object syntax { object supervisor extends SupervisorSyntax + object retry extends RetrySyntax + } diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala new file mode 100644 index 0000000000..6f4e3dd162 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala @@ -0,0 +1,369 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.{Hash, Show} +import cats.effect.{BaseSpec, IO, Ref} +import cats.syntax.applicative._ +import cats.syntax.semigroup._ + +import scala.concurrent.duration._ + +class RetrySpec extends BaseSpec { + import Retry.{Decision, Status} + + private class Error1 extends RuntimeException + + private val error: Throwable = new RuntimeException("oops") + private val errorIO: IO[Unit] = IO.raiseError(error) + + "Retry" should { + + "retry until the action succeeds" in ticked { implicit ticker => + val delay = 1.second + val policy = Retry.constantDelay[IO, Throwable](delay) + + def action(attempts: Ref[IO, Int]) = + attempts.getAndUpdate(_ + 1).flatMap { total => + IO.raiseError(new RuntimeException("oops")).whenA(total < 3) + } + + val io = + for { + counter <- IO.ref(0) + result <- action(counter).retry(policy).timed + counterValue <- counter.get + } yield (result._1, counterValue) + + val expectedRetries = 3 + + io must completeAs((delay * expectedRetries.toLong, expectedRetries + 1)) + } + + "retry until the policy chooses to give up" in ticked { implicit ticker => + val maxRetries = 2 + val policy = Retry.maxRetries[IO, Throwable](maxRetries) + val io = IO.raiseError(new RuntimeException("oops")) + + run(policy)(io) must completeAs((Duration.Zero, maxRetries + 1)) + } + + "withErrorMatcher - retry only on matched errors" in ticked { implicit ticker => + val maxRetries = 5 + val delay = 1.second + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries).withErrorMatcher { + case _: Error1 => IO.pure(true) + } + + val io = IO.raiseError(new Error1) + + run(policy)(io) must completeAs((maxRetries * delay, maxRetries + 1)) + } + + "withErrorMatcher - give up on mismatched errors" in ticked { implicit ticker => + val maxRetries = 5 + val delay = 1.second + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries).withErrorMatcher { + case _: Error1 => IO.pure(true) + } + + val io = IO.raiseError(new RuntimeException("oops")) + + run(policy)(io) must completeAs((Duration.Zero, 1)) + } + + "withCappedDelay - cap the individual delay" in ticked { implicit ticker => + val maxRetries = 5 + val delay = 2.second + val capDelay = 1.second + + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withCappedDelay(capDelay) + .withMaxRetries(maxRetries) + + val expected: List[(Status, Decision)] = { + val retries = List.tabulate(maxRetries) { i => + (Status(i, capDelay * i.toLong.toLong), Decision.retry(capDelay)) + } + + retries :+ (Status(maxRetries, maxRetries * capDelay) -> Decision.giveUp) + } + + runWithAttempts(policy)(errorIO) must completeAs((maxRetries * capDelay, expected)) + } + + "withMaxRetries - give up once max retries are reached" in ticked { implicit ticker => + val maxRetries = 5 + val delay = 2.second + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries) + + val expected: List[(Status, Decision)] = { + val retries = List.tabulate(maxRetries) { i => + (Status(i, delay * i.toLong), Decision.retry(delay)) + } + + retries :+ (Status(maxRetries, maxRetries * delay) -> Decision.giveUp) + } + + runWithAttempts(policy)(errorIO) must completeAs((maxRetries * delay, expected)) + } + + "withMaxDelay - give up once max individual delay is reached" in ticked { implicit ticker => + val delay = 1.second + val maxDelay = 5.second + + val policy = + Retry[IO, Throwable] { (status, _) => + IO.pure(Decision.retry(status.retriesTotal * delay)) + }.withMaxDelay(maxDelay) + + val expected: List[(Status, Decision)] = { + val numRetries = (maxDelay.toSeconds / delay.toSeconds).toInt + + val retries = List.tabulate(numRetries) { i => + val cumulativeDelay = + Range(0, i).map(_ * delay).reduceOption(_ + _).getOrElse(Duration.Zero) + + (Status(i, cumulativeDelay), Decision.retry(i * delay)) + } + + val cumulativeDelay = retries.map(_._1.cumulativeDelay).reduce(_ + _) + retries :+ (Status(numRetries, cumulativeDelay) -> Decision.giveUp) + } + val totalDelay = expected.map(_._1.cumulativeDelay).last + + runWithAttempts(policy)(errorIO) must completeAs((totalDelay, expected)) + } + + "withMaxCumulativeDelay - cap the max cumulative (total) delay" in ticked { implicit t => + val delay = 1.second + val maxDelay = 5.second + val maxRetries = (maxDelay.toSeconds / delay.toSeconds).toInt - 1 + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxCumulativeDelay(maxDelay) + + val expected: List[(Status, Decision)] = { + val retries = List.tabulate(maxRetries) { i => + (Status(i, delay * i.toLong), Decision.retry(delay)) + } + + retries :+ (Status(maxRetries, delay * maxRetries.toLong) -> Decision.giveUp) + } + + runWithAttempts(policy)(errorIO) must completeAs((maxRetries * delay, expected)) + } + + "&& (and) - give up when one policy is decided to give up" in ticked { implicit ticker => + val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + + val expected = Decision.giveUp + + (alwaysGiveUp |+| alwaysRetry).decide(Status.initial, error) must completeAs(expected) + } + + "&& (and) - choose the maximum delay if both policies decided to retry" in ticked { + implicit ticker => + val delay1 = Retry.constantDelay[IO, Throwable](1.second) + val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) + + val expected = Decision.retry(2.seconds) + + (delay1 |+| delay2).decide(Status.initial, error) must completeAs(expected) + } + + "|| (or) - retry when one policy decided to retry" in ticked { implicit ticker => + val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + + val expected = Decision.retry(1.second) + + (alwaysGiveUp || alwaysRetry).decide(Status.initial, error) must completeAs(expected) + } + + "|| (or) - choose the minimum delay if both policies decided to retry" in ticked { + implicit ticker => + val delay1 = Retry.constantDelay[IO, Throwable](1.second) + val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) + + val expected = Decision.retry(1.second) + + (delay1 || delay2).decide(Status.initial, error) must completeAs(expected) + } + + } + + "Retry.exponentialBackoff" should { + // it's not random :) + val RandomNextDouble = 1.0 + implicit val random: Random[IO] = + new Random.ScalaRandom[IO](IO.pure(scala.util.Random)) { + override def nextDouble: IO[Double] = IO.pure(RandomNextDouble) + } + + def calculateExpected( + baseDelay: FiniteDuration, + maxRetries: Int, + multiplier: Double, + factor: Double, + maxDelay: Option[FiniteDuration] = None + ): List[(Status, Decision)] = { + + // per step delay = rand_factor * random_double [0:1] * (base_delay * multiplier ^ retry) + def stepDelay(retry: Int) = + factor * RandomNextDouble * baseDelay * math.pow(multiplier, retry.toDouble) match { + case f: FiniteDuration => + maxDelay.fold(f)(f.min) + case _ => + sys.error("result is not finite") + } + + @annotation.tailrec + def loop(retry: Int, output: List[(Status, Decision)]): List[(Status, Decision)] = { + val cumulative = output + .collect { case (_, r: Decision.Retry) => r.delay } + .reduceOption(_ + _) + .getOrElse(Duration.Zero) + + val status = Status(retry, cumulative) + + if (retry < maxRetries) { + val next = (status, Decision.retry(stepDelay(retry))) + loop(retry + 1, output :+ next) + } else { + output :+ (status -> Decision.giveUp) + } + } + + loop(0, Nil) + } + + "use default multiplier (2.0) and randomization factor (0.5)" in ticked { implicit ticker => + val baseDelay = 1.second + val maxRetries = 3 + val policy = Retry.exponentialBackoff[IO, Throwable](baseDelay).withMaxRetries(maxRetries) + + val expected: List[(Status, Decision)] = calculateExpected( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5 + ) + val delayTotal = expected.map(_._1.cumulativeDelay).last + + runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + } + + "use the given multiplier (1.0) and randomization factor (1.0)" in ticked { implicit t => + val baseDelay = 1.second + val maxRetries = 3 + val policy = Retry + .exponentialBackoff[IO, Throwable]( + baseDelay, + Some(Retry.BackoffMultiplier.const(1.0)), + Some(1.0) + ) + .withMaxRetries(maxRetries) + + val expected: List[(Status, Decision)] = calculateExpected( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 1.0, + factor = 1.0 + ) + val delayTotal = expected.map(_._1.cumulativeDelay).last + + runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + } + + "cap the delay at the specific max" in ticked { implicit ticker => + val baseDelay = 1.second + val maxRetries = 5 + val maxDelay = 2.seconds + val policy = Retry + .exponentialBackoff[IO, Throwable](baseDelay) + .withCappedDelay(maxDelay) + .withMaxRetries(maxRetries) + + val expected: List[(Status, Decision)] = calculateExpected( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5, + maxDelay = Some(maxDelay) + ) + val delayTotal = expected.map(_._1.cumulativeDelay).last + + runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + } + + "respect the max number of retries" in ticked { implicit ticker => + val baseDelay = 1.second + val maxRetries = 5 + val maxDelay = 2.seconds + val policy = Retry + .exponentialBackoff[IO, Throwable](baseDelay) + .withCappedDelay(maxDelay) + .withMaxRetries(maxRetries) + + val expected: List[(Status, Decision)] = calculateExpected( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5, + maxDelay = Some(maxDelay) + ) + val delayTotal = expected.map(_._1.cumulativeDelay).last + + runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + } + + } + + private def run[A](policy: Retry[IO, Throwable])(io: IO[A]): IO[(FiniteDuration, Int)] = + for { + counter <- IO.ref(0) + result <- io.retry(policy, (_, _: Throwable, _) => counter.update(_ + 1)).attempt.timed + attempts <- counter.get + } yield (result._1, attempts) + + private def runWithAttempts[A](policy: Retry[IO, Throwable])( + io: IO[A] + ): IO[(FiniteDuration, List[(Status, Decision)])] = + for { + ref <- IO.ref(List.empty[(Status, Decision)]) + result <- io + .retry(policy, (s, _: Throwable, d) => ref.update(_ :+ (s -> d))) + .attempt + .timed + attempts <- ref.get + } yield (result._1, attempts) + + private implicit val statusOrder: Hash[Status] = Hash.fromUniversalHashCode + private implicit val statusShow: Show[Status] = Show.fromToString + private implicit val decisionOrder: Hash[Decision] = Hash.fromUniversalHashCode + private implicit val decisionShow: Show[Decision] = Show.fromToString +} From 5dc69868886038864f6a303af1048b1e6bfdb8d8 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Sun, 28 Jul 2024 10:49:14 +0300 Subject: [PATCH 2/7] Retry: `withErrorMatcher` must keep the last provided matcher --- .../main/scala/cats/effect/std/Retry.scala | 15 +- .../scala/cats/effect/std/RetrySpec.scala | 176 ++++++++++++------ 2 files changed, 122 insertions(+), 69 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala index 58c2b16b2e..149d7958de 100644 --- a/std/shared/src/main/scala/cats/effect/std/Retry.scala +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -557,7 +557,7 @@ object Retry { * the function to decide whether to continue */ def named[F[_]: Monad, E](name: String)(decider: (Status, E) => F[Decision]): Retry[F, E] = - RetryImpl(name, decider) + RetryImpl(name, decider, { _: E => Monad[F].pure(true) }) implicit def retrySemigroup[F[_], E]: Semigroup[Retry[F, E]] = (x, y) => x && y @@ -592,11 +592,14 @@ object Retry { private final case class RetryImpl[F[_]: Monad, E]( name: String, - decider: (Status, E) => F[Decision] + decider: (Status, E) => F[Decision], + errorMatcher: PartialFunction[E, F[Boolean]] ) extends Retry[F, E] { def decide(status: Status, error: E): F[Decision] = - decider(status, error) + errorMatcher + .applyOrElse(error, (_: E) => Monad[F].pure(false)) + .ifM(decider(status, error), Monad[F].pure(Decision.giveUp)) def and(other: Retry[F, E]): Retry[F, E] = Retry.named(s"($name && ${other.name})") { (status, error) => @@ -617,11 +620,7 @@ object Retry { } def withErrorMatcher(matcher: PartialFunction[E, F[Boolean]]): Retry[F, E] = - copy(decider = { (status, error) => - matcher - .applyOrElse(error, (_: E) => Monad[F].pure(false)) - .ifM(decider(status, error), Monad[F].pure(Decision.giveUp)) - }) + copy(errorMatcher = matcher) def withName(name: String): Retry[F, E] = copy(name = name) diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala index 6f4e3dd162..88362acbcd 100644 --- a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala @@ -19,6 +19,7 @@ package cats.effect.std import cats.{Hash, Show} import cats.effect.{BaseSpec, IO, Ref} import cats.syntax.applicative._ +import cats.syntax.functor._ import cats.syntax.semigroup._ import scala.concurrent.duration._ @@ -27,6 +28,7 @@ class RetrySpec extends BaseSpec { import Retry.{Decision, Status} private class Error1 extends RuntimeException + private class Error2 extends RuntimeException private val error: Throwable = new RuntimeException("oops") private val errorIO: IO[Unit] = IO.raiseError(error) @@ -38,9 +40,7 @@ class RetrySpec extends BaseSpec { val policy = Retry.constantDelay[IO, Throwable](delay) def action(attempts: Ref[IO, Int]) = - attempts.getAndUpdate(_ + 1).flatMap { total => - IO.raiseError(new RuntimeException("oops")).whenA(total < 3) - } + attempts.getAndUpdate(_ + 1).flatMap(total => errorIO.whenA(total < 3)) val io = for { @@ -57,23 +57,36 @@ class RetrySpec extends BaseSpec { "retry until the policy chooses to give up" in ticked { implicit ticker => val maxRetries = 2 val policy = Retry.maxRetries[IO, Throwable](maxRetries) - val io = IO.raiseError(new RuntimeException("oops")) - run(policy)(io) must completeAs((Duration.Zero, maxRetries + 1)) + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(Duration.Zero)), + RetryAttempt(Status(1, Duration.Zero), Decision.retry(Duration.Zero)), + RetryAttempt(Status(2, Duration.Zero), Decision.giveUp) + ) + + run(policy)(errorIO) must completeAs(expected) } "withErrorMatcher - retry only on matched errors" in ticked { implicit ticker => val maxRetries = 5 val delay = 1.second + val error = new Error1 val policy = Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries).withErrorMatcher { case _: Error1 => IO.pure(true) } - val io = IO.raiseError(new Error1) + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.retry(delay), error), + RetryAttempt(Status(3, 3.seconds), Decision.retry(delay), error), + RetryAttempt(Status(4, 4.seconds), Decision.retry(delay), error), + RetryAttempt(Status(5, 5.seconds), Decision.giveUp, error) + ) - run(policy)(io) must completeAs((maxRetries * delay, maxRetries + 1)) + run(policy)(IO.raiseError(error)) must completeAs(expected) } "withErrorMatcher - give up on mismatched errors" in ticked { implicit ticker => @@ -85,9 +98,51 @@ class RetrySpec extends BaseSpec { case _: Error1 => IO.pure(true) } - val io = IO.raiseError(new RuntimeException("oops")) + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp) + ) + + run(policy)(errorIO) must completeAs(expected) + } + + "withErrorMatcher - keep the last matcher - give up on mismatched" in ticked { implicit t => + val delay = 1.second + val maxRetries = 1 + + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher { case _: Error1 => IO.pure(true) } + .withErrorMatcher { case _: Error2 => IO.pure(true) } + + val error = new Error1 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + run(policy)(IO.raiseError(error)) must completeAs(expected) + } + + "withErrorMatcher - keep the last matcher" in ticked { implicit ticker => + val delay = 1.second + val maxRetries = 2 + + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher { case _: Error1 => IO.pure(true) } + .withErrorMatcher { case _: Error2 => IO.pure(true) } + + val error = new Error2 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) - run(policy)(io) must completeAs((Duration.Zero, 1)) + run(policy)(IO.raiseError(error)) must completeAs(expected) } "withCappedDelay - cap the individual delay" in ticked { implicit ticker => @@ -101,15 +156,15 @@ class RetrySpec extends BaseSpec { .withCappedDelay(capDelay) .withMaxRetries(maxRetries) - val expected: List[(Status, Decision)] = { + val expected = { val retries = List.tabulate(maxRetries) { i => - (Status(i, capDelay * i.toLong.toLong), Decision.retry(capDelay)) + RetryAttempt(Status(i, capDelay * i.toLong), Decision.retry(capDelay)) } - retries :+ (Status(maxRetries, maxRetries * capDelay) -> Decision.giveUp) + retries :+ RetryAttempt(Status(maxRetries, maxRetries * capDelay), Decision.giveUp) } - runWithAttempts(policy)(errorIO) must completeAs((maxRetries * capDelay, expected)) + run(policy)(errorIO) must completeAs(expected) } "withMaxRetries - give up once max retries are reached" in ticked { implicit ticker => @@ -119,15 +174,15 @@ class RetrySpec extends BaseSpec { val policy = Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries) - val expected: List[(Status, Decision)] = { + val expected = { val retries = List.tabulate(maxRetries) { i => - (Status(i, delay * i.toLong), Decision.retry(delay)) + RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) } - retries :+ (Status(maxRetries, maxRetries * delay) -> Decision.giveUp) + retries :+ RetryAttempt(Status(maxRetries, maxRetries * delay), Decision.giveUp) } - runWithAttempts(policy)(errorIO) must completeAs((maxRetries * delay, expected)) + run(policy)(errorIO) must completeAs(expected) } "withMaxDelay - give up once max individual delay is reached" in ticked { implicit ticker => @@ -139,22 +194,21 @@ class RetrySpec extends BaseSpec { IO.pure(Decision.retry(status.retriesTotal * delay)) }.withMaxDelay(maxDelay) - val expected: List[(Status, Decision)] = { + val expected = { val numRetries = (maxDelay.toSeconds / delay.toSeconds).toInt val retries = List.tabulate(numRetries) { i => val cumulativeDelay = Range(0, i).map(_ * delay).reduceOption(_ + _).getOrElse(Duration.Zero) - (Status(i, cumulativeDelay), Decision.retry(i * delay)) + RetryAttempt(Status(i, cumulativeDelay), Decision.retry(i * delay)) } - val cumulativeDelay = retries.map(_._1.cumulativeDelay).reduce(_ + _) - retries :+ (Status(numRetries, cumulativeDelay) -> Decision.giveUp) + val cumulativeDelay = retries.map(_.status.cumulativeDelay).reduce(_ + _) + retries :+ RetryAttempt(Status(numRetries, cumulativeDelay), Decision.giveUp) } - val totalDelay = expected.map(_._1.cumulativeDelay).last - runWithAttempts(policy)(errorIO) must completeAs((totalDelay, expected)) + run(policy)(errorIO) must completeAs(expected) } "withMaxCumulativeDelay - cap the max cumulative (total) delay" in ticked { implicit t => @@ -165,15 +219,15 @@ class RetrySpec extends BaseSpec { val policy = Retry.constantDelay[IO, Throwable](delay).withMaxCumulativeDelay(maxDelay) - val expected: List[(Status, Decision)] = { + val expected = { val retries = List.tabulate(maxRetries) { i => - (Status(i, delay * i.toLong), Decision.retry(delay)) + RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) } - retries :+ (Status(maxRetries, delay * maxRetries.toLong) -> Decision.giveUp) + retries :+ RetryAttempt(Status(maxRetries, delay * maxRetries.toLong), Decision.giveUp) } - runWithAttempts(policy)(errorIO) must completeAs((maxRetries * delay, expected)) + run(policy)(errorIO) must completeAs(expected) } "&& (and) - give up when one policy is decided to give up" in ticked { implicit ticker => @@ -230,7 +284,7 @@ class RetrySpec extends BaseSpec { multiplier: Double, factor: Double, maxDelay: Option[FiniteDuration] = None - ): List[(Status, Decision)] = { + ): List[RetryAttempt] = { // per step delay = rand_factor * random_double [0:1] * (base_delay * multiplier ^ retry) def stepDelay(retry: Int) = @@ -242,19 +296,19 @@ class RetrySpec extends BaseSpec { } @annotation.tailrec - def loop(retry: Int, output: List[(Status, Decision)]): List[(Status, Decision)] = { + def loop(retry: Int, output: List[RetryAttempt]): List[RetryAttempt] = { val cumulative = output - .collect { case (_, r: Decision.Retry) => r.delay } + .collect { case RetryAttempt(_, r: Decision.Retry, _) => r.delay } .reduceOption(_ + _) .getOrElse(Duration.Zero) val status = Status(retry, cumulative) if (retry < maxRetries) { - val next = (status, Decision.retry(stepDelay(retry))) + val next = RetryAttempt(status, Decision.retry(stepDelay(retry))) loop(retry + 1, output :+ next) } else { - output :+ (status -> Decision.giveUp) + output :+ RetryAttempt(status, Decision.giveUp) } } @@ -266,15 +320,14 @@ class RetrySpec extends BaseSpec { val maxRetries = 3 val policy = Retry.exponentialBackoff[IO, Throwable](baseDelay).withMaxRetries(maxRetries) - val expected: List[(Status, Decision)] = calculateExpected( + val expected = calculateExpected( baseDelay = baseDelay, maxRetries = maxRetries, multiplier = 2.0, factor = 0.5 ) - val delayTotal = expected.map(_._1.cumulativeDelay).last - runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + run(policy)(errorIO) must completeAs(expected) } "use the given multiplier (1.0) and randomization factor (1.0)" in ticked { implicit t => @@ -288,15 +341,14 @@ class RetrySpec extends BaseSpec { ) .withMaxRetries(maxRetries) - val expected: List[(Status, Decision)] = calculateExpected( + val expected = calculateExpected( baseDelay = baseDelay, maxRetries = maxRetries, multiplier = 1.0, factor = 1.0 ) - val delayTotal = expected.map(_._1.cumulativeDelay).last - runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + run(policy)(errorIO) must completeAs(expected) } "cap the delay at the specific max" in ticked { implicit ticker => @@ -308,16 +360,15 @@ class RetrySpec extends BaseSpec { .withCappedDelay(maxDelay) .withMaxRetries(maxRetries) - val expected: List[(Status, Decision)] = calculateExpected( + val expected = calculateExpected( baseDelay = baseDelay, maxRetries = maxRetries, multiplier = 2.0, factor = 0.5, maxDelay = Some(maxDelay) ) - val delayTotal = expected.map(_._1.cumulativeDelay).last - runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + run(policy)(errorIO) must completeAs(expected) } "respect the max number of retries" in ticked { implicit ticker => @@ -329,41 +380,44 @@ class RetrySpec extends BaseSpec { .withCappedDelay(maxDelay) .withMaxRetries(maxRetries) - val expected: List[(Status, Decision)] = calculateExpected( + val expected = calculateExpected( baseDelay = baseDelay, maxRetries = maxRetries, multiplier = 2.0, factor = 0.5, maxDelay = Some(maxDelay) ) - val delayTotal = expected.map(_._1.cumulativeDelay).last - runWithAttempts(policy)(errorIO) must completeAs((delayTotal, expected)) + run(policy)(errorIO) must completeAs(expected) } } - private def run[A](policy: Retry[IO, Throwable])(io: IO[A]): IO[(FiniteDuration, Int)] = - for { - counter <- IO.ref(0) - result <- io.retry(policy, (_, _: Throwable, _) => counter.update(_ + 1)).attempt.timed - attempts <- counter.get - } yield (result._1, attempts) - - private def runWithAttempts[A](policy: Retry[IO, Throwable])( - io: IO[A] - ): IO[(FiniteDuration, List[(Status, Decision)])] = + private def run[A](policy: Retry[IO, Throwable])(io: IO[A]): IO[List[RetryAttempt]] = for { - ref <- IO.ref(List.empty[(Status, Decision)]) - result <- io - .retry(policy, (s, _: Throwable, d) => ref.update(_ :+ (s -> d))) + ref <- IO.ref(List.empty[RetryAttempt]) + time <- io + .retry(policy, (s, e: Throwable, d) => ref.update(_ :+ RetryAttempt(s, d, e))) .attempt .timed + ._1F attempts <- ref.get - } yield (result._1, attempts) + } yield { + if (time > Duration.Zero && attempts.nonEmpty) { // ensure sleep time == cumulative delay + assert(attempts.last.status.cumulativeDelay == time) + } + + attempts + } + + private case class RetryAttempt( + status: Status, + decision: Decision, + error: Throwable = error + ) - private implicit val statusOrder: Hash[Status] = Hash.fromUniversalHashCode - private implicit val statusShow: Show[Status] = Show.fromToString - private implicit val decisionOrder: Hash[Decision] = Hash.fromUniversalHashCode + private implicit val retryAttemptHash: Hash[RetryAttempt] = Hash.fromUniversalHashCode + private implicit val retryAttemptShow: Show[RetryAttempt] = Show.fromToString + private implicit val decisionHash: Hash[Decision] = Hash.fromUniversalHashCode private implicit val decisionShow: Show[Decision] = Show.fromToString } From b36ff3ab4b7da479e339392755e410cef6c534a6 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Sun, 28 Jul 2024 11:29:16 +0300 Subject: [PATCH 3/7] Retry: introduce `ErrorMatcher` --- .../main/scala/cats/effect/std/Retry.scala | 104 ++++++++- .../scala/cats/effect/std/RetrySpec.scala | 216 +++++++++++------- 2 files changed, 232 insertions(+), 88 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala index 149d7958de..52befc03c2 100644 --- a/std/shared/src/main/scala/cats/effect/std/Retry.scala +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -16,7 +16,7 @@ package cats.effect.std -import cats.{~>, Monad, Semigroup, Show} +import cats.{~>, Applicative, Monad, Semigroup, Show} import cats.effect.kernel.GenTemporal import cats.syntax.apply._ import cats.syntax.flatMap._ @@ -25,6 +25,7 @@ import cats.syntax.functor._ import cats.syntax.monadError._ import scala.concurrent.duration._ +import scala.reflect.{classTag, ClassTag} /** * Glossary: @@ -32,7 +33,7 @@ import scala.concurrent.duration._ * - cumulative delay - the total delay accumulated across all retries */ sealed trait Retry[F[_], E] { - import Retry.{Decision, Status} + import Retry.{Decision, ErrorMatcher, Status} /** * The name of the policy. The name is used for informative purposes. @@ -66,7 +67,7 @@ sealed trait Retry[F[_], E] { * {{{ * val timeoutExceptionOnly = Retry * .exponential[IO, Throwable](1.second) - * .withErrorMatcher { case e: TimeoutException => IO.pure(true) } + * .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[TimeoutException]) * * // will retry using exponential backoff strategy * Retry.retry(timeoutExceptionOnly)(IO.raiseError(new TimeoutException("oops"))) @@ -78,7 +79,7 @@ sealed trait Retry[F[_], E] { * @param matcher * the matcher to use */ - def withErrorMatcher(matcher: PartialFunction[E, F[Boolean]]): Retry[F, E] + def withErrorMatcher(matcher: ErrorMatcher[F, E]): Retry[F, E] /** * Sets the name for the policy. The name is used for informative purposes. @@ -366,6 +367,91 @@ object Retry { private final case class RetryImpl(delay: FiniteDuration) extends Retry } + /** + * The error matcher decides whether the retry decision should be calculated for the raised + * error or not. + */ + sealed trait ErrorMatcher[F[_], -E] { + def matches(e: E): F[Boolean] + } + + object ErrorMatcher { + + /** + * Creates an error matcher that matches all errors. + */ + def all[F[_]: Applicative, E]: ErrorMatcher[F, E] = + new Impl[F, E]({ (_: E) => Applicative[F].pure(true) }) + + /** + * Creates an error matcher using the given `matcher` under the hood. + * + * @param matcher + * the matcher to use + */ + def matches[F[_]: Applicative, E]( + matcher: PartialFunction[E, Boolean] + ): ErrorMatcher[F, E] = + new Impl[F, E](matcher.andThen(b => Applicative[F].pure(b))) + + /** + * Creates an error matcher using the given `matcher` under the hood. + * + * @param matcher + * the matcher to use + */ + def when[F[_]: Applicative, E]( + matcher: PartialFunction[E, F[Boolean]] + ): ErrorMatcher[F, E] = + new Impl[F, E](matcher) + + /** + * A partially-applied constructor. + * + * @example + * {{{ + * val onlyTimeoutException = ErrorMatcher[IO, Throwable].only[TimeoutException] + * val allButTimeoutException = ErrorMatcher[IO, Throwable].except[TimeoutException] + * }}} + */ + def apply[F[_], E]: ApplyPartiallyApplied[F, E] = + new ApplyPartiallyApplied(dummy = true) + + final class ApplyPartiallyApplied[F[_], E](private val dummy: Boolean) extends AnyVal { + + /** + * Creates a new error matcher the matches only errors of type `E1`. + * + * @example + * matches only `TimeoutException` errors: + * {{{ + * val matcher = ErrorMatcher[IO, Throwable].only[TimeoutException] + * }}} + */ + def only[E1 <: E: ClassTag](implicit F: Applicative[F]): ErrorMatcher[F, E] = + matches { case _: E1 => true } + + /** + * Creates a new error matcher the matches all errors except `E1`. + * + * @example + * matches all errors except the `TimeoutException` error: + * {{{ + * val matcher = ErrorMatcher[IO, Throwable].except[TimeoutException] + * }}} + */ + def except[E1 <: E: ClassTag](implicit F: Applicative[F]): ErrorMatcher[F, E] = + matches { case e => !classTag[E1].runtimeClass.isInstance(e) } + } + + private final class Impl[F[_]: Applicative, E]( + matcher: PartialFunction[E, F[Boolean]] + ) extends ErrorMatcher[F, E] { + def matches(e: E): F[Boolean] = + matcher.applyOrElse(e, (_: E) => Applicative[F].pure(false)) + } + } + sealed trait BackoffMultiplier extends Product with Serializable object BackoffMultiplier { @@ -557,7 +643,7 @@ object Retry { * the function to decide whether to continue */ def named[F[_]: Monad, E](name: String)(decider: (Status, E) => F[Decision]): Retry[F, E] = - RetryImpl(name, decider, { _: E => Monad[F].pure(true) }) + RetryImpl(name, decider, ErrorMatcher.all) implicit def retrySemigroup[F[_], E]: Semigroup[Retry[F, E]] = (x, y) => x && y @@ -593,13 +679,11 @@ object Retry { private final case class RetryImpl[F[_]: Monad, E]( name: String, decider: (Status, E) => F[Decision], - errorMatcher: PartialFunction[E, F[Boolean]] + errorMatcher: ErrorMatcher[F, E] ) extends Retry[F, E] { def decide(status: Status, error: E): F[Decision] = - errorMatcher - .applyOrElse(error, (_: E) => Monad[F].pure(false)) - .ifM(decider(status, error), Monad[F].pure(Decision.giveUp)) + errorMatcher.matches(error).ifM(decider(status, error), Monad[F].pure(Decision.giveUp)) def and(other: Retry[F, E]): Retry[F, E] = Retry.named(s"($name && ${other.name})") { (status, error) => @@ -619,7 +703,7 @@ object Retry { } } - def withErrorMatcher(matcher: PartialFunction[E, F[Boolean]]): Retry[F, E] = + def withErrorMatcher(matcher: ErrorMatcher[F, E]): Retry[F, E] = copy(errorMatcher = matcher) def withName(name: String): Retry[F, E] = diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala index 88362acbcd..316c0da36e 100644 --- a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala @@ -67,84 +67,6 @@ class RetrySpec extends BaseSpec { run(policy)(errorIO) must completeAs(expected) } - "withErrorMatcher - retry only on matched errors" in ticked { implicit ticker => - val maxRetries = 5 - val delay = 1.second - - val error = new Error1 - val policy = - Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries).withErrorMatcher { - case _: Error1 => IO.pure(true) - } - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), - RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), - RetryAttempt(Status(2, 2.seconds), Decision.retry(delay), error), - RetryAttempt(Status(3, 3.seconds), Decision.retry(delay), error), - RetryAttempt(Status(4, 4.seconds), Decision.retry(delay), error), - RetryAttempt(Status(5, 5.seconds), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - "withErrorMatcher - give up on mismatched errors" in ticked { implicit ticker => - val maxRetries = 5 - val delay = 1.second - - val policy = - Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries).withErrorMatcher { - case _: Error1 => IO.pure(true) - } - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.giveUp) - ) - - run(policy)(errorIO) must completeAs(expected) - } - - "withErrorMatcher - keep the last matcher - give up on mismatched" in ticked { implicit t => - val delay = 1.second - val maxRetries = 1 - - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher { case _: Error1 => IO.pure(true) } - .withErrorMatcher { case _: Error2 => IO.pure(true) } - - val error = new Error1 - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - "withErrorMatcher - keep the last matcher" in ticked { implicit ticker => - val delay = 1.second - val maxRetries = 2 - - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher { case _: Error1 => IO.pure(true) } - .withErrorMatcher { case _: Error2 => IO.pure(true) } - - val error = new Error2 - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), - RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), - RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - "withCappedDelay - cap the individual delay" in ticked { implicit ticker => val maxRetries = 5 val delay = 2.second @@ -270,6 +192,144 @@ class RetrySpec extends BaseSpec { } + "Retry#withErrorMatcher" should { + + "retry only on matched errors" in ticked { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + run(policy)(IO.raiseError(error)) must completeAs(expected) + } + + "give up on mismatched errors" in ticked { implicit ticker => + val maxRetries = 5 + val delay = 1.second + + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp) + ) + + run(policy)(errorIO) must completeAs(expected) + } + + "keep the last matcher - give up on mismatched" in ticked { implicit ticker => + val delay = 1.second + val maxRetries = 1 + + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + + val error = new Error1 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + run(policy)(IO.raiseError(error)) must completeAs(expected) + } + + "keep the last matcher - retry on matching errors" in ticked { implicit ticker => + val delay = 1.second + val maxRetries = 2 + + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + + val error = new Error2 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + run(policy)(IO.raiseError(error)) must completeAs(expected) + } + + "ErrorMatcher.except - give up on 'excepted' errors" in ticked { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + run(policy)(IO.raiseError(error)) must completeAs(expected) + } + + "ErrorMatcher.except - give up on subtypes" in ticked { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[RuntimeException]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + run(policy)(IO.raiseError(error)) must completeAs(expected) + } + + "ErrorMatcher.except - recover on all errors but the 'excepted' one" in ticked { + implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error2 + val policy = + Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + run(policy)(IO.raiseError(error)) must completeAs(expected) + } + + } + "Retry.exponentialBackoff" should { // it's not random :) val RandomNextDouble = 1.0 From bf8a2cff509e85f8a8875c5903c02d72d68de8b8 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Sun, 28 Jul 2024 11:58:34 +0300 Subject: [PATCH 4/7] Retry: add MTL-specific tests --- .../src/main/scala/cats/effect/IO.scala | 6 +- .../main/scala/cats/effect/std/Retry.scala | 8 +- .../cats/effect/std/syntax/RetrySyntax.scala | 4 +- .../scala/cats/effect/std/RetrySpec.scala | 187 ++++++++++++++---- 4 files changed, 153 insertions(+), 52 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 0e0c432858..b7e6d6580f 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -677,14 +677,14 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { * @param policy * the policy to use * - * @param onRetry + * @param onError * the effect to invoke on every retry decision */ def retry( policy: Retry[IO, Throwable], - onRetry: (Retry.Status, Throwable, Retry.Decision) => IO[Unit] + onError: (Retry.Status, Throwable, Retry.Decision) => IO[Unit] ): IO[A] = - Retry.retry(policy, onRetry)(this) + Retry.retry(policy, onError)(this) /** * Inverse of `attempt` diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala index 52befc03c2..e7e1ef68d6 100644 --- a/std/shared/src/main/scala/cats/effect/std/Retry.scala +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -381,7 +381,7 @@ object Retry { * Creates an error matcher that matches all errors. */ def all[F[_]: Applicative, E]: ErrorMatcher[F, E] = - new Impl[F, E]({ (_: E) => Applicative[F].pure(true) }) + new Impl[F, E]({ case _ => Applicative[F].pure(true) }) /** * Creates an error matcher using the given `matcher` under the hood. @@ -486,7 +486,7 @@ object Retry { * @param policy * the policy to use * - * @param onRetry + * @param onError * the effect to invoke on every retry decision * * @param fa @@ -494,9 +494,9 @@ object Retry { */ def retry[F[_], A, E]( policy: Retry[F, E], - onRetry: (Status, E, Decision) => F[Unit] + onError: (Status, E, Decision) => F[Unit] )(fa: F[A])(implicit F: GenTemporal[F, E]): F[A] = - doRetry(policy, Some(onRetry))(fa) + doRetry(policy, Some(onError))(fa) /** * The return policy that always gives up. diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala index 2c3ee4bf2f..3edfc04018 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala @@ -31,8 +31,8 @@ final class RetryOps[F[_], A] private[syntax] (private val fa: F[A]) extends Any def retry[E]( policy: Retry[F, E], - onRetry: (Retry.Status, E, Retry.Decision) => F[Unit] + onError: (Retry.Status, E, Retry.Decision) => F[Unit] )(implicit F: GenTemporal[F, E]): F[A] = - Retry.retry(policy, onRetry)(fa) + Retry.retry(policy, onError)(fa) } diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala index 316c0da36e..12e1f78861 100644 --- a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala @@ -17,8 +17,11 @@ package cats.effect.std import cats.{Hash, Show} -import cats.effect.{BaseSpec, IO, Ref} +import cats.data.EitherT +import cats.effect.{BaseSpec, IO, Ref, Temporal} +import cats.mtl.Handle import cats.syntax.applicative._ +import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.syntax.semigroup._ @@ -72,11 +75,10 @@ class RetrySpec extends BaseSpec { val delay = 2.second val capDelay = 1.second - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withCappedDelay(capDelay) - .withMaxRetries(maxRetries) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withCappedDelay(capDelay) + .withMaxRetries(maxRetries) val expected = { val retries = List.tabulate(maxRetries) { i => @@ -199,11 +201,10 @@ class RetrySpec extends BaseSpec { val delay = 1.second val error = new Error1 - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) val expected = List( RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), @@ -218,11 +219,10 @@ class RetrySpec extends BaseSpec { val maxRetries = 5 val delay = 1.second - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) val expected = List( RetryAttempt(Status(0, Duration.Zero), Decision.giveUp) @@ -235,12 +235,11 @@ class RetrySpec extends BaseSpec { val delay = 1.second val maxRetries = 1 - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) val error = new Error1 val expected = List( @@ -254,12 +253,11 @@ class RetrySpec extends BaseSpec { val delay = 1.second val maxRetries = 2 - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) val error = new Error2 val expected = List( @@ -276,11 +274,10 @@ class RetrySpec extends BaseSpec { val delay = 1.second val error = new Error1 - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) val expected = List( RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) @@ -294,11 +291,10 @@ class RetrySpec extends BaseSpec { val delay = 1.second val error = new Error1 - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[RuntimeException]) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[RuntimeException]) val expected = List( RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) @@ -313,11 +309,10 @@ class RetrySpec extends BaseSpec { val delay = 1.second val error = new Error2 - val policy = - Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) val expected = List( RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), @@ -453,6 +448,112 @@ class RetrySpec extends BaseSpec { } + "Retry MTL" should { + + sealed trait Errors + final class Error1 extends Errors + final class Error2 extends Errors + + type RetryAttempt = (Status, Decision, Errors) + + def mtlRetry[F[_], E, A]( + action: F[A], + policy: Retry[F, E], + onRetry: (Status, E, Decision) => F[Unit] + )(implicit F: Temporal[F], H: Handle[F, E]): F[A] = + F.tailRecM(Status.initial) { status => + H.attempt(action).flatMap { + case Left(error) => + policy + .decide(status, error) + .flatTap(decision => onRetry(status, error, decision)) + .flatMap { + case retry: Decision.Retry => + F.delayBy(F.pure(Left(status.withRetry(retry.delay))), retry.delay) + + case _: Decision.GiveUp => + H.raise(error) + } + + case Right(success) => + F.pure(Right(success)) + } + } + + implicit val outputHash: Hash[(Either[Errors, Unit], List[RetryAttempt])] = + Hash.fromUniversalHashCode + + implicit val outputShow: Show[(Either[Errors, Unit], List[RetryAttempt])] = + Show.fromToString + + "give up on mismatched errors" in ticked { implicit ticker => + type F[A] = EitherT[IO, Errors, A] + + val maxRetries = 2 + val delay = 1.second + + val error = new Error2 + val policy = Retry + .constantDelay[F, Errors](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) + + val expected: List[RetryAttempt] = List( + (Status(0, Duration.Zero), Decision.giveUp, error) + ) + + val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) + + val run = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + result <- mtlRetry[F, Errors, Unit]( + io, + policy, + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ (s, d, e))) + ).value + attempts <- ref.get + } yield (result, attempts) + + run must completeAs((Left(error), expected)) + } + + "retry only on matching errors" in ticked { implicit ticker => + type F[A] = EitherT[IO, Errors, A] + + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[F, Errors](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) + + val expected: List[RetryAttempt] = List( + (Status(0, Duration.Zero), Decision.retry(delay), error), + (Status(1, 1.second), Decision.retry(delay), error), + (Status(2, 2.seconds), Decision.giveUp, error) + ) + + val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) + + val run = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + result <- mtlRetry[F, Errors, Unit]( + io, + policy, + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ (s, d, e))) + ).value + attempts <- ref.get + } yield (result, attempts) + + run must completeAs((Left(error), expected)) + } + + } + private def run[A](policy: Retry[IO, Throwable])(io: IO[A]): IO[List[RetryAttempt]] = for { ref <- IO.ref(List.empty[RetryAttempt]) From a137204342d4584f7615ce5694cac9fdc9401e18 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Sun, 28 Jul 2024 12:16:11 +0300 Subject: [PATCH 5/7] Retry: update docs --- .../main/scala/cats/effect/std/Retry.scala | 43 +++++++++++++++++++ .../scala/cats/effect/std/RetrySpec.scala | 4 +- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala index e7e1ef68d6..12a4ab036a 100644 --- a/std/shared/src/main/scala/cats/effect/std/Retry.scala +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -31,6 +31,49 @@ import scala.reflect.{classTag, ClassTag} * Glossary: * - individual delay - the delay between retries * - cumulative delay - the total delay accumulated across all retries + * + * ==Usage== + * + * ===Retry on all errors=== + * + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * + * // retries 10 times at most using an exponential backoff strategy + * IO.raiseError(new RuntimeException("oops")).retry(policy) + * }}} + * + * ===Retry on some errors (e.g. TimeoutException)=== + * + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[TimeoutException]) + * + * // retries 10 times at most using an exponential backoff strategy + * IO.raiseError(new TimeoutException("timeout")).retry(policy) + * + * // gives up immediately + * IO.raiseError(new RuntimeException("oops")).retry(policy) + * }}} + * + * ===Retry on all errors except the TimeoutException=== + * + * {{{ + * val policy = Retry + * .exponentialBackoff[IO, Throwable](1.second) + * .withMaxRetries(10) + * .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[TimeoutException]) + * + * // retries 10 times at most using an exponential backoff strategy + * IO.raiseError(new RuntimeException("oops")).retry(policy) + * + * // gives up immediately + * IO.raiseError(new TimeoutException("timeout")).retry(policy) + * }}} */ sealed trait Retry[F[_], E] { import Retry.{Decision, ErrorMatcher, Status} diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala index 12e1f78861..60e7689855 100644 --- a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala @@ -510,7 +510,7 @@ class RetrySpec extends BaseSpec { result <- mtlRetry[F, Errors, Unit]( io, policy, - (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ (s, d, e))) + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) ).value attempts <- ref.get } yield (result, attempts) @@ -544,7 +544,7 @@ class RetrySpec extends BaseSpec { result <- mtlRetry[F, Errors, Unit]( io, policy, - (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ (s, d, e))) + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) ).value attempts <- ref.get } yield (result, attempts) From b8eed1badc7079db54e35a8bb96793c5648be2ed Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Sat, 1 Mar 2025 09:37:50 +0200 Subject: [PATCH 6/7] Retry: migrate tests to munit --- .../scala/cats/effect/std/RetryMTLSuite.scala | 116 ++++ .../scala/cats/effect/std/RetrySpec.scala | 584 ------------------ .../scala/cats/effect/std/RetrySuite.scala | 469 ++++++++++++++ 3 files changed, 585 insertions(+), 584 deletions(-) create mode 100644 tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala delete mode 100644 tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala create mode 100644 tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala diff --git a/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala b/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala new file mode 100644 index 0000000000..d0d6dacf7d --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala @@ -0,0 +1,116 @@ +package cats.effect.std + +import cats.{Hash, Show} +import cats.data.EitherT +import cats.effect.{BaseSuite, IO, Temporal} +import cats.mtl.Handle +import cats.syntax.all._ + +import scala.concurrent.duration._ + +class RetryMTLSuite extends BaseSuite { + import Retry.{Decision, Status} + + sealed trait Errors + final class Error1 extends Errors + final class Error2 extends Errors + + type RetryAttempt = (Status, Decision, Errors) + + ticked("give up on mismatched errors") { implicit ticker => + type F[A] = EitherT[IO, Errors, A] + + val maxRetries = 2 + val delay = 1.second + + val error = new Error2 + val policy = Retry + .constantDelay[F, Errors](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) + + val expected: List[RetryAttempt] = List( + (Status(0, Duration.Zero), Decision.giveUp, error) + ) + + val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) + + val run = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + result <- mtlRetry[F, Errors, Unit]( + io, + policy, + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) + ).value + attempts <- ref.get + } yield (result, attempts) + + assertCompleteAs(run, (Left(error), expected)) + } + + ticked("retry only on matching errors") { implicit ticker => + type F[A] = EitherT[IO, Errors, A] + + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[F, Errors](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) + + val expected: List[RetryAttempt] = List( + (Status(0, Duration.Zero), Decision.retry(delay), error), + (Status(1, 1.second), Decision.retry(delay), error), + (Status(2, 2.seconds), Decision.giveUp, error) + ) + + val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) + + val run = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + result <- mtlRetry[F, Errors, Unit]( + io, + policy, + (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) + ).value + attempts <- ref.get + } yield (result, attempts) + + assertCompleteAs(run, (Left(error), expected)) + } + + private def mtlRetry[F[_], E, A]( + action: F[A], + policy: Retry[F, E], + onRetry: (Status, E, Decision) => F[Unit] + )(implicit F: Temporal[F], H: Handle[F, E]): F[A] = + F.tailRecM(Status.initial) { status => + H.attempt(action).flatMap { + case Left(error) => + policy + .decide(status, error) + .flatTap(decision => onRetry(status, error, decision)) + .flatMap { + case retry: Decision.Retry => + F.delayBy(F.pure(Left(status.withRetry(retry.delay))), retry.delay) + + case _: Decision.GiveUp => + H.raise(error) + } + + case Right(success) => + F.pure(Right(success)) + } + } + + private implicit val outputHash: Hash[(Either[Errors, Unit], List[RetryAttempt])] = + Hash.fromUniversalHashCode + + private implicit val outputShow: Show[(Either[Errors, Unit], List[RetryAttempt])] = + Show.fromToString + +} diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala deleted file mode 100644 index 60e7689855..0000000000 --- a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala +++ /dev/null @@ -1,584 +0,0 @@ -/* - * Copyright 2020-2024 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect.std - -import cats.{Hash, Show} -import cats.data.EitherT -import cats.effect.{BaseSpec, IO, Ref, Temporal} -import cats.mtl.Handle -import cats.syntax.applicative._ -import cats.syntax.flatMap._ -import cats.syntax.functor._ -import cats.syntax.semigroup._ - -import scala.concurrent.duration._ - -class RetrySpec extends BaseSpec { - import Retry.{Decision, Status} - - private class Error1 extends RuntimeException - private class Error2 extends RuntimeException - - private val error: Throwable = new RuntimeException("oops") - private val errorIO: IO[Unit] = IO.raiseError(error) - - "Retry" should { - - "retry until the action succeeds" in ticked { implicit ticker => - val delay = 1.second - val policy = Retry.constantDelay[IO, Throwable](delay) - - def action(attempts: Ref[IO, Int]) = - attempts.getAndUpdate(_ + 1).flatMap(total => errorIO.whenA(total < 3)) - - val io = - for { - counter <- IO.ref(0) - result <- action(counter).retry(policy).timed - counterValue <- counter.get - } yield (result._1, counterValue) - - val expectedRetries = 3 - - io must completeAs((delay * expectedRetries.toLong, expectedRetries + 1)) - } - - "retry until the policy chooses to give up" in ticked { implicit ticker => - val maxRetries = 2 - val policy = Retry.maxRetries[IO, Throwable](maxRetries) - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.retry(Duration.Zero)), - RetryAttempt(Status(1, Duration.Zero), Decision.retry(Duration.Zero)), - RetryAttempt(Status(2, Duration.Zero), Decision.giveUp) - ) - - run(policy)(errorIO) must completeAs(expected) - } - - "withCappedDelay - cap the individual delay" in ticked { implicit ticker => - val maxRetries = 5 - val delay = 2.second - val capDelay = 1.second - - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withCappedDelay(capDelay) - .withMaxRetries(maxRetries) - - val expected = { - val retries = List.tabulate(maxRetries) { i => - RetryAttempt(Status(i, capDelay * i.toLong), Decision.retry(capDelay)) - } - - retries :+ RetryAttempt(Status(maxRetries, maxRetries * capDelay), Decision.giveUp) - } - - run(policy)(errorIO) must completeAs(expected) - } - - "withMaxRetries - give up once max retries are reached" in ticked { implicit ticker => - val maxRetries = 5 - val delay = 2.second - - val policy = - Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries) - - val expected = { - val retries = List.tabulate(maxRetries) { i => - RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) - } - - retries :+ RetryAttempt(Status(maxRetries, maxRetries * delay), Decision.giveUp) - } - - run(policy)(errorIO) must completeAs(expected) - } - - "withMaxDelay - give up once max individual delay is reached" in ticked { implicit ticker => - val delay = 1.second - val maxDelay = 5.second - - val policy = - Retry[IO, Throwable] { (status, _) => - IO.pure(Decision.retry(status.retriesTotal * delay)) - }.withMaxDelay(maxDelay) - - val expected = { - val numRetries = (maxDelay.toSeconds / delay.toSeconds).toInt - - val retries = List.tabulate(numRetries) { i => - val cumulativeDelay = - Range(0, i).map(_ * delay).reduceOption(_ + _).getOrElse(Duration.Zero) - - RetryAttempt(Status(i, cumulativeDelay), Decision.retry(i * delay)) - } - - val cumulativeDelay = retries.map(_.status.cumulativeDelay).reduce(_ + _) - retries :+ RetryAttempt(Status(numRetries, cumulativeDelay), Decision.giveUp) - } - - run(policy)(errorIO) must completeAs(expected) - } - - "withMaxCumulativeDelay - cap the max cumulative (total) delay" in ticked { implicit t => - val delay = 1.second - val maxDelay = 5.second - val maxRetries = (maxDelay.toSeconds / delay.toSeconds).toInt - 1 - - val policy = - Retry.constantDelay[IO, Throwable](delay).withMaxCumulativeDelay(maxDelay) - - val expected = { - val retries = List.tabulate(maxRetries) { i => - RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) - } - - retries :+ RetryAttempt(Status(maxRetries, delay * maxRetries.toLong), Decision.giveUp) - } - - run(policy)(errorIO) must completeAs(expected) - } - - "&& (and) - give up when one policy is decided to give up" in ticked { implicit ticker => - val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] - val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) - - val expected = Decision.giveUp - - (alwaysGiveUp |+| alwaysRetry).decide(Status.initial, error) must completeAs(expected) - } - - "&& (and) - choose the maximum delay if both policies decided to retry" in ticked { - implicit ticker => - val delay1 = Retry.constantDelay[IO, Throwable](1.second) - val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) - - val expected = Decision.retry(2.seconds) - - (delay1 |+| delay2).decide(Status.initial, error) must completeAs(expected) - } - - "|| (or) - retry when one policy decided to retry" in ticked { implicit ticker => - val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] - val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) - - val expected = Decision.retry(1.second) - - (alwaysGiveUp || alwaysRetry).decide(Status.initial, error) must completeAs(expected) - } - - "|| (or) - choose the minimum delay if both policies decided to retry" in ticked { - implicit ticker => - val delay1 = Retry.constantDelay[IO, Throwable](1.second) - val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) - - val expected = Decision.retry(1.second) - - (delay1 || delay2).decide(Status.initial, error) must completeAs(expected) - } - - } - - "Retry#withErrorMatcher" should { - - "retry only on matched errors" in ticked { implicit ticker => - val maxRetries = 2 - val delay = 1.second - - val error = new Error1 - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), - RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), - RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - "give up on mismatched errors" in ticked { implicit ticker => - val maxRetries = 5 - val delay = 1.second - - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.giveUp) - ) - - run(policy)(errorIO) must completeAs(expected) - } - - "keep the last matcher - give up on mismatched" in ticked { implicit ticker => - val delay = 1.second - val maxRetries = 1 - - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) - - val error = new Error1 - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - "keep the last matcher - retry on matching errors" in ticked { implicit ticker => - val delay = 1.second - val maxRetries = 2 - - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) - - val error = new Error2 - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), - RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), - RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - "ErrorMatcher.except - give up on 'excepted' errors" in ticked { implicit ticker => - val maxRetries = 2 - val delay = 1.second - - val error = new Error1 - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - "ErrorMatcher.except - give up on subtypes" in ticked { implicit ticker => - val maxRetries = 2 - val delay = 1.second - - val error = new Error1 - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[RuntimeException]) - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - "ErrorMatcher.except - recover on all errors but the 'excepted' one" in ticked { - implicit ticker => - val maxRetries = 2 - val delay = 1.second - - val error = new Error2 - val policy = Retry - .constantDelay[IO, Throwable](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) - - val expected = List( - RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), - RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), - RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) - ) - - run(policy)(IO.raiseError(error)) must completeAs(expected) - } - - } - - "Retry.exponentialBackoff" should { - // it's not random :) - val RandomNextDouble = 1.0 - implicit val random: Random[IO] = - new Random.ScalaRandom[IO](IO.pure(scala.util.Random)) { - override def nextDouble: IO[Double] = IO.pure(RandomNextDouble) - } - - def calculateExpected( - baseDelay: FiniteDuration, - maxRetries: Int, - multiplier: Double, - factor: Double, - maxDelay: Option[FiniteDuration] = None - ): List[RetryAttempt] = { - - // per step delay = rand_factor * random_double [0:1] * (base_delay * multiplier ^ retry) - def stepDelay(retry: Int) = - factor * RandomNextDouble * baseDelay * math.pow(multiplier, retry.toDouble) match { - case f: FiniteDuration => - maxDelay.fold(f)(f.min) - case _ => - sys.error("result is not finite") - } - - @annotation.tailrec - def loop(retry: Int, output: List[RetryAttempt]): List[RetryAttempt] = { - val cumulative = output - .collect { case RetryAttempt(_, r: Decision.Retry, _) => r.delay } - .reduceOption(_ + _) - .getOrElse(Duration.Zero) - - val status = Status(retry, cumulative) - - if (retry < maxRetries) { - val next = RetryAttempt(status, Decision.retry(stepDelay(retry))) - loop(retry + 1, output :+ next) - } else { - output :+ RetryAttempt(status, Decision.giveUp) - } - } - - loop(0, Nil) - } - - "use default multiplier (2.0) and randomization factor (0.5)" in ticked { implicit ticker => - val baseDelay = 1.second - val maxRetries = 3 - val policy = Retry.exponentialBackoff[IO, Throwable](baseDelay).withMaxRetries(maxRetries) - - val expected = calculateExpected( - baseDelay = baseDelay, - maxRetries = maxRetries, - multiplier = 2.0, - factor = 0.5 - ) - - run(policy)(errorIO) must completeAs(expected) - } - - "use the given multiplier (1.0) and randomization factor (1.0)" in ticked { implicit t => - val baseDelay = 1.second - val maxRetries = 3 - val policy = Retry - .exponentialBackoff[IO, Throwable]( - baseDelay, - Some(Retry.BackoffMultiplier.const(1.0)), - Some(1.0) - ) - .withMaxRetries(maxRetries) - - val expected = calculateExpected( - baseDelay = baseDelay, - maxRetries = maxRetries, - multiplier = 1.0, - factor = 1.0 - ) - - run(policy)(errorIO) must completeAs(expected) - } - - "cap the delay at the specific max" in ticked { implicit ticker => - val baseDelay = 1.second - val maxRetries = 5 - val maxDelay = 2.seconds - val policy = Retry - .exponentialBackoff[IO, Throwable](baseDelay) - .withCappedDelay(maxDelay) - .withMaxRetries(maxRetries) - - val expected = calculateExpected( - baseDelay = baseDelay, - maxRetries = maxRetries, - multiplier = 2.0, - factor = 0.5, - maxDelay = Some(maxDelay) - ) - - run(policy)(errorIO) must completeAs(expected) - } - - "respect the max number of retries" in ticked { implicit ticker => - val baseDelay = 1.second - val maxRetries = 5 - val maxDelay = 2.seconds - val policy = Retry - .exponentialBackoff[IO, Throwable](baseDelay) - .withCappedDelay(maxDelay) - .withMaxRetries(maxRetries) - - val expected = calculateExpected( - baseDelay = baseDelay, - maxRetries = maxRetries, - multiplier = 2.0, - factor = 0.5, - maxDelay = Some(maxDelay) - ) - - run(policy)(errorIO) must completeAs(expected) - } - - } - - "Retry MTL" should { - - sealed trait Errors - final class Error1 extends Errors - final class Error2 extends Errors - - type RetryAttempt = (Status, Decision, Errors) - - def mtlRetry[F[_], E, A]( - action: F[A], - policy: Retry[F, E], - onRetry: (Status, E, Decision) => F[Unit] - )(implicit F: Temporal[F], H: Handle[F, E]): F[A] = - F.tailRecM(Status.initial) { status => - H.attempt(action).flatMap { - case Left(error) => - policy - .decide(status, error) - .flatTap(decision => onRetry(status, error, decision)) - .flatMap { - case retry: Decision.Retry => - F.delayBy(F.pure(Left(status.withRetry(retry.delay))), retry.delay) - - case _: Decision.GiveUp => - H.raise(error) - } - - case Right(success) => - F.pure(Right(success)) - } - } - - implicit val outputHash: Hash[(Either[Errors, Unit], List[RetryAttempt])] = - Hash.fromUniversalHashCode - - implicit val outputShow: Show[(Either[Errors, Unit], List[RetryAttempt])] = - Show.fromToString - - "give up on mismatched errors" in ticked { implicit ticker => - type F[A] = EitherT[IO, Errors, A] - - val maxRetries = 2 - val delay = 1.second - - val error = new Error2 - val policy = Retry - .constantDelay[F, Errors](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) - - val expected: List[RetryAttempt] = List( - (Status(0, Duration.Zero), Decision.giveUp, error) - ) - - val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) - - val run = - for { - ref <- IO.ref(List.empty[RetryAttempt]) - result <- mtlRetry[F, Errors, Unit]( - io, - policy, - (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) - ).value - attempts <- ref.get - } yield (result, attempts) - - run must completeAs((Left(error), expected)) - } - - "retry only on matching errors" in ticked { implicit ticker => - type F[A] = EitherT[IO, Errors, A] - - val maxRetries = 2 - val delay = 1.second - - val error = new Error1 - val policy = Retry - .constantDelay[F, Errors](delay) - .withMaxRetries(maxRetries) - .withErrorMatcher(Retry.ErrorMatcher[F, Errors].only[Error1]) - - val expected: List[RetryAttempt] = List( - (Status(0, Duration.Zero), Decision.retry(delay), error), - (Status(1, 1.second), Decision.retry(delay), error), - (Status(2, 2.seconds), Decision.giveUp, error) - ) - - val io: F[Unit] = Handle[F, Errors].raise[Errors, Unit](error) - - val run = - for { - ref <- IO.ref(List.empty[RetryAttempt]) - result <- mtlRetry[F, Errors, Unit]( - io, - policy, - (s, e: Errors, d) => EitherT.liftF(ref.update(_ :+ ((s, d, e)))) - ).value - attempts <- ref.get - } yield (result, attempts) - - run must completeAs((Left(error), expected)) - } - - } - - private def run[A](policy: Retry[IO, Throwable])(io: IO[A]): IO[List[RetryAttempt]] = - for { - ref <- IO.ref(List.empty[RetryAttempt]) - time <- io - .retry(policy, (s, e: Throwable, d) => ref.update(_ :+ RetryAttempt(s, d, e))) - .attempt - .timed - ._1F - attempts <- ref.get - } yield { - if (time > Duration.Zero && attempts.nonEmpty) { // ensure sleep time == cumulative delay - assert(attempts.last.status.cumulativeDelay == time) - } - - attempts - } - - private case class RetryAttempt( - status: Status, - decision: Decision, - error: Throwable = error - ) - - private implicit val retryAttemptHash: Hash[RetryAttempt] = Hash.fromUniversalHashCode - private implicit val retryAttemptShow: Show[RetryAttempt] = Show.fromToString - private implicit val decisionHash: Hash[Decision] = Hash.fromUniversalHashCode - private implicit val decisionShow: Show[Decision] = Show.fromToString -} diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala new file mode 100644 index 0000000000..9b67df6f4a --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala @@ -0,0 +1,469 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.std + +import cats.{Hash, Show} +import cats.effect.{BaseSuite, IO, Ref} +import cats.syntax.applicative._ +import cats.syntax.functor._ +import cats.syntax.semigroup._ + +import scala.concurrent.duration._ + +class RetrySuite extends BaseSuite { + import Retry.{Decision, Status} + + private class Error1 extends RuntimeException + private class Error2 extends RuntimeException + + private val error: Throwable = new RuntimeException("oops") + private val errorIO: IO[Unit] = IO.raiseError(error) + + ticked("retry until the action succeeds") { implicit ticker => + val delay = 1.second + val policy = Retry.constantDelay[IO, Throwable](delay) + + def action(attempts: Ref[IO, Int]) = + attempts.getAndUpdate(_ + 1).flatMap(total => errorIO.whenA(total < 3)) + + val io = + for { + counter <- IO.ref(0) + result <- action(counter).retry(policy).timed + counterValue <- counter.get + } yield (result._1, counterValue) + + val expectedRetries = 3 + + assertCompleteAs(io, (delay * expectedRetries.toLong, expectedRetries + 1)) + } + + ticked("retry until the policy chooses to give up") { implicit ticker => + val maxRetries = 2 + val policy = Retry.maxRetries[IO, Throwable](maxRetries) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(Duration.Zero)), + RetryAttempt(Status(1, Duration.Zero), Decision.retry(Duration.Zero)), + RetryAttempt(Status(2, Duration.Zero), Decision.giveUp) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withCappedDelay - cap the individual delay") { implicit ticker => + val maxRetries = 5 + val delay = 2.second + val capDelay = 1.second + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withCappedDelay(capDelay) + .withMaxRetries(maxRetries) + + val expected = { + val retries = List.tabulate(maxRetries) { i => + RetryAttempt(Status(i, capDelay * i.toLong), Decision.retry(capDelay)) + } + + retries :+ RetryAttempt(Status(maxRetries, maxRetries * capDelay), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withMaxRetries - give up once max retries are reached") { implicit ticker => + val maxRetries = 5 + val delay = 2.second + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxRetries(maxRetries) + + val expected = { + val retries = List.tabulate(maxRetries) { i => + RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) + } + + retries :+ RetryAttempt(Status(maxRetries, maxRetries * delay), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withMaxDelay - give up once max individual delay is reached") { implicit ticker => + val delay = 1.second + val maxDelay = 5.second + + val policy = + Retry[IO, Throwable] { (status, _) => + IO.pure(Decision.retry(status.retriesTotal * delay)) + }.withMaxDelay(maxDelay) + + val expected = { + val numRetries = (maxDelay.toSeconds / delay.toSeconds).toInt + + val retries = List.tabulate(numRetries) { i => + val cumulativeDelay = + Range(0, i).map(_ * delay).reduceOption(_ + _).getOrElse(Duration.Zero) + + RetryAttempt(Status(i, cumulativeDelay), Decision.retry(i * delay)) + } + + val cumulativeDelay = retries.map(_.status.cumulativeDelay).reduce(_ + _) + retries :+ RetryAttempt(Status(numRetries, cumulativeDelay), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withMaxCumulativeDelay - cap the max cumulative (total) delay") { implicit t => + val delay = 1.second + val maxDelay = 5.second + val maxRetries = (maxDelay.toSeconds / delay.toSeconds).toInt - 1 + + val policy = + Retry.constantDelay[IO, Throwable](delay).withMaxCumulativeDelay(maxDelay) + + val expected = { + val retries = List.tabulate(maxRetries) { i => + RetryAttempt(Status(i, delay * i.toLong), Decision.retry(delay)) + } + + retries :+ RetryAttempt(Status(maxRetries, delay * maxRetries.toLong), Decision.giveUp) + } + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("&& (and) - give up when one policy is decided to give up") { implicit ticker => + val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + + val expected = Decision.giveUp + + assertCompleteAs((alwaysGiveUp |+| alwaysRetry).decide(Status.initial, error), expected) + } + + ticked("&& (and) - choose the maximum delay if both policies decided to retry") { + implicit ticker => + val delay1 = Retry.constantDelay[IO, Throwable](1.second) + val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) + + val expected = Decision.retry(2.seconds) + + assertCompleteAs((delay1 |+| delay2).decide(Status.initial, error), expected) + } + + ticked("|| (or) - retry when one policy decided to retry") { implicit ticker => + val alwaysGiveUp = Retry.alwaysGiveUp[IO, Throwable] + val alwaysRetry = Retry.constantDelay[IO, Throwable](1.second) + + val expected = Decision.retry(1.second) + + assertCompleteAs((alwaysGiveUp || alwaysRetry).decide(Status.initial, error), expected) + } + + ticked("|| (or) - choose the minimum delay if both policies decided to retry") { + implicit ticker => + val delay1 = Retry.constantDelay[IO, Throwable](1.second) + val delay2 = Retry.constantDelay[IO, Throwable](2.seconds) + + val expected = Decision.retry(1.second) + + assertCompleteAs((delay1 || delay2).decide(Status.initial, error), expected) + } + + ticked("withErrorMatcher - retry only on matched errors") { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("withErrorMatcher - give up on mismatched errors") { implicit ticker => + val maxRetries = 5 + val delay = 1.second + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("withErrorMatcher - keep the last matcher - give up on mismatched") { + implicit ticker => + val delay = 1.second + val maxRetries = 1 + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + + val error = new Error1 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("withErrorMatcher - keep the last matcher - retry on matching errors") { + implicit ticker => + val delay = 1.second + val maxRetries = 2 + + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error1]) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].only[Error2]) + + val error = new Error2 + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("ErrorMatcher.except - give up on 'excepted' errors") { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("ErrorMatcher.except - give up on subtypes") { implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error1 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[RuntimeException]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("ErrorMatcher.except - recover on all errors but the 'excepted' one") { + implicit ticker => + val maxRetries = 2 + val delay = 1.second + + val error = new Error2 + val policy = Retry + .constantDelay[IO, Throwable](delay) + .withMaxRetries(maxRetries) + .withErrorMatcher(Retry.ErrorMatcher[IO, Throwable].except[Error1]) + + val expected = List( + RetryAttempt(Status(0, Duration.Zero), Decision.retry(delay), error), + RetryAttempt(Status(1, 1.second), Decision.retry(delay), error), + RetryAttempt(Status(2, 2.seconds), Decision.giveUp, error) + ) + + assertCompleteAs(run(policy)(IO.raiseError(error)), expected) + } + + ticked("exponentialBackoff - use default multiplier (2.0) and randomization factor (0.5)") { + implicit ticker => + val baseDelay = 1.second + val maxRetries = 3 + val policy = + Retry.exponentialBackoff[IO, Throwable](baseDelay).withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5 + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("exponentialBackoff - use the given multiplier (1.0) and randomization factor (1.0)") { + implicit ticker => + val baseDelay = 1.second + val maxRetries = 3 + val policy = Retry + .exponentialBackoff[IO, Throwable]( + baseDelay, + Some(Retry.BackoffMultiplier.const(1.0)), + Some(1.0) + ) + .withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 1.0, + factor = 1.0 + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("exponentialBackoff - cap the delay at the specific max") { implicit ticker => + val baseDelay = 1.second + val maxRetries = 5 + val maxDelay = 2.seconds + val policy = Retry + .exponentialBackoff[IO, Throwable](baseDelay) + .withCappedDelay(maxDelay) + .withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5, + maxDelay = Some(maxDelay) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + ticked("exponentialBackoff - respect the max number of retries") { implicit ticker => + val baseDelay = 1.second + val maxRetries = 5 + val maxDelay = 2.seconds + val policy = Retry + .exponentialBackoff[IO, Throwable](baseDelay) + .withCappedDelay(maxDelay) + .withMaxRetries(maxRetries) + + val expected = calculateExpectedBackoff( + baseDelay = baseDelay, + maxRetries = maxRetries, + multiplier = 2.0, + factor = 0.5, + maxDelay = Some(maxDelay) + ) + + assertCompleteAs(run(policy)(errorIO), expected) + } + + // it's not random :) + private val RandomNextDouble = 1.0 + private implicit val random: Random[IO] = + new Random.ScalaRandom[IO](IO.pure(scala.util.Random)) { + override def nextDouble: IO[Double] = IO.pure(RandomNextDouble) + } + + private def calculateExpectedBackoff( + baseDelay: FiniteDuration, + maxRetries: Int, + multiplier: Double, + factor: Double, + maxDelay: Option[FiniteDuration] = None + ): List[RetryAttempt] = { + + // per step delay = rand_factor * random_double [0:1] * (base_delay * multiplier ^ retry) + def stepDelay(retry: Int) = + factor * RandomNextDouble * baseDelay * math.pow(multiplier, retry.toDouble) match { + case f: FiniteDuration => + maxDelay.fold(f)(f.min) + case _ => + sys.error("result is not finite") + } + + @annotation.tailrec + def loop(retry: Int, output: List[RetryAttempt]): List[RetryAttempt] = { + val cumulative = output + .collect { case RetryAttempt(_, r: Decision.Retry, _) => r.delay } + .reduceOption(_ + _) + .getOrElse(Duration.Zero) + + val status = Status(retry, cumulative) + + if (retry < maxRetries) { + val next = RetryAttempt(status, Decision.retry(stepDelay(retry))) + loop(retry + 1, output :+ next) + } else { + output :+ RetryAttempt(status, Decision.giveUp) + } + } + + loop(0, Nil) + } + + private def run[A](policy: Retry[IO, Throwable])(io: IO[A]): IO[List[RetryAttempt]] = + for { + ref <- IO.ref(List.empty[RetryAttempt]) + time <- io + .retry(policy, (s, e: Throwable, d) => ref.update(_ :+ RetryAttempt(s, d, e))) + .attempt + .timed + ._1F + attempts <- ref.get + } yield { + if (time > Duration.Zero && attempts.nonEmpty) { // ensure sleep time == cumulative delay + assert(attempts.last.status.cumulativeDelay == time) + } + + attempts + } + + private case class RetryAttempt( + status: Status, + decision: Decision, + error: Throwable = error + ) + + private implicit val retryAttemptHash: Hash[RetryAttempt] = Hash.fromUniversalHashCode + private implicit val retryAttemptShow: Show[RetryAttempt] = Show.fromToString + private implicit val decisionHash: Hash[Decision] = Hash.fromUniversalHashCode + private implicit val decisionShow: Show[Decision] = Show.fromToString +} From 346c39631eef34be9581bac9cb3acf9ce2485bfc Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Sat, 1 Mar 2025 09:41:33 +0200 Subject: [PATCH 7/7] Retry: fix headers --- .../src/main/scala/cats/effect/std/Retry.scala | 2 +- .../cats/effect/std/syntax/RetrySyntax.scala | 2 +- .../test/scala/cats/effect/IOPlatformSuite.scala | 2 +- .../test/scala/cats/effect/IOPlatformSuite.scala | 2 +- .../scala/cats/effect/std/RetryMTLSuite.scala | 16 ++++++++++++++++ .../test/scala/cats/effect/std/RetrySuite.scala | 2 +- 6 files changed, 21 insertions(+), 5 deletions(-) diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala index 12a4ab036a..542df4299d 100644 --- a/std/shared/src/main/scala/cats/effect/std/Retry.scala +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala index 3edfc04018..431fe4b9c3 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala b/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala index 776d6ef5d0..6db23f6d80 100644 --- a/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala +++ b/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala index 4bd1d94e7f..8bb8a84fbf 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala b/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala index d0d6dacf7d..af85836dc9 100644 --- a/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala +++ b/tests/shared/src/test/scala/cats/effect/std/RetryMTLSuite.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cats.effect.std import cats.{Hash, Show} diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala index 9b67df6f4a..1c9d0da301 100644 --- a/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.