Add some more imports to cluster sharding doc examples (#23305)
This commit is contained in:
parent
71175eaf54
commit
dd71f5759f
3 changed files with 27 additions and 16 deletions
|
|
@ -39,7 +39,7 @@ Scala
|
|||
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #counter-actor }
|
||||
|
||||
Java
|
||||
: @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #counter-actor }
|
||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-actor }
|
||||
|
||||
The above actor uses event sourcing and the support provided in @scala[`PersistentActor`] @java[`AbstractPersistentActor`] to store its state.
|
||||
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
|
||||
|
|
@ -56,7 +56,7 @@ Scala
|
|||
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #counter-start }
|
||||
|
||||
Java
|
||||
: @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #counter-start }
|
||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-start }
|
||||
|
||||
The @scala[`extractEntityId` and `extractShardId` are two] @java[`messageExtractor` defines] application specific @scala[functions] @java[methods] to extract the entity
|
||||
identifier and the shard identifier from incoming messages.
|
||||
|
|
@ -65,7 +65,7 @@ Scala
|
|||
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #counter-extractor }
|
||||
|
||||
Java
|
||||
: @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #counter-extractor }
|
||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-extractor }
|
||||
|
||||
This example illustrates two different ways to define the entity identifier in the messages:
|
||||
|
||||
|
|
@ -103,14 +103,14 @@ Scala
|
|||
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #counter-usage }
|
||||
|
||||
Java
|
||||
: @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #counter-usage }
|
||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-usage }
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
A more comprehensive sample is available in the
|
||||
tutorial named [Akka Cluster Sharding with Scala!](https://github.com/typesafehub/activator-akka-cluster-sharding-scala).
|
||||
|
||||
@@@
|
||||
@@@
|
||||
|
||||
## How it works
|
||||
|
||||
|
|
@ -188,7 +188,7 @@ must be to begin the rebalancing. This strategy can be replaced by an applicatio
|
|||
implementation.
|
||||
|
||||
The state of shard locations in the `ShardCoordinator` is persistent (durable) with
|
||||
@ref:[Distributed Data](distributed-data.md) or @ref:[Persistence](persistence.md) to survive failures. When a crashed or
|
||||
@ref:[Distributed Data](distributed-data.md) or @ref:[Persistence](persistence.md) to survive failures. When a crashed or
|
||||
unreachable coordinator node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
|
||||
actor will take over and the state is recovered. During such a failure period shards
|
||||
with known location are still available, while messages for new (unknown) shards
|
||||
|
|
@ -298,7 +298,7 @@ Scala
|
|||
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #extractShardId-StartEntity }
|
||||
|
||||
Java
|
||||
: @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity }
|
||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity }
|
||||
|
||||
When configured to remember entities, whenever a `Shard` is rebalanced onto another
|
||||
node or recovers after a crash it will recreate all the entities which were previously
|
||||
|
|
@ -307,8 +307,8 @@ sent to the parent of the entity actor, otherwise the entity will be automatical
|
|||
restarted after the entity restart backoff specified in the configuration.
|
||||
|
||||
When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
|
||||
stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
|
||||
configuration of the akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since
|
||||
stored in @ref[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
|
||||
configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since
|
||||
the default directory contains the remote port of the actor system. If using a dynamically
|
||||
assigned port (0) it will be different each time and the previously stored data will not
|
||||
be loaded.
|
||||
|
|
@ -319,7 +319,7 @@ for that entity has been received in the `Shard`. Entities will not be restarted
|
|||
using a `Passivate`.
|
||||
|
||||
Note that the state of the entities themselves will not be restored unless they have been made persistent,
|
||||
e.g. with @ref:[Persistence](persistence.md).
|
||||
e.g. with @ref[Persistence](persistence.md).
|
||||
|
||||
The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
|
||||
shards are rebalanced. This cost increases with number of entities per shard and we currently don't
|
||||
|
|
@ -335,7 +335,7 @@ Scala
|
|||
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #supervisor }
|
||||
|
||||
Java
|
||||
: @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #supervisor }
|
||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #supervisor }
|
||||
|
||||
You start such a supervisor in the same way as if it was the entity actor.
|
||||
|
||||
|
|
@ -343,13 +343,13 @@ Scala
|
|||
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #counter-supervisor-start }
|
||||
|
||||
Java
|
||||
: @@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #counter-supervisor-start }
|
||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-supervisor-start }
|
||||
|
||||
Note that stopped entities will be started again when a new message is targeted to the entity.
|
||||
|
||||
## Graceful Shutdown
|
||||
|
||||
You can send the @scala[`ShardRegion.GracefulShutdown`] @java[`ShardRegion.gracefulShutdownInstance`] message
|
||||
You can send the @scala[`ShardRegion.GracefulShutdown`] @java[`ShardRegion.gracefulShutdownInstance`] message
|
||||
to the `ShardRegion` actor to handoff all shards that are hosted by that `ShardRegion` and then the
|
||||
`ShardRegion` actor will be stopped. You can `watch` the `ShardRegion` actor to know when it is completed.
|
||||
During this period other regions will buffer messages for those shards in the same way as when a rebalance is
|
||||
|
|
@ -442,7 +442,7 @@ if needed.
|
|||
@@snip [reference.conf]($akka$/akka-cluster-sharding/src/main/resources/reference.conf) { #sharding-ext-config }
|
||||
|
||||
Custom shard allocation strategy can be defined in an optional parameter to
|
||||
`ClusterSharding.start`. See the API documentation of @scala[`ShardAllocationStrategy`] @java[`AbstractShardAllocationStrategy`] for details
|
||||
`ClusterSharding.start`. See the API documentation of @scala[`ShardAllocationStrategy`] @java[`AbstractShardAllocationStrategy`] for details
|
||||
of how to implement a custom shard allocation strategy.
|
||||
|
||||
## Inspecting cluster sharding state
|
||||
|
|
@ -467,5 +467,5 @@ When doing rolling upgrades special care must be taken to not change any of the
|
|||
* the `extractShardId` function
|
||||
* the role that the shard regions run on
|
||||
* the persistence mode
|
||||
|
||||
|
||||
If any one of these needs a change it will require a full cluster restart.
|
||||
266
akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java
Normal file
266
akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java
Normal file
|
|
@ -0,0 +1,266 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.sharding;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorInitializationException;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.OneForOneStrategy;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.SupervisorStrategy;
|
||||
import akka.actor.Terminated;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
//#counter-extractor
|
||||
import akka.cluster.sharding.ShardRegion;
|
||||
|
||||
//#counter-extractor
|
||||
|
||||
//#counter-start
|
||||
import akka.japi.Option;
|
||||
import akka.cluster.sharding.ClusterSharding;
|
||||
import akka.cluster.sharding.ClusterShardingSettings;
|
||||
|
||||
//#counter-start
|
||||
import akka.persistence.AbstractPersistentActor;
|
||||
import akka.cluster.Cluster;
|
||||
import akka.japi.pf.DeciderBuilder;
|
||||
|
||||
// Doc code, compile only
|
||||
public class ClusterShardingTest {
|
||||
|
||||
ActorSystem system = null;
|
||||
|
||||
ActorRef getSelf() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void demonstrateUsage() {
|
||||
//#counter-extractor
|
||||
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
|
||||
|
||||
@Override
|
||||
public String entityId(Object message) {
|
||||
if (message instanceof Counter.EntityEnvelope)
|
||||
return String.valueOf(((Counter.EntityEnvelope) message).id);
|
||||
else if (message instanceof Counter.Get)
|
||||
return String.valueOf(((Counter.Get) message).counterId);
|
||||
else
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object entityMessage(Object message) {
|
||||
if (message instanceof Counter.EntityEnvelope)
|
||||
return ((Counter.EntityEnvelope) message).payload;
|
||||
else
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String shardId(Object message) {
|
||||
int numberOfShards = 100;
|
||||
if (message instanceof Counter.EntityEnvelope) {
|
||||
long id = ((Counter.EntityEnvelope) message).id;
|
||||
return String.valueOf(id % numberOfShards);
|
||||
} else if (message instanceof Counter.Get) {
|
||||
long id = ((Counter.Get) message).counterId;
|
||||
return String.valueOf(id % numberOfShards);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
//#counter-extractor
|
||||
|
||||
//#counter-start
|
||||
Option<String> roleOption = Option.none();
|
||||
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
|
||||
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
|
||||
Props.create(Counter.class), settings, messageExtractor);
|
||||
//#counter-start
|
||||
|
||||
//#counter-usage
|
||||
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
|
||||
counterRegion.tell(new Counter.Get(123), getSelf());
|
||||
|
||||
counterRegion.tell(new Counter.EntityEnvelope(123,
|
||||
Counter.CounterOp.INCREMENT), getSelf());
|
||||
counterRegion.tell(new Counter.Get(123), getSelf());
|
||||
//#counter-usage
|
||||
|
||||
//#counter-supervisor-start
|
||||
ClusterSharding.get(system).start("SupervisedCounter",
|
||||
Props.create(CounterSupervisor.class), settings, messageExtractor);
|
||||
//#counter-supervisor-start
|
||||
}
|
||||
|
||||
public void demonstrateUsage2() {
|
||||
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
|
||||
|
||||
@Override
|
||||
public String entityId(Object message) {
|
||||
if (message instanceof Counter.EntityEnvelope)
|
||||
return String.valueOf(((Counter.EntityEnvelope) message).id);
|
||||
else if (message instanceof Counter.Get)
|
||||
return String.valueOf(((Counter.Get) message).counterId);
|
||||
else
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object entityMessage(Object message) {
|
||||
if (message instanceof Counter.EntityEnvelope)
|
||||
return ((Counter.EntityEnvelope) message).payload;
|
||||
else
|
||||
return message;
|
||||
}
|
||||
|
||||
//#extractShardId-StartEntity
|
||||
@Override
|
||||
public String shardId(Object message) {
|
||||
int numberOfShards = 100;
|
||||
if (message instanceof Counter.EntityEnvelope) {
|
||||
long id = ((Counter.EntityEnvelope) message).id;
|
||||
return String.valueOf(id % numberOfShards);
|
||||
} else if (message instanceof Counter.Get) {
|
||||
long id = ((Counter.Get) message).counterId;
|
||||
return String.valueOf(id % numberOfShards);
|
||||
} else if (message instanceof ShardRegion.StartEntity) {
|
||||
long id = Long.valueOf(((ShardRegion.StartEntity) message).entityId());
|
||||
return String.valueOf(id % numberOfShards);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
//#extractShardId-StartEntity
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
static//#counter-actor
|
||||
public class Counter extends AbstractPersistentActor {
|
||||
|
||||
public static enum CounterOp {
|
||||
INCREMENT, DECREMENT
|
||||
}
|
||||
|
||||
public static class Get {
|
||||
final public long counterId;
|
||||
|
||||
public Get(long counterId) {
|
||||
this.counterId = counterId;
|
||||
}
|
||||
}
|
||||
|
||||
public static class EntityEnvelope {
|
||||
final public long id;
|
||||
final public Object payload;
|
||||
|
||||
public EntityEnvelope(long id, Object payload) {
|
||||
this.id = id;
|
||||
this.payload = payload;
|
||||
}
|
||||
}
|
||||
|
||||
public static class CounterChanged {
|
||||
final public int delta;
|
||||
|
||||
public CounterChanged(int delta) {
|
||||
this.delta = delta;
|
||||
}
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
|
||||
// getSelf().path().name() is the entity identifier (utf-8 URL-encoded)
|
||||
@Override
|
||||
public String persistenceId() {
|
||||
return "Counter-" + getSelf().path().name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() throws Exception {
|
||||
super.preStart();
|
||||
getContext().setReceiveTimeout(Duration.create(120, SECONDS));
|
||||
}
|
||||
|
||||
void updateState(CounterChanged event) {
|
||||
count += event.delta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceiveRecover() {
|
||||
return receiveBuilder()
|
||||
.match(CounterChanged.class, this::updateState)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Get.class, this::receiveGet)
|
||||
.matchEquals(CounterOp.INCREMENT, msg -> receiveIncrement())
|
||||
.matchEquals(CounterOp.DECREMENT, msg -> receiveDecrement())
|
||||
.matchEquals(ReceiveTimeout.getInstance(), msg -> passivate())
|
||||
.build();
|
||||
}
|
||||
|
||||
private void receiveGet(Get msg) {
|
||||
getSender().tell(count, getSelf());
|
||||
}
|
||||
|
||||
private void receiveIncrement() {
|
||||
persist(new CounterChanged(+1), this::updateState);
|
||||
}
|
||||
|
||||
private void receiveDecrement() {
|
||||
persist(new CounterChanged(-1), this::updateState);
|
||||
}
|
||||
|
||||
private void passivate() {
|
||||
getContext().getParent().tell(
|
||||
new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#counter-actor
|
||||
|
||||
static//#supervisor
|
||||
public class CounterSupervisor extends AbstractActor {
|
||||
|
||||
private final ActorRef counter = getContext().actorOf(
|
||||
Props.create(Counter.class), "theCounter");
|
||||
|
||||
private static final SupervisorStrategy strategy =
|
||||
new OneForOneStrategy(DeciderBuilder.
|
||||
match(IllegalArgumentException.class, e -> SupervisorStrategy.resume()).
|
||||
match(ActorInitializationException.class, e -> SupervisorStrategy.stop()).
|
||||
match(Exception.class, e -> SupervisorStrategy.restart()).
|
||||
matchAny(o -> SupervisorStrategy.escalate()).build());
|
||||
|
||||
@Override
|
||||
public SupervisorStrategy supervisorStrategy() {
|
||||
return strategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Object.class, msg -> counter.forward(msg, getContext()))
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
//#supervisor
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue