diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java index c75e9cc3b4..763500dc49 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java @@ -67,7 +67,7 @@ public class HelloWorldPersistentEntityExample { extends EventSourcedEntity { // Command - interface Command extends CborSerializable {} + public interface Command extends CborSerializable {} public static final class Greet implements Command { public final String whom; diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index 00a09b2639..b14cea8f8a 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -9,7 +9,10 @@ import java.time.Duration; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; // #import import akka.cluster.sharding.typed.ShardingEnvelope; @@ -23,77 +26,136 @@ import akka.cluster.sharding.typed.javadsl.Entity; import jdocs.akka.persistence.typed.BlogPostExample.BlogCommand; import jdocs.akka.persistence.typed.BlogPostExample.BlogBehavior; -public class ShardingCompileOnlyTest { - - // #counter-messages - interface CounterCommand {} - - public static class Increment implements CounterCommand {} - - public static class GetValue implements CounterCommand { - private final ActorRef replyTo; - - public GetValue(ActorRef replyTo) { - this.replyTo = replyTo; - } - } - // #counter-messages +interface ShardingCompileOnlyTest { // #counter + public class Counter extends AbstractBehavior { - public static Behavior counter(String entityId, Integer value) { - return Behaviors.receive(CounterCommand.class) - .onMessage(Increment.class, msg -> counter(entityId, value + 1)) - .onMessage( - GetValue.class, - msg -> { - msg.replyTo.tell(value); - return Behaviors.same(); - }) - .build(); + public interface Command {} + + public enum Increment implements Command { + INSTANCE + } + + public static class GetValue implements Command { + private final ActorRef replyTo; + + public GetValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static Behavior create(String entityId) { + return Behaviors.setup(context -> new Counter(context, entityId)); + } + + private final ActorContext context; + private final String entityId; + private int value = 0; + + private Counter(ActorContext context, String entityId) { + this.context = context; + this.entityId = entityId; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Increment.class, msg -> onIncrement()) + .onMessage(GetValue.class, this::onGetValue) + .build(); + } + + private Behavior onIncrement() { + value++; + return this; + } + + private Behavior onGetValue(GetValue msg) { + msg.replyTo.tell(value); + return this; + } } // #counter // #counter-passivate - public static class Idle implements CounterCommand {} + public class Counter2 extends AbstractBehavior { - public static class GoodByeCounter implements CounterCommand {} + public interface Command {} - public static Behavior counter2( - ActorRef shard, String entityId) { - return Behaviors.setup( - ctx -> { - ctx.setReceiveTimeout(Duration.ofSeconds(30), new Idle()); - return counter2(shard, entityId, 0); - }); - } + private enum Idle implements Command { + INSTANCE + } - private static Behavior counter2( - ActorRef shard, String entityId, Integer value) { - return Behaviors.setup( - context -> - Behaviors.receive(CounterCommand.class) - .onMessage(Increment.class, msg -> counter(entityId, value + 1)) - .onMessage( - GetValue.class, - msg -> { - msg.replyTo.tell(value); - return Behaviors.same(); - }) - .onMessage( - Idle.class, - msg -> { - // after receive timeout - shard.tell(new ClusterSharding.Passivate<>(context.getSelf())); - return Behaviors.same(); - }) - .onMessage( - GoodByeCounter.class, - msg -> { - // the stopMessage, used for rebalance and passivate - return Behaviors.stopped(); - }) - .build()); + public enum GoodByeCounter implements Command { + INSTANCE + } + + public enum Increment implements Command { + INSTANCE + } + + public static class GetValue implements Command { + private final ActorRef replyTo; + + public GetValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static Behavior create( + ActorRef shard, String entityId) { + return Behaviors.setup( + ctx -> { + ctx.setReceiveTimeout(Duration.ofSeconds(30), Idle.INSTANCE); + return new Counter2(ctx, shard, entityId); + }); + } + + private final ActorContext context; + private final ActorRef shard; + private final String entityId; + private int value = 0; + + private Counter2( + ActorContext context, + ActorRef shard, + String entityId) { + this.context = context; + this.shard = shard; + this.entityId = entityId; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Increment.class, msg -> onIncrement()) + .onMessage(GetValue.class, this::onGetValue) + .onMessage(Idle.class, msg -> onIdle()) + .onMessage(GoodByeCounter.class, msg -> onGoodByeCounter()) + .build(); + } + + private Behavior onIncrement() { + value++; + return this; + } + + private Behavior onGetValue(GetValue msg) { + msg.replyTo.tell(value); + return this; + } + + private Behavior onIdle() { + // after receive timeout + shard.tell(new ClusterSharding.Passivate<>(context.getSelf())); + return this; + } + + private Behavior onGoodByeCounter() { + // the stopMessage, used for rebalance and passivate + return Behaviors.stopped(); + } } // #counter-passivate @@ -103,11 +165,12 @@ public class ShardingCompileOnlyTest { // #counter-passivate-init - EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); + EntityTypeKey typeKey = + EntityTypeKey.create(Counter2.Command.class, "Counter"); sharding.init( - Entity.of(typeKey, ctx -> counter2(ctx.getShard(), ctx.getEntityId())) - .withStopMessage(new GoodByeCounter())); + Entity.of(typeKey, ctx -> Counter2.create(ctx.getShard(), ctx.getEntityId())) + .withStopMessage(Counter2.GoodByeCounter.INSTANCE)); // #counter-passivate-init } @@ -120,17 +183,17 @@ public class ShardingCompileOnlyTest { // #sharding-extension // #init - EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); + EntityTypeKey typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); - ActorRef> shardRegion = - sharding.init(Entity.of(typeKey, ctx -> counter(ctx.getEntityId(), 0))); + ActorRef> shardRegion = + sharding.init(Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId()))); // #init // #send - EntityRef counterOne = sharding.entityRefFor(typeKey, "counter-1"); - counterOne.tell(new Increment()); + EntityRef counterOne = sharding.entityRefFor(typeKey, "counter-1"); + counterOne.tell(Counter.Increment.INSTANCE); - shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment())); + shardRegion.tell(new ShardingEnvelope<>("counter-1", Counter.Increment.INSTANCE)); // #send } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java index 0f1cf6a835..ff8de8a9e0 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java @@ -51,7 +51,7 @@ interface ShardingReplyCompileOnlyTest { } } - public Behavior create() { + public static Behavior create() { return Behaviors.setup(Counter::new); } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index d9e77e7759..d80d3ab335 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -29,31 +29,26 @@ object ShardingCompileOnlySpec { //#sharding-extension //#counter - //#counter-messages object Counter { - //#counter sealed trait Command case object Increment extends Command final case class GetValue(replyTo: ActorRef[Int]) extends Command - //#counter-messages - //#counter - - def apply(entityId: String): Behavior[Command] = - counter(entityId, 0) - - private def counter(entityId: String, value: Int): Behavior[Command] = - Behaviors.receiveMessage[Command] { - case Increment => - counter(entityId, value + 1) - case GetValue(replyTo) => - replyTo ! value - Behaviors.same + def apply(entityId: String): Behavior[Command] = { + def updated(value: Int): Behavior[Command] = { + Behaviors.receiveMessage[Command] { + case Increment => + updated(value + 1) + case GetValue(replyTo) => + replyTo ! value + Behaviors.same + } } - //#counter-messages + updated(0) + + } } - //#counter-messages //#counter //#init @@ -87,7 +82,6 @@ object ShardingCompileOnlySpec { import akka.cluster.sharding.typed.scaladsl.EntityTypeKey //#counter-passivate - object Counter { sealed trait Command case object Increment extends Command @@ -97,10 +91,10 @@ object ShardingCompileOnlySpec { def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[Command] = { Behaviors.setup { ctx => - def become(value: Int): Behavior[Command] = + def updated(value: Int): Behavior[Command] = Behaviors.receiveMessage[Command] { case Increment => - become(value + 1) + updated(value + 1) case GetValue(replyTo) => replyTo ! value Behaviors.same @@ -114,17 +108,19 @@ object ShardingCompileOnlySpec { } ctx.setReceiveTimeout(30.seconds, Idle) - become(0) + updated(0) } } } + //#counter-passivate + //#counter-passivate-init val TypeKey = EntityTypeKey[Counter.Command]("Counter") ClusterSharding(system).init( Entity(typeKey = TypeKey, createBehavior = ctx => Counter(ctx.shard, ctx.entityId)) .withStopMessage(Counter.GoodByeCounter)) - //#counter-passivate + //#counter-passivate-init } diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 8382ac9ad4..44d0096db6 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -28,10 +28,10 @@ Java It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter: Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter } Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier. @@ -103,10 +103,18 @@ of `Passivate` and termination of the entity. Such buffered messages are thereaf to a new incarnation of the entity. Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-passivate } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-init } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-passivate } + +and then initialized with: + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-passivate-init } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-passivate-init } Note that in the above example the `stopMessage` is specified as `GoodByeCounter`. That message will be sent to the entity when it's supposed to stop itself due to rebalance or passivation. If the `stopMessage` is not defined