Merge pull request #283 from jboner/wip-1716-directives-patriknw

Replaced Action with Directive in SupervisorStrategy. See #1716
This commit is contained in:
patriknw 2012-02-01 02:28:42 -08:00
commit ed7ff3b02f
6 changed files with 52 additions and 46 deletions

View file

@ -47,36 +47,36 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type
/** /**
* Implicit conversion from `Seq` of Cause-Action pairs to a `Decider`. See makeDecider(causeAction). * Implicit conversion from `Seq` of Cause-Directive pairs to a `Decider`. See makeDecider(causeDirective).
*/ */
implicit def seqCauseAction2Decider(trapExit: Iterable[CauseAction]): Decider = makeDecider(trapExit) implicit def seqCauseDirective2Decider(trapExit: Iterable[CauseDirective]): Decider = makeDecider(trapExit)
// the above would clash with seqThrowable2Decider for empty lists // the above would clash with seqThrowable2Decider for empty lists
} }
object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
sealed trait Action sealed trait Directive
/** /**
* Resumes message processing for the failed Actor * Resumes message processing for the failed Actor
*/ */
case object Resume extends Action case object Resume extends Directive
/** /**
* Discards the old Actor instance and replaces it with a new, * Discards the old Actor instance and replaces it with a new,
* then resumes message processing. * then resumes message processing.
*/ */
case object Restart extends Action case object Restart extends Directive
/** /**
* Stops the Actor * Stops the Actor
*/ */
case object Stop extends Action case object Stop extends Directive
/** /**
* Escalates the failure to the supervisor of the supervisor, * Escalates the failure to the supervisor of the supervisor,
* by rethrowing the cause of the failure. * by rethrowing the cause of the failure.
*/ */
case object Escalate extends Action case object Escalate extends Directive
/** /**
* Resumes message processing for the failed Actor * Resumes message processing for the failed Actor
@ -127,9 +127,9 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
*/ */
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit) implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
type Decider = PartialFunction[Throwable, Action] type Decider = PartialFunction[Throwable, Directive]
type JDecider = akka.japi.Function[Throwable, Action] type JDecider = akka.japi.Function[Throwable, Directive]
type CauseAction = (Class[_ <: Throwable], Action) type CauseDirective = (Class[_ <: Throwable], Directive)
/** /**
* Decider builder which just checks whether one of * Decider builder which just checks whether one of
@ -152,14 +152,14 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq) def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
/** /**
* Decider builder for Iterables of cause-action pairs, e.g. a map obtained * Decider builder for Iterables of cause-directive pairs, e.g. a map obtained
* from configuration; will sort the pairs so that the most specific type is * from configuration; will sort the pairs so that the most specific type is
* checked before all its subtypes, allowing carving out subtrees of the * checked before all its subtypes, allowing carving out subtrees of the
* Throwable hierarchy. * Throwable hierarchy.
*/ */
def makeDecider(flat: Iterable[CauseAction]): Decider = { def makeDecider(flat: Iterable[CauseDirective]): Decider = {
val actions = sort(flat) val directives = sort(flat)
return { case x actions find (_._1 isInstance x) map (_._2) getOrElse Escalate } return { case x directives find (_._1 isInstance x) map (_._2) getOrElse Escalate }
} }
def makeDecider(func: JDecider): Decider = { def makeDecider(func: JDecider): Decider = {
@ -170,8 +170,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
* Sort so that subtypes always precede their supertypes, but without * Sort so that subtypes always precede their supertypes, but without
* obeying any order between unrelated subtypes (insert sort). * obeying any order between unrelated subtypes (insert sort).
*/ */
def sort(in: Iterable[CauseAction]): Seq[CauseAction] = def sort(in: Iterable[CauseDirective]): Seq[CauseDirective] =
(new ArrayBuffer[CauseAction](in.size) /: in) { (buf, ca) (new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca)
buf.indexWhere(_._1 isAssignableFrom ca._1) match { buf.indexWhere(_._1 isAssignableFrom ca._1) match {
case -1 buf append ca case -1 buf append ca
case x buf insert (x, ca) case x buf insert (x, ca)
@ -215,8 +215,8 @@ abstract class SupervisorStrategy {
* Returns whether it processed the failure or not * Returns whether it processed the failure or not
*/ */
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
action match { directive match {
case Resume child.asInstanceOf[InternalActorRef].resume(); true case Resume child.asInstanceOf[InternalActorRef].resume(); true
case Restart processFailure(context, true, child, cause, stats, children); true case Restart processFailure(context, true, child, cause, stats, children); true
case Stop processFailure(context, false, child, cause, stats, children); true case Stop processFailure(context, false, child, cause, stats, children); true
@ -227,10 +227,13 @@ abstract class SupervisorStrategy {
} }
/** /**
* Restart all child actors when one fails * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
* to all children when one fails, as opposed to [[akka.actor.OneForOneStrategy]] that applies
* it only to the child actor that failed.
*
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit * @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
*/ */
case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
@ -270,10 +273,13 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
} }
/** /**
* Restart a child actor when it fails * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
* to the child actor that failed, as opposed to [[akka.actor.AllForOneStrategy]] that applies
* it to all children.
*
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit * @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
*/ */
case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)

View file

@ -37,9 +37,9 @@ import akka.japi.{ Creator }
* } * }
* *
* private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), * private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
* new Function<Throwable, Action>() { * new Function<Throwable, Directive>() {
* @Override * @Override
* public Action apply(Throwable t) { * public Directive apply(Throwable t) {
* if (t instanceof ArithmeticException) { * if (t instanceof ArithmeticException) {
* return resume(); * return resume();
* } else if (t instanceof NullPointerException) { * } else if (t instanceof NullPointerException) {

View file

@ -40,9 +40,9 @@ public class FaultHandlingTestBase {
//#strategy //#strategy
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Action>() { new Function<Throwable, Directive>() {
@Override @Override
public Action apply(Throwable t) { public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) { if (t instanceof ArithmeticException) {
return resume(); return resume();
} else if (t instanceof NullPointerException) { } else if (t instanceof NullPointerException) {
@ -78,9 +78,9 @@ public class FaultHandlingTestBase {
//#strategy2 //#strategy2
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
new Function<Throwable, Action>() { new Function<Throwable, Directive>() {
@Override @Override
public Action apply(Throwable t) { public Directive apply(Throwable t) {
if (t instanceof ArithmeticException) { if (t instanceof ArithmeticException) {
return resume(); return resume();
} else if (t instanceof NullPointerException) { } else if (t instanceof NullPointerException) {

View file

@ -118,9 +118,9 @@ public class FaultHandlingDocSample {
// Stop the CounterService child if it throws ServiceUnavailable // Stop the CounterService child if it throws ServiceUnavailable
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(), private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
new Function<Throwable, Action>() { new Function<Throwable, Directive>() {
@Override @Override
public Action apply(Throwable t) { public Directive apply(Throwable t) {
if (t instanceof ServiceUnavailable) { if (t instanceof ServiceUnavailable) {
return stop(); return stop();
} else { } else {
@ -229,9 +229,9 @@ public class FaultHandlingDocSample {
// Restart the storage child when StorageException is thrown. // Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped. // After 3 restarts within 5 seconds it will be stopped.
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"), private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
new Function<Throwable, Action>() { new Function<Throwable, Directive>() {
@Override @Override
public Action apply(Throwable t) { public Directive apply(Throwable t) {
if (t instanceof StorageException) { if (t instanceof StorageException) {
return restart(); return restart();
} else { } else {

View file

@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy:
:include: strategy :include: strategy
I have chosen a few well-known exception types in order to demonstrate the I have chosen a few well-known exception types in order to demonstrate the
application of the fault handling actions described in :ref:`supervision`. application of the fault handling directives described in :ref:`supervision`.
First off, it is a one-for-one strategy, meaning that each child is treated First off, it is a one-for-one strategy, meaning that each child is treated
separately (an all-for-one strategy works very similarly, the only difference separately (an all-for-one strategy works very similarly, the only difference
is that any decision is applied to all children of the supervisor, not only the is that any decision is applied to all children of the supervisor, not only the
@ -71,7 +71,7 @@ in the same way as the default strategy defined above.
Test Application Test Application
---------------- ----------------
The following section shows the effects of the different actions in practice, The following section shows the effects of the different directives in practice,
wherefor a test setup is needed. First off, we need a suitable supervisor: wherefor a test setup is needed. First off, we need a suitable supervisor:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
@ -93,13 +93,13 @@ Let us create actors:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: create :include: create
The first test shall demonstrate the ``Resume`` action, so we try it out by The first test shall demonstrate the ``Resume`` directive, so we try it out by
setting some non-initial state in the actor and have it fail: setting some non-initial state in the actor and have it fail:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: resume :include: resume
As you can see the value 42 survives the fault handling action. Now, if we As you can see the value 42 survives the fault handling directive. Now, if we
change the failure to a more serious ``NullPointerException``, that will no change the failure to a more serious ``NullPointerException``, that will no
longer be the case: longer be the case:
@ -113,7 +113,7 @@ terminated by the supervisor:
:include: stop :include: stop
Up to now the supervisor was completely unaffected by the childs failure, Up to now the supervisor was completely unaffected by the childs failure,
because the actions set did handle it. In case of an ``Exception``, this is not because the directives set did handle it. In case of an ``Exception``, this is not
true anymore and the supervisor escalates the failure. true anymore and the supervisor escalates the failure.
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
@ -123,7 +123,7 @@ The supervisor itself is supervised by the top-level actor provided by the
:class:`ActorSystem`, which has the default policy to restart in case of all :class:`ActorSystem`, which has the default policy to restart in case of all
``Exception`` cases (with the notable exceptions of ``Exception`` cases (with the notable exceptions of
``ActorInitializationException`` and ``ActorKilledException``). Since the ``ActorInitializationException`` and ``ActorKilledException``). Since the
default action in case of a restart is to kill all children, we expected our poor default directive in case of a restart is to kill all children, we expected our poor
child not to survive this failure. child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a In case this is not desired (which depends on the use case), we need to use a

View file

@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy:
:include: strategy :include: strategy
I have chosen a few well-known exception types in order to demonstrate the I have chosen a few well-known exception types in order to demonstrate the
application of the fault handling actions described in :ref:`supervision`. application of the fault handling directives described in :ref:`supervision`.
First off, it is a one-for-one strategy, meaning that each child is treated First off, it is a one-for-one strategy, meaning that each child is treated
separately (an all-for-one strategy works very similarly, the only difference separately (an all-for-one strategy works very similarly, the only difference
is that any decision is applied to all children of the supervisor, not only the is that any decision is applied to all children of the supervisor, not only the
@ -53,8 +53,8 @@ that the respective limit does not apply, leaving the possibility to specify an
absolute upper limit on the restarts or to make the restarts work infinitely. absolute upper limit on the restarts or to make the restarts work infinitely.
The match statement which forms the bulk of the body is of type ``Decider``, The match statement which forms the bulk of the body is of type ``Decider``,
which is a ``PartialFunction[Throwable, Action]``. This which is a ``PartialFunction[Throwable, Directive]``. This
is the piece which maps child failure types to their corresponding actions. is the piece which maps child failure types to their corresponding directives.
Default Supervisor Strategy Default Supervisor Strategy
--------------------------- ---------------------------
@ -76,7 +76,7 @@ in the same way as the default strategy defined above.
Test Application Test Application
---------------- ----------------
The following section shows the effects of the different actions in practice, The following section shows the effects of the different directives in practice,
wherefor a test setup is needed. First off, we need a suitable supervisor: wherefor a test setup is needed. First off, we need a suitable supervisor:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
@ -99,13 +99,13 @@ Let us create actors:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: create :include: create
The first test shall demonstrate the ``Resume`` action, so we try it out by The first test shall demonstrate the ``Resume`` directive, so we try it out by
setting some non-initial state in the actor and have it fail: setting some non-initial state in the actor and have it fail:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: resume :include: resume
As you can see the value 42 survives the fault handling action. Now, if we As you can see the value 42 survives the fault handling directive. Now, if we
change the failure to a more serious ``NullPointerException``, that will no change the failure to a more serious ``NullPointerException``, that will no
longer be the case: longer be the case:
@ -119,7 +119,7 @@ terminated by the supervisor:
:include: stop :include: stop
Up to now the supervisor was completely unaffected by the childs failure, Up to now the supervisor was completely unaffected by the childs failure,
because the actions set did handle it. In case of an ``Exception``, this is not because the directives set did handle it. In case of an ``Exception``, this is not
true anymore and the supervisor escalates the failure. true anymore and the supervisor escalates the failure.
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
@ -129,7 +129,7 @@ The supervisor itself is supervised by the top-level actor provided by the
:class:`ActorSystem`, which has the default policy to restart in case of all :class:`ActorSystem`, which has the default policy to restart in case of all
``Exception`` cases (with the notable exceptions of ``Exception`` cases (with the notable exceptions of
``ActorInitializationException`` and ``ActorKilledException``). Since the ``ActorInitializationException`` and ``ActorKilledException``). Since the
default action in case of a restart is to kill all children, we expected our poor default directive in case of a restart is to kill all children, we expected our poor
child not to survive this failure. child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a In case this is not desired (which depends on the use case), we need to use a