Add random factor for CircuitBreaker (#29478)

* Multiply reset timeout with randomFactor
* add old constructor to make it binary compatible
* add scaladoc for randomFactor
This commit is contained in:
Chiyu Zhong 2020-08-14 16:48:19 +08:00 committed by GitHub
parent 4b08c7e6c3
commit 94dc84d5d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 10 deletions

View file

@ -74,7 +74,7 @@ object CircuitBreakerSpec {
new Breaker(new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated))
def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5))
new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5, 0))
val evenNumberIsFailure: Try[Int] => Boolean = {
case Success(i) => i % 2 == 0

View file

@ -5,7 +5,7 @@
package akka.pattern
import java.util.Optional
import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList }
import java.util.concurrent.{ Callable, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
import java.util.function.BiFunction
import java.util.function.Consumer
@ -19,7 +19,6 @@ import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import com.github.ghik.silencer.silent
import akka.AkkaException
import akka.actor.Scheduler
import akka.dispatch.ExecutionContexts.parasitic
@ -131,6 +130,10 @@ object CircuitBreaker {
* @param maxFailures Maximum number of failures before opening the circuit
* @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
* @param randomFactor after calculation of the exponential back-off an additional random delay
* based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* randomFactor should be in range `0.0` (inclusive) and `1.0` (inclusive).
* In order to skip this additional delay pass in `0`.
* @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
*/
class CircuitBreaker(
@ -139,10 +142,12 @@ class CircuitBreaker(
callTimeout: FiniteDuration,
val resetTimeout: FiniteDuration,
maxResetTimeout: FiniteDuration,
exponentialBackoffFactor: Double)(implicit executor: ExecutionContext)
exponentialBackoffFactor: Double,
randomFactor: Double)(implicit executor: ExecutionContext)
extends AbstractCircuitBreaker {
require(exponentialBackoffFactor >= 1.0, "factor must be >= 1.0")
require(exponentialBackoffFactor >= 1.0, "exponentialBackoffFactor must be >= 1.0")
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.12")
def this(
@ -151,7 +156,14 @@ class CircuitBreaker(
maxFailures: Int,
callTimeout: FiniteDuration,
resetTimeout: FiniteDuration) = {
this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor)
this(
scheduler,
maxFailures,
callTimeout,
resetTimeout,
maxResetTimeout = 36500.days,
exponentialBackoffFactor = 1.0,
randomFactor = 0.0)(executor)
}
def this(
@ -160,14 +172,41 @@ class CircuitBreaker(
maxFailures: Int,
callTimeout: java.time.Duration,
resetTimeout: java.time.Duration) = {
this(scheduler, maxFailures, callTimeout.asScala, resetTimeout.asScala, 36500.days, 1.0)(executor)
this(
scheduler,
maxFailures,
callTimeout.asScala,
resetTimeout.asScala,
maxResetTimeout = 36500.days,
exponentialBackoffFactor = 1.0,
randomFactor = 0.0)(executor)
}
// add the old constructor to make it binary compatible
def this(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(
implicit
executor: ExecutionContext) = {
this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor)
this(
scheduler,
maxFailures,
callTimeout,
resetTimeout,
maxResetTimeout = 36500.days,
exponentialBackoffFactor = 1.0,
randomFactor = 0.0)(executor)
}
// add the old constructor to make it binary compatible
def this(
scheduler: Scheduler,
maxFailures: Int,
callTimeout: FiniteDuration,
resetTimeout: FiniteDuration,
maxResetTimeout: FiniteDuration,
exponentialBackoffFactor: Double)(
implicit
executor: ExecutionContext) = {
this(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, exponentialBackoffFactor, 0.0)(executor)
}
/**
@ -177,7 +216,7 @@ class CircuitBreaker(
* @param maxResetTimeout the upper bound of resetTimeout
*/
def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = {
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0)(executor)
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0, randomFactor)(executor)
}
/**
@ -190,6 +229,23 @@ class CircuitBreaker(
withExponentialBackoff(maxResetTimeout.asScala)
}
/**
* Adds jitter to the delay.
* @param randomFactor after calculation of the back-off an additional random delay based on this
* factor is added, e.g. 0.2 adds up to 20% delay. In order to skip this
* additional delay pass in 0.
*/
def withRandomFactor(randomFactor: Double): CircuitBreaker = {
new CircuitBreaker(
scheduler,
maxFailures,
callTimeout,
resetTimeout,
maxResetTimeout,
exponentialBackoffFactor,
randomFactor)(executor)
}
/**
* Holds reference to current state of CircuitBreaker - *access only via helper methods*
*/
@ -983,7 +1039,8 @@ class CircuitBreaker(
scheduler.scheduleOnce(currentResetTimeout) {
attemptReset()
}
val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match {
val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor * rnd match {
case f: FiniteDuration => f
case _ => currentResetTimeout
}