exponential backoff in circuit breaker #21036
This commit is contained in:
parent
0b8d3c1f00
commit
df4a6270e6
4 changed files with 88 additions and 11 deletions
|
|
@ -40,6 +40,9 @@ object CircuitBreakerSpec {
|
|||
|
||||
def multiFailureCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
|
||||
new Breaker(new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated))
|
||||
|
||||
def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
|
||||
new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5))
|
||||
}
|
||||
|
||||
class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
|
||||
|
|
@ -209,6 +212,28 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
|
|||
breaker().withCircuitBreaker(Future(throwException))
|
||||
checkLatch(breaker.halfOpenLatch)
|
||||
}
|
||||
|
||||
"increase the reset timeout after it transits to open again" in {
|
||||
val breaker = CircuitBreakerSpec.nonOneFactorCb()
|
||||
breaker().withCircuitBreaker(Future(throwException))
|
||||
checkLatch(breaker.openLatch)
|
||||
|
||||
val e1 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
|
||||
val shortRemainingDuration = e1.remainingDuration
|
||||
|
||||
Thread.sleep(1000.millis.dilated.toMillis)
|
||||
checkLatch(breaker.halfOpenLatch)
|
||||
|
||||
// transit to open again
|
||||
breaker().withCircuitBreaker(Future(throwException))
|
||||
checkLatch(breaker.openLatch)
|
||||
|
||||
val e2 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
|
||||
val longRemainingDuration = e2.remainingDuration
|
||||
|
||||
(shortRemainingDuration < longRemainingDuration) should ===(true)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
"An asynchronous circuit breaker that is half-open" must {
|
||||
|
|
|
|||
|
|
@ -7,10 +7,12 @@ import akka.util.Unsafe;
|
|||
|
||||
class AbstractCircuitBreaker {
|
||||
protected final static long stateOffset;
|
||||
protected final static long resetTimeoutOffset;
|
||||
|
||||
static {
|
||||
try {
|
||||
stateOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentStateDoNotCallMeDirectly"));
|
||||
resetTimeoutOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentResetTimeoutDoNotCallMeDirectly"));
|
||||
} catch(Throwable t){
|
||||
throw new ExceptionInInitializerError(t);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import scala.concurrent.TimeoutException
|
|||
import scala.util.control.NonFatal
|
||||
import scala.util.Success
|
||||
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
|
||||
import akka.japi.function.Creator
|
||||
|
||||
import scala.compat.java8.FutureConverters
|
||||
|
||||
|
|
@ -41,7 +40,6 @@ object CircuitBreaker {
|
|||
*/
|
||||
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
|
||||
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext)
|
||||
|
||||
/**
|
||||
* Java API: Create a new CircuitBreaker.
|
||||
*
|
||||
|
|
@ -71,17 +69,39 @@ object CircuitBreaker {
|
|||
* closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that
|
||||
* execute while the first is running will fail-fast with an exception.
|
||||
*
|
||||
*
|
||||
* @param scheduler Reference to Akka scheduler
|
||||
* @param maxFailures Maximum number of failures before opening the circuit
|
||||
* @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
|
||||
* @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
|
||||
* @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
|
||||
*/
|
||||
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
|
||||
class CircuitBreaker(
|
||||
scheduler: Scheduler,
|
||||
maxFailures: Int,
|
||||
callTimeout: FiniteDuration,
|
||||
resetTimeout: FiniteDuration,
|
||||
maxResetTimeout: FiniteDuration,
|
||||
exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
|
||||
|
||||
require(exponentialBackoffFactor >= 1.0, "factor must be >= 1.0")
|
||||
|
||||
def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = {
|
||||
this(scheduler, maxFailures, callTimeout, resetTimeout)(executor)
|
||||
this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor)
|
||||
}
|
||||
|
||||
// add the old constructor to make it binary compatible
|
||||
def this(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) = {
|
||||
this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor)
|
||||
}
|
||||
|
||||
/**
|
||||
* The `resetTimeout` will be increased exponentially for each failed attempt to close the circuit.
|
||||
* The default exponential backoff factor is 2.
|
||||
*
|
||||
* @param maxResetTimeout the upper bound of resetTimeout
|
||||
*/
|
||||
def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = {
|
||||
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0)(executor)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -90,6 +110,12 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
@volatile
|
||||
private[this] var _currentStateDoNotCallMeDirectly: State = Closed
|
||||
|
||||
/**
|
||||
* Holds reference to current resetTimeout of CircuitBreaker - *access only via helper methods*
|
||||
*/
|
||||
@volatile
|
||||
private[this] var _currentResetTimeoutDoNotCallMeDirectly: FiniteDuration = resetTimeout
|
||||
|
||||
/**
|
||||
* Helper method for access to underlying state via Unsafe
|
||||
*
|
||||
|
|
@ -110,6 +136,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
private[this] def currentState: State =
|
||||
Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State]
|
||||
|
||||
/**
|
||||
* Helper method for updating the underlying resetTimeout via Unsafe
|
||||
*/
|
||||
@inline
|
||||
private[this] def swapResetTimeout(oldResetTimeout: FiniteDuration, newResetTimeout: FiniteDuration): Boolean =
|
||||
Unsafe.instance.compareAndSwapObject(this, AbstractCircuitBreaker.resetTimeoutOffset, oldResetTimeout, newResetTimeout)
|
||||
|
||||
/**
|
||||
* Helper method for accessing to the underlying resetTimeout via Unsafe
|
||||
*/
|
||||
@inline
|
||||
private[this] def currentResetTimeout: FiniteDuration =
|
||||
Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration]
|
||||
|
||||
/**
|
||||
* Wraps invocations of asynchronous calls that need to be protected
|
||||
*
|
||||
|
|
@ -451,11 +491,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed)
|
||||
|
||||
/**
|
||||
* On entry of this state, failure count is reset.
|
||||
* On entry of this state, failure count and resetTimeout is reset.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
override def _enter(): Unit = set(0)
|
||||
override def _enter(): Unit = {
|
||||
set(0)
|
||||
swapResetTimeout(currentResetTimeout, resetTimeout)
|
||||
}
|
||||
|
||||
/**
|
||||
* Override for more descriptive toString
|
||||
|
|
@ -530,7 +573,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
*/
|
||||
private def remainingDuration(): FiniteDuration = {
|
||||
val fromOpened = System.nanoTime() - get
|
||||
val diff = resetTimeout.toNanos - fromOpened
|
||||
val diff = currentResetTimeout.toNanos - fromOpened
|
||||
if (diff <= 0L) Duration.Zero
|
||||
else diff.nanos
|
||||
}
|
||||
|
|
@ -557,9 +600,16 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
*/
|
||||
override def _enter(): Unit = {
|
||||
set(System.nanoTime())
|
||||
scheduler.scheduleOnce(resetTimeout) {
|
||||
scheduler.scheduleOnce(currentResetTimeout) {
|
||||
attemptReset()
|
||||
}
|
||||
val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match {
|
||||
case f: FiniteDuration ⇒ f
|
||||
case _ ⇒ currentResetTimeout
|
||||
}
|
||||
|
||||
if (nextResetTimeout < maxResetTimeout)
|
||||
swapResetTimeout(currentResetTimeout, nextResetTimeout)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -45,8 +45,8 @@ What do they do?
|
|||
* In `Half-Open` state:
|
||||
* The first call attempted is allowed through without failing fast
|
||||
* All other calls fail-fast with an exception just as in `Open` state
|
||||
* If the first call succeeds, the breaker is reset back to `Closed` state
|
||||
* If the first call fails, the breaker is tripped again into the `Open` state for another full `resetTimeout`
|
||||
* If the first call succeeds, the breaker is reset back to `Closed` state and the `resetTimeout` is reset
|
||||
* If the first call fails, the breaker is tripped again into the `Open` state (as for exponential backoff circuit breaker, the `resetTimeout` is multiplied by the exponential backoff factor)
|
||||
* State transition listeners:
|
||||
* Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen`
|
||||
* These are executed in the :class:`ExecutionContext` provided.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue