Merge pull request #169 from jboner/rk-doc-fault-handling

document FaultHandlingStrategy for Scala
This commit is contained in:
Roland Kuhn 2011-12-15 14:33:54 -08:00
commit 7dad0a3a78
5 changed files with 501 additions and 6 deletions

View file

@ -0,0 +1,4 @@
package akka.docs.actor
import org.scalatest.junit.JUnitSuite
class FaultHandlingTest extends FaultHandlingTestBase with JUnitSuite

View file

@ -0,0 +1,167 @@
package akka.docs.actor;
//#testkit
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.FaultHandlingStrategy;
import static akka.actor.FaultHandlingStrategy.*;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.dispatch.Await;
import akka.util.Duration;
import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe;
//#testkit
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import static java.util.concurrent.TimeUnit.SECONDS;
import akka.japi.Function;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.AfterClass;
//#testkit
public class FaultHandlingTestBase {
//#testkit
//#supervisor
static public class Supervisor extends UntypedActor {
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o));
}
}
}
//#supervisor
//#supervisor2
static public class Supervisor2 extends UntypedActor {
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o));
}
}
@Override
public void preRestart(Throwable cause, Option<Object> msg) {
// do not kill all children, which is the default here
}
}
//#supervisor2
//#child
static public class Child extends UntypedActor {
int state = 0;
public void onReceive(Object o) throws Exception {
if (o instanceof Exception) {
throw (Exception) o;
} else if (o instanceof Integer) {
state = (Integer) o;
} else if (o.equals("get")) {
getSender().tell(state);
}
}
}
//#child
//#strategy
static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
@Override
public Action apply(Throwable t) {
if (t instanceof ArithmeticException) {
return resume();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
}, 10, 60000);
//#strategy
//#testkit
static ActorSystem system;
Duration timeout = Duration.create(5, SECONDS);
@BeforeClass
public static void start() {
system = ActorSystem.create("test", AkkaSpec.testConf());
}
@AfterClass
public static void cleanup() {
system.shutdown();
}
@Test
public void mustEmployFaultHandler() {
// code here
//#testkit
EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class);
EventFilter ex2 = (EventFilter) new ErrorFilter(NullPointerException.class);
EventFilter ex3 = (EventFilter) new ErrorFilter(IllegalArgumentException.class);
EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class);
Seq<EventFilter> ignoreExceptions = seq(ex1, ex2, ex3, ex4);
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
//#create
Props superprops = new Props(Supervisor.class).withFaultHandler(strategy);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
//#create
//#resume
child.tell(42);
assert Await.result(child.ask("get", 5000), timeout).equals(42);
child.tell(new ArithmeticException());
assert Await.result(child.ask("get", 5000), timeout).equals(42);
//#resume
//#restart
child.tell(new NullPointerException());
assert Await.result(child.ask("get", 5000), timeout).equals(0);
//#restart
//#stop
final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException());
probe.expectMsg(new Terminated(child));
//#stop
//#escalate-kill
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(child.ask("get", 5000), timeout).equals(0);
child.tell(new Exception());
probe.expectMsg(new Terminated(child));
//#escalate-kill
//#escalate-restart
superprops = new Props(Supervisor2.class).withFaultHandler(strategy);
supervisor = system.actorOf(superprops, "supervisor2");
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
child.tell(23);
assert Await.result(child.ask("get", 5000), timeout).equals(23);
child.tell(new Exception());
assert Await.result(child.ask("get", 5000), timeout).equals(0);
//#escalate-restart
//#testkit
}
//#testkit
public <A> Seq<A> seq(A... args) {
return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq();
}
//#testkit
}
//#testkit

View file

@ -1,10 +1,102 @@
.. _fault-tolerance-java: .. _fault-tolerance-java:
Fault Tolerance Through Supervisor Hierarchies (Java) Fault Handling Strategies (Java)
===================================================== =================================
.. sidebar:: Contents .. sidebar:: Contents
.. contents:: :local: .. contents:: :local:
REWRITE ME As explained in :ref:`actor-systems` each actor is the supervisor of its
children, and as such each actor is given a fault handling strategy when it is
created. This strategy cannot be changed afterwards as it is an integral part
of the actor systems structure.
Creating a Fault Handling Strategy
----------------------------------
For the sake of demonstration let us consider the following strategy:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: strategy
I have chosen a few well-known exception types in order to demonstrate the
application of the fault handling actions described in :ref:`supervision`.
First off, it is a one-for-one strategy, meaning that each child is treated
separately (an all-for-one strategy works very similarly, the only difference
is that any decision is applied to all children of the supervisor, not only the
failing one). There are limits set on the restart frequency, namely maximum 10
restarts per minute; each of these settings defaults to could be left out, which means
that the respective limit does not apply, leaving the possibility to specify an
absolute upper limit on the restarts or to make the restarts work infinitely.
Practical Application
---------------------
The following section shows the effects of the different actions in practice,
wherefor a test setup is needed. First off, we need a suitable supervisor:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: supervisor
This supervisor will be used to create a child, with which we can experiment:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: child
The test is easier by using the utilities described in :ref:`akka-testkit`,
where ``TestProbe`` provides an actor ref useful for receiving and inspecting replies.
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: testkit
Using the strategy shown above let us create actors:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: create
The first test shall demonstrate the ``Resume`` action, so we try it out by
setting some non-initial state in the actor and have it fail:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: resume
As you can see the value 42 survives the fault handling action. Now, if we
change the failure to a more serious ``NullPointerException``, that will no
longer be the case:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: restart
And finally in case of the fatal ``IllegalArgumentException`` the child will be
terminated by the supervisor:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: stop
Up to now the supervisor was completely unaffected by the childs failure,
because the actions set did handle it. In case of an ``Exception``, this is not
true anymore and the supervisor escalates the failure.
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: escalate-kill
The supervisor itself is supervised by the top-level actor provided by the
:class:`ActorSystem`, which has the default policy to restart in case of all
``Exception`` cases (with the notable exceptions of
``ActorInitializationException`` and ``ActorKilledException``). Since the
default action in case of a restart is to kill all children, we expected our poor
child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a
different supervisor which overrides this behavior.
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: supervisor2
With this parent, the child survives the escalated restart, as demonstrated in
the last test:
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
:include: escalate-restart

View file

@ -0,0 +1,134 @@
package akka.docs.actor
//#testkit
import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter }
import akka.actor.{ ActorRef, Props, Terminated }
//#testkit
object FaultHandlingDocSpec {
//#supervisor
//#child
import akka.actor.Actor
//#child
//#supervisor
//#supervisor
class Supervisor extends Actor {
def receive = {
case p: Props sender ! context.actorOf(p)
}
}
//#supervisor
//#supervisor2
class Supervisor2 extends Actor {
def receive = {
case p: Props sender ! context.actorOf(p)
}
// override default to kill all children during restart
override def preRestart(cause: Throwable, msg: Option[Any]) {}
}
//#supervisor2
//#child
class Child extends Actor {
var state = 0
def receive = {
case ex: Exception throw ex
case x: Int state = x
case "get" sender ! state
}
}
//#child
}
//#testkit
class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
//#testkit
import FaultHandlingDocSpec._
//#testkit
"A supervisor" must {
"apply the chosen strategy for its child" in {
//#testkit
//#strategy
import akka.actor.OneForOneStrategy
import akka.actor.FaultHandlingStrategy._
val strategy = OneForOneStrategy({
case _: ArithmeticException Resume
case _: NullPointerException Restart
case _: IllegalArgumentException Stop
case _: Exception Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
//#strategy
//#create
val superprops = Props[Supervisor].withFaultHandler(strategy)
val supervisor = system.actorOf(superprops, "supervisor")
supervisor ! Props[Child]
val child = expectMsgType[ActorRef] // retrieve answer from TestKits testActor
//#create
EventFilter[ArithmeticException](occurrences = 1) intercept {
//#resume
child ! 42 // set state to 42
child ! "get"
expectMsg(42)
child ! new ArithmeticException // crash it
child ! "get"
expectMsg(42)
//#resume
}
EventFilter[NullPointerException](occurrences = 1) intercept {
//#restart
child ! new NullPointerException // crash it harder
child ! "get"
expectMsg(0)
//#restart
}
EventFilter[IllegalArgumentException](occurrences = 1) intercept {
//#stop
watch(child) // have testActor watch child
child ! new IllegalArgumentException // break it
expectMsg(Terminated(child))
child.isTerminated must be(true)
//#stop
}
EventFilter[Exception]("CRASH", occurrences = 4) intercept {
//#escalate-kill
supervisor ! Props[Child] // create new child
val child2 = expectMsgType[ActorRef]
watch(child2)
child2 ! "get" // verify it is alive
expectMsg(0)
child2 ! new Exception("CRASH") // escalate failure
expectMsg(Terminated(child2))
//#escalate-kill
//#escalate-restart
val superprops2 = Props[Supervisor2].withFaultHandler(strategy)
val supervisor2 = system.actorOf(superprops2, "supervisor2")
supervisor2 ! Props[Child]
val child3 = expectMsgType[ActorRef]
child3 ! 23
child3 ! "get"
expectMsg(23)
child3 ! new Exception("CRASH")
child3 ! "get"
expectMsg(0)
//#escalate-restart
}
//#testkit
// code here
}
}
}
//#testkit

View file

@ -1,10 +1,108 @@
.. _fault-tolerance-scala: .. _fault-tolerance-scala:
Fault Tolerance Through Supervisor Hierarchies (Scala) Fault Handling Strategies (Scala)
====================================================== =================================
.. sidebar:: Contents .. sidebar:: Contents
.. contents:: :local: .. contents:: :local:
REWRITE ME As explained in :ref:`actor-systems` each actor is the supervisor of its
children, and as such each actor is given a fault handling strategy when it is
created. This strategy cannot be changed afterwards as it is an integral part
of the actor systems structure.
Creating a Fault Handling Strategy
----------------------------------
For the sake of demonstration let us consider the following strategy:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: strategy
I have chosen a few well-known exception types in order to demonstrate the
application of the fault handling actions described in :ref:`supervision`.
First off, it is a one-for-one strategy, meaning that each child is treated
separately (an all-for-one strategy works very similarly, the only difference
is that any decision is applied to all children of the supervisor, not only the
failing one). There are limits set on the restart frequency, namely maximum 10
restarts per minute; each of these settings defaults to ``None`` which means
that the respective limit does not apply, leaving the possibility to specify an
absolute upper limit on the restarts or to make the restarts work infinitely.
The match statement which forms the bulk of the body is of type ``Decider``,
which is a ``PartialFunction[Throwable, Action]``, and we need to help out the
type inferencer a bit here by ascribing that type after the closing brace. This
is the piece which maps child failure types to their corresponding actions.
Practical Application
---------------------
The following section shows the effects of the different actions in practice,
wherefor a test setup is needed. First off, we need a suitable supervisor:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: supervisor
This supervisor will be used to create a child, with which we can experiment:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: child
The test is easier by using the utilities described in :ref:`akka-testkit`,
where ``AkkaSpec`` is a convenient mixture of ``TestKit with WordSpec with
MustMatchers``
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: testkit
Using the strategy shown above let us create actors:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: create
The first test shall demonstrate the ``Resume`` action, so we try it out by
setting some non-initial state in the actor and have it fail:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: resume
As you can see the value 42 survives the fault handling action. Now, if we
change the failure to a more serious ``NullPointerException``, that will no
longer be the case:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: restart
And finally in case of the fatal ``IllegalArgumentException`` the child will be
terminated by the supervisor:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: stop
Up to now the supervisor was completely unaffected by the childs failure,
because the actions set did handle it. In case of an ``Exception``, this is not
true anymore and the supervisor escalates the failure.
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: escalate-kill
The supervisor itself is supervised by the top-level actor provided by the
:class:`ActorSystem`, which has the default policy to restart in case of all
``Exception`` cases (with the notable exceptions of
``ActorInitializationException`` and ``ActorKilledException``). Since the
default action in case of a restart is to kill all children, we expected our poor
child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a
different supervisor which overrides this behavior.
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: supervisor2
With this parent, the child survives the escalated restart, as demonstrated in
the last test:
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
:include: escalate-restart