doc: stylish cluster-sharding.md, #24717

This commit is contained in:
Patrik Nordwall 2019-09-03 13:00:53 +02:00
parent ec6fab4481
commit 9805489e98
5 changed files with 165 additions and 98 deletions

View file

@ -67,7 +67,7 @@ public class HelloWorldPersistentEntityExample {
extends EventSourcedEntity<HelloWorld.Command, HelloWorld.Greeted, HelloWorld.KnownPeople> {
// Command
interface Command extends CborSerializable {}
public interface Command extends CborSerializable {}
public static final class Greet implements Command {
public final String whom;

View file

@ -9,7 +9,10 @@ import java.time.Duration;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
// #import
import akka.cluster.sharding.typed.ShardingEnvelope;
@ -23,77 +26,136 @@ import akka.cluster.sharding.typed.javadsl.Entity;
import jdocs.akka.persistence.typed.BlogPostExample.BlogCommand;
import jdocs.akka.persistence.typed.BlogPostExample.BlogBehavior;
public class ShardingCompileOnlyTest {
// #counter-messages
interface CounterCommand {}
public static class Increment implements CounterCommand {}
public static class GetValue implements CounterCommand {
private final ActorRef<Integer> replyTo;
public GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
// #counter-messages
interface ShardingCompileOnlyTest {
// #counter
public class Counter extends AbstractBehavior<Counter.Command> {
public static Behavior<CounterCommand> counter(String entityId, Integer value) {
return Behaviors.receive(CounterCommand.class)
.onMessage(Increment.class, msg -> counter(entityId, value + 1))
.onMessage(
GetValue.class,
msg -> {
msg.replyTo.tell(value);
return Behaviors.same();
})
.build();
public interface Command {}
public enum Increment implements Command {
INSTANCE
}
public static class GetValue implements Command {
private final ActorRef<Integer> replyTo;
public GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
public static Behavior<Command> create(String entityId) {
return Behaviors.setup(context -> new Counter(context, entityId));
}
private final ActorContext<Command> context;
private final String entityId;
private int value = 0;
private Counter(ActorContext<Command> context, String entityId) {
this.context = context;
this.entityId = entityId;
}
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(Increment.class, msg -> onIncrement())
.onMessage(GetValue.class, this::onGetValue)
.build();
}
private Behavior<Command> onIncrement() {
value++;
return this;
}
private Behavior<Command> onGetValue(GetValue msg) {
msg.replyTo.tell(value);
return this;
}
}
// #counter
// #counter-passivate
public static class Idle implements CounterCommand {}
public class Counter2 extends AbstractBehavior<Counter2.Command> {
public static class GoodByeCounter implements CounterCommand {}
public interface Command {}
public static Behavior<CounterCommand> counter2(
ActorRef<ClusterSharding.ShardCommand> shard, String entityId) {
return Behaviors.setup(
ctx -> {
ctx.setReceiveTimeout(Duration.ofSeconds(30), new Idle());
return counter2(shard, entityId, 0);
});
}
private enum Idle implements Command {
INSTANCE
}
private static Behavior<CounterCommand> counter2(
ActorRef<ClusterSharding.ShardCommand> shard, String entityId, Integer value) {
return Behaviors.setup(
context ->
Behaviors.receive(CounterCommand.class)
.onMessage(Increment.class, msg -> counter(entityId, value + 1))
.onMessage(
GetValue.class,
msg -> {
msg.replyTo.tell(value);
return Behaviors.same();
})
.onMessage(
Idle.class,
msg -> {
// after receive timeout
shard.tell(new ClusterSharding.Passivate<>(context.getSelf()));
return Behaviors.same();
})
.onMessage(
GoodByeCounter.class,
msg -> {
// the stopMessage, used for rebalance and passivate
return Behaviors.stopped();
})
.build());
public enum GoodByeCounter implements Command {
INSTANCE
}
public enum Increment implements Command {
INSTANCE
}
public static class GetValue implements Command {
private final ActorRef<Integer> replyTo;
public GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
public static Behavior<Command> create(
ActorRef<ClusterSharding.ShardCommand> shard, String entityId) {
return Behaviors.setup(
ctx -> {
ctx.setReceiveTimeout(Duration.ofSeconds(30), Idle.INSTANCE);
return new Counter2(ctx, shard, entityId);
});
}
private final ActorContext<Command> context;
private final ActorRef<ClusterSharding.ShardCommand> shard;
private final String entityId;
private int value = 0;
private Counter2(
ActorContext<Command> context,
ActorRef<ClusterSharding.ShardCommand> shard,
String entityId) {
this.context = context;
this.shard = shard;
this.entityId = entityId;
}
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(Increment.class, msg -> onIncrement())
.onMessage(GetValue.class, this::onGetValue)
.onMessage(Idle.class, msg -> onIdle())
.onMessage(GoodByeCounter.class, msg -> onGoodByeCounter())
.build();
}
private Behavior<Command> onIncrement() {
value++;
return this;
}
private Behavior<Command> onGetValue(GetValue msg) {
msg.replyTo.tell(value);
return this;
}
private Behavior<Command> onIdle() {
// after receive timeout
shard.tell(new ClusterSharding.Passivate<>(context.getSelf()));
return this;
}
private Behavior<Command> onGoodByeCounter() {
// the stopMessage, used for rebalance and passivate
return Behaviors.stopped();
}
}
// #counter-passivate
@ -103,11 +165,12 @@ public class ShardingCompileOnlyTest {
// #counter-passivate-init
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
EntityTypeKey<Counter2.Command> typeKey =
EntityTypeKey.create(Counter2.Command.class, "Counter");
sharding.init(
Entity.of(typeKey, ctx -> counter2(ctx.getShard(), ctx.getEntityId()))
.withStopMessage(new GoodByeCounter()));
Entity.of(typeKey, ctx -> Counter2.create(ctx.getShard(), ctx.getEntityId()))
.withStopMessage(Counter2.GoodByeCounter.INSTANCE));
// #counter-passivate-init
}
@ -120,17 +183,17 @@ public class ShardingCompileOnlyTest {
// #sharding-extension
// #init
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion =
sharding.init(Entity.of(typeKey, ctx -> counter(ctx.getEntityId(), 0)));
ActorRef<ShardingEnvelope<Counter.Command>> shardRegion =
sharding.init(Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())));
// #init
// #send
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-1");
counterOne.tell(new Increment());
EntityRef<Counter.Command> counterOne = sharding.entityRefFor(typeKey, "counter-1");
counterOne.tell(Counter.Increment.INSTANCE);
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
shardRegion.tell(new ShardingEnvelope<>("counter-1", Counter.Increment.INSTANCE));
// #send
}

View file

@ -51,7 +51,7 @@ interface ShardingReplyCompileOnlyTest {
}
}
public Behavior<Command> create() {
public static Behavior<Command> create() {
return Behaviors.setup(Counter::new);
}

View file

@ -29,31 +29,26 @@ object ShardingCompileOnlySpec {
//#sharding-extension
//#counter
//#counter-messages
object Counter {
//#counter
sealed trait Command
case object Increment extends Command
final case class GetValue(replyTo: ActorRef[Int]) extends Command
//#counter-messages
//#counter
def apply(entityId: String): Behavior[Command] =
counter(entityId, 0)
private def counter(entityId: String, value: Int): Behavior[Command] =
Behaviors.receiveMessage[Command] {
case Increment =>
counter(entityId, value + 1)
case GetValue(replyTo) =>
replyTo ! value
Behaviors.same
def apply(entityId: String): Behavior[Command] = {
def updated(value: Int): Behavior[Command] = {
Behaviors.receiveMessage[Command] {
case Increment =>
updated(value + 1)
case GetValue(replyTo) =>
replyTo ! value
Behaviors.same
}
}
//#counter-messages
updated(0)
}
}
//#counter-messages
//#counter
//#init
@ -87,7 +82,6 @@ object ShardingCompileOnlySpec {
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
//#counter-passivate
object Counter {
sealed trait Command
case object Increment extends Command
@ -97,10 +91,10 @@ object ShardingCompileOnlySpec {
def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[Command] = {
Behaviors.setup { ctx =>
def become(value: Int): Behavior[Command] =
def updated(value: Int): Behavior[Command] =
Behaviors.receiveMessage[Command] {
case Increment =>
become(value + 1)
updated(value + 1)
case GetValue(replyTo) =>
replyTo ! value
Behaviors.same
@ -114,17 +108,19 @@ object ShardingCompileOnlySpec {
}
ctx.setReceiveTimeout(30.seconds, Idle)
become(0)
updated(0)
}
}
}
//#counter-passivate
//#counter-passivate-init
val TypeKey = EntityTypeKey[Counter.Command]("Counter")
ClusterSharding(system).init(
Entity(typeKey = TypeKey, createBehavior = ctx => Counter(ctx.shard, ctx.entityId))
.withStopMessage(Counter.GoodByeCounter))
//#counter-passivate
//#counter-passivate-init
}

View file

@ -28,10 +28,10 @@ Java
It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter:
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter }
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter }
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter }
Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.
@ -103,10 +103,18 @@ of `Passivate` and termination of the entity. Such buffered messages are thereaf
to a new incarnation of the entity.
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate }
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-passivate }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-init }
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-passivate }
and then initialized with:
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-passivate-init }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-passivate-init }
Note that in the above example the `stopMessage` is specified as `GoodByeCounter`. That message will be sent to
the entity when it's supposed to stop itself due to rebalance or passivation. If the `stopMessage` is not defined