Merge pull request #257 from jboner/wip-1574-fault-tolerance-sample-patriknw
DOC: Added new fault handling sample. See #1574
commit
11b489eefd
2 changed files with 312 additions and 2 deletions
@@ -0,0 +1,274 @@
/**
 * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.docs.actor

//#all
import akka.actor._
import akka.actor.SupervisorStrategy._
import akka.util.duration._
import akka.util.Duration
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ ask, pipe }
import com.typesafe.config.ConfigFactory

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {
  import Worker._
  import CounterService._

  val config = ConfigFactory.parseString("""
    akka.loglevel = DEBUG
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker], name = "worker")

  // Create an Actor that starts the work and listens to progress
  system.actorOf(Props(new Actor with ActorLogging {
    // If we don't get any progress within 15 seconds then the service is unavailable
    context.setReceiveTimeout(15 seconds)
    worker ! Start

    def receive = {
      case CurrentCount(key, count) ⇒
        log.info("Current count for [{}] is [{}]", key, count)
        if (count > 50) {
          log.info("That's enough, shutting down")
          system.shutdown()
        }

      case ReceiveTimeout ⇒
        // No progress within 15 seconds, ServiceUnavailable
        log.error("Shutting down due to unavailable service")
        system.shutdown()
    }
  }))

}

object Worker {
  // Messages
  case object Start
  case object Do
}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current progress. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._
  implicit def system = context.system
  implicit val askTimeout = Timeout(5 seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable ⇒ Stop
  }

  // The sender of the initial Start message will continuously be notified about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService], name = "counter")

  def receive = LoggingReceive(this) {
    case Start if progressListener.isEmpty ⇒
      progressListener = Some(sender)
      context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)

    case Do ⇒
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current count to the initial sender
      counterService ? GetCurrentCount pipeTo progressListener.get
  }
}

object CounterService {
  // Messages
  case class Increment(n: Int)
  case object GetCurrentCount
  case class CurrentCount(key: String, count: Long)
  case object Reconnect
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)
}

/**
 * Adds the value received in an `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it receives a `GetCurrentCount`
 * message. `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._
  implicit def system = context.system

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
    case _: Storage.StorageException ⇒ Restart
  }

  val key = context.self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[Any]
  val MaxBacklog = 10000

  override def preStart() {
    initStorage()
  }

  /**
   * The child storage is restarted in case of failure, but if it keeps
   * failing after 3 restarts it will be stopped. It is better to back off
   * than to keep failing. When it has been stopped we schedule a Reconnect
   * after a delay. Watch the child so that we receive a Terminated message
   * when it has been stopped.
   */
  def initStorage() {
    storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter foreach { _ ! UseStorage(storage) }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive(this) {

    case Entry(k, v) if k == key && counter == None ⇒
      // Reply from Storage with the initial value; now we can create the Counter
      val c = context.actorOf(Props(new Counter(key, v)))
      counter = Some(c)
      // Tell the counter to use the current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      backlog foreach { c ! _ }
      backlog = IndexedSeq.empty

    case msg @ Increment(n)    ⇒ forwardOrPlaceInBacklog(msg)

    case msg @ GetCurrentCount ⇒ forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage ⇒
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter foreach { _ ! UseStorage(None) }
      // Try to re-establish storage after a while
      context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)

    case Reconnect ⇒
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any) {
    // We need the initial value from storage before we can start delegating to the counter.
    // Until then we place the messages in a backlog, to be sent to the counter when
    // it is initialized.
    counter match {
      case Some(c) ⇒ c forward msg
      case None ⇒
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable("CounterService not available, lack of initial value")
        backlog = backlog :+ msg
    }
  }

}

object Counter {
  // Messages
  case class UseStorage(storage: Option[ActorRef])
}

/**
 * The in-memory count variable that sends its current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter._
  import CounterService._
  import Storage._
  implicit def system = context.system

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive(this) {
    case UseStorage(s) ⇒
      storage = s
      storeCount()

    case Increment(n) ⇒
      count += n
      storeCount()

    case GetCurrentCount ⇒
      sender ! CurrentCount(key, count)

  }

  def storeCount() {
    // Delegate the dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage foreach { _ ! Store(Entry(key, count)) }
  }

}

object Storage {
  // Messages
  case class Store(entry: Entry)
  case class Get(key: String)
  case class Entry(key: String, value: Long)
  class StorageException(msg: String) extends RuntimeException(msg)
}

/**
 * Saves key/value pairs to persistent storage when receiving a `Store` message.
 * Replies with the current value when receiving a `Get` message.
 * Will throw StorageException if the underlying data store fails.
 */
class Storage extends Actor {
  import Storage._
  implicit def system = context.system

  val db = DummyDB

  def receive = LoggingReceive(this) {
    case Store(Entry(key, count)) ⇒ db.save(key, count)
    case Get(key)                 ⇒ sender ! Entry(key, db.load(key).getOrElse(0L))
  }
}

object DummyDB {
  import Storage.StorageException
  var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    if (11 <= value && value <= 14) throw new StorageException("Simulated store failure " + value)
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}
//#all
@@ -12,9 +12,26 @@ children, and as such each actor defines fault handling supervisor strategy.
This strategy cannot be changed afterwards as it is an integral part of the
actor system’s structure.

Fault Handling in Practice
--------------------------

First, let us look at a sample that illustrates one way to handle data store errors,
which are a typical source of failure in real-world applications. What can be done
when the data store is unavailable of course depends on the actual application, but
in this sample we use a best-effort re-connect approach.

Read the following source code. The inlined comments explain the different pieces of
the fault handling and why they are added. It is also highly recommended to run this
sample, as it is easy to follow the log output to understand what is happening at runtime.

.. includecode:: code/akka/docs/actor/FaultHandlingDocSample.scala#all

Creating a Supervisor Strategy
------------------------------

The following sections explain the fault handling mechanism and alternatives
in more depth.

For the sake of demonstration let us consider the following strategy:

.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
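
That spec file is not included in this diff. As a rough, hypothetical sketch only,
assuming the same imports as the sample above (the exception types and directives
below are illustrative and not necessarily those used in
``FaultHandlingDocSpec.scala``), such a strategy can look like::

  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      ⇒ Resume
    case _: NullPointerException     ⇒ Restart
    case _: IllegalArgumentException ⇒ Stop
    case _: Exception                ⇒ Escalate
  }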

@@ -34,8 +51,27 @@ The match statement which forms the bulk of the body is of type ``Decider``,
which is a ``PartialFunction[Throwable, Action]``. This
is the piece which maps child failure types to their corresponding actions.

Practical Application
---------------------

Default Supervisor Strategy
---------------------------

``Escalate`` is used if the defined strategy doesn't cover the exception that was thrown.

When the supervisor strategy is not defined for an actor the following
exceptions are handled by default::

  OneForOneStrategy() {
    case _: ActorInitializationException ⇒ Stop
    case _: ActorKilledException         ⇒ Stop
    case _: Exception                    ⇒ Restart
    case _                               ⇒ Escalate
  }

If the exception escalates all the way up to the root guardian, the root guardian
will handle it in the same way as the default strategy defined above.
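
As a purely illustrative sketch of that rule (``EscalatingSupervisor`` and its exception
choices are hypothetical, not part of the sample, and the same imports as in the sample
above are assumed): with the strategy below, an ``ArithmeticException`` thrown by the
child is resumed, while any other exception is not covered by the decider and is
therefore escalated to this actor's own parent::

  class EscalatingSupervisor(childProps: Props) extends Actor {
    // Only ArithmeticException is handled here; anything else thrown by the
    // child is not matched by the decider and is escalated to our own parent.
    override val supervisorStrategy = OneForOneStrategy() {
      case _: ArithmeticException ⇒ Resume
    }

    // A single child does the risky work; its failures land in the strategy above
    val child = context.actorOf(childProps, name = "child")

    // Forward everything to the child
    def receive = {
      case msg ⇒ child forward msg
    }
  }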


Test Application
----------------

The following section shows the effects of the different actions in practice,
for which a test setup is needed. First off, we need a suitable supervisor:
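
The supervisor itself is not shown in this diff hunk. A minimal sketch of such a test
supervisor, assuming it simply creates whatever child ``Props`` it is given and applies
a strategy like the one discussed above (class name and exception choices here are
illustrative, with the same imports as the sample)::

  class Supervisor extends Actor {
    // A strategy like the one discussed above; any Decider would do for the test
    override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException ⇒ Resume
      case _: Exception           ⇒ Restart
    }

    def receive = {
      // Create a child from the given Props and hand its ActorRef back,
      // so that the test can send messages to the child directly
      case p: Props ⇒ sender ! context.actorOf(p)
    }
  }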