diff --git a/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerActor.scala b/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerActor.scala index 4c38ce3779..60b251001c 100644 --- a/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerActor.scala +++ b/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerActor.scala @@ -11,6 +11,11 @@ import akka.util.Timeout import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success } +/** + * This is an Actor which implements the circuit breaker pattern, + * you may also be interested in the raw circuit breaker [[akka.pattern.CircuitBreaker]] + * + */ object CircuitBreakerActor { /** @@ -59,7 +64,7 @@ object CircuitBreakerActor { final case class CircuitBreakerStateData(failureCount: Int = 0, firstHalfOpenMessageSent: Boolean = false) - final case class CircuitBreakerActorBuilder( + final case class CircuitBreakerActorPropsBuilder( maxFailures: Int, callTimeout: Timeout, resetTimeout: Timeout, circuitEventListener: Option[ActorRef] = None, failureDetector: Any ⇒ Boolean = { _ ⇒ false }, @@ -70,11 +75,11 @@ object CircuitBreakerActor { * * @param target the target actor ref */ - def propsForTarget(target: ActorRef) = CircuitBreakerActor.props(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, openCircuitFailureConverter) + def props(target: ActorRef) = CircuitBreakerActor.props(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, openCircuitFailureConverter) } - class OpenCircuitException extends Exception("Circuit Open so unable to complete operation") + final class OpenCircuitException extends Exception("Circuit Open so unable to complete operation") /** * Extends [[scala.concurrent.Future]] with the method failForOpenCircuitWith to handle @@ -106,7 +111,7 @@ object CircuitBreakerInternalEvents { import CircuitBreakerActor._ import CircuitBreakerInternalEvents._ -class CircuitBreakerActor( +final class CircuitBreakerActor( target: ActorRef, maxFailures: Int, callTimeout: Timeout, diff --git a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerActorSpec.scala b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerActorSpec.scala index 4b86c49ea5..237e4a55db 100644 --- a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerActorSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/CircuitBreakerActorSpec.scala @@ -13,8 +13,8 @@ import scala.language.postfixOps class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { - val baseCircuitBreakerBuilder = - CircuitBreakerActorBuilder( + val baseCircuitBreakerPropsBuilder = + CircuitBreakerActorPropsBuilder( maxFailures = 2, callTimeout = 200 millis, resetTimeout = 1 second, @@ -29,7 +29,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { def circuitBreaker: ActorRef - def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) def receiverRespondsWithFailureToRequest(request: Any) = { sender.send(circuitBreaker, request) @@ -45,9 +45,11 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { sender.expectMsg(reply) } - def waitForCircuitBreakerToReceiveSelfNotificationMessage = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis / 4) + def circuitBreakerToReceivesSelfNotificationMessage = + receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration / 4) - def waitForResetTimeoutToExpire = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis + 100) + def resetTimeoutToExpires = + receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration + 100.millis) def messageIsRejectedWithOpenCircuitNotification(message: Any) = { sender.send(circuitBreaker, message) @@ -63,7 +65,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { val sender = TestProbe() val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) sender.send(circuitBreaker, "test message") @@ -74,7 +76,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { val sender = TestProbe() val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) sender.send(circuitBreaker, "test message") @@ -87,7 +89,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { "forward further messages before receiving the response of the first one" in { val sender = TestProbe() val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) sender.send(circuitBreaker, "test message1") sender.send(circuitBreaker, "test message2") @@ -102,7 +104,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { val sender1 = TestProbe() val sender2 = TestProbe() val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) sender1.send(circuitBreaker, "test message1") sender2.send(circuitBreaker, "test message2") @@ -120,7 +122,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { "return failed responses too" in { val sender = TestProbe() val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) sender.send(circuitBreaker, "request") @@ -133,11 +135,11 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { "enter open state after reaching the threshold of failed responses" in new CircuitBreakerScenario { val circuitBreaker = defaultCircuitBreaker - (1 to baseCircuitBreakerBuilder.maxFailures) foreach { index ⇒ + (1 to baseCircuitBreakerPropsBuilder.maxFailures) foreach { index ⇒ receiverRespondsWithFailureToRequest(s"request$index") } - waitForCircuitBreakerToReceiveSelfNotificationMessage + circuitBreakerToReceivesSelfNotificationMessage sender.send(circuitBreaker, "request in open state") receiver.expectNoMsg @@ -146,11 +148,11 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { "respond with a CircuitOpenFailure message when in open state " in new CircuitBreakerScenario { val circuitBreaker = defaultCircuitBreaker - (1 to baseCircuitBreakerBuilder.maxFailures) foreach { index ⇒ + (1 to baseCircuitBreakerPropsBuilder.maxFailures) foreach { index ⇒ receiverRespondsWithFailureToRequest(s"request$index") } - waitForCircuitBreakerToReceiveSelfNotificationMessage + circuitBreakerToReceivesSelfNotificationMessage sender.send(circuitBreaker, "request in open state") sender.expectMsg(CircuitOpenFailure("request in open state")) @@ -158,15 +160,15 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { "respond with the converted CircuitOpenFailure if a converter is provided" in new CircuitBreakerScenario { val circuitBreaker = system.actorOf( - baseCircuitBreakerBuilder + baseCircuitBreakerPropsBuilder .copy(openCircuitFailureConverter = { failureMsg ⇒ s"NOT SENT: ${failureMsg.failedMsg}" }) - .propsForTarget(receiver.ref)) + .props(receiver.ref)) - (1 to baseCircuitBreakerBuilder.maxFailures) foreach { index ⇒ + (1 to baseCircuitBreakerPropsBuilder.maxFailures) foreach { index ⇒ receiverRespondsWithFailureToRequest(s"request$index") } - waitForCircuitBreakerToReceiveSelfNotificationMessage + circuitBreakerToReceivesSelfNotificationMessage sender.send(circuitBreaker, "request in open state") sender.expectMsg("NOT SENT: request in open state") @@ -175,12 +177,12 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { "enter open state after reaching the threshold of timed-out responses" in { val sender = TestProbe() val receiver = TestProbe() - val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) + val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref)) sender.send(circuitBreaker, "request1") sender.send(circuitBreaker, "request2") - Thread.sleep(baseCircuitBreakerBuilder.callTimeout.duration.toMillis + 100) + Thread.sleep(baseCircuitBreakerPropsBuilder.callTimeout.duration.toMillis + 100) receiver.expectMsg("request1") receiver.reply("this should be timed out 1") @@ -203,14 +205,14 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request2") - waitForCircuitBreakerToReceiveSelfNotificationMessage + circuitBreakerToReceivesSelfNotificationMessage Then("Messages are ignored") messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE1") messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE2") When("ENTERING HALF OPEN STATE") - waitForResetTimeoutToExpire + resetTimeoutToExpires Then("First message should be forwarded, following ones ignored if the failure persist") sender.send(circuitBreaker, "First message in half-open state, should be forwarded") @@ -231,12 +233,12 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request2") - waitForResetTimeoutToExpire + resetTimeoutToExpires And("Receiving a successful response") receiverRespondsToRequestWith("First message in half-open state, should be forwarded", "This should close the circuit") - waitForCircuitBreakerToReceiveSelfNotificationMessage + circuitBreakerToReceivesSelfNotificationMessage Then("circuit is re-closed") sender.send(circuitBreaker, "request1") @@ -255,12 +257,12 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request2") - waitForResetTimeoutToExpire + resetTimeoutToExpires And("Receiving a failure response") receiverRespondsWithFailureToRequest("First message in half-open state, should be forwarded") - waitForCircuitBreakerToReceiveSelfNotificationMessage + circuitBreakerToReceivesSelfNotificationMessage Then("circuit is opened again") sender.send(circuitBreaker, "this should be ignored") @@ -272,21 +274,21 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { "Notify an event status change listener when changing state" in new CircuitBreakerScenario { Given("A circuit breaker actor pointing to a test probe") override val circuitBreaker = system.actorOf( - baseCircuitBreakerBuilder + baseCircuitBreakerPropsBuilder .copy(circuitEventListener = Some(eventListener.ref)) - .propsForTarget(receiver.ref)) + .props(target = receiver.ref)) When("Entering OPEN state") receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request2") - waitForCircuitBreakerToReceiveSelfNotificationMessage + circuitBreakerToReceivesSelfNotificationMessage Then("An event is sent") eventListener.expectMsg(CircuitOpen(circuitBreaker)) When("Entering HALF OPEN state") - waitForResetTimeoutToExpire + resetTimeoutToExpires Then("An event is sent") eventListener.expectMsg(CircuitHalfOpen(circuitBreaker)) diff --git a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala index d7ead3344a..c16b19dfe1 100644 --- a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala +++ b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala @@ -1,7 +1,7 @@ package akka.contrib.circuitbreaker.sample import akka.actor.{ Actor, ActorLogging, ActorRef } -import akka.contrib.circuitbreaker.CircuitBreakerActor.{ CircuitBreakerActorBuilder, CircuitOpenFailure } +import akka.contrib.circuitbreaker.CircuitBreakerActor.{ CircuitBreakerActorPropsBuilder, CircuitOpenFailure } import scala.concurrent.duration._ @@ -11,7 +11,7 @@ class CircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with Act val serviceCircuitBreaker = context.actorOf( - CircuitBreakerActorBuilder(maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds) + CircuitBreakerActorPropsBuilder(maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds) .copy( failureDetector = { _ match { @@ -19,7 +19,7 @@ class CircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with Act case _ ⇒ false } }) - .propsForTarget(potentiallyFailingService), + .props(potentiallyFailingService), "serviceCircuitBreaker") override def receive: Receive = { diff --git a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAsk.scala b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAsk.scala index 431908d952..d54a7a0eee 100644 --- a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAsk.scala +++ b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAsk.scala @@ -1,7 +1,7 @@ package akka.contrib.circuitbreaker.sample import akka.actor.{ Actor, ActorLogging, ActorRef } -import akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerActorBuilder +import akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerActorPropsBuilder import akka.util.Timeout import scala.concurrent.duration._ @@ -16,7 +16,7 @@ class CircuitBreakerAsk(potentiallyFailingService: ActorRef) extends Actor with val serviceCircuitBreaker = context.actorOf( - CircuitBreakerActorBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds) + CircuitBreakerActorPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds) .copy( failureDetector = { _ match { @@ -28,7 +28,7 @@ class CircuitBreakerAsk(potentiallyFailingService: ActorRef) extends Actor with openCircuitFailureConverter = { failure ⇒ Left(s"Circuit open when processing ${failure.failedMsg}") }) - .propsForTarget(potentiallyFailingService), + .props(potentiallyFailingService), "serviceCircuitBreaker") import context.dispatcher diff --git a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAskWithFailure.scala b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAskWithFailure.scala index 44471d7e79..d6db18279c 100644 --- a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAskWithFailure.scala +++ b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreakerAskWithFailure.scala @@ -16,7 +16,7 @@ class CircuitBreakerAskWithFailure(potentiallyFailingService: ActorRef) extends val serviceCircuitBreaker = context.actorOf( - CircuitBreakerActorBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds).propsForTarget(potentiallyFailingService), + CircuitBreakerActorPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds).props(potentiallyFailingService), "serviceCircuitBreaker") import context.dispatcher