From 94dc84d5d074404c5ea6ce4ee3ee7ca2b4192d8d Mon Sep 17 00:00:00 2001 From: Chiyu Zhong Date: Fri, 14 Aug 2020 16:48:19 +0800 Subject: [PATCH] Add random factor for CircuitBreaker (#29478) * Multiply reset timeout with randomFactor * add old constructor to make it binary compatible * add scaladoc for randomFactor --- .../akka/pattern/CircuitBreakerSpec.scala | 2 +- .../scala/akka/pattern/CircuitBreaker.scala | 75 ++++++++++++++++--- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 49433b5d80..c103478f22 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -74,7 +74,7 @@ object CircuitBreakerSpec { new Breaker(new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated)) def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = - new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5)) + new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5, 0)) val evenNumberIsFailure: Try[Int] => Boolean = { case Success(i) => i % 2 == 0 diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index ca4b06c693..0294b1c350 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -5,7 +5,7 @@ package akka.pattern import java.util.Optional -import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList } +import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.function.BiFunction import java.util.function.Consumer @@ -19,7 +19,6 @@ import scala.util.control.NoStackTrace import scala.util.control.NonFatal import com.github.ghik.silencer.silent - import akka.AkkaException import akka.actor.Scheduler import akka.dispatch.ExecutionContexts.parasitic @@ -131,6 +130,10 @@ object CircuitBreaker { * @param maxFailures Maximum number of failures before opening the circuit * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit + * @param randomFactor after calculation of the exponential back-off an additional random delay + * based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * randomFactor should be in range `0.0` (inclusive) and `1.0` (inclusive). + * In order to skip this additional delay pass in `0`. * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners */ class CircuitBreaker( @@ -139,10 +142,12 @@ class CircuitBreaker( callTimeout: FiniteDuration, val resetTimeout: FiniteDuration, maxResetTimeout: FiniteDuration, - exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) + exponentialBackoffFactor: Double, + randomFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { - require(exponentialBackoffFactor >= 1.0, "factor must be >= 1.0") + require(exponentialBackoffFactor >= 1.0, "exponentialBackoffFactor must be >= 1.0") + require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") @deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12") def this( @@ -151,7 +156,14 @@ class CircuitBreaker( maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = { - this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) + this( + scheduler, + maxFailures, + callTimeout, + resetTimeout, + maxResetTimeout = 36500.days, + exponentialBackoffFactor = 1.0, + randomFactor = 0.0)(executor) } def this( @@ -160,14 +172,41 @@ class CircuitBreaker( maxFailures: Int, callTimeout: java.time.Duration, resetTimeout: java.time.Duration) = { - this(scheduler, maxFailures, callTimeout.asScala, resetTimeout.asScala, 36500.days, 1.0)(executor) + this( + scheduler, + maxFailures, + callTimeout.asScala, + resetTimeout.asScala, + maxResetTimeout = 36500.days, + exponentialBackoffFactor = 1.0, + randomFactor = 0.0)(executor) } // add the old constructor to make it binary compatible def this(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)( implicit executor: ExecutionContext) = { - this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) + this( + scheduler, + maxFailures, + callTimeout, + resetTimeout, + maxResetTimeout = 36500.days, + exponentialBackoffFactor = 1.0, + randomFactor = 0.0)(executor) + } + + // add the old constructor to make it binary compatible + def this( + scheduler: Scheduler, + maxFailures: Int, + callTimeout: FiniteDuration, + resetTimeout: FiniteDuration, + maxResetTimeout: FiniteDuration, + exponentialBackoffFactor: Double)( + implicit + executor: ExecutionContext) = { + this(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, exponentialBackoffFactor, 0.0)(executor) } /** @@ -177,7 +216,7 @@ class CircuitBreaker( * @param maxResetTimeout the upper bound of resetTimeout */ def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = { - new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0)(executor) + new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0, randomFactor)(executor) } /** @@ -190,6 +229,23 @@ class CircuitBreaker( withExponentialBackoff(maxResetTimeout.asScala) } + /** + * Adds jitter to the delay. + * @param randomFactor after calculation of the back-off an additional random delay based on this + * factor is added, e.g. 0.2 adds up to 20% delay. In order to skip this + * additional delay pass in 0. + */ + def withRandomFactor(randomFactor: Double): CircuitBreaker = { + new CircuitBreaker( + scheduler, + maxFailures, + callTimeout, + resetTimeout, + maxResetTimeout, + exponentialBackoffFactor, + randomFactor)(executor) + } + /** * Holds reference to current state of CircuitBreaker - *access only via helper methods* */ @@ -983,7 +1039,8 @@ class CircuitBreaker( scheduler.scheduleOnce(currentResetTimeout) { attemptReset() } - val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match { + val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor * rnd match { case f: FiniteDuration => f case _ => currentResetTimeout }