+act #3911 Adding Java Lambda compatibility for Supervisor Strategy

Conflicts:
	akka-actor/src/main/scala/akka/util/Timeout.scala
This commit is contained in:
Björn Antonsson 2014-03-13 10:29:06 +01:00
parent 95d27e3f82
commit ba7247b240
18 changed files with 1141 additions and 27 deletions

View file

@ -0,0 +1,70 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.japi.pf;
import static akka.actor.SupervisorStrategy.Directive;
/**
* Used for building a partial function for {@link akka.actor.Actor#supervisorStrategy() Actor.supervisorStrategy()}.
* *
* Inside an actor you can use it like this with Java 8 to define your supervisorStrategy.
* <p/>
* Example:
* <pre>
* @Override
* private static SupervisorStrategy strategy =
* new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder.
* match(ArithmeticException.class, e -> resume()).
* match(NullPointerException.class, e -> restart()).
* match(IllegalArgumentException.class, e -> stop()).
* matchAny(o -> escalate()).build());
*
* @Override
* public SupervisorStrategy supervisorStrategy() {
* return strategy;
* }
* </pre>
*
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
public class DeciderBuilder {
private DeciderBuilder() {
}
/**
* Return a new {@link PFBuilder} with a case statement added.
*
* @param type a type to match the argument against
* @param apply an action to apply to the argument if the type matches
* @return a builder with the case statement added
*/
public static <P extends Throwable> PFBuilder<Throwable, Directive> match(final Class<P> type, FI.Apply<P, Directive> apply) {
return Match.match(type, apply);
}
/**
* Return a new {@link PFBuilder} with a case statement added.
*
* @param type a type to match the argument against
* @param predicate a predicate that will be evaluated on the argument if the type matches
* @param apply an action to apply to the argument if the type matches and the predicate returns true
* @return a builder with the case statement added
*/
public static <P extends Throwable> PFBuilder<Throwable, Directive> match(final Class<P> type,
FI.TypedPredicate<P> predicate,
FI.Apply<P, Directive> apply) {
return Match.match(type, predicate, apply);
}
/**
* Return a new {@link PFBuilder} with a case statement added.
*
* @param apply an action to apply to the argument
* @return a builder with the case statement added
*/
public static PFBuilder<Throwable, Directive> matchAny(FI.Apply<Object, Directive> apply) {
return Match.matchAny(apply);
}
}

View file

@ -62,6 +62,18 @@ public class Match<I, R> extends AbstractMatch<I, R> {
return new PFBuilder<F, T>().matchEquals(object, apply);
}
/**
* Convenience function to create a {@link PFBuilder} with the first
* case statement added.
*
* @param apply an action to apply to the argument
* @return a builder with the case statement added
* @see PFBuilder#matchAny(FI.Apply)
*/
public static <F, T> PFBuilder<F, T> matchAny(final FI.Apply<Object, T> apply) {
return new PFBuilder<F, T>().matchAny(apply);
}
/**
* Create a {@link Match} from the builder.
*

View file

@ -70,6 +70,20 @@ public class ReceiveBuilder {
return UnitMatch.matchEquals(object, apply);
}
/**
* Return a new {@link UnitPFBuilder} with a case statement added.
*
* @param object the object to compare equals with
* @param predicate a predicate that will be evaluated on the argument if the object compares equal
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
*/
public static <P> UnitPFBuilder<Object> matchEquals(P object,
FI.TypedPredicate<P> predicate,
FI.UnitApply<P> apply) {
return UnitMatch.matchEquals(object, predicate, apply);
}
/**
* Return a new {@link UnitPFBuilder} with a case statement added.
*

View file

@ -63,6 +63,22 @@ public class UnitMatch<I> extends AbstractMatch<I, BoxedUnit> {
return new UnitPFBuilder<F>().matchEquals(object, apply);
}
/**
* Convenience function to create a {@link UnitPFBuilder} with the first
* case statement added.
*
* @param object the object to compare equals with
* @param predicate a predicate that will be evaluated on the argument the object compares equal
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
* @see UnitPFBuilder#matchEquals(Object, FI.UnitApply)
*/
public static <F, P> UnitPFBuilder<F> matchEquals(final P object,
final FI.TypedPredicate<P> predicate,
final FI.UnitApply<P> apply) {
return new UnitPFBuilder<F>().matchEquals(object, predicate, apply);
}
/**
* Convenience function to create a {@link UnitPFBuilder} with the first
* case statement added.

View file

@ -87,6 +87,34 @@ public final class UnitPFBuilder<I> extends AbstractPFBuilder<I, BoxedUnit> {
}, apply));
return this;
}
/**
* Add a new case statement to this builder.
*
* @param object the object to compare equals with
* @param predicate a predicate that will be evaluated on the argument if the object compares equal
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
*/
public <P> UnitPFBuilder<I> matchEquals(final P object,
final FI.TypedPredicate<P> predicate,
final FI.UnitApply<P> apply) {
addStatement(new UnitCaseStatement<I, P>(
new FI.Predicate() {
@Override
public boolean defined(Object o) {
if (!object.equals(o))
return false;
else {
@SuppressWarnings("unchecked")
P p = (P) o;
return predicate.defined(p);
}
}
}, apply));
return this;
}
/**
* Add a new case statement to this builder, that matches any argument.
* @param apply an action to apply to the argument

View file

@ -52,6 +52,15 @@ abstract class AbstractActor extends Actor {
def getContext(): AbstractActorContext = context.asInstanceOf[AbstractActorContext]
}
/**
* Java API: compatible with lambda expressions
*
* Actor base class that mixes in logging into the Actor.
*
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
abstract class AbstractLoggingActor extends AbstractActor with ActorLogging
/**
* Java API: compatible with lambda expressions
*

View file

@ -387,14 +387,45 @@ case class AllForOneStrategy(
import SupervisorStrategy._
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) =
this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) =
this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider)
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def this(loggingEnabled: Boolean, decider: SupervisorStrategy.Decider) =
this(loggingEnabled = loggingEnabled)(decider)
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def this(decider: SupervisorStrategy.Decider) =
this()(decider)
/*
* this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared
@ -432,15 +463,45 @@ case class OneForOneStrategy(
override val loggingEnabled: Boolean = true)(val decider: SupervisorStrategy.Decider)
extends SupervisorStrategy {
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) =
this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider))
/**
* Java API
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) =
this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit))
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) =
this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider)
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def this(loggingEnabled: Boolean, decider: SupervisorStrategy.Decider) =
this(loggingEnabled = loggingEnabled)(decider)
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def this(decider: SupervisorStrategy.Decider) =
this()(decider)
/*
* this is a performance optimization to avoid re-allocating the pairs upon
* every call to requestRestartPermission, assuming that strategies are shared

View file

@ -30,6 +30,12 @@ object LoggingReceive {
case _: LoggingReceive r
case _ if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r) else r
}
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def create(r: Receive, context: ActorContext): Receive = apply(r)(context)
}
/**

View file

@ -9,6 +9,7 @@ Java Documentation
intro/index-java
general/index
java/index-actors
java/lambda-index-actors
java/index-futures
java/index-network
java/index-utilities

View file

@ -16,6 +16,7 @@ import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import scala.collection.immutable.Seq;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.duration.Duration;
@ -31,7 +32,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static akka.japi.Util.immutableSeq;
import akka.japi.Function;
import scala.Option;
import scala.collection.immutable.Seq;
import org.junit.Test;
import org.junit.BeforeClass;
@ -164,12 +164,13 @@ public class FaultHandlingTest {
public void mustEmploySupervisorStrategy() throws Exception {
// code here
//#testkit
EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class);
EventFilter ex2 = (EventFilter) new ErrorFilter(NullPointerException.class);
EventFilter ex3 = (EventFilter) new ErrorFilter(IllegalArgumentException.class);
EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class);
Seq<EventFilter> ignoreExceptions = seq(ex1, ex2, ex3, ex4);
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
EventFilter ex1 = new ErrorFilter(ArithmeticException.class);
EventFilter ex2 = new ErrorFilter(NullPointerException.class);
EventFilter ex3 = new ErrorFilter(IllegalArgumentException.class);
EventFilter ex4 = new ErrorFilter(Exception.class);
EventFilter[] ignoreExceptions = { ex1, ex2, ex3, ex4 };
Seq<EventFilter> seq = immutableSeq(ignoreExceptions);
system.eventStream().publish(new TestEvent.Mute(seq));
//#create
Props superprops = Props.create(Supervisor.class);
@ -219,11 +220,5 @@ public class FaultHandlingTest {
//#testkit
}
//#testkit
@SuppressWarnings("unchecked")
public <A> Seq<A> seq(A... args) {
return immutableSeq(args);
}
//#testkit
}
//#testkit

View file

@ -24,13 +24,6 @@ sample as it is easy to follow the log output to understand what is happening in
fault-tolerance-sample
.. note::
If the strategy is declared inside the supervising actor (as opposed to
as a static property or class) its decider has access to all internal state of
the actor in a thread-safe fashion, including obtaining a reference to the
currently failed child (available as the ``getSender()`` of the failure message).
Creating a Supervisor Strategy
------------------------------

View file

@ -13,6 +13,3 @@ Actors
fsm
persistence
testing
lambda-actors
lambda-fsm
lambda-persistence

View file

@ -0,0 +1,53 @@
.. _lambda-fault-tolerance-sample-java:
Diagrams of the Fault Tolerance Sample
----------------------------------------------
.. image:: ../images/faulttolerancesample-normal-flow.png
*The above diagram illustrates the normal message flow.*
**Normal flow:**
======= ==================================================================================
Step Description
======= ==================================================================================
1 The progress ``Listener`` starts the work.
2 The ``Worker`` schedules work by sending ``Do`` messages periodically to itself
3, 4, 5 When receiving ``Do`` the ``Worker`` tells the ``CounterService``
to increment the counter, three times. The ``Increment`` message is forwarded
to the ``Counter``, which updates its counter variable and sends current value
to the ``Storage``.
6, 7 The ``Worker`` asks the ``CounterService`` of current value of the counter and pipes
the result back to the ``Listener``.
======= ==================================================================================
.. image:: ../images/faulttolerancesample-failure-flow.png
*The above diagram illustrates what happens in case of storage failure.*
**Failure flow:**
=========== ==================================================================================
Step Description
=========== ==================================================================================
1 The ``Storage`` throws ``StorageException``.
2 The ``CounterService`` is supervisor of the ``Storage`` and restarts the
``Storage`` when ``StorageException`` is thrown.
3, 4, 5, 6 The ``Storage`` continues to fail and is restarted.
7 After 3 failures and restarts within 5 seconds the ``Storage`` is stopped by its
supervisor, i.e. the ``CounterService``.
8 The ``CounterService`` is also watching the ``Storage`` for termination and
receives the ``Terminated`` message when the ``Storage`` has been stopped ...
9, 10, 11 and tells the ``Counter`` that there is no ``Storage``.
12 The ``CounterService`` schedules a ``Reconnect`` message to itself.
13, 14 When it receives the ``Reconnect`` message it creates a new ``Storage`` ...
15, 16 and tells the ``Counter`` to use the new ``Storage``
=========== ==================================================================================
Full Source Code of the Fault Tolerance Sample
------------------------------------------------------
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java#all

View file

@ -0,0 +1,175 @@
.. _lambda-fault-tolerance-java:
Fault Tolerance (Java with Lambda Support)
===========================================
As explained in :ref:`actor-systems` each actor is the supervisor of its
children, and as such each actor defines fault handling supervisor strategy.
This strategy cannot be changed afterwards as it is an integral part of the
actor systems structure.
Fault Handling in Practice
--------------------------
First, let us look at a sample that illustrates one way to handle data store errors,
which is a typical source of failure in real world applications. Of course it depends
on the actual application what is possible to do when the data store is unavailable,
but in this sample we use a best effort re-connect approach.
Read the following source code. The inlined comments explain the different pieces of
the fault handling and why they are added. It is also highly recommended to run this
sample as it is easy to follow the log output to understand what is happening in runtime.
.. toctree::
lambda-fault-tolerance-sample
Creating a Supervisor Strategy
------------------------------
The following sections explain the fault handling mechanism and alternatives
in more depth.
For the sake of demonstration let us consider the following strategy:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: strategy
I have chosen a few well-known exception types in order to demonstrate the
application of the fault handling directives described in :ref:`supervision`.
First off, it is a one-for-one strategy, meaning that each child is treated
separately (an all-for-one strategy works very similarly, the only difference
is that any decision is applied to all children of the supervisor, not only the
failing one). There are limits set on the restart frequency, namely maximum 10
restarts per minute. ``-1`` and ``Duration.Inf()`` means that the respective limit
does not apply, leaving the possibility to specify an absolute upper limit on the
restarts or to make the restarts work infinitely.
The child actor is stopped if the limit is exceeded.
.. note::
If the strategy is declared inside the supervising actor (as opposed to
a separate class) its decider has access to all internal state of
the actor in a thread-safe fashion, including obtaining a reference to the
currently failed child (available as the ``getSender`` of the failure message).
Default Supervisor Strategy
^^^^^^^^^^^^^^^^^^^^^^^^^^^
``Escalate`` is used if the defined strategy doesn't cover the exception that was thrown.
When the supervisor strategy is not defined for an actor the following
exceptions are handled by default:
* ``ActorInitializationException`` will stop the failing child actor
* ``ActorKilledException`` will stop the failing child actor
* ``Exception`` will restart the failing child actor
* Other types of ``Throwable`` will be escalated to parent actor
If the exception escalate all the way up to the root guardian it will handle it
in the same way as the default strategy defined above.
Stopping Supervisor Strategy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Closer to the Erlang way is the strategy to just stop children when they fail
and then take corrective action in the supervisor when DeathWatch signals the
loss of the child. This strategy is also provided pre-packaged as
:obj:`SupervisorStrategy.stoppingStrategy` with an accompanying
:class:`StoppingSupervisorStrategy` configurator to be used when you want the
``"/user"`` guardian to apply it.
Logging of Actor Failures
^^^^^^^^^^^^^^^^^^^^^^^^^
By default the ``SupervisorStrategy`` logs failures unless they are escalated.
Escalated failures are supposed to be handled, and potentially logged, at a level
higher in the hierarchy.
You can mute the default logging of a ``SupervisorStrategy`` by setting
``loggingEnabled`` to ``false`` when instantiating it. Customized logging
can be done inside the ``Decider``. Note that the reference to the currently
failed child is available as the ``getSender`` when the ``SupervisorStrategy`` is
declared inside the supervising actor.
You may also customize the logging in your own ``SupervisorStrategy`` implementation
by overriding the ``logFailure`` method.
Supervision of Top-Level Actors
-------------------------------
Toplevel actors means those which are created using ``system.actorOf()``, and
they are children of the :ref:`User Guardian <user-guardian>`. There are no
special rules applied in this case, the guardian simply applies the configured
strategy.
Test Application
----------------
The following section shows the effects of the different directives in practice,
wherefor a test setup is needed. First off, we need a suitable supervisor:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: supervisor
This supervisor will be used to create a child, with which we can experiment:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: child
The test is easier by using the utilities described in :ref:`akka-testkit`,
where ``TestProbe`` provides an actor ref useful for receiving and inspecting replies.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: testkit
Let us create actors:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: create
The first test shall demonstrate the ``Resume`` directive, so we try it out by
setting some non-initial state in the actor and have it fail:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: resume
As you can see the value 42 survives the fault handling directive. Now, if we
change the failure to a more serious ``NullPointerException``, that will no
longer be the case:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: restart
And finally in case of the fatal ``IllegalArgumentException`` the child will be
terminated by the supervisor:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: stop
Up to now the supervisor was completely unaffected by the childs failure,
because the directives set did handle it. In case of an ``Exception``, this is not
true anymore and the supervisor escalates the failure.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: escalate-kill
The supervisor itself is supervised by the top-level actor provided by the
:class:`ActorSystem`, which has the default policy to restart in case of all
``Exception`` cases (with the notable exceptions of
``ActorInitializationException`` and ``ActorKilledException``). Since the
default directive in case of a restart is to kill all children, we expected our poor
child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a
different supervisor which overrides this behavior.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: supervisor2
With this parent, the child survives the escalated restart, as demonstrated in
the last test:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java
:include: escalate-restart

View file

@ -0,0 +1,10 @@
Actors (Java with Lambda Support)
=================================
.. toctree::
:maxdepth: 2
lambda-actors
lambda-fault-tolerance
lambda-fsm
lambda-persistence

View file

@ -0,0 +1,204 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#testkit
import akka.actor.*;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.PartialFunction;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.duration.Duration;
import akka.testkit.TestProbe;
//#testkit
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import akka.testkit.JavaTestKit;
import static java.util.concurrent.TimeUnit.SECONDS;
import static akka.japi.Util.immutableSeq;
import scala.Option;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import scala.runtime.BoxedUnit;
//#testkit
public class FaultHandlingTest {
//#testkit
public static Config config = ConfigFactory.parseString(
"akka {\n" +
" loggers = [\"akka.testkit.TestEventListener\"]\n" +
" loglevel = \"WARNING\"\n" +
" stdout-loglevel = \"WARNING\"\n" +
"}\n");
static
//#supervisor
public class Supervisor extends AbstractActor {
//#strategy
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder.
match(ArithmeticException.class, e -> resume()).
match(NullPointerException.class, e -> restart()).
match(IllegalArgumentException.class, e -> stop()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
//#strategy
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build();
}
}
//#supervisor
static
//#supervisor2
public class Supervisor2 extends AbstractActor {
//#strategy2
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder.
match(ArithmeticException.class, e -> resume()).
match(NullPointerException.class, e -> restart()).
match(IllegalArgumentException.class, e -> stop()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
//#strategy2
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build();
}
@Override
public void preRestart(Throwable cause, Option<Object> msg) {
// do not kill all children, which is the default here
}
}
//#supervisor2
static
//#child
public class Child extends AbstractActor {
int state = 0;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Exception.class, exception -> { throw exception; }).
match(Integer.class, i -> state = i).
matchEquals("get", s -> sender().tell(state, self())).build();
}
}
//#child
//#testkit
static ActorSystem system;
Duration timeout = Duration.create(5, SECONDS);
@BeforeClass
public static void start() {
system = ActorSystem.create("FaultHandlingTest", config);
}
@AfterClass
public static void cleanup() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void mustEmploySupervisorStrategy() throws Exception {
// code here
//#testkit
EventFilter ex1 = new ErrorFilter(ArithmeticException.class);
EventFilter ex2 = new ErrorFilter(NullPointerException.class);
EventFilter ex3 = new ErrorFilter(IllegalArgumentException.class);
EventFilter ex4 = new ErrorFilter(Exception.class);
EventFilter[] ignoreExceptions = { ex1, ex2, ex3, ex4 };
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
//#create
Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
//#create
//#resume
child.tell(42, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
//#resume
//#restart
child.tell(new NullPointerException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#restart
//#stop
final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);
//#stop
//#escalate-kill
child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);
//#escalate-kill
//#escalate-restart
superprops = Props.create(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell(23, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
//#escalate-restart
//#testkit
}
}
//#testkit

View file

@ -36,8 +36,7 @@ public class BuncherTest {
}
@Test
public void testBuncherActorBatchesCorrectly()
{
public void testBuncherActorBatchesCorrectly() {
new JavaTestKit(system) {{
final ActorRef buncher =
system.actorOf(Props.create(Buncher.class));
@ -64,8 +63,7 @@ public class BuncherTest {
}
@Test
public void testBuncherActorDoesntBatchUninitialized()
{
public void testBuncherActorDoesntBatchUninitialized() {
new JavaTestKit(system) {{
final ActorRef buncher =
system.actorOf(Props.create(Buncher.class));

View file

@ -0,0 +1,472 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.japi;
//#all
//#imports
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import akka.actor.*;
import akka.dispatch.Mapper;
import akka.event.LoggingReceive;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.duration.Duration;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static akka.japi.Util.classTag;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import static docs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
import static docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
import static docs.actor.japi.FaultHandlingDocSample.CounterApi.*;
import static docs.actor.japi.FaultHandlingDocSample.StorageApi.*;
//#imports
public class FaultHandlingDocSample {
/**
* Runs the sample
*/
public static void main(String[] args) {
Config config = ConfigFactory.parseString(
"akka.loglevel = \"DEBUG\"\n" +
"akka.actor.debug {\n" +
" receive = on\n" +
" lifecycle = on\n" +
"}\n");
ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
ActorRef worker = system.actorOf(Props.create(Worker.class), "worker");
ActorRef listener = system.actorOf(Props.create(Listener.class), "listener");
// start the work and listen on progress
// note that the listener is used as sender of the tell,
// i.e. it will receive replies from the worker
worker.tell(Start, listener);
}
/**
* Listens on progress from the worker and shuts down the system when enough
* work has been done.
*/
public static class Listener extends AbstractLoggingActor {
@Override
public void preStart() {
// If we don't get any progress within 15 seconds then the service
// is unavailable
context().setReceiveTimeout(Duration.create("15 seconds"));
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
match(Progress.class, progress -> {
log().info("Current progress: {} %", progress.percent);
if (progress.percent >= 100.0) {
log().info("That's all, shutting down");
context().system().shutdown();
}
}).
matchEquals(ReceiveTimeout.getInstance(), x -> {
// No progress within 15 seconds, ServiceUnavailable
log().error("Shutting down due to unavailable service");
context().system().shutdown();
}).build(), context());
}
}
//#messages
public interface WorkerApi {
public static final Object Start = "Start";
public static final Object Do = "Do";
public static class Progress {
public final double percent;
public Progress(double percent) {
this.percent = percent;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), percent);
}
}
}
//#messages
/**
* Worker performs some work when it receives the Start message. It will
* continuously notify the sender of the Start message of current Progress.
* The Worker supervise the CounterService.
*/
public static class Worker extends AbstractLoggingActor {
final Timeout askTimeout = new Timeout(Duration.create(5, "seconds"));
// The sender of the initial Start message will continuously be notified
// about progress
ActorRef progressListener;
final ActorRef counterService = context().actorOf(
Props.create(CounterService.class), "counter");
final int totalCount = 51;
// Stop the CounterService child if it throws ServiceUnavailable
private static final SupervisorStrategy strategy =
new OneForOneStrategy(DeciderBuilder.
match(ServiceUnavailable.class, e -> stop()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
matchEquals(Start, x -> progressListener == null, x -> {
progressListener = sender();
context().system().scheduler().schedule(
Duration.Zero(), Duration.create(1, "second"), self(), Do,
context().dispatcher(), null
);
}).
matchEquals(Do, x -> {
counterService.tell(new Increment(1), self());
counterService.tell(new Increment(1), self());
counterService.tell(new Increment(1), self());
// Send current progress to the initial sender
pipe(ask(counterService, GetCurrentCount, askTimeout)
.mapTo(classTag(CurrentCount.class))
.map(new Mapper<CurrentCount, Progress>() {
public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}, context().dispatcher()), context().dispatcher())
.to(progressListener);
}).build(), context());
}
}
//#messages
public interface CounterServiceApi {
public static final Object GetCurrentCount = "GetCurrentCount";
public static class CurrentCount {
public final String key;
public final long count;
public CurrentCount(String key, long count) {
this.key = key;
this.count = count;
}
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
}
}
public static class Increment {
public final long n;
public Increment(long n) {
this.n = n;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), n);
}
}
public static class ServiceUnavailable extends RuntimeException {
private static final long serialVersionUID = 1L;
public ServiceUnavailable(String msg) {
super(msg);
}
}
}
//#messages
/**
* Adds the value received in Increment message to a persistent counter.
* Replies with CurrentCount when it is asked for CurrentCount. CounterService
* supervise Storage and Counter.
*/
public static class CounterService extends AbstractLoggingActor {
// Reconnect message
static final Object Reconnect = "Reconnect";
private static class SenderMsgPair {
final ActorRef sender;
final Object msg;
SenderMsgPair(ActorRef sender, Object msg) {
this.msg = msg;
this.sender = sender;
}
}
final String key = self().path().name();
ActorRef storage;
ActorRef counter;
final List<SenderMsgPair> backlog = new ArrayList<>();
final int MAX_BACKLOG = 10000;
// Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped.
private static final SupervisorStrategy strategy =
new OneForOneStrategy(3, Duration.create("5 seconds"), DeciderBuilder.
match(StorageException.class, e -> restart()).
matchAny(o -> escalate()).build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public void preStart() {
initStorage();
}
/**
* The child storage is restarted in case of failure, but after 3 restarts,
* and still failing it will be stopped. Better to back-off than
* continuously failing. When it has been stopped we will schedule a
* Reconnect after a delay. Watch the child so we receive Terminated message
* when it has been terminated.
*/
void initStorage() {
storage = context().watch(context().actorOf(
Props.create(Storage.class), "storage"));
// Tell the counter, if any, to use the new storage
if (counter != null)
counter.tell(new UseStorage(storage), self());
// We need the initial value to be able to operate
storage.tell(new Get(key), self());
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
match(Entry.class, entry -> entry.key.equals(key) && counter == null, entry -> {
// Reply from Storage of the initial value, now we can create the Counter
final long value = entry.value;
counter = context().actorOf(Props.create(Counter.class, key, value));
// Tell the counter to use current storage
counter.tell(new UseStorage(storage), self());
// and send the buffered backlog to the counter
for (SenderMsgPair each : backlog) {
counter.tell(each.msg, each.sender);
}
backlog.clear();
}).
match(Increment.class, increment -> {
forwardOrPlaceInBacklog(increment);
}).
matchEquals(GetCurrentCount, gcc -> {
forwardOrPlaceInBacklog(gcc);
}).
match(Terminated.class, o -> {
// After 3 restarts the storage child is stopped.
// We receive Terminated because we watch the child, see initStorage.
storage = null;
// Tell the counter that there is no storage for the moment
counter.tell(new UseStorage(null), self());
// Try to re-establish storage after while
context().system().scheduler().scheduleOnce(
Duration.create(10, "seconds"), self(), Reconnect,
context().dispatcher(), null);
}).
matchEquals(Reconnect, o -> {
// Re-establish storage after the scheduled delay
initStorage();
}).build(), context());
}
void forwardOrPlaceInBacklog(Object msg) {
// We need the initial value from storage before we can start delegate to
// the counter. Before that we place the messages in a backlog, to be sent
// to the counter when it is initialized.
if (counter == null) {
if (backlog.size() >= MAX_BACKLOG)
throw new ServiceUnavailable("CounterService not available," +
" lack of initial value");
backlog.add(new SenderMsgPair(sender(), msg));
} else {
counter.forward(msg, context());
}
}
}
//#messages
public interface CounterApi {
public static class UseStorage {
public final ActorRef storage;
public UseStorage(ActorRef storage) {
this.storage = storage;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), storage);
}
}
}
//#messages
/**
* The in memory count variable that will send current value to the Storage,
* if there is any storage available at the moment.
*/
public static class Counter extends AbstractLoggingActor {
final String key;
long count;
ActorRef storage;
public Counter(String key, long initialValue) {
this.key = key;
this.count = initialValue;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
match(UseStorage.class, useStorage -> {
storage = useStorage.storage;
storeCount();
}).
match(Increment.class, increment -> {
count += increment.n;
storeCount();
}).
matchEquals(GetCurrentCount, gcc -> {
sender().tell(new CurrentCount(key, count), self());
}).build(), context());
}
void storeCount() {
// Delegate dangerous work, to protect our valuable state.
// We can continue without storage.
if (storage != null) {
storage.tell(new Store(new Entry(key, count)), self());
}
}
}
//#messages
public interface StorageApi {
public static class Store {
public final Entry entry;
public Store(Entry entry) {
this.entry = entry;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), entry);
}
}
public static class Entry {
public final String key;
public final long value;
public Entry(String key, long value) {
this.key = key;
this.value = value;
}
public String toString() {
return String.format("%s(%s, %s)", getClass().getSimpleName(), key, value);
}
}
public static class Get {
public final String key;
public Get(String key) {
this.key = key;
}
public String toString() {
return String.format("%s(%s)", getClass().getSimpleName(), key);
}
}
public static class StorageException extends RuntimeException {
private static final long serialVersionUID = 1L;
public StorageException(String msg) {
super(msg);
}
}
}
//#messages
/**
* Saves key/value pairs to persistent storage when receiving Store message.
* Replies with current value when receiving Get message. Will throw
* StorageException if the underlying data store is out of order.
*/
public static class Storage extends AbstractLoggingActor {
final DummyDB db = DummyDB.instance;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
match(Store.class, store -> {
db.save(store.entry.key, store.entry.value);
}).
match(Get.class, get -> {
Long value = db.load(get.key);
sender().tell(new Entry(get.key, value == null ?
Long.valueOf(0L) : value), self());
}).build(), context());
}
}
//#dummydb
public static class DummyDB {
public static final DummyDB instance = new DummyDB();
private final Map<String, Long> db = new HashMap<String, Long>();
private DummyDB() {
}
public synchronized void save(String key, Long value) throws StorageException {
if (11 <= value && value <= 14)
throw new StorageException("Simulated store failure " + value);
db.put(key, value);
}
public synchronized Long load(String key) throws StorageException {
return db.get(key);
}
}
//#dummydb
}
//#all