* Add Circuit Breaker to akka.pattern for general use. Circuit breaker implementation as described by Michael T. Nygard in Release It!. Fixes #1734

* Uses finite state machine for three states: Closed, Open, Half-Open
    * Closed state allows calls through, and on sequential failures exceeding the max# set - transitions to Open state. Intervening successes cause the failure count to reset to 0
    * Open state throws a CircuitOpenException on every call until the reset timeout is reached which causes a transition to Half-Open state
    * Half-Open state will allow the next single call through, if it succeeds - transition to Closed state, if it fails - transition back to Open state, starting the reset timer again
  * Allow configuration for the call and reset timeouts, as well as the maximum number of sequential failures before opening
  * Supports async or synchronous call protection
  * Callbacks are supported for state entry into Closed, Open, Half-Open.  These are run in the supplied execution context
  * Both thrown exceptions and calls exceeding max call time are considered failures
  * Uses akka scheduler for timer events
  * Integrated into File-Based durable mailbox
  * Sample documented for other durable mailboxes
This commit is contained in:
Brian Scully 2012-06-01 08:24:47 -04:00
parent 85c263e077
commit 6a415f0e9b
15 changed files with 1266 additions and 25 deletions

View file

@ -0,0 +1,121 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import akka.testkit._
import akka.util.duration._
import org.scalatest.BeforeAndAfter
import akka.dispatch.{Promise, Await, Future}
class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter {
@volatile
var breakers: BreakerState = null
class BreakerState {
val halfOpenLatch = new TestLatch(1)
val breaker = new CircuitBreaker(system.scheduler,5,100.millis.dilated,500.millis.dilated)
.onHalfOpen(halfOpenLatch.countDown())
}
before {
breakers = new BreakerState()
}
def unreliableCall(param: String) = {
param match {
case "fail" => throw new RuntimeException("FAIL")
case _ => param
}
}
def openBreaker: Unit = {
for (i <- 1 to 5)
Await.result(breakers.breaker.withCircuitBreaker(Future(unreliableCall("fail"))) recoverWith {
case _ => Promise.successful("OK")
}, 1.second.dilated)
}
"A circuit breaker being called by many threads" must {
"allow many calls while in closed state with no errors" in {
val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {Thread.sleep(10); unreliableCall("succeed")})
val futureList = Future.sequence(futures)
val result = Await.result(futureList, 1.second.dilated)
result.size must be (100)
result.distinct.size must be (1)
result.distinct must contain ("succeed")
}
"transition to open state upon reaching failure limit and fail-fast" in {
openBreaker
val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {
Thread.sleep(10); unreliableCall("success")
}) recoverWith {
case _: CircuitBreakerOpenException => Promise.successful("CBO")
}
val futureList = Future.sequence(futures)
val result = Await.result(futureList, 1.second.dilated)
result.size must be (100)
result.distinct.size must be (1)
result.distinct must contain ("CBO")
}
"allow a single call through in half-open state" in {
openBreaker
Await.ready(breakers.halfOpenLatch, 2.seconds.dilated)
val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {
Thread.sleep(10); unreliableCall("succeed")
}) recoverWith {
case _: CircuitBreakerOpenException => Promise.successful("CBO")
}
val futureList = Future.sequence(futures)
val result = Await.result(futureList, 1.second.dilated)
result.size must be (100)
result.distinct.size must be (2)
result.distinct must contain ("succeed")
result.distinct must contain ("CBO")
}
"recover and reset the breaker after the reset timeout" in {
openBreaker
Await.ready(breakers.halfOpenLatch, 2.seconds.dilated)
Await.ready(breakers.breaker.withCircuitBreaker(Future(unreliableCall("succeed"))), 1.second.dilated)
val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {
Thread.sleep(10); unreliableCall("succeed")
}) recoverWith {
case _: CircuitBreakerOpenException => Promise.successful("CBO")
}
val futureList = Future.sequence(futures)
val result = Await.result(futureList, 1.second.dilated)
result.size must be (100)
result.distinct.size must be (1)
result.distinct must contain ("succeed")
}
}
}

View file

@ -0,0 +1,243 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import akka.util.duration._
import akka.testkit._
import org.scalatest.BeforeAndAfter
import akka.dispatch.Future
import akka.dispatch.Await
object CircuitBreakerSpec {
class TestException extends RuntimeException
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
import CircuitBreakerSpec.TestException
val awaitTimeout = 2.seconds.dilated
@volatile
var breakers: TestCircuitBreakers = null
class TestCircuitBreakers {
val halfOpenLatch = new TestLatch(1)
val openLatch = new TestLatch(1)
val closedLatch = new TestLatch(1)
val shortCallTimeoutCb = new CircuitBreaker(system.scheduler, 1, 50.millis.dilated, 500.millis.dilated)
.onClose(closedLatch.countDown())
.onHalfOpen(halfOpenLatch.countDown())
.onOpen(openLatch.countDown())
val shortResetTimeoutCb = new CircuitBreaker(system.scheduler, 1, 1000.millis.dilated, 50.millis.dilated)
.onClose(closedLatch.countDown())
.onHalfOpen(halfOpenLatch.countDown())
.onOpen(openLatch.countDown())
val longCallTimeoutCb = new CircuitBreaker(system.scheduler, 1, 5 seconds, 500.millis.dilated)
.onClose(closedLatch.countDown())
.onHalfOpen(halfOpenLatch.countDown())
.onOpen(openLatch.countDown())
val longResetTimeoutCb = new CircuitBreaker(system.scheduler, 1, 100.millis.dilated, 5 seconds)
.onClose(closedLatch.countDown())
.onHalfOpen(halfOpenLatch.countDown())
.onOpen(openLatch.countDown())
val multiFailureCb = new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated)
.onClose(closedLatch.countDown())
.onHalfOpen(halfOpenLatch.countDown())
.onOpen(openLatch.countDown())
}
before {
breakers = new TestCircuitBreakers
}
def checkLatch(latch: TestLatch) {
Await.ready(latch, awaitTimeout)
}
def throwException = throw new TestException
def sayHi = "hi"
"A synchronous circuit breaker that is open" must {
"throw exceptions when called before reset timeout" in {
intercept[TestException] {
breakers.longResetTimeoutCb.withSyncCircuitBreaker(throwException)
}
checkLatch(breakers.openLatch)
intercept[CircuitBreakerOpenException] {
breakers.longResetTimeoutCb.withSyncCircuitBreaker(sayHi)
}
}
"transition to half-open on reset timeout" in {
intercept[TestException] {
breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException)
}
checkLatch(breakers.halfOpenLatch)
}
}
"A synchronous circuit breaker that is half-open" must {
"pass through next call and close on success" in {
intercept[TestException] {
breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException)
}
checkLatch(breakers.halfOpenLatch)
assert("hi" == breakers.shortResetTimeoutCb.withSyncCircuitBreaker(sayHi))
checkLatch(breakers.closedLatch)
}
"open on exception in call" in {
intercept[TestException] {
breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException)
}
checkLatch(breakers.halfOpenLatch)
intercept[TestException] {
breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException)
}
checkLatch(breakers.openLatch)
}
}
"A synchronous circuit breaker that is closed" must {
"allow calls through" in {
breakers.longCallTimeoutCb.withSyncCircuitBreaker(sayHi) must be("hi")
}
"increment failure count on failure" in {
intercept[TestException] {
breakers.longCallTimeoutCb.withSyncCircuitBreaker(throwException)
}
checkLatch(breakers.openLatch)
breakers.longCallTimeoutCb.currentFailureCount must be(1)
}
"reset failure count after success" in {
intercept[TestException] {
breakers.multiFailureCb.withSyncCircuitBreaker(throwException)
}
breakers.multiFailureCb.currentFailureCount must be(1)
breakers.multiFailureCb.withSyncCircuitBreaker(sayHi)
breakers.multiFailureCb.currentFailureCount must be(0)
}
"increment failure count on callTimeout" in {
breakers.shortCallTimeoutCb.withSyncCircuitBreaker({
100.millis.dilated.sleep()
})
breakers.shortCallTimeoutCb.currentFailureCount must be(1)
}
}
"An asynchronous circuit breaker that is open" must {
"throw exceptions when called before reset timeout" in {
breakers.longResetTimeoutCb.withCircuitBreaker(Future(throwException))
checkLatch(breakers.openLatch)
intercept[CircuitBreakerOpenException] {
Await.result(
breakers.longResetTimeoutCb.withCircuitBreaker(Future(sayHi)),
awaitTimeout)
}
}
"transition to half-open on reset timeout" in {
breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException))
checkLatch(breakers.halfOpenLatch)
}
}
"An asynchronous circuit breaker that is half-open" must {
"pass through next call and close on success" in {
breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException))
checkLatch(breakers.halfOpenLatch)
Await.result(
breakers.shortResetTimeoutCb.withCircuitBreaker(Future(sayHi)),
awaitTimeout) must be("hi")
checkLatch(breakers.closedLatch)
}
"re-open on exception in call" in {
breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException))
checkLatch(breakers.halfOpenLatch)
intercept[TestException] {
Await.result(
breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)),
awaitTimeout)
}
checkLatch(breakers.openLatch)
}
"re-open on async failure" in {
breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException))
checkLatch(breakers.halfOpenLatch)
breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException))
checkLatch(breakers.openLatch)
}
}
"An asynchronous circuit breaker that is closed" must {
"allow calls through" in {
Await.result(
breakers.longCallTimeoutCb.withCircuitBreaker(Future(sayHi)),
awaitTimeout) must be("hi")
}
"increment failure count on exception" in {
intercept[TestException] {
Await.result(
breakers.longCallTimeoutCb.withCircuitBreaker(Future(throwException)),
awaitTimeout)
}
checkLatch(breakers.openLatch)
breakers.longCallTimeoutCb.currentFailureCount must be(1)
}
"increment failure count on async failure" in {
breakers.longCallTimeoutCb.withCircuitBreaker(Future(throwException))
checkLatch(breakers.openLatch)
breakers.longCallTimeoutCb.currentFailureCount must be(1)
}
"reset failure count after success" in {
breakers.multiFailureCb.withCircuitBreaker(Future(sayHi))
val latch = TestLatch(4)
for (n 1 to 4) breakers.multiFailureCb.withCircuitBreaker(Future(throwException))
awaitCond(breakers.multiFailureCb.currentFailureCount == 4, awaitTimeout)
breakers.multiFailureCb.withCircuitBreaker(Future(sayHi))
awaitCond(breakers.multiFailureCb.currentFailureCount == 0, awaitTimeout)
}
"increment failure count on callTimeout" in {
breakers.shortCallTimeoutCb.withCircuitBreaker {
Future {
100.millis.dilated.sleep()
sayHi
}
}
checkLatch(breakers.openLatch)
breakers.shortCallTimeoutCb.currentFailureCount must be(1)
}
}
}

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern;
import akka.util.Unsafe;
class AbstractCircuitBreaker {
protected final static long stateOffset;
static {
try {
stateOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentStateDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
}
}

View file

@ -0,0 +1,560 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong, AtomicBoolean }
import akka.AkkaException
import akka.actor.Scheduler
import akka.dispatch.{ Future, ExecutionContext, Await, Promise }
import akka.util.{ Deadline, Duration, NonFatal, Unsafe }
import akka.util.duration._
import util.control.NoStackTrace
import java.util.concurrent.{ Callable, CopyOnWriteArrayList }
/**
* Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread
*/
object CircuitBreaker {
/**
* Synchronous execution context to run in caller's thread - used by companion object factory methods
*/
private[CircuitBreaker] val syncExecutionContext = new ExecutionContext {
def execute(runnable: Runnable): Unit = runnable.run()
def reportFailure(t: Throwable): Unit = ()
}
/**
* Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed
* in Future when using withCircuitBreaker. To use another ExecutionContext for the callbacks you can specify the
* executor in the constructor.
*
* @param scheduler Reference to Akka scheduler
* @param maxFailures Maximum number of failures before opening the circuit
* @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit
*/
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker =
new CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(syncExecutionContext)
/**
* Java API alias for apply
*
* @param scheduler Reference to Akka scheduler
* @param maxFailures Maximum number of failures before opening the circuit
* @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit
*/
def create(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker =
apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)
}
/**
* Provides circuit breaker functionality to provide stability when working with "dangerous" operations, e.g. calls to
* remote systems
*
* Transitions through three states:
* - In *Closed* state, calls pass through until the `maxFailures` count is reached. This causes the circuit breaker
* to open. Both exceptions and calls exceeding `callTimeout` are considered failures.
* - In *Open* state, calls fail-fast with an exception. After `resetTimeout`, circuit breaker transitions to
* half-open state.
* - In *Half-Open* state, the first call will be allowed through, if it succeeds the circuit breaker will reset to
* closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that
* execute while the first is running will fail-fast with an exception.
*
*
* @param scheduler Reference to Akka scheduler
* @param maxFailures Maximum number of failures before opening the circuit
* @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit
* @param executor [[akka.dispatch.ExecutionContext]] used for execution of state transition listeners
*/
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) = {
this(scheduler, maxFailures, callTimeout, resetTimeout)(executor)
}
/**
* Holds reference to current state of CircuitBreaker - *access only via helper methods*
*/
@volatile
private[this] var _currentStateDoNotCallMeDirectly: State = Closed
/**
* Helper method for access to underlying state via Unsafe
*
* @param oldState Previous state on transition
* @param newState Next state on transition
* @return Whether the previous state matched correctly
*/
@inline
private[this] def swapState(oldState: State, newState: State): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractCircuitBreaker.stateOffset, oldState, newState)
/**
* Helper method for accessing underlying state via Unsafe
*
* @return Reference to current state
*/
@inline
private[this] def currentState: State =
Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State]
/**
* Wraps invocations of asynchronous calls that need to be protected
*
* @param body Call needing protected
* @tparam T return type from call
* @return [[akka.dispatch.Future]] containing the call result
*/
def withCircuitBreaker[T](body: Future[T]): Future[T] = {
currentState.invoke(body)
}
/**
* Java API for withCircuitBreaker
*
* @param body Call needing protected
* @tparam T return type from call
* @return [[akka.dispatch.Future]] containing the call result
*/
def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = {
withCircuitBreaker(body.call)
}
/**
* Wraps invocations of synchronous calls that need to be protected
*
* Calls are run in caller's thread
*
* @param body Call needing protected
* @tparam T return type from call
* @return The result of the call
*/
def withSyncCircuitBreaker[T](body: T): T = {
Await.result(withCircuitBreaker(
{
try
Promise.successful(body)(CircuitBreaker.syncExecutionContext)
catch {
case NonFatal(t) Promise.failed(t)(CircuitBreaker.syncExecutionContext)
}
}),callTimeout)
}
/**
* Java API for withSyncCircuitBreaker
*
* @param body Call needing protected
* @tparam T return type from call
* @return The result of the call
*/
def callWithSyncCircuitBreaker[T](body: Callable[T]): T = {
withSyncCircuitBreaker(body.call)
}
/**
* Adds a callback to execute when circuit breaker opens
*
* The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor.
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onOpen[T](callback: T): CircuitBreaker = {
Open.addListener(() callback)
this
}
/**
* Java API for onOpen
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onOpen[T](callback: Callable[T]): CircuitBreaker = {
onOpen(callback.call)
}
/**
* Adds a callback to execute when circuit breaker transitions to half-open
*
* The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor.
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onHalfOpen[T](callback: T): CircuitBreaker = {
HalfOpen.addListener(() callback)
this
}
/**
* JavaAPI for onHalfOpen
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = {
onHalfOpen(callback.call)
}
/**
* Adds a callback to execute when circuit breaker state closes
*
* The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor.
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onClose[T](callback: T): CircuitBreaker = {
Closed.addListener(() callback)
this
}
/**
* JavaAPI for onClose
*
* @param callback Handler to be invoked on state change
* @tparam T Type supplied to assist with type inference, otherwise ignored by implementation
* @return CircuitBreaker for fluent usage
*/
def onClose[T](callback: Callable[T]): CircuitBreaker = {
onClose(callback.call)
}
/**
* Retrieves current failure count.
*
* @return count
*/
private[akka] def currentFailureCount: Int = Closed.get
/**
* Implements consistent transition between states
*
* @param fromState State being transitioning from
* @param toState State being transitioning from
* @throws IllegalStateException if an invalid transition is attempted
*/
private def transition(fromState: State, toState: State): Unit = {
if (swapState(fromState, toState))
toState.enter()
else
throw new IllegalStateException("Illegal transition attempted from: " + fromState + " to " + toState)
}
/**
* Trips breaker to an open state. This is valid from Closed or Half-Open states.
*
* @param fromState State we're coming from (Closed or Half-Open)
*/
private def tripBreaker(fromState: State): Unit = {
transition(fromState, Open)
}
/**
* Resets breaker to a closed state. This is valid from an Half-Open state only.
*
*/
private def resetBreaker(): Unit = {
transition(HalfOpen, Closed)
}
/**
* Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only.
*
*/
private def attemptReset(): Unit = {
transition(Open, HalfOpen)
}
/**
* Internal state abstraction
*/
private sealed trait State {
private val listeners = new CopyOnWriteArrayList[() _]
/**
* Add a listener function which is invoked on state entry
*
* @param listener listener implementation
* @tparam T return type of listener, not used - but supplied for type inference purposes
*/
def addListener[T](listener: () T) {
listeners add listener
}
/**
* Test for whether listeners exist
*
* @return whether listeners exist
*/
private def hasListeners: Boolean = !listeners.isEmpty
/**
* Notifies the listeners of the transition event via a Future executed in implicit parameter ExecutionContext
*
* @return Promise which executes listener in supplied [[akka.dispatch.ExecutionContext]]
*/
protected def notifyTransitionListeners() {
if (hasListeners) {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
//FIXME per @viktorklang: it's a bit wasteful to create Futures for one-offs, just use EC.execute instead
Future(listener())
}
}
}
/**
* Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
* call timeout is counted as a failed call, otherwise a successful call
*
* @param body Implementation of the call
* @tparam T Return type of the call's implementation
* @return Future containing the result of the call
*/
def callThrough[T](body: Future[T]): Future[T] = {
val deadline = callTimeout.fromNow
val bodyFuture = try body catch {
case NonFatal(t) Promise.failed(t)
}
bodyFuture onFailure {
case _ callFails()
} onSuccess {
case _
if (deadline.isOverdue()) callFails()
else callSucceeds()
}
}
/**
* Abstract entry point for all states
*
* @param body Implementation of the call that needs protected
* @tparam T Return type of protected call
* @return Future containing result of protected call
*/
def invoke[T](body: Future[T]): Future[T]
/**
* Invoked when call succeeds
*
*/
def callSucceeds(): Unit
/**
* Invoked when call fails
*
*/
def callFails(): Unit
/**
* Invoked on the transitioned-to state during transition. Notifies listeners after invoking subclass template
* method _enter
*
*/
final def enter(): Unit = {
_enter()
notifyTransitionListeners()
}
/**
* Template method for concrete traits
*
*/
def _enter(): Unit
}
/**
* Concrete implementation of Closed state
*/
private object Closed extends AtomicInteger with State {
/**
* Implementation of invoke, which simply attempts the call
*
* @param body Implementation of the call that needs protected
* @tparam T Return type of protected call
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] = {
callThrough(body)
}
/**
* On successful call, the failure count is reset to 0
*
* @return
*/
override def callSucceeds(): Unit = { set(0) }
/**
* On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
* the breaker is tripped if we have reached maxFailures.
*
* @return
*/
override def callFails(): Unit = {
if (incrementAndGet() == maxFailures) tripBreaker(Closed)
}
/**
* On entry of this state, failure count is reset.
*
* @return
*/
override def _enter(): Unit = {
set(0)
}
/**
* Override for more descriptive toString
*
* @return
*/
override def toString: String = {
"Closed with failure count = " + get()
}
}
/**
* Concrete implementation of half-open state
*/
private object HalfOpen extends AtomicBoolean(true) with State {
/**
* Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
* If the call succeeds the breaker closes.
*
* @param body Implementation of the call that needs protected
* @tparam T Return type of protected call
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] = {
if (compareAndSet(true, false))
callThrough(body)
else
Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero))
}
/**
* Reset breaker on successful call.
*
* @return
*/
override def callSucceeds(): Unit = { resetBreaker() }
/**
* Reopen breaker on failed call.
*
* @return
*/
override def callFails(): Unit = { tripBreaker(HalfOpen) }
/**
* On entry, guard should be reset for that first call to get in
*
* @return
*/
override def _enter(): Unit = {
set(true)
}
/**
* Override for more descriptive toString
*
* @return
*/
override def toString: String = {
"Half-Open currently testing call for success = " + get()
}
}
/**
* Concrete implementation of Open state
*/
private object Open extends AtomicLong with State {
/**
* Fail-fast on any invocation
*
* @param body Implementation of the call that needs protected
* @tparam T Return type of protected call
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] = {
Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft))
}
/**
* Calculate remaining timeout to inform the caller in case a backoff algorithm is useful
*
* @return [[akka.util.Deadline]] to when the breaker will attempt a reset by transitioning to half-open
*/
private def remainingTimeout(): Deadline = get match {
case 0L Deadline.now
case t (t.millis + resetTimeout).fromNow
}
/**
* No-op for open, calls are never executed so cannot succeed or fail
*
* @return
*/
override def callSucceeds(): Unit = {}
/**
* No-op for open, calls are never executed so cannot succeed or fail
*
* @return
*/
override def callFails(): Unit = {}
/**
* On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to
* calculate remaining time before attempted reset.
*
* @return
*/
override def _enter(): Unit = {
set(System.currentTimeMillis)
scheduler.scheduleOnce(resetTimeout) {
attemptReset()
}
}
/**
* Override for more descriptive toString
*
* @return
*/
override def toString: String = {
"Open"
}
}
}
/**
* Exception thrown when Circuit Breaker is open.
*
* @param remainingDuration Stores remaining time before attempting a reset. Zero duration means the breaker is
* currently in half-open state.
* @param message Defaults to "Circuit Breaker is open; calls are failing fast"
*/
class CircuitBreakerOpenException(
val remainingDuration: Duration,
message: String = "Circuit Breaker is open; calls are failing fast")
extends AkkaException(message) with NoStackTrace

View file

@ -0,0 +1,130 @@
.. _circuit-breaker:
###############
Circuit Breaker
###############
==================
Why are they used?
==================
A circuit breaker is used to provide stability and prevent cascading failures in distributed
systems. These should be used in conjunction with judicious timeouts at the interfaces between
remote systems to prevent the failure of a single component from bringing down all components.
As an example, we have a web application interacting with a remote third party web service.
Let's say the third party has oversold their capacity and their database melts down under load.
Assume that the database fails in such a way that it takes a very long time to hand back an
error to the third party web service. This in turn makes calls fail after a long period of
time. Back to our web application, the users have noticed that their form submissions take
much longer seeming to hang. Well the users do what they know to do which is use the refresh
button, adding more requests to their already running requests. This eventually causes the
failure of the web application due to resource exhaustion. This will affect all users, even
those who are not using functionality dependent on this third party web service.
Introducing circuit breakers on the web service call would cause the requests to begin to
fail-fast, letting the user know that something is wrong and that they need not refresh
their request. This also confines the failure behavior to only those users that are using
functionality dependent on the third party, other users are no longer affected as there is no
resource exhaustion. Circuit breakers can also allow savvy developers to mark portions of
the site that use the functionality unavailable, or perhaps show some cached content as
appropriate while the breaker is open.
The Akka library provides an implementation of a circuit breaker called
:class:`akka.pattern.CircuitBreaker` which has the behavior described below.
=================
What do they do?
=================
* During normal operation, a circuit breaker is in the `Closed` state:
* Exceptions or calls exceeding the configured `callTimeout` increment a failure counter
* Successes reset the failure count to zero
* When the failure counter reaches a `maxFailures` count, the breaker is tripped into `Open` state
* While in `Open` state:
* All calls fail-fast with a :class:`CircuitBreakerOpenException`
* After the configured `resetTimeout`, the circuit breaker enters a `Half-Open` state
* In `Half-Open` state:
* The first call attempted is allowed through without failing fast
* All other calls fail-fast with an exception just as in `Open` state
* If the first call succeeds, the breaker is reset back to `Closed` state
* If the first call fails, the breaker is tripped again into the `Open` state for another full `resetTimeout`
* State transition listeners:
* Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen`
* These are executed in the :class:`ExecutionContext` provided.
.. graphviz::
digraph circuit_breaker {
rankdir = "LR";
size = "6,5";
graph [ bgcolor = "transparent" ]
node [ fontname = "Helvetica",
fontsize = 14,
shape = circle,
color = white,
style = filled ];
edge [ fontname = "Helvetica", fontsize = 12 ]
Closed [ fillcolor = green2 ];
"Half-Open" [fillcolor = yellow2 ];
Open [ fillcolor = red2 ];
Closed -> Closed [ label = "Success" ];
"Half-Open" -> Open [ label = "Trip Breaker" ];
"Half-Open" -> Closed [ label = "Reset Breaker" ];
Closed -> Open [ label = "Trip Breaker" ];
Open -> Open [ label = "Calls failing fast" ];
Open -> "Half-Open" [ label = "Attempt Reset" ];
}
========
Examples
========
--------------
Initialization
--------------
Here's how a :class:`CircuitBreaker` would be configured for:
* 5 maximum failures
* a call timeout of 10 seconds
* a reset timeout of 1 minute
^^^^^^^
Scala
^^^^^^^
.. includecode:: code/docs/circuitbreaker/CircuitBreakerDocSpec.scala
:include: imports1,circuit-breaker-initialization
^^^^^^^
Java
^^^^^^^
.. includecode:: code/docs/circuitbreaker/DangerousJavaActor.java
:include: imports1,circuit-breaker-initialization
---------------
Call Protection
---------------
Here's how the :class:`CircuitBreaker` would be used to protect an asynchronous
call as well as a synchronous one:
^^^^^^^
Scala
^^^^^^^
.. includecode:: code/docs/circuitbreaker/CircuitBreakerDocSpec.scala
:include: circuit-breaker-usage
^^^^^^
Java
^^^^^^
.. includecode:: code/docs/circuitbreaker/DangerousJavaActor.java
:include: circuit-breaker-usage
.. note::
Using the :class:`CircuitBreaker` companion object's `apply` or `create` methods
will return a :class:`CircuitBreaker` where callbacks are executed in the caller's thread.
This can be useful if the asynchronous :class:`Future` behavior is unnecessary, for
example invoking a synchronous-only API.

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.circuitbreaker
//#imports1
import akka.util.duration._ // small d is important here
import akka.pattern.CircuitBreaker
import akka.actor.Actor
import akka.dispatch.Future
import akka.event.Logging
//#imports1
class CircuitBreakerDocSpec { }
//#circuit-breaker-initialization
class DangerousActor extends Actor {
val log = Logging(context.system, this)
implicit val executionContext = context.dispatcher
val breaker =
new CircuitBreaker(context.system.scheduler, 5, 10.seconds, 1.minute)
.onOpen(notifyMeOnOpen)
def notifyMeOnOpen =
log.warning("My CircuitBreaker is now open, and will not close for one minute")
//#circuit-breaker-initialization
//#circuit-breaker-usage
def dangerousCall: String = "This really isn't that dangerous of a call after all"
def receive = {
case "is my middle name" =>
sender ! breaker.withCircuitBreaker(Future(dangerousCall))
case "block for me" =>
sender ! breaker.withSyncCircuitBreaker(dangerousCall)
}
//#circuit-breaker-usage
}

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.circuitbreaker;
//#imports1
import akka.actor.UntypedActor;
import akka.dispatch.Future;
import akka.event.LoggingAdapter;
import akka.util.Duration;
import akka.pattern.CircuitBreaker;
import akka.event.Logging;
import static akka.dispatch.Futures.future;
import java.util.concurrent.Callable;
//#imports1
//#circuit-breaker-initialization
public class DangerousJavaActor extends UntypedActor {
private final CircuitBreaker breaker;
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public DangerousJavaActor() {
this.breaker = new CircuitBreaker(
getContext().dispatcher(), getContext().system().scheduler(),
5, Duration.parse("10s"), Duration.parse("1m"))
.onOpen(new Callable<Object>() {
public Object call() throws Exception {
notifyMeOnOpen();
return null;
}
});
}
public void notifyMeOnOpen() {
log.warning("My CircuitBreaker is now open, and will not close for one minute");
}
//#circuit-breaker-initialization
//#circuit-breaker-usage
public String dangerousCall() {
return "This really isn't that dangerous of a call after all";
}
@Override
public void onReceive(Object message) {
if (message instanceof String) {
String m = (String) message;
if ("is my middle name".equals(m)) {
final Future<String> f = future(
new Callable<String>() {
public String call() {
return dangerousCall();
}
}, getContext().dispatcher());
getSender().tell(breaker
.callWithCircuitBreaker(
new Callable<Future<String>>() {
public Future<String> call() throws Exception {
return f;
}
}));
}
if ("block for me".equals(m)) {
getSender().tell(breaker
.callWithSyncCircuitBreaker(
new Callable<String>() {
@Override
public String call() throws Exception {
return dangerousCall();
}
}));
}
}
}
//#circuit-breaker-usage
}

View file

@ -5,3 +5,4 @@ Common utilities
:maxdepth: 2
duration
circuitbreaker

View file

@ -8,7 +8,7 @@ import sys, os
# -- General configuration -----------------------------------------------------
sys.path.append(os.path.abspath('_sphinx/exts'))
extensions = ['sphinx.ext.todo', 'includecode']
extensions = ['sphinx.ext.todo', 'includecode', 'sphinx.ext.graphviz']
templates_path = ['_templates']
source_suffix = '.rst'

View file

@ -50,6 +50,8 @@ import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.actor.mailbox.DurableMessageQueue
import akka.actor.mailbox.DurableMessageSerialization
import akka.pattern.CircuitBreaker
import akka.util.duration._
class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config)
extends MailboxType {
@ -65,20 +67,23 @@ class MyMessageQueue(_owner: ActorContext)
extends DurableMessageQueue(_owner) with DurableMessageSerialization {
val storage = new QueueStorage
// A real-world implmentation would use configuration to set the last
// three parameters below
val breaker = CircuitBreaker(_owner.system.scheduler,5,30.seconds,1.minute)
def enqueue(receiver: ActorRef, envelope: Envelope) {
def enqueue(receiver: ActorRef, envelope: Envelope): Unit = breaker.withSyncCircuitBreaker {
val data: Array[Byte] = serialize(envelope)
storage.push(data)
}
def dequeue(): Envelope = {
def dequeue(): Envelope = breaker.withSyncCircuitBreaker {
val data: Option[Array[Byte]] = storage.pull()
data.map(deserialize).orNull
}
def hasMessages: Boolean = !storage.isEmpty
def hasMessages: Boolean = breaker.withSyncCircuitBreaker { !storage.isEmpty }
def numberOfMessages: Int = storage.size
def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size }
/**
* Called when the mailbox is disposed.

View file

@ -80,7 +80,9 @@ a configurator (MailboxType) and a queue implementation (DurableMessageQueue).
The envelope contains the message sent to the actor, and information about sender. It is the
envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization
to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala`
mechanism. This optional and you may store the envelope data in any way you like.
mechanism. This optional and you may store the envelope data in any way you like. Durable
mailboxes are an excellent fit for usage of circuit breakers. These are described in the
:ref:`circuit-breaker` documentation.
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: custom-mailbox

View file

@ -45,7 +45,19 @@ akka {
keep-journal = on
# whether to sync the journal after each transaction
sync-journal = off
sync-journal = off
# circuit breaker configuration
circuit-breaker {
# maximum number of failures before opening breaker
max-failures = 3
# duration of time beyond which a call is assumed to be timed out and considered a failure
call-timeout = 3 seconds
# duration of time to wait until attempting to reset the breaker during which all calls fail-fast
reset-timeout = 30 seconds
}
}
}
}

View file

@ -5,14 +5,14 @@
package akka.actor.mailbox
import akka.actor.ActorContext
import akka.dispatch.{ Envelope, MessageQueue }
import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.NonFatal
import akka.ConfigurationException
import akka.actor.ActorSystem
import akka.dispatch._
import akka.util.{ Duration, NonFatal }
import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker }
class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
private val settings = new FileBasedMailboxSettings(systemSettings, config)
@ -26,6 +26,8 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox
// TODO Is it reasonable for all FileBasedMailboxes to have their own logger?
private val log = Logging(system, "FileBasedMessageQueue")
val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout)
private val queue = try {
(new java.io.File(settings.QueuePath)) match {
case dir if dir.exists && !dir.isDirectory throw new IllegalStateException("Path already occupied by non-directory " + dir)
@ -42,18 +44,28 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox
throw e
}
def enqueue(receiver: ActorRef, envelope: Envelope): Unit = queue.add(serialize(envelope))
def dequeue(): Envelope = try {
queue.remove.map(item { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull
} catch {
case _: java.util.NoSuchElementException null
case NonFatal(e)
log.error(e, "Couldn't dequeue from file-based mailbox")
throw e
def enqueue(receiver: ActorRef, envelope: Envelope) {
breaker.withSyncCircuitBreaker(queue.add(serialize(envelope)))
}
def numberOfMessages: Int = queue.length.toInt
def dequeue(): Envelope = {
breaker.withSyncCircuitBreaker(
try {
queue.remove.map(item { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull
} catch {
case _: java.util.NoSuchElementException null
case e: CircuitBreakerOpenException
log.debug(e.getMessage())
throw e
case NonFatal(e)
log.error(e, "Couldn't dequeue from file-based mailbox, due to [{}]", e.getMessage())
throw e
})
}
def numberOfMessages: Int = {
breaker.withSyncCircuitBreaker(queue.length.toInt)
}
def hasMessages: Boolean = numberOfMessages > 0

View file

@ -29,4 +29,7 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use
val KeepJournal: Boolean = getBoolean("keep-journal")
val SyncJournal: Boolean = getBoolean("sync-journal")
val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures")
val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
}

View file

@ -1,7 +1,6 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import com.typesafe.config.ConfigFactory
import akka.dispatch.Mailbox
object FileBasedMailboxSpec {
@ -10,23 +9,32 @@ object FileBasedMailboxSpec {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
throughput = 1
file-based.directory-path = "file-based"
file-based.circuit-breaker.max-failures = 5
file-based.circuit-breaker.call-timeout = 5 seconds
}
"""
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) {
val queuePath = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")).QueuePath
val settings = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher"))
"FileBasedMailboxSettings" must {
"read the file-based section" in {
queuePath must be("file-based")
settings.QueuePath must be("file-based")
settings.CircuitBreakerMaxFailures must be(5)
import akka.util.duration._
settings.CircuitBreakerCallTimeout must be(5 seconds)
}
}
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
def clean() {
FileUtils.deleteDirectory(new java.io.File(queuePath))
FileUtils.deleteDirectory(new java.io.File(settings.QueuePath))
}
override def atStartup() {