Replaced Action with Directive in SupervisorStrategy. See #1716
This commit is contained in:
parent
4162372024
commit
2a4418799a
6 changed files with 44 additions and 44 deletions
|
|
@ -47,36 +47,36 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
|
|||
trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒
|
||||
|
||||
/**
|
||||
* Implicit conversion from `Seq` of Cause-Action pairs to a `Decider`. See makeDecider(causeAction).
|
||||
* Implicit conversion from `Seq` of Cause-Directive pairs to a `Decider`. See makeDecider(causeDirective).
|
||||
*/
|
||||
implicit def seqCauseAction2Decider(trapExit: Iterable[CauseAction]): Decider = makeDecider(trapExit)
|
||||
implicit def seqCauseDirective2Decider(trapExit: Iterable[CauseDirective]): Decider = makeDecider(trapExit)
|
||||
// the above would clash with seqThrowable2Decider for empty lists
|
||||
}
|
||||
|
||||
object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||
sealed trait Action
|
||||
sealed trait Directive
|
||||
|
||||
/**
|
||||
* Resumes message processing for the failed Actor
|
||||
*/
|
||||
case object Resume extends Action
|
||||
case object Resume extends Directive
|
||||
|
||||
/**
|
||||
* Discards the old Actor instance and replaces it with a new,
|
||||
* then resumes message processing.
|
||||
*/
|
||||
case object Restart extends Action
|
||||
case object Restart extends Directive
|
||||
|
||||
/**
|
||||
* Stops the Actor
|
||||
*/
|
||||
case object Stop extends Action
|
||||
case object Stop extends Directive
|
||||
|
||||
/**
|
||||
* Escalates the failure to the supervisor of the supervisor,
|
||||
* by rethrowing the cause of the failure.
|
||||
*/
|
||||
case object Escalate extends Action
|
||||
case object Escalate extends Directive
|
||||
|
||||
/**
|
||||
* Resumes message processing for the failed Actor
|
||||
|
|
@ -127,9 +127,9 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
|||
*/
|
||||
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
|
||||
|
||||
type Decider = PartialFunction[Throwable, Action]
|
||||
type JDecider = akka.japi.Function[Throwable, Action]
|
||||
type CauseAction = (Class[_ <: Throwable], Action)
|
||||
type Decider = PartialFunction[Throwable, Directive]
|
||||
type JDecider = akka.japi.Function[Throwable, Directive]
|
||||
type CauseDirective = (Class[_ <: Throwable], Directive)
|
||||
|
||||
/**
|
||||
* Decider builder which just checks whether one of
|
||||
|
|
@ -152,14 +152,14 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
|||
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
|
||||
|
||||
/**
|
||||
* Decider builder for Iterables of cause-action pairs, e.g. a map obtained
|
||||
* Decider builder for Iterables of cause-directive pairs, e.g. a map obtained
|
||||
* from configuration; will sort the pairs so that the most specific type is
|
||||
* checked before all its subtypes, allowing carving out subtrees of the
|
||||
* Throwable hierarchy.
|
||||
*/
|
||||
def makeDecider(flat: Iterable[CauseAction]): Decider = {
|
||||
val actions = sort(flat)
|
||||
return { case x ⇒ actions find (_._1 isInstance x) map (_._2) getOrElse Escalate }
|
||||
def makeDecider(flat: Iterable[CauseDirective]): Decider = {
|
||||
val directives = sort(flat)
|
||||
return { case x ⇒ directives find (_._1 isInstance x) map (_._2) getOrElse Escalate }
|
||||
}
|
||||
|
||||
def makeDecider(func: JDecider): Decider = {
|
||||
|
|
@ -170,8 +170,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
|||
* Sort so that subtypes always precede their supertypes, but without
|
||||
* obeying any order between unrelated subtypes (insert sort).
|
||||
*/
|
||||
def sort(in: Iterable[CauseAction]): Seq[CauseAction] =
|
||||
(new ArrayBuffer[CauseAction](in.size) /: in) { (buf, ca) ⇒
|
||||
def sort(in: Iterable[CauseDirective]): Seq[CauseDirective] =
|
||||
(new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca) ⇒
|
||||
buf.indexWhere(_._1 isAssignableFrom ca._1) match {
|
||||
case -1 ⇒ buf append ca
|
||||
case x ⇒ buf insert (x, ca)
|
||||
|
|
@ -215,8 +215,8 @@ abstract class SupervisorStrategy {
|
|||
* Returns whether it processed the failure or not
|
||||
*/
|
||||
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
|
||||
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
|
||||
action match {
|
||||
val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
|
||||
directive match {
|
||||
case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true
|
||||
case Restart ⇒ processFailure(context, true, child, cause, stats, children); true
|
||||
case Stop ⇒ processFailure(context, false, child, cause, stats, children); true
|
||||
|
|
@ -230,7 +230,7 @@ abstract class SupervisorStrategy {
|
|||
* Restart all child actors when one fails
|
||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
|
||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||
*/
|
||||
case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||
|
|
@ -273,7 +273,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
|
|||
* Restart a child actor when it fails
|
||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
|
||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||
*/
|
||||
case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||
|
|
|
|||
|
|
@ -37,9 +37,9 @@ import akka.japi.{ Creator }
|
|||
* }
|
||||
*
|
||||
* private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||
* new Function<Throwable, Action>() {
|
||||
* new Function<Throwable, Directive>() {
|
||||
* @Override
|
||||
* public Action apply(Throwable t) {
|
||||
* public Directive apply(Throwable t) {
|
||||
* if (t instanceof ArithmeticException) {
|
||||
* return resume();
|
||||
* } else if (t instanceof NullPointerException) {
|
||||
|
|
|
|||
|
|
@ -40,9 +40,9 @@ public class FaultHandlingTestBase {
|
|||
|
||||
//#strategy
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||
new Function<Throwable, Action>() {
|
||||
new Function<Throwable, Directive>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
public Directive apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
|
|
@ -78,9 +78,9 @@ public class FaultHandlingTestBase {
|
|||
|
||||
//#strategy2
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||
new Function<Throwable, Action>() {
|
||||
new Function<Throwable, Directive>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
public Directive apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
|
|
|
|||
|
|
@ -115,9 +115,9 @@ public class FaultHandlingDocSample {
|
|||
|
||||
// Stop the CounterService child if it throws ServiceUnavailable
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
|
||||
new Function<Throwable, Action>() {
|
||||
new Function<Throwable, Directive>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
public Directive apply(Throwable t) {
|
||||
if (t instanceof ServiceUnavailable) {
|
||||
return stop();
|
||||
} else {
|
||||
|
|
@ -224,9 +224,9 @@ public class FaultHandlingDocSample {
|
|||
// Restart the storage child when StorageException is thrown.
|
||||
// After 3 restarts within 5 seconds it will be stopped.
|
||||
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
|
||||
new Function<Throwable, Action>() {
|
||||
new Function<Throwable, Directive>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
public Directive apply(Throwable t) {
|
||||
if (t instanceof StorageException) {
|
||||
return restart();
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy:
|
|||
:include: strategy
|
||||
|
||||
I have chosen a few well-known exception types in order to demonstrate the
|
||||
application of the fault handling actions described in :ref:`supervision`.
|
||||
application of the fault handling directives described in :ref:`supervision`.
|
||||
First off, it is a one-for-one strategy, meaning that each child is treated
|
||||
separately (an all-for-one strategy works very similarly, the only difference
|
||||
is that any decision is applied to all children of the supervisor, not only the
|
||||
|
|
@ -71,7 +71,7 @@ in the same way as the default strategy defined above.
|
|||
Test Application
|
||||
----------------
|
||||
|
||||
The following section shows the effects of the different actions in practice,
|
||||
The following section shows the effects of the different directives in practice,
|
||||
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
|
|
@ -93,13 +93,13 @@ Let us create actors:
|
|||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
:include: create
|
||||
|
||||
The first test shall demonstrate the ``Resume`` action, so we try it out by
|
||||
The first test shall demonstrate the ``Resume`` directive, so we try it out by
|
||||
setting some non-initial state in the actor and have it fail:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
:include: resume
|
||||
|
||||
As you can see the value 42 survives the fault handling action. Now, if we
|
||||
As you can see the value 42 survives the fault handling directive. Now, if we
|
||||
change the failure to a more serious ``NullPointerException``, that will no
|
||||
longer be the case:
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ terminated by the supervisor:
|
|||
:include: stop
|
||||
|
||||
Up to now the supervisor was completely unaffected by the child’s failure,
|
||||
because the actions set did handle it. In case of an ``Exception``, this is not
|
||||
because the directives set did handle it. In case of an ``Exception``, this is not
|
||||
true anymore and the supervisor escalates the failure.
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
|
|
@ -123,7 +123,7 @@ The supervisor itself is supervised by the top-level actor provided by the
|
|||
:class:`ActorSystem`, which has the default policy to restart in case of all
|
||||
``Exception`` cases (with the notable exceptions of
|
||||
``ActorInitializationException`` and ``ActorKilledException``). Since the
|
||||
default action in case of a restart is to kill all children, we expected our poor
|
||||
default directive in case of a restart is to kill all children, we expected our poor
|
||||
child not to survive this failure.
|
||||
|
||||
In case this is not desired (which depends on the use case), we need to use a
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy:
|
|||
:include: strategy
|
||||
|
||||
I have chosen a few well-known exception types in order to demonstrate the
|
||||
application of the fault handling actions described in :ref:`supervision`.
|
||||
application of the fault handling directives described in :ref:`supervision`.
|
||||
First off, it is a one-for-one strategy, meaning that each child is treated
|
||||
separately (an all-for-one strategy works very similarly, the only difference
|
||||
is that any decision is applied to all children of the supervisor, not only the
|
||||
|
|
@ -53,8 +53,8 @@ that the respective limit does not apply, leaving the possibility to specify an
|
|||
absolute upper limit on the restarts or to make the restarts work infinitely.
|
||||
|
||||
The match statement which forms the bulk of the body is of type ``Decider``,
|
||||
which is a ``PartialFunction[Throwable, Action]``. This
|
||||
is the piece which maps child failure types to their corresponding actions.
|
||||
which is a ``PartialFunction[Throwable, Directive]``. This
|
||||
is the piece which maps child failure types to their corresponding directives.
|
||||
|
||||
Default Supervisor Strategy
|
||||
---------------------------
|
||||
|
|
@ -76,7 +76,7 @@ in the same way as the default strategy defined above.
|
|||
Test Application
|
||||
----------------
|
||||
|
||||
The following section shows the effects of the different actions in practice,
|
||||
The following section shows the effects of the different directives in practice,
|
||||
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||
|
|
@ -99,13 +99,13 @@ Let us create actors:
|
|||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||
:include: create
|
||||
|
||||
The first test shall demonstrate the ``Resume`` action, so we try it out by
|
||||
The first test shall demonstrate the ``Resume`` directive, so we try it out by
|
||||
setting some non-initial state in the actor and have it fail:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||
:include: resume
|
||||
|
||||
As you can see the value 42 survives the fault handling action. Now, if we
|
||||
As you can see the value 42 survives the fault handling directive. Now, if we
|
||||
change the failure to a more serious ``NullPointerException``, that will no
|
||||
longer be the case:
|
||||
|
||||
|
|
@ -119,7 +119,7 @@ terminated by the supervisor:
|
|||
:include: stop
|
||||
|
||||
Up to now the supervisor was completely unaffected by the child’s failure,
|
||||
because the actions set did handle it. In case of an ``Exception``, this is not
|
||||
because the directives set did handle it. In case of an ``Exception``, this is not
|
||||
true anymore and the supervisor escalates the failure.
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||
|
|
@ -129,7 +129,7 @@ The supervisor itself is supervised by the top-level actor provided by the
|
|||
:class:`ActorSystem`, which has the default policy to restart in case of all
|
||||
``Exception`` cases (with the notable exceptions of
|
||||
``ActorInitializationException`` and ``ActorKilledException``). Since the
|
||||
default action in case of a restart is to kill all children, we expected our poor
|
||||
default directive in case of a restart is to kill all children, we expected our poor
|
||||
child not to survive this failure.
|
||||
|
||||
In case this is not desired (which depends on the use case), we need to use a
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue