From 0f56a6d1ed427879dc3baba3ec2bc7ad03e16c08 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Jun 2018 11:53:02 +0200 Subject: [PATCH] Passivate for Typed Sharding, #24478 * also moved singleton doc sample to own file * and mv /doc/ to /docs/ as in other places --- .../typed/internal/ClusterShardingImpl.scala | 64 ++++++++-- .../typed/javadsl/ClusterSharding.scala | 33 ++++- .../typed/scaladsl/ClusterSharding.scala | 30 ++++- .../typed/MultiDcClusterShardingSpec.scala | 4 +- .../typed/ShardingCompileOnlyTest.java | 120 ++++++++++++++++++ .../ClusterShardingPersistenceSpec.scala | 9 +- .../typed/scaladsl/ClusterShardingSpec.scala | 81 ++++++++---- .../typed/ShardingCompileOnlySpec.scala | 70 ++++++---- .../typed/SingletonCompileOnlyTest.java | 40 ++---- .../typed/SingletonCompileOnlySpec.scala | 52 ++++++++ .../main/paradox/typed/cluster-sharding.md | 38 ++++-- .../main/paradox/typed/cluster-singleton.md | 8 +- 12 files changed, 432 insertions(+), 117 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java rename akka-cluster-sharding-typed/src/test/scala/{doc => docs}/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala (60%) rename akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java => akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java (59%) create mode 100644 akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 7f230f753c..4d4138ad19 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -5,25 +5,30 @@ package akka.cluster.sharding.typed package internal +import java.net.URLEncoder import java.util.Optional import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import scala.concurrent.Future + +import akka.actor.ExtendedActorSystem import akka.actor.{ InternalActorRef, Scheduler } +import akka.actor.typed.ActorContext import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.Props import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter +import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } -import akka.cluster.sharding.typed.javadsl.EntityIdToBehavior +import akka.cluster.sharding.typed.javadsl.EntityFactory import akka.cluster.typed.Cluster import akka.event.Logging import akka.event.LoggingAdapter @@ -31,6 +36,7 @@ import akka.pattern.AskTimeoutException import akka.pattern.PromiseActorRef import akka.util.Timeout import akka.japi.function.{ Function ⇒ JFunction } +import akka.util.ByteString /** * INTERNAL API @@ -91,9 +97,10 @@ import akka.japi.function.{ Function ⇒ JFunction } // typeKey.name to messageClassName private val regions: ConcurrentHashMap[String, String] = new ConcurrentHashMap private val proxies: ConcurrentHashMap[String, String] = new ConcurrentHashMap + private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap override def spawn[A]( - behavior: String ⇒ Behavior[A], + behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) ⇒ Behavior[A], entityProps: Props, typeKey: scaladsl.EntityTypeKey[A], settings: ClusterShardingSettings, @@ -104,7 +111,7 @@ import akka.japi.function.{ Function ⇒ JFunction } } override def spawn[A]( - behavior: EntityIdToBehavior[A], + behavior: EntityFactory[A], entityProps: Props, typeKey: javadsl.EntityTypeKey[A], settings: ClusterShardingSettings, @@ -115,8 +122,8 @@ import akka.japi.function.{ Function ⇒ JFunction } Optional.of(defaultShardAllocationStrategy(settings))) } - override def spawnWithMessageExtractor[E, A]( - behavior: String ⇒ Behavior[A], + def spawnWithMessageExtractor[E, A]( + behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) ⇒ Behavior[A], entityProps: Props, typeKey: scaladsl.EntityTypeKey[A], settings: ClusterShardingSettings, @@ -140,8 +147,20 @@ import akka.japi.function.{ Function ⇒ JFunction } if (settings.shouldHostShard(cluster)) { log.info("Starting Shard Region [{}]...", typeKey.name) + val shardCommandDelegator = + shardCommandActors.computeIfAbsent( + typeKey.name, + new java.util.function.Function[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] { + override def apply(t: String): ActorRef[scaladsl.ClusterSharding.ShardCommand] = { + // using untyped.systemActorOf to avoid the Future[ActorRef] + system.toUntyped.asInstanceOf[ExtendedActorSystem].systemActorOf( + PropsAdapter(ShardCommandActor.behavior(extractor.handOffStopMessage)), + URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator") + } + }) + val untypedEntityPropsFactory: String ⇒ akka.actor.Props = { entityId ⇒ - PropsAdapter(behavior(entityId), entityProps) + PropsAdapter(behavior(shardCommandDelegator, entityId), entityProps) } untypedSharding.internalStart( @@ -178,13 +197,13 @@ import akka.japi.function.{ Function ⇒ JFunction } } override def spawnWithMessageExtractor[E, A]( - behavior: EntityIdToBehavior[A], + behavior: EntityFactory[A], entityProps: Props, typeKey: javadsl.EntityTypeKey[A], settings: ClusterShardingSettings, extractor: ShardingMessageExtractor[E, A], allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] = { - spawnWithMessageExtractor(entityId ⇒ behavior.apply(entityId), entityProps, typeKey.asScala, + spawnWithMessageExtractor((shard, entityId) ⇒ behavior.apply(shard, entityId), entityProps, typeKey.asScala, settings, extractor, allocationStrategy.asScala) } @@ -255,3 +274,32 @@ import akka.japi.function.{ Function ⇒ JFunction } } } + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ShardCommandActor { + import akka.actor.typed.scaladsl.adapter._ + import akka.cluster.sharding.ShardRegion.{ Passivate ⇒ UntypedPassivate } + + def behavior(stopMessage: Any): Behavior[scaladsl.ClusterSharding.ShardCommand] = { + def sendUntypedPassivate(entity: ActorRef[_], ctx: ActorContext[_]): Unit = { + val pathToShard = entity.toUntyped.path.elements.take(4).mkString("/") + ctx.asScala.system.toUntyped.actorSelection(pathToShard).tell(UntypedPassivate(stopMessage), entity.toUntyped) + } + + Behaviors.receive { (ctx, msg) ⇒ + msg match { + case scaladsl.ClusterSharding.Passivate(entity) ⇒ + sendUntypedPassivate(entity, ctx) + Behaviors.same + case javadsl.ClusterSharding.Passivate(entity) ⇒ + sendUntypedPassivate(entity, ctx) + Behaviors.same + case _ ⇒ + Behaviors.unhandled + } + } + } + +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 43a23d2f6e..c03131a25b 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -8,7 +8,6 @@ package javadsl import java.util.Optional import java.util.concurrent.CompletionStage -import akka.actor.Scheduler import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -21,13 +20,32 @@ import akka.japi.function.{ Function ⇒ JFunction } import akka.util.Timeout @FunctionalInterface -trait EntityIdToBehavior[A] { - def apply(entityId: String): Behavior[A] +trait EntityFactory[A] { + def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[A] } object ClusterSharding { def get(system: ActorSystem[_]): ClusterSharding = scaladsl.ClusterSharding(system).asJava + + /** + * When an entity is created an `ActorRef[ShardCommand]` is passed to the + * factory method. The entity can request passivation by sending the [[Passivate]] + * message to this ref. Sharding will then send back the specified + * `handOffStopMessage` message to the entity, which is then supposed to stop itself. + * + * Not for user extension. + */ + @DoNotInherit trait ShardCommand extends scaladsl.ClusterSharding.ShardCommand + + /** + * The entity can request passivation by sending the [[Passivate]] message + * to the `ActorRef[ShardCommand]` that was passed in to the factory method + * when creating the entity. Sharding will then send back the specified + * `handOffStopMessage` message to the entity, which is then supposed to stop + * itself. + */ + final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand } /** @@ -130,8 +148,9 @@ object ClusterSharding { * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). * If a message is already enqueued to the entity when it stops itself the enqueued message * in the mailbox will be dropped. To support graceful passivation without losing such - * messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. - * The specified wrapped message in `Passivate` will be sent back to the entity, which is + * messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]` + * that was passed in to the factory method when creating the entity.. + * The specified `handOffStopMessage` message will be sent back to the entity, which is * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` * between reception of `Passivate` and termination of the entity. Such buffered messages * are thereafter delivered to a new incarnation of the entity. @@ -158,7 +177,7 @@ abstract class ClusterSharding { * @tparam A The type of command the entity accepts */ def spawn[A]( - behavior: EntityIdToBehavior[A], + behavior: EntityFactory[A], props: Props, typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, @@ -179,7 +198,7 @@ abstract class ClusterSharding { * @tparam A The type of command the entity accepts */ def spawnWithMessageExtractor[E, A]( - behavior: EntityIdToBehavior[A], + behavior: EntityFactory[A], entityProps: Props, typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 1dbcd984f9..36f3d36f3e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -7,7 +7,6 @@ package scaladsl import scala.concurrent.Future -import akka.actor.Scheduler import akka.util.Timeout import scala.reflect.ClassTag @@ -30,6 +29,25 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { override def createExtension(system: ActorSystem[_]): ClusterSharding = new ClusterShardingImpl(system) + /** + * When an entity is created an `ActorRef[ShardCommand]` is passed to the + * factory method. The entity can request passivation by sending the [[Passivate]] + * message to this ref. Sharding will then send back the specified + * `handOffStopMessage` message to the entity, which is then supposed to stop itself. + * + * Not for user extension. + */ + @DoNotInherit trait ShardCommand + + /** + * The entity can request passivation by sending the [[Passivate]] message + * to the `ActorRef[ShardCommand]` that was passed in to the factory method + * when creating the entity. Sharding will then send back the specified + * `handOffStopMessage` message to the entity, which is then supposed to stop + * itself. + */ + final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand with javadsl.ClusterSharding.ShardCommand + } /** @@ -132,8 +150,9 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). * If a message is already enqueued to the entity when it stops itself the enqueued message * in the mailbox will be dropped. To support graceful passivation without losing such - * messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. - * The specified wrapped message in `Passivate` will be sent back to the entity, which is + * messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]` + * that was passed in to the factory method when creating the entity.. + * The specified `handOffStopMessage` message will be sent back to the entity, which is * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` * between reception of `Passivate` and termination of the entity. Such buffered messages * are thereafter delivered to a new incarnation of the entity. @@ -144,6 +163,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { */ @DoNotInherit trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding ⇒ + import ClusterSharding.ShardCommand /** * Spawn a shard region or a proxy depending on if the settings require role and if this node has @@ -160,7 +180,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding * @tparam A The type of command the entity accepts */ def spawn[A]( - behavior: String ⇒ Behavior[A], + behavior: (ActorRef[ShardCommand], String) ⇒ Behavior[A], props: Props, typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, @@ -181,7 +201,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding * @tparam A The type of command the entity accepts */ def spawnWithMessageExtractor[E, A]( - behavior: String ⇒ Behavior[A], + behavior: (ActorRef[ShardCommand], String) ⇒ Behavior[A], entityProps: Props, typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala index f5b01b2201..963fc03081 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala @@ -67,7 +67,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh "start sharding" in { val sharding = ClusterSharding(typedSystem) val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.spawn( - _ ⇒ multiDcPinger, + (_, _) ⇒ multiDcPinger, Props.empty, typeKey = typeKey, ClusterShardingSettings(typedSystem), @@ -100,7 +100,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh "be able to message cross dc via proxy" in { runOn(first, second) { val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).spawn( - _ ⇒ multiDcPinger, + (_, _) ⇒ multiDcPinger, Props.empty, typeKey = typeKey, ClusterShardingSettings(typedSystem).withDataCenter("dc2"), diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java new file mode 100644 index 0000000000..16b519702d --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.akka.cluster.sharding.typed; + +import java.time.Duration; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.Props; +import akka.actor.typed.javadsl.Behaviors; + +//#import +import akka.cluster.sharding.typed.ClusterShardingSettings; +import akka.cluster.sharding.typed.ShardingEnvelope; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.cluster.sharding.typed.javadsl.EntityRef; + +//#import + +public class ShardingCompileOnlyTest { + + //#counter-messages + interface CounterCommand {} + public static class Increment implements CounterCommand { } + public static class GoodByeCounter implements CounterCommand { } + + public static class GetValue implements CounterCommand { + private final ActorRef replyTo; + public GetValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + //#counter-messages + + //#counter + + public static Behavior counter(String entityId, Integer value) { + return Behaviors.receive(CounterCommand.class) + .onMessage(Increment.class, (ctx, msg) -> { + return counter(entityId,value + 1); + }) + .onMessage(GetValue.class, (ctx, msg) -> { + msg.replyTo.tell(value); + return Behaviors.same(); + }) + .onMessage(GoodByeCounter.class, (ctx, msg) -> { + return Behaviors.stopped(); + }) + .build(); + } + //#counter + + //#counter-passivate + public static class Idle implements CounterCommand { } + + public static Behavior counter2(ActorRef shard, String entityId) { + return Behaviors.setup(ctx -> { + ctx.setReceiveTimeout(Duration.ofSeconds(30), new Idle()); + return counter2(shard, entityId, 0); + }); + } + + private static Behavior counter2( + ActorRef shard, + String entityId, + Integer value) { + return Behaviors.receive(CounterCommand.class) + .onMessage(Increment.class, (ctx, msg) -> { + return counter(entityId,value + 1); + }) + .onMessage(GetValue.class, (ctx, msg) -> { + msg.replyTo.tell(value); + return Behaviors.same(); + }) + .onMessage(Idle.class, (ctx, msg) -> { + // after receive timeout + shard.tell(new ClusterSharding.Passivate<>(ctx.getSelf())); + return Behaviors.same(); + }) + .onMessage(GoodByeCounter.class, (ctx, msg) -> { + // the handOffStopMessage, used for rebalance and passivate + return Behaviors.stopped(); + }) + .build(); + } + //#counter-passivate + + public static void example() { + + ActorSystem system = ActorSystem.create( + Behaviors.empty(), "ShardingExample" + ); + + //#sharding-extension + ClusterSharding sharding = ClusterSharding.get(system); + //#sharding-extension + + //#spawn + EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); + ActorRef> shardRegion = sharding.spawn( + (shard, entityId) -> counter(entityId,0), + Props.empty(), + typeKey, + ClusterShardingSettings.create(system), + 10, + new GoodByeCounter()); + //#spawn + + //#send + EntityRef counterOne = sharding.entityRefFor(typeKey, "counter-`"); + counterOne.tell(new Increment()); + + shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment())); + //#send + } +} diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index 5c529ab5ad..f539e0d31e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -70,8 +70,13 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh Cluster(system).manager ! Join(Cluster(system).selfMember.address) "start persistent actor" in { - ClusterSharding(system).spawn[Command](persistentActor, Props.empty, typeKey, - ClusterShardingSettings(system), maxNumberOfShards = 100, handOffStopMessage = StopPlz) + ClusterSharding(system).spawn[Command]( + (_, entityId) ⇒ persistentActor(entityId), + Props.empty, + typeKey, + ClusterShardingSettings(system), + maxNumberOfShards = 100, + handOffStopMessage = StopPlz) val p = TestProbe[String]() diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index b55d28257f..399e3452a2 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -7,6 +7,8 @@ package akka.cluster.sharding.typed.scaladsl import java.nio.charset.StandardCharsets import scala.concurrent.duration._ + +import akka.Done import akka.actor.ExtendedActorSystem import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -63,6 +65,7 @@ object ClusterShardingSpec { final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol final case class StopPlz() extends TestProtocol + final case class PassivatePlz() extends TestProtocol sealed trait IdTestProtocol extends java.io.Serializable final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol @@ -72,12 +75,13 @@ object ClusterShardingSpec { class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { def identifier: Int = 48 def manifest(o: AnyRef): String = o match { - case _: ReplyPlz ⇒ "a" - case _: WhoAreYou ⇒ "b" - case _: StopPlz ⇒ "c" - case _: IdReplyPlz ⇒ "A" - case _: IdWhoAreYou ⇒ "B" - case _: IdStopPlz ⇒ "C" + case _: ReplyPlz ⇒ "a" + case _: WhoAreYou ⇒ "b" + case _: StopPlz ⇒ "c" + case _: PassivatePlz ⇒ "d" + case _: IdReplyPlz ⇒ "A" + case _: IdWhoAreYou ⇒ "B" + case _: IdStopPlz ⇒ "C" } private def actorRefToBinary(ref: ActorRef[_]): Array[Byte] = @@ -94,6 +98,7 @@ object ClusterShardingSpec { case ReplyPlz(ref) ⇒ actorRefToBinary(ref) case WhoAreYou(ref) ⇒ actorRefToBinary(ref) case _: StopPlz ⇒ Array.emptyByteArray + case _: PassivatePlz ⇒ Array.emptyByteArray case IdReplyPlz(id, ref) ⇒ idAndRefToBinary(id, ref) case IdWhoAreYou(id, ref) ⇒ idAndRefToBinary(id, ref) case _: IdStopPlz ⇒ Array.emptyByteArray @@ -113,6 +118,7 @@ object ClusterShardingSpec { case "a" ⇒ ReplyPlz(actorRefFromBinary(bytes)) case "b" ⇒ WhoAreYou(actorRefFromBinary(bytes)) case "c" ⇒ StopPlz() + case "d" ⇒ PassivatePlz() case "A" ⇒ IdReplyPlz.tupled(idAndRefFromBinary(bytes)) case "B" ⇒ IdWhoAreYou.tupled(idAndRefFromBinary(bytes)) case "C" ⇒ IdStopPlz() @@ -135,22 +141,28 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") - private val behavior = Behaviors.receive[TestProtocol] { - case (_, StopPlz()) ⇒ - Behaviors.stopped + private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[Done]] = None) = + Behaviors.receive[TestProtocol] { + case (ctx, PassivatePlz()) ⇒ + shard ! ClusterSharding.Passivate(ctx.self) + Behaviors.same - case (ctx, WhoAreYou(replyTo)) ⇒ - val address = Cluster(ctx.system).selfMember.address - replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}" - Behaviors.same + case (_, StopPlz()) ⇒ + stopProbe.foreach(_ ! Done) + Behaviors.stopped - case (_, ReplyPlz(toMe)) ⇒ - toMe ! "Hello!" - Behaviors.same - } + case (ctx, WhoAreYou(replyTo)) ⇒ + val address = Cluster(ctx.system).selfMember.address + replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}" + Behaviors.same + + case (_, ReplyPlz(toMe)) ⇒ + toMe ! "Hello!" + Behaviors.same + } private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") - private val behaviorWithId = Behaviors.receive[IdTestProtocol] { + private def behaviorWithId(shard: ActorRef[ClusterSharding.ShardCommand]) = Behaviors.receive[IdTestProtocol] { case (_, IdStopPlz()) ⇒ Behaviors.stopped @@ -165,7 +177,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn( - _ ⇒ behavior, + (shard, _) ⇒ behavior(shard), Props.empty, typeKey, ClusterShardingSettings(system), @@ -173,7 +185,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. StopPlz()) private val shardingRef2 = sharding2.spawn( - _ ⇒ behavior, + (shard, _) ⇒ behavior(shard), Props.empty, typeKey, ClusterShardingSettings(system2), @@ -181,7 +193,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. StopPlz()) private val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawnWithMessageExtractor( - _ ⇒ behaviorWithId, + (shard, _) ⇒ behaviorWithId(shard), Props.empty, typeKey2, ClusterShardingSettings(system), @@ -193,7 +205,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. None) private val shardingRef4 = sharding2.spawnWithMessageExtractor( - _ ⇒ behaviorWithId, + (shard, _) ⇒ behaviorWithId(shard), Props.empty, typeKey2, ClusterShardingSettings(system2), @@ -246,11 +258,34 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } } + "be able to passivate" in { + val stopProbe = TestProbe[Done]() + val p = TestProbe[String]() + val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test") + + val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn( + (shard, _) ⇒ behavior(shard, Some(stopProbe.ref)), + Props.empty, + typeKey3, + ClusterShardingSettings(system), + 10, + StopPlz()) + + shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref)) + p.expectMessage("Hello!") + + shardingRef3 ! ShardingEnvelope(s"test1", PassivatePlz()) + stopProbe.expectMessage(Done) + + shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref)) + p.expectMessage("Hello!") + } + "fail if starting sharding for already used typeName, but with a different type" in { // sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard") val ex = intercept[Exception] { sharding.spawn( - _ ⇒ behaviorWithId, + (shard, _) ⇒ behaviorWithId(shard), Props.empty, EntityTypeKey[IdTestProtocol]("envelope-shard"), ClusterShardingSettings(system), diff --git a/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala similarity index 60% rename from akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala rename to akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index b51199bd47..37d9fb658e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -2,11 +2,11 @@ * Copyright (C) 2018 Lightbend Inc. */ -package doc.akka.cluster.sharding.typed +package docs.akka.cluster.sharding.typed +import scala.concurrent.duration._ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.actor.typed.scaladsl.Behaviors -import akka.cluster.typed.{ ClusterSingleton, ClusterSingletonSettings } import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost } @@ -24,26 +24,32 @@ object ShardingCompileOnlySpec { val sharding = ClusterSharding(system) //#sharding-extension - //#counter + //#counter-messages trait CounterCommand case object Increment extends CounterCommand final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand case object GoodByeCounter extends CounterCommand + //#counter-messages - def counter(entityId: String, value: Int): Behavior[CounterCommand] = Behaviors.receiveMessage[CounterCommand] { - case Increment ⇒ - counter(entityId, value + 1) - case GetValue(replyTo) ⇒ - replyTo ! value - Behaviors.same - } + //#counter + + def counter(entityId: String, value: Int): Behavior[CounterCommand] = + Behaviors.receiveMessage[CounterCommand] { + case Increment ⇒ + counter(entityId, value + 1) + case GetValue(replyTo) ⇒ + replyTo ! value + Behaviors.same + case GoodByeCounter ⇒ + Behaviors.stopped + } //#counter //#spawn val TypeKey = EntityTypeKey[CounterCommand]("Counter") // if a extractor is defined then the type would be ActorRef[BasicCommand] val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn( - behavior = entityId ⇒ counter(entityId, 0), + behavior = (shard, entityId) ⇒ counter(entityId, 0), props = Props.empty, typeKey = TypeKey, settings = ClusterShardingSettings(system), @@ -60,10 +66,11 @@ object ShardingCompileOnlySpec { shardRegion ! ShardingEnvelope("counter-1", Increment) //#send + import InDepthPersistentBehaviorSpec.behavior //#persistence val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost") ClusterSharding(system).spawn[BlogCommand]( - behavior = entityId ⇒ InDepthPersistentBehaviorSpec.behavior(entityId), + behavior = (shard, entityId) ⇒ behavior(entityId), props = Props.empty, typeKey = ShardingTypeName, settings = ClusterShardingSettings(system), @@ -71,20 +78,33 @@ object ShardingCompileOnlySpec { handOffStopMessage = PassivatePost) //#persistence - // as a singleton + //#counter-passivate - //#singleton - val singletonManager = ClusterSingleton(system) - // Start if needed and provide a proxy to a named singleton - val proxy: ActorRef[CounterCommand] = singletonManager.spawn( - behavior = counter("TheCounter", 0), - "GlobalCounter", - Props.empty, - ClusterSingletonSettings(system), - terminationMessage = GoodByeCounter - ) + case object Idle extends CounterCommand - proxy ! Increment - //#singleton + def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = { + Behaviors.setup { ctx ⇒ + + def become(value: Int): Behavior[CounterCommand] = + Behaviors.receiveMessage[CounterCommand] { + case Increment ⇒ + become(value + 1) + case GetValue(replyTo) ⇒ + replyTo ! value + Behaviors.same + case Idle ⇒ + // after receive timeout + shard ! ClusterSharding.Passivate(ctx.self) + Behaviors.same + case GoodByeCounter ⇒ + // the handOffStopMessage, used for rebalance and passivate + Behaviors.stopped + } + + ctx.setReceiveTimeout(30.seconds, Idle) + become(0) + } + } + //#counter-passivate } diff --git a/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java similarity index 59% rename from akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java rename to akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java index 2cae52a2a4..35a1af59e2 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java @@ -2,26 +2,21 @@ * Copyright (C) 2018 Lightbend Inc. */ -package jdoc.akka.cluster.sharding.typed; +package jdocs.akka.cluster.typed; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.Props; import akka.actor.typed.javadsl.Behaviors; + +//#import import akka.cluster.typed.ClusterSingleton; import akka.cluster.typed.ClusterSingletonSettings; //#import -import akka.cluster.sharding.typed.ClusterShardingSettings; -import akka.cluster.sharding.typed.ShardingEnvelope; -import akka.cluster.sharding.typed.javadsl.ClusterSharding; -import akka.cluster.sharding.typed.javadsl.EntityTypeKey; -import akka.cluster.sharding.typed.javadsl.EntityRef; -//#import - -public class ShardingCompileOnlyTest { +public class SingletonCompileOnlyTest { //#counter interface CounterCommand {} @@ -44,6 +39,9 @@ public class ShardingCompileOnlyTest { msg.replyTo.tell(value); return Behaviors.same(); }) + .onMessage(GoodByeCounter.class, (ctx, msg) -> { + return Behaviors.stopped(); + }) .build(); } //#counter @@ -51,31 +49,9 @@ public class ShardingCompileOnlyTest { public static void example() { ActorSystem system = ActorSystem.create( - Behaviors.empty(), "ShardingExample" + Behaviors.empty(), "SingletonExample" ); - //#sharding-extension - ClusterSharding sharding = ClusterSharding.get(system); - //#sharding-extension - - //#spawn - EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); - ActorRef> shardRegion = sharding.spawn( - entityId -> counter(entityId,0), - Props.empty(), - typeKey, - ClusterShardingSettings.create(system), - 10, - new GoodByeCounter()); - //#spawn - - //#send - EntityRef counterOne = sharding.entityRefFor(typeKey, "counter-`"); - counterOne.tell(new Increment()); - - shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment())); - //#send - //#singleton ClusterSingleton singleton = ClusterSingleton.get(system); // Start if needed and provide a proxy to a named singleton diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala new file mode 100644 index 0000000000..f70117ae42 --- /dev/null +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.akka.cluster.typed + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Props +import akka.actor.typed.scaladsl.Behaviors + +object SingletonCompileOnlySpec { + + val system = ActorSystem(Behaviors.empty, "Singleton") + + //#counter + trait CounterCommand + case object Increment extends CounterCommand + final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand + case object GoodByeCounter extends CounterCommand + + def counter(entityId: String, value: Int): Behavior[CounterCommand] = + Behaviors.receiveMessage[CounterCommand] { + case Increment ⇒ + counter(entityId, value + 1) + case GetValue(replyTo) ⇒ + replyTo ! value + Behaviors.same + case GoodByeCounter ⇒ + Behaviors.stopped + } + //#counter + + //#singleton + import akka.cluster.typed.ClusterSingleton + import akka.cluster.typed.ClusterSingletonSettings + + val singletonManager = ClusterSingleton(system) + // Start if needed and provide a proxy to a named singleton + val proxy: ActorRef[CounterCommand] = singletonManager.spawn( + behavior = counter("TheCounter", 0), + "GlobalCounter", + Props.empty, + ClusterSingletonSettings(system), + terminationMessage = GoodByeCounter + ) + + proxy ! Increment + //#singleton + +} diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index b30137c326..998fa09090 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -29,35 +29,35 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in Sharding is accessed via the `ClusterSharding` extension Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #import #sharding-extension } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #import #sharding-extension } It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter: Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter } Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier. Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn } Messages to a specific entity are then sent via an EntityRef. It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region. Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #send } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #send } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #send } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #send } ## Persistence example @@ -74,7 +74,27 @@ Scala To create the entity: Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence } Sending messages to entities is the same as the example above. The only difference is when an entity is moved the state will be restored. See @ref:[persistence](persistence.md) for more details. + +## Passivation + +If the state of the entities are persistent you may stop entities that are not used to +reduce memory consumption. This is done by the application specific implementation of +the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). +If a message is already enqueued to the entity when it stops itself the enqueued message +in the mailbox will be dropped. To support graceful passivation without losing such +messages the entity actor can send `ClusterSharding.Passivate` to to the +@scala:[`ActorRef[ShardCommand]`]@java:[`ActorRef`] that was passed in to +the factory method when creating the entity. The specified `handOffStopMessage` message +will be sent back to the entity, which is then supposed to stop itself. Incoming messages +will be buffered by the `Shard` between reception of `Passivate` and termination of the +entity. Such buffered messages are thereafter delivered to a new incarnation of the entity. + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate } diff --git a/akka-docs/src/main/paradox/typed/cluster-singleton.md b/akka-docs/src/main/paradox/typed/cluster-singleton.md index 8284ea1751..6877fc9af8 100644 --- a/akka-docs/src/main/paradox/typed/cluster-singleton.md +++ b/akka-docs/src/main/paradox/typed/cluster-singleton.md @@ -42,20 +42,20 @@ instance will eventually be started. Any `Behavior` can be run as a singleton. E.g. a basic counter: Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter } +: @@snip [SingletonCompileOnlySpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala) { #counter } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter } +: @@snip [SingletonCompileOnlyTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java) { #counter } Then on every node in the cluster, or every node with a given role, use the `ClusterSingleton` extension to spawn the singleton. An instance will per data centre of the cluster: Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #singleton } +: @@snip [SingletonCompileOnlySpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala) { #singleton } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #singleton } +: @@snip [SingletonCompileOnlyTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java) { #import #singleton } ## Accessing singleton of another data centre