diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTest.scala b/akka-docs/java/code/akka/docs/actor/FaultHandlingTest.scala new file mode 100644 index 0000000000..9621d22ac8 --- /dev/null +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTest.scala @@ -0,0 +1,4 @@ +package akka.docs.actor +import org.scalatest.junit.JUnitSuite + +class FaultHandlingTest extends FaultHandlingTestBase with JUnitSuite diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java new file mode 100644 index 0000000000..f2bb3b47ec --- /dev/null +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -0,0 +1,167 @@ +package akka.docs.actor; + +//#testkit +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.FaultHandlingStrategy; +import static akka.actor.FaultHandlingStrategy.*; +import akka.actor.OneForOneStrategy; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.actor.UntypedActor; +import akka.dispatch.Await; +import akka.util.Duration; +import akka.testkit.AkkaSpec; +import akka.testkit.TestProbe; + +//#testkit +import akka.testkit.ErrorFilter; +import akka.testkit.EventFilter; +import akka.testkit.TestEvent; +import static java.util.concurrent.TimeUnit.SECONDS; +import akka.japi.Function; +import scala.Option; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; + +//#testkit +public class FaultHandlingTestBase { + //#testkit + //#supervisor + static public class Supervisor extends UntypedActor { + public void onReceive(Object o) { + if (o instanceof Props) { + getSender().tell(getContext().actorOf((Props) o)); + } + } + } + //#supervisor + + //#supervisor2 + static public class Supervisor2 extends UntypedActor { + public void onReceive(Object o) { + if (o instanceof Props) { + getSender().tell(getContext().actorOf((Props) o)); + } + } + + @Override + public void preRestart(Throwable cause, Option msg) { + // do not kill all children, which is the default here + } + } + //#supervisor2 + + //#child + static public class Child extends UntypedActor { + int state = 0; + + public void onReceive(Object o) throws Exception { + if (o instanceof Exception) { + throw (Exception) o; + } else if (o instanceof Integer) { + state = (Integer) o; + } else if (o.equals("get")) { + getSender().tell(state); + } + } + } + //#child + + //#strategy + static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function() { + @Override + public Action apply(Throwable t) { + if (t instanceof ArithmeticException) { + return resume(); + } else if (t instanceof NullPointerException) { + return restart(); + } else if (t instanceof IllegalArgumentException) { + return stop(); + } else { + return escalate(); + } + } + }, 10, 60000); + //#strategy + + //#testkit + static ActorSystem system; + Duration timeout = Duration.create(5, SECONDS); + + @BeforeClass + public static void start() { + system = ActorSystem.create("test", AkkaSpec.testConf()); + } + + @AfterClass + public static void cleanup() { + system.shutdown(); + } + + @Test + public void mustEmployFaultHandler() { + // code here + //#testkit + EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class); + EventFilter ex2 = (EventFilter) new ErrorFilter(NullPointerException.class); + EventFilter ex3 = (EventFilter) new ErrorFilter(IllegalArgumentException.class); + EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class); + Seq ignoreExceptions = seq(ex1, ex2, ex3, ex4); + system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); + + //#create + Props superprops = new Props(Supervisor.class).withFaultHandler(strategy); + ActorRef supervisor = system.actorOf(superprops, "supervisor"); + ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + //#create + + //#resume + child.tell(42); + assert Await.result(child.ask("get", 5000), timeout).equals(42); + child.tell(new ArithmeticException()); + assert Await.result(child.ask("get", 5000), timeout).equals(42); + //#resume + + //#restart + child.tell(new NullPointerException()); + assert Await.result(child.ask("get", 5000), timeout).equals(0); + //#restart + + //#stop + final TestProbe probe = new TestProbe(system); + probe.watch(child); + child.tell(new IllegalArgumentException()); + probe.expectMsg(new Terminated(child)); + //#stop + + //#escalate-kill + child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + probe.watch(child); + assert Await.result(child.ask("get", 5000), timeout).equals(0); + child.tell(new Exception()); + probe.expectMsg(new Terminated(child)); + //#escalate-kill + + //#escalate-restart + superprops = new Props(Supervisor2.class).withFaultHandler(strategy); + supervisor = system.actorOf(superprops, "supervisor2"); + child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + child.tell(23); + assert Await.result(child.ask("get", 5000), timeout).equals(23); + child.tell(new Exception()); + assert Await.result(child.ask("get", 5000), timeout).equals(0); + //#escalate-restart + //#testkit + } +//#testkit + public Seq seq(A... args) { + return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq(); + } +//#testkit +} +//#testkit \ No newline at end of file diff --git a/akka-docs/java/fault-tolerance.rst b/akka-docs/java/fault-tolerance.rst index 76eb6f1486..ca6481bb77 100644 --- a/akka-docs/java/fault-tolerance.rst +++ b/akka-docs/java/fault-tolerance.rst @@ -1,10 +1,102 @@ .. _fault-tolerance-java: -Fault Tolerance Through Supervisor Hierarchies (Java) -===================================================== +Fault Handling Strategies (Java) +================================= .. sidebar:: Contents .. contents:: :local: -REWRITE ME \ No newline at end of file +As explained in :ref:`actor-systems` each actor is the supervisor of its +children, and as such each actor is given a fault handling strategy when it is +created. This strategy cannot be changed afterwards as it is an integral part +of the actor system’s structure. + +Creating a Fault Handling Strategy +---------------------------------- + +For the sake of demonstration let us consider the following strategy: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: strategy + +I have chosen a few well-known exception types in order to demonstrate the +application of the fault handling actions described in :ref:`supervision`. +First off, it is a one-for-one strategy, meaning that each child is treated +separately (an all-for-one strategy works very similarly, the only difference +is that any decision is applied to all children of the supervisor, not only the +failing one). There are limits set on the restart frequency, namely maximum 10 +restarts per minute; each of these settings defaults to could be left out, which means +that the respective limit does not apply, leaving the possibility to specify an +absolute upper limit on the restarts or to make the restarts work infinitely. + +Practical Application +--------------------- + +The following section shows the effects of the different actions in practice, +wherefor a test setup is needed. First off, we need a suitable supervisor: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: supervisor + +This supervisor will be used to create a child, with which we can experiment: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: child + +The test is easier by using the utilities described in :ref:`akka-testkit`, +where ``TestProbe`` provides an actor ref useful for receiving and inspecting replies. + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: testkit + +Using the strategy shown above let us create actors: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: create + +The first test shall demonstrate the ``Resume`` action, so we try it out by +setting some non-initial state in the actor and have it fail: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: resume + +As you can see the value 42 survives the fault handling action. Now, if we +change the failure to a more serious ``NullPointerException``, that will no +longer be the case: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: restart + +And finally in case of the fatal ``IllegalArgumentException`` the child will be +terminated by the supervisor: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: stop + +Up to now the supervisor was completely unaffected by the child’s failure, +because the actions set did handle it. In case of an ``Exception``, this is not +true anymore and the supervisor escalates the failure. + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: escalate-kill + +The supervisor itself is supervised by the top-level actor provided by the +:class:`ActorSystem`, which has the default policy to restart in case of all +``Exception`` cases (with the notable exceptions of +``ActorInitializationException`` and ``ActorKilledException``). Since the +default action in case of a restart is to kill all children, we expected our poor +child not to survive this failure. + +In case this is not desired (which depends on the use case), we need to use a +different supervisor which overrides this behavior. + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: supervisor2 + +With this parent, the child survives the escalated restart, as demonstrated in +the last test: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: escalate-restart + diff --git a/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala new file mode 100644 index 0000000000..2cbf1f4933 --- /dev/null +++ b/akka-docs/scala/code/akka/docs/actor/FaultHandlingDocSpec.scala @@ -0,0 +1,134 @@ +package akka.docs.actor + +//#testkit +import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter } +import akka.actor.{ ActorRef, Props, Terminated } + +//#testkit +object FaultHandlingDocSpec { + //#supervisor + //#child + import akka.actor.Actor + + //#child + //#supervisor + //#supervisor + class Supervisor extends Actor { + def receive = { + case p: Props ⇒ sender ! context.actorOf(p) + } + } + //#supervisor + + //#supervisor2 + class Supervisor2 extends Actor { + def receive = { + case p: Props ⇒ sender ! context.actorOf(p) + } + // override default to kill all children during restart + override def preRestart(cause: Throwable, msg: Option[Any]) {} + } + //#supervisor2 + + //#child + class Child extends Actor { + var state = 0 + def receive = { + case ex: Exception ⇒ throw ex + case x: Int ⇒ state = x + case "get" ⇒ sender ! state + } + } + //#child +} + +//#testkit +class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { + //#testkit + + import FaultHandlingDocSpec._ + //#testkit + + "A supervisor" must { + + "apply the chosen strategy for its child" in { + //#testkit + //#strategy + import akka.actor.OneForOneStrategy + import akka.actor.FaultHandlingStrategy._ + + val strategy = OneForOneStrategy({ + case _: ArithmeticException ⇒ Resume + case _: NullPointerException ⇒ Restart + case _: IllegalArgumentException ⇒ Stop + case _: Exception ⇒ Escalate + }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + + //#strategy + //#create + val superprops = Props[Supervisor].withFaultHandler(strategy) + val supervisor = system.actorOf(superprops, "supervisor") + + supervisor ! Props[Child] + val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor + //#create + EventFilter[ArithmeticException](occurrences = 1) intercept { + //#resume + child ! 42 // set state to 42 + child ! "get" + expectMsg(42) + + child ! new ArithmeticException // crash it + child ! "get" + expectMsg(42) + //#resume + } + EventFilter[NullPointerException](occurrences = 1) intercept { + //#restart + child ! new NullPointerException // crash it harder + child ! "get" + expectMsg(0) + //#restart + } + EventFilter[IllegalArgumentException](occurrences = 1) intercept { + //#stop + watch(child) // have testActor watch “child” + child ! new IllegalArgumentException // break it + expectMsg(Terminated(child)) + child.isTerminated must be(true) + //#stop + } + EventFilter[Exception]("CRASH", occurrences = 4) intercept { + //#escalate-kill + supervisor ! Props[Child] // create new child + val child2 = expectMsgType[ActorRef] + + watch(child2) + child2 ! "get" // verify it is alive + expectMsg(0) + + child2 ! new Exception("CRASH") // escalate failure + expectMsg(Terminated(child2)) + //#escalate-kill + //#escalate-restart + val superprops2 = Props[Supervisor2].withFaultHandler(strategy) + val supervisor2 = system.actorOf(superprops2, "supervisor2") + + supervisor2 ! Props[Child] + val child3 = expectMsgType[ActorRef] + + child3 ! 23 + child3 ! "get" + expectMsg(23) + + child3 ! new Exception("CRASH") + child3 ! "get" + expectMsg(0) + //#escalate-restart + } + //#testkit + // code here + } + } +} +//#testkit \ No newline at end of file diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index 7540de98e4..c2ffa10c79 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -1,10 +1,108 @@ .. _fault-tolerance-scala: -Fault Tolerance Through Supervisor Hierarchies (Scala) -====================================================== +Fault Handling Strategies (Scala) +================================= .. sidebar:: Contents .. contents:: :local: -REWRITE ME \ No newline at end of file +As explained in :ref:`actor-systems` each actor is the supervisor of its +children, and as such each actor is given a fault handling strategy when it is +created. This strategy cannot be changed afterwards as it is an integral part +of the actor system’s structure. + +Creating a Fault Handling Strategy +---------------------------------- + +For the sake of demonstration let us consider the following strategy: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: strategy + +I have chosen a few well-known exception types in order to demonstrate the +application of the fault handling actions described in :ref:`supervision`. +First off, it is a one-for-one strategy, meaning that each child is treated +separately (an all-for-one strategy works very similarly, the only difference +is that any decision is applied to all children of the supervisor, not only the +failing one). There are limits set on the restart frequency, namely maximum 10 +restarts per minute; each of these settings defaults to ``None`` which means +that the respective limit does not apply, leaving the possibility to specify an +absolute upper limit on the restarts or to make the restarts work infinitely. + +The match statement which forms the bulk of the body is of type ``Decider``, +which is a ``PartialFunction[Throwable, Action]``, and we need to help out the +type inferencer a bit here by ascribing that type after the closing brace. This +is the piece which maps child failure types to their corresponding actions. + +Practical Application +--------------------- + +The following section shows the effects of the different actions in practice, +wherefor a test setup is needed. First off, we need a suitable supervisor: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: supervisor + +This supervisor will be used to create a child, with which we can experiment: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: child + +The test is easier by using the utilities described in :ref:`akka-testkit`, +where ``AkkaSpec`` is a convenient mixture of ``TestKit with WordSpec with +MustMatchers`` + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: testkit + +Using the strategy shown above let us create actors: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: create + +The first test shall demonstrate the ``Resume`` action, so we try it out by +setting some non-initial state in the actor and have it fail: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: resume + +As you can see the value 42 survives the fault handling action. Now, if we +change the failure to a more serious ``NullPointerException``, that will no +longer be the case: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: restart + +And finally in case of the fatal ``IllegalArgumentException`` the child will be +terminated by the supervisor: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: stop + +Up to now the supervisor was completely unaffected by the child’s failure, +because the actions set did handle it. In case of an ``Exception``, this is not +true anymore and the supervisor escalates the failure. + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: escalate-kill + +The supervisor itself is supervised by the top-level actor provided by the +:class:`ActorSystem`, which has the default policy to restart in case of all +``Exception`` cases (with the notable exceptions of +``ActorInitializationException`` and ``ActorKilledException``). Since the +default action in case of a restart is to kill all children, we expected our poor +child not to survive this failure. + +In case this is not desired (which depends on the use case), we need to use a +different supervisor which overrides this behavior. + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: supervisor2 + +With this parent, the child survives the escalated restart, as demonstrated in +the last test: + +.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala + :include: escalate-restart +