The idea is to filter the sources, replacing @<var>@ occurrences with the mapping for <var> (which is currently hard-coded). @@ -> @. In order to make this work, I had to move the doc sources one directory down (into akka-docs/rst) so that the filtered result could be in a sibling directory so that relative links (to _sphinx plugins or real code) would continue to work. While I was at it I also changed it so that WARNINGs and ERRORs are not swallowed into the debug dump anymore but printed at [warn] level (minimum). One piece of fallout is that the (online) html build is now run after the normal one, not in parallel.
294 lines
8.6 KiB
Scala
294 lines
8.6 KiB
Scala
/**
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
|
*/
|
|
package docs.actor
|
|
|
|
import language.postfixOps
|
|
|
|
//#all
|
|
//#imports
|
|
import akka.actor._
|
|
import akka.actor.SupervisorStrategy._
|
|
import scala.concurrent.util.duration._
|
|
import scala.concurrent.util.Duration
|
|
import akka.util.Timeout
|
|
import akka.event.LoggingReceive
|
|
import akka.pattern.{ ask, pipe }
|
|
import com.typesafe.config.ConfigFactory
|
|
//#imports
|
|
|
|
/**
|
|
* Runs the sample
|
|
*/
|
|
object FaultHandlingDocSample extends App {
|
|
import Worker._
|
|
|
|
val config = ConfigFactory.parseString("""
|
|
akka.loglevel = DEBUG
|
|
akka.actor.debug {
|
|
receive = on
|
|
lifecycle = on
|
|
}
|
|
""")
|
|
|
|
val system = ActorSystem("FaultToleranceSample", config)
|
|
val worker = system.actorOf(Props[Worker], name = "worker")
|
|
val listener = system.actorOf(Props[Listener], name = "listener")
|
|
// start the work and listen on progress
|
|
// note that the listener is used as sender of the tell,
|
|
// i.e. it will receive replies from the worker
|
|
worker.tell(Start, sender = listener)
|
|
}
|
|
|
|
/**
|
|
* Listens on progress from the worker and shuts down the system when enough
|
|
* work has been done.
|
|
*/
|
|
class Listener extends Actor with ActorLogging {
|
|
import Worker._
|
|
// If we don't get any progress within 15 seconds then the service is unavailable
|
|
context.setReceiveTimeout(15 seconds)
|
|
|
|
def receive = {
|
|
case Progress(percent) ⇒
|
|
log.info("Current progress: {} %", percent)
|
|
if (percent >= 100.0) {
|
|
log.info("That's all, shutting down")
|
|
context.system.shutdown()
|
|
}
|
|
|
|
case ReceiveTimeout ⇒
|
|
// No progress within 15 seconds, ServiceUnavailable
|
|
log.error("Shutting down due to unavailable service")
|
|
context.system.shutdown()
|
|
}
|
|
}
|
|
|
|
//#messages
|
|
object Worker {
|
|
case object Start
|
|
case object Do
|
|
case class Progress(percent: Double)
|
|
}
|
|
//#messages
|
|
|
|
/**
|
|
* Worker performs some work when it receives the `Start` message.
|
|
* It will continuously notify the sender of the `Start` message
|
|
* of current ``Progress``. The `Worker` supervise the `CounterService`.
|
|
*/
|
|
class Worker extends Actor with ActorLogging {
|
|
import Worker._
|
|
import CounterService._
|
|
implicit val askTimeout = Timeout(5 seconds)
|
|
|
|
// Stop the CounterService child if it throws ServiceUnavailable
|
|
override val supervisorStrategy = OneForOneStrategy() {
|
|
case _: CounterService.ServiceUnavailable ⇒ Stop
|
|
}
|
|
|
|
// The sender of the initial Start message will continuously be notified about progress
|
|
var progressListener: Option[ActorRef] = None
|
|
val counterService = context.actorOf(Props[CounterService], name = "counter")
|
|
val totalCount = 51
|
|
import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
|
|
|
|
def receive = LoggingReceive {
|
|
case Start if progressListener.isEmpty ⇒
|
|
progressListener = Some(sender)
|
|
context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)
|
|
|
|
case Do ⇒
|
|
counterService ! Increment(1)
|
|
counterService ! Increment(1)
|
|
counterService ! Increment(1)
|
|
|
|
// Send current progress to the initial sender
|
|
counterService ? GetCurrentCount map {
|
|
case CurrentCount(_, count) ⇒ Progress(100.0 * count / totalCount)
|
|
} pipeTo progressListener.get
|
|
}
|
|
}
|
|
|
|
//#messages
|
|
object CounterService {
|
|
case class Increment(n: Int)
|
|
case object GetCurrentCount
|
|
case class CurrentCount(key: String, count: Long)
|
|
class ServiceUnavailable(msg: String) extends RuntimeException(msg)
|
|
|
|
private case object Reconnect
|
|
}
|
|
//#messages
|
|
|
|
/**
|
|
* Adds the value received in `Increment` message to a persistent
|
|
* counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
|
|
* `CounterService` supervise `Storage` and `Counter`.
|
|
*/
|
|
class CounterService extends Actor {
|
|
import CounterService._
|
|
import Counter._
|
|
import Storage._
|
|
|
|
// Restart the storage child when StorageException is thrown.
|
|
// After 3 restarts within 5 seconds it will be stopped.
|
|
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
|
|
case _: Storage.StorageException ⇒ Restart
|
|
}
|
|
|
|
val key = self.path.name
|
|
var storage: Option[ActorRef] = None
|
|
var counter: Option[ActorRef] = None
|
|
var backlog = IndexedSeq.empty[(ActorRef, Any)]
|
|
val MaxBacklog = 10000
|
|
|
|
import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
|
|
|
|
override def preStart() {
|
|
initStorage()
|
|
}
|
|
|
|
/**
|
|
* The child storage is restarted in case of failure, but after 3 restarts,
|
|
* and still failing it will be stopped. Better to back-off than continuously
|
|
* failing. When it has been stopped we will schedule a Reconnect after a delay.
|
|
* Watch the child so we receive Terminated message when it has been terminated.
|
|
*/
|
|
def initStorage() {
|
|
storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
|
|
// Tell the counter, if any, to use the new storage
|
|
counter foreach { _ ! UseStorage(storage) }
|
|
// We need the initial value to be able to operate
|
|
storage.get ! Get(key)
|
|
}
|
|
|
|
def receive = LoggingReceive {
|
|
|
|
case Entry(k, v) if k == key && counter == None ⇒
|
|
// Reply from Storage of the initial value, now we can create the Counter
|
|
val c = context.actorOf(Props(new Counter(key, v)))
|
|
counter = Some(c)
|
|
// Tell the counter to use current storage
|
|
c ! UseStorage(storage)
|
|
// and send the buffered backlog to the counter
|
|
for ((replyTo, msg) ← backlog) c.tell(msg, sender = replyTo)
|
|
backlog = IndexedSeq.empty
|
|
|
|
case msg @ Increment(n) ⇒ forwardOrPlaceInBacklog(msg)
|
|
|
|
case msg @ GetCurrentCount ⇒ forwardOrPlaceInBacklog(msg)
|
|
|
|
case Terminated(actorRef) if Some(actorRef) == storage ⇒
|
|
// After 3 restarts the storage child is stopped.
|
|
// We receive Terminated because we watch the child, see initStorage.
|
|
storage = None
|
|
// Tell the counter that there is no storage for the moment
|
|
counter foreach { _ ! UseStorage(None) }
|
|
// Try to re-establish storage after while
|
|
context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)
|
|
|
|
case Reconnect ⇒
|
|
// Re-establish storage after the scheduled delay
|
|
initStorage()
|
|
}
|
|
|
|
def forwardOrPlaceInBacklog(msg: Any) {
|
|
// We need the initial value from storage before we can start delegate to the counter.
|
|
// Before that we place the messages in a backlog, to be sent to the counter when
|
|
// it is initialized.
|
|
counter match {
|
|
case Some(c) ⇒ c forward msg
|
|
case None ⇒
|
|
if (backlog.size >= MaxBacklog)
|
|
throw new ServiceUnavailable("CounterService not available, lack of initial value")
|
|
backlog = backlog :+ (sender, msg)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
//#messages
|
|
object Counter {
|
|
case class UseStorage(storage: Option[ActorRef])
|
|
}
|
|
//#messages
|
|
|
|
/**
|
|
* The in memory count variable that will send current
|
|
* value to the `Storage`, if there is any storage
|
|
* available at the moment.
|
|
*/
|
|
class Counter(key: String, initialValue: Long) extends Actor {
|
|
import Counter._
|
|
import CounterService._
|
|
import Storage._
|
|
|
|
var count = initialValue
|
|
var storage: Option[ActorRef] = None
|
|
|
|
def receive = LoggingReceive {
|
|
case UseStorage(s) ⇒
|
|
storage = s
|
|
storeCount()
|
|
|
|
case Increment(n) ⇒
|
|
count += n
|
|
storeCount()
|
|
|
|
case GetCurrentCount ⇒
|
|
sender ! CurrentCount(key, count)
|
|
|
|
}
|
|
|
|
def storeCount() {
|
|
// Delegate dangerous work, to protect our valuable state.
|
|
// We can continue without storage.
|
|
storage foreach { _ ! Store(Entry(key, count)) }
|
|
}
|
|
|
|
}
|
|
|
|
//#messages
|
|
object Storage {
|
|
case class Store(entry: Entry)
|
|
case class Get(key: String)
|
|
case class Entry(key: String, value: Long)
|
|
class StorageException(msg: String) extends RuntimeException(msg)
|
|
}
|
|
//#messages
|
|
|
|
/**
|
|
* Saves key/value pairs to persistent storage when receiving `Store` message.
|
|
* Replies with current value when receiving `Get` message.
|
|
* Will throw StorageException if the underlying data store is out of order.
|
|
*/
|
|
class Storage extends Actor {
|
|
import Storage._
|
|
|
|
val db = DummyDB
|
|
|
|
def receive = LoggingReceive {
|
|
case Store(Entry(key, count)) ⇒ db.save(key, count)
|
|
case Get(key) ⇒ sender ! Entry(key, db.load(key).getOrElse(0L))
|
|
}
|
|
}
|
|
|
|
//#dummydb
|
|
object DummyDB {
|
|
import Storage.StorageException
|
|
private var db = Map[String, Long]()
|
|
|
|
@throws(classOf[StorageException])
|
|
def save(key: String, value: Long): Unit = synchronized {
|
|
if (11 <= value && value <= 14) throw new StorageException("Simulated store failure " + value)
|
|
db += (key -> value)
|
|
}
|
|
|
|
@throws(classOf[StorageException])
|
|
def load(key: String): Option[Long] = synchronized {
|
|
db.get(key)
|
|
}
|
|
}
|
|
//#dummydb
|
|
//#all
|