Renamed more things to SupervisorStrategy. Updated more docs. See #1711
This commit is contained in:
parent
66e0a7cf0b
commit
9e15d2062b
19 changed files with 166 additions and 148 deletions
|
|
@ -23,7 +23,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
import DeathWatchSpec._
|
||||
|
||||
lazy val supervisor = system.actorOf(Props(new Supervisor(FaultHandlingStrategy.defaultFaultHandler)), "watchers")
|
||||
lazy val supervisor = system.actorOf(Props(new Supervisor(SupervisorStrategy.defaultStrategy)), "watchers")
|
||||
|
||||
def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds)
|
||||
|
||||
|
|
@ -115,7 +115,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
"fail a monitor which does not handle Terminated()" in {
|
||||
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
|
||||
case class FF(fail: Failed)
|
||||
val strategy = new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
|
||||
val strategy = new OneForOneStrategy(SupervisorStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
|
||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||
testActor.tell(FF(Failed(cause)), child)
|
||||
super.handleFailure(context, child, cause, stats, children)
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.actor
|
|||
* For testing Supervisor behavior, normally you don't supply the strategy
|
||||
* from the outside like this.
|
||||
*/
|
||||
class Supervisor(override val supervisorStrategy: FaultHandlingStrategy) extends Actor {
|
||||
class Supervisor(override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case x: Props ⇒ sender ! context.actorOf(x)
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ object SupervisorHierarchySpec {
|
|||
* For testing Supervisor behavior, normally you don't supply the strategy
|
||||
* from the outside like this.
|
||||
*/
|
||||
class CountDownActor(countDown: CountDownLatch, override val supervisorStrategy: FaultHandlingStrategy) extends Actor {
|
||||
class CountDownActor(countDown: CountDownLatch, override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
protected def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
|
|
@ -43,7 +43,7 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy(List(), None, None)))
|
||||
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
val workerProps = Props(new CountDownActor(countDown, FaultHandlingStrategy.defaultFaultHandler))
|
||||
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
|
||||
val workerOne, workerTwo, workerThree = Await.result((manager ? workerProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
filterException[ActorKilledException] {
|
||||
|
|
@ -62,7 +62,7 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(List(classOf[Throwable]), 1, 5000)
|
||||
|
||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, FaultHandlingStrategy.defaultFaultHandler))))
|
||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))
|
||||
|
||||
protected def receive = {
|
||||
case "killCrasher" ⇒ crasher ! Kill
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
filterEvents(EventFilter[IllegalStateException]("expected")) {
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy {
|
||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume
|
||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume
|
||||
}
|
||||
def receive = {
|
||||
case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p)
|
||||
|
|
|
|||
|
|
@ -236,7 +236,7 @@ trait Actor {
|
|||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
*/
|
||||
def supervisorStrategy(): FaultHandlingStrategy = FaultHandlingStrategy.defaultFaultHandler
|
||||
def supervisorStrategy(): SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
||||
|
||||
/**
|
||||
* User overridable callback.
|
||||
|
|
|
|||
|
|
@ -336,7 +336,7 @@ class LocalActorRefProvider(
|
|||
private class Guardian extends Actor {
|
||||
|
||||
override val supervisorStrategy = {
|
||||
import akka.actor.FaultHandlingStrategy._
|
||||
import akka.actor.SupervisorStrategy._
|
||||
OneForOneStrategy {
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
|
|
@ -376,8 +376,6 @@ class LocalActorRefProvider(
|
|||
override def preRestart(cause: Throwable, msg: Option[Any]) {}
|
||||
}
|
||||
|
||||
private val guardianProps = Props(new Guardian)
|
||||
|
||||
/*
|
||||
* The problem is that ActorRefs need a reference to the ActorSystem to
|
||||
* provide their service. Hence they cannot be created while the
|
||||
|
|
@ -403,6 +401,8 @@ class LocalActorRefProvider(
|
|||
*/
|
||||
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
|
||||
|
||||
private val guardianProps = Props(new Guardian)
|
||||
|
||||
lazy val rootGuardian: InternalActorRef =
|
||||
new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) {
|
||||
object Extra {
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
|
|||
}
|
||||
}
|
||||
|
||||
object FaultHandlingStrategy {
|
||||
object SupervisorStrategy {
|
||||
sealed trait Action
|
||||
|
||||
/**
|
||||
|
|
@ -95,7 +95,7 @@ object FaultHandlingStrategy {
|
|||
*/
|
||||
def escalate = Escalate
|
||||
|
||||
final val defaultFaultHandler: FaultHandlingStrategy = {
|
||||
final val defaultStrategy: SupervisorStrategy = {
|
||||
def defaultDecider: Decider = {
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
|
|
@ -158,9 +158,9 @@ object FaultHandlingStrategy {
|
|||
}
|
||||
}
|
||||
|
||||
abstract class FaultHandlingStrategy {
|
||||
abstract class SupervisorStrategy {
|
||||
|
||||
import FaultHandlingStrategy._
|
||||
import SupervisorStrategy._
|
||||
|
||||
def decider: Decider
|
||||
|
||||
|
|
@ -200,12 +200,12 @@ abstract class FaultHandlingStrategy {
|
|||
|
||||
object AllForOneStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
|
||||
new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): AllForOneStrategy =
|
||||
new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): AllForOneStrategy =
|
||||
new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
new AllForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -214,22 +214,22 @@ object AllForOneStrategy {
|
|||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
||||
*/
|
||||
case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
||||
case class AllForOneStrategy(decider: SupervisorStrategy.Decider,
|
||||
maxNrOfRetries: Option[Int] = None,
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
|
||||
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
|
||||
|
||||
def this(decider: FaultHandlingStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(decider),
|
||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(SupervisorStrategy.makeDecider(decider),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
|
|
@ -257,12 +257,12 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
|||
|
||||
object OneForOneStrategy {
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
|
||||
new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): OneForOneStrategy =
|
||||
new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange)
|
||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): OneForOneStrategy =
|
||||
new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
new OneForOneStrategy(SupervisorStrategy.makeDecider(trapExit), maxNrOfRetries, None)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -271,22 +271,22 @@ object OneForOneStrategy {
|
|||
* maxNrOfRetries = the number of times an actor is allowed to be restarted
|
||||
* withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window
|
||||
*/
|
||||
case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider,
|
||||
case class OneForOneStrategy(decider: SupervisorStrategy.Decider,
|
||||
maxNrOfRetries: Option[Int] = None,
|
||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
|
||||
withinTimeRange: Option[Int] = None) extends SupervisorStrategy {
|
||||
|
||||
def this(decider: FaultHandlingStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(decider),
|
||||
def this(decider: SupervisorStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(SupervisorStrategy.makeDecider(decider),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||
this(FaultHandlingStrategy.makeDecider(trapExit),
|
||||
this(SupervisorStrategy.makeDecider(trapExit),
|
||||
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
|
||||
|
|
|
|||
|
|
@ -218,9 +218,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
TypedActor.currentContext set null
|
||||
}
|
||||
|
||||
override def supervisorStrategy: FaultHandlingStrategy = me match {
|
||||
case l: SupervisorStrategy ⇒ l.supervisorStrategy
|
||||
case _ ⇒ super.supervisorStrategy
|
||||
override def supervisorStrategy(): SupervisorStrategy = me match {
|
||||
case l: Supervisor ⇒ l.supervisorStrategy
|
||||
case _ ⇒ super.supervisorStrategy
|
||||
}
|
||||
|
||||
override def preStart(): Unit = me match {
|
||||
|
|
@ -283,12 +283,12 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
/**
|
||||
* Mix this into your TypedActor to be able to define supervisor strategy
|
||||
*/
|
||||
trait SupervisorStrategy {
|
||||
trait Supervisor {
|
||||
/**
|
||||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
*/
|
||||
def supervisorStrategy: FaultHandlingStrategy = FaultHandlingStrategy.defaultFaultHandler
|
||||
def supervisorStrategy(): SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ import akka.dispatch.{ MessageDispatcher, Promise }
|
|||
* }
|
||||
* }
|
||||
*
|
||||
* private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
* private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
* @Override
|
||||
* public Action apply(Throwable t) {
|
||||
* if (t instanceof ArithmeticException) {
|
||||
|
|
@ -53,7 +53,7 @@ import akka.dispatch.{ MessageDispatcher, Promise }
|
|||
* }, 10, 60000);
|
||||
*
|
||||
* @Override
|
||||
* public FaultHandlingStrategy supervisorStrategy() {
|
||||
* public SupervisorStrategy supervisorStrategy() {
|
||||
* return strategy;
|
||||
* }
|
||||
*
|
||||
|
|
@ -116,7 +116,7 @@ abstract class UntypedActor extends Actor {
|
|||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
*/
|
||||
override def supervisorStrategy(): FaultHandlingStrategy = super.supervisorStrategy()
|
||||
override def supervisorStrategy(): SupervisorStrategy = super.supervisorStrategy()
|
||||
|
||||
/**
|
||||
* User overridable callback.
|
||||
|
|
|
|||
|
|
@ -3,123 +3,121 @@
|
|||
What is an Actor?
|
||||
=================
|
||||
|
||||
The previous section about :ref:`actor-systems` explained how actors form
|
||||
hierarchies and are the smallest unit when building an application. This
|
||||
section looks at one such actor in isolation, explaining the concepts you
|
||||
encounter while implementing it. For more an in depth reference with all the
|
||||
The previous section about :ref:`actor-systems` explained how actors form
|
||||
hierarchies and are the smallest unit when building an application. This
|
||||
section looks at one such actor in isolation, explaining the concepts you
|
||||
encounter while implementing it. For more an in depth reference with all the
|
||||
details please refer to :ref:`actors-scala` and :ref:`untyped-actors-java`.
|
||||
|
||||
An actor is a container for `State`_, `Behavior`_, a `Mailbox`_, `Children`_
|
||||
and a `Fault Handling Strategy`_. All of this is encapsulated behind an `Actor
|
||||
An actor is a container for `State`_, `Behavior`_, a `Mailbox`_, `Children`_
|
||||
and a `Supervisor Strategy`_. All of this is encapsulated behind an `Actor
|
||||
Reference`_. Finally, this happens `When an Actor Terminates`_.
|
||||
|
||||
Actor Reference
|
||||
---------------
|
||||
|
||||
As detailed below, an actor object needs to be shielded from the outside in
|
||||
order to benefit from the actor model. Therefore, actors are represented to the
|
||||
outside using actor references, which are objects that can be passed around
|
||||
freely and without restriction. This split into inner and outer object enables
|
||||
transparency for all the desired operations: restarting an actor without
|
||||
needing to update references elsewhere, placing the actual actor object on
|
||||
remote hosts, sending messages to actors in completely different applications.
|
||||
But the most important aspect is that it is not possible to look inside an
|
||||
actor and get hold of its state from the outside, unless the actor unwisely
|
||||
As detailed below, an actor object needs to be shielded from the outside in
|
||||
order to benefit from the actor model. Therefore, actors are represented to the
|
||||
outside using actor references, which are objects that can be passed around
|
||||
freely and without restriction. This split into inner and outer object enables
|
||||
transparency for all the desired operations: restarting an actor without
|
||||
needing to update references elsewhere, placing the actual actor object on
|
||||
remote hosts, sending messages to actors in completely different applications.
|
||||
But the most important aspect is that it is not possible to look inside an
|
||||
actor and get hold of its state from the outside, unless the actor unwisely
|
||||
publishes this information itself.
|
||||
|
||||
State
|
||||
-----
|
||||
|
||||
Actor objects will typically contain some variables which reflect possible
|
||||
states the actor may be in. This can be an explicit state machine (e.g. using
|
||||
the :ref:`fsm` module), or it could be a counter, set of listeners, pending
|
||||
requests, etc. These data are what make an actor valuable, and they must be
|
||||
protected from corruption by other actors. The good news is that Akka actors
|
||||
conceptually each have their own light-weight thread, which is completely
|
||||
shielded from the rest of the system. This means that instead of having to
|
||||
synchronize access using locks you can just write your actor code without
|
||||
Actor objects will typically contain some variables which reflect possible
|
||||
states the actor may be in. This can be an explicit state machine (e.g. using
|
||||
the :ref:`fsm` module), or it could be a counter, set of listeners, pending
|
||||
requests, etc. These data are what make an actor valuable, and they must be
|
||||
protected from corruption by other actors. The good news is that Akka actors
|
||||
conceptually each have their own light-weight thread, which is completely
|
||||
shielded from the rest of the system. This means that instead of having to
|
||||
synchronize access using locks you can just write your actor code without
|
||||
worrying about concurrency at all.
|
||||
|
||||
Behind the scenes Akka will run sets of actors on sets of real threads, where
|
||||
typically many actors share one thread, and subsequent invocations of one actor
|
||||
may end up being processed on different threads. Akka ensures that this
|
||||
implementation detail does not affect the single-threadedness of handling the
|
||||
Behind the scenes Akka will run sets of actors on sets of real threads, where
|
||||
typically many actors share one thread, and subsequent invocations of one actor
|
||||
may end up being processed on different threads. Akka ensures that this
|
||||
implementation detail does not affect the single-threadedness of handling the
|
||||
actor’s state.
|
||||
|
||||
Because the internal state is vital to an actor’s operations, having
|
||||
inconsistent state is fatal. Thus, when the actor fails and is restarted by its
|
||||
supervisor, the state will be created from scratch, like upon first creating
|
||||
Because the internal state is vital to an actor’s operations, having
|
||||
inconsistent state is fatal. Thus, when the actor fails and is restarted by its
|
||||
supervisor, the state will be created from scratch, like upon first creating
|
||||
the actor. This is to enable the ability of self-healing of the system.
|
||||
|
||||
Behavior
|
||||
--------
|
||||
|
||||
Every time a message is processed, it is matched against the current behavior
|
||||
of the actor. Behavior means a function which defines the actions to be taken
|
||||
in reaction to the message at that point in time, say forward a request if the
|
||||
client is authorized, deny it otherwise. This behavior may change over time,
|
||||
e.g. because different clients obtain authorization over time, or because the
|
||||
actor may go into an “out-of-service” mode and later come back. These changes
|
||||
are achieved by either encoding them in state variables which are read from the
|
||||
behavior logic, or the function itself may be swapped out at runtime, see the
|
||||
``become`` and ``unbecome`` operations. However, the initial behavior defined
|
||||
during construction of the actor object is special in the sense that a restart
|
||||
Every time a message is processed, it is matched against the current behavior
|
||||
of the actor. Behavior means a function which defines the actions to be taken
|
||||
in reaction to the message at that point in time, say forward a request if the
|
||||
client is authorized, deny it otherwise. This behavior may change over time,
|
||||
e.g. because different clients obtain authorization over time, or because the
|
||||
actor may go into an “out-of-service” mode and later come back. These changes
|
||||
are achieved by either encoding them in state variables which are read from the
|
||||
behavior logic, or the function itself may be swapped out at runtime, see the
|
||||
``become`` and ``unbecome`` operations. However, the initial behavior defined
|
||||
during construction of the actor object is special in the sense that a restart
|
||||
of the actor will reset its behavior to this initial one.
|
||||
|
||||
Mailbox
|
||||
-------
|
||||
|
||||
An actor’s purpose is the processing of messages, and these messages were sent
|
||||
to the actor from other actors (or from outside the actor system). The piece
|
||||
which connects sender and receiver is the actor’s mailbox: each actor has
|
||||
exactly one mailbox to which all senders enqueue their messages. Enqueuing
|
||||
happens in the time-order of send operations, which means that messages sent
|
||||
from different actors may not have a defined order at runtime due to the
|
||||
apparent randomness of distributing actors across threads. Sending multiple
|
||||
messages to the same target from the same actor, on the other hand, will
|
||||
An actor’s purpose is the processing of messages, and these messages were sent
|
||||
to the actor from other actors (or from outside the actor system). The piece
|
||||
which connects sender and receiver is the actor’s mailbox: each actor has
|
||||
exactly one mailbox to which all senders enqueue their messages. Enqueuing
|
||||
happens in the time-order of send operations, which means that messages sent
|
||||
from different actors may not have a defined order at runtime due to the
|
||||
apparent randomness of distributing actors across threads. Sending multiple
|
||||
messages to the same target from the same actor, on the other hand, will
|
||||
enqueue them in the same order.
|
||||
|
||||
There are different mailbox implementations to choose from, the default being a
|
||||
FIFO: the order of the messages processed by the actor matches the order in
|
||||
which they were enqueued. This is usually a good default, but applications may
|
||||
need to prioritize some messages over others. In this case, a priority mailbox
|
||||
will enqueue not always at the end but at a position as given by the message
|
||||
priority, which might even be at the front. While using such a queue, the order
|
||||
of messages processed will naturally be defined by the queue’s algorithm and in
|
||||
There are different mailbox implementations to choose from, the default being a
|
||||
FIFO: the order of the messages processed by the actor matches the order in
|
||||
which they were enqueued. This is usually a good default, but applications may
|
||||
need to prioritize some messages over others. In this case, a priority mailbox
|
||||
will enqueue not always at the end but at a position as given by the message
|
||||
priority, which might even be at the front. While using such a queue, the order
|
||||
of messages processed will naturally be defined by the queue’s algorithm and in
|
||||
general not be FIFO.
|
||||
|
||||
An important feature in which Akka differs from some other actor model
|
||||
implementations is that the current behavior must always handle the next
|
||||
dequeued message, there is no scanning the mailbox for the next matching one.
|
||||
Failure to handle a message will typically be treated as a failure, unless this
|
||||
An important feature in which Akka differs from some other actor model
|
||||
implementations is that the current behavior must always handle the next
|
||||
dequeued message, there is no scanning the mailbox for the next matching one.
|
||||
Failure to handle a message will typically be treated as a failure, unless this
|
||||
behavior is overridden.
|
||||
|
||||
Children
|
||||
--------
|
||||
|
||||
Each actor is potentially a supervisor: if it creates children for delegating
|
||||
sub-tasks, it will automatically supervise them. The list of children is
|
||||
maintained within the actor’s context and the actor has access to it.
|
||||
Modifications to the list are done by creating (``context.actorOf(...)``) or
|
||||
stopping (``context.stop(child)``) children and these actions are reflected
|
||||
immediately. The actual creation and termination actions happen behind the
|
||||
Each actor is potentially a supervisor: if it creates children for delegating
|
||||
sub-tasks, it will automatically supervise them. The list of children is
|
||||
maintained within the actor’s context and the actor has access to it.
|
||||
Modifications to the list are done by creating (``context.actorOf(...)``) or
|
||||
stopping (``context.stop(child)``) children and these actions are reflected
|
||||
immediately. The actual creation and termination actions happen behind the
|
||||
scenes in an asynchronous way, so they do not “block” their supervisor.
|
||||
|
||||
Fault Handling Strategy
|
||||
-----------------------
|
||||
Supervisor Strategy
|
||||
-------------------
|
||||
|
||||
The final piece of an actor is its strategy for handling faults of its
|
||||
children. To keep it simple and robust, this is declared outside of the actor’s
|
||||
code and has no access to the actor’s state. Fault handling is then done
|
||||
transparently by Akka, applying one of the strategies described in
|
||||
:ref:`supervision` for each incoming failure. As this strategy is fundamental
|
||||
to how an actor system is structured, it cannot be changed once an actor has
|
||||
been created.
|
||||
The final piece of an actor is its strategy for handling faults of its
|
||||
children. Fault handling is then done transparently by Akka, applying one
|
||||
of the strategies described in :ref:`supervision` for each incoming failure.
|
||||
As this strategy is fundamental to how an actor system is structured, it
|
||||
cannot be changed once an actor has been created.
|
||||
|
||||
Considering that there is only one such strategy for each actor, this means
|
||||
that if different strategies apply to the various children of an actor, the
|
||||
children should be grouped beneath intermediate supervisors with matching
|
||||
strategies, preferring once more the structuring of actor systems according to
|
||||
Considering that there is only one such strategy for each actor, this means
|
||||
that if different strategies apply to the various children of an actor, the
|
||||
children should be grouped beneath intermediate supervisors with matching
|
||||
strategies, preferring once more the structuring of actor systems according to
|
||||
the splitting of tasks into sub-tasks.
|
||||
|
||||
When an Actor Terminates
|
||||
|
|
|
|||
|
|
@ -6,8 +6,8 @@ package akka.docs.actor;
|
|||
//#testkit
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.FaultHandlingStrategy;
|
||||
import static akka.actor.FaultHandlingStrategy.*;
|
||||
import akka.actor.SupervisorStrategy;
|
||||
import static akka.actor.SupervisorStrategy.*;
|
||||
import akka.actor.OneForOneStrategy;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.Terminated;
|
||||
|
|
@ -38,7 +38,7 @@ public class FaultHandlingTestBase {
|
|||
static public class Supervisor extends UntypedActor {
|
||||
|
||||
//#strategy
|
||||
private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
|
|
@ -54,7 +54,7 @@ public class FaultHandlingTestBase {
|
|||
}, 10, 60000);
|
||||
|
||||
@Override
|
||||
public FaultHandlingStrategy supervisorStrategy() {
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
|
|
@ -75,7 +75,7 @@ public class FaultHandlingTestBase {
|
|||
static public class Supervisor2 extends UntypedActor {
|
||||
|
||||
//#strategy2
|
||||
private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
|
|
@ -91,7 +91,7 @@ public class FaultHandlingTestBase {
|
|||
}, 10, 60000);
|
||||
|
||||
@Override
|
||||
public FaultHandlingStrategy supervisorStrategy() {
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
|
|
@ -147,7 +147,7 @@ public class FaultHandlingTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void mustEmployFaultHandler() {
|
||||
public void mustEmploySupervisorStrategy() {
|
||||
// code here
|
||||
//#testkit
|
||||
EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class);
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
.. _fault-tolerance-java:
|
||||
|
||||
Fault Handling Strategies (Java)
|
||||
=================================
|
||||
Fault Tolerance (Java)
|
||||
======================
|
||||
|
||||
.. sidebar:: Contents
|
||||
|
||||
|
|
@ -12,8 +12,8 @@ children, and as such each actor defines fault handling supervisor strategy.
|
|||
This strategy cannot be changed afterwards as it is an integral part of the
|
||||
actor system’s structure.
|
||||
|
||||
Creating a Fault Handling Strategy
|
||||
----------------------------------
|
||||
Creating a Supervisor Strategy
|
||||
------------------------------
|
||||
|
||||
For the sake of demonstration let us consider the following strategy:
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ it's located in ``akka.actor.TypedActor``.
|
|||
:include: typed-actor-extension-tools
|
||||
|
||||
.. warning::
|
||||
|
||||
|
||||
Same as not exposing ``this`` of an Akka Actor, it's important not to expose ``this`` of a Typed Actor,
|
||||
instead you should pass the external proxy reference, which is obtained from within your Typed Actor as
|
||||
``TypedActor.self()``, this is your external identity, as the ``ActorRef`` is the external identity of
|
||||
|
|
@ -127,7 +127,7 @@ Request-reply-with-future message send
|
|||
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
|
||||
:include: typed-actor-call-future
|
||||
|
||||
This call is asynchronous, and the Future returned can be used for asynchronous composition.
|
||||
This call is asynchronous, and the Future returned can be used for asynchronous composition.
|
||||
|
||||
Stopping Typed Actors
|
||||
---------------------
|
||||
|
|
@ -153,6 +153,13 @@ you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
|
|||
|
||||
This also works for creating child Typed Actors in regular Akka Actors.
|
||||
|
||||
Supervisor Strategy
|
||||
-------------------
|
||||
|
||||
By having your Typed Actor implementation class implement ``TypedActor.Supervisor``
|
||||
you can define the strategy to use for supervising child actors, as described in
|
||||
:ref:`supervision` and :ref:`fault-tolerance-java`.
|
||||
|
||||
Lifecycle callbacks
|
||||
-------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -129,6 +129,7 @@ In addition, it offers:
|
|||
|
||||
* :obj:`getSelf()` reference to the :class:`ActorRef` of the actor
|
||||
* :obj:`getSender()` reference sender Actor of the last received message, typically used as described in :ref:`UntypedActor.Reply`
|
||||
* :obj:`supervisorStrategy()` user overridable definition the strategy to use for supervising child actors
|
||||
* :obj:`getContext()` exposes contextual information for the actor and the current message, such as:
|
||||
|
||||
* factory methods to create child actors (:meth:`actorOf`)
|
||||
|
|
|
|||
|
|
@ -403,7 +403,7 @@ v2.0::
|
|||
|
||||
context.parent
|
||||
|
||||
*Fault handling strategy*
|
||||
*Supervisor Strategy*
|
||||
|
||||
v1.3::
|
||||
|
||||
|
|
@ -420,14 +420,18 @@ v1.3::
|
|||
|
||||
v2.0::
|
||||
|
||||
val strategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
||||
class MyActor extends Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
case _: NullPointerException ⇒ Restart
|
||||
case _: IllegalArgumentException ⇒ Stop
|
||||
case _: Exception ⇒ Escalate
|
||||
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
|
||||
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(strategy), "supervisor")
|
||||
def receive = {
|
||||
case x =>
|
||||
}
|
||||
}
|
||||
|
||||
Documentation:
|
||||
|
||||
|
|
|
|||
|
|
@ -154,6 +154,7 @@ In addition, it offers:
|
|||
|
||||
* :obj:`self` reference to the :class:`ActorRef` of the actor
|
||||
* :obj:`sender` reference sender Actor of the last received message, typically used as described in :ref:`Actor.Reply`
|
||||
* :obj:`supervisorStrategy` user overridable definition the strategy to use for supervising child actors
|
||||
* :obj:`context` exposes contextual information for the actor and the current message, such as:
|
||||
|
||||
* factory methods to create child actors (:meth:`actorOf`)
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ object FaultHandlingDocSpec {
|
|||
class Supervisor extends Actor {
|
||||
//#strategy
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.FaultHandlingStrategy._
|
||||
import akka.actor.SupervisorStrategy._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
|
|
@ -39,7 +39,7 @@ object FaultHandlingDocSpec {
|
|||
class Supervisor2 extends Actor {
|
||||
//#strategy2
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.FaultHandlingStrategy._
|
||||
import akka.actor.SupervisorStrategy._
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy({
|
||||
case _: ArithmeticException ⇒ Resume
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
.. _fault-tolerance-scala:
|
||||
|
||||
Fault Handling Strategies (Scala)
|
||||
=================================
|
||||
Fault Tolerance (Scala)
|
||||
=======================
|
||||
|
||||
.. sidebar:: Contents
|
||||
|
||||
|
|
@ -12,8 +12,8 @@ children, and as such each actor defines fault handling supervisor strategy.
|
|||
This strategy cannot be changed afterwards as it is an integral part of the
|
||||
actor system’s structure.
|
||||
|
||||
Creating a Fault Handling Strategy
|
||||
----------------------------------
|
||||
Creating a Supervisor Strategy
|
||||
------------------------------
|
||||
|
||||
For the sake of demonstration let us consider the following strategy:
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ it's located in ``akka.actor.TypedActor``.
|
|||
:include: typed-actor-extension-tools
|
||||
|
||||
.. warning::
|
||||
|
||||
|
||||
Same as not exposing ``this`` of an Akka Actor, it's important not to expose ``this`` of a Typed Actor,
|
||||
instead you should pass the external proxy reference, which is obtained from within your Typed Actor as
|
||||
``TypedActor.self``, this is your external identity, as the ``ActorRef`` is the external identity of
|
||||
|
|
@ -127,7 +127,7 @@ Request-reply-with-future message send
|
|||
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
|
||||
:include: typed-actor-call-future
|
||||
|
||||
This call is asynchronous, and the Future returned can be used for asynchronous composition.
|
||||
This call is asynchronous, and the Future returned can be used for asynchronous composition.
|
||||
|
||||
Stopping Typed Actors
|
||||
---------------------
|
||||
|
|
@ -153,6 +153,13 @@ you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
|
|||
|
||||
This also works for creating child Typed Actors in regular Akka Actors.
|
||||
|
||||
Supervisor Strategy
|
||||
-------------------
|
||||
|
||||
By having your Typed Actor implementation class implement ``TypedActor.Supervisor``
|
||||
you can define the strategy to use for supervising child actors, as described in
|
||||
:ref:`supervision` and :ref:`fault-tolerance-scala`.
|
||||
|
||||
Lifecycle callbacks
|
||||
-------------------
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue