diff --git a/akka-contrib/docs/circuitbreaker.rst b/akka-contrib/docs/circuitbreaker.rst new file mode 100644 index 0000000000..a3d9adbb0c --- /dev/null +++ b/akka-contrib/docs/circuitbreaker.rst @@ -0,0 +1,184 @@ +Circuit-Breaker Actor +====================== + +This is an alternative implementation of the [AKKA Circuit Breaker Pattern](http://doc.akka.io/docs/akka/snapshot/common/circuitbreaker.html). +The main difference is that is intended to be used only for request-reply interactions with actor using the Circuit-Breaker as a proxy of the target one +in order to provide the same failfast functionnalities and a protocol similar to the AKKA Pattern implementation + + +### Usage + +Let's assume we have an actor wrapping a back-end service and able to respond to `Request` calls with a `Response` object +containing an `Either[String, String]` to map successful and failed responses. The service is also potentially slowing down +because of the workload. + +A simple implementation can be given by this class:: + + + object SimpleService { + case class Request(content: String) + case class Response(content: Either[String, String]) + case object ResetCount + } + + /** + * This is a simple actor simulating a service + * - Becoming slower with the increase of frequency of input requests + * - Failing around 30% of the requests + */ + class SimpleService extends Actor with ActorLogging { + import SimpleService._ + + var messageCount = 0 + + import context.dispatcher + + context.system.scheduler.schedule(1.second, 1.second, self, ResetCount) + + override def receive = { + case ResetCount => + messageCount = 0 + + case Request(content) => + messageCount += 1 + // simulate workload + Thread.sleep( 100 * messageCount ) + // Fails around 30% of the times + if(Random.nextInt(100) < 70 ) { + sender ! Response(Right(s"Successfully processed $content")) + } else { + sender ! Response(Left(s"Failure processing $content")) + } + + } + } + + +If we want to interface with this service using the Circuit Breaker we can use two approaches: + +Using a non-conversational approach: :: + + class CircuitBreakerExample(potentiallyFailingService: ActorRef) extends Actor with ActorLogging { + import SimpleService._ + + val serviceCircuitBreaker = + context.actorOf( + CircuitBreakerActorBuilder( maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds ) + .copy( + failureDetector = { + _ match { + case Response(Left(_)) => true + case _ => false + } + } + ) + .propsForTarget(potentiallyFailingService), + "serviceCircuitBreaker" + ) + + override def receive: Receive = { + case AskFor(requestToForward) => + serviceCircuitBreaker ! Request(requestToForward) + + case Right(Response(content)) => + //handle response + log.info("Got successful response {}", content) + + case Response(Right(content)) => + //handle response + log.info("Got successful response {}", content) + + case Response(Left(content)) => + //handle response + log.info("Got failed response {}", content) + + case CircuitOpenFailure(failedMsg) => + log.warning("Unable to send message {}", failedMsg) + } + } + +Using the ASK pattern, in this case it is useful to be able to map circuit open failures to the same type of failures +returned by the service (a `Left[String]` in our case): :: + + class CircuitBreakerAskExample(potentiallyFailingService: ActorRef) extends Actor with ActorLogging { + import SimpleService._ + import akka.pattern._ + + implicit val askTimeout: Timeout = 2.seconds + + val serviceCircuitBreaker = + context.actorOf( + CircuitBreakerActorBuilder( maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds ) + .copy( + failureDetector = { + _ match { + case Response(Left(_)) => true + case _ => false + } + } + ) + .copy( + openCircuitFailureConverter = { failure => + Left(s"Circuit open when processing ${failure.failedMsg}") + } + ) + .propsForTarget(potentiallyFailingService), + "serviceCircuitBreaker" + ) + + import context.dispatcher + + override def receive: Receive = { + case AskFor(requestToForward) => + (serviceCircuitBreaker ? Request(requestToForward)).mapTo[Either[String, String]].onComplete { + case Success(Right(successResponse)) => + //handle response + log.info("Got successful response {}", successResponse) + + case Success(Left(failureResponse)) => + //handle response + log.info("Got successful response {}", failureResponse) + + case Failure(exception) => + //handle response + log.info("Got successful response {}", exception) + + } + + } + } + + +If it is not possible to define define a specific error response, you can map the Open Circuit notification into a failure. +That also means that your `CircuitBreakerActor` will be essentially useful to protect you from time out for extra workload or +temporary failures in the target actor :: + + class CircuitBreakerAskWithFailureExample(potentiallyFailingService: ActorRef) extends Actor with ActorLogging { + import SimpleService._ + import akka.pattern._ + import CircuitBreakerActor._ + + implicit val askTimeout: Timeout = 2.seconds + + val serviceCircuitBreaker = + context.actorOf( + CircuitBreakerActorBuilder( maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds ).propsForTarget(potentiallyFailingService), + "serviceCircuitBreaker" + ) + + import context.dispatcher + + override def receive: Receive = { + case AskFor(requestToForward) => + (serviceCircuitBreaker ? Request(requestToForward)).failForOpenCircuit.mapTo[String].onComplete { + case Success(successResponse) => + //handle response + log.info("Got successful response {}", successResponse) + + case Failure(exception) => + //handle response + log.info("Got successful response {}", exception) + + } + } + } diff --git a/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerActor.scala b/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerActor.scala new file mode 100644 index 0000000000..5cd40dde57 --- /dev/null +++ b/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerActor.scala @@ -0,0 +1,266 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.contrib.circuitbreaker + +import akka.actor._ +import akka.pattern._ +import akka.util.Timeout + +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Failure, Success } + +object CircuitBreakerActor { + + /** + * Creates an circuit breaker actor proxying a target actor intended for request-reply interactions. + * It is possible to send messages through this proxy without expecting a response wrapping them into a + * [[akka.contrib.circuitbreaker.CircuitBreakerActor.TellOnly]] + * + * The circuit breaker implements the same state machine documented in [[akka.pattern.CircuitBreaker]] + * + * @param target the actor to proxy + * @param maxFailures maximum number of failures before opening the circuit + * @param callTimeout timeout before considering the ongoing call a failure + * @param resetTimeout time after which the channel will be closed after entering the open state + * @param circuitEventListener an actor that will receive a series of messages of type + * [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerEvent]] + * @param failureDetector function to detect if the a message received from the target actor as + * response from a request represent a failure + * @param failureMap function to map a failure into a response message. The failing response message is wrapped + * into a [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitOpenFailure]] object + * @return + */ + def props(target: ActorRef, + maxFailures: Int, + callTimeout: Timeout, + resetTimeout: Timeout, + circuitEventListener: Option[ActorRef], + failureDetector: Any ⇒ Boolean, + failureMap: CircuitOpenFailure ⇒ Any) = + Props(new CircuitBreakerActor(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, failureMap)) + + sealed trait CircuitBreakerCommand + + case class TellOnly(msg: Any) extends CircuitBreakerCommand + + sealed trait CircuitBreakerResponse + case class CircuitOpenFailure(failedMsg: Any) + + sealed trait CircuitBreakerEvent + case class CircuitOpen(circuit: ActorRef) extends CircuitBreakerCommand + case class CircuitClosed(circuit: ActorRef) extends CircuitBreakerCommand + case class CircuitHalfOpen(circuit: ActorRef) extends CircuitBreakerCommand + + sealed trait CircuitBreakerState + case object Open extends CircuitBreakerState + case object Closed extends CircuitBreakerState + case object HalfOpen extends CircuitBreakerState + + case class CircuitBreakerStateData(failureCount: Int = 0, firstHalfOpenMessageSent: Boolean = false) + + /** + * Convenience builder with appropriate defaults to create the CircuitBreakerActor props + * + * @param maxFailures maximum number of failures before opening the circuit + * @param callTimeout timeout before considering the ongoing call a failure + * @param resetTimeout time after which the channel will be closed after entering the open state + * @param circuitEventListener an actor that will receive a series of messages of type + * [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerEvent]] + * defaults to None + * @param failureDetector function to detect if the a message received from the target actor as + * response from a request represent a failure, defaults to a function accepting every + * response making this circuit breaker be activated by response timeouts only + * + * @param openCircuitFailureConverter function to map a failure into a response message. + * Defaults to an identify function + */ + case class CircuitBreakerActorBuilder( + maxFailures: Int, callTimeout: Timeout, resetTimeout: Timeout, + circuitEventListener: Option[ActorRef] = None, + failureDetector: Any ⇒ Boolean = { _ ⇒ false }, + openCircuitFailureConverter: CircuitOpenFailure ⇒ Any = identity) { + + /** + * Creates the props for a [[akka.contrib.circuitbreaker.CircuitBreakerActor]] proxying the given target + * + * @param target the target actor ref + * @return + */ + def propsForTarget(target: ActorRef) = CircuitBreakerActor.props(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, openCircuitFailureConverter) + + } + + class OpenCircuitException extends Exception("Circuit Open so unable to complete operation") + + /** + * Extends [[scala.concurrent.Future]] with the method failForOpenCircuitWith to handle + * [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitOpenFailure]] failure responses throwing + * an exception built with the given exception builder + * + * @param future + */ + implicit class CircuitBreakerAwareFuture(val future: Future[Any]) extends AnyVal { + def failForOpenCircuit(implicit executionContext: ExecutionContext): Future[Any] = failForOpenCircuitWith(new OpenCircuitException) + + def failForOpenCircuitWith(throwing: ⇒ Throwable)(implicit executionContext: ExecutionContext): Future[Any] = { + future.flatMap { + _ match { + case CircuitOpenFailure(_) ⇒ Future.failed(throwing) + case result ⇒ Future.successful(result) + } + } + } + + } + +} + +object CircuitBreakerInternalEvents { + sealed trait CircuitBreakerInternalEvent + case object CallFailed extends CircuitBreakerInternalEvent + case object CallSucceeded extends CircuitBreakerInternalEvent +} + +import CircuitBreakerActor._ +import CircuitBreakerInternalEvents._ + +class CircuitBreakerActor( + target: ActorRef, + maxFailures: Int, + callTimeout: Timeout, + resetTimeout: Timeout, + circuitEventListener: Option[ActorRef], + failureDetector: Any ⇒ Boolean, + failureMap: CircuitOpenFailure ⇒ Any) extends Actor with ActorLogging with FSM[CircuitBreakerState, CircuitBreakerStateData] { + + startWith(Closed, CircuitBreakerStateData(failureCount = 0)) + + def callSucceededHandling: StateFunction = { + case Event(CallSucceeded, state) ⇒ + log.debug("Received call succeeded notification in state {} resetting counter", state) + goto(Closed) using CircuitBreakerStateData(failureCount = 0, firstHalfOpenMessageSent = false) + } + + when(Closed) { + callSucceededHandling orElse { + case Event(TellOnly(message), _) ⇒ + log.debug("CLOSED: Sending message {} without expecting any response", message) + target ! message + stay + + case Event(CallFailed, state) ⇒ + log.debug("Received call failed notification in state {} incrementing counter", state) + val newState = state.copy(failureCount = state.failureCount + 1) + if (newState.failureCount < maxFailures) { + stay using newState + } else { + goto(Open) using newState + } + + case Event(message, state) ⇒ + log.debug("CLOSED: Sending message {} expecting a response withing timeout {}", message, callTimeout) + val currentSender = sender() + forwardRequest(message, sender, state) + stay + + } + } + + when(Open, stateTimeout = resetTimeout.duration) { + callSucceededHandling orElse { + case Event(StateTimeout, state) ⇒ + log.debug("Timeout expired for state OPEN, going to half open") + goto(HalfOpen) using state.copy(firstHalfOpenMessageSent = false) + + case Event(CallFailed, state) ⇒ + log.debug("OPEN: Call received a further call failed notification, probably from a previous timed out event, ignoring") + stay + + case Event(openNotification @ CircuitOpenFailure(_), _) ⇒ + log.error("Why did I send message {} to myself?", openNotification) + stay + + case Event(message, state) ⇒ + val failureNotification = failureMap(CircuitOpenFailure(message)) + log.debug("OPEN: Failing request for message {}, sending failure notification {} to sender {}", message, failureNotification, sender) + sender ! failureNotification + stay + + } + } + + when(HalfOpen) { + callSucceededHandling orElse { + case Event(TellOnly(message), _) ⇒ + log.debug("HALF-OPEN: Dropping TellOnly request for message {}", message) + stay + + case Event(CallFailed, CircuitBreakerStateData(_, true)) ⇒ + log.debug("HALF-OPEN: First forwarded call failed returning to OPEN state") + goto(Open) + + case Event(CallFailed, CircuitBreakerStateData(_, false)) ⇒ + log.debug("HALF-OPEN: Call received a further call failed notification, probably from a previous timed out event, ignoring") + stay + + case Event(message, state @ CircuitBreakerStateData(_, false)) ⇒ + log.debug("HALF-OPEN: First message {} received, forwarding it to target {}", message, target) + forwardRequest(message, sender, state) + stay using state.copy(firstHalfOpenMessageSent = true) + + case Event(message, CircuitBreakerStateData(_, true)) ⇒ + val failureNotification = failureMap(CircuitOpenFailure(message)) + log.debug("HALF-OPEN: Failing request for message {}, sending failure notification {} to sender {}", message, failureNotification, sender) + sender ! failureNotification + stay + } + } + + def forwardRequest(message: Any, currentSender: ActorRef, state: CircuitBreakerStateData) = { + import context.dispatcher + + target.ask(message)(callTimeout).onComplete { + case Success(response) ⇒ + log.debug("Request '{}' has been replied to with response {}, forwarding to original sender {}", message, currentSender) + + currentSender ! response + + val isFailure = failureDetector(response) + + if (isFailure) { + log.debug("Response '{}' is considered as failure sending self-message to ask incrementing failure count (origin state was {})", + response, state) + + self ! CallFailed + } else { + + log.debug("Request '{}' succeeded with response {}, returning response to sender {} and sending message to ask to reset failure count (origin state was {})", + message, response, currentSender, state) + + self ! CallSucceeded + } + + case Failure(reason) ⇒ + log.debug("Request '{}' to target {} failed with exception {}, sending self-message to ask incrementing failure count (origin state was {})", + message, target, reason, state) + + self ! CallFailed + } + } + + onTransition { + case from -> Closed ⇒ + log.debug("Moving from state {} to state CLOSED", from) + circuitEventListener foreach { _ ! CircuitClosed(self) } + + case from -> HalfOpen ⇒ + log.debug("Moving from state {} to state HALF OPEN", from) + circuitEventListener foreach { _ ! CircuitHalfOpen(self) } + + case from -> Open ⇒ + log.debug("Moving from state {} to state OPEN", from) + circuitEventListener foreach { _ ! CircuitOpen(self) } + } + +} diff --git a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerActorSpec.scala b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerActorSpec.scala new file mode 100644 index 0000000000..be2a2abce2 --- /dev/null +++ b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerActorSpec.scala @@ -0,0 +1,302 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.contrib.circuitbreaker + +import akka.actor.ActorRef +import akka.contrib.circuitbreaker.CircuitBreakerActor._ +import akka.testkit.{ AkkaSpec, TestProbe } +import org.scalatest.GivenWhenThen + +import scala.concurrent.duration._ +import scala.language.postfixOps + +class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { + + val baseCircuitBreakerBuilder = + CircuitBreakerActorBuilder( + maxFailures = 2, + callTimeout = 200 millis, + resetTimeout = 1 second, + failureDetector = { + _ == "FAILURE" + }) + + trait CircuitBreakerScenario { + val sender = TestProbe() + val eventListener = TestProbe() + val receiver = TestProbe() + + def circuitBreaker: ActorRef + + def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + + def receiverRespondsWithFailureToRequest(request: Any) = { + sender.send(circuitBreaker, request) + receiver.expectMsg(request) + receiver.reply("FAILURE") + sender.expectMsg("FAILURE") + } + + def receiverRespondsToRequestWith(request: Any, reply: Any) = { + sender.send(circuitBreaker, request) + receiver.expectMsg(request) + receiver.reply(reply) + sender.expectMsg(reply) + } + + def waitForCircuitBreakerToReceiveSelfNotificationMessage = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis / 4) + + def waitForResetTimeoutToExpire = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis + 100) + + def messageIsRejectedWithOpenCircuitNotification(message: Any) = { + sender.send(circuitBreaker, message) + sender.expectMsg(CircuitOpenFailure(message)) + } + + } + + "CircuitBreakerActor" should { + + "act as a transparent proxy in case of successful requests-replies - forward to target" in { + + val sender = TestProbe() + val receiver = TestProbe() + + val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + + sender.send(circuitBreaker, "test message") + + receiver.expectMsg("test message") + } + + "act as a transparent proxy in case of successful requests-replies - full cycle" in { + + val sender = TestProbe() + val receiver = TestProbe() + val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + + sender.send(circuitBreaker, "test message") + + receiver.expectMsg("test message") + receiver.reply("response") + + sender.expectMsg("response") + } + + "forward further messages before receiving the response of the first one" in { + val sender = TestProbe() + val receiver = TestProbe() + val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + + sender.send(circuitBreaker, "test message1") + sender.send(circuitBreaker, "test message2") + sender.send(circuitBreaker, "test message3") + + receiver.expectMsg("test message1") + receiver.expectMsg("test message2") + receiver.expectMsg("test message3") + } + + "send responses to the right sender" in { + val sender1 = TestProbe() + val sender2 = TestProbe() + val receiver = TestProbe() + val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + + sender1.send(circuitBreaker, "test message1") + sender2.send(circuitBreaker, "test message2") + + receiver.expectMsg("test message1") + receiver.reply("response1") + + receiver.expectMsg("test message2") + receiver.reply("response2") + + sender1.expectMsg("response1") + sender2.expectMsg("response2") + } + + "return failed responses too" in { + val sender = TestProbe() + val receiver = TestProbe() + val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + + sender.send(circuitBreaker, "request") + + receiver.expectMsg("request") + receiver.reply("FAILURE") + + sender.expectMsg("FAILURE") + } + + "enter open state after reaching the threshold of failed responses" in new CircuitBreakerScenario { + val circuitBreaker = defaultCircuitBreaker + + (1 to baseCircuitBreakerBuilder.maxFailures) foreach { index ⇒ + receiverRespondsWithFailureToRequest(s"request$index") + } + + waitForCircuitBreakerToReceiveSelfNotificationMessage + + sender.send(circuitBreaker, "request in open state") + receiver.expectNoMsg + } + + "respond with a CircuitOpenFailure message when in open state " in new CircuitBreakerScenario { + val circuitBreaker = defaultCircuitBreaker + + (1 to baseCircuitBreakerBuilder.maxFailures) foreach { index ⇒ + receiverRespondsWithFailureToRequest(s"request$index") + } + + waitForCircuitBreakerToReceiveSelfNotificationMessage + + sender.send(circuitBreaker, "request in open state") + sender.expectMsg(CircuitOpenFailure("request in open state")) + } + + "respond with the converted CircuitOpenFailure if a converter is provided" in new CircuitBreakerScenario { + val circuitBreaker = system.actorOf( + baseCircuitBreakerBuilder + .copy(openCircuitFailureConverter = { failureMsg ⇒ s"NOT SENT: ${failureMsg.failedMsg}" }) + .propsForTarget(receiver.ref)) + + (1 to baseCircuitBreakerBuilder.maxFailures) foreach { index ⇒ + receiverRespondsWithFailureToRequest(s"request$index") + } + + waitForCircuitBreakerToReceiveSelfNotificationMessage + + sender.send(circuitBreaker, "request in open state") + sender.expectMsg("NOT SENT: request in open state") + } + + "enter open state after reaching the threshold of timed-out responses" in { + val sender = TestProbe() + val receiver = TestProbe() + val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + + sender.send(circuitBreaker, "request1") + sender.send(circuitBreaker, "request2") + + Thread.sleep(baseCircuitBreakerBuilder.callTimeout.duration.toMillis + 100) + + receiver.expectMsg("request1") + receiver.reply("this should be timed out 1") + + receiver.expectMsg("request2") + receiver.reply("this should be timed out 2") + + // Have to wait a bit to let the circuit breaker receive the self notification message + Thread.sleep(300) + + sender.send(circuitBreaker, "request in open state") + receiver.expectNoMsg + } + + "enter HALF OPEN state after the given state timeout, sending the first message only" in new CircuitBreakerScenario { + Given("A circuit breaker actor pointing to a test probe") + val circuitBreaker = defaultCircuitBreaker + + When("ENTERING OPEN STATE") + receiverRespondsWithFailureToRequest("request1") + receiverRespondsWithFailureToRequest("request2") + + waitForCircuitBreakerToReceiveSelfNotificationMessage + + Then("Messages are ignored") + messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE1") + messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE2") + + When("ENTERING HALF OPEN STATE") + waitForResetTimeoutToExpire + + Then("First message should be forwarded, following ones ignored if the failure persist") + sender.send(circuitBreaker, "First message in half-open state, should be forwarded") + sender.send(circuitBreaker, "Second message in half-open state, should be ignored") + + receiver.expectMsg("First message in half-open state, should be forwarded") + receiver.expectNoMsg() + + sender.expectMsg(CircuitOpenFailure("Second message in half-open state, should be ignored")) + + } + + "return to CLOSED state from HALF-OPEN if a successful message response notification is received" in new CircuitBreakerScenario { + Given("A circuit breaker actor pointing to a test probe") + val circuitBreaker = defaultCircuitBreaker + + When("Entering HALF OPEN state") + receiverRespondsWithFailureToRequest("request1") + receiverRespondsWithFailureToRequest("request2") + + waitForResetTimeoutToExpire + + And("Receiving a successful response") + receiverRespondsToRequestWith("First message in half-open state, should be forwarded", "This should close the circuit") + + waitForCircuitBreakerToReceiveSelfNotificationMessage + + Then("circuit is re-closed") + sender.send(circuitBreaker, "request1") + receiver.expectMsg("request1") + + sender.send(circuitBreaker, "request2") + receiver.expectMsg("request2") + + } + + "return to OPEN state from HALF-OPEN if a FAILURE message response is received" in new CircuitBreakerScenario { + Given("A circuit breaker actor pointing to a test probe") + val circuitBreaker = defaultCircuitBreaker + + When("Entering HALF OPEN state") + receiverRespondsWithFailureToRequest("request1") + receiverRespondsWithFailureToRequest("request2") + + waitForResetTimeoutToExpire + + And("Receiving a failure response") + receiverRespondsWithFailureToRequest("First message in half-open state, should be forwarded") + + waitForCircuitBreakerToReceiveSelfNotificationMessage + + Then("circuit is opened again") + sender.send(circuitBreaker, "this should be ignored") + receiver.expectNoMsg() + sender.expectMsg(CircuitOpenFailure("this should be ignored")) + + } + + "Notify an event status change listener when changing state" in new CircuitBreakerScenario { + Given("A circuit breaker actor pointing to a test probe") + override val circuitBreaker = system.actorOf( + baseCircuitBreakerBuilder + .copy(circuitEventListener = Some(eventListener.ref)) + .propsForTarget(receiver.ref)) + + When("Entering OPEN state") + receiverRespondsWithFailureToRequest("request1") + receiverRespondsWithFailureToRequest("request2") + + waitForCircuitBreakerToReceiveSelfNotificationMessage + + Then("An event is sent") + eventListener.expectMsg(CircuitOpen(circuitBreaker)) + + When("Entering HALF OPEN state") + waitForResetTimeoutToExpire + + Then("An event is sent") + eventListener.expectMsg(CircuitHalfOpen(circuitBreaker)) + + When("Entering CLOSED state") + receiverRespondsToRequestWith("First message in half-open state, should be forwarded", "This should close the circuit") + Then("An event is sent") + eventListener.expectMsg(CircuitClosed(circuitBreaker)) + + } + } + +}