=cls #18370 Document supervision for Cluster Sharding
This commit is contained in:
parent
1de8f05854
commit
4e2b8190a3
4 changed files with 91 additions and 0 deletions
|
|
@ -95,6 +95,23 @@ object ClusterShardingSpec {
|
|||
|
||||
class AnotherCounter extends QualifiedCounter("AnotherCounter")
|
||||
|
||||
//#supervisor
|
||||
class CounterSupervisor extends Actor {
|
||||
val counter = context.actorOf(Props[Counter], "theCounter")
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case _: IllegalArgumentException ⇒ SupervisorStrategy.Resume
|
||||
case _: ActorInitializationException ⇒ SupervisorStrategy.Stop
|
||||
case _: DeathPactException ⇒ SupervisorStrategy.Stop
|
||||
case _: Exception ⇒ SupervisorStrategy.Restart
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case msg ⇒ counter forward msg
|
||||
}
|
||||
}
|
||||
//#supervisor
|
||||
|
||||
}
|
||||
|
||||
abstract class ClusterShardingSpecConfig(val mode: String) extends MultiNodeConfig {
|
||||
|
|
@ -561,6 +578,15 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
|||
settings = ClusterShardingSettings(system),
|
||||
extractEntityId = extractEntityId,
|
||||
extractShardId = extractShardId)
|
||||
|
||||
//#counter-supervisor-start
|
||||
ClusterSharding(system).start(
|
||||
typeName = "SupervisedCounter",
|
||||
entityProps = Props[CounterSupervisor],
|
||||
settings = ClusterShardingSettings(system),
|
||||
extractEntityId = extractEntityId,
|
||||
extractShardId = extractShardId)
|
||||
//#counter-supervisor-start
|
||||
}
|
||||
enterBarrier("extension-started")
|
||||
runOn(fifth) {
|
||||
|
|
|
|||
|
|
@ -8,16 +8,21 @@ import java.util.concurrent.TimeUnit;
|
|||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorInitializationException;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.OneForOneStrategy;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.SupervisorStrategy;
|
||||
import akka.actor.Terminated;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.japi.Procedure;
|
||||
import akka.japi.Option;
|
||||
import akka.persistence.UntypedPersistentActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.japi.pf.DeciderBuilder;
|
||||
import akka.japi.pf.ReceiveBuilder;
|
||||
|
||||
// Doc code, compile only
|
||||
|
|
@ -83,6 +88,11 @@ public class ClusterShardingTest {
|
|||
Counter.CounterOp.INCREMENT), getSelf());
|
||||
counterRegion.tell(new Counter.Get(123), getSelf());
|
||||
//#counter-usage
|
||||
|
||||
//#counter-supervisor-start
|
||||
ClusterSharding.get(system).start("SupervisedCounter",
|
||||
Props.create(CounterSupervisor.class), settings, messageExtractor);
|
||||
//#counter-supervisor-start
|
||||
}
|
||||
|
||||
static//#counter-actor
|
||||
|
|
@ -195,4 +205,29 @@ public class ClusterShardingTest {
|
|||
}
|
||||
//#graceful-shutdown
|
||||
|
||||
static//#supervisor
|
||||
public class CounterSupervisor extends UntypedActor {
|
||||
|
||||
private final ActorRef counter = getContext().actorOf(
|
||||
Props.create(Counter.class), "theCounter");
|
||||
|
||||
private static final SupervisorStrategy strategy =
|
||||
new OneForOneStrategy(DeciderBuilder.
|
||||
match(IllegalArgumentException.class, e -> SupervisorStrategy.resume()).
|
||||
match(ActorInitializationException.class, e -> SupervisorStrategy.stop()).
|
||||
match(Exception.class, e -> SupervisorStrategy.restart()).
|
||||
matchAny(o -> SupervisorStrategy.escalate()).build());
|
||||
|
||||
@Override
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object msg) {
|
||||
counter.forward(msg, getContext());
|
||||
}
|
||||
}
|
||||
//#supervisor
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -241,6 +241,21 @@ using a ``Passivate``.
|
|||
Note that the state of the entities themselves will not be restored unless they have been made persistent,
|
||||
e.g. with :ref:`persistence-java`.
|
||||
|
||||
Supervision
|
||||
-----------
|
||||
|
||||
If you need to use another ``supervisorStrategy`` for the entity actors than the default (restarting) strategy
|
||||
you need to create an intermediate parent actor that defines the ``supervisorStrategy`` to the
|
||||
child entity actor.
|
||||
|
||||
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#supervisor
|
||||
|
||||
You start such a supervisor in the same way as if it was the entity actor.
|
||||
|
||||
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-supervisor-start
|
||||
|
||||
Note that stopped entities will be started again when a new message is targeted to the entity.
|
||||
|
||||
Graceful Shutdown
|
||||
-----------------
|
||||
|
||||
|
|
|
|||
|
|
@ -244,6 +244,21 @@ using a ``Passivate``.
|
|||
Note that the state of the entities themselves will not be restored unless they have been made persistent,
|
||||
e.g. with :ref:`persistence-scala`.
|
||||
|
||||
Supervision
|
||||
-----------
|
||||
|
||||
If you need to use another ``supervisorStrategy`` for the entity actors than the default (restarting) strategy
|
||||
you need to create an intermediate parent actor that defines the ``supervisorStrategy`` to the
|
||||
child entity actor.
|
||||
|
||||
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#supervisor
|
||||
|
||||
You start such a supervisor in the same way as if it was the entity actor.
|
||||
|
||||
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-supervisor-start
|
||||
|
||||
Note that stopped entities will be started again when a new message is targeted to the entity.
|
||||
|
||||
Graceful Shutdown
|
||||
-----------------
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue