From c1d5221bf8f6b7aaf25d7ceb6b66a9f8c1af4461 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 4 Jun 2015 22:54:26 +0200 Subject: [PATCH] !clt #17364 Make ShardRegion.props internal * the extension api is enough * simplify how roles are specified * make proxy mode more explicit by seperate method `startProxy` --- .../src/main/resources/reference.conf | 6 - .../cluster/sharding/ClusterSharding.scala | 270 ++++++++++-------- ...terShardingCustomShardAllocationSpec.scala | 4 +- .../sharding/ClusterShardingFailureSpec.scala | 4 +- .../ClusterShardingGracefulShutdownSpec.scala | 4 +- .../sharding/ClusterShardingLeavingSpec.scala | 4 +- .../sharding/ClusterShardingSpec.scala | 12 +- .../project/migration-guide-2.3.x-2.4.x.rst | 9 + akka-docs/rst/scala/cluster-sharding.rst | 5 +- 9 files changed, 168 insertions(+), 150 deletions(-) diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index c8499c14b1..92447dfaad 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -18,12 +18,6 @@ akka.cluster.sharding { # and started again after this duration. coordinator-failure-backoff = 10 s - # Start the coordinator singleton manager on members tagged with this role. - # All members are used if undefined or empty. - # ShardRegion actor is started in proxy only mode on nodes that are not tagged - # with this role. - role = "" - # The ShardRegion retries registration and shard location requests to the # ShardCoordinator with this interval if it does not reply. retry-interval = 2 s diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index ce01d33609..a39c441716 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -43,6 +43,7 @@ import scala.concurrent.Future import akka.dispatch.ExecutionContexts import akka.pattern.pipe import scala.util.Success +import akka.util.ByteString /** * This extension provides sharding functionality of actors in a cluster. @@ -175,18 +176,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { import ShardCoordinator.LeastShardAllocationStrategy private val cluster = Cluster(system) + /** * INTERNAL API */ private[akka] object Settings { val config = system.settings.config.getConfig("akka.cluster.sharding") - val Role: Option[String] = config.getString("role") match { - case "" ⇒ None - case r ⇒ Some(r) - } - def hasNecessaryClusterRole(role: Option[String]): Boolean = role.forall(cluster.selfRoles.contains) - val GuardianName: String = config.getString("guardian-name") val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis @@ -202,10 +198,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { val LeastShardAllocationMaxSimultaneousRebalance: Int = config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance") } + import Settings._ private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap private lazy val guardian = system.actorOf(Props[ClusterShardingGuardian], Settings.GuardianName) + private[akka] def requireClusterRole(role: Option[String]): Unit = + require(role.forall(cluster.selfRoles.contains), + s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") + /** * Scala API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor @@ -215,12 +216,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * of the `reference.conf`. * * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`, - * if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e. - * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any - * entry actors itself - * @param roleOverride specifies that this entry type requires cluster nodes with a specific role. - * if not defined (None), then defaults to standard behavior of using Role (if any) from configuration + * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` + * @param role specifies that this entry type requires cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * @param idExtractor partial function to extract the entry id and the message to send to the @@ -236,18 +234,17 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { */ def start( typeName: String, - entryProps: Option[Props], - roleOverride: Option[String], + entryProps: Props, + role: Option[String], rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = { - val resolvedRole = if (roleOverride == None) Role else roleOverride - + requireClusterRole(role) implicit val timeout = system.settings.CreationTimeout - val startMsg = Start(typeName, entryProps, roleOverride, rememberEntries, + val startMsg = Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) @@ -266,12 +263,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * of the `reference.conf`. * * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`, - * if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e. - * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any - * entry actors itself - * @param roleOverride specifies that this entry type requires cluster nodes with a specific role. - * if not defined (None), then defaults to standard behavior of using Role (if any) from configuration + * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` + * @param role specifies that this entry type requires cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * @param idExtractor partial function to extract the entry id and the message to send to the @@ -283,13 +277,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { */ def start( typeName: String, - entryProps: Option[Props], - roleOverride: Option[String], + entryProps: Props, + role: Option[String], rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): ActorRef = { - start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver, + start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance), PoisonPill) } @@ -303,12 +297,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * of the `reference.conf`. * * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`, - * if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e. - * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any - * entry actors itself - * @param roleOverride specifies that this entry type requires cluster nodes with a specific role. - * if not defined (None), then defaults to standard behavior of using Role (if any) from configuration + * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` + * @param role specifies that this entry type requires cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the @@ -322,13 +313,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def start( typeName: String, entryProps: Props, - roleOverride: Option[String], + role: Option[String], rememberEntries: Boolean, messageExtractor: ShardRegion.MessageExtractor, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = { - start(typeName, entryProps = Option(entryProps), roleOverride, rememberEntries, + start(typeName, entryProps, role, rememberEntries, idExtractor = { case msg if messageExtractor.entryId(msg) ne null ⇒ (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) @@ -350,12 +341,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * of the `reference.conf`. * * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`, - * if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e. - * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any - * entry actors itself - * @param roleOverride specifies that this entry type requires cluster nodes with a specific role. - * if not defined (None), then defaults to standard behavior of using Role (if any) from configuration + * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` + * @param role specifies that this entry type requires cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the @@ -365,15 +353,77 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def start( typeName: String, entryProps: Props, - roleOverride: Option[String], + role: Option[String], rememberEntries: Boolean, messageExtractor: ShardRegion.MessageExtractor): ActorRef = { - start(typeName, entryProps, roleOverride, rememberEntries, messageExtractor, + start(typeName, entryProps, role, rememberEntries, messageExtractor, new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance), PoisonPill) } + /** + * Scala API: Register a named entry type `ShardRegion` on this node that will run in proxy only mode, + * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any + * entry actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the + * [[#shardRegion]] method. + * + * Some settings can be configured as described in the `akka.cluster.sharding` section + * of the `reference.conf`. + * + * @param typeName the name of the entry type + * @param role specifies that this entry type is located on cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. + * @param idExtractor partial function to extract the entry id and the message to send to the + * entry from the incoming message, if the partial function does not match the message will + * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream + * @param shardResolver function to determine the shard id for an incoming message, only messages + * that passed the `idExtractor` will be used + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard + */ + def startProxy( + typeName: String, + role: Option[String], + idExtractor: ShardRegion.IdExtractor, + shardResolver: ShardRegion.ShardResolver): ActorRef = { + + implicit val timeout = system.settings.CreationTimeout + val startMsg = StartProxy(typeName, role, idExtractor, shardResolver) + val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) + regions.put(typeName, shardRegion) + shardRegion + } + + /** + * Java API: Register a named entry type `ShardRegion` on this node that will run in proxy only mode, + * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any + * entry actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the + * [[#shardRegion]] method. + * + * Some settings can be configured as described in the `akka.cluster.sharding` section + * of the `reference.conf`. + * + * @param typeName the name of the entry type + * @param role specifies that this entry type is located on cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. + * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the + * entry from the incoming message + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard + */ + def startProxy( + typeName: String, + role: Option[String], + messageExtractor: ShardRegion.MessageExtractor): ActorRef = { + + startProxy(typeName, role, + idExtractor = { + case msg if messageExtractor.entryId(msg) ne null ⇒ + (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) + }, + shardResolver = msg ⇒ messageExtractor.shardId(msg)) + + } + /** * Retrieve the actor reference of the [[ShardRegion]] actor responsible for the named entry type. * The entry type must be registered with the [[#start]] method before it can be used here. @@ -391,10 +441,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { */ private[akka] object ClusterShardingGuardian { import ShardCoordinator.ShardAllocationStrategy - final case class Start(typeName: String, entryProps: Option[Props], role: Option[String], rememberEntries: Boolean, + final case class Start(typeName: String, entryProps: Props, role: Option[String], rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any) extends NoSerializationVerificationNeeded + final case class StartProxy(typeName: String, role: Option[String], + idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver) + extends NoSerializationVerificationNeeded final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded } @@ -409,14 +462,19 @@ private[akka] class ClusterShardingGuardian extends Actor { val sharding = ClusterSharding(context.system) import sharding.Settings._ + private def coordinatorSingletonManagerName(encName: String): String = + encName + "Coordinator" + + private def coordinatorPath(encName: String): String = + (self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress + def receive = { case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒ - val encName = URLEncoder.encode(typeName, "utf-8") - val coordinatorSingletonManagerName = encName + "Coordinator" - val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress + val encName = URLEncoder.encode(typeName, ByteString.UTF_8) + val cName = coordinatorSingletonManagerName(encName) + val cPath = coordinatorPath(encName) val shardRegion = context.child(encName).getOrElse { - val hasRequiredRole = hasNecessaryClusterRole(role) - if (hasRequiredRole && context.child(coordinatorSingletonManagerName).isEmpty) { + if (context.child(cName).isEmpty) { val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout, rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy) val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps) @@ -426,14 +484,14 @@ private[akka] class ClusterShardingGuardian extends Actor { singletonProps, terminationMessage = PoisonPill, singletonSettings), - name = coordinatorSingletonManagerName) + name = cName) } context.actorOf(ShardRegion.props( typeName = typeName, - entryProps = if (hasRequiredRole) entryProps else None, + entryProps = entryProps, role = role, - coordinatorPath = coordinatorPath, + coordinatorPath = cPath, retryInterval = RetryInterval, snapshotInterval = SnapshotInterval, shardFailureBackoff = ShardFailureBackoff, @@ -447,6 +505,23 @@ private[akka] class ClusterShardingGuardian extends Actor { } sender() ! Started(shardRegion) + case StartProxy(typeName, role, idExtractor, shardResolver) ⇒ + val encName = URLEncoder.encode(typeName, ByteString.UTF_8) + val cName = coordinatorSingletonManagerName(encName) + val cPath = coordinatorPath(encName) + val shardRegion = context.child(encName).getOrElse { + context.actorOf(ShardRegion.proxyProps( + typeName = typeName, + role = role, + coordinatorPath = cPath, + retryInterval = RetryInterval, + bufferSize = BufferSize, + idExtractor = idExtractor, + shardResolver = shardResolver), + name = encName) + } + sender() ! Started(shardRegion) + } } @@ -457,11 +532,12 @@ private[akka] class ClusterShardingGuardian extends Actor { object ShardRegion { /** - * Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. + * INTERNAL API + * Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. */ - def props( + private[akka] def props( typeName: String, - entryProps: Option[Props], + entryProps: Props, role: Option[String], coordinatorPath: String, retryInterval: FiniteDuration, @@ -473,61 +549,15 @@ object ShardRegion { idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any): Props = - Props(new ShardRegion(typeName, entryProps, role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, + Props(new ShardRegion(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage)) /** - * Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. - */ - def props( - typeName: String, - entryProps: Props, - role: Option[String], - coordinatorPath: String, - retryInterval: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entryRestartBackoff: FiniteDuration, - snapshotInterval: FiniteDuration, - bufferSize: Int, - rememberEntries: Boolean, - idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver, - handOffStopMessage: Any): Props = - props(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, - snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage) - - /** - * Java API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. - */ - def props( - typeName: String, - entryProps: Props, - role: String, - coordinatorPath: String, - retryInterval: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entryRestartBackoff: FiniteDuration, - snapshotInterval: FiniteDuration, - bufferSize: Int, - rememberEntries: Boolean, - messageExtractor: ShardRegion.MessageExtractor, - handOffStopMessage: Any): Props = { - - props(typeName, Option(entryProps), roleOption(role), coordinatorPath, retryInterval, shardFailureBackoff, - entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, - idExtractor = { - case msg if messageExtractor.entryId(msg) ne null ⇒ - (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) - }: ShardRegion.IdExtractor, - shardResolver = msg ⇒ messageExtractor.shardId(msg), - handOffStopMessage = handOffStopMessage) - } - - /** - * Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor + * INTERNAL API + * Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor * when using it in proxy only mode. */ - def proxyProps( + private[akka] def proxyProps( typeName: String, role: Option[String], coordinatorPath: String, @@ -538,25 +568,6 @@ object ShardRegion { Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero, Duration.Zero, bufferSize, false, idExtractor, shardResolver, PoisonPill)) - /** - * Java API: : Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor - * when using it in proxy only mode. - */ - def proxyProps( - typeName: String, - role: String, - coordinatorPath: String, - retryInterval: FiniteDuration, - bufferSize: Int, - messageExtractor: ShardRegion.MessageExtractor): Props = { - - proxyProps(typeName, roleOption(role), coordinatorPath, retryInterval, bufferSize, idExtractor = { - case msg if messageExtractor.entryId(msg) ne null ⇒ - (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) - }, - shardResolver = msg ⇒ messageExtractor.shardId(msg)) - } - /** * Marker type of entry identifier (`String`). */ @@ -1289,9 +1300,10 @@ private[akka] class Shard( } /** + * INTERNAL API * @see [[ClusterSharding$ ClusterSharding extension]] */ -object ShardCoordinatorSupervisor { +private[akka] object ShardCoordinatorSupervisor { /** * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor. */ @@ -1304,7 +1316,10 @@ object ShardCoordinatorSupervisor { private case object StartCoordinator } -class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor { +/** + * INTERNAL API + */ +private[akka] class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor { import ShardCoordinatorSupervisor._ def startCoordinator(): Unit = { @@ -1330,10 +1345,11 @@ object ShardCoordinator { import ShardRegion.ShardId /** + * INTERNAL API * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor. */ - def props(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration, - allocationStrategy: ShardAllocationStrategy): Props = + private[akka] def props(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration, + allocationStrategy: ShardAllocationStrategy): Props = Props(new ShardCoordinator(handOffTimeout, shardStartTimeout, rebalanceInterval, snapshotInterval, allocationStrategy)) /** diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 327f00a2ab..65f35d23e7 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -134,8 +134,8 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar def startSharding(): Unit = { ClusterSharding(system).start( typeName = "Entity", - entryProps = Some(Props[Entity]), - roleOverride = None, + entryProps = Props[Entity], + role = None, rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index 3e501074e1..7448c73c4b 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -111,8 +111,8 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe def startSharding(): Unit = { ClusterSharding(system).start( typeName = "Entity", - entryProps = Some(Props[Entity]), - roleOverride = None, + entryProps = Props[Entity], + role = None, rememberEntries = true, idExtractor = idExtractor, shardResolver = shardResolver) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index a3c5406f9a..5530f32794 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -121,8 +121,8 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) ClusterSharding(system).start( typeName = "Entity", - entryProps = Some(Props[Entity]), - roleOverride = None, + entryProps = Props[Entity], + role = None, rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index 61e7407c7e..e39fbe99eb 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -123,8 +123,8 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe def startSharding(): Unit = { ClusterSharding(system).start( typeName = "Entity", - entryProps = Some(Props[Entity]), - roleOverride = None, + entryProps = Props[Entity], + role = None, rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index ba9748aad2..c00d9bb70d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -490,16 +490,16 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //#counter-start val counterRegion: ActorRef = ClusterSharding(system).start( typeName = "Counter", - entryProps = Some(Props[Counter]), - roleOverride = None, + entryProps = Props[Counter], + role = None, rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver) //#counter-start ClusterSharding(system).start( typeName = "AnotherCounter", - entryProps = Some(Props[Counter]), - roleOverride = None, + entryProps = Props[Counter], + role = None, rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver) @@ -540,8 +540,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(first) { val counterRegionViaStart: ActorRef = ClusterSharding(system).start( typeName = "ApiTest", - entryProps = Some(Props[Counter]), - roleOverride = None, + entryProps = Props[Counter], + role = None, rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver) diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index ab67803bb2..7a54fd55c8 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -227,6 +227,15 @@ The classes changed package name from ``akka.contrib.pattern`` to ``akka.cluster The configuration properties changed name to ``akka.cluster.sharding``. +ClusterSharding construction +============================ + +Role is defined as parameter to the ``start`` method of the ``ClusterSharding`` extension +instead of in configuration, so that it can be defined per entry type. + +Starting the ``ShardRegion`` in proxy mode is now done with the ``startProxy`` method +of the ``ClusterSharding`` extension instead of the optional ``entryProps`` parameter. + ClusterSingletonManager and ClusterSingletonProxy construction ============================================================== diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index a8d718405d..f692cf8158 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -239,9 +239,8 @@ Proxy Only Mode The ``ShardRegion`` actor can also be started in proxy only mode, i.e. it will not host any entries itself, but knows how to delegate messages to the right location. -A ``ShardRegion`` starts in proxy only mode if the roles of the node does not include -the node role specified in ``akka.contrib.cluster.sharding.role`` config property -or if the specified `entryProps` is ``None`` / ``null``. +A ``ShardRegion`` is started in proxy only mode with the method ``ClusterSharding.startProxy`` +method. Passivation -----------