From aa59bfdcf68bf9bb96bd0878a13d595266bfeaf3 Mon Sep 17 00:00:00 2001
From: rmarsch
Date: Fri, 24 Oct 2014 12:59:58 -0400
Subject: [PATCH] !con #16123 Expand cluster roles support in ClusterSharding

Allow a roleOverride: Option[String] to be used when starting ClusterSharding
for a given entry type. This allows the cluster role to be set per entry type
instead of requiring the role configuration to be all or nothing across all
entry types.
---
 .../contrib/pattern/ClusterSharding.scala      | 54 ++++++++++++-------
 .../pattern/ClusterShardingFailureSpec.scala   |  1 +
 .../contrib/pattern/ClusterShardingSpec.scala  |  3 ++
 .../contrib/pattern/ClusterShardingTest.java   |  4 +-
 4 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
index e4f61b4d79..5ff64f54b3 100644
--- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
+++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
@@ -174,11 +174,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    */
   private[akka] object Settings {
     val config = system.settings.config.getConfig("akka.contrib.cluster.sharding")
+
     val Role: Option[String] = config.getString("role") match {
       case "" ⇒ None
       case r  ⇒ Some(r)
     }
-    val HasNecessaryClusterRole: Boolean = Role.forall(cluster.selfRoles.contains)
+    def hasNecessaryClusterRole(role: Option[String]): Boolean = role.forall(cluster.selfRoles.contains)
+
     val GuardianName: String = config.getString("guardian-name")
     val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis
     val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis
@@ -201,7 +203,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   /**
    * Scala API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor
    * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
-   * for this type can later be retrieved with the [[#shardRegion]] method. 
+   * for this type can later be retrieved with the [[#shardRegion]] method.
    *
    * Some settings can be configured as described in the `akka.contrib.cluster.sharding` section
    * of the `reference.conf`.
@@ -211,6 +213,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    *   if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e.
    *   it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
    *   entry actors itself
+   * @param roleOverride specifies that this entry type requires cluster nodes with a specific role;
+   *   if not defined (None), falls back to the standard behavior of using the configured Role (if any)
    * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
    *   restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
    * @param idExtractor partial function to extract the entry id and the message to send to the
@@ -225,13 +229,16 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Option[Props],
+    roleOverride: Option[String],
     rememberEntries: Boolean,
     idExtractor: ShardRegion.IdExtractor,
     shardResolver: ShardRegion.ShardResolver,
     allocationStrategy: ShardAllocationStrategy): ActorRef = {
+    val resolvedRole = roleOverride orElse Role
+
     implicit val timeout = system.settings.CreationTimeout
-    val startMsg = Start(typeName, entryProps, rememberEntries, idExtractor, shardResolver, allocationStrategy)
+    val startMsg = Start(typeName, entryProps, resolvedRole, rememberEntries, idExtractor, shardResolver, allocationStrategy)
     val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
     regions.put(typeName, shardRegion)
     shardRegion
@@ -240,7 +247,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   /**
    * Register a named entry type by defining the [[akka.actor.Props]] of the entry actor and
    * functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
-   * for this type can later be retrieved with the [[#shardRegion]] method. 
+   * for this type can later be retrieved with the [[#shardRegion]] method.
   *
    * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
    * is used.
@@ -253,6 +260,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    *   if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e.
    *   it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
    *   entry actors itself
+   * @param roleOverride specifies that this entry type requires cluster nodes with a specific role;
+   *   if not defined (None), falls back to the standard behavior of using the configured Role (if any)
    * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
    *   restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
    * @param idExtractor partial function to extract the entry id and the message to send to the
@@ -265,18 +274,19 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Option[Props],
+    roleOverride: Option[String],
     rememberEntries: Boolean,
     idExtractor: ShardRegion.IdExtractor,
     shardResolver: ShardRegion.ShardResolver): ActorRef = {

-    start(typeName, entryProps, rememberEntries, idExtractor, shardResolver,
+    start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver,
       new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
   }

   /**
    * Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor
    * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
-   * for this type can later be retrieved with the [[#shardRegion]] method. 
+   * for this type can later be retrieved with the [[#shardRegion]] method.
    *
    * Some settings can be configured as described in the `akka.contrib.cluster.sharding` section
    * of the `reference.conf`.
@@ -286,6 +296,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    *   if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
    *   it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
    *   entry actors itself
+   * @param roleOverride specifies that this entry type requires cluster nodes with a specific role;
+   *   if not defined (None), falls back to the standard behavior of using the configured Role (if any)
    * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
    *   restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
    * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
@@ -297,11 +309,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Props,
+    roleOverride: Option[String],
     rememberEntries: Boolean,
     messageExtractor: ShardRegion.MessageExtractor,
     allocationStrategy: ShardAllocationStrategy): ActorRef = {

-    start(typeName, entryProps = Option(entryProps), rememberEntries,
+    start(typeName, entryProps = Option(entryProps), roleOverride, rememberEntries,
       idExtractor = {
         case msg if messageExtractor.entryId(msg) ne null ⇒
           (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
@@ -313,7 +326,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   /**
    * Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor
    * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
-   * for this type can later be retrieved with the [[#shardRegion]] method. 
+   * for this type can later be retrieved with the [[#shardRegion]] method.
    *
    * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
    * is used.
@@ -326,6 +339,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    *   if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
    *   it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
    *   entry actors itself
+   * @param roleOverride specifies that this entry type requires cluster nodes with a specific role;
+   *   if not defined (None), falls back to the standard behavior of using the configured Role (if any)
    * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
    *   restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
    * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
@@ -335,10 +350,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   def start(
     typeName: String,
     entryProps: Props,
+    roleOverride: Option[String],
     rememberEntries: Boolean,
     messageExtractor: ShardRegion.MessageExtractor): ActorRef = {

-    start(typeName, entryProps, rememberEntries, messageExtractor,
+    start(typeName, entryProps, roleOverride, rememberEntries, messageExtractor,
       new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
   }
@@ -359,8 +375,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
  */
 private[akka] object ClusterShardingGuardian {
   import ShardCoordinator.ShardAllocationStrategy
-  final case class Start(typeName: String, entryProps: Option[Props], rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor,
-    shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy)
+  final case class Start(typeName: String, entryProps: Option[Props], role: Option[String], rememberEntries: Boolean,
+    idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy)
     extends NoSerializationVerificationNeeded
   final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
 }
@@ -377,12 +393,13 @@ private[akka] class ClusterShardingGuardian extends Actor {
   import sharding.Settings._

   def receive = {
-    case Start(typeName, entryProps, rememberEntries, idExtractor, shardResolver, allocationStrategy) ⇒
+    case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy) ⇒
       val encName = URLEncoder.encode(typeName, "utf-8")
       val coordinatorSingletonManagerName = encName + "Coordinator"
       val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress
       val shardRegion = context.child(encName).getOrElse {
-        if (HasNecessaryClusterRole && context.child(coordinatorSingletonManagerName).isEmpty) {
+        val hasRequiredRole = hasNecessaryClusterRole(role)
+        if (hasRequiredRole && context.child(coordinatorSingletonManagerName).isEmpty) {
           val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout,
             rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy)
           val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps)
@@ -390,14 +407,14 @@ private[akka] class ClusterShardingGuardian extends Actor {
             singletonProps,
             singletonName = "singleton",
             terminationMessage = PoisonPill,
-            role = Role),
+            role = role),
             name = coordinatorSingletonManagerName)
         }
         context.actorOf(ShardRegion.props(
           typeName = typeName,
-          entryProps = if (HasNecessaryClusterRole) entryProps else None,
-          role = Role,
+          entryProps = if (hasRequiredRole) entryProps else None,
+          role = role,
           coordinatorPath = coordinatorPath,
           retryInterval = RetryInterval,
           snapshotInterval = SnapshotInterval,
@@ -1369,11 +1386,11 @@ object ShardCoordinator {
    */
   sealed trait CoordinatorMessage
   /**
-   * `ShardRegion` registers to `ShardCoordinator`, until it receives [[RegisterAck]]. 
+   * `ShardRegion` registers to `ShardCoordinator`, until it receives [[RegisterAck]].
    */
   @SerialVersionUID(1L) final case class Register(shardRegion: ActorRef) extends CoordinatorCommand
   /**
-   * `ShardRegion` in proxy only mode registers to `ShardCoordinator`, until it receives [[RegisterAck]]. 
+   * `ShardRegion` in proxy only mode registers to `ShardCoordinator`, until it receives [[RegisterAck]].
    */
   @SerialVersionUID(1L) final case class RegisterProxy(shardRegionProxy: ActorRef) extends CoordinatorCommand
   /**
@@ -1738,4 +1755,3 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration,

   def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) }
 }
-
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
index ae94e5f26c..23436379b7 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
@@ -111,6 +111,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpec)
       ClusterSharding(system).start(
         typeName = "Entity",
         entryProps = Some(Props[Entity]),
+        roleOverride = None,
         rememberEntries = true,
         idExtractor = idExtractor,
         shardResolver = shardResolver)
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
index 6667510810..d71e3d3d26 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
@@ -501,6 +501,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMultiNodeSpec
       val counterRegion: ActorRef = ClusterSharding(system).start(
         typeName = "Counter",
         entryProps = Some(Props[Counter]),
+        roleOverride = None,
         rememberEntries = false,
         idExtractor = idExtractor,
         shardResolver = shardResolver)
@@ -508,6 +509,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMultiNodeSpec
       ClusterSharding(system).start(
         typeName = "AnotherCounter",
         entryProps = Some(Props[Counter]),
+        roleOverride = None,
         rememberEntries = false,
         idExtractor = idExtractor,
         shardResolver = shardResolver)
@@ -549,6 +551,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMultiNodeSpec
       val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
         typeName = "ApiTest",
         entryProps = Some(Props[Counter]),
+        roleOverride = None,
         rememberEntries = false,
         idExtractor = idExtractor,
         shardResolver = shardResolver)
diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
index 21acfdbc3c..dd500c3512 100644
--- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
+++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
@@ -13,6 +13,7 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Procedure;
+import akka.japi.Option;
 import akka.persistence.UntypedPersistentActor;

 // Doc code, compile only
@@ -64,8 +65,9 @@ public class ClusterShardingTest {
     //#counter-extractor

     //#counter-start
+    Option<String> roleOption = Option.none();
     ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
-        Props.create(Counter.class), false, messageExtractor);
+        Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor);
     //#counter-start

     //#counter-usage
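
Usage sketch, not part of the patch: with this change each entry type can be
pinned to a cluster role when it is started. The role name "backend" below is
illustrative only, and idExtractor/shardResolver are assumed to be defined as
in the specs above.

    // Host "Counter" entries only on nodes with the "backend" role,
    // overriding akka.contrib.cluster.sharding.role for this entry type.
    ClusterSharding(system).start(
      typeName = "Counter",
      entryProps = Some(Props[Counter]),
      roleOverride = Some("backend"),
      rememberEntries = false,
      idExtractor = idExtractor,
      shardResolver = shardResolver)

    // Passing None keeps the standard behavior: the role (if any) configured
    // under akka.contrib.cluster.sharding.role applies to this entry type.
    ClusterSharding(system).start(
      typeName = "AnotherCounter",
      entryProps = Some(Props[Counter]),
      roleOverride = None,
      rememberEntries = false,
      idExtractor = idExtractor,
      shardResolver = shardResolver)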