diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 35784cf58f..377dbc978c 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -73,6 +73,22 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.halfOpenLatch) } + + "still be in open state after calling success method" in { + val breaker = CircuitBreakerSpec.longResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.openLatch) + breaker().succeed() + checkLatch(breaker.openLatch) + } + + "still be in open state after calling fail method" in { + val breaker = CircuitBreakerSpec.longResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.openLatch) + breaker().fail() + checkLatch(breaker.openLatch) + } } "A synchronous circuit breaker that is half-open" must { @@ -91,6 +107,22 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } checkLatch(breaker.openLatch) } + + "open on calling fail method" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + breaker().fail() + checkLatch(breaker.openLatch) + } + + "close on calling success method" in { + val breaker = CircuitBreakerSpec.shortResetTimeoutCb() + intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) } + checkLatch(breaker.halfOpenLatch) + breaker().succeed() + checkLatch(breaker.closedLatch) + } } "A synchronous circuit breaker that is closed" must { @@ -107,6 +139,14 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { breaker().currentFailureCount should ===(1) } + "increment failure count on fail method" in { + val breaker = CircuitBreakerSpec.longCallTimeoutCb() + breaker().currentFailureCount should ===(0) + breaker().fail() + checkLatch(breaker.openLatch) + breaker().currentFailureCount should ===(1) + } + "reset failure count after success" in { val breaker = CircuitBreakerSpec.multiFailureCb() breaker().currentFailureCount should ===(0) @@ -119,6 +159,18 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { breaker().currentFailureCount should ===(0) } + "reset failure count after success method" in { + val breaker = CircuitBreakerSpec.multiFailureCb() + breaker().currentFailureCount should ===(0) + intercept[TestException] { + val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread + breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" }) + } + breaker().currentFailureCount should ===(1) + breaker().succeed() + breaker().currentFailureCount should ===(0) + } + "throw TimeoutException on callTimeout" in { val breaker = CircuitBreakerSpec.shortCallTimeoutCb() intercept[TimeoutException] { diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 6460ee039c..9a09ba483a 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -166,6 +166,54 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite */ def callWithSyncCircuitBreaker[T](body: Callable[T]): T = withSyncCircuitBreaker(body.call) + /** + * Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the + * caller Actor. In such a case, it is convenient to mark a successful call instead of using Future + * via [[withCircuitBreaker]] + */ + def succeed(): Unit = { + currentState.callSucceeds() + } + + /** + * Mark a failed call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the + * caller Actor. In such a case, it is convenient to mark a failed call instead of using Future + * via [[withCircuitBreaker]] + */ + def fail(): Unit = { + currentState.callFails() + } + + /** + * Return true if the internal state is Closed. WARNING: It is a "power API" call which you should use with care. + * Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in withCircuitBreaker. + * So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should + * manage the state yourself. + */ + def isClosed: Boolean = { + currentState == Closed + } + + /** + * Return true if the internal state is Open. WARNING: It is a "power API" call which you should use with care. + * Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in withCircuitBreaker. + * So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should + * manage the state yourself. + */ + def isOpen: Boolean = { + currentState == Open + } + + /** + * Return true if the internal state is HalfOpen. WARNING: It is a "power API" call which you should use with care. + * Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in withCircuitBreaker. + * So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should + * manage the state yourself. + */ + def isHalfOpen: Boolean = { + currentState == HalfOpen + } + /** * Adds a callback to execute when circuit breaker opens * @@ -189,7 +237,6 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite /** * Adds a callback to execute when circuit breaker transitions to half-open - * * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * * @param callback Handler to be invoked on state change diff --git a/akka-docs/rst/common/circuitbreaker.rst b/akka-docs/rst/common/circuitbreaker.rst index ddce833178..052a9cbfec 100644 --- a/akka-docs/rst/common/circuitbreaker.rst +++ b/akka-docs/rst/common/circuitbreaker.rst @@ -107,3 +107,35 @@ Java will return a :class:`CircuitBreaker` where callbacks are executed in the caller's thread. This can be useful if the asynchronous :class:`Future` behavior is unnecessary, for example invoking a synchronous-only API. + + +------------ +Tell Pattern +------------ + +The above ``Call Protection`` pattern works well when the return from a remote call is wrapped in a ``Future``. +However, when a remote call sends back a message or timeout to the caller ``Actor``, the ``Call Protection`` pattern +is awkward. CircuitBreaker doesn't support it natively at the moment, so you need to use below low-level power-user APIs, +``succeed`` and ``fail`` methods, as well as ``isClose``, ``isOpen``, ``isHalfOpen``. + +.. note:: + + The below examples doesn't make a remote call when the state is `HalfOpen`. Using the power-user APIs, it is + your responsibility to judge when to make remote calls in `HalfOpen`. + + +^^^^^^^ +Scala +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/CircuitBreakerDocSpec.scala + :include: circuit-breaker-tell-pattern + +^^^^^^^ +Java +^^^^^^^ + +.. includecode:: code/docs/circuitbreaker/TellPatternJavaActor.java + :include: circuit-breaker-tell-pattern + + diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala index 97d5fbc6e1..5f6c8c6d16 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala +++ b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -8,10 +8,9 @@ package docs.circuitbreaker import scala.concurrent.duration._ import akka.pattern.CircuitBreaker import akka.pattern.pipe -import akka.actor.Actor -import akka.actor.ActorLogging +import akka.actor.{Actor, ActorLogging, ActorRef} + import scala.concurrent.Future -import akka.event.Logging //#imports1 @@ -45,3 +44,35 @@ class DangerousActor extends Actor with ActorLogging { } +class TellPatternActor(recipient : ActorRef) extends Actor with ActorLogging { + import context.dispatcher + + val breaker = + new CircuitBreaker( + context.system.scheduler, + maxFailures = 5, + callTimeout = 10.seconds, + resetTimeout = 1.minute).onOpen(notifyMeOnOpen()) + + def notifyMeOnOpen(): Unit = + log.warning("My CircuitBreaker is now open, and will not close for one minute") + + //#circuit-breaker-tell-pattern + import akka.actor.ReceiveTimeout + + def receive = { + case "call" if breaker.isClosed => { + recipient ! "message" + } + case "response" => { + breaker.succeed() + } + case err: Throwable => { + breaker.fail() + } + case ReceiveTimeout => { + breaker.fail() + } + } + //#circuit-breaker-tell-pattern +} \ No newline at end of file diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/TellPatternJavaActor.java b/akka-docs/rst/common/code/docs/circuitbreaker/TellPatternJavaActor.java new file mode 100644 index 0000000000..099da29342 --- /dev/null +++ b/akka-docs/rst/common/code/docs/circuitbreaker/TellPatternJavaActor.java @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package docs.circuitbreaker; + +import akka.actor.ActorRef; +import akka.actor.ReceiveTimeout; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.pattern.CircuitBreaker; +import scala.concurrent.duration.Duration; + +public class TellPatternJavaActor extends UntypedActor { + + private final ActorRef target; + private final CircuitBreaker breaker; + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + public TellPatternJavaActor(ActorRef targetActor) { + this.target = targetActor; + this.breaker = new CircuitBreaker( + getContext().dispatcher(), getContext().system().scheduler(), + 5, Duration.create(10, "s"), Duration.create(1, "m")) + .onOpen(new Runnable() { + public void run() { + notifyMeOnOpen(); + } + }); + } + + public void notifyMeOnOpen() { + log.warning("My CircuitBreaker is now open, and will not close for one minute"); + } + + //#circuit-breaker-tell-pattern + @Override + public void onReceive(Object payload) { + if ( "call".equals(payload) && breaker.isClosed() ) { + target.tell("message", getSelf()); + } else if ( "response".equals(payload) ) { + breaker.succeed(); + } else if ( payload instanceof Throwable ) { + breaker.fail(); + } else if ( payload instanceof ReceiveTimeout ) { + breaker.fail(); + } + } + //#circuit-breaker-tell-pattern + +}