From 4e2b8190a3a137f80b4319a994f6a762d6e906a0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 10 Sep 2015 15:35:26 +0200 Subject: [PATCH] =cls #18370 Document supervision for Cluster Sharding --- .../sharding/ClusterShardingSpec.scala | 26 ++++++++++++++ .../cluster/sharding/ClusterShardingTest.java | 35 +++++++++++++++++++ akka-docs/rst/java/cluster-sharding.rst | 15 ++++++++ akka-docs/rst/scala/cluster-sharding.rst | 15 ++++++++ 4 files changed, 91 insertions(+) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 91fa0dfda4..0fb3defe07 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -95,6 +95,23 @@ object ClusterShardingSpec { class AnotherCounter extends QualifiedCounter("AnotherCounter") + //#supervisor + class CounterSupervisor extends Actor { + val counter = context.actorOf(Props[Counter], "theCounter") + + override val supervisorStrategy = OneForOneStrategy() { + case _: IllegalArgumentException ⇒ SupervisorStrategy.Resume + case _: ActorInitializationException ⇒ SupervisorStrategy.Stop + case _: DeathPactException ⇒ SupervisorStrategy.Stop + case _: Exception ⇒ SupervisorStrategy.Restart + } + + def receive = { + case msg ⇒ counter forward msg + } + } + //#supervisor + } abstract class ClusterShardingSpecConfig(val mode: String) extends MultiNodeConfig { @@ -561,6 +578,15 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId) + + //#counter-supervisor-start + ClusterSharding(system).start( + typeName = "SupervisedCounter", + entityProps = Props[CounterSupervisor], + settings = ClusterShardingSettings(system), + extractEntityId = extractEntityId, + extractShardId = extractShardId) + //#counter-supervisor-start } enterBarrier("extension-started") runOn(fifth) { diff --git a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java index e7ed709a78..9f38478277 100644 --- a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java +++ b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java @@ -8,16 +8,21 @@ import java.util.concurrent.TimeUnit; import scala.concurrent.duration.Duration; import akka.actor.AbstractActor; +import akka.actor.ActorInitializationException; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.SupervisorStrategy; import akka.actor.Terminated; import akka.actor.ReceiveTimeout; +import akka.actor.UntypedActor; import akka.japi.Procedure; import akka.japi.Option; import akka.persistence.UntypedPersistentActor; import akka.cluster.Cluster; +import akka.japi.pf.DeciderBuilder; import akka.japi.pf.ReceiveBuilder; // Doc code, compile only @@ -83,6 +88,11 @@ public class ClusterShardingTest { Counter.CounterOp.INCREMENT), getSelf()); counterRegion.tell(new Counter.Get(123), getSelf()); //#counter-usage + + //#counter-supervisor-start + ClusterSharding.get(system).start("SupervisedCounter", + Props.create(CounterSupervisor.class), settings, messageExtractor); + //#counter-supervisor-start } static//#counter-actor @@ -195,4 +205,29 @@ public class ClusterShardingTest { } //#graceful-shutdown + static//#supervisor + public class CounterSupervisor extends UntypedActor { + + private final ActorRef counter = getContext().actorOf( + Props.create(Counter.class), "theCounter"); + + private static final SupervisorStrategy strategy = + new OneForOneStrategy(DeciderBuilder. + match(IllegalArgumentException.class, e -> SupervisorStrategy.resume()). + match(ActorInitializationException.class, e -> SupervisorStrategy.stop()). + match(Exception.class, e -> SupervisorStrategy.restart()). + matchAny(o -> SupervisorStrategy.escalate()).build()); + + @Override + public SupervisorStrategy supervisorStrategy() { + return strategy; + } + + @Override + public void onReceive(Object msg) { + counter.forward(msg, getContext()); + } + } + //#supervisor + } diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index ad200770dd..528875281a 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -241,6 +241,21 @@ using a ``Passivate``. Note that the state of the entities themselves will not be restored unless they have been made persistent, e.g. with :ref:`persistence-java`. +Supervision +----------- + +If you need to use another ``supervisorStrategy`` for the entity actors than the default (restarting) strategy +you need to create an intermediate parent actor that defines the ``supervisorStrategy`` to the +child entity actor. + +.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#supervisor + +You start such a supervisor in the same way as if it was the entity actor. + +.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-supervisor-start + +Note that stopped entities will be started again when a new message is targeted to the entity. + Graceful Shutdown ----------------- diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 4dff2f462a..0eff4bce12 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -244,6 +244,21 @@ using a ``Passivate``. Note that the state of the entities themselves will not be restored unless they have been made persistent, e.g. with :ref:`persistence-scala`. +Supervision +----------- + +If you need to use another ``supervisorStrategy`` for the entity actors than the default (restarting) strategy +you need to create an intermediate parent actor that defines the ``supervisorStrategy`` to the +child entity actor. + +.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#supervisor + +You start such a supervisor in the same way as if it was the entity actor. + +.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-supervisor-start + +Note that stopped entities will be started again when a new message is targeted to the entity. + Graceful Shutdown -----------------