diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala new file mode 100644 index 0000000000..fab1cbab7a --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import akka.testkit._ +import akka.util.duration._ +import org.scalatest.BeforeAndAfter +import akka.dispatch.{Promise, Await, Future} + +class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { + + @volatile + var breakers: BreakerState = null + + class BreakerState { + + val halfOpenLatch = new TestLatch(1) + + val breaker = new CircuitBreaker(system.scheduler,5,100.millis.dilated,500.millis.dilated) + .onHalfOpen(halfOpenLatch.countDown()) + + } + + before { + breakers = new BreakerState() + } + + def unreliableCall(param: String) = { + param match { + case "fail" => throw new RuntimeException("FAIL") + case _ => param + } + } + + def openBreaker: Unit = { + for (i <- 1 to 5) + Await.result(breakers.breaker.withCircuitBreaker(Future(unreliableCall("fail"))) recoverWith { + case _ => Promise.successful("OK") + }, 1.second.dilated) + } + + "A circuit breaker being called by many threads" must { + "allow many calls while in closed state with no errors" in { + + val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {Thread.sleep(10); unreliableCall("succeed")}) + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be (100) + result.distinct.size must be (1) + result.distinct must contain ("succeed") + + } + + "transition to open state upon reaching failure limit and fail-fast" in { + + openBreaker + + val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("success") + }) recoverWith { + case _: CircuitBreakerOpenException => Promise.successful("CBO") + } + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be (100) + result.distinct.size must be (1) + result.distinct must contain ("CBO") + } + + "allow a single call through in half-open state" in { + openBreaker + + Await.ready(breakers.halfOpenLatch, 2.seconds.dilated) + + val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("succeed") + }) recoverWith { + case _: CircuitBreakerOpenException => Promise.successful("CBO") + } + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be (100) + result.distinct.size must be (2) + result.distinct must contain ("succeed") + result.distinct must contain ("CBO") + } + + "recover and reset the breaker after the reset timeout" in { + openBreaker + + Await.ready(breakers.halfOpenLatch, 2.seconds.dilated) + + Await.ready(breakers.breaker.withCircuitBreaker(Future(unreliableCall("succeed"))), 1.second.dilated) + + val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("succeed") + }) recoverWith { + case _: CircuitBreakerOpenException => Promise.successful("CBO") + } + + val futureList = Future.sequence(futures) + + val result = Await.result(futureList, 1.second.dilated) + + result.size must be (100) + result.distinct.size must be (1) + result.distinct must contain ("succeed") + } + } + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala new file mode 100644 index 0000000000..2c2a07ee3f --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -0,0 +1,243 @@ + +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.pattern + +import akka.util.duration._ +import akka.testkit._ +import org.scalatest.BeforeAndAfter +import akka.dispatch.Future +import akka.dispatch.Await + +object CircuitBreakerSpec { + + class TestException extends RuntimeException + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { + + import CircuitBreakerSpec.TestException + + val awaitTimeout = 2.seconds.dilated + + @volatile + var breakers: TestCircuitBreakers = null + + class TestCircuitBreakers { + val halfOpenLatch = new TestLatch(1) + val openLatch = new TestLatch(1) + val closedLatch = new TestLatch(1) + + val shortCallTimeoutCb = new CircuitBreaker(system.scheduler, 1, 50.millis.dilated, 500.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val shortResetTimeoutCb = new CircuitBreaker(system.scheduler, 1, 1000.millis.dilated, 50.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val longCallTimeoutCb = new CircuitBreaker(system.scheduler, 1, 5 seconds, 500.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val longResetTimeoutCb = new CircuitBreaker(system.scheduler, 1, 100.millis.dilated, 5 seconds) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + + val multiFailureCb = new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated) + .onClose(closedLatch.countDown()) + .onHalfOpen(halfOpenLatch.countDown()) + .onOpen(openLatch.countDown()) + } + + before { + breakers = new TestCircuitBreakers + } + + def checkLatch(latch: TestLatch) { + Await.ready(latch, awaitTimeout) + } + + def throwException = throw new TestException + + def sayHi = "hi" + + "A synchronous circuit breaker that is open" must { + "throw exceptions when called before reset timeout" in { + + intercept[TestException] { + breakers.longResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.openLatch) + + intercept[CircuitBreakerOpenException] { + breakers.longResetTimeoutCb.withSyncCircuitBreaker(sayHi) + } + } + + "transition to half-open on reset timeout" in { + intercept[TestException] { + breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.halfOpenLatch) + } + } + + "A synchronous circuit breaker that is half-open" must { + "pass through next call and close on success" in { + intercept[TestException] { + breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.halfOpenLatch) + assert("hi" == breakers.shortResetTimeoutCb.withSyncCircuitBreaker(sayHi)) + checkLatch(breakers.closedLatch) + } + + "open on exception in call" in { + intercept[TestException] { + breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.halfOpenLatch) + intercept[TestException] { + breakers.shortResetTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.openLatch) + } + } + + "A synchronous circuit breaker that is closed" must { + "allow calls through" in { + breakers.longCallTimeoutCb.withSyncCircuitBreaker(sayHi) must be("hi") + } + + "increment failure count on failure" in { + intercept[TestException] { + breakers.longCallTimeoutCb.withSyncCircuitBreaker(throwException) + } + checkLatch(breakers.openLatch) + breakers.longCallTimeoutCb.currentFailureCount must be(1) + } + + "reset failure count after success" in { + intercept[TestException] { + breakers.multiFailureCb.withSyncCircuitBreaker(throwException) + } + + breakers.multiFailureCb.currentFailureCount must be(1) + breakers.multiFailureCb.withSyncCircuitBreaker(sayHi) + breakers.multiFailureCb.currentFailureCount must be(0) + } + + "increment failure count on callTimeout" in { + breakers.shortCallTimeoutCb.withSyncCircuitBreaker({ + 100.millis.dilated.sleep() + }) + breakers.shortCallTimeoutCb.currentFailureCount must be(1) + } + } + + "An asynchronous circuit breaker that is open" must { + "throw exceptions when called before reset timeout" in { + breakers.longResetTimeoutCb.withCircuitBreaker(Future(throwException)) + + checkLatch(breakers.openLatch) + + intercept[CircuitBreakerOpenException] { + Await.result( + breakers.longResetTimeoutCb.withCircuitBreaker(Future(sayHi)), + awaitTimeout) + } + } + + "transition to half-open on reset timeout" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + } + } + + "An asynchronous circuit breaker that is half-open" must { + "pass through next call and close on success" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + + Await.result( + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(sayHi)), + awaitTimeout) must be("hi") + checkLatch(breakers.closedLatch) + } + + "re-open on exception in call" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + + intercept[TestException] { + Await.result( + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)), + awaitTimeout) + } + checkLatch(breakers.openLatch) + } + + "re-open on async failure" in { + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.halfOpenLatch) + + breakers.shortResetTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.openLatch) + } + } + + "An asynchronous circuit breaker that is closed" must { + "allow calls through" in { + Await.result( + breakers.longCallTimeoutCb.withCircuitBreaker(Future(sayHi)), + awaitTimeout) must be("hi") + } + + "increment failure count on exception" in { + intercept[TestException] { + Await.result( + breakers.longCallTimeoutCb.withCircuitBreaker(Future(throwException)), + awaitTimeout) + } + checkLatch(breakers.openLatch) + breakers.longCallTimeoutCb.currentFailureCount must be(1) + } + + "increment failure count on async failure" in { + breakers.longCallTimeoutCb.withCircuitBreaker(Future(throwException)) + checkLatch(breakers.openLatch) + breakers.longCallTimeoutCb.currentFailureCount must be(1) + } + + "reset failure count after success" in { + breakers.multiFailureCb.withCircuitBreaker(Future(sayHi)) + val latch = TestLatch(4) + for (n ← 1 to 4) breakers.multiFailureCb.withCircuitBreaker(Future(throwException)) + awaitCond(breakers.multiFailureCb.currentFailureCount == 4, awaitTimeout) + breakers.multiFailureCb.withCircuitBreaker(Future(sayHi)) + awaitCond(breakers.multiFailureCb.currentFailureCount == 0, awaitTimeout) + } + + "increment failure count on callTimeout" in { + breakers.shortCallTimeoutCb.withCircuitBreaker { + Future { + 100.millis.dilated.sleep() + sayHi + } + } + + checkLatch(breakers.openLatch) + breakers.shortCallTimeoutCb.currentFailureCount must be(1) + } + } + +} diff --git a/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java new file mode 100644 index 0000000000..44482bb357 --- /dev/null +++ b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern; + +import akka.util.Unsafe; + +class AbstractCircuitBreaker { + protected final static long stateOffset; + + static { + try { + stateOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentStateDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala new file mode 100644 index 0000000000..79eba6aa1b --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -0,0 +1,560 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.pattern + +import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong, AtomicBoolean } +import akka.AkkaException +import akka.actor.Scheduler +import akka.dispatch.{ Future, ExecutionContext, Await, Promise } +import akka.util.{ Deadline, Duration, NonFatal, Unsafe } +import akka.util.duration._ +import util.control.NoStackTrace +import java.util.concurrent.{ Callable, CopyOnWriteArrayList } + +/** + * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread + */ +object CircuitBreaker { + + /** + * Synchronous execution context to run in caller's thread - used by companion object factory methods + */ + private[CircuitBreaker] val syncExecutionContext = new ExecutionContext { + def execute(runnable: Runnable): Unit = runnable.run() + + def reportFailure(t: Throwable): Unit = () + } + + /** + * Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed + * in Future when using withCircuitBreaker. To use another ExecutionContext for the callbacks you can specify the + * executor in the constructor. + * + * @param scheduler Reference to Akka scheduler + * @param maxFailures Maximum number of failures before opening the circuit + * @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure + * @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit + */ + def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker = + new CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(syncExecutionContext) + + /** + * Java API alias for apply + * + * @param scheduler Reference to Akka scheduler + * @param maxFailures Maximum number of failures before opening the circuit + * @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure + * @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit + */ + def create(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker = + apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) +} + +/** + * Provides circuit breaker functionality to provide stability when working with "dangerous" operations, e.g. calls to + * remote systems + * + * Transitions through three states: + * - In *Closed* state, calls pass through until the `maxFailures` count is reached. This causes the circuit breaker + * to open. Both exceptions and calls exceeding `callTimeout` are considered failures. + * - In *Open* state, calls fail-fast with an exception. After `resetTimeout`, circuit breaker transitions to + * half-open state. + * - In *Half-Open* state, the first call will be allowed through, if it succeeds the circuit breaker will reset to + * closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that + * execute while the first is running will fail-fast with an exception. + * + * + * @param scheduler Reference to Akka scheduler + * @param maxFailures Maximum number of failures before opening the circuit + * @param callTimeout [[akka.util.Duration]] of time after which to consider a call a failure + * @param resetTimeout [[akka.util.Duration]] of time after which to attempt to close the circuit + * @param executor [[akka.dispatch.ExecutionContext]] used for execution of state transition listeners + */ +class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { + + def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) = { + this(scheduler, maxFailures, callTimeout, resetTimeout)(executor) + } + + /** + * Holds reference to current state of CircuitBreaker - *access only via helper methods* + */ + @volatile + private[this] var _currentStateDoNotCallMeDirectly: State = Closed + + /** + * Helper method for access to underlying state via Unsafe + * + * @param oldState Previous state on transition + * @param newState Next state on transition + * @return Whether the previous state matched correctly + */ + @inline + private[this] def swapState(oldState: State, newState: State): Boolean = + Unsafe.instance.compareAndSwapObject(this, AbstractCircuitBreaker.stateOffset, oldState, newState) + + /** + * Helper method for accessing underlying state via Unsafe + * + * @return Reference to current state + */ + @inline + private[this] def currentState: State = + Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State] + + /** + * Wraps invocations of asynchronous calls that need to be protected + * + * @param body Call needing protected + * @tparam T return type from call + * @return [[akka.dispatch.Future]] containing the call result + */ + def withCircuitBreaker[T](body: ⇒ Future[T]): Future[T] = { + currentState.invoke(body) + } + + /** + * Java API for withCircuitBreaker + * + * @param body Call needing protected + * @tparam T return type from call + * @return [[akka.dispatch.Future]] containing the call result + */ + def callWithCircuitBreaker[T](body: Callable[Future[T]]): Future[T] = { + withCircuitBreaker(body.call) + } + + /** + * Wraps invocations of synchronous calls that need to be protected + * + * Calls are run in caller's thread + * + * @param body Call needing protected + * @tparam T return type from call + * @return The result of the call + */ + def withSyncCircuitBreaker[T](body: ⇒ T): T = { + Await.result(withCircuitBreaker( + { + try + Promise.successful(body)(CircuitBreaker.syncExecutionContext) + catch { + case NonFatal(t) ⇒ Promise.failed(t)(CircuitBreaker.syncExecutionContext) + } + }),callTimeout) + } + + /** + * Java API for withSyncCircuitBreaker + * + * @param body Call needing protected + * @tparam T return type from call + * @return The result of the call + */ + + def callWithSyncCircuitBreaker[T](body: Callable[T]): T = { + withSyncCircuitBreaker(body.call) + } + + /** + * Adds a callback to execute when circuit breaker opens + * + * The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor. + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onOpen[T](callback: ⇒ T): CircuitBreaker = { + Open.addListener(() ⇒ callback) + this + } + + /** + * Java API for onOpen + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onOpen[T](callback: Callable[T]): CircuitBreaker = { + onOpen(callback.call) + } + + /** + * Adds a callback to execute when circuit breaker transitions to half-open + * + * The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor. + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onHalfOpen[T](callback: ⇒ T): CircuitBreaker = { + HalfOpen.addListener(() ⇒ callback) + this + } + + /** + * JavaAPI for onHalfOpen + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onHalfOpen[T](callback: Callable[T]): CircuitBreaker = { + onHalfOpen(callback.call) + } + + /** + * Adds a callback to execute when circuit breaker state closes + * + * The callback is run in the [[akka.dispatch.ExecutionContext]] supplied in the constructor. + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onClose[T](callback: ⇒ T): CircuitBreaker = { + Closed.addListener(() ⇒ callback) + this + } + + /** + * JavaAPI for onClose + * + * @param callback Handler to be invoked on state change + * @tparam T Type supplied to assist with type inference, otherwise ignored by implementation + * @return CircuitBreaker for fluent usage + */ + def onClose[T](callback: Callable[T]): CircuitBreaker = { + onClose(callback.call) + } + + /** + * Retrieves current failure count. + * + * @return count + */ + private[akka] def currentFailureCount: Int = Closed.get + + /** + * Implements consistent transition between states + * + * @param fromState State being transitioning from + * @param toState State being transitioning from + * @throws IllegalStateException if an invalid transition is attempted + */ + private def transition(fromState: State, toState: State): Unit = { + if (swapState(fromState, toState)) + toState.enter() + else + throw new IllegalStateException("Illegal transition attempted from: " + fromState + " to " + toState) + } + + /** + * Trips breaker to an open state. This is valid from Closed or Half-Open states. + * + * @param fromState State we're coming from (Closed or Half-Open) + */ + private def tripBreaker(fromState: State): Unit = { + transition(fromState, Open) + } + + /** + * Resets breaker to a closed state. This is valid from an Half-Open state only. + * + */ + private def resetBreaker(): Unit = { + transition(HalfOpen, Closed) + } + + /** + * Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only. + * + */ + private def attemptReset(): Unit = { + transition(Open, HalfOpen) + } + + /** + * Internal state abstraction + */ + private sealed trait State { + private val listeners = new CopyOnWriteArrayList[() ⇒ _] + + /** + * Add a listener function which is invoked on state entry + * + * @param listener listener implementation + * @tparam T return type of listener, not used - but supplied for type inference purposes + */ + def addListener[T](listener: () ⇒ T) { + listeners add listener + } + + /** + * Test for whether listeners exist + * + * @return whether listeners exist + */ + private def hasListeners: Boolean = !listeners.isEmpty + + /** + * Notifies the listeners of the transition event via a Future executed in implicit parameter ExecutionContext + * + * @return Promise which executes listener in supplied [[akka.dispatch.ExecutionContext]] + */ + protected def notifyTransitionListeners() { + if (hasListeners) { + val iterator = listeners.iterator + while (iterator.hasNext) { + val listener = iterator.next + //FIXME per @viktorklang: it's a bit wasteful to create Futures for one-offs, just use EC.execute instead + Future(listener()) + } + } + } + + /** + * Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed + * call timeout is counted as a failed call, otherwise a successful call + * + * @param body Implementation of the call + * @tparam T Return type of the call's implementation + * @return Future containing the result of the call + */ + def callThrough[T](body: ⇒ Future[T]): Future[T] = { + val deadline = callTimeout.fromNow + val bodyFuture = try body catch { + case NonFatal(t) ⇒ Promise.failed(t) + } + bodyFuture onFailure { + case _ ⇒ callFails() + } onSuccess { + case _ ⇒ + if (deadline.isOverdue()) callFails() + else callSucceeds() + } + } + + /** + * Abstract entry point for all states + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + def invoke[T](body: ⇒ Future[T]): Future[T] + + /** + * Invoked when call succeeds + * + */ + def callSucceeds(): Unit + + /** + * Invoked when call fails + * + */ + def callFails(): Unit + + /** + * Invoked on the transitioned-to state during transition. Notifies listeners after invoking subclass template + * method _enter + * + */ + final def enter(): Unit = { + _enter() + notifyTransitionListeners() + } + + /** + * Template method for concrete traits + * + */ + def _enter(): Unit + } + + /** + * Concrete implementation of Closed state + */ + private object Closed extends AtomicInteger with State { + + /** + * Implementation of invoke, which simply attempts the call + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + override def invoke[T](body: ⇒ Future[T]): Future[T] = { + callThrough(body) + } + + /** + * On successful call, the failure count is reset to 0 + * + * @return + */ + override def callSucceeds(): Unit = { set(0) } + + /** + * On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and + * the breaker is tripped if we have reached maxFailures. + * + * @return + */ + override def callFails(): Unit = { + if (incrementAndGet() == maxFailures) tripBreaker(Closed) + } + + /** + * On entry of this state, failure count is reset. + * + * @return + */ + override def _enter(): Unit = { + set(0) + } + + /** + * Override for more descriptive toString + * + * @return + */ + override def toString: String = { + "Closed with failure count = " + get() + } + } + + /** + * Concrete implementation of half-open state + */ + private object HalfOpen extends AtomicBoolean(true) with State { + + /** + * Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens. + * If the call succeeds the breaker closes. + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + override def invoke[T](body: ⇒ Future[T]): Future[T] = { + if (compareAndSet(true, false)) + callThrough(body) + else + Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)) + } + + /** + * Reset breaker on successful call. + * + * @return + */ + override def callSucceeds(): Unit = { resetBreaker() } + + /** + * Reopen breaker on failed call. + * + * @return + */ + override def callFails(): Unit = { tripBreaker(HalfOpen) } + + /** + * On entry, guard should be reset for that first call to get in + * + * @return + */ + override def _enter(): Unit = { + set(true) + } + + /** + * Override for more descriptive toString + * + * @return + */ + override def toString: String = { + "Half-Open currently testing call for success = " + get() + } + } + + /** + * Concrete implementation of Open state + */ + private object Open extends AtomicLong with State { + + /** + * Fail-fast on any invocation + * + * @param body Implementation of the call that needs protected + * @tparam T Return type of protected call + * @return Future containing result of protected call + */ + override def invoke[T](body: ⇒ Future[T]): Future[T] = { + Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)) + } + + /** + * Calculate remaining timeout to inform the caller in case a backoff algorithm is useful + * + * @return [[akka.util.Deadline]] to when the breaker will attempt a reset by transitioning to half-open + */ + private def remainingTimeout(): Deadline = get match { + case 0L ⇒ Deadline.now + case t ⇒ (t.millis + resetTimeout).fromNow + } + + /** + * No-op for open, calls are never executed so cannot succeed or fail + * + * @return + */ + override def callSucceeds(): Unit = {} + + /** + * No-op for open, calls are never executed so cannot succeed or fail + * + * @return + */ + override def callFails(): Unit = {} + + /** + * On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to + * calculate remaining time before attempted reset. + * + * @return + */ + override def _enter(): Unit = { + set(System.currentTimeMillis) + scheduler.scheduleOnce(resetTimeout) { + attemptReset() + } + } + + /** + * Override for more descriptive toString + * + * @return + */ + override def toString: String = { + "Open" + } + } + +} + +/** + * Exception thrown when Circuit Breaker is open. + * + * @param remainingDuration Stores remaining time before attempting a reset. Zero duration means the breaker is + * currently in half-open state. + * @param message Defaults to "Circuit Breaker is open; calls are failing fast" + */ +class CircuitBreakerOpenException( + val remainingDuration: Duration, + message: String = "Circuit Breaker is open; calls are failing fast") + extends AkkaException(message) with NoStackTrace diff --git a/akka-docs/common/circuitbreaker.rst b/akka-docs/common/circuitbreaker.rst new file mode 100644 index 0000000000..bd13927c8e --- /dev/null +++ b/akka-docs/common/circuitbreaker.rst @@ -0,0 +1,130 @@ +.. _circuit-breaker: + +############### +Circuit Breaker +############### + +================== +Why are they used? +================== +A circuit breaker is used to provide stability and prevent cascading failures in distributed +systems. These should be used in conjunction with judicious timeouts at the interfaces between +remote systems to prevent the failure of a single component from bringing down all components. + +As an example, we have a web application interacting with a remote third party web service. +Let's say the third party has oversold their capacity and their database melts down under load. +Assume that the database fails in such a way that it takes a very long time to hand back an +error to the third party web service. This in turn makes calls fail after a long period of +time. Back to our web application, the users have noticed that their form submissions take +much longer seeming to hang. Well the users do what they know to do which is use the refresh +button, adding more requests to their already running requests. This eventually causes the +failure of the web application due to resource exhaustion. This will affect all users, even +those who are not using functionality dependent on this third party web service. + +Introducing circuit breakers on the web service call would cause the requests to begin to +fail-fast, letting the user know that something is wrong and that they need not refresh +their request. This also confines the failure behavior to only those users that are using +functionality dependent on the third party, other users are no longer affected as there is no +resource exhaustion. Circuit breakers can also allow savvy developers to mark portions of +the site that use the functionality unavailable, or perhaps show some cached content as +appropriate while the breaker is open. + +The Akka library provides an implementation of a circuit breaker called +:class:`akka.pattern.CircuitBreaker` which has the behavior described below. + +================= +What do they do? +================= +* During normal operation, a circuit breaker is in the `Closed` state: + * Exceptions or calls exceeding the configured `callTimeout` increment a failure counter + * Successes reset the failure count to zero + * When the failure counter reaches a `maxFailures` count, the breaker is tripped into `Open` state +* While in `Open` state: + * All calls fail-fast with a :class:`CircuitBreakerOpenException` + * After the configured `resetTimeout`, the circuit breaker enters a `Half-Open` state +* In `Half-Open` state: + * The first call attempted is allowed through without failing fast + * All other calls fail-fast with an exception just as in `Open` state + * If the first call succeeds, the breaker is reset back to `Closed` state + * If the first call fails, the breaker is tripped again into the `Open` state for another full `resetTimeout` +* State transition listeners: + * Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen` + * These are executed in the :class:`ExecutionContext` provided. + +.. graphviz:: + + digraph circuit_breaker { + rankdir = "LR"; + size = "6,5"; + graph [ bgcolor = "transparent" ] + node [ fontname = "Helvetica", + fontsize = 14, + shape = circle, + color = white, + style = filled ]; + edge [ fontname = "Helvetica", fontsize = 12 ] + Closed [ fillcolor = green2 ]; + "Half-Open" [fillcolor = yellow2 ]; + Open [ fillcolor = red2 ]; + Closed -> Closed [ label = "Success" ]; + "Half-Open" -> Open [ label = "Trip Breaker" ]; + "Half-Open" -> Closed [ label = "Reset Breaker" ]; + Closed -> Open [ label = "Trip Breaker" ]; + Open -> Open [ label = "Calls failing fast" ]; + Open -> "Half-Open" [ label = "Attempt Reset" ]; + } + +======== +Examples +======== + +-------------- +Initialization +-------------- + +Here's how a :class:`CircuitBreaker` would be configured for: + * 5 maximum failures + * a call timeout of 10 seconds + * a reset timeout of 1 minute + +^^^^^^^ +Scala +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/CircuitBreakerDocSpec.scala + :include: imports1,circuit-breaker-initialization + +^^^^^^^ +Java +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/DangerousJavaActor.java + :include: imports1,circuit-breaker-initialization + +--------------- +Call Protection +--------------- + +Here's how the :class:`CircuitBreaker` would be used to protect an asynchronous +call as well as a synchronous one: + +^^^^^^^ +Scala +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/CircuitBreakerDocSpec.scala + :include: circuit-breaker-usage + +^^^^^^ +Java +^^^^^^ + +.. includecode:: code/docs/circuitbreaker/DangerousJavaActor.java + :include: circuit-breaker-usage + +.. note:: + + Using the :class:`CircuitBreaker` companion object's `apply` or `create` methods + will return a :class:`CircuitBreaker` where callbacks are executed in the caller's thread. + This can be useful if the asynchronous :class:`Future` behavior is unnecessary, for + example invoking a synchronous-only API. diff --git a/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala new file mode 100644 index 0000000000..bd6c1447ad --- /dev/null +++ b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.circuitbreaker + +//#imports1 +import akka.util.duration._ // small d is important here +import akka.pattern.CircuitBreaker +import akka.actor.Actor +import akka.dispatch.Future +import akka.event.Logging + +//#imports1 + +class CircuitBreakerDocSpec { } + +//#circuit-breaker-initialization +class DangerousActor extends Actor { + + val log = Logging(context.system, this) + implicit val executionContext = context.dispatcher + val breaker = + new CircuitBreaker(context.system.scheduler, 5, 10.seconds, 1.minute) + .onOpen(notifyMeOnOpen) + + def notifyMeOnOpen = + log.warning("My CircuitBreaker is now open, and will not close for one minute") +//#circuit-breaker-initialization + +//#circuit-breaker-usage + def dangerousCall: String = "This really isn't that dangerous of a call after all" + + def receive = { + case "is my middle name" => + sender ! breaker.withCircuitBreaker(Future(dangerousCall)) + case "block for me" => + sender ! breaker.withSyncCircuitBreaker(dangerousCall) + } +//#circuit-breaker-usage + +} + diff --git a/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java new file mode 100644 index 0000000000..1562338e04 --- /dev/null +++ b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.circuitbreaker; + +//#imports1 + +import akka.actor.UntypedActor; +import akka.dispatch.Future; +import akka.event.LoggingAdapter; +import akka.util.Duration; +import akka.pattern.CircuitBreaker; +import akka.event.Logging; + +import static akka.dispatch.Futures.future; + +import java.util.concurrent.Callable; + +//#imports1 + +//#circuit-breaker-initialization +public class DangerousJavaActor extends UntypedActor { + + private final CircuitBreaker breaker; + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + public DangerousJavaActor() { + this.breaker = new CircuitBreaker( + getContext().dispatcher(), getContext().system().scheduler(), + 5, Duration.parse("10s"), Duration.parse("1m")) + .onOpen(new Callable() { + public Object call() throws Exception { + notifyMeOnOpen(); + return null; + } + }); + } + + public void notifyMeOnOpen() { + log.warning("My CircuitBreaker is now open, and will not close for one minute"); + } +//#circuit-breaker-initialization + + //#circuit-breaker-usage + public String dangerousCall() { + return "This really isn't that dangerous of a call after all"; + } + + @Override + public void onReceive(Object message) { + if (message instanceof String) { + String m = (String) message; + if ("is my middle name".equals(m)) { + final Future f = future( + new Callable() { + public String call() { + return dangerousCall(); + } + }, getContext().dispatcher()); + + getSender().tell(breaker + .callWithCircuitBreaker( + new Callable>() { + public Future call() throws Exception { + return f; + } + })); + } + if ("block for me".equals(m)) { + getSender().tell(breaker + .callWithSyncCircuitBreaker( + new Callable() { + @Override + public String call() throws Exception { + return dangerousCall(); + } + })); + } + } + } +//#circuit-breaker-usage + +} \ No newline at end of file diff --git a/akka-docs/common/index.rst b/akka-docs/common/index.rst index 4e19d1a1aa..de9c7016fc 100644 --- a/akka-docs/common/index.rst +++ b/akka-docs/common/index.rst @@ -5,3 +5,4 @@ Common utilities :maxdepth: 2 duration + circuitbreaker diff --git a/akka-docs/conf.py b/akka-docs/conf.py index b632430b59..77b7c80be0 100644 --- a/akka-docs/conf.py +++ b/akka-docs/conf.py @@ -8,7 +8,7 @@ import sys, os # -- General configuration ----------------------------------------------------- sys.path.append(os.path.abspath('_sphinx/exts')) -extensions = ['sphinx.ext.todo', 'includecode'] +extensions = ['sphinx.ext.todo', 'includecode', 'sphinx.ext.graphviz'] templates_path = ['_templates'] source_suffix = '.rst' diff --git a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala index ac6c58ad08..54349f73e0 100644 --- a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -50,6 +50,8 @@ import akka.dispatch.MailboxType import akka.dispatch.MessageQueue import akka.actor.mailbox.DurableMessageQueue import akka.actor.mailbox.DurableMessageSerialization +import akka.pattern.CircuitBreaker +import akka.util.duration._ class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { @@ -65,20 +67,23 @@ class MyMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { val storage = new QueueStorage + // A real-world implmentation would use configuration to set the last + // three parameters below + val breaker = CircuitBreaker(_owner.system.scheduler,5,30.seconds,1.minute) - def enqueue(receiver: ActorRef, envelope: Envelope) { + def enqueue(receiver: ActorRef, envelope: Envelope): Unit = breaker.withSyncCircuitBreaker { val data: Array[Byte] = serialize(envelope) storage.push(data) } - def dequeue(): Envelope = { + def dequeue(): Envelope = breaker.withSyncCircuitBreaker { val data: Option[Array[Byte]] = storage.pull() data.map(deserialize).orNull } - def hasMessages: Boolean = !storage.isEmpty + def hasMessages: Boolean = breaker.withSyncCircuitBreaker { !storage.isEmpty } - def numberOfMessages: Int = storage.size + def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size } /** * Called when the mailbox is disposed. diff --git a/akka-docs/modules/durable-mailbox.rst b/akka-docs/modules/durable-mailbox.rst index 2a9ca174cf..5be40320d0 100644 --- a/akka-docs/modules/durable-mailbox.rst +++ b/akka-docs/modules/durable-mailbox.rst @@ -80,7 +80,9 @@ a configurator (MailboxType) and a queue implementation (DurableMessageQueue). The envelope contains the message sent to the actor, and information about sender. It is the envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala` -mechanism. This optional and you may store the envelope data in any way you like. +mechanism. This optional and you may store the envelope data in any way you like. Durable +mailboxes are an excellent fit for usage of circuit breakers. These are described in the +:ref:`circuit-breaker` documentation. .. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala :include: custom-mailbox diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf index 1a1b7b721b..f454716af0 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf @@ -45,7 +45,19 @@ akka { keep-journal = on # whether to sync the journal after each transaction - sync-journal = off + sync-journal = off + + # circuit breaker configuration + circuit-breaker { + # maximum number of failures before opening breaker + max-failures = 3 + + # duration of time beyond which a call is assumed to be timed out and considered a failure + call-timeout = 3 seconds + + # duration of time to wait until attempting to reset the breaker during which all calls fail-fast + reset-timeout = 30 seconds + } } } } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index c595fdcdd3..fccb6b5aea 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -5,14 +5,14 @@ package akka.actor.mailbox import akka.actor.ActorContext -import akka.dispatch.{ Envelope, MessageQueue } import akka.event.Logging import akka.actor.ActorRef -import akka.dispatch.MailboxType import com.typesafe.config.Config -import akka.util.NonFatal import akka.ConfigurationException import akka.actor.ActorSystem +import akka.dispatch._ +import akka.util.{ Duration, NonFatal } +import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker } class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { private val settings = new FileBasedMailboxSettings(systemSettings, config) @@ -26,6 +26,8 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox // TODO Is it reasonable for all FileBasedMailboxes to have their own logger? private val log = Logging(system, "FileBasedMessageQueue") + val breaker = CircuitBreaker(_owner.system.scheduler, settings.CircuitBreakerMaxFailures, settings.CircuitBreakerCallTimeout, settings.CircuitBreakerResetTimeout) + private val queue = try { (new java.io.File(settings.QueuePath)) match { case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir) @@ -42,18 +44,28 @@ class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailbox throw e } - def enqueue(receiver: ActorRef, envelope: Envelope): Unit = queue.add(serialize(envelope)) - - def dequeue(): Envelope = try { - queue.remove.map(item ⇒ { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull - } catch { - case _: java.util.NoSuchElementException ⇒ null - case NonFatal(e) ⇒ - log.error(e, "Couldn't dequeue from file-based mailbox") - throw e + def enqueue(receiver: ActorRef, envelope: Envelope) { + breaker.withSyncCircuitBreaker(queue.add(serialize(envelope))) } - def numberOfMessages: Int = queue.length.toInt + def dequeue(): Envelope = { + breaker.withSyncCircuitBreaker( + try { + queue.remove.map(item ⇒ { queue.confirmRemove(item.xid); deserialize(item.data) }).orNull + } catch { + case _: java.util.NoSuchElementException ⇒ null + case e: CircuitBreakerOpenException ⇒ + log.debug(e.getMessage()) + throw e + case NonFatal(e) ⇒ + log.error(e, "Couldn't dequeue from file-based mailbox, due to [{}]", e.getMessage()) + throw e + }) + } + + def numberOfMessages: Int = { + breaker.withSyncCircuitBreaker(queue.length.toInt) + } def hasMessages: Boolean = numberOfMessages > 0 diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala index 87dc25840f..dff4021d96 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -29,4 +29,7 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use val KeepJournal: Boolean = getBoolean("keep-journal") val SyncJournal: Boolean = getBoolean("sync-journal") + val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures") + val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout")) + val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout")) } \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 6c97142068..e3ad811b52 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,7 +1,6 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils -import com.typesafe.config.ConfigFactory import akka.dispatch.Mailbox object FileBasedMailboxSpec { @@ -10,23 +9,32 @@ object FileBasedMailboxSpec { mailbox-type = akka.actor.mailbox.FileBasedMailboxType throughput = 1 file-based.directory-path = "file-based" + file-based.circuit-breaker.max-failures = 5 + file-based.circuit-breaker.call-timeout = 5 seconds } - """ + """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) { - val queuePath = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")).QueuePath + val settings = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")) "FileBasedMailboxSettings" must { "read the file-based section" in { - queuePath must be("file-based") + settings.QueuePath must be("file-based") + settings.CircuitBreakerMaxFailures must be(5) + + import akka.util.duration._ + + settings.CircuitBreakerCallTimeout must be(5 seconds) } } + def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue] + def clean() { - FileUtils.deleteDirectory(new java.io.File(queuePath)) + FileUtils.deleteDirectory(new java.io.File(settings.QueuePath)) } override def atStartup() {