From 5fab2b4521c602d6de755f52d60bbc6119e59dbf Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 9 Jun 2015 16:02:19 +0200 Subject: [PATCH] !cls #16422 Rename shardResolver and idExtractor --- .../cluster/sharding/ClusterSharding.scala | 142 ++++++++++-------- ...terShardingCustomShardAllocationSpec.scala | 8 +- .../sharding/ClusterShardingFailureSpec.scala | 8 +- .../ClusterShardingGracefulShutdownSpec.scala | 8 +- .../sharding/ClusterShardingLeavingSpec.scala | 8 +- .../sharding/ClusterShardingSpec.scala | 28 ++-- .../project/migration-guide-2.3.x-2.4.x.rst | 3 + akka-docs/rst/scala/cluster-sharding.rst | 15 +- 8 files changed, 120 insertions(+), 100 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 4919747bea..58a520fef2 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -200,11 +200,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * * @param typeName the name of the entity type * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` - * @param role specifies that this entity type requires cluster nodes with a specific role. - * If the role is not specified all nodes in the cluster are used. * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param shardResolver function to determine the shard id for an incoming message, only messages - * that passed the `idExtractor` will be used + * @param extractEntityId partial function to extract the entity id and the message to send to the + * entity from the incoming message, if the partial function does not match the message will + * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream + * @param extractShardId function to determine the shard id for an incoming message, only messages + * that passed the `extractEntityId` will be used * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic * @param handOffStopMessage the message that will be sent to entities when they are to be stopped @@ -215,15 +216,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { typeName: String, entityProps: Props, settings: ClusterShardingSettings, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver, + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = { requireClusterRole(settings.role) implicit val timeout = system.settings.CreationTimeout val startMsg = Start(typeName, entityProps, settings, - idExtractor, shardResolver, allocationStrategy, handOffStopMessage) + extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) shardRegion @@ -243,29 +244,29 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * @param typeName the name of the entity type * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param idExtractor partial function to extract the entity id and the message to send to the + * @param extractEntityId partial function to extract the entity id and the message to send to the * entity from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream - * @param shardResolver function to determine the shard id for an incoming message, only messages - * that passed the `idExtractor` will be used + * @param extractShardId function to determine the shard id for an incoming message, only messages + * that passed the `extractEntityId` will be used * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, entityProps: Props, settings: ClusterShardingSettings, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver): ActorRef = { + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId): ActorRef = { val allocationStrategy = new LeastShardAllocationStrategy( settings.tuningParameters.leastShardAllocationRebalanceThreshold, settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) - start(typeName, entityProps, settings, idExtractor, shardResolver, allocationStrategy, PoisonPill) + start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill) } /** - * Java API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor + * Java/Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor * for this type can later be retrieved with the [[#shardRegion]] method. * @@ -292,17 +293,17 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { handOffStopMessage: Any): ActorRef = { start(typeName, entityProps, settings, - idExtractor = { + extractEntityId = { case msg if messageExtractor.entityId(msg) ne null ⇒ (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) }, - shardResolver = msg ⇒ messageExtractor.shardId(msg), + extractShardId = msg ⇒ messageExtractor.shardId(msg), allocationStrategy = allocationStrategy, handOffStopMessage = handOffStopMessage) } /** - * Java API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor + * Java/Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor * for this type can later be retrieved with the [[#shardRegion]] method. * @@ -344,29 +345,29 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * @param typeName the name of the entity type * @param role specifies that this entity type is located on cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. - * @param idExtractor partial function to extract the entity id and the message to send to the + * @param extractEntityId partial function to extract the entity id and the message to send to the * entity from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream - * @param shardResolver function to determine the shard id for an incoming message, only messages - * that passed the `idExtractor` will be used + * @param extractShardId function to determine the shard id for an incoming message, only messages + * that passed the `extractEntityId` will be used * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def startProxy( typeName: String, role: Option[String], - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver): ActorRef = { + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId): ActorRef = { implicit val timeout = system.settings.CreationTimeout val settings = ClusterShardingSettings(system).withRole(role) - val startMsg = StartProxy(typeName, settings, idExtractor, shardResolver) + val startMsg = StartProxy(typeName, settings, extractEntityId, extractShardId) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) shardRegion } /** - * Java API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, + * Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the * [[#shardRegion]] method. @@ -387,11 +388,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { messageExtractor: ShardRegion.MessageExtractor): ActorRef = { startProxy(typeName, Option(role.orElse(null)), - idExtractor = { + extractEntityId = { case msg if messageExtractor.entityId(msg) ne null ⇒ (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) }, - shardResolver = msg ⇒ messageExtractor.shardId(msg)) + extractShardId = msg ⇒ messageExtractor.shardId(msg)) } @@ -413,11 +414,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { private[akka] object ClusterShardingGuardian { import ShardCoordinator.ShardAllocationStrategy final case class Start(typeName: String, entityProps: Props, settings: ClusterShardingSettings, - idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, + extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any) extends NoSerializationVerificationNeeded final case class StartProxy(typeName: String, settings: ClusterShardingSettings, - idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver) + extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId) extends NoSerializationVerificationNeeded final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded } @@ -439,7 +440,7 @@ private[akka] class ClusterShardingGuardian extends Actor { (self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress def receive = { - case Start(typeName, entityProps, settings, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒ + case Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) ⇒ import settings.role import settings.tuningParameters.coordinatorFailureBackoff val encName = URLEncoder.encode(typeName, ByteString.UTF_8) @@ -463,14 +464,14 @@ private[akka] class ClusterShardingGuardian extends Actor { entityProps = entityProps, settings = settings, coordinatorPath = cPath, - idExtractor = idExtractor, - shardResolver = shardResolver, + extractEntityId = extractEntityId, + extractShardId = extractShardId, handOffStopMessage = handOffStopMessage), name = encName) } sender() ! Started(shardRegion) - case StartProxy(typeName, settings, idExtractor, shardResolver) ⇒ + case StartProxy(typeName, settings, extractEntityId, extractShardId) ⇒ val encName = URLEncoder.encode(typeName, ByteString.UTF_8) val cName = coordinatorSingletonManagerName(encName) val cPath = coordinatorPath(encName) @@ -479,8 +480,8 @@ private[akka] class ClusterShardingGuardian extends Actor { typeName = typeName, settings = settings, coordinatorPath = cPath, - idExtractor = idExtractor, - shardResolver = shardResolver), + extractEntityId = extractEntityId, + extractShardId = extractShardId), name = encName) } sender() ! Started(shardRegion) @@ -503,11 +504,11 @@ object ShardRegion { entityProps: Props, settings: ClusterShardingSettings, coordinatorPath: String, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver, + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any): Props = - Props(new ShardRegion(typeName, Some(entityProps), settings, coordinatorPath, idExtractor, - shardResolver, handOffStopMessage)).withDeploy(Deploy.local) + Props(new ShardRegion(typeName, Some(entityProps), settings, coordinatorPath, extractEntityId, + extractShardId, handOffStopMessage)).withDeploy(Deploy.local) /** * INTERNAL API @@ -518,9 +519,9 @@ object ShardRegion { typeName: String, settings: ClusterShardingSettings, coordinatorPath: String, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver): Props = - Props(new ShardRegion(typeName, None, settings, coordinatorPath, idExtractor, shardResolver, PoisonPill)) + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId): Props = + Props(new ShardRegion(typeName, None, settings, coordinatorPath, extractEntityId, extractShardId, PoisonPill)) .withDeploy(Deploy.local) /** @@ -545,14 +546,14 @@ object ShardRegion { * message to support wrapping in message envelope that is unwrapped before * sending to the entity actor. */ - type IdExtractor = PartialFunction[Msg, (EntityId, Msg)] + type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)] /** * Interface of the function used by the [[ShardRegion]] to * extract the shard id from an incoming message. - * Only messages that passed the [[IdExtractor]] will be used + * Only messages that passed the [[ExtractEntityId]] will be used * as input to this function. */ - type ShardResolver = Msg ⇒ ShardId + type ExtractShardId = Msg ⇒ ShardId /** * Java API: Interface of functions to extract entity id, @@ -579,6 +580,21 @@ object ShardRegion { def shardId(message: Any): String } + /** + * Convenience implementation of [[ShardRegion.MessageExtractor]] that + * construct `shardId` based on the `hashCode` of the `entityId`. The number + * of unique shards is limited by the given `maxNumberOfShards`. + */ + abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor { + /** + * Default implementation pass on the message as is. + */ + override def entityMessage(message: Any): Any = message + + override def shardId(message: Any): String = + (math.abs(entityId(message).hashCode) % maxNumberOfShards).toString + } + sealed trait ShardRegionCommand /** @@ -621,7 +637,7 @@ object ShardRegion { def getCurrentRegionsInstance = GetCurrentRegions /** - * Reply to [[GetCurrentRegions]] + * Reply to `GetCurrentRegions` */ @SerialVersionUID(1L) final case class CurrentRegions(regions: Set[Address]) { /** @@ -681,8 +697,8 @@ class ShardRegion( entityProps: Option[Props], settings: ClusterShardingSettings, coordinatorPath: String, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver, + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) extends Actor with ActorLogging { import ShardCoordinator.Internal._ @@ -743,12 +759,12 @@ class ShardRegion( } def receive = { - case Terminated(ref) ⇒ receiveTerminated(ref) - case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt) - case state: CurrentClusterState ⇒ receiveClusterState(state) - case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) - case cmd: ShardRegionCommand ⇒ receiveCommand(cmd) - case msg if idExtractor.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) + case Terminated(ref) ⇒ receiveTerminated(ref) + case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt) + case state: CurrentClusterState ⇒ receiveClusterState(state) + case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) + case cmd: ShardRegionCommand ⇒ receiveCommand(cmd) + case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) } def receiveClusterState(state: CurrentClusterState): Unit = { @@ -909,7 +925,7 @@ class ShardRegion( } def deliverMessage(msg: Any, snd: ActorRef): Unit = { - val shard = shardResolver(msg) + val shard = extractShardId(msg) regionByShard.get(shard) match { case Some(ref) if ref == self ⇒ getShard(shard).tell(msg, snd) @@ -947,8 +963,8 @@ class ShardRegion( id, props, settings, - idExtractor, - shardResolver, + extractEntityId, + extractShardId, handOffStopMessage), name)) shards = shards.updated(id, shard) @@ -1019,10 +1035,10 @@ private[akka] object Shard { shardId: ShardRegion.ShardId, entityProps: Props, settings: ClusterShardingSettings, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver, + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any): Props = - Props(new Shard(typeName, shardId, entityProps, settings, idExtractor, shardResolver, handOffStopMessage)) + Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) .withDeploy(Deploy.local) } @@ -1039,8 +1055,8 @@ private[akka] class Shard( shardId: ShardRegion.ShardId, entityProps: Props, settings: ClusterShardingSettings, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver, + extractEntityId: ShardRegion.ExtractEntityId, + extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) extends PersistentActor with ActorLogging { import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate } @@ -1097,7 +1113,7 @@ private[akka] class Shard( case msg: ShardCommand ⇒ receiveShardCommand(msg) case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg) case PersistenceFailure(payload: StateChange, _, _) ⇒ persistenceFailure(payload) - case msg if idExtractor.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) + case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) } def receiveShardCommand(msg: ShardCommand): Unit = msg match { @@ -1221,7 +1237,7 @@ private[akka] class Shard( } def deliverMessage(msg: Any, snd: ActorRef): Unit = { - val (id, payload) = idExtractor(msg) + val (id, payload) = extractEntityId(msg) if (id == null || id == "") { log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName) context.system.deadLetters ! msg diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 37b5ce3935..92c992183c 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -52,11 +52,11 @@ object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig { } } - val idExtractor: ShardRegion.IdExtractor = { + val extractEntityId: ShardRegion.ExtractEntityId = { case id: Int ⇒ (id.toString, id) } - val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match { + val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match { case id: Int ⇒ id.toString } @@ -136,8 +136,8 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar typeName = "Entity", entityProps = Props[Entity], settings = ClusterShardingSettings(system), - idExtractor = idExtractor, - shardResolver = shardResolver, + extractEntityId = extractEntityId, + extractShardId = extractShardId, allocationStrategy = TestAllocationStrategy(allocator), handOffStopMessage = PoisonPill) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index 063083b652..458e673e7e 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -62,12 +62,12 @@ object ClusterShardingFailureSpec extends MultiNodeConfig { } } - val idExtractor: ShardRegion.IdExtractor = { + val extractEntityId: ShardRegion.ExtractEntityId = { case m @ Get(id) ⇒ (id, m) case m @ Add(id, _) ⇒ (id, m) } - val shardResolver: ShardRegion.ShardResolver = { + val extractShardId: ShardRegion.ExtractShardId = { case Get(id) ⇒ id.charAt(0).toString case Add(id, _) ⇒ id.charAt(0).toString } @@ -113,8 +113,8 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe typeName = "Entity", entityProps = Props[Entity], settings = ClusterShardingSettings(system).withRememberEntities(true), - idExtractor = idExtractor, - shardResolver = shardResolver) + extractEntityId = extractEntityId, + extractShardId = extractShardId) } lazy val region = ClusterSharding(system).shardRegion("Entity") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index afb9ca8ea2..e83e60f860 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -56,11 +56,11 @@ object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig { } } - val idExtractor: ShardRegion.IdExtractor = { + val extractEntityId: ShardRegion.ExtractEntityId = { case id: Int ⇒ (id.toString, id) } - val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match { + val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match { case id: Int ⇒ id.toString } @@ -123,8 +123,8 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG typeName = "Entity", entityProps = Props[Entity], settings = ClusterShardingSettings(system), - idExtractor = idExtractor, - shardResolver = shardResolver, + extractEntityId = extractEntityId, + extractShardId = extractShardId, allocationStrategy, handOffStopMessage = StopEntity) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index 78f5a675e0..8cc98a7e58 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -68,11 +68,11 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig { } } - val idExtractor: ShardRegion.IdExtractor = { + val extractEntityId: ShardRegion.ExtractEntityId = { case m @ Ping(id) ⇒ (id, m) } - val shardResolver: ShardRegion.ShardResolver = { + val extractShardId: ShardRegion.ExtractShardId = { case Ping(id: String) ⇒ id.charAt(0).toString } @@ -125,8 +125,8 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe typeName = "Entity", entityProps = Props[Entity], settings = ClusterShardingSettings(system), - idExtractor = idExtractor, - shardResolver = shardResolver) + extractEntityId = extractEntityId, + extractShardId = extractShardId) } lazy val region = ClusterSharding(system).shardRegion("Entity") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index a19c0008be..8c29ad39f3 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -111,14 +111,14 @@ object ClusterShardingSpec extends MultiNodeConfig { } //#counter-actor - val idExtractor: ShardRegion.IdExtractor = { + val extractEntityId: ShardRegion.ExtractEntityId = { case EntityEnvelope(id, payload) ⇒ (id.toString, payload) case msg @ Get(id) ⇒ (id.toString, msg) } val numberOfShards = 12 - val shardResolver: ShardRegion.ShardResolver = { + val extractShardId: ShardRegion.ExtractShardId = { case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString case Get(id) ⇒ (id % numberOfShards).toString } @@ -130,14 +130,14 @@ object ClusterShardingDocCode { import ClusterShardingSpec._ //#counter-extractor - val idExtractor: ShardRegion.IdExtractor = { + val extractEntityId: ShardRegion.ExtractEntityId = { case EntityEnvelope(id, payload) ⇒ (id.toString, payload) case msg @ Get(id) ⇒ (id.toString, msg) } val numberOfShards = 100 - val shardResolver: ShardRegion.ShardResolver = { + val extractShardId: ShardRegion.ExtractShardId = { case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString case Get(id) ⇒ (id % numberOfShards).toString } @@ -221,8 +221,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult entityProps = Props[Counter], settings = settings, coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator", - idExtractor = idExtractor, - shardResolver = shardResolver, + extractEntityId = extractEntityId, + extractShardId = extractShardId, handOffStopMessage = PoisonPill), name = typeName + "Region") } @@ -345,8 +345,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult typeName = "counter", settings, coordinatorPath = "/user/counterCoordinator/singleton/coordinator", - idExtractor = idExtractor, - shardResolver = shardResolver), + extractEntityId = extractEntityId, + extractShardId = extractShardId), name = "regionProxy") proxy ! Get(1) @@ -512,15 +512,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult typeName = "Counter", entityProps = Props[Counter], settings = ClusterShardingSettings(system), - idExtractor = idExtractor, - shardResolver = shardResolver) + extractEntityId = extractEntityId, + extractShardId = extractShardId) //#counter-start ClusterSharding(system).start( typeName = "AnotherCounter", entityProps = Props[Counter], settings = ClusterShardingSettings(system), - idExtractor = idExtractor, - shardResolver = shardResolver) + extractEntityId = extractEntityId, + extractShardId = extractShardId) } enterBarrier("extension-started") runOn(fifth) { @@ -560,8 +560,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult typeName = "ApiTest", entityProps = Props[Counter], settings = ClusterShardingSettings(system), - idExtractor = idExtractor, - shardResolver = shardResolver) + extractEntityId = extractEntityId, + extractShardId = extractShardId) val counterRegionViaGet: ActorRef = ClusterSharding(system).shardRegion("ApiTest") diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index b5819ddfcc..1d150d9703 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -241,6 +241,9 @@ of the ``ClusterSharding`` extension instead of the optional ``entryProps`` para Entry was renamed to Entity, for example in the ``MessagesExtractor`` in the Java API and the ``EntityId`` type in the Scala API. +``idExtractor`` function was renamed to ``extractEntityId``. ``shardResolver`` function +was renamed to ``extractShardId``. + ClusterSingletonManager and ClusterSingletonProxy construction ============================================================== diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 733bded617..6793d62793 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -57,7 +57,7 @@ The message sent to the entity actor is what ``entityMessage`` returns and that if needed. A shard is a group of entities that will be managed together. The grouping is defined by the -``shardResolver`` function shown above. For a specific entity identifier the shard identifier must always +``extractShardId`` function shown above. For a specific entity identifier the shard identifier must always be the same. Otherwise the entity actor might accidentally be started in several places at the same time. Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution, @@ -68,8 +68,9 @@ overhead, and increased latency because the coordinator is involved in the routi shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping all nodes in the cluster. -A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entity identifier modulo -number of shards. +A simple sharding algorithm that works fine in most cases is to take the absolute value of the ``hashCode`` of +the entity identifier modulo number of shards. As a convenience this is provided by the +``ShardRegion.HashCodeMessageExtractor``. Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``. @@ -98,7 +99,7 @@ method. ``ClusterSharding.start`` gives you the reference which you can pass alo .. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-start -The ``idExtractor`` and ``shardResolver`` are two application specific functions to extract the entity +The ``extractEntityId`` and ``extractShardId`` are two application specific functions to extract the entity identifier and the shard identifier from incoming messages. .. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-extractor @@ -109,12 +110,12 @@ This example illustrates two different ways to define the entity identifier in t * The ``EntityEnvelope`` holds the identifier, and the actual message that is sent to the entity actor is wrapped in the envelope. -Note how these two messages types are handled in the ``idExtractor`` function shown above. -The message sent to the entity actor is the second part of the tuple return by the ``idExtractor`` and that makes it +Note how these two messages types are handled in the ``extractEntityId`` function shown above. +The message sent to the entity actor is the second part of the tuple return by the ``extractEntityId`` and that makes it possible to unwrap envelopes if needed. A shard is a group of entities that will be managed together. The grouping is defined by the -``shardResolver`` function shown above. For a specific entity identifier the shard identifier must always +``extractShardId`` function shown above. For a specific entity identifier the shard identifier must always be the same. Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,