!con #16123 Expand cluster roles support in ClusterSharding

Allow a roleOverride: Option[String] to be used when starting ClusterSharding for a given entry type. This will allow role defined clusters of ClusterSharding for entry types instead of requiring the role configuration to be all or nothing across all entry types.
This commit is contained in:
rmarsch 2014-10-24 12:59:58 -04:00
parent 6800c51e56
commit aa59bfdcf6
4 changed files with 42 additions and 20 deletions

View file

@ -174,11 +174,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
*/ */
private[akka] object Settings { private[akka] object Settings {
val config = system.settings.config.getConfig("akka.contrib.cluster.sharding") val config = system.settings.config.getConfig("akka.contrib.cluster.sharding")
val Role: Option[String] = config.getString("role") match { val Role: Option[String] = config.getString("role") match {
case "" None case "" None
case r Some(r) case r Some(r)
} }
val HasNecessaryClusterRole: Boolean = Role.forall(cluster.selfRoles.contains) def hasNecessaryClusterRole(role: Option[String]): Boolean = role.forall(cluster.selfRoles.contains)
val GuardianName: String = config.getString("guardian-name") val GuardianName: String = config.getString("guardian-name")
val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis
val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis
@ -201,7 +203,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
/** /**
* Scala API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor * Scala API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor
* and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
* for this type can later be retrieved with the [[#shardRegion]] method. * for this type can later be retrieved with the [[#shardRegion]] method.
* *
* Some settings can be configured as described in the `akka.contrib.cluster.sharding` section * Some settings can be configured as described in the `akka.contrib.cluster.sharding` section
* of the `reference.conf`. * of the `reference.conf`.
@ -211,6 +213,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e. * if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e.
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
* entry actors itself * entry actors itself
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param idExtractor partial function to extract the entry id and the message to send to the * @param idExtractor partial function to extract the entry id and the message to send to the
@ -225,13 +229,16 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start( def start(
typeName: String, typeName: String,
entryProps: Option[Props], entryProps: Option[Props],
roleOverride: Option[String],
rememberEntries: Boolean, rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor, idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver, shardResolver: ShardRegion.ShardResolver,
allocationStrategy: ShardAllocationStrategy): ActorRef = { allocationStrategy: ShardAllocationStrategy): ActorRef = {
val resolvedRole = if (roleOverride == None) Role else roleOverride
implicit val timeout = system.settings.CreationTimeout implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entryProps, rememberEntries, idExtractor, shardResolver, allocationStrategy) val startMsg = Start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver, allocationStrategy)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion) regions.put(typeName, shardRegion)
shardRegion shardRegion
@ -240,7 +247,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
/** /**
* Register a named entry type by defining the [[akka.actor.Props]] of the entry actor and * Register a named entry type by defining the [[akka.actor.Props]] of the entry actor and
* functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor * functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
* for this type can later be retrieved with the [[#shardRegion]] method. * for this type can later be retrieved with the [[#shardRegion]] method.
* *
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used. * is used.
@ -253,6 +260,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e. * if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e.
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
* entry actors itself * entry actors itself
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param idExtractor partial function to extract the entry id and the message to send to the * @param idExtractor partial function to extract the entry id and the message to send to the
@ -265,18 +274,19 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start( def start(
typeName: String, typeName: String,
entryProps: Option[Props], entryProps: Option[Props],
roleOverride: Option[String],
rememberEntries: Boolean, rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor, idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver): ActorRef = { shardResolver: ShardRegion.ShardResolver): ActorRef = {
start(typeName, entryProps, rememberEntries, idExtractor, shardResolver, start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver,
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance)) new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
} }
/** /**
* Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor * Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor
* and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
* for this type can later be retrieved with the [[#shardRegion]] method. * for this type can later be retrieved with the [[#shardRegion]] method.
* *
* Some settings can be configured as described in the `akka.contrib.cluster.sharding` section * Some settings can be configured as described in the `akka.contrib.cluster.sharding` section
* of the `reference.conf`. * of the `reference.conf`.
@ -286,6 +296,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e. * if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
* entry actors itself * entry actors itself
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param messageExtractor functions to extract the entry id, shard id, and the message to send to the * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
@ -297,11 +309,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start( def start(
typeName: String, typeName: String,
entryProps: Props, entryProps: Props,
roleOverride: Option[String],
rememberEntries: Boolean, rememberEntries: Boolean,
messageExtractor: ShardRegion.MessageExtractor, messageExtractor: ShardRegion.MessageExtractor,
allocationStrategy: ShardAllocationStrategy): ActorRef = { allocationStrategy: ShardAllocationStrategy): ActorRef = {
start(typeName, entryProps = Option(entryProps), rememberEntries, start(typeName, entryProps = Option(entryProps), roleOverride, rememberEntries,
idExtractor = { idExtractor = {
case msg if messageExtractor.entryId(msg) ne null case msg if messageExtractor.entryId(msg) ne null
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
@ -313,7 +326,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
/** /**
* Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor * Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor
* and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
* for this type can later be retrieved with the [[#shardRegion]] method. * for this type can later be retrieved with the [[#shardRegion]] method.
* *
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used. * is used.
@ -326,6 +339,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e. * if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any * it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
* entry actors itself * entry actors itself
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param messageExtractor functions to extract the entry id, shard id, and the message to send to the * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
@ -335,10 +350,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start( def start(
typeName: String, typeName: String,
entryProps: Props, entryProps: Props,
roleOverride: Option[String],
rememberEntries: Boolean, rememberEntries: Boolean,
messageExtractor: ShardRegion.MessageExtractor): ActorRef = { messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
start(typeName, entryProps, rememberEntries, messageExtractor, start(typeName, entryProps, roleOverride, rememberEntries, messageExtractor,
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance)) new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
} }
@ -359,8 +375,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
*/ */
private[akka] object ClusterShardingGuardian { private[akka] object ClusterShardingGuardian {
import ShardCoordinator.ShardAllocationStrategy import ShardCoordinator.ShardAllocationStrategy
final case class Start(typeName: String, entryProps: Option[Props], rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, final case class Start(typeName: String, entryProps: Option[Props], role: Option[String], rememberEntries: Boolean,
shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy) idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy)
extends NoSerializationVerificationNeeded extends NoSerializationVerificationNeeded
final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
} }
@ -377,12 +393,13 @@ private[akka] class ClusterShardingGuardian extends Actor {
import sharding.Settings._ import sharding.Settings._
def receive = { def receive = {
case Start(typeName, entryProps, rememberEntries, idExtractor, shardResolver, allocationStrategy) case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy)
val encName = URLEncoder.encode(typeName, "utf-8") val encName = URLEncoder.encode(typeName, "utf-8")
val coordinatorSingletonManagerName = encName + "Coordinator" val coordinatorSingletonManagerName = encName + "Coordinator"
val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress
val shardRegion = context.child(encName).getOrElse { val shardRegion = context.child(encName).getOrElse {
if (HasNecessaryClusterRole && context.child(coordinatorSingletonManagerName).isEmpty) { val hasRequiredRole = hasNecessaryClusterRole(role)
if (hasRequiredRole && context.child(coordinatorSingletonManagerName).isEmpty) {
val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout, val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout,
rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy) rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy)
val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps) val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps)
@ -390,14 +407,14 @@ private[akka] class ClusterShardingGuardian extends Actor {
singletonProps, singletonProps,
singletonName = "singleton", singletonName = "singleton",
terminationMessage = PoisonPill, terminationMessage = PoisonPill,
role = Role), role = role),
name = coordinatorSingletonManagerName) name = coordinatorSingletonManagerName)
} }
context.actorOf(ShardRegion.props( context.actorOf(ShardRegion.props(
typeName = typeName, typeName = typeName,
entryProps = if (HasNecessaryClusterRole) entryProps else None, entryProps = if (hasRequiredRole) entryProps else None,
role = Role, role = role,
coordinatorPath = coordinatorPath, coordinatorPath = coordinatorPath,
retryInterval = RetryInterval, retryInterval = RetryInterval,
snapshotInterval = SnapshotInterval, snapshotInterval = SnapshotInterval,
@ -1369,11 +1386,11 @@ object ShardCoordinator {
*/ */
sealed trait CoordinatorMessage sealed trait CoordinatorMessage
/** /**
* `ShardRegion` registers to `ShardCoordinator`, until it receives [[RegisterAck]].  * `ShardRegion` registers to `ShardCoordinator`, until it receives [[RegisterAck]].
*/ */
@SerialVersionUID(1L) final case class Register(shardRegion: ActorRef) extends CoordinatorCommand @SerialVersionUID(1L) final case class Register(shardRegion: ActorRef) extends CoordinatorCommand
/** /**
* `ShardRegion` in proxy only mode registers to `ShardCoordinator`, until it receives [[RegisterAck]].  * `ShardRegion` in proxy only mode registers to `ShardCoordinator`, until it receives [[RegisterAck]].
*/ */
@SerialVersionUID(1L) final case class RegisterProxy(shardRegionProxy: ActorRef) extends CoordinatorCommand @SerialVersionUID(1L) final case class RegisterProxy(shardRegionProxy: ActorRef) extends CoordinatorCommand
/** /**
@ -1738,4 +1755,3 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) } def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) }
} }

View file

@ -111,6 +111,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
ClusterSharding(system).start( ClusterSharding(system).start(
typeName = "Entity", typeName = "Entity",
entryProps = Some(Props[Entity]), entryProps = Some(Props[Entity]),
roleOverride = None,
rememberEntries = true, rememberEntries = true,
idExtractor = idExtractor, idExtractor = idExtractor,
shardResolver = shardResolver) shardResolver = shardResolver)

View file

@ -501,6 +501,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
val counterRegion: ActorRef = ClusterSharding(system).start( val counterRegion: ActorRef = ClusterSharding(system).start(
typeName = "Counter", typeName = "Counter",
entryProps = Some(Props[Counter]), entryProps = Some(Props[Counter]),
roleOverride = None,
rememberEntries = false, rememberEntries = false,
idExtractor = idExtractor, idExtractor = idExtractor,
shardResolver = shardResolver) shardResolver = shardResolver)
@ -508,6 +509,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
ClusterSharding(system).start( ClusterSharding(system).start(
typeName = "AnotherCounter", typeName = "AnotherCounter",
entryProps = Some(Props[Counter]), entryProps = Some(Props[Counter]),
roleOverride = None,
rememberEntries = false, rememberEntries = false,
idExtractor = idExtractor, idExtractor = idExtractor,
shardResolver = shardResolver) shardResolver = shardResolver)
@ -549,6 +551,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
val counterRegionViaStart: ActorRef = ClusterSharding(system).start( val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
typeName = "ApiTest", typeName = "ApiTest",
entryProps = Some(Props[Counter]), entryProps = Some(Props[Counter]),
roleOverride = None,
rememberEntries = false, rememberEntries = false,
idExtractor = idExtractor, idExtractor = idExtractor,
shardResolver = shardResolver) shardResolver = shardResolver)

View file

@ -13,6 +13,7 @@ import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.ReceiveTimeout; import akka.actor.ReceiveTimeout;
import akka.japi.Procedure; import akka.japi.Procedure;
import akka.japi.Option;
import akka.persistence.UntypedPersistentActor; import akka.persistence.UntypedPersistentActor;
// Doc code, compile only // Doc code, compile only
@ -64,8 +65,9 @@ public class ClusterShardingTest {
//#counter-extractor //#counter-extractor
//#counter-start //#counter-start
Option<String> roleOption = Option.none();
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
Props.create(Counter.class), false, messageExtractor); Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor);
//#counter-start //#counter-start
//#counter-usage //#counter-usage