diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 377dbc978c..878c29adbf 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -40,6 +40,9 @@ object CircuitBreakerSpec { def multiFailureCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = new Breaker(new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated)) + + def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = + new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5)) } class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { @@ -209,6 +212,28 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) } + + "increase the reset timeout after it transits to open again" in { + val breaker = CircuitBreakerSpec.nonOneFactorCb() + breaker().withCircuitBreaker(Future(throwException)) + checkLatch(breaker.openLatch) + + val e1 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } + val shortRemainingDuration = e1.remainingDuration + + Thread.sleep(1000.millis.dilated.toMillis) + checkLatch(breaker.halfOpenLatch) + + // transit to open again + breaker().withCircuitBreaker(Future(throwException)) + checkLatch(breaker.openLatch) + + val e2 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } + val longRemainingDuration = e2.remainingDuration + + (shortRemainingDuration < longRemainingDuration) should ===(true) + + } } "An asynchronous circuit breaker that is half-open" must { diff --git a/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java index 45fbc8b3ab..5dc9c24c46 100644 --- a/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java +++ b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java @@ -7,10 +7,12 @@ import akka.util.Unsafe; class AbstractCircuitBreaker { protected final static long stateOffset; + protected final static long resetTimeoutOffset; static { try { stateOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentStateDoNotCallMeDirectly")); + resetTimeoutOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentResetTimeoutDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 9a09ba483a..ee16d2cddd 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -18,7 +18,6 @@ import scala.concurrent.TimeoutException import scala.util.control.NonFatal import scala.util.Success import akka.dispatch.ExecutionContexts.sameThreadExecutionContext -import akka.japi.function.Creator import scala.compat.java8.FutureConverters @@ -41,7 +40,6 @@ object CircuitBreaker { */ def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext) - /** * Java API: Create a new CircuitBreaker. * @@ -71,17 +69,39 @@ object CircuitBreaker { * closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that * execute while the first is running will fail-fast with an exception. * - * * @param scheduler Reference to Akka scheduler * @param maxFailures Maximum number of failures before opening the circuit * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners */ -class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { +class CircuitBreaker( + scheduler: Scheduler, + maxFailures: Int, + callTimeout: FiniteDuration, + resetTimeout: FiniteDuration, + maxResetTimeout: FiniteDuration, + exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { + + require(exponentialBackoffFactor >= 1.0, "factor must be >= 1.0") def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = { - this(scheduler, maxFailures, callTimeout, resetTimeout)(executor) + this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) + } + + // add the old constructor to make it binary compatible + def this(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) = { + this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) + } + + /** + * The `resetTimeout` will be increased exponentially for each failed attempt to close the circuit. + * The default exponential backoff factor is 2. + * + * @param maxResetTimeout the upper bound of resetTimeout + */ + def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = { + new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0)(executor) } /** @@ -90,6 +110,12 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite @volatile private[this] var _currentStateDoNotCallMeDirectly: State = Closed + /** + * Holds reference to current resetTimeout of CircuitBreaker - *access only via helper methods* + */ + @volatile + private[this] var _currentResetTimeoutDoNotCallMeDirectly: FiniteDuration = resetTimeout + /** * Helper method for access to underlying state via Unsafe * @@ -110,6 +136,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite private[this] def currentState: State = Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State] + /** + * Helper method for updating the underlying resetTimeout via Unsafe + */ + @inline + private[this] def swapResetTimeout(oldResetTimeout: FiniteDuration, newResetTimeout: FiniteDuration): Boolean = + Unsafe.instance.compareAndSwapObject(this, AbstractCircuitBreaker.resetTimeoutOffset, oldResetTimeout, newResetTimeout) + + /** + * Helper method for accessing to the underlying resetTimeout via Unsafe + */ + @inline + private[this] def currentResetTimeout: FiniteDuration = + Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration] + /** * Wraps invocations of asynchronous calls that need to be protected * @@ -451,11 +491,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed) /** - * On entry of this state, failure count is reset. + * On entry of this state, failure count and resetTimeout is reset. * * @return */ - override def _enter(): Unit = set(0) + override def _enter(): Unit = { + set(0) + swapResetTimeout(currentResetTimeout, resetTimeout) + } /** * Override for more descriptive toString @@ -530,7 +573,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite */ private def remainingDuration(): FiniteDuration = { val fromOpened = System.nanoTime() - get - val diff = resetTimeout.toNanos - fromOpened + val diff = currentResetTimeout.toNanos - fromOpened if (diff <= 0L) Duration.Zero else diff.nanos } @@ -557,9 +600,16 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite */ override def _enter(): Unit = { set(System.nanoTime()) - scheduler.scheduleOnce(resetTimeout) { + scheduler.scheduleOnce(currentResetTimeout) { attemptReset() } + val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match { + case f: FiniteDuration ⇒ f + case _ ⇒ currentResetTimeout + } + + if (nextResetTimeout < maxResetTimeout) + swapResetTimeout(currentResetTimeout, nextResetTimeout) } /** diff --git a/akka-docs/rst/common/circuitbreaker.rst b/akka-docs/rst/common/circuitbreaker.rst index 052a9cbfec..79528ef65f 100644 --- a/akka-docs/rst/common/circuitbreaker.rst +++ b/akka-docs/rst/common/circuitbreaker.rst @@ -45,8 +45,8 @@ What do they do? * In `Half-Open` state: * The first call attempted is allowed through without failing fast * All other calls fail-fast with an exception just as in `Open` state - * If the first call succeeds, the breaker is reset back to `Closed` state - * If the first call fails, the breaker is tripped again into the `Open` state for another full `resetTimeout` + * If the first call succeeds, the breaker is reset back to `Closed` state and the `resetTimeout` is reset + * If the first call fails, the breaker is tripped again into the `Open` state (as for exponential backoff circuit breaker, the `resetTimeout` is multiplied by the exponential backoff factor) * State transition listeners: * Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen` * These are executed in the :class:`ExecutionContext` provided.