add Java part for fault handling docs
This commit is contained in:
parent
5298d80846
commit
6f31e83c7b
3 changed files with 266 additions and 3 deletions
|
|
@ -0,0 +1,4 @@
|
||||||
|
package akka.docs.actor
|
||||||
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
||||||
|
class FaultHandlingTest extends FaultHandlingTestBase with JUnitSuite
|
||||||
167
akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java
Normal file
167
akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java
Normal file
|
|
@ -0,0 +1,167 @@
|
||||||
|
package akka.docs.actor;
|
||||||
|
|
||||||
|
//#testkit
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.actor.FaultHandlingStrategy;
|
||||||
|
import static akka.actor.FaultHandlingStrategy.*;
|
||||||
|
import akka.actor.OneForOneStrategy;
|
||||||
|
import akka.actor.Props;
|
||||||
|
import akka.actor.Terminated;
|
||||||
|
import akka.actor.UntypedActor;
|
||||||
|
import akka.dispatch.Await;
|
||||||
|
import akka.util.Duration;
|
||||||
|
import akka.testkit.AkkaSpec;
|
||||||
|
import akka.testkit.TestProbe;
|
||||||
|
|
||||||
|
//#testkit
|
||||||
|
import akka.testkit.ErrorFilter;
|
||||||
|
import akka.testkit.EventFilter;
|
||||||
|
import akka.testkit.TestEvent;
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
import akka.japi.Function;
|
||||||
|
import scala.Option;
|
||||||
|
import scala.collection.JavaConverters;
|
||||||
|
import scala.collection.Seq;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
|
||||||
|
//#testkit
|
||||||
|
public class FaultHandlingTestBase {
|
||||||
|
//#testkit
|
||||||
|
//#supervisor
|
||||||
|
static public class Supervisor extends UntypedActor {
|
||||||
|
public void onReceive(Object o) {
|
||||||
|
if (o instanceof Props) {
|
||||||
|
getSender().tell(getContext().actorOf((Props) o));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#supervisor
|
||||||
|
|
||||||
|
//#supervisor2
|
||||||
|
static public class Supervisor2 extends UntypedActor {
|
||||||
|
public void onReceive(Object o) {
|
||||||
|
if (o instanceof Props) {
|
||||||
|
getSender().tell(getContext().actorOf((Props) o));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preRestart(Throwable cause, Option<Object> msg) {
|
||||||
|
// do not kill all children, which is the default here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#supervisor2
|
||||||
|
|
||||||
|
//#child
|
||||||
|
static public class Child extends UntypedActor {
|
||||||
|
int state = 0;
|
||||||
|
|
||||||
|
public void onReceive(Object o) throws Exception {
|
||||||
|
if (o instanceof Exception) {
|
||||||
|
throw (Exception) o;
|
||||||
|
} else if (o instanceof Integer) {
|
||||||
|
state = (Integer) o;
|
||||||
|
} else if (o.equals("get")) {
|
||||||
|
getSender().tell(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#child
|
||||||
|
|
||||||
|
//#strategy
|
||||||
|
static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||||
|
@Override
|
||||||
|
public Action apply(Throwable t) {
|
||||||
|
if (t instanceof ArithmeticException) {
|
||||||
|
return resume();
|
||||||
|
} else if (t instanceof NullPointerException) {
|
||||||
|
return restart();
|
||||||
|
} else if (t instanceof IllegalArgumentException) {
|
||||||
|
return stop();
|
||||||
|
} else {
|
||||||
|
return escalate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 10, 60000);
|
||||||
|
//#strategy
|
||||||
|
|
||||||
|
//#testkit
|
||||||
|
static ActorSystem system;
|
||||||
|
Duration timeout = Duration.create(5, SECONDS);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void start() {
|
||||||
|
system = ActorSystem.create("test", AkkaSpec.testConf());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void cleanup() {
|
||||||
|
system.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustEmployFaultHandler() {
|
||||||
|
// code here
|
||||||
|
//#testkit
|
||||||
|
EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class);
|
||||||
|
EventFilter ex2 = (EventFilter) new ErrorFilter(NullPointerException.class);
|
||||||
|
EventFilter ex3 = (EventFilter) new ErrorFilter(IllegalArgumentException.class);
|
||||||
|
EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class);
|
||||||
|
Seq<EventFilter> ignoreExceptions = seq(ex1, ex2, ex3, ex4);
|
||||||
|
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
|
||||||
|
|
||||||
|
//#create
|
||||||
|
Props superprops = new Props(Supervisor.class).withFaultHandler(strategy);
|
||||||
|
ActorRef supervisor = system.actorOf(superprops, "supervisor");
|
||||||
|
ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||||
|
//#create
|
||||||
|
|
||||||
|
//#resume
|
||||||
|
child.tell(42);
|
||||||
|
assert Await.result(child.ask("get", 5000), timeout).equals(42);
|
||||||
|
child.tell(new ArithmeticException());
|
||||||
|
assert Await.result(child.ask("get", 5000), timeout).equals(42);
|
||||||
|
//#resume
|
||||||
|
|
||||||
|
//#restart
|
||||||
|
child.tell(new NullPointerException());
|
||||||
|
assert Await.result(child.ask("get", 5000), timeout).equals(0);
|
||||||
|
//#restart
|
||||||
|
|
||||||
|
//#stop
|
||||||
|
final TestProbe probe = new TestProbe(system);
|
||||||
|
probe.watch(child);
|
||||||
|
child.tell(new IllegalArgumentException());
|
||||||
|
probe.expectMsg(new Terminated(child));
|
||||||
|
//#stop
|
||||||
|
|
||||||
|
//#escalate-kill
|
||||||
|
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||||
|
probe.watch(child);
|
||||||
|
assert Await.result(child.ask("get", 5000), timeout).equals(0);
|
||||||
|
child.tell(new Exception());
|
||||||
|
probe.expectMsg(new Terminated(child));
|
||||||
|
//#escalate-kill
|
||||||
|
|
||||||
|
//#escalate-restart
|
||||||
|
superprops = new Props(Supervisor2.class).withFaultHandler(strategy);
|
||||||
|
supervisor = system.actorOf(superprops, "supervisor2");
|
||||||
|
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||||
|
child.tell(23);
|
||||||
|
assert Await.result(child.ask("get", 5000), timeout).equals(23);
|
||||||
|
child.tell(new Exception());
|
||||||
|
assert Await.result(child.ask("get", 5000), timeout).equals(0);
|
||||||
|
//#escalate-restart
|
||||||
|
//#testkit
|
||||||
|
}
|
||||||
|
//#testkit
|
||||||
|
public <A> Seq<A> seq(A... args) {
|
||||||
|
return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq();
|
||||||
|
}
|
||||||
|
//#testkit
|
||||||
|
}
|
||||||
|
//#testkit
|
||||||
|
|
@ -1,10 +1,102 @@
|
||||||
.. _fault-tolerance-java:
|
.. _fault-tolerance-java:
|
||||||
|
|
||||||
Fault Tolerance Through Supervisor Hierarchies (Java)
|
Fault Handling Strategies (Java)
|
||||||
=====================================================
|
=================================
|
||||||
|
|
||||||
.. sidebar:: Contents
|
.. sidebar:: Contents
|
||||||
|
|
||||||
.. contents:: :local:
|
.. contents:: :local:
|
||||||
|
|
||||||
REWRITE ME
|
As explained in :ref:`actor-systems` each actor is the supervisor of its
|
||||||
|
children, and as such each actor is given a fault handling strategy when it is
|
||||||
|
created. This strategy cannot be changed afterwards as it is an integral part
|
||||||
|
of the actor system’s structure.
|
||||||
|
|
||||||
|
Creating a Fault Handling Strategy
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
For the sake of demonstration let us consider the following strategy:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: strategy
|
||||||
|
|
||||||
|
I have chosen a few well-known exception types in order to demonstrate the
|
||||||
|
application of the fault handling actions described in :ref:`supervision`.
|
||||||
|
First off, it is a one-for-one strategy, meaning that each child is treated
|
||||||
|
separately (an all-for-one strategy works very similarly, the only difference
|
||||||
|
is that any decision is applied to all children of the supervisor, not only the
|
||||||
|
failing one). There are limits set on the restart frequency, namely maximum 10
|
||||||
|
restarts per minute; each of these settings defaults to could be left out, which means
|
||||||
|
that the respective limit does not apply, leaving the possibility to specify an
|
||||||
|
absolute upper limit on the restarts or to make the restarts work infinitely.
|
||||||
|
|
||||||
|
Practical Application
|
||||||
|
---------------------
|
||||||
|
|
||||||
|
The following section shows the effects of the different actions in practice,
|
||||||
|
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: supervisor
|
||||||
|
|
||||||
|
This supervisor will be used to create a child, with which we can experiment:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: child
|
||||||
|
|
||||||
|
The test is easier by using the utilities described in :ref:`akka-testkit`,
|
||||||
|
where ``TestProbe`` provides an actor ref useful for receiving and inspecting replies.
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: testkit
|
||||||
|
|
||||||
|
Using the strategy shown above let us create actors:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: create
|
||||||
|
|
||||||
|
The first test shall demonstrate the ``Resume`` action, so we try it out by
|
||||||
|
setting some non-initial state in the actor and have it fail:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: resume
|
||||||
|
|
||||||
|
As you can see the value 42 survives the fault handling action. Now, if we
|
||||||
|
change the failure to a more serious ``NullPointerException``, that will no
|
||||||
|
longer be the case:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: restart
|
||||||
|
|
||||||
|
And finally in case of the fatal ``IllegalArgumentException`` the child will be
|
||||||
|
terminated by the supervisor:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: stop
|
||||||
|
|
||||||
|
Up to now the supervisor was completely unaffected by the child’s failure,
|
||||||
|
because the actions set did handle it. In case of an ``Exception``, this is not
|
||||||
|
true anymore and the supervisor escalates the failure.
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: escalate-kill
|
||||||
|
|
||||||
|
The supervisor itself is supervised by the top-level actor provided by the
|
||||||
|
:class:`ActorSystem`, which has the default policy to restart in case of all
|
||||||
|
``Exception`` cases (with the notable exceptions of
|
||||||
|
``ActorInitializationException`` and ``ActorKilledException``). Since the
|
||||||
|
default action in case of a restart is to kill all children, we expected our poor
|
||||||
|
child not to survive this failure.
|
||||||
|
|
||||||
|
In case this is not desired (which depends on the use case), we need to use a
|
||||||
|
different supervisor which overrides this behavior.
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: supervisor2
|
||||||
|
|
||||||
|
With this parent, the child survives the escalated restart, as demonstrated in
|
||||||
|
the last test:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
:include: escalate-restart
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue