Adding to the contrib module an Proxy Actor implementing the circuit breaker pattern

This commit is contained in:
galarragas 2015-12-19 14:39:49 +00:00
parent 8dbade7131
commit 1f4184e64b
3 changed files with 752 additions and 0 deletions

View file

@ -0,0 +1,184 @@
Circuit-Breaker Actor
======================
This is an alternative implementation of the [AKKA Circuit Breaker Pattern](http://doc.akka.io/docs/akka/snapshot/common/circuitbreaker.html).
The main difference is that is intended to be used only for request-reply interactions with actor using the Circuit-Breaker as a proxy of the target one
in order to provide the same failfast functionnalities and a protocol similar to the AKKA Pattern implementation
### Usage
Let's assume we have an actor wrapping a back-end service and able to respond to `Request` calls with a `Response` object
containing an `Either[String, String]` to map successful and failed responses. The service is also potentially slowing down
because of the workload.
A simple implementation can be given by this class::
object SimpleService {
case class Request(content: String)
case class Response(content: Either[String, String])
case object ResetCount
}
/**
* This is a simple actor simulating a service
* - Becoming slower with the increase of frequency of input requests
* - Failing around 30% of the requests
*/
class SimpleService extends Actor with ActorLogging {
import SimpleService._
var messageCount = 0
import context.dispatcher
context.system.scheduler.schedule(1.second, 1.second, self, ResetCount)
override def receive = {
case ResetCount =>
messageCount = 0
case Request(content) =>
messageCount += 1
// simulate workload
Thread.sleep( 100 * messageCount )
// Fails around 30% of the times
if(Random.nextInt(100) < 70 ) {
sender ! Response(Right(s"Successfully processed $content"))
} else {
sender ! Response(Left(s"Failure processing $content"))
}
}
}
If we want to interface with this service using the Circuit Breaker we can use two approaches:
Using a non-conversational approach: ::
class CircuitBreakerExample(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
import SimpleService._
val serviceCircuitBreaker =
context.actorOf(
CircuitBreakerActorBuilder( maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds )
.copy(
failureDetector = {
_ match {
case Response(Left(_)) => true
case _ => false
}
}
)
.propsForTarget(potentiallyFailingService),
"serviceCircuitBreaker"
)
override def receive: Receive = {
case AskFor(requestToForward) =>
serviceCircuitBreaker ! Request(requestToForward)
case Right(Response(content)) =>
//handle response
log.info("Got successful response {}", content)
case Response(Right(content)) =>
//handle response
log.info("Got successful response {}", content)
case Response(Left(content)) =>
//handle response
log.info("Got failed response {}", content)
case CircuitOpenFailure(failedMsg) =>
log.warning("Unable to send message {}", failedMsg)
}
}
Using the ASK pattern, in this case it is useful to be able to map circuit open failures to the same type of failures
returned by the service (a `Left[String]` in our case): ::
class CircuitBreakerAskExample(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
import SimpleService._
import akka.pattern._
implicit val askTimeout: Timeout = 2.seconds
val serviceCircuitBreaker =
context.actorOf(
CircuitBreakerActorBuilder( maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds )
.copy(
failureDetector = {
_ match {
case Response(Left(_)) => true
case _ => false
}
}
)
.copy(
openCircuitFailureConverter = { failure =>
Left(s"Circuit open when processing ${failure.failedMsg}")
}
)
.propsForTarget(potentiallyFailingService),
"serviceCircuitBreaker"
)
import context.dispatcher
override def receive: Receive = {
case AskFor(requestToForward) =>
(serviceCircuitBreaker ? Request(requestToForward)).mapTo[Either[String, String]].onComplete {
case Success(Right(successResponse)) =>
//handle response
log.info("Got successful response {}", successResponse)
case Success(Left(failureResponse)) =>
//handle response
log.info("Got successful response {}", failureResponse)
case Failure(exception) =>
//handle response
log.info("Got successful response {}", exception)
}
}
}
If it is not possible to define define a specific error response, you can map the Open Circuit notification into a failure.
That also means that your `CircuitBreakerActor` will be essentially useful to protect you from time out for extra workload or
temporary failures in the target actor ::
class CircuitBreakerAskWithFailureExample(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
import SimpleService._
import akka.pattern._
import CircuitBreakerActor._
implicit val askTimeout: Timeout = 2.seconds
val serviceCircuitBreaker =
context.actorOf(
CircuitBreakerActorBuilder( maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds ).propsForTarget(potentiallyFailingService),
"serviceCircuitBreaker"
)
import context.dispatcher
override def receive: Receive = {
case AskFor(requestToForward) =>
(serviceCircuitBreaker ? Request(requestToForward)).failForOpenCircuit.mapTo[String].onComplete {
case Success(successResponse) =>
//handle response
log.info("Got successful response {}", successResponse)
case Failure(exception) =>
//handle response
log.info("Got successful response {}", exception)
}
}
}

View file

@ -0,0 +1,266 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.circuitbreaker
import akka.actor._
import akka.pattern._
import akka.util.Timeout
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }
object CircuitBreakerActor {
/**
* Creates an circuit breaker actor proxying a target actor intended for request-reply interactions.
* It is possible to send messages through this proxy without expecting a response wrapping them into a
* [[akka.contrib.circuitbreaker.CircuitBreakerActor.TellOnly]]
*
* The circuit breaker implements the same state machine documented in [[akka.pattern.CircuitBreaker]]
*
* @param target the actor to proxy
* @param maxFailures maximum number of failures before opening the circuit
* @param callTimeout timeout before considering the ongoing call a failure
* @param resetTimeout time after which the channel will be closed after entering the open state
* @param circuitEventListener an actor that will receive a series of messages of type
* [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerEvent]]
* @param failureDetector function to detect if the a message received from the target actor as
* response from a request represent a failure
* @param failureMap function to map a failure into a response message. The failing response message is wrapped
* into a [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitOpenFailure]] object
* @return
*/
def props(target: ActorRef,
maxFailures: Int,
callTimeout: Timeout,
resetTimeout: Timeout,
circuitEventListener: Option[ActorRef],
failureDetector: Any Boolean,
failureMap: CircuitOpenFailure Any) =
Props(new CircuitBreakerActor(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, failureMap))
sealed trait CircuitBreakerCommand
case class TellOnly(msg: Any) extends CircuitBreakerCommand
sealed trait CircuitBreakerResponse
case class CircuitOpenFailure(failedMsg: Any)
sealed trait CircuitBreakerEvent
case class CircuitOpen(circuit: ActorRef) extends CircuitBreakerCommand
case class CircuitClosed(circuit: ActorRef) extends CircuitBreakerCommand
case class CircuitHalfOpen(circuit: ActorRef) extends CircuitBreakerCommand
sealed trait CircuitBreakerState
case object Open extends CircuitBreakerState
case object Closed extends CircuitBreakerState
case object HalfOpen extends CircuitBreakerState
case class CircuitBreakerStateData(failureCount: Int = 0, firstHalfOpenMessageSent: Boolean = false)
/**
* Convenience builder with appropriate defaults to create the CircuitBreakerActor props
*
* @param maxFailures maximum number of failures before opening the circuit
* @param callTimeout timeout before considering the ongoing call a failure
* @param resetTimeout time after which the channel will be closed after entering the open state
* @param circuitEventListener an actor that will receive a series of messages of type
* [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitBreakerEvent]]
* defaults to None
* @param failureDetector function to detect if the a message received from the target actor as
* response from a request represent a failure, defaults to a function accepting every
* response making this circuit breaker be activated by response timeouts only
*
* @param openCircuitFailureConverter function to map a failure into a response message.
* Defaults to an identify function
*/
case class CircuitBreakerActorBuilder(
maxFailures: Int, callTimeout: Timeout, resetTimeout: Timeout,
circuitEventListener: Option[ActorRef] = None,
failureDetector: Any Boolean = { _ false },
openCircuitFailureConverter: CircuitOpenFailure Any = identity) {
/**
* Creates the props for a [[akka.contrib.circuitbreaker.CircuitBreakerActor]] proxying the given target
*
* @param target the target actor ref
* @return
*/
def propsForTarget(target: ActorRef) = CircuitBreakerActor.props(target, maxFailures, callTimeout, resetTimeout, circuitEventListener, failureDetector, openCircuitFailureConverter)
}
class OpenCircuitException extends Exception("Circuit Open so unable to complete operation")
/**
* Extends [[scala.concurrent.Future]] with the method failForOpenCircuitWith to handle
* [[akka.contrib.circuitbreaker.CircuitBreakerActor.CircuitOpenFailure]] failure responses throwing
* an exception built with the given exception builder
*
* @param future
*/
implicit class CircuitBreakerAwareFuture(val future: Future[Any]) extends AnyVal {
def failForOpenCircuit(implicit executionContext: ExecutionContext): Future[Any] = failForOpenCircuitWith(new OpenCircuitException)
def failForOpenCircuitWith(throwing: Throwable)(implicit executionContext: ExecutionContext): Future[Any] = {
future.flatMap {
_ match {
case CircuitOpenFailure(_) Future.failed(throwing)
case result Future.successful(result)
}
}
}
}
}
object CircuitBreakerInternalEvents {
sealed trait CircuitBreakerInternalEvent
case object CallFailed extends CircuitBreakerInternalEvent
case object CallSucceeded extends CircuitBreakerInternalEvent
}
import CircuitBreakerActor._
import CircuitBreakerInternalEvents._
class CircuitBreakerActor(
target: ActorRef,
maxFailures: Int,
callTimeout: Timeout,
resetTimeout: Timeout,
circuitEventListener: Option[ActorRef],
failureDetector: Any Boolean,
failureMap: CircuitOpenFailure Any) extends Actor with ActorLogging with FSM[CircuitBreakerState, CircuitBreakerStateData] {
startWith(Closed, CircuitBreakerStateData(failureCount = 0))
def callSucceededHandling: StateFunction = {
case Event(CallSucceeded, state)
log.debug("Received call succeeded notification in state {} resetting counter", state)
goto(Closed) using CircuitBreakerStateData(failureCount = 0, firstHalfOpenMessageSent = false)
}
when(Closed) {
callSucceededHandling orElse {
case Event(TellOnly(message), _)
log.debug("CLOSED: Sending message {} without expecting any response", message)
target ! message
stay
case Event(CallFailed, state)
log.debug("Received call failed notification in state {} incrementing counter", state)
val newState = state.copy(failureCount = state.failureCount + 1)
if (newState.failureCount < maxFailures) {
stay using newState
} else {
goto(Open) using newState
}
case Event(message, state)
log.debug("CLOSED: Sending message {} expecting a response withing timeout {}", message, callTimeout)
val currentSender = sender()
forwardRequest(message, sender, state)
stay
}
}
when(Open, stateTimeout = resetTimeout.duration) {
callSucceededHandling orElse {
case Event(StateTimeout, state)
log.debug("Timeout expired for state OPEN, going to half open")
goto(HalfOpen) using state.copy(firstHalfOpenMessageSent = false)
case Event(CallFailed, state)
log.debug("OPEN: Call received a further call failed notification, probably from a previous timed out event, ignoring")
stay
case Event(openNotification @ CircuitOpenFailure(_), _)
log.error("Why did I send message {} to myself?", openNotification)
stay
case Event(message, state)
val failureNotification = failureMap(CircuitOpenFailure(message))
log.debug("OPEN: Failing request for message {}, sending failure notification {} to sender {}", message, failureNotification, sender)
sender ! failureNotification
stay
}
}
when(HalfOpen) {
callSucceededHandling orElse {
case Event(TellOnly(message), _)
log.debug("HALF-OPEN: Dropping TellOnly request for message {}", message)
stay
case Event(CallFailed, CircuitBreakerStateData(_, true))
log.debug("HALF-OPEN: First forwarded call failed returning to OPEN state")
goto(Open)
case Event(CallFailed, CircuitBreakerStateData(_, false))
log.debug("HALF-OPEN: Call received a further call failed notification, probably from a previous timed out event, ignoring")
stay
case Event(message, state @ CircuitBreakerStateData(_, false))
log.debug("HALF-OPEN: First message {} received, forwarding it to target {}", message, target)
forwardRequest(message, sender, state)
stay using state.copy(firstHalfOpenMessageSent = true)
case Event(message, CircuitBreakerStateData(_, true))
val failureNotification = failureMap(CircuitOpenFailure(message))
log.debug("HALF-OPEN: Failing request for message {}, sending failure notification {} to sender {}", message, failureNotification, sender)
sender ! failureNotification
stay
}
}
def forwardRequest(message: Any, currentSender: ActorRef, state: CircuitBreakerStateData) = {
import context.dispatcher
target.ask(message)(callTimeout).onComplete {
case Success(response)
log.debug("Request '{}' has been replied to with response {}, forwarding to original sender {}", message, currentSender)
currentSender ! response
val isFailure = failureDetector(response)
if (isFailure) {
log.debug("Response '{}' is considered as failure sending self-message to ask incrementing failure count (origin state was {})",
response, state)
self ! CallFailed
} else {
log.debug("Request '{}' succeeded with response {}, returning response to sender {} and sending message to ask to reset failure count (origin state was {})",
message, response, currentSender, state)
self ! CallSucceeded
}
case Failure(reason)
log.debug("Request '{}' to target {} failed with exception {}, sending self-message to ask incrementing failure count (origin state was {})",
message, target, reason, state)
self ! CallFailed
}
}
onTransition {
case from -> Closed
log.debug("Moving from state {} to state CLOSED", from)
circuitEventListener foreach { _ ! CircuitClosed(self) }
case from -> HalfOpen
log.debug("Moving from state {} to state HALF OPEN", from)
circuitEventListener foreach { _ ! CircuitHalfOpen(self) }
case from -> Open
log.debug("Moving from state {} to state OPEN", from)
circuitEventListener foreach { _ ! CircuitOpen(self) }
}
}

View file

@ -0,0 +1,302 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.circuitbreaker
import akka.actor.ActorRef
import akka.contrib.circuitbreaker.CircuitBreakerActor._
import akka.testkit.{ AkkaSpec, TestProbe }
import org.scalatest.GivenWhenThen
import scala.concurrent.duration._
import scala.language.postfixOps
class CircuitBreakerActorSpec extends AkkaSpec() with GivenWhenThen {
val baseCircuitBreakerBuilder =
CircuitBreakerActorBuilder(
maxFailures = 2,
callTimeout = 200 millis,
resetTimeout = 1 second,
failureDetector = {
_ == "FAILURE"
})
trait CircuitBreakerScenario {
val sender = TestProbe()
val eventListener = TestProbe()
val receiver = TestProbe()
def circuitBreaker: ActorRef
def defaultCircuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref))
def receiverRespondsWithFailureToRequest(request: Any) = {
sender.send(circuitBreaker, request)
receiver.expectMsg(request)
receiver.reply("FAILURE")
sender.expectMsg("FAILURE")
}
def receiverRespondsToRequestWith(request: Any, reply: Any) = {
sender.send(circuitBreaker, request)
receiver.expectMsg(request)
receiver.reply(reply)
sender.expectMsg(reply)
}
def waitForCircuitBreakerToReceiveSelfNotificationMessage = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis / 4)
def waitForResetTimeoutToExpire = Thread.sleep(baseCircuitBreakerBuilder.resetTimeout.duration.toMillis + 100)
def messageIsRejectedWithOpenCircuitNotification(message: Any) = {
sender.send(circuitBreaker, message)
sender.expectMsg(CircuitOpenFailure(message))
}
}
"CircuitBreakerActor" should {
"act as a transparent proxy in case of successful requests-replies - forward to target" in {
val sender = TestProbe()
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref))
sender.send(circuitBreaker, "test message")
receiver.expectMsg("test message")
}
"act as a transparent proxy in case of successful requests-replies - full cycle" in {
val sender = TestProbe()
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref))
sender.send(circuitBreaker, "test message")
receiver.expectMsg("test message")
receiver.reply("response")
sender.expectMsg("response")
}
"forward further messages before receiving the response of the first one" in {
val sender = TestProbe()
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref))
sender.send(circuitBreaker, "test message1")
sender.send(circuitBreaker, "test message2")
sender.send(circuitBreaker, "test message3")
receiver.expectMsg("test message1")
receiver.expectMsg("test message2")
receiver.expectMsg("test message3")
}
"send responses to the right sender" in {
val sender1 = TestProbe()
val sender2 = TestProbe()
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref))
sender1.send(circuitBreaker, "test message1")
sender2.send(circuitBreaker, "test message2")
receiver.expectMsg("test message1")
receiver.reply("response1")
receiver.expectMsg("test message2")
receiver.reply("response2")
sender1.expectMsg("response1")
sender2.expectMsg("response2")
}
"return failed responses too" in {
val sender = TestProbe()
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref))
sender.send(circuitBreaker, "request")
receiver.expectMsg("request")
receiver.reply("FAILURE")
sender.expectMsg("FAILURE")
}
"enter open state after reaching the threshold of failed responses" in new CircuitBreakerScenario {
val circuitBreaker = defaultCircuitBreaker
(1 to baseCircuitBreakerBuilder.maxFailures) foreach { index
receiverRespondsWithFailureToRequest(s"request$index")
}
waitForCircuitBreakerToReceiveSelfNotificationMessage
sender.send(circuitBreaker, "request in open state")
receiver.expectNoMsg
}
"respond with a CircuitOpenFailure message when in open state " in new CircuitBreakerScenario {
val circuitBreaker = defaultCircuitBreaker
(1 to baseCircuitBreakerBuilder.maxFailures) foreach { index
receiverRespondsWithFailureToRequest(s"request$index")
}
waitForCircuitBreakerToReceiveSelfNotificationMessage
sender.send(circuitBreaker, "request in open state")
sender.expectMsg(CircuitOpenFailure("request in open state"))
}
"respond with the converted CircuitOpenFailure if a converter is provided" in new CircuitBreakerScenario {
val circuitBreaker = system.actorOf(
baseCircuitBreakerBuilder
.copy(openCircuitFailureConverter = { failureMsg s"NOT SENT: ${failureMsg.failedMsg}" })
.propsForTarget(receiver.ref))
(1 to baseCircuitBreakerBuilder.maxFailures) foreach { index
receiverRespondsWithFailureToRequest(s"request$index")
}
waitForCircuitBreakerToReceiveSelfNotificationMessage
sender.send(circuitBreaker, "request in open state")
sender.expectMsg("NOT SENT: request in open state")
}
"enter open state after reaching the threshold of timed-out responses" in {
val sender = TestProbe()
val receiver = TestProbe()
val circuitBreaker = system.actorOf(baseCircuitBreakerBuilder.propsForTarget(receiver.ref))
sender.send(circuitBreaker, "request1")
sender.send(circuitBreaker, "request2")
Thread.sleep(baseCircuitBreakerBuilder.callTimeout.duration.toMillis + 100)
receiver.expectMsg("request1")
receiver.reply("this should be timed out 1")
receiver.expectMsg("request2")
receiver.reply("this should be timed out 2")
// Have to wait a bit to let the circuit breaker receive the self notification message
Thread.sleep(300)
sender.send(circuitBreaker, "request in open state")
receiver.expectNoMsg
}
"enter HALF OPEN state after the given state timeout, sending the first message only" in new CircuitBreakerScenario {
Given("A circuit breaker actor pointing to a test probe")
val circuitBreaker = defaultCircuitBreaker
When("ENTERING OPEN STATE")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
waitForCircuitBreakerToReceiveSelfNotificationMessage
Then("Messages are ignored")
messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE1")
messageIsRejectedWithOpenCircuitNotification("IGNORED SINCE IN OPEN STATE2")
When("ENTERING HALF OPEN STATE")
waitForResetTimeoutToExpire
Then("First message should be forwarded, following ones ignored if the failure persist")
sender.send(circuitBreaker, "First message in half-open state, should be forwarded")
sender.send(circuitBreaker, "Second message in half-open state, should be ignored")
receiver.expectMsg("First message in half-open state, should be forwarded")
receiver.expectNoMsg()
sender.expectMsg(CircuitOpenFailure("Second message in half-open state, should be ignored"))
}
"return to CLOSED state from HALF-OPEN if a successful message response notification is received" in new CircuitBreakerScenario {
Given("A circuit breaker actor pointing to a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Entering HALF OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
waitForResetTimeoutToExpire
And("Receiving a successful response")
receiverRespondsToRequestWith("First message in half-open state, should be forwarded", "This should close the circuit")
waitForCircuitBreakerToReceiveSelfNotificationMessage
Then("circuit is re-closed")
sender.send(circuitBreaker, "request1")
receiver.expectMsg("request1")
sender.send(circuitBreaker, "request2")
receiver.expectMsg("request2")
}
"return to OPEN state from HALF-OPEN if a FAILURE message response is received" in new CircuitBreakerScenario {
Given("A circuit breaker actor pointing to a test probe")
val circuitBreaker = defaultCircuitBreaker
When("Entering HALF OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
waitForResetTimeoutToExpire
And("Receiving a failure response")
receiverRespondsWithFailureToRequest("First message in half-open state, should be forwarded")
waitForCircuitBreakerToReceiveSelfNotificationMessage
Then("circuit is opened again")
sender.send(circuitBreaker, "this should be ignored")
receiver.expectNoMsg()
sender.expectMsg(CircuitOpenFailure("this should be ignored"))
}
"Notify an event status change listener when changing state" in new CircuitBreakerScenario {
Given("A circuit breaker actor pointing to a test probe")
override val circuitBreaker = system.actorOf(
baseCircuitBreakerBuilder
.copy(circuitEventListener = Some(eventListener.ref))
.propsForTarget(receiver.ref))
When("Entering OPEN state")
receiverRespondsWithFailureToRequest("request1")
receiverRespondsWithFailureToRequest("request2")
waitForCircuitBreakerToReceiveSelfNotificationMessage
Then("An event is sent")
eventListener.expectMsg(CircuitOpen(circuitBreaker))
When("Entering HALF OPEN state")
waitForResetTimeoutToExpire
Then("An event is sent")
eventListener.expectMsg(CircuitHalfOpen(circuitBreaker))
When("Entering CLOSED state")
receiverRespondsToRequestWith("First message in half-open state, should be forwarded", "This should close the circuit")
Then("An event is sent")
eventListener.expectMsg(CircuitClosed(circuitBreaker))
}
}
}