diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTest.scala b/akka-docs/java/code/akka/docs/actor/FaultHandlingTest.scala new file mode 100644 index 0000000000..9621d22ac8 --- /dev/null +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTest.scala @@ -0,0 +1,4 @@ +package akka.docs.actor +import org.scalatest.junit.JUnitSuite + +class FaultHandlingTest extends FaultHandlingTestBase with JUnitSuite diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java new file mode 100644 index 0000000000..f2bb3b47ec --- /dev/null +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -0,0 +1,167 @@ +package akka.docs.actor; + +//#testkit +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.FaultHandlingStrategy; +import static akka.actor.FaultHandlingStrategy.*; +import akka.actor.OneForOneStrategy; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.actor.UntypedActor; +import akka.dispatch.Await; +import akka.util.Duration; +import akka.testkit.AkkaSpec; +import akka.testkit.TestProbe; + +//#testkit +import akka.testkit.ErrorFilter; +import akka.testkit.EventFilter; +import akka.testkit.TestEvent; +import static java.util.concurrent.TimeUnit.SECONDS; +import akka.japi.Function; +import scala.Option; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; + +//#testkit +public class FaultHandlingTestBase { + //#testkit + //#supervisor + static public class Supervisor extends UntypedActor { + public void onReceive(Object o) { + if (o instanceof Props) { + getSender().tell(getContext().actorOf((Props) o)); + } + } + } + //#supervisor + + //#supervisor2 + static public class Supervisor2 extends UntypedActor { + public void onReceive(Object o) { + if (o instanceof Props) { + getSender().tell(getContext().actorOf((Props) o)); + } + } + + @Override + public void preRestart(Throwable cause, Option msg) { + // do not kill all children, which is the default here + } + } + //#supervisor2 + + //#child + static public class Child extends UntypedActor { + int state = 0; + + public void onReceive(Object o) throws Exception { + if (o instanceof Exception) { + throw (Exception) o; + } else if (o instanceof Integer) { + state = (Integer) o; + } else if (o.equals("get")) { + getSender().tell(state); + } + } + } + //#child + + //#strategy + static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function() { + @Override + public Action apply(Throwable t) { + if (t instanceof ArithmeticException) { + return resume(); + } else if (t instanceof NullPointerException) { + return restart(); + } else if (t instanceof IllegalArgumentException) { + return stop(); + } else { + return escalate(); + } + } + }, 10, 60000); + //#strategy + + //#testkit + static ActorSystem system; + Duration timeout = Duration.create(5, SECONDS); + + @BeforeClass + public static void start() { + system = ActorSystem.create("test", AkkaSpec.testConf()); + } + + @AfterClass + public static void cleanup() { + system.shutdown(); + } + + @Test + public void mustEmployFaultHandler() { + // code here + //#testkit + EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class); + EventFilter ex2 = (EventFilter) new ErrorFilter(NullPointerException.class); + EventFilter ex3 = (EventFilter) new ErrorFilter(IllegalArgumentException.class); + EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class); + Seq ignoreExceptions = seq(ex1, ex2, ex3, ex4); + system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); + + //#create + Props superprops = new Props(Supervisor.class).withFaultHandler(strategy); + ActorRef supervisor = system.actorOf(superprops, "supervisor"); + ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + //#create + + //#resume + child.tell(42); + assert Await.result(child.ask("get", 5000), timeout).equals(42); + child.tell(new ArithmeticException()); + assert Await.result(child.ask("get", 5000), timeout).equals(42); + //#resume + + //#restart + child.tell(new NullPointerException()); + assert Await.result(child.ask("get", 5000), timeout).equals(0); + //#restart + + //#stop + final TestProbe probe = new TestProbe(system); + probe.watch(child); + child.tell(new IllegalArgumentException()); + probe.expectMsg(new Terminated(child)); + //#stop + + //#escalate-kill + child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + probe.watch(child); + assert Await.result(child.ask("get", 5000), timeout).equals(0); + child.tell(new Exception()); + probe.expectMsg(new Terminated(child)); + //#escalate-kill + + //#escalate-restart + superprops = new Props(Supervisor2.class).withFaultHandler(strategy); + supervisor = system.actorOf(superprops, "supervisor2"); + child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout); + child.tell(23); + assert Await.result(child.ask("get", 5000), timeout).equals(23); + child.tell(new Exception()); + assert Await.result(child.ask("get", 5000), timeout).equals(0); + //#escalate-restart + //#testkit + } +//#testkit + public Seq seq(A... args) { + return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq(); + } +//#testkit +} +//#testkit \ No newline at end of file diff --git a/akka-docs/java/fault-tolerance.rst b/akka-docs/java/fault-tolerance.rst index 76eb6f1486..ca6481bb77 100644 --- a/akka-docs/java/fault-tolerance.rst +++ b/akka-docs/java/fault-tolerance.rst @@ -1,10 +1,102 @@ .. _fault-tolerance-java: -Fault Tolerance Through Supervisor Hierarchies (Java) -===================================================== +Fault Handling Strategies (Java) +================================= .. sidebar:: Contents .. contents:: :local: -REWRITE ME \ No newline at end of file +As explained in :ref:`actor-systems` each actor is the supervisor of its +children, and as such each actor is given a fault handling strategy when it is +created. This strategy cannot be changed afterwards as it is an integral part +of the actor system’s structure. + +Creating a Fault Handling Strategy +---------------------------------- + +For the sake of demonstration let us consider the following strategy: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: strategy + +I have chosen a few well-known exception types in order to demonstrate the +application of the fault handling actions described in :ref:`supervision`. +First off, it is a one-for-one strategy, meaning that each child is treated +separately (an all-for-one strategy works very similarly, the only difference +is that any decision is applied to all children of the supervisor, not only the +failing one). There are limits set on the restart frequency, namely maximum 10 +restarts per minute; each of these settings defaults to could be left out, which means +that the respective limit does not apply, leaving the possibility to specify an +absolute upper limit on the restarts or to make the restarts work infinitely. + +Practical Application +--------------------- + +The following section shows the effects of the different actions in practice, +wherefor a test setup is needed. First off, we need a suitable supervisor: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: supervisor + +This supervisor will be used to create a child, with which we can experiment: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: child + +The test is easier by using the utilities described in :ref:`akka-testkit`, +where ``TestProbe`` provides an actor ref useful for receiving and inspecting replies. + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: testkit + +Using the strategy shown above let us create actors: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: create + +The first test shall demonstrate the ``Resume`` action, so we try it out by +setting some non-initial state in the actor and have it fail: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: resume + +As you can see the value 42 survives the fault handling action. Now, if we +change the failure to a more serious ``NullPointerException``, that will no +longer be the case: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: restart + +And finally in case of the fatal ``IllegalArgumentException`` the child will be +terminated by the supervisor: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: stop + +Up to now the supervisor was completely unaffected by the child’s failure, +because the actions set did handle it. In case of an ``Exception``, this is not +true anymore and the supervisor escalates the failure. + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: escalate-kill + +The supervisor itself is supervised by the top-level actor provided by the +:class:`ActorSystem`, which has the default policy to restart in case of all +``Exception`` cases (with the notable exceptions of +``ActorInitializationException`` and ``ActorKilledException``). Since the +default action in case of a restart is to kill all children, we expected our poor +child not to survive this failure. + +In case this is not desired (which depends on the use case), we need to use a +different supervisor which overrides this behavior. + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: supervisor2 + +With this parent, the child survives the escalated restart, as demonstrated in +the last test: + +.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java + :include: escalate-restart +