From 0804daf1a50dbcddf85dcbcce4bd2c6822a06acd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 28 Aug 2018 11:41:34 +0200 Subject: [PATCH] Simplify signature of ClusterSharding.spawn, #25480 --- .../src/main/resources/reference.conf | 12 ++ .../typed/ClusterShardingSettings.scala | 12 +- .../typed/ShardingMessageExtractor.scala | 61 +++--- .../typed/internal/ClusterShardingImpl.scala | 116 ++++++------ .../typed/javadsl/ClusterSharding.scala | 173 ++++++++++++------ .../typed/scaladsl/ClusterSharding.scala | 167 +++++++++++------ .../typed/MultiDcClusterShardingSpec.scala | 29 ++- .../typed/ShardingCompileOnlyTest.java | 59 +++++- .../ClusterShardingPersistenceSpec.scala | 10 +- .../typed/scaladsl/ClusterShardingSpec.scala | 61 +++--- .../typed/ShardingCompileOnlySpec.scala | 38 ++-- .../cluster/sharding/ClusterSharding.scala | 4 + .../akka/cluster/typed/ClusterSingleton.scala | 2 + .../src/main/paradox/cluster-sharding.md | 2 +- .../main/paradox/typed/cluster-sharding.md | 16 +- .../typed/InDepthPersistentBehaviorTest.java | 1 - 16 files changed, 465 insertions(+), 298 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/resources/reference.conf b/akka-cluster-sharding-typed/src/main/resources/reference.conf index 0522c24776..5d8730f947 100644 --- a/akka-cluster-sharding-typed/src/main/resources/reference.conf +++ b/akka-cluster-sharding-typed/src/main/resources/reference.conf @@ -1,3 +1,15 @@ + +akka.cluster.sharding { + # Number of shards used by the default HashCodeMessageExtractor + # when no other message extractor is defined. This value must be + # the same for all nodes in the cluster and that is verified by + # configuration check when joining. Changing the value requires + # stopping all nodes in the cluster. + number-of-shards = 1000 +} + +# FIXME JoinConfigCompatChecker for number-of-shards + akka.actor { serializers { typed-sharding = "akka.cluster.sharding.typed.internal.ShardingSerializer" diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index 94984eb60f..774f479cf5 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -25,7 +25,8 @@ object ClusterShardingSettings { def fromConfig(config: Config): ClusterShardingSettings = { val untypedSettings = UntypedShardingSettings(config) - fromUntypedSettings(untypedSettings) + val numberOfShards = config.getInt("number-of-shards") + fromUntypedSettings(numberOfShards, untypedSettings) } /** Java API: Creates new cluster sharding settings object */ @@ -33,8 +34,9 @@ object ClusterShardingSettings { apply(system) /** INTERNAL API: Indended only for internal use, it is not recommended to keep converting between the setting types */ - private[akka] def fromUntypedSettings(untypedSettings: UntypedShardingSettings): ClusterShardingSettings = { + private[akka] def fromUntypedSettings(numberOfShards: Int, untypedSettings: UntypedShardingSettings): ClusterShardingSettings = { new ClusterShardingSettings( + numberOfShards, role = untypedSettings.role, dataCenter = None, rememberEntities = untypedSettings.rememberEntities, @@ -217,6 +219,7 @@ object ClusterShardingSettings { } /** + * @param numberOfShards number of shards used by the default [[HashCodeMessageExtractor]] * @param role Specifies that this entity type requires cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. If the given role does * not match the role of the current node the `ShardRegion` will be started in proxy mode. @@ -237,6 +240,7 @@ object ClusterShardingSettings { * @param tuningParameters additional tuning parameters, see descriptions in reference.conf */ final class ClusterShardingSettings( + val numberOfShards: Int, val role: Option[String], val dataCenter: Option[DataCenter], val rememberEntities: Boolean, @@ -263,6 +267,9 @@ final class ClusterShardingSettings( role.forall(cluster.selfMember.roles.contains) && dataCenter.forall(cluster.selfMember.dataCenter.contains) + // no withNumberOfShards because it should be defined in configuration to be able to verify same + // value on all nodes with `JoinConfigCompatChecker` + def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.option(role)) def withDataCenter(dataCenter: DataCenter): ClusterShardingSettings = @@ -300,6 +307,7 @@ final class ClusterShardingSettings( tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings = new ClusterShardingSettings( + numberOfShards, role, dataCenter, rememberEntities, diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala index 48e94c0b22..8195ce2b09 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala @@ -14,18 +14,18 @@ object ShardingMessageExtractor { * * This is recommended since it does not force details about sharding into the entity protocol */ - def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] = - new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) + def apply[M](numberOfShards: Int): ShardingMessageExtractor[ShardingEnvelope[M], M] = + new HashCodeMessageExtractor[M](numberOfShards) /** * Scala API: Create a message extractor for a protocol where the entity id is available in each message. */ - def noEnvelope[A]( - maxNumberOfShards: Int, - handOffStopMessage: A)( - extractEntityId: A ⇒ String): ShardingMessageExtractor[A, A] = - new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) { - def entityId(message: A) = extractEntityId(message) + def noEnvelope[M]( + numberOfShards: Int, + stopMessage: M)( + extractEntityId: M ⇒ String): ShardingMessageExtractor[M, M] = + new HashCodeNoEnvelopeMessageExtractor[M](numberOfShards) { + def entityId(message: M) = extractEntityId(message) } } @@ -34,11 +34,11 @@ object ShardingMessageExtractor { * Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or * [[HashCodeNoEnvelopeMessageExtractor]]if possible. * - * @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no + * @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `M` if there is no * envelope. - * @tparam A The type of message accepted by the entity actor + * @tparam M The type of message accepted by the entity actor */ -abstract class ShardingMessageExtractor[E, A] { +abstract class ShardingMessageExtractor[E, M] { /** * Extract the entity id from an incoming `message`. If `null` is returned @@ -58,13 +58,8 @@ abstract class ShardingMessageExtractor[E, A] { * message to support wrapping in message envelope that is unwrapped before * sending to the entity actor. */ - def unwrapMessage(message: E): A + def unwrapMessage(message: E): M - /** - * Message sent to an entity to tell it to stop, e.g. when rebalanced. - * The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`. - */ - def handOffStopMessage: A } /** @@ -73,16 +68,15 @@ abstract class ShardingMessageExtractor[E, A] { * * This is recommended since it does not force details about sharding into the entity protocol * - * @tparam A The type of message accepted by the entity actor + * @tparam M The type of message accepted by the entity actor */ -final class HashCodeMessageExtractor[A]( - val maxNumberOfShards: Int, - override val handOffStopMessage: A) - extends ShardingMessageExtractor[ShardingEnvelope[A], A] { +final class HashCodeMessageExtractor[M]( + val numberOfShards: Int) + extends ShardingMessageExtractor[ShardingEnvelope[M], M] { - override def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId - override def shardId(entityId: String): String = math.abs(entityId.hashCode % maxNumberOfShards).toString - override def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message + override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId + override def shardId(entityId: String): String = math.abs(entityId.hashCode % numberOfShards).toString + override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message } /** @@ -91,17 +85,16 @@ final class HashCodeMessageExtractor[A]( * * This is recommended since it does not force details about sharding into the entity protocol * - * @tparam A The type of message accepted by the entity actor + * @tparam M The type of message accepted by the entity actor */ -abstract class HashCodeNoEnvelopeMessageExtractor[A]( - val maxNumberOfShards: Int, - override val handOffStopMessage: A) - extends ShardingMessageExtractor[A, A] { +abstract class HashCodeNoEnvelopeMessageExtractor[M]( + val numberOfShards: Int) + extends ShardingMessageExtractor[M, M] { - override def shardId(entityId: String): String = math.abs(entityId.hashCode % maxNumberOfShards).toString - override final def unwrapMessage(message: A): A = message + override def shardId(entityId: String): String = math.abs(entityId.hashCode % numberOfShards).toString + override final def unwrapMessage(message: M): M = message - override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)" + override def toString = s"HashCodeNoEnvelopeMessageExtractor($numberOfShards)" } /** @@ -114,5 +107,5 @@ abstract class HashCodeNoEnvelopeMessageExtractor[A]( * The alternative way of routing messages through sharding is to not use envelopes, * and have the message types themselves carry identifiers. */ -final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class +final case class ShardingEnvelope[M](entityId: String, message: M) // TODO think if should remain a case class diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 4d4138ad19..a10d5ae926 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -6,15 +6,15 @@ package akka.cluster.sharding.typed package internal import java.net.URLEncoder -import java.util.Optional -import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } +import java.util.concurrent.CompletionStage +import java.util.concurrent.ConcurrentHashMap -import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import scala.concurrent.Future import akka.actor.ExtendedActorSystem -import akka.actor.{ InternalActorRef, Scheduler } +import akka.actor.InternalActorRef +import akka.actor.Scheduler import akka.actor.typed.ActorContext import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem @@ -28,23 +28,22 @@ import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } -import akka.cluster.sharding.typed.javadsl.EntityFactory import akka.cluster.typed.Cluster import akka.event.Logging import akka.event.LoggingAdapter +import akka.japi.function.{ Function ⇒ JFunction } import akka.pattern.AskTimeoutException import akka.pattern.PromiseActorRef -import akka.util.Timeout -import akka.japi.function.{ Function ⇒ JFunction } import akka.util.ByteString +import akka.util.Timeout /** * INTERNAL API * Extracts entityId and unwraps ShardingEnvelope and StartEntity messages. * Other messages are delegated to the given `ShardingMessageExtractor`. */ -@InternalApi private[akka] class ExtractorAdapter[E, A](delegate: ShardingMessageExtractor[E, A]) - extends ShardingMessageExtractor[Any, A] { +@InternalApi private[akka] class ExtractorAdapter[E, M](delegate: ShardingMessageExtractor[E, M]) + extends ShardingMessageExtractor[Any, M] { override def entityId(message: Any): String = { message match { case ShardingEnvelope(entityId, _) ⇒ entityId //also covers UntypedStartEntity in ShardingEnvelope @@ -55,21 +54,19 @@ import akka.util.ByteString override def shardId(entityId: String): String = delegate.shardId(entityId) - override def unwrapMessage(message: Any): A = { + override def unwrapMessage(message: Any): M = { message match { - case ShardingEnvelope(_, msg: A @unchecked) ⇒ + case ShardingEnvelope(_, msg: M @unchecked) ⇒ //also covers UntypedStartEntity in ShardingEnvelope msg case msg: UntypedStartEntity ⇒ - // not really of type A, but erased and StartEntity is only handled internally, not delivered to the entity - msg.asInstanceOf[A] + // not really of type M, but erased and StartEntity is only handled internally, not delivered to the entity + msg.asInstanceOf[M] case msg: E @unchecked ⇒ delegate.unwrapMessage(msg) } } - override def handOffStopMessage: A = delegate.handOffStopMessage - override def toString: String = delegate.toString } @@ -99,35 +96,43 @@ import akka.util.ByteString private val proxies: ConcurrentHashMap[String, String] = new ConcurrentHashMap private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap - override def spawn[A]( - behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) ⇒ Behavior[A], - entityProps: Props, - typeKey: scaladsl.EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { - val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) - spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor, Some(defaultShardAllocationStrategy(settings))) + // scaladsl impl + override def start[M, E](shardedEntity: scaladsl.ShardedEntity[M, E]): ActorRef[E] = { + val settings = shardedEntity.settings match { + case None ⇒ ClusterShardingSettings(system) + case Some(s) ⇒ s + } + + val extractor = (shardedEntity.messageExtractor match { + case None ⇒ new HashCodeMessageExtractor[M](settings.numberOfShards) + case Some(e) ⇒ e + }).asInstanceOf[ShardingMessageExtractor[E, M]] + + internalStart(shardedEntity.create, shardedEntity.entityProps, shardedEntity.typeKey, + shardedEntity.stopMessage, settings, extractor, shardedEntity.allocationStrategy) } - override def spawn[A]( - behavior: EntityFactory[A], - entityProps: Props, - typeKey: javadsl.EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { - val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) - spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor, - Optional.of(defaultShardAllocationStrategy(settings))) + // javadsl impl + override def start[M, E](shardedEntity: javadsl.ShardedEntity[M, E]): ActorRef[E] = { + import scala.compat.java8.OptionConverters._ + start(new scaladsl.ShardedEntity( + create = (shard, entitityId) ⇒ shardedEntity.createBehavior.apply(shard, entitityId), + typeKey = shardedEntity.typeKey.asScala, + stopMessage = shardedEntity.stopMessage, + entityProps = shardedEntity.entityProps, + settings = shardedEntity.settings.asScala, + messageExtractor = shardedEntity.messageExtractor.asScala, + allocationStrategy = shardedEntity.allocationStrategy.asScala + )) } - def spawnWithMessageExtractor[E, A]( - behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) ⇒ Behavior[A], + private def internalStart[M, E]( + behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) ⇒ Behavior[M], entityProps: Props, - typeKey: scaladsl.EntityTypeKey[A], + typeKey: scaladsl.EntityTypeKey[M], + stopMessage: M, settings: ClusterShardingSettings, - extractor: ShardingMessageExtractor[E, A], + extractor: ShardingMessageExtractor[E, M], allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = { val extractorAdapter = new ExtractorAdapter(extractor) @@ -154,7 +159,7 @@ import akka.util.ByteString override def apply(t: String): ActorRef[scaladsl.ClusterSharding.ShardCommand] = { // using untyped.systemActorOf to avoid the Future[ActorRef] system.toUntyped.asInstanceOf[ExtendedActorSystem].systemActorOf( - PropsAdapter(ShardCommandActor.behavior(extractor.handOffStopMessage)), + PropsAdapter(ShardCommandActor.behavior(stopMessage)), URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator") } }) @@ -170,7 +175,7 @@ import akka.util.ByteString extractEntityId, extractShardId, allocationStrategy.getOrElse(defaultShardAllocationStrategy(settings)), - extractor.handOffStopMessage) + stopMessage) } else { log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node) " + "for role [{}] and dataCenter [{}] ...", typeKey.name, settings.role, settings.dataCenter) @@ -183,7 +188,7 @@ import akka.util.ByteString extractShardId) } - val messageClassName = typeKey.asInstanceOf[EntityTypeKeyImpl[A]].messageClassName + val messageClassName = typeKey.asInstanceOf[EntityTypeKeyImpl[M]].messageClassName val typeNames = if (settings.shouldHostShard(cluster)) regions else proxies @@ -196,23 +201,12 @@ import akka.util.ByteString ActorRefAdapter(ref) } - override def spawnWithMessageExtractor[E, A]( - behavior: EntityFactory[A], - entityProps: Props, - typeKey: javadsl.EntityTypeKey[A], - settings: ClusterShardingSettings, - extractor: ShardingMessageExtractor[E, A], - allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] = { - spawnWithMessageExtractor((shard, entityId) ⇒ behavior.apply(shard, entityId), entityProps, typeKey.asScala, - settings, extractor, allocationStrategy.asScala) + override def entityRefFor[M](typeKey: scaladsl.EntityTypeKey[M], entityId: String): scaladsl.EntityRef[M] = { + new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler) } - override def entityRefFor[A](typeKey: scaladsl.EntityTypeKey[A], entityId: String): scaladsl.EntityRef[A] = { - new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler) - } - - override def entityRefFor[A](typeKey: javadsl.EntityTypeKey[A], entityId: String): javadsl.EntityRef[A] = { - new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler) + override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = { + new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler) } override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { @@ -226,14 +220,14 @@ import akka.util.ByteString /** * INTERNAL API */ -@InternalApi private[akka] final class EntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String, +@InternalApi private[akka] final class EntityRefImpl[M](shardRegion: akka.actor.ActorRef, entityId: String, scheduler: Scheduler) - extends javadsl.EntityRef[A] with scaladsl.EntityRef[A] { + extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] { - override def tell(msg: A): Unit = + override def tell(msg: M): Unit = shardRegion ! ShardingEnvelope(entityId, msg) - override def ask[U](message: (ActorRef[U]) ⇒ A)(implicit timeout: Timeout): Future[U] = { + override def ask[U](message: ActorRef[U] ⇒ M)(implicit timeout: Timeout): Future[U] = { val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout) val m = message(replyTo.ref) if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName @@ -241,7 +235,7 @@ import akka.util.ByteString replyTo.future } - def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout): CompletionStage[U] = + def ask[U](message: JFunction[ActorRef[U], M], timeout: Timeout): CompletionStage[U] = ask[U](replyTo ⇒ message.apply(replyTo))(timeout).toJava /** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */ diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index c03131a25b..f138e00cef 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -7,6 +7,7 @@ package javadsl import java.util.Optional import java.util.concurrent.CompletionStage +import java.util.function.BiFunction import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem @@ -20,8 +21,8 @@ import akka.japi.function.{ Function ⇒ JFunction } import akka.util.Timeout @FunctionalInterface -trait EntityFactory[A] { - def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[A] +trait EntityFactory[M] { + def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M] } object ClusterSharding { @@ -32,7 +33,7 @@ object ClusterSharding { * When an entity is created an `ActorRef[ShardCommand]` is passed to the * factory method. The entity can request passivation by sending the [[Passivate]] * message to this ref. Sharding will then send back the specified - * `handOffStopMessage` message to the entity, which is then supposed to stop itself. + * `stopMessage` message to the entity, which is then supposed to stop itself. * * Not for user extension. */ @@ -42,10 +43,10 @@ object ClusterSharding { * The entity can request passivation by sending the [[Passivate]] message * to the `ActorRef[ShardCommand]` that was passed in to the factory method * when creating the entity. Sharding will then send back the specified - * `handOffStopMessage` message to the entity, which is then supposed to stop + * `stopMessage` message to the entity, which is then supposed to stop * itself. */ - final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand + final case class Passivate[M](entity: ActorRef[M]) extends ShardCommand } /** @@ -81,7 +82,7 @@ object ClusterSharding { * to extract the entity identifier and the shard identifier from incoming messages. * A shard is a group of entities that will be managed together. For the first message in a * specific shard the `ShardRegion` requests the location of the shard from a central coordinator, - * the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion` + * the [[akka.cluster.sharding.ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion` * owns the shard. The `ShardRegion` receives the decided home of the shard * and if that is the `ShardRegion` instance itself it will create a local child * actor representing the entity and direct all messages for that entity to it. @@ -115,7 +116,7 @@ object ClusterSharding { * location. * * The logic that decides which shards to rebalance is defined in a plugable shard - * allocation strategy. The default implementation [[LeastShardAllocationStrategy]] + * allocation strategy. The default implementation [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] * picks shards for handoff from the `ShardRegion` with most number of previously allocated shards. * They will then be allocated to the `ShardRegion` with least number of previously allocated shards, * i.e. new members in the cluster. There is a configurable threshold of how large the difference @@ -148,9 +149,9 @@ object ClusterSharding { * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). * If a message is already enqueued to the entity when it stops itself the enqueued message * in the mailbox will be dropped. To support graceful passivation without losing such - * messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]` + * messages the entity actor can send [[ClusterSharding#Passivate]] to the `ActorRef[ShardCommand]` * that was passed in to the factory method when creating the entity.. - * The specified `handOffStopMessage` message will be sent back to the entity, which is + * The specified `stopMessage` message will be sent back to the entity, which is * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` * between reception of `Passivate` and termination of the entity. Such buffered messages * are thereafter delivered to a new incarnation of the entity. @@ -163,47 +164,15 @@ object ClusterSharding { abstract class ClusterSharding { /** - * Spawn a shard region or a proxy depending on if the settings require role and if this node has + * Initialize sharding for the given `shardedEntity` factory settings. + * + * It will start a shard region or a proxy depending on if the settings require role and if this node has * such a role. * - * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the - * recipient actor. - * A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId - * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced. - * @tparam A The type of command the entity accepts - */ - def spawn[A]( - behavior: EntityFactory[A], - props: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] - - /** - * Spawn a shard region or a proxy depending on if the settings require role and if this node - * has such a role. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param entityProps Props to apply when starting an entity - * @param messageExtractor Extract entityId, shardId, and unwrap messages. - * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards, - * [[ClusterSharding#defaultShardAllocationStrategy]] is used if `Optional.empty` + * @tparam M The type of message the entity accepts * @tparam E A possible envelope around the message the entity accepts - * @tparam A The type of command the entity accepts */ - def spawnWithMessageExtractor[E, A]( - behavior: EntityFactory[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A], - allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] + def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E] /** * Create an `ActorRef`-like reference to a specific sharded entity. @@ -214,12 +183,108 @@ abstract class ClusterSharding { * * For in-depth documentation of its semantics, see [[EntityRef]]. */ - def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] + def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] - /** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */ + /** + * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the + * given `settings`. This could be changed in the future. + */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy } +object ShardedEntity { + + /** + * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional + * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * + * @param createBehavior Create the behavior for an entity given an entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. + * + * @tparam M The type of message the entity accepts + */ + def create[M]( + createBehavior: JFunction[String, Behavior[M]], + typeKey: EntityTypeKey[M], + stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = { + create(new BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] { + override def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M] = + createBehavior.apply(entityId) + }, typeKey, stopMessage) + } + + /** + * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional + * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * + * @param createBehavior Create the behavior for an entity given `ShardCommand` ref and an entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. + * @tparam M The type of message the entity accepts + */ + def create[M]( + createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]], + typeKey: EntityTypeKey[M], + stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = + new ShardedEntity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty()) +} + +/** + * Defines how the entity should be created. Used in [[ClusterSharding#start]]. + */ +final class ShardedEntity[M, E] private[akka] ( + val createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]], + val typeKey: EntityTypeKey[M], + val stopMessage: M, + val entityProps: Props, + val settings: Optional[ClusterShardingSettings], + val messageExtractor: Optional[ShardingMessageExtractor[E, M]], + val allocationStrategy: Optional[ShardAllocationStrategy]) { + + /** + * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. + */ + def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] = + copy(entityProps = newEntityProps) + + /** + * Additional settings, typically loaded from configuration. + */ + def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] = + copy(settings = Optional.ofNullable(newSettings)) + + /** + * + * If a `messageExtractor` is not specified the messages are sent to the entities by wrapping + * them in [[ShardingEnvelope]] with the entityId of the recipient actor. That envelope + * is used by the [[HashCodeMessageExtractor]] for extracting entityId and shardId. The number of + * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default + * is configured with `akka.cluster.sharding.number-of-shards`. + */ + def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] = + new ShardedEntity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy) + + /** + * Allocation strategy which decides on which nodes to allocate new shards, + * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified. + */ + def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] = + copy(allocationStrategy = Optional.ofNullable(newAllocationStrategy)) + + private def copy( + create: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] = createBehavior, + typeKey: EntityTypeKey[M] = typeKey, + stopMessage: M = stopMessage, + entityProps: Props = entityProps, + settings: Optional[ClusterShardingSettings] = settings, + allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy + ): ShardedEntity[M, E] = { + new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) + } + +} + /** Allows starting a specific Sharded Entity by its entity identifier */ object StartEntity { @@ -227,8 +292,8 @@ object StartEntity { * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. */ - def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] = - scaladsl.StartEntity[A](entityId) + def create[M](msgClass: Class[M], entityId: String): ShardingEnvelope[M] = + scaladsl.StartEntity[M](entityId) } /** @@ -268,23 +333,23 @@ object EntityTypeKey { * * Not for user extension. */ -@DoNotInherit abstract class EntityRef[A] { scaladslSelf: scaladsl.EntityRef[A] ⇒ +@DoNotInherit abstract class EntityRef[M] { scaladslSelf: scaladsl.EntityRef[M] ⇒ /** * Send a message to the entity referenced by this EntityRef using *at-most-once* * messaging semantics. */ - def tell(msg: A): Unit + def tell(msg: M): Unit /** * Allows to "ask" the [[EntityRef]] for a reply. * See [[akka.actor.typed.javadsl.AskPattern]] for a complete write-up of this pattern */ - def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout): CompletionStage[U] + def ask[U](message: JFunction[ActorRef[U], M], timeout: Timeout): CompletionStage[U] /** * INTERNAL API */ - @InternalApi private[akka] def asScala: scaladsl.EntityRef[A] = scaladslSelf + @InternalApi private[akka] def asScala: scaladsl.EntityRef[M] = scaladslSelf } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 36f3d36f3e..91e48d529b 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -33,7 +33,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { * When an entity is created an `ActorRef[ShardCommand]` is passed to the * factory method. The entity can request passivation by sending the [[Passivate]] * message to this ref. Sharding will then send back the specified - * `handOffStopMessage` message to the entity, which is then supposed to stop itself. + * `stopMessage` message to the entity, which is then supposed to stop itself. * * Not for user extension. */ @@ -43,10 +43,10 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { * The entity can request passivation by sending the [[Passivate]] message * to the `ActorRef[ShardCommand]` that was passed in to the factory method * when creating the entity. Sharding will then send back the specified - * `handOffStopMessage` message to the entity, which is then supposed to stop + * `stopMessage` message to the entity, which is then supposed to stop * itself. */ - final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand with javadsl.ClusterSharding.ShardCommand + final case class Passivate[M](entity: ActorRef[M]) extends ShardCommand with javadsl.ClusterSharding.ShardCommand } @@ -83,7 +83,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { * to extract the entity identifier and the shard identifier from incoming messages. * A shard is a group of entities that will be managed together. For the first message in a * specific shard the `ShardRegion` requests the location of the shard from a central coordinator, - * the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion` + * the [[akka.cluster.sharding.ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion` * owns the shard. The `ShardRegion` receives the decided home of the shard * and if that is the `ShardRegion` instance itself it will create a local child * actor representing the entity and direct all messages for that entity to it. @@ -117,7 +117,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { * location. * * The logic that decides which shards to rebalance is defined in a plugable shard - * allocation strategy. The default implementation [[LeastShardAllocationStrategy]] + * allocation strategy. The default implementation [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] * picks shards for handoff from the `ShardRegion` with most number of previously allocated shards. * They will then be allocated to the `ShardRegion` with least number of previously allocated shards, * i.e. new members in the cluster. There is a configurable threshold of how large the difference @@ -152,7 +152,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { * in the mailbox will be dropped. To support graceful passivation without losing such * messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]` * that was passed in to the factory method when creating the entity.. - * The specified `handOffStopMessage` message will be sent back to the entity, which is + * The specified `stopMessage` message will be sent back to the entity, which is * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` * between reception of `Passivate` and termination of the entity. Such buffered messages * are thereafter delivered to a new incarnation of the entity. @@ -163,50 +163,17 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { */ @DoNotInherit trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding ⇒ - import ClusterSharding.ShardCommand /** - * Spawn a shard region or a proxy depending on if the settings require role and if this node has + * Initialize sharding for the given `shardedEntity` factory settings. + * + * It will start a shard region or a proxy depending on if the settings require role and if this node has * such a role. * - * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the - * recipient actor. - * A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId - * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced. - * @tparam A The type of command the entity accepts - */ - def spawn[A]( - behavior: (ActorRef[ShardCommand], String) ⇒ Behavior[A], - props: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] - - /** - * Spawn a shard region or a proxy depending on if the settings require role and if this node - * has such a role. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param entityProps Props to apply when starting an entity - * @param messageExtractor Extract entityId, shardId, and unwrap messages. - * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards, - * [[ClusterSharding#defaultShardAllocationStrategy]] is used if `None` + * @tparam M The type of message the entity accepts * @tparam E A possible envelope around the message the entity accepts - * @tparam A The type of command the entity accepts */ - def spawnWithMessageExtractor[E, A]( - behavior: (ActorRef[ShardCommand], String) ⇒ Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A], - allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] + def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E] /** * Create an `ActorRef`-like reference to a specific sharded entity. @@ -217,9 +184,12 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding * * For in-depth documentation of its semantics, see [[EntityRef]]. */ - def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] + def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M] - /** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */ + /** + * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the + * given `settings`. This could be changed in the future. + */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy /** @@ -228,6 +198,95 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding @InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf } +object ShardedEntity { + + /** + * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional + * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * + * @param create Create the behavior for an entity given an entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. + * + * @tparam M The type of message the entity accepts + */ + def apply[M]( + create: String ⇒ Behavior[M], + typeKey: EntityTypeKey[M], + stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = + apply((_, entityId) ⇒ create(entityId), typeKey, stopMessage) + + /** + * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional + * settings can be defined using the `with` methods of the returned [[ShardedEntity]]. + * + * @param create Create the behavior for an entity given `ShardCommand` ref and an entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. + * @tparam M The type of message the entity accepts + */ + def apply[M]( + create: (ActorRef[ClusterSharding.ShardCommand], String) ⇒ Behavior[M], + typeKey: EntityTypeKey[M], + stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = + new ShardedEntity(create, typeKey, stopMessage, Props.empty, None, None, None) +} + +/** + * Defines how the entity should be created. Used in [[ClusterSharding#start]]. + */ +final class ShardedEntity[M, E] private[akka] ( + val create: (ActorRef[ClusterSharding.ShardCommand], String) ⇒ Behavior[M], + val typeKey: EntityTypeKey[M], + val stopMessage: M, + val entityProps: Props, + val settings: Option[ClusterShardingSettings], + val messageExtractor: Option[ShardingMessageExtractor[E, M]], + val allocationStrategy: Option[ShardAllocationStrategy]) { + + /** + * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. + */ + def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] = + copy(entityProps = newEntityProps) + + /** + * Additional settings, typically loaded from configuration. + */ + def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] = + copy(settings = Option(newSettings)) + + /** + * + * If a `messageExtractor` is not specified the messages are sent to the entities by wrapping + * them in [[ShardingEnvelope]] with the entityId of the recipient actor. That envelope + * is used by the [[HashCodeMessageExtractor]] for extracting entityId and shardId. The number of + * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default + * is configured with `akka.cluster.sharding.number-of-shards`. + */ + def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] = + new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, Option(newExtractor), allocationStrategy) + + /** + * Allocation strategy which decides on which nodes to allocate new shards, + * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified. + */ + def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] = + copy(allocationStrategy = Option(newAllocationStrategy)) + + private def copy( + create: (ActorRef[ClusterSharding.ShardCommand], String) ⇒ Behavior[M] = create, + typeKey: EntityTypeKey[M] = typeKey, + stopMessage: M = stopMessage, + entityProps: Props = entityProps, + settings: Option[ClusterShardingSettings] = settings, + allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy + ): ShardedEntity[M, E] = { + new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) + } + +} + /** Allows starting a specific Sharded Entity by its entity identifier */ object StartEntity { @@ -235,9 +294,9 @@ object StartEntity { * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. */ - def apply[A](entityId: String): ShardingEnvelope[A] = { - // StartEntity isn't really of type A, but erased and StartEntity is only handled internally, not delivered to the entity - new ShardingEnvelope[A](entityId, UntypedStartEntity(entityId).asInstanceOf[A]) + def apply[M](entityId: String): ShardingEnvelope[M] = { + // StartEntity isn't really of type M, but erased and StartEntity is only handled internally, not delivered to the entity + new ShardingEnvelope[M](entityId, UntypedStartEntity(entityId).asInstanceOf[M]) } } @@ -271,7 +330,7 @@ object EntityTypeKey { * [[ActorRef]] and watch it in case such notification is desired. * Not for user extension. */ -@DoNotInherit trait EntityRef[A] { +@DoNotInherit trait EntityRef[M] { /** * Send a message to the entity referenced by this EntityRef using *at-most-once* @@ -283,7 +342,7 @@ object EntityTypeKey { * target.tell("Hello") * }}} */ - def tell(msg: A): Unit + def tell(msg: M): Unit /** * Send a message to the entity referenced by this EntityRef using *at-most-once* @@ -295,7 +354,7 @@ object EntityTypeKey { * target ! "Hello" * }}} */ - def !(msg: A): Unit = this.tell(msg) + def !(msg: M): Unit = this.tell(msg) /** * Allows to "ask" the [[EntityRef]] for a reply. @@ -313,7 +372,7 @@ object EntityTypeKey { * * Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern. */ - def ask[U](f: ActorRef[U] ⇒ A)(implicit timeout: Timeout): Future[U] + def ask[U](f: ActorRef[U] ⇒ M)(implicit timeout: Timeout): Future[U] /** * Allows to "ask" the [[EntityRef]] for a reply. @@ -331,7 +390,7 @@ object EntityTypeKey { * * Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern. */ - def ?[U](message: ActorRef[U] ⇒ A)(implicit timeout: Timeout): Future[U] = + def ?[U](message: ActorRef[U] ⇒ M)(implicit timeout: Timeout): Future[U] = this.ask(message)(timeout) } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala index 963fc03081..878983e957 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala @@ -7,6 +7,7 @@ package akka.cluster.sharding.typed import akka.actor.typed.{ ActorRef, Props } import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.ShardedEntity import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec } import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } import akka.actor.testkit.typed.scaladsl.TestProbe @@ -26,6 +27,7 @@ object MultiDcClusterShardingSpecConfig extends MultiNodeConfig { """ akka.loglevel = DEBUG akka.cluster.sharding { + number-of-shards = 10 # First is likely to be ignored as shard coordinator not ready retry-interval = 0.2s } @@ -66,14 +68,11 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh "start sharding" in { val sharding = ClusterSharding(typedSystem) - val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.spawn( - (_, _) ⇒ multiDcPinger, - Props.empty, - typeKey = typeKey, - ClusterShardingSettings(typedSystem), - 10, - NoMore - ) + val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start( + ShardedEntity( + _ ⇒ multiDcPinger, + typeKey, + NoMore)) val probe = TestProbe[Pong] shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref)) probe.expectMessage(Pong(cluster.selfMember.dataCenter)) @@ -99,14 +98,12 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh "be able to message cross dc via proxy" in { runOn(first, second) { - val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).spawn( - (_, _) ⇒ multiDcPinger, - Props.empty, - typeKey = typeKey, - ClusterShardingSettings(typedSystem).withDataCenter("dc2"), - 10, - NoMore - ) + val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start( + ShardedEntity( + _ ⇒ multiDcPinger, + typeKey, + NoMore) + .withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2"))) val probe = TestProbe[Pong] proxy ! ShardingEnvelope(entityId, Ping(probe.ref)) probe.expectMessage(remainingOrDefault, Pong("dc2")) diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index 16b519702d..eefb95f6ba 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -18,9 +18,14 @@ import akka.cluster.sharding.typed.ShardingEnvelope; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.cluster.sharding.typed.javadsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.ShardedEntity; //#import +import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.BlogCommand; +import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.BlogBehavior; +import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.PassivatePost; + public class ShardingCompileOnlyTest { //#counter-messages @@ -82,13 +87,31 @@ public class ShardingCompileOnlyTest { return Behaviors.same(); }) .onMessage(GoodByeCounter.class, (ctx, msg) -> { - // the handOffStopMessage, used for rebalance and passivate + // the stopMessage, used for rebalance and passivate return Behaviors.stopped(); }) .build(); } //#counter-passivate + public static void startPassivateExample() { + ActorSystem system = ActorSystem.create( + Behaviors.empty(), "ShardingExample" + ); + ClusterSharding sharding = ClusterSharding.get(system); + + //#counter-passivate-start + + EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); + + sharding.start( + ShardedEntity.create( + (shard, entityId) -> counter2(shard, entityId), + typeKey, + new GoodByeCounter())); + //#counter-passivate-start + } + public static void example() { ActorSystem system = ActorSystem.create( @@ -99,16 +122,15 @@ public class ShardingCompileOnlyTest { ClusterSharding sharding = ClusterSharding.get(system); //#sharding-extension - //#spawn + //#start EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); - ActorRef> shardRegion = sharding.spawn( - (shard, entityId) -> counter(entityId,0), - Props.empty(), - typeKey, - ClusterShardingSettings.create(system), - 10, - new GoodByeCounter()); - //#spawn + + ActorRef> shardRegion = sharding.start( + ShardedEntity.create( + entityId -> counter(entityId,0), + typeKey, + new GoodByeCounter())); + //#start //#send EntityRef counterOne = sharding.entityRefFor(typeKey, "counter-`"); @@ -117,4 +139,21 @@ public class ShardingCompileOnlyTest { shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment())); //#send } + + public static void persistenceExample() { + ActorSystem system = ActorSystem.create( + Behaviors.empty(), "ShardingExample" + ); + ClusterSharding sharding = ClusterSharding.get(system); + + //#persistence + EntityTypeKey blogTypeKey = EntityTypeKey.create(BlogCommand.class, "BlogPost"); + + sharding.start( + ShardedEntity.create( + BlogBehavior::behavior, + blogTypeKey, + new PassivatePost())); + //#persistence + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index f539e0d31e..ee87b1f51a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -70,13 +70,11 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh Cluster(system).manager ! Join(Cluster(system).selfMember.address) "start persistent actor" in { - ClusterSharding(system).spawn[Command]( - (_, entityId) ⇒ persistentActor(entityId), - Props.empty, + ClusterSharding(system).start(ShardedEntity( + entityId ⇒ persistentActor(entityId), typeKey, - ClusterShardingSettings(system), - maxNumberOfShards = 100, - handOffStopMessage = StopPlz) + StopPlz + )) val p = TestProbe[String]() diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index 399e3452a2..efb090c28e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -44,6 +44,8 @@ object ClusterShardingSpec { akka.cluster.jmx.multi-mbeans-in-same-jvm = on + akka.cluster.sharding.number-of-shards = 10 + akka.coordinated-shutdown.terminate-actor-system = off akka.actor { @@ -176,45 +178,38 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. Behaviors.same } - private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn( + private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity( (shard, _) ⇒ behavior(shard), - Props.empty, typeKey, - ClusterShardingSettings(system), - 10, - StopPlz()) + StopPlz())) - private val shardingRef2 = sharding2.spawn( + private val shardingRef2 = sharding2.start(ShardedEntity( (shard, _) ⇒ behavior(shard), - Props.empty, typeKey, - ClusterShardingSettings(system2), - 10, - StopPlz()) + StopPlz())) - private val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawnWithMessageExtractor( + private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(ShardedEntity( (shard, _) ⇒ behaviorWithId(shard), - Props.empty, typeKey2, - ClusterShardingSettings(system), - ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + IdStopPlz()) + .withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { case IdReplyPlz(id, _) ⇒ id case IdWhoAreYou(id, _) ⇒ id case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") - }, - None) + }) + ) - private val shardingRef4 = sharding2.spawnWithMessageExtractor( + private val shardingRef4 = sharding2.start(ShardedEntity( (shard, _) ⇒ behaviorWithId(shard), - Props.empty, typeKey2, - ClusterShardingSettings(system2), - ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { - case IdReplyPlz(id, _) ⇒ id - case IdWhoAreYou(id, _) ⇒ id - case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") - }, - None) + IdStopPlz()) + .withMessageExtractor( + ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + case IdReplyPlz(id, _) ⇒ id + case IdWhoAreYou(id, _) ⇒ id + case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") + }) + ) def totalEntityCount1(): Int = { import akka.pattern.ask @@ -263,13 +258,10 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. val p = TestProbe[String]() val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test") - val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn( + val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity( (shard, _) ⇒ behavior(shard, Some(stopProbe.ref)), - Props.empty, typeKey3, - ClusterShardingSettings(system), - 10, - StopPlz()) + StopPlz())) shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref)) p.expectMessage("Hello!") @@ -284,13 +276,10 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. "fail if starting sharding for already used typeName, but with a different type" in { // sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard") val ex = intercept[Exception] { - sharding.spawn( + sharding.start(ShardedEntity( (shard, _) ⇒ behaviorWithId(shard), - Props.empty, EntityTypeKey[IdTestProtocol]("envelope-shard"), - ClusterShardingSettings(system), - 10, - IdStopPlz()) + IdStopPlz())) } ex.getMessage should include("already spawned") @@ -350,7 +339,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } } - "use the handOffStopMessage for leaving/rebalance" in { + "use the stopMessage for leaving/rebalance" in { var replies1 = Set.empty[String] (1 to 10).foreach { n ⇒ val p = TestProbe[String]() diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index 37d9fb658e..660b6be1c3 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -5,8 +5,10 @@ package docs.akka.cluster.sharding.typed import scala.concurrent.duration._ + import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.sharding.typed.scaladsl.ShardedEntity import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost } @@ -45,17 +47,14 @@ object ShardingCompileOnlySpec { } //#counter - //#spawn + //#start val TypeKey = EntityTypeKey[CounterCommand]("Counter") - // if a extractor is defined then the type would be ActorRef[BasicCommand] - val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn( - behavior = (shard, entityId) ⇒ counter(entityId, 0), - props = Props.empty, + + val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(ShardedEntity( + create = entityId ⇒ counter(entityId, 0), typeKey = TypeKey, - settings = ClusterShardingSettings(system), - maxNumberOfShards = 10, - handOffStopMessage = GoodByeCounter) - //#spawn + stopMessage = GoodByeCounter)) + //#start //#send // With an EntityRef @@ -68,14 +67,12 @@ object ShardingCompileOnlySpec { import InDepthPersistentBehaviorSpec.behavior //#persistence - val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost") - ClusterSharding(system).spawn[BlogCommand]( - behavior = (shard, entityId) ⇒ behavior(entityId), - props = Props.empty, - typeKey = ShardingTypeName, - settings = ClusterShardingSettings(system), - maxNumberOfShards = 100, - handOffStopMessage = PassivatePost) + val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost") + + ClusterSharding(system).start(ShardedEntity( + create = entityId ⇒ behavior(entityId), + typeKey = BlogTypeKey, + stopMessage = PassivatePost)) //#persistence //#counter-passivate @@ -97,7 +94,7 @@ object ShardingCompileOnlySpec { shard ! ClusterSharding.Passivate(ctx.self) Behaviors.same case GoodByeCounter ⇒ - // the handOffStopMessage, used for rebalance and passivate + // the stopMessage, used for rebalance and passivate Behaviors.stopped } @@ -105,6 +102,11 @@ object ShardingCompileOnlySpec { become(0) } } + + sharding.start(ShardedEntity( + create = (shard, entityId) ⇒ counter2(shard, entityId), + typeKey = TypeKey, + stopMessage = GoodByeCounter)) //#counter-passivate } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 2428debf20..9882a6bc3c 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -539,6 +539,10 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } } + /** + * The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the + * given `settings`. This could be changed in the future. + */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala index 7d5a0e5286..7da3fc01b2 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala @@ -132,6 +132,8 @@ private[akka] object ClusterSingletonImpl { @DoNotInherit abstract class ClusterSingleton extends Extension { + // FIXME align with ClusterSharding API, issue #25480 + /** * Start if needed and provide a proxy to a named singleton * diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index afba0174c5..b4e23ca319 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -209,7 +209,7 @@ That means they will start buffering incoming messages for that shard, in the sa shard location is unknown. During the rebalance process the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard -will stop all entities in that shard by sending the specified `handOffStopMessage` +will stop all entities in that shard by sending the specified `stopMessage` (default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 998fa09090..63de2beb42 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -45,10 +45,10 @@ Java Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier. Scala -: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn } +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #start } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #start } Messages to a specific entity are then sent via an EntityRef. It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region. @@ -62,8 +62,7 @@ Java ## Persistence example When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of -an actor after it has moved. Currently Akka typed only has a Scala API for persistence, you can track the progress of the -Java API [here](https://github.com/akka/akka/issues/24193). +an actor after it has moved. Taking the larger example from the @ref:[persistence documentation](persistence.md#larger-example) and making it into a sharded entity is the same as for a non persistent behavior. The behavior: @@ -71,11 +70,17 @@ a sharded entity is the same as for a non persistent behavior. The behavior: Scala : @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior } +Java +: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #behavior } + To create the entity: Scala : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence } +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #persistence } + Sending messages to entities is the same as the example above. The only difference is when an entity is moved the state will be restored. See @ref:[persistence](persistence.md) for more details. @@ -97,4 +102,5 @@ Scala : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate } Java -: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate } +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-start } + diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java index 06ff703a18..dc5b00ecb6 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java @@ -133,7 +133,6 @@ public class InDepthPersistentBehaviorTest { } } public static class PassivatePost implements BlogCommand { - } public static class PostContent implements BlogCommand { final String postId;