diff --git a/akka-actor/src/main/java/akka/japi/pf/DeciderBuilder.java b/akka-actor/src/main/java/akka/japi/pf/DeciderBuilder.java new file mode 100644 index 0000000000..672dfb6c64 --- /dev/null +++ b/akka-actor/src/main/java/akka/japi/pf/DeciderBuilder.java @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.japi.pf; + +import static akka.actor.SupervisorStrategy.Directive; + +/** + * Used for building a partial function for {@link akka.actor.Actor#supervisorStrategy() Actor.supervisorStrategy()}. + * * + * Inside an actor you can use it like this with Java 8 to define your supervisorStrategy. + *

+ * Example: + *

+ * @Override
+ * private static SupervisorStrategy strategy =
+ *   new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder.
+ *     match(ArithmeticException.class, e -> resume()).
+ *     match(NullPointerException.class, e -> restart()).
+ *     match(IllegalArgumentException.class, e -> stop()).
+ *     matchAny(o -> escalate()).build());
+ *
+ * @Override
+ * public SupervisorStrategy supervisorStrategy() {
+ *   return strategy;
+ * }
+ * 
+ * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +public class DeciderBuilder { + private DeciderBuilder() { + } + + /** + * Return a new {@link PFBuilder} with a case statement added. + * + * @param type a type to match the argument against + * @param apply an action to apply to the argument if the type matches + * @return a builder with the case statement added + */ + public static

PFBuilder match(final Class

type, FI.Apply apply) { + return Match.match(type, apply); + } + + /** + * Return a new {@link PFBuilder} with a case statement added. + * + * @param type a type to match the argument against + * @param predicate a predicate that will be evaluated on the argument if the type matches + * @param apply an action to apply to the argument if the type matches and the predicate returns true + * @return a builder with the case statement added + */ + public static

PFBuilder match(final Class

type, + FI.TypedPredicate

predicate, + FI.Apply apply) { + return Match.match(type, predicate, apply); + } + + /** + * Return a new {@link PFBuilder} with a case statement added. + * + * @param apply an action to apply to the argument + * @return a builder with the case statement added + */ + public static PFBuilder matchAny(FI.Apply apply) { + return Match.matchAny(apply); + } +} diff --git a/akka-actor/src/main/java/akka/japi/pf/Match.java b/akka-actor/src/main/java/akka/japi/pf/Match.java index c4926a869c..8b3dcd24e3 100644 --- a/akka-actor/src/main/java/akka/japi/pf/Match.java +++ b/akka-actor/src/main/java/akka/japi/pf/Match.java @@ -62,6 +62,18 @@ public class Match extends AbstractMatch { return new PFBuilder().matchEquals(object, apply); } + /** + * Convenience function to create a {@link PFBuilder} with the first + * case statement added. + * + * @param apply an action to apply to the argument + * @return a builder with the case statement added + * @see PFBuilder#matchAny(FI.Apply) + */ + public static PFBuilder matchAny(final FI.Apply apply) { + return new PFBuilder().matchAny(apply); + } + /** * Create a {@link Match} from the builder. * diff --git a/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java b/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java index 4768489dde..bd47648fe7 100644 --- a/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java +++ b/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java @@ -70,6 +70,20 @@ public class ReceiveBuilder { return UnitMatch.matchEquals(object, apply); } + /** + * Return a new {@link UnitPFBuilder} with a case statement added. + * + * @param object the object to compare equals with + * @param predicate a predicate that will be evaluated on the argument if the object compares equal + * @param apply an action to apply to the argument if the object compares equal + * @return a builder with the case statement added + */ + public static

UnitPFBuilder matchEquals(P object, + FI.TypedPredicate

predicate, + FI.UnitApply

apply) { + return UnitMatch.matchEquals(object, predicate, apply); + } + /** * Return a new {@link UnitPFBuilder} with a case statement added. * diff --git a/akka-actor/src/main/java/akka/japi/pf/UnitMatch.java b/akka-actor/src/main/java/akka/japi/pf/UnitMatch.java index dcc81af128..2a0f19b1d5 100644 --- a/akka-actor/src/main/java/akka/japi/pf/UnitMatch.java +++ b/akka-actor/src/main/java/akka/japi/pf/UnitMatch.java @@ -63,6 +63,22 @@ public class UnitMatch extends AbstractMatch { return new UnitPFBuilder().matchEquals(object, apply); } + /** + * Convenience function to create a {@link UnitPFBuilder} with the first + * case statement added. + * + * @param object the object to compare equals with + * @param predicate a predicate that will be evaluated on the argument the object compares equal + * @param apply an action to apply to the argument if the object compares equal + * @return a builder with the case statement added + * @see UnitPFBuilder#matchEquals(Object, FI.UnitApply) + */ + public static UnitPFBuilder matchEquals(final P object, + final FI.TypedPredicate

predicate, + final FI.UnitApply

apply) { + return new UnitPFBuilder().matchEquals(object, predicate, apply); + } + /** * Convenience function to create a {@link UnitPFBuilder} with the first * case statement added. diff --git a/akka-actor/src/main/java/akka/japi/pf/UnitPFBuilder.java b/akka-actor/src/main/java/akka/japi/pf/UnitPFBuilder.java index 78ae74368d..0ce2b03f32 100644 --- a/akka-actor/src/main/java/akka/japi/pf/UnitPFBuilder.java +++ b/akka-actor/src/main/java/akka/japi/pf/UnitPFBuilder.java @@ -87,6 +87,34 @@ public final class UnitPFBuilder extends AbstractPFBuilder { }, apply)); return this; } + + /** + * Add a new case statement to this builder. + * + * @param object the object to compare equals with + * @param predicate a predicate that will be evaluated on the argument if the object compares equal + * @param apply an action to apply to the argument if the object compares equal + * @return a builder with the case statement added + */ + public

UnitPFBuilder matchEquals(final P object, + final FI.TypedPredicate

predicate, + final FI.UnitApply

apply) { + addStatement(new UnitCaseStatement( + new FI.Predicate() { + @Override + public boolean defined(Object o) { + if (!object.equals(o)) + return false; + else { + @SuppressWarnings("unchecked") + P p = (P) o; + return predicate.defined(p); + } + } + }, apply)); + return this; + } + /** * Add a new case statement to this builder, that matches any argument. * @param apply an action to apply to the argument diff --git a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala index 6727c43562..98d36ae7cf 100644 --- a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala +++ b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala @@ -52,6 +52,15 @@ abstract class AbstractActor extends Actor { def getContext(): AbstractActorContext = context.asInstanceOf[AbstractActorContext] } +/** + * Java API: compatible with lambda expressions + * + * Actor base class that mixes in logging into the Actor. + * + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ +abstract class AbstractLoggingActor extends AbstractActor with ActorLogging + /** * Java API: compatible with lambda expressions * diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index b8d3ccd2ee..5b4938695a 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -387,14 +387,45 @@ case class AllForOneStrategy( import SupervisorStrategy._ + /** + * Java API + */ def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) = this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider)) + /** + * Java API + */ def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) + /** + * Java API + */ def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) + + /** + * Java API: compatible with lambda expressions + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) = + this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider) + + /** + * Java API: compatible with lambda expressions + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ + def this(loggingEnabled: Boolean, decider: SupervisorStrategy.Decider) = + this(loggingEnabled = loggingEnabled)(decider) + + /** + * Java API: compatible with lambda expressions + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ + def this(decider: SupervisorStrategy.Decider) = + this()(decider) + /* * this is a performance optimization to avoid re-allocating the pairs upon * every call to requestRestartPermission, assuming that strategies are shared @@ -432,15 +463,45 @@ case class OneForOneStrategy( override val loggingEnabled: Boolean = true)(val decider: SupervisorStrategy.Decider) extends SupervisorStrategy { + /** + * Java API + */ def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider, loggingEnabled: Boolean) = this(maxNrOfRetries, withinTimeRange, loggingEnabled)(SupervisorStrategy.makeDecider(decider)) + /** + * Java API + */ def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) + /** + * Java API + */ def this(maxNrOfRetries: Int, withinTimeRange: Duration, trapExit: JIterable[Class[_ <: Throwable]]) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(trapExit)) + /** + * Java API: compatible with lambda expressions + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.Decider) = + this(maxNrOfRetries = maxNrOfRetries, withinTimeRange = withinTimeRange)(decider) + + /** + * Java API: compatible with lambda expressions + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ + def this(loggingEnabled: Boolean, decider: SupervisorStrategy.Decider) = + this(loggingEnabled = loggingEnabled)(decider) + + /** + * Java API: compatible with lambda expressions + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ + def this(decider: SupervisorStrategy.Decider) = + this()(decider) + /* * this is a performance optimization to avoid re-allocating the pairs upon * every call to requestRestartPermission, assuming that strategies are shared diff --git a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala index ef79ce42bf..8388ab5c9c 100644 --- a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala +++ b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala @@ -30,6 +30,12 @@ object LoggingReceive { case _: LoggingReceive ⇒ r case _ ⇒ if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r) else r } + + /** + * Java API: compatible with lambda expressions + * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. + */ + def create(r: Receive, context: ActorContext): Receive = apply(r)(context) } /** diff --git a/akka-docs/rst/java.rst b/akka-docs/rst/java.rst index 46dca46987..b8c90e53e1 100644 --- a/akka-docs/rst/java.rst +++ b/akka-docs/rst/java.rst @@ -9,6 +9,7 @@ Java Documentation intro/index-java general/index java/index-actors + java/lambda-index-actors java/index-futures java/index-network java/index-utilities diff --git a/akka-docs/rst/java/code/docs/actor/FaultHandlingTest.java b/akka-docs/rst/java/code/docs/actor/FaultHandlingTest.java index 4e2833bf9d..8ce0fba610 100644 --- a/akka-docs/rst/java/code/docs/actor/FaultHandlingTest.java +++ b/akka-docs/rst/java/code/docs/actor/FaultHandlingTest.java @@ -16,6 +16,7 @@ import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.Terminated; import akka.actor.UntypedActor; +import scala.collection.immutable.Seq; import scala.concurrent.Await; import static akka.pattern.Patterns.ask; import scala.concurrent.duration.Duration; @@ -31,7 +32,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static akka.japi.Util.immutableSeq; import akka.japi.Function; import scala.Option; -import scala.collection.immutable.Seq; import org.junit.Test; import org.junit.BeforeClass; @@ -164,12 +164,13 @@ public class FaultHandlingTest { public void mustEmploySupervisorStrategy() throws Exception { // code here //#testkit - EventFilter ex1 = (EventFilter) new ErrorFilter(ArithmeticException.class); - EventFilter ex2 = (EventFilter) new ErrorFilter(NullPointerException.class); - EventFilter ex3 = (EventFilter) new ErrorFilter(IllegalArgumentException.class); - EventFilter ex4 = (EventFilter) new ErrorFilter(Exception.class); - Seq ignoreExceptions = seq(ex1, ex2, ex3, ex4); - system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); + EventFilter ex1 = new ErrorFilter(ArithmeticException.class); + EventFilter ex2 = new ErrorFilter(NullPointerException.class); + EventFilter ex3 = new ErrorFilter(IllegalArgumentException.class); + EventFilter ex4 = new ErrorFilter(Exception.class); + EventFilter[] ignoreExceptions = { ex1, ex2, ex3, ex4 }; + Seq seq = immutableSeq(ignoreExceptions); + system.eventStream().publish(new TestEvent.Mute(seq)); //#create Props superprops = Props.create(Supervisor.class); @@ -219,11 +220,5 @@ public class FaultHandlingTest { //#testkit } - //#testkit - @SuppressWarnings("unchecked") - public Seq seq(A... args) { - return immutableSeq(args); - } - //#testkit } //#testkit diff --git a/akka-docs/rst/java/fault-tolerance.rst b/akka-docs/rst/java/fault-tolerance.rst index 8d907fd18d..0288c3d9be 100644 --- a/akka-docs/rst/java/fault-tolerance.rst +++ b/akka-docs/rst/java/fault-tolerance.rst @@ -24,13 +24,6 @@ sample as it is easy to follow the log output to understand what is happening in fault-tolerance-sample -.. note:: - - If the strategy is declared inside the supervising actor (as opposed to - as a static property or class) its decider has access to all internal state of - the actor in a thread-safe fashion, including obtaining a reference to the - currently failed child (available as the ``getSender()`` of the failure message). - Creating a Supervisor Strategy ------------------------------ diff --git a/akka-docs/rst/java/index-actors.rst b/akka-docs/rst/java/index-actors.rst index 3bb88e0095..20b567295b 100644 --- a/akka-docs/rst/java/index-actors.rst +++ b/akka-docs/rst/java/index-actors.rst @@ -13,6 +13,3 @@ Actors fsm persistence testing - lambda-actors - lambda-fsm - lambda-persistence diff --git a/akka-docs/rst/java/lambda-fault-tolerance-sample.rst b/akka-docs/rst/java/lambda-fault-tolerance-sample.rst new file mode 100644 index 0000000000..7fdcb1dbc8 --- /dev/null +++ b/akka-docs/rst/java/lambda-fault-tolerance-sample.rst @@ -0,0 +1,53 @@ +.. _lambda-fault-tolerance-sample-java: + +Diagrams of the Fault Tolerance Sample +---------------------------------------------- + +.. image:: ../images/faulttolerancesample-normal-flow.png + +*The above diagram illustrates the normal message flow.* + +**Normal flow:** + +======= ================================================================================== +Step Description +======= ================================================================================== +1 The progress ``Listener`` starts the work. +2 The ``Worker`` schedules work by sending ``Do`` messages periodically to itself +3, 4, 5 When receiving ``Do`` the ``Worker`` tells the ``CounterService`` + to increment the counter, three times. The ``Increment`` message is forwarded + to the ``Counter``, which updates its counter variable and sends current value + to the ``Storage``. +6, 7 The ``Worker`` asks the ``CounterService`` of current value of the counter and pipes + the result back to the ``Listener``. +======= ================================================================================== + + +.. image:: ../images/faulttolerancesample-failure-flow.png + +*The above diagram illustrates what happens in case of storage failure.* + +**Failure flow:** + +=========== ================================================================================== +Step Description +=========== ================================================================================== +1 The ``Storage`` throws ``StorageException``. +2 The ``CounterService`` is supervisor of the ``Storage`` and restarts the + ``Storage`` when ``StorageException`` is thrown. +3, 4, 5, 6 The ``Storage`` continues to fail and is restarted. +7 After 3 failures and restarts within 5 seconds the ``Storage`` is stopped by its + supervisor, i.e. the ``CounterService``. +8 The ``CounterService`` is also watching the ``Storage`` for termination and + receives the ``Terminated`` message when the ``Storage`` has been stopped ... +9, 10, 11 and tells the ``Counter`` that there is no ``Storage``. +12 The ``CounterService`` schedules a ``Reconnect`` message to itself. +13, 14 When it receives the ``Reconnect`` message it creates a new ``Storage`` ... +15, 16 and tells the ``Counter`` to use the new ``Storage`` +=========== ================================================================================== + +Full Source Code of the Fault Tolerance Sample +------------------------------------------------------ + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java#all + diff --git a/akka-docs/rst/java/lambda-fault-tolerance.rst b/akka-docs/rst/java/lambda-fault-tolerance.rst new file mode 100644 index 0000000000..7a9d27ddff --- /dev/null +++ b/akka-docs/rst/java/lambda-fault-tolerance.rst @@ -0,0 +1,175 @@ +.. _lambda-fault-tolerance-java: + +Fault Tolerance (Java with Lambda Support) +=========================================== + +As explained in :ref:`actor-systems` each actor is the supervisor of its +children, and as such each actor defines fault handling supervisor strategy. +This strategy cannot be changed afterwards as it is an integral part of the +actor system’s structure. + +Fault Handling in Practice +-------------------------- + +First, let us look at a sample that illustrates one way to handle data store errors, +which is a typical source of failure in real world applications. Of course it depends +on the actual application what is possible to do when the data store is unavailable, +but in this sample we use a best effort re-connect approach. + +Read the following source code. The inlined comments explain the different pieces of +the fault handling and why they are added. It is also highly recommended to run this +sample as it is easy to follow the log output to understand what is happening in runtime. + +.. toctree:: + + lambda-fault-tolerance-sample + +Creating a Supervisor Strategy +------------------------------ + +The following sections explain the fault handling mechanism and alternatives +in more depth. + +For the sake of demonstration let us consider the following strategy: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: strategy + +I have chosen a few well-known exception types in order to demonstrate the +application of the fault handling directives described in :ref:`supervision`. +First off, it is a one-for-one strategy, meaning that each child is treated +separately (an all-for-one strategy works very similarly, the only difference +is that any decision is applied to all children of the supervisor, not only the +failing one). There are limits set on the restart frequency, namely maximum 10 +restarts per minute. ``-1`` and ``Duration.Inf()`` means that the respective limit +does not apply, leaving the possibility to specify an absolute upper limit on the +restarts or to make the restarts work infinitely. +The child actor is stopped if the limit is exceeded. + +.. note:: + + If the strategy is declared inside the supervising actor (as opposed to + a separate class) its decider has access to all internal state of + the actor in a thread-safe fashion, including obtaining a reference to the + currently failed child (available as the ``getSender`` of the failure message). + +Default Supervisor Strategy +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``Escalate`` is used if the defined strategy doesn't cover the exception that was thrown. + +When the supervisor strategy is not defined for an actor the following +exceptions are handled by default: + +* ``ActorInitializationException`` will stop the failing child actor +* ``ActorKilledException`` will stop the failing child actor +* ``Exception`` will restart the failing child actor +* Other types of ``Throwable`` will be escalated to parent actor + +If the exception escalate all the way up to the root guardian it will handle it +in the same way as the default strategy defined above. + +Stopping Supervisor Strategy +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Closer to the Erlang way is the strategy to just stop children when they fail +and then take corrective action in the supervisor when DeathWatch signals the +loss of the child. This strategy is also provided pre-packaged as +:obj:`SupervisorStrategy.stoppingStrategy` with an accompanying +:class:`StoppingSupervisorStrategy` configurator to be used when you want the +``"/user"`` guardian to apply it. + +Logging of Actor Failures +^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default the ``SupervisorStrategy`` logs failures unless they are escalated. +Escalated failures are supposed to be handled, and potentially logged, at a level +higher in the hierarchy. + +You can mute the default logging of a ``SupervisorStrategy`` by setting +``loggingEnabled`` to ``false`` when instantiating it. Customized logging +can be done inside the ``Decider``. Note that the reference to the currently +failed child is available as the ``getSender`` when the ``SupervisorStrategy`` is +declared inside the supervising actor. + +You may also customize the logging in your own ``SupervisorStrategy`` implementation +by overriding the ``logFailure`` method. + +Supervision of Top-Level Actors +------------------------------- + +Toplevel actors means those which are created using ``system.actorOf()``, and +they are children of the :ref:`User Guardian `. There are no +special rules applied in this case, the guardian simply applies the configured +strategy. + +Test Application +---------------- + +The following section shows the effects of the different directives in practice, +wherefor a test setup is needed. First off, we need a suitable supervisor: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: supervisor + +This supervisor will be used to create a child, with which we can experiment: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: child + +The test is easier by using the utilities described in :ref:`akka-testkit`, +where ``TestProbe`` provides an actor ref useful for receiving and inspecting replies. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: testkit + +Let us create actors: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: create + +The first test shall demonstrate the ``Resume`` directive, so we try it out by +setting some non-initial state in the actor and have it fail: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: resume + +As you can see the value 42 survives the fault handling directive. Now, if we +change the failure to a more serious ``NullPointerException``, that will no +longer be the case: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: restart + +And finally in case of the fatal ``IllegalArgumentException`` the child will be +terminated by the supervisor: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: stop + +Up to now the supervisor was completely unaffected by the child’s failure, +because the directives set did handle it. In case of an ``Exception``, this is not +true anymore and the supervisor escalates the failure. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: escalate-kill + +The supervisor itself is supervised by the top-level actor provided by the +:class:`ActorSystem`, which has the default policy to restart in case of all +``Exception`` cases (with the notable exceptions of +``ActorInitializationException`` and ``ActorKilledException``). Since the +default directive in case of a restart is to kill all children, we expected our poor +child not to survive this failure. + +In case this is not desired (which depends on the use case), we need to use a +different supervisor which overrides this behavior. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: supervisor2 + +With this parent, the child survives the escalated restart, as demonstrated in +the last test: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java + :include: escalate-restart + diff --git a/akka-docs/rst/java/lambda-index-actors.rst b/akka-docs/rst/java/lambda-index-actors.rst new file mode 100644 index 0000000000..c06b6165cc --- /dev/null +++ b/akka-docs/rst/java/lambda-index-actors.rst @@ -0,0 +1,10 @@ +Actors (Java with Lambda Support) +================================= + +.. toctree:: + :maxdepth: 2 + + lambda-actors + lambda-fault-tolerance + lambda-fsm + lambda-persistence diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java new file mode 100644 index 0000000000..c8c68948aa --- /dev/null +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java @@ -0,0 +1,204 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package docs.actor; + +//#testkit +import akka.actor.*; + +import static akka.actor.SupervisorStrategy.resume; +import static akka.actor.SupervisorStrategy.restart; +import static akka.actor.SupervisorStrategy.stop; +import static akka.actor.SupervisorStrategy.escalate; +import akka.japi.pf.DeciderBuilder; +import akka.japi.pf.ReceiveBuilder; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import scala.PartialFunction; +import scala.concurrent.Await; +import static akka.pattern.Patterns.ask; +import scala.concurrent.duration.Duration; +import akka.testkit.TestProbe; + +//#testkit +import akka.testkit.ErrorFilter; +import akka.testkit.EventFilter; +import akka.testkit.TestEvent; +import akka.testkit.JavaTestKit; +import static java.util.concurrent.TimeUnit.SECONDS; +import static akka.japi.Util.immutableSeq; +import scala.Option; + +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import scala.runtime.BoxedUnit; + +//#testkit +public class FaultHandlingTest { +//#testkit + + public static Config config = ConfigFactory.parseString( + "akka {\n" + + " loggers = [\"akka.testkit.TestEventListener\"]\n" + + " loglevel = \"WARNING\"\n" + + " stdout-loglevel = \"WARNING\"\n" + + "}\n"); + + static + //#supervisor + public class Supervisor extends AbstractActor { + + //#strategy + private static SupervisorStrategy strategy = + new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder. + match(ArithmeticException.class, e -> resume()). + match(NullPointerException.class, e -> restart()). + match(IllegalArgumentException.class, e -> stop()). + matchAny(o -> escalate()).build()); + + @Override + public SupervisorStrategy supervisorStrategy() { + return strategy; + } + + //#strategy + + @Override + public PartialFunction receive() { + return ReceiveBuilder. + match(Props.class, props -> { + sender().tell(context().actorOf(props), self()); + }).build(); + } + } + + //#supervisor + + static + //#supervisor2 + public class Supervisor2 extends AbstractActor { + + //#strategy2 + private static SupervisorStrategy strategy = + new OneForOneStrategy(10, Duration.create("1 minute"), DeciderBuilder. + match(ArithmeticException.class, e -> resume()). + match(NullPointerException.class, e -> restart()). + match(IllegalArgumentException.class, e -> stop()). + matchAny(o -> escalate()).build()); + + @Override + public SupervisorStrategy supervisorStrategy() { + return strategy; + } + + //#strategy2 + + @Override + public PartialFunction receive() { + return ReceiveBuilder. + match(Props.class, props -> { + sender().tell(context().actorOf(props), self()); + }).build(); + } + + @Override + public void preRestart(Throwable cause, Option msg) { + // do not kill all children, which is the default here + } + } + + //#supervisor2 + + static + //#child + public class Child extends AbstractActor { + int state = 0; + + @Override + public PartialFunction receive() { + return ReceiveBuilder. + match(Exception.class, exception -> { throw exception; }). + match(Integer.class, i -> state = i). + matchEquals("get", s -> sender().tell(state, self())).build(); + } + } + + //#child + + //#testkit + static ActorSystem system; + Duration timeout = Duration.create(5, SECONDS); + + @BeforeClass + public static void start() { + system = ActorSystem.create("FaultHandlingTest", config); + } + + @AfterClass + public static void cleanup() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void mustEmploySupervisorStrategy() throws Exception { + // code here + //#testkit + EventFilter ex1 = new ErrorFilter(ArithmeticException.class); + EventFilter ex2 = new ErrorFilter(NullPointerException.class); + EventFilter ex3 = new ErrorFilter(IllegalArgumentException.class); + EventFilter ex4 = new ErrorFilter(Exception.class); + EventFilter[] ignoreExceptions = { ex1, ex2, ex3, ex4 }; + system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions))); + + //#create + Props superprops = Props.create(Supervisor.class); + ActorRef supervisor = system.actorOf(superprops, "supervisor"); + ActorRef child = (ActorRef) Await.result(ask(supervisor, + Props.create(Child.class), 5000), timeout); + //#create + + //#resume + child.tell(42, ActorRef.noSender()); + assert Await.result(ask(child, "get", 5000), timeout).equals(42); + child.tell(new ArithmeticException(), ActorRef.noSender()); + assert Await.result(ask(child, "get", 5000), timeout).equals(42); + //#resume + + //#restart + child.tell(new NullPointerException(), ActorRef.noSender()); + assert Await.result(ask(child, "get", 5000), timeout).equals(0); + //#restart + + //#stop + final TestProbe probe = new TestProbe(system); + probe.watch(child); + child.tell(new IllegalArgumentException(), ActorRef.noSender()); + probe.expectMsgClass(Terminated.class); + //#stop + + //#escalate-kill + child = (ActorRef) Await.result(ask(supervisor, + Props.create(Child.class), 5000), timeout); + probe.watch(child); + assert Await.result(ask(child, "get", 5000), timeout).equals(0); + child.tell(new Exception(), ActorRef.noSender()); + probe.expectMsgClass(Terminated.class); + //#escalate-kill + + //#escalate-restart + superprops = Props.create(Supervisor2.class); + supervisor = system.actorOf(superprops); + child = (ActorRef) Await.result(ask(supervisor, + Props.create(Child.class), 5000), timeout); + child.tell(23, ActorRef.noSender()); + assert Await.result(ask(child, "get", 5000), timeout).equals(23); + child.tell(new Exception(), ActorRef.noSender()); + assert Await.result(ask(child, "get", 5000), timeout).equals(0); + //#escalate-restart + //#testkit + } + +} +//#testkit diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/BuncherTest.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/BuncherTest.java index 66fc24bc94..22990a446a 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/BuncherTest.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/BuncherTest.java @@ -36,8 +36,7 @@ public class BuncherTest { } @Test - public void testBuncherActorBatchesCorrectly() - { + public void testBuncherActorBatchesCorrectly() { new JavaTestKit(system) {{ final ActorRef buncher = system.actorOf(Props.create(Buncher.class)); @@ -64,8 +63,7 @@ public class BuncherTest { } @Test - public void testBuncherActorDoesntBatchUninitialized() - { + public void testBuncherActorDoesntBatchUninitialized() { new JavaTestKit(system) {{ final ActorRef buncher = system.actorOf(Props.create(Buncher.class)); diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java new file mode 100644 index 0000000000..ab0c4f98a5 --- /dev/null +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java @@ -0,0 +1,472 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package docs.actor.japi; + +//#all +//#imports +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import akka.actor.*; +import akka.dispatch.Mapper; +import akka.event.LoggingReceive; +import akka.japi.pf.DeciderBuilder; +import akka.japi.pf.ReceiveBuilder; +import akka.util.Timeout; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import scala.concurrent.duration.Duration; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +import static akka.japi.Util.classTag; +import static akka.actor.SupervisorStrategy.resume; +import static akka.actor.SupervisorStrategy.restart; +import static akka.actor.SupervisorStrategy.stop; +import static akka.actor.SupervisorStrategy.escalate; + +import static akka.pattern.Patterns.ask; +import static akka.pattern.Patterns.pipe; + +import static docs.actor.japi.FaultHandlingDocSample.WorkerApi.*; +import static docs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*; +import static docs.actor.japi.FaultHandlingDocSample.CounterApi.*; +import static docs.actor.japi.FaultHandlingDocSample.StorageApi.*; + +//#imports + +public class FaultHandlingDocSample { + + /** + * Runs the sample + */ + public static void main(String[] args) { + Config config = ConfigFactory.parseString( + "akka.loglevel = \"DEBUG\"\n" + + "akka.actor.debug {\n" + + " receive = on\n" + + " lifecycle = on\n" + + "}\n"); + + ActorSystem system = ActorSystem.create("FaultToleranceSample", config); + ActorRef worker = system.actorOf(Props.create(Worker.class), "worker"); + ActorRef listener = system.actorOf(Props.create(Listener.class), "listener"); + // start the work and listen on progress + // note that the listener is used as sender of the tell, + // i.e. it will receive replies from the worker + worker.tell(Start, listener); + } + + /** + * Listens on progress from the worker and shuts down the system when enough + * work has been done. + */ + public static class Listener extends AbstractLoggingActor { + + @Override + public void preStart() { + // If we don't get any progress within 15 seconds then the service + // is unavailable + context().setReceiveTimeout(Duration.create("15 seconds")); + } + + @Override + public PartialFunction receive() { + return LoggingReceive.create(ReceiveBuilder. + match(Progress.class, progress -> { + log().info("Current progress: {} %", progress.percent); + if (progress.percent >= 100.0) { + log().info("That's all, shutting down"); + context().system().shutdown(); + } + }). + matchEquals(ReceiveTimeout.getInstance(), x -> { + // No progress within 15 seconds, ServiceUnavailable + log().error("Shutting down due to unavailable service"); + context().system().shutdown(); + }).build(), context()); + } + } + + //#messages + public interface WorkerApi { + public static final Object Start = "Start"; + public static final Object Do = "Do"; + + public static class Progress { + public final double percent; + + public Progress(double percent) { + this.percent = percent; + } + + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), percent); + } + } + } + + //#messages + + /** + * Worker performs some work when it receives the Start message. It will + * continuously notify the sender of the Start message of current Progress. + * The Worker supervise the CounterService. + */ + public static class Worker extends AbstractLoggingActor { + final Timeout askTimeout = new Timeout(Duration.create(5, "seconds")); + + // The sender of the initial Start message will continuously be notified + // about progress + ActorRef progressListener; + final ActorRef counterService = context().actorOf( + Props.create(CounterService.class), "counter"); + final int totalCount = 51; + + // Stop the CounterService child if it throws ServiceUnavailable + private static final SupervisorStrategy strategy = + new OneForOneStrategy(DeciderBuilder. + match(ServiceUnavailable.class, e -> stop()). + matchAny(o -> escalate()).build()); + + @Override + public SupervisorStrategy supervisorStrategy() { + return strategy; + } + + @Override + public PartialFunction receive() { + return LoggingReceive.create(ReceiveBuilder. + matchEquals(Start, x -> progressListener == null, x -> { + progressListener = sender(); + context().system().scheduler().schedule( + Duration.Zero(), Duration.create(1, "second"), self(), Do, + context().dispatcher(), null + ); + }). + matchEquals(Do, x -> { + counterService.tell(new Increment(1), self()); + counterService.tell(new Increment(1), self()); + counterService.tell(new Increment(1), self()); + // Send current progress to the initial sender + pipe(ask(counterService, GetCurrentCount, askTimeout) + .mapTo(classTag(CurrentCount.class)) + .map(new Mapper() { + public Progress apply(CurrentCount c) { + return new Progress(100.0 * c.count / totalCount); + } + }, context().dispatcher()), context().dispatcher()) + .to(progressListener); + }).build(), context()); + } + } + + //#messages + public interface CounterServiceApi { + + public static final Object GetCurrentCount = "GetCurrentCount"; + + public static class CurrentCount { + public final String key; + public final long count; + + public CurrentCount(String key, long count) { + this.key = key; + this.count = count; + } + + public String toString() { + return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count); + } + } + + public static class Increment { + public final long n; + + public Increment(long n) { + this.n = n; + } + + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), n); + } + } + + public static class ServiceUnavailable extends RuntimeException { + private static final long serialVersionUID = 1L; + public ServiceUnavailable(String msg) { + super(msg); + } + } + + } + + //#messages + + /** + * Adds the value received in Increment message to a persistent counter. + * Replies with CurrentCount when it is asked for CurrentCount. CounterService + * supervise Storage and Counter. + */ + public static class CounterService extends AbstractLoggingActor { + + // Reconnect message + static final Object Reconnect = "Reconnect"; + + private static class SenderMsgPair { + final ActorRef sender; + final Object msg; + + SenderMsgPair(ActorRef sender, Object msg) { + this.msg = msg; + this.sender = sender; + } + } + + final String key = self().path().name(); + ActorRef storage; + ActorRef counter; + final List backlog = new ArrayList<>(); + final int MAX_BACKLOG = 10000; + + // Restart the storage child when StorageException is thrown. + // After 3 restarts within 5 seconds it will be stopped. + private static final SupervisorStrategy strategy = + new OneForOneStrategy(3, Duration.create("5 seconds"), DeciderBuilder. + match(StorageException.class, e -> restart()). + matchAny(o -> escalate()).build()); + + @Override + public SupervisorStrategy supervisorStrategy() { + return strategy; + } + + @Override + public void preStart() { + initStorage(); + } + + /** + * The child storage is restarted in case of failure, but after 3 restarts, + * and still failing it will be stopped. Better to back-off than + * continuously failing. When it has been stopped we will schedule a + * Reconnect after a delay. Watch the child so we receive Terminated message + * when it has been terminated. + */ + void initStorage() { + storage = context().watch(context().actorOf( + Props.create(Storage.class), "storage")); + // Tell the counter, if any, to use the new storage + if (counter != null) + counter.tell(new UseStorage(storage), self()); + // We need the initial value to be able to operate + storage.tell(new Get(key), self()); + } + + @Override + public PartialFunction receive() { + return LoggingReceive.create(ReceiveBuilder. + match(Entry.class, entry -> entry.key.equals(key) && counter == null, entry -> { + // Reply from Storage of the initial value, now we can create the Counter + final long value = entry.value; + counter = context().actorOf(Props.create(Counter.class, key, value)); + // Tell the counter to use current storage + counter.tell(new UseStorage(storage), self()); + // and send the buffered backlog to the counter + for (SenderMsgPair each : backlog) { + counter.tell(each.msg, each.sender); + } + backlog.clear(); + }). + match(Increment.class, increment -> { + forwardOrPlaceInBacklog(increment); + }). + matchEquals(GetCurrentCount, gcc -> { + forwardOrPlaceInBacklog(gcc); + }). + match(Terminated.class, o -> { + // After 3 restarts the storage child is stopped. + // We receive Terminated because we watch the child, see initStorage. + storage = null; + // Tell the counter that there is no storage for the moment + counter.tell(new UseStorage(null), self()); + // Try to re-establish storage after while + context().system().scheduler().scheduleOnce( + Duration.create(10, "seconds"), self(), Reconnect, + context().dispatcher(), null); + }). + matchEquals(Reconnect, o -> { + // Re-establish storage after the scheduled delay + initStorage(); + }).build(), context()); + } + + void forwardOrPlaceInBacklog(Object msg) { + // We need the initial value from storage before we can start delegate to + // the counter. Before that we place the messages in a backlog, to be sent + // to the counter when it is initialized. + if (counter == null) { + if (backlog.size() >= MAX_BACKLOG) + throw new ServiceUnavailable("CounterService not available," + + " lack of initial value"); + backlog.add(new SenderMsgPair(sender(), msg)); + } else { + counter.forward(msg, context()); + } + } + } + + //#messages + public interface CounterApi { + public static class UseStorage { + public final ActorRef storage; + + public UseStorage(ActorRef storage) { + this.storage = storage; + } + + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), storage); + } + } + } + + //#messages + + /** + * The in memory count variable that will send current value to the Storage, + * if there is any storage available at the moment. + */ + public static class Counter extends AbstractLoggingActor { + final String key; + long count; + ActorRef storage; + + public Counter(String key, long initialValue) { + this.key = key; + this.count = initialValue; + } + + @Override + public PartialFunction receive() { + return LoggingReceive.create(ReceiveBuilder. + match(UseStorage.class, useStorage -> { + storage = useStorage.storage; + storeCount(); + }). + match(Increment.class, increment -> { + count += increment.n; + storeCount(); + }). + matchEquals(GetCurrentCount, gcc -> { + sender().tell(new CurrentCount(key, count), self()); + }).build(), context()); + } + + void storeCount() { + // Delegate dangerous work, to protect our valuable state. + // We can continue without storage. + if (storage != null) { + storage.tell(new Store(new Entry(key, count)), self()); + } + } + } + + //#messages + public interface StorageApi { + + public static class Store { + public final Entry entry; + + public Store(Entry entry) { + this.entry = entry; + } + + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), entry); + } + } + + public static class Entry { + public final String key; + public final long value; + + public Entry(String key, long value) { + this.key = key; + this.value = value; + } + + public String toString() { + return String.format("%s(%s, %s)", getClass().getSimpleName(), key, value); + } + } + + public static class Get { + public final String key; + + public Get(String key) { + this.key = key; + } + + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), key); + } + } + + public static class StorageException extends RuntimeException { + private static final long serialVersionUID = 1L; + public StorageException(String msg) { + super(msg); + } + } + } + + //#messages + + /** + * Saves key/value pairs to persistent storage when receiving Store message. + * Replies with current value when receiving Get message. Will throw + * StorageException if the underlying data store is out of order. + */ + public static class Storage extends AbstractLoggingActor { + + final DummyDB db = DummyDB.instance; + + @Override + public PartialFunction receive() { + return LoggingReceive.create(ReceiveBuilder. + match(Store.class, store -> { + db.save(store.entry.key, store.entry.value); + }). + match(Get.class, get -> { + Long value = db.load(get.key); + sender().tell(new Entry(get.key, value == null ? + Long.valueOf(0L) : value), self()); + }).build(), context()); + } + } + + //#dummydb + public static class DummyDB { + public static final DummyDB instance = new DummyDB(); + private final Map db = new HashMap(); + + private DummyDB() { + } + + public synchronized void save(String key, Long value) throws StorageException { + if (11 <= value && value <= 14) + throw new StorageException("Simulated store failure " + value); + db.put(key, value); + } + + public synchronized Long load(String key) throws StorageException { + return db.get(key); + } + } + //#dummydb +} +//#all