Post revision changes:

- Added reference to the circuit breaker pattern function in the CircuitBreakerActor scaladoc
- made `final` classes `OpenCircuitException` and `CircuitBreakerActor`
- Renamed `CircuitBreakerActorBuilder` into `CircuitBreakerActorPropsBuilder` and renamed the `propsForTarget` method into `props`
This commit is contained in:
galarragas 2015-12-31 18:46:29 +00:00
parent fd5b872a43
commit b1980992f9
5 changed files with 48 additions and 41 deletions

View file

@ -11,6 +11,11 @@ import akka.util.Timeout
import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success }
/**
* This is an Actor which implements the circuit breaker pattern,
* you may also be interested in the raw circuit breaker [[akka.pattern.CircuitBreaker]]
*
*/
object CircuitBreakerActor { object CircuitBreakerActor {
/** /**
@ -59,7 +64,7 @@ object CircuitBreakerActor {
final case class CircuitBreakerStateData(failureCount: Int = 0, firstHalfOpenMessageSent: Boolean = false) final case class CircuitBreakerStateData(failureCount: Int = 0, firstHalfOpenMessageSent: Boolean = false)
final case class CircuitBreakerActorBuilder( final case class CircuitBreakerActorPropsBuilder(
maxFailures: Int, callTimeout: Timeout, resetTimeout: Timeout, maxFailures: Int, callTimeout: Timeout, resetTimeout: Timeout,
circuitEventListener: Option[ActorRef] = None, circuitEventListener: Option[ActorRef] = None,
failureDetector: Any Boolean = { _ false }, failureDetector: Any Boolean = { _ false },
@ -70,11 +75,11 @@ object CircuitBreakerActor {
* *
* @param target the target actor ref * @param target the target actor ref
*/ */
def propsForTarget(target: ActorRef) = CircuitBreakerActor.props(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, openCircuitFailureConverter) def props(target: ActorRef) = CircuitBreakerActor.props(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, openCircuitFailureConverter)
} }
class OpenCircuitException extends Exception("Circuit Open so unable to complete operation") final class OpenCircuitException extends Exception("Circuit Open so unable to complete operation")
/** /**
* Extends [[scala.concurrent.Future]] with the method failForOpenCircuitWith to handle * Extends [[scala.concurrent.Future]] with the method failForOpenCircuitWith to handle
@ -106,7 +111,7 @@ object CircuitBreakerInternalEvents {
import CircuitBreakerActor._ import CircuitBreakerActor._
import CircuitBreakerInternalEvents._ import CircuitBreakerInternalEvents._
class CircuitBreakerActor( final class CircuitBreakerActor(
target: ActorRef, target: ActorRef,
maxFailures: Int, maxFailures: Int,
callTimeout: Timeout, callTimeout: Timeout,

View file

@ -13,8 +13,8 @@ import scala.language.postfixOps
class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen { class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
val baseCircuitBreakerBuilder = val baseCircuitBreakerPropsBuilder =
CircuitBreakerActorBuilder( CircuitBreakerActorPropsBuilder(
maxFailures = 2, maxFailures = 2,
callTimeout = 200 millis, callTimeout = 200 millis,
resetTimeout = 1 second, resetTimeout = 1 second,
@ -29,7 +29,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
def circuitBreaker: ActorRef def circuitBreaker: ActorRef
def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
def receiverRespondsWithFailureToRequest(request: Any) = { def receiverRespondsWithFailureToRequest(request: Any) = {
sender.send(circuitBreaker, request) sender.send(circuitBreaker, request)
@ -45,9 +45,11 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
sender.expectMsg(reply) sender.expectMsg(reply)
} }
def waitForCircuitBreakerToReceiveSelfNotificationMessage = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis / 4) def circuitBreakerToReceivesSelfNotificationMessage =
receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration / 4)
def waitForResetTimeoutToExpire = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis + 100) def resetTimeoutToExpires =
receiver.expectNoMsg(baseCircuitBreakerPropsBuilder.resetTimeout.duration + 100.millis)
def messageIsRejectedWithOpenCircuitNotification(message: Any) = { def messageIsRejectedWithOpenCircuitNotification(message: Any) = {
sender.send(circuitBreaker, message) sender.send(circuitBreaker, message)
@ -63,7 +65,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
val sender = TestProbe() val sender = TestProbe()
val receiver = TestProbe() val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
sender.send(circuitBreaker, "test message") sender.send(circuitBreaker, "test message")
@ -74,7 +76,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
val sender = TestProbe() val sender = TestProbe()
val receiver = TestProbe() val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
sender.send(circuitBreaker, "test message") sender.send(circuitBreaker, "test message")
@ -87,7 +89,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
"forward further messages before receiving the response of the first one" in { "forward further messages before receiving the response of the first one" in {
val sender = TestProbe() val sender = TestProbe()
val receiver = TestProbe() val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
sender.send(circuitBreaker, "test message1") sender.send(circuitBreaker, "test message1")
sender.send(circuitBreaker, "test message2") sender.send(circuitBreaker, "test message2")
@ -102,7 +104,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
val sender1 = TestProbe() val sender1 = TestProbe()
val sender2 = TestProbe() val sender2 = TestProbe()
val receiver = TestProbe() val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
sender1.send(circuitBreaker, "test message1") sender1.send(circuitBreaker, "test message1")
sender2.send(circuitBreaker, "test message2") sender2.send(circuitBreaker, "test message2")
@ -120,7 +122,7 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
"return failed responses too" in { "return failed responses too" in {
val sender = TestProbe() val sender = TestProbe()
val receiver = TestProbe() val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
sender.send(circuitBreaker, "request") sender.send(circuitBreaker, "request")
@ -133,11 +135,11 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
"enter open state after reaching the threshold of failed responses" in new CircuitBreakerScenario { "enter open state after reaching the threshold of failed responses" in new CircuitBreakerScenario {
val circuitBreaker = defaultCircuitBreaker val circuitBreaker = defaultCircuitBreaker
(1 to baseCircuitBreakerBuilder.maxFailures) foreach { index (1 to baseCircuitBreakerPropsBuilder.maxFailures) foreach { index
receiverRespondsWithFailureToRequest(s"request$index") receiverRespondsWithFailureToRequest(s"request$index")
} }
waitForCircuitBreakerToReceiveSelfNotificationMessage circuitBreakerToReceivesSelfNotificationMessage
sender.send(circuitBreaker, "request in open state") sender.send(circuitBreaker, "request in open state")
receiver.expectNoMsg receiver.expectNoMsg
@ -146,11 +148,11 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
"respond with a CircuitOpenFailure message when in open state " in new CircuitBreakerScenario { "respond with a CircuitOpenFailure message when in open state " in new CircuitBreakerScenario {
val circuitBreaker = defaultCircuitBreaker val circuitBreaker = defaultCircuitBreaker
(1 to baseCircuitBreakerBuilder.maxFailures) foreach { index (1 to baseCircuitBreakerPropsBuilder.maxFailures) foreach { index
receiverRespondsWithFailureToRequest(s"request$index") receiverRespondsWithFailureToRequest(s"request$index")
} }
waitForCircuitBreakerToReceiveSelfNotificationMessage circuitBreakerToReceivesSelfNotificationMessage
sender.send(circuitBreaker, "request in open state") sender.send(circuitBreaker, "request in open state")
sender.expectMsg(CircuitOpenFailure("request in open state")) sender.expectMsg(CircuitOpenFailure("request in open state"))
@ -158,15 +160,15 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
"respond with the converted CircuitOpenFailure if a converter is provided" in new CircuitBreakerScenario { "respond with the converted CircuitOpenFailure if a converter is provided" in new CircuitBreakerScenario {
val circuitBreaker = system.actorOf( val circuitBreaker = system.actorOf(
baseCircuitBreakerBuilder baseCircuitBreakerPropsBuilder
.copy(openCircuitFailureConverter = { failureMsg s"NOT SENT: ${failureMsg.failedMsg}" }) .copy(openCircuitFailureConverter = { failureMsg s"NOT SENT: ${failureMsg.failedMsg}" })
.propsForTarget(receiver.ref)) .props(receiver.ref))
(1 to baseCircuitBreakerBuilder.maxFailures) foreach { index (1 to baseCircuitBreakerPropsBuilder.maxFailures) foreach { index
receiverRespondsWithFailureToRequest(s"request$index") receiverRespondsWithFailureToRequest(s"request$index")
} }
waitForCircuitBreakerToReceiveSelfNotificationMessage circuitBreakerToReceivesSelfNotificationMessage
sender.send(circuitBreaker, "request in open state") sender.send(circuitBreaker, "request in open state")
sender.expectMsg("NOT SENT: request in open state") sender.expectMsg("NOT SENT: request in open state")
@ -175,12 +177,12 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
"enter open state after reaching the threshold of timed-out responses" in { "enter open state after reaching the threshold of timed-out responses" in {
val sender = TestProbe() val sender = TestProbe()
val receiver = TestProbe() val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref)) val circuitBreaker = system.actorOf(baseCircuitBreakerPropsBuilder.props(target = receiver.ref))
sender.send(circuitBreaker, "request1") sender.send(circuitBreaker, "request1")
sender.send(circuitBreaker, "request2") sender.send(circuitBreaker, "request2")
Thread.sleep(baseCircuitBreakerBuilder.callTimeout.duration.toMillis + 100) Thread.sleep(baseCircuitBreakerPropsBuilder.callTimeout.duration.toMillis + 100)
receiver.expectMsg("request1") receiver.expectMsg("request1")
receiver.reply("this should be timed out 1") receiver.reply("this should be timed out 1")
@ -203,14 +205,14 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2") receiverRespondsWithFailureToRequest("request2")
waitForCircuitBreakerToReceiveSelfNotificationMessage circuitBreakerToReceivesSelfNotificationMessage
Then("Messages are ignored") Then("Messages are ignored")
messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE1") messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE1")
messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE2") messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE2")
When("ENTERING HALF OPEN STATE") When("ENTERING HALF OPEN STATE")
waitForResetTimeoutToExpire resetTimeoutToExpires
Then("First message should be forwarded, following ones ignored if the failure persist") Then("First message should be forwarded, following ones ignored if the failure persist")
sender.send(circuitBreaker, "First message in half-open state, should be forwarded") sender.send(circuitBreaker, "First message in half-open state, should be forwarded")
@ -231,12 +233,12 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2") receiverRespondsWithFailureToRequest("request2")
waitForResetTimeoutToExpire resetTimeoutToExpires
And("Receiving a successful response") And("Receiving a successful response")
receiverRespondsToRequestWith("First message in half-open state, should be forwarded", "This should close the circuit") receiverRespondsToRequestWith("First message in half-open state, should be forwarded", "This should close the circuit")
waitForCircuitBreakerToReceiveSelfNotificationMessage circuitBreakerToReceivesSelfNotificationMessage
Then("circuit is re-closed") Then("circuit is re-closed")
sender.send(circuitBreaker, "request1") sender.send(circuitBreaker, "request1")
@ -255,12 +257,12 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2") receiverRespondsWithFailureToRequest("request2")
waitForResetTimeoutToExpire resetTimeoutToExpires
And("Receiving a failure response") And("Receiving a failure response")
receiverRespondsWithFailureToRequest("First message in half-open state, should be forwarded") receiverRespondsWithFailureToRequest("First message in half-open state, should be forwarded")
waitForCircuitBreakerToReceiveSelfNotificationMessage circuitBreakerToReceivesSelfNotificationMessage
Then("circuit is opened again") Then("circuit is opened again")
sender.send(circuitBreaker, "this should be ignored") sender.send(circuitBreaker, "this should be ignored")
@ -272,21 +274,21 @@ class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
"Notify an event status change listener when changing state" in new CircuitBreakerScenario { "Notify an event status change listener when changing state" in new CircuitBreakerScenario {
Given("A circuit breaker actor pointing to a test probe") Given("A circuit breaker actor pointing to a test probe")
override val circuitBreaker = system.actorOf( override val circuitBreaker = system.actorOf(
baseCircuitBreakerBuilder baseCircuitBreakerPropsBuilder
.copy(circuitEventListener = Some(eventListener.ref)) .copy(circuitEventListener = Some(eventListener.ref))
.propsForTarget(receiver.ref)) .props(target = receiver.ref))
When("Entering OPEN state") When("Entering OPEN state")
receiverRespondsWithFailureToRequest("request1") receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2") receiverRespondsWithFailureToRequest("request2")
waitForCircuitBreakerToReceiveSelfNotificationMessage circuitBreakerToReceivesSelfNotificationMessage
Then("An event is sent") Then("An event is sent")
eventListener.expectMsg(CircuitOpen(circuitBreaker)) eventListener.expectMsg(CircuitOpen(circuitBreaker))
When("Entering HALF OPEN state") When("Entering HALF OPEN state")
waitForResetTimeoutToExpire resetTimeoutToExpires
Then("An event is sent") Then("An event is sent")
eventListener.expectMsg(CircuitHalfOpen(circuitBreaker)) eventListener.expectMsg(CircuitHalfOpen(circuitBreaker))

View file

@ -1,7 +1,7 @@
package akka.contrib.circuitbreaker.sample package akka.contrib.circuitbreaker.sample
import akka.actor.{ Actor, ActorLogging, ActorRef } import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.contrib.circuitbreaker.CircuitBreakerActor.{ CircuitBreakerActorBuilder, CircuitOpenFailure } import akka.contrib.circuitbreaker.CircuitBreakerActor.{ CircuitBreakerActorPropsBuilder, CircuitOpenFailure }
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -11,7 +11,7 @@ class CircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with Act
val serviceCircuitBreaker = val serviceCircuitBreaker =
context.actorOf( context.actorOf(
CircuitBreakerActorBuilder(maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds) CircuitBreakerActorPropsBuilder(maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds)
.copy( .copy(
failureDetector = { failureDetector = {
_ match { _ match {
@ -19,7 +19,7 @@ class CircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with Act
case _ false case _ false
} }
}) })
.propsForTarget(potentiallyFailingService), .props(potentiallyFailingService),
"serviceCircuitBreaker") "serviceCircuitBreaker")
override def receive: Receive = { override def receive: Receive = {

View file

@ -1,7 +1,7 @@
package akka.contrib.circuitbreaker.sample package akka.contrib.circuitbreaker.sample
import akka.actor.{ Actor, ActorLogging, ActorRef } import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerActorBuilder import akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerActorPropsBuilder
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -16,7 +16,7 @@ class CircuitBreakerAsk(potentiallyFailingService: ActorRef) extends Actor with
val serviceCircuitBreaker = val serviceCircuitBreaker =
context.actorOf( context.actorOf(
CircuitBreakerActorBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds) CircuitBreakerActorPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds)
.copy( .copy(
failureDetector = { failureDetector = {
_ match { _ match {
@ -28,7 +28,7 @@ class CircuitBreakerAsk(potentiallyFailingService: ActorRef) extends Actor with
openCircuitFailureConverter = { failure openCircuitFailureConverter = { failure
Left(s"Circuit open when processing ${failure.failedMsg}") Left(s"Circuit open when processing ${failure.failedMsg}")
}) })
.propsForTarget(potentiallyFailingService), .props(potentiallyFailingService),
"serviceCircuitBreaker") "serviceCircuitBreaker")
import context.dispatcher import context.dispatcher

View file

@ -16,7 +16,7 @@ class CircuitBreakerAskWithFailure(potentiallyFailingService: ActorRef) extends
val serviceCircuitBreaker = val serviceCircuitBreaker =
context.actorOf( context.actorOf(
CircuitBreakerActorBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds).propsForTarget(potentiallyFailingService), CircuitBreakerActorPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds).props(potentiallyFailingService),
"serviceCircuitBreaker") "serviceCircuitBreaker")
import context.dispatcher import context.dispatcher