Moved definition of fault handler from Props to overridable method supervisorStrategy in Actor. See #1711
* New trait SupervisorStrategy for TypedActors * Adjustments of docs * Updated tests
This commit is contained in:
parent
6cb1914b5c
commit
66e0a7cf0b
29 changed files with 331 additions and 223 deletions
|
|
@ -36,6 +36,30 @@ public class FaultHandlingTestBase {
|
|||
//#testkit
|
||||
//#supervisor
|
||||
static public class Supervisor extends UntypedActor {
|
||||
|
||||
//#strategy
|
||||
private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, 60000);
|
||||
|
||||
@Override
|
||||
public FaultHandlingStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
//#strategy
|
||||
|
||||
public void onReceive(Object o) {
|
||||
if (o instanceof Props) {
|
||||
getSender().tell(getContext().actorOf((Props) o));
|
||||
|
|
@ -44,10 +68,35 @@ public class FaultHandlingTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
//#supervisor
|
||||
|
||||
|
||||
//#supervisor2
|
||||
static public class Supervisor2 extends UntypedActor {
|
||||
|
||||
//#strategy2
|
||||
private static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, 60000);
|
||||
|
||||
@Override
|
||||
public FaultHandlingStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
//#strategy2
|
||||
|
||||
public void onReceive(Object o) {
|
||||
if (o instanceof Props) {
|
||||
getSender().tell(getContext().actorOf((Props) o));
|
||||
|
|
@ -55,18 +104,19 @@ public class FaultHandlingTestBase {
|
|||
unhandled(o);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void preRestart(Throwable cause, Option<Object> msg) {
|
||||
// do not kill all children, which is the default here
|
||||
}
|
||||
}
|
||||
|
||||
//#supervisor2
|
||||
|
||||
|
||||
//#child
|
||||
static public class Child extends UntypedActor {
|
||||
int state = 0;
|
||||
|
||||
|
||||
public void onReceive(Object o) throws Exception {
|
||||
if (o instanceof Exception) {
|
||||
throw (Exception) o;
|
||||
|
|
@ -79,39 +129,23 @@ public class FaultHandlingTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
//#child
|
||||
|
||||
//#strategy
|
||||
static FaultHandlingStrategy strategy = new OneForOneStrategy(new Function<Throwable, Action>() {
|
||||
@Override
|
||||
public Action apply(Throwable t) {
|
||||
if (t instanceof ArithmeticException) {
|
||||
return resume();
|
||||
} else if (t instanceof NullPointerException) {
|
||||
return restart();
|
||||
} else if (t instanceof IllegalArgumentException) {
|
||||
return stop();
|
||||
} else {
|
||||
return escalate();
|
||||
}
|
||||
}
|
||||
}, 10, 60000);
|
||||
//#strategy
|
||||
|
||||
|
||||
//#testkit
|
||||
static ActorSystem system;
|
||||
Duration timeout = Duration.create(5, SECONDS);
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void start() {
|
||||
system = ActorSystem.create("test", AkkaSpec.testConf());
|
||||
}
|
||||
|
||||
|
||||
@AfterClass
|
||||
public static void cleanup() {
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void mustEmployFaultHandler() {
|
||||
// code here
|
||||
|
|
@ -122,32 +156,32 @@ public class FaultHandlingTestBase {
|
|||
EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class);
|
||||
Seq<EventFilter> ignoreExceptions = seq(ex1, ex2, ex3, ex4);
|
||||
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
|
||||
|
||||
|
||||
//#create
|
||||
Props superprops = new Props(Supervisor.class).withFaultHandler(strategy);
|
||||
Props superprops = new Props(Supervisor.class);
|
||||
ActorRef supervisor = system.actorOf(superprops, "supervisor");
|
||||
ActorRef child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
//#create
|
||||
|
||||
|
||||
//#resume
|
||||
child.tell(42);
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(42);
|
||||
child.tell(new ArithmeticException());
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(42);
|
||||
//#resume
|
||||
|
||||
|
||||
//#restart
|
||||
child.tell(new NullPointerException());
|
||||
assert Await.result(child.ask("get", 5000), timeout).equals(0);
|
||||
//#restart
|
||||
|
||||
|
||||
//#stop
|
||||
final TestProbe probe = new TestProbe(system);
|
||||
probe.watch(child);
|
||||
child.tell(new IllegalArgumentException());
|
||||
probe.expectMsg(new Terminated(child));
|
||||
//#stop
|
||||
|
||||
|
||||
//#escalate-kill
|
||||
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
probe.watch(child);
|
||||
|
|
@ -155,9 +189,9 @@ public class FaultHandlingTestBase {
|
|||
child.tell(new Exception());
|
||||
probe.expectMsg(new Terminated(child));
|
||||
//#escalate-kill
|
||||
|
||||
|
||||
//#escalate-restart
|
||||
superprops = new Props(Supervisor2.class).withFaultHandler(strategy);
|
||||
superprops = new Props(Supervisor2.class);
|
||||
supervisor = system.actorOf(superprops, "supervisor2");
|
||||
child = (ActorRef) Await.result(supervisor.ask(new Props(Child.class), 5000), timeout);
|
||||
child.tell(23);
|
||||
|
|
@ -167,10 +201,11 @@ public class FaultHandlingTestBase {
|
|||
//#escalate-restart
|
||||
//#testkit
|
||||
}
|
||||
//#testkit
|
||||
|
||||
//#testkit
|
||||
public <A> Seq<A> seq(A... args) {
|
||||
return JavaConverters.collectionAsScalaIterableConverter(java.util.Arrays.asList(args)).asScala().toSeq();
|
||||
}
|
||||
//#testkit
|
||||
//#testkit
|
||||
}
|
||||
//#testkit
|
||||
|
|
@ -8,9 +8,9 @@ Fault Handling Strategies (Java)
|
|||
.. contents:: :local:
|
||||
|
||||
As explained in :ref:`actor-systems` each actor is the supervisor of its
|
||||
children, and as such each actor is given a fault handling strategy when it is
|
||||
created. This strategy cannot be changed afterwards as it is an integral part
|
||||
of the actor system’s structure.
|
||||
children, and as such each actor defines fault handling supervisor strategy.
|
||||
This strategy cannot be changed afterwards as it is an integral part of the
|
||||
actor system’s structure.
|
||||
|
||||
Creating a Fault Handling Strategy
|
||||
----------------------------------
|
||||
|
|
@ -26,7 +26,7 @@ First off, it is a one-for-one strategy, meaning that each child is treated
|
|||
separately (an all-for-one strategy works very similarly, the only difference
|
||||
is that any decision is applied to all children of the supervisor, not only the
|
||||
failing one). There are limits set on the restart frequency, namely maximum 10
|
||||
restarts per minute; each of these settings defaults to could be left out, which means
|
||||
restarts per minute; each of these settings could be left out, which means
|
||||
that the respective limit does not apply, leaving the possibility to specify an
|
||||
absolute upper limit on the restarts or to make the restarts work infinitely.
|
||||
|
||||
|
|
@ -50,7 +50,7 @@ where ``TestProbe`` provides an actor ref useful for receiving and inspecting re
|
|||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
:include: testkit
|
||||
|
||||
Using the strategy shown above let us create actors:
|
||||
Let us create actors:
|
||||
|
||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||
:include: create
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue