diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b2126409c1..1f5c5cebe9 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -28,6 +28,23 @@ akka { # formed in case of network partition. auto-down = off + # The roles of this member. List of strings, e.g. roles = ["A", "B"]. + # The roles are part of the membership information and can be used by + # routers or other services to distribute work to certain member types, + # e.g. front-end and back-end nodes. + roles = [] + + role { + # Minimum required number of members of a certain role before the leader changes + # member status of 'Joining' members to 'Up'. Typically used together with + # 'Cluster.registerOnMemberUp' to defer some action, such as starting actors, + # until the cluster has reached a certain size. + # E.g. to require 2 nodes with role 'frontend' and 3 nodes with role 'backend': + # frontend.min-nr-of-members = 2 + # backend.min-nr-of-members = 3 + #.min-nr-of-members = 1 + } + # Minimum required number of members before the leader changes member status # of 'Joining' members to 'Up'. Typically used together with # 'Cluster.registerOnMemberUp' to defer some action, such as starting actors, @@ -201,6 +218,9 @@ akka { # when routees-path is defined. routees-path = "" + # Use members with specified role, or all members if undefined. + use-role = "" + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index da34727370..9221d81295 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -73,6 +73,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { format(system, other.getClass.getName)) } + /** + * roles that this member has + */ + def selfRoles: Set[String] = settings.Roles + + /** + * Java API: roles that this member has + */ + def getSelfRoles: java.util.Set[String] = + scala.collection.JavaConverters.setAsJavaSetConverter(selfRoles).asJava + private val _isTerminated = new AtomicBoolean(false) private val log = Logging(system, "Cluster") diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 5fb8376eea..0d06d4d64b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -116,11 +116,13 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig]) throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig)) + import ClusterRouterSettings.useRoleOption val clusterRouterSettings = ClusterRouterSettings( totalInstances = deploy.config.getInt("nr-of-instances"), maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node"), allowLocalRoutees = deploy.config.getBoolean("cluster.allow-local-routees"), - routeesPath = deploy.config.getString("cluster.routees-path")) + routeesPath = deploy.config.getString("cluster.routees-path"), + useRole = useRoleOption(deploy.config.getString("cluster.use-role"))) Some(deploy.copy( routerConfig = ClusterRouterConfig(deploy.routerConfig, clusterRouterSettings), scope = ClusterScope)) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 6a7416476e..64854f243b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -37,7 +37,7 @@ object ClusterUserAction { * Command to join the cluster. Sent when a node (represented by 'address') * wants to join another node (the receiver). */ - case class Join(address: Address) extends ClusterMessage + case class Join(address: Address, roles: Set[String]) extends ClusterMessage /** * Command to leave the cluster. @@ -288,20 +288,20 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } def initialized: Actor.Receive = { - case msg: GossipEnvelope ⇒ receiveGossip(msg) - case GossipTick ⇒ gossip() - case ReapUnreachableTick ⇒ reapUnreachableMembers() - case LeaderActionsTick ⇒ leaderActions() - case PublishStatsTick ⇒ publishInternalStats() - case InitJoin ⇒ initJoin() - case JoinTo(address) ⇒ join(address) - case ClusterUserAction.Join(address) ⇒ joining(address) - case ClusterUserAction.Down(address) ⇒ downing(address) - case ClusterUserAction.Leave(address) ⇒ leaving(address) - case Exit(address) ⇒ exiting(address) - case Remove(address) ⇒ removing(address) - case SendGossipTo(address) ⇒ gossipTo(address) - case msg: SubscriptionMessage ⇒ publisher forward msg + case msg: GossipEnvelope ⇒ receiveGossip(msg) + case GossipTick ⇒ gossip() + case ReapUnreachableTick ⇒ reapUnreachableMembers() + case LeaderActionsTick ⇒ leaderActions() + case PublishStatsTick ⇒ publishInternalStats() + case InitJoin ⇒ initJoin() + case JoinTo(address) ⇒ join(address) + case ClusterUserAction.Join(address, roles) ⇒ joining(address, roles) + case ClusterUserAction.Down(address) ⇒ downing(address) + case ClusterUserAction.Leave(address) ⇒ leaving(address) + case Exit(address) ⇒ exiting(address) + case Remove(address) ⇒ removing(address) + case SendGossipTo(address) ⇒ gossipTo(address) + case msg: SubscriptionMessage ⇒ publisher forward msg } @@ -366,16 +366,16 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto context.become(initialized) if (address == selfAddress) - joining(address) + joining(address, cluster.selfRoles) else - clusterCore(address) ! ClusterUserAction.Join(selfAddress) + clusterCore(address) ! ClusterUserAction.Join(selfAddress, cluster.selfRoles) } } /** * State transition to JOINING - new node joining. */ - def joining(node: Address): Unit = { + def joining(node: Address, roles: Set[String]): Unit = { if (node.protocol != selfAddress.protocol) log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", selfAddress.protocol, node.protocol) @@ -396,7 +396,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) - val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) + val newMembers = localMembers + Member(node, Joining, roles) + Member(selfAddress, Joining, cluster.selfRoles) val newGossip = latestGossip copy (members = newMembers) val versionedGossip = newGossip :+ vclockNode @@ -404,7 +404,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto latestGossip = seenVersionedGossip - log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) + log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node, roles.mkString(", ")) if (node != selfAddress) { gossipTo(node) } @@ -419,7 +419,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto */ def leaving(address: Address): Unit = { if (latestGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) - val newMembers = latestGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING + val newMembers = latestGossip.members map { m ⇒ if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING val newGossip = latestGossip copy (members = newMembers) val versionedGossip = newGossip :+ vclockNode @@ -636,8 +636,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if (localGossip.convergence) { // we have convergence - so we can't have unreachable nodes - val numberOfMembers = localMembers.size - def isJoiningToUp(m: Member): Boolean = m.status == Joining && numberOfMembers >= MinNrOfMembers + def enoughMembers: Boolean = { + localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall { + case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold + } + } + def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers // transform the node member ring val newMembers = localMembers collect { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 6b61a5c7d8..84b174ee99 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -35,7 +35,8 @@ object ClusterEvent { members: immutable.SortedSet[Member] = immutable.SortedSet.empty, unreachable: Set[Member] = Set.empty, seenBy: Set[Address] = Set.empty, - leader: Option[Address] = None) extends ClusterDomainEvent { + leader: Option[Address] = None, + roleLeaderMap: Map[String, Option[Address]] = Map.empty) extends ClusterDomainEvent { /** * Java API: get current member list. @@ -61,6 +62,28 @@ object ClusterEvent { * Java API: get address of current leader, or null if none */ def getLeader: Address = leader orNull + + /** + * All node roles in the cluster + */ + def allRoles: Set[String] = roleLeaderMap.keySet + + /** + * Java API: All node roles in the cluster + */ + def getAllRoles: java.util.Set[String] = + scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava + + /** + * get address of current leader, if any, within the role set + */ + def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None) + + /** + * Java API: get address of current leader within the role set, + * or null if no node with that role + */ + def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull } /** @@ -107,6 +130,18 @@ object ClusterEvent { def getLeader: Address = leader orNull } + /** + * First member (leader) of the members within a role set changed. + * Published when the state change is first seen on a node. + */ + case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent { + /** + * Java API + * @return address of current leader, or null if none + */ + def getLeader: Address = leader orNull + } + /** * A member is considered as unreachable by the failure detector. */ @@ -184,9 +219,22 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = - if (newGossip.leader != oldGossip.leader) List(LeaderChanged(newGossip.leader)) + private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = { + val newLeader = newGossip.leader + if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader)) else Nil + } + + /** + * INTERNAL API + */ + private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip): Set[RoleLeaderChanged] = { + for { + role ← (oldGossip.allRoles ++ newGossip.allRoles) + newLeader = newGossip.roleLeader(role) + if newLeader != oldGossip.roleLeader(role) + } yield RoleLeaderChanged(role, newLeader) + } /** * INTERNAL API @@ -242,7 +290,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto members = latestGossip.members, unreachable = latestGossip.overview.unreachable, seenBy = latestGossip.seenBy, - leader = latestGossip.leader) + leader = latestGossip.leader, + roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r))(collection.breakOut)) receiver match { case Some(ref) ⇒ ref ! state case None ⇒ publish(state) @@ -275,6 +324,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } } diffLeader(oldGossip, newGossip) foreach publish + diffRolesLeader(oldGossip, newGossip) foreach publish // publish internal SeenState for testing purposes diffSeen(oldGossip, newGossip) foreach publish } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 807fe85e52..39eaa63bfd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -57,7 +57,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { // replace current member with new member (might have different status, only address is used in equals) state = state.copy(members = state.members - event.member + event.member, unreachable = state.unreachable - event.member) - case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case LeaderChanged(leader) ⇒ + state = state.copy(leader = leader) + case RoleLeaderChanged(role, leader) ⇒ + state = state.copy(roleLeaderMap = state.roleLeaderMap + (role -> leader)) case s: CurrentClusterState ⇒ state = s case CurrentInternalStats(stats) ⇒ _latestStats = stats case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes @@ -68,7 +71,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def self: Member = { state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)). - getOrElse(Member(selfAddress, MemberStatus.Removed)) + getOrElse(Member(selfAddress, MemberStatus.Removed, cluster.selfRoles)) } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 27bde94a7f..64d02ca03d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -5,6 +5,7 @@ package akka.cluster import scala.collection.immutable import com.typesafe.config.Config +import com.typesafe.config.ConfigObject import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit.MILLISECONDS import akka.ConfigurationException @@ -50,9 +51,16 @@ class ClusterSettings(val config: Config, val systemName: String) { final val PublishStatsInterval: FiniteDuration = Duration(cc.getMilliseconds("publish-stats-interval"), MILLISECONDS) final val AutoJoin: Boolean = cc.getBoolean("auto-join") final val AutoDown: Boolean = cc.getBoolean("auto-down") + final val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet final val MinNrOfMembers: Int = { cc.getInt("min-nr-of-members") } requiring (_ > 0, "min-nr-of-members must be > 0") + final val MinNrOfMembersOfRole: Map[String, Int] = { + import scala.collection.JavaConverters._ + cc.getConfig("role").root.asScala.collect { + case (key, value: ConfigObject) ⇒ (key -> value.toConfig.getInt("min-nr-of-members")) + }.toMap + } final val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") final val UseDispatcher: String = cc.getString("use-dispatcher") match { case "" ⇒ Dispatchers.DefaultDispatcherId diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 2d3ba52570..a5eac87bfb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -185,12 +185,18 @@ private[cluster] case class Gossip( def isLeader(address: Address): Boolean = leader == Some(address) - def leader: Option[Address] = { - if (members.isEmpty) None - else members.find(m ⇒ m.status != Joining && m.status != Exiting && m.status != Down). - orElse(Some(members.min(Member.leaderStatusOrdering))).map(_.address) + def leader: Option[Address] = leaderOf(members) + + def roleLeader(role: String): Option[Address] = leaderOf(members.filter(_.hasRole(role))) + + private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[Address] = { + if (mbrs.isEmpty) None + else mbrs.find(m ⇒ m.status != Joining && m.status != Exiting && m.status != Down). + orElse(Some(mbrs.min(Member.leaderStatusOrdering))).map(_.address) } + def allRoles: Set[String] = members.flatMap(_.roles) + def isSingletonCluster: Boolean = members.size == 1 /** @@ -201,7 +207,7 @@ private[cluster] case class Gossip( def member(address: Address): Member = { members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)). - getOrElse(Member(address, Removed)) + getOrElse(Member(address, Removed, Set.empty)) } override def toString = diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index db3b126571..8da1d7141c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -12,15 +12,26 @@ import akka.actor.Address import MemberStatus._ /** - * Represents the address and the current status of a cluster member node. + * Represents the address, current status, and roles of a cluster member node. * - * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`. + * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus` + * and roles. */ -class Member(val address: Address, val status: MemberStatus) extends ClusterMessage { +case class Member(val address: Address, val status: MemberStatus, roles: Set[String]) extends ClusterMessage { override def hashCode = address.## - override def equals(other: Any) = Member.unapply(this) == Member.unapply(other) + override def equals(other: Any) = other match { + case m: Member ⇒ address == m.address + case _ ⇒ false + } override def toString = "Member(address = %s, status = %s)" format (address, status) - def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status) + + def hasRole(role: String): Boolean = roles.contains(role) + + /** + * Java API + */ + def getRoles: java.util.Set[String] = + scala.collection.JavaConverters.setAsJavaSetConverter(roles).asJava } /** @@ -65,13 +76,6 @@ object Member { def compare(a: Member, b: Member): Int = addressOrdering.compare(a.address, b.address) } - def apply(address: Address, status: MemberStatus): Member = new Member(address, status) - - def unapply(other: Any) = other match { - case m: Member ⇒ Some(m.address) - case _ ⇒ None - } - def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { // group all members by Address => Seq[Member] val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 1eb8b7a712..12ac7b7206 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -74,26 +74,31 @@ object ClusterRouterSettings { /** * Settings for create and deploy of the routees */ - def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean): ClusterRouterSettings = - new ClusterRouterSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees) + def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings = + new ClusterRouterSettings(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees, useRole) /** * Settings for remote deployment of the routees, allowed to use routees on own node */ - def apply(totalInstances: Int, maxInstancesPerNode: Int): ClusterRouterSettings = - apply(totalInstances, maxInstancesPerNode, allowLocalRoutees = true) + def apply(totalInstances: Int, maxInstancesPerNode: Int, useRole: Option[String]): ClusterRouterSettings = + apply(totalInstances, maxInstancesPerNode, allowLocalRoutees = true, useRole) /** * Settings for lookup of the routees */ - def apply(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean): ClusterRouterSettings = - new ClusterRouterSettings(totalInstances, routeesPath, allowLocalRoutees) + def apply(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings = + new ClusterRouterSettings(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees, useRole) /** * Settings for lookup of the routees, allowed to use routees on own node */ - def apply(totalInstances: Int, routeesPath: String): ClusterRouterSettings = - apply(totalInstances, routeesPath, allowLocalRoutees = true) + def apply(totalInstances: Int, routeesPath: String, useRole: Option[String]): ClusterRouterSettings = + apply(totalInstances, routeesPath, allowLocalRoutees = true, useRole) + + def useRoleOption(role: String): Option[String] = role match { + case null | "" ⇒ None + case _ ⇒ Some(role) + } } /** @@ -106,19 +111,22 @@ case class ClusterRouterSettings private[akka] ( totalInstances: Int, maxInstancesPerNode: Int, routeesPath: String, - allowLocalRoutees: Boolean) { + allowLocalRoutees: Boolean, + useRole: Option[String]) { /** * Java API: Settings for create and deploy of the routees */ - def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean) = - this(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees) + def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) = + this(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees, + ClusterRouterSettings.useRoleOption(useRole)) /** * Java API: Settings for lookup of the routees */ - def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean) = - this(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees) + def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) = + this(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees, + ClusterRouterSettings.useRoleOption(useRole)) if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0") if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be > 0") @@ -220,7 +228,7 @@ private[akka] class ClusterRouteeProvider( private[routing] def availableNodes: immutable.SortedSet[Address] = { import Member.addressOrdering val currentNodes = nodes - if (currentNodes.isEmpty && settings.allowLocalRoutees) + if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles)) //use my own node, cluster information not updated yet immutable.SortedSet(cluster.selfAddress) else @@ -236,7 +244,14 @@ private[akka] class ClusterRouteeProvider( } private[routing] def isAvailable(m: Member): Boolean = - m.status == MemberStatus.Up && (settings.allowLocalRoutees || m.address != cluster.selfAddress) + m.status == MemberStatus.Up && + satisfiesRole(m.roles) && + (settings.allowLocalRoutees || m.address != cluster.selfAddress) + + private def satisfiesRole(memberRoles: Set[String]): Boolean = settings.useRole match { + case None ⇒ true + case Some(r) ⇒ memberRoles.contains(r) + } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala index 33ab5a7d4e..a91d672326 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala @@ -7,6 +7,7 @@ import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfter import scala.collection.immutable.SortedSet import scala.concurrent.duration._ +import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ @@ -25,64 +26,110 @@ object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } +object MinMembersOfRoleBeforeUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString( + "akka.cluster.role.backend.min-nr-of-members = 2")). + withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) + + nodeConfig(first)( + ConfigFactory.parseString("akka.cluster.roles =[frontend]")) + + nodeConfig(second, third)( + ConfigFactory.parseString("akka.cluster.roles =[backend]")) +} + class MinMembersBeforeUpMultiJvmNode1 extends MinMembersBeforeUpSpec class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec -abstract class MinMembersBeforeUpSpec - extends MultiNodeSpec(MinMembersBeforeUpMultiJvmSpec) - with MultiNodeClusterSpec { +class MinMembersOfRoleBeforeUpMultiJvmNode1 extends MinMembersOfRoleBeforeUpSpec +class MinMembersOfRoleBeforeUpMultiJvmNode2 extends MinMembersOfRoleBeforeUpSpec +class MinMembersOfRoleBeforeUpMultiJvmNode3 extends MinMembersOfRoleBeforeUpSpec - import MinMembersBeforeUpMultiJvmSpec._ - import ClusterEvent._ +abstract class MinMembersBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersBeforeUpMultiJvmSpec) { + + override def first: RoleName = MinMembersBeforeUpMultiJvmSpec.first + override def second: RoleName = MinMembersBeforeUpMultiJvmSpec.second + override def third: RoleName = MinMembersBeforeUpMultiJvmSpec.third "Cluster leader" must { "wait with moving members to UP until minimum number of members have joined" taggedAs LongRunningTest in { - - val onUpLatch = TestLatch(1) - cluster.registerOnMemberUp(onUpLatch.countDown()) - - runOn(first) { - cluster join myself - awaitCond { - val result = clusterView.status == Joining - clusterView.refreshCurrentState() - result - } - } - enterBarrier("first-started") - - onUpLatch.isOpen must be(false) - - runOn(second) { - cluster.join(first) - } - runOn(first, second) { - val expectedAddresses = Set(first, second) map address - awaitCond { - val result = clusterView.members.map(_.address) == expectedAddresses - clusterView.refreshCurrentState() - result - } - clusterView.members.map(_.status) must be(Set(Joining)) - // and it should not change - 1 to 5 foreach { _ ⇒ - Thread.sleep(1000) - clusterView.members.map(_.address) must be(expectedAddresses) - clusterView.members.map(_.status) must be(Set(Joining)) - } - } - enterBarrier("second-joined") - - runOn(third) { - cluster.join(first) - } - awaitClusterUp(first, second, third) - - onUpLatch.await - - enterBarrier("after-1") + testWaitMovingMembersToUp() } - } } + +abstract class MinMembersOfRoleBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersOfRoleBeforeUpMultiJvmSpec) { + + override def first: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.first + override def second: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.second + override def third: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.third + + "Cluster leader" must { + "wait with moving members to UP until minimum number of members with specific role have joined" taggedAs LongRunningTest in { + testWaitMovingMembersToUp() + } + } +} + +abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { + + import ClusterEvent._ + + def first: RoleName + def second: RoleName + def third: RoleName + + def testWaitMovingMembersToUp(): Unit = { + val onUpLatch = TestLatch(1) + cluster.registerOnMemberUp(onUpLatch.countDown()) + + runOn(first) { + cluster join myself + awaitCond { + val result = clusterView.status == Joining + clusterView.refreshCurrentState() + result + } + } + enterBarrier("first-started") + + onUpLatch.isOpen must be(false) + + runOn(second) { + cluster.join(first) + } + runOn(first, second) { + val expectedAddresses = Set(first, second) map address + awaitCond { + val result = clusterView.members.map(_.address) == expectedAddresses + clusterView.refreshCurrentState() + result + } + clusterView.members.map(_.status) must be(Set(Joining)) + // and it should not change + 1 to 5 foreach { _ ⇒ + Thread.sleep(1000) + clusterView.members.map(_.address) must be(expectedAddresses) + clusterView.members.map(_.status) must be(Set(Joining)) + } + } + enterBarrier("second-joined") + + runOn(third) { + cluster.join(first) + } + awaitClusterUp(first, second, third) + + onUpLatch.await + + enterBarrier("after-1") + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 9df1268b93..4aa2a963c4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -230,7 +230,7 @@ abstract class TransitionSpec runOn(third) { markNodeAsUnavailable(second) reapUnreachable() - awaitCond(clusterView.unreachableMembers.contains(Member(second, Up))) + awaitCond(clusterView.unreachableMembers.contains(Member(second, Up, Set.empty))) awaitCond(seenLatestGossip == Set(third)) } @@ -239,7 +239,7 @@ abstract class TransitionSpec third gossipTo first runOn(first, third) { - awaitCond(clusterView.unreachableMembers.contains(Member(second, Up))) + awaitCond(clusterView.unreachableMembers.contains(Member(second, Up, Set.empty))) } runOn(first) { @@ -251,7 +251,7 @@ abstract class TransitionSpec first gossipTo third runOn(first, third) { - awaitCond(clusterView.unreachableMembers.contains(Member(second, Down))) + awaitCond(clusterView.unreachableMembers.contains(Member(second, Down, Set.empty))) awaitMemberStatus(second, Down) awaitCond(seenLatestGossip == Set(first, third)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala index c98413c21a..70477d03b7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala @@ -115,7 +115,7 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa def startRouter(name: String): ActorRef = { val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig( local = AdaptiveLoadBalancingRouter(HeapMetricsSelector), - settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), name) + settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), name) awaitCond { // it may take some time until router receives cluster member events currentRoutees(router).size == roles.size diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala index 7f6cdba39a..b72f793228 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala @@ -124,7 +124,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(), - settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2))), "router2") + settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2, useRole = None))), "router2") awaitCond { // it may take some time until router receives cluster member events currentRoutees(router2).size == 6 @@ -157,7 +157,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC val router4 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig( local = ConsistentHashingRouter(hashMapping = hashMapping), - settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), "router4") + settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), "router4") assertHashMapping(router4) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index 0caf029de2..430df7ab50 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -71,10 +71,21 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { routees-path = "/user/myservice" } } + /router5 { + router = round-robin + nr-of-instances = 10 + cluster { + enabled = on + use-role = a + } + } } """)). withFallback(MultiNodeClusterSpec.clusterConfig)) + nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]""")) + nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]""")) + } class ClusterRoundRobinRoutedActorMultiJvmNode1 extends ClusterRoundRobinRoutedActorSpec @@ -89,9 +100,10 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou lazy val router1 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router1") lazy val router2 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(RoundRobinRouter(), - ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router2") + ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1, useRole = None))), "router2") lazy val router3 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router3") lazy val router4 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router4") + lazy val router5 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router5") def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = { val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) @@ -240,6 +252,28 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("after-6") } + "deploy routees to specified node role" taggedAs LongRunningTest in { + + runOn(first) { + awaitCond(currentRoutees(router5).size == 2) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router5 ! "hit" + } + + val replies = receiveReplies(DeployRoutee, iterationCount) + + replies(first) must be > (0) + replies(second) must be > (0) + replies(third) must be(0) + replies(fourth) must be(0) + replies.values.sum must be(iterationCount) + } + + enterBarrier("after-7") + } + "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { @@ -263,7 +297,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou replies.values.sum must be(iterationCount) } - enterBarrier("after-7") + enterBarrier("after-8") } "deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in { @@ -292,7 +326,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou replies.values.sum must be(iterationCount) } - enterBarrier("after-8") + enterBarrier("after-9") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 9cdb47aeb2..7e268660a3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -39,6 +39,8 @@ class ClusterConfigSpec extends AkkaSpec { AutoJoin must be(true) AutoDown must be(false) MinNrOfMembers must be(1) + MinNrOfMembersOfRole must be === Map.empty + Roles must be === Set.empty JmxEnabled must be(true) UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 7aa7b81047..3c40f1df6e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -53,7 +53,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { service, deployment.get.config, ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( - totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false)), + totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)), ClusterScope))) } @@ -67,7 +67,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { service, deployment.get.config, ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( - totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false)), + totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false, useRole = None)), ClusterScope))) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 3d3ce0ac3e..f7f6c7789b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -24,23 +24,24 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { var publisher: ActorRef = _ - val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up) + val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty) val aLeaving = aUp.copy(status = Leaving) val aExiting = aUp.copy(status = Exiting) val aRemoved = aUp.copy(status = Removed) - val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up) + val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty) val bRemoved = bUp.copy(status = Removed) - val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining) + val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP")) val cUp = cJoining.copy(status = Up) val cRemoved = cUp.copy(status = Removed) - val dUp = Member(Address("akka.tcp", "sys", "a", 2551), Up) + val a51Up = Member(Address("akka.tcp", "sys", "a", 2551), Up, Set.empty) + val dUp = Member(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP")) val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address) val g1 = Gossip(members = SortedSet(aUp, bUp, cJoining)).seen(aUp.address).seen(bUp.address).seen(cJoining.address) val g2 = Gossip(members = SortedSet(aUp, bUp, cUp)).seen(aUp.address) val g3 = g2.seen(bUp.address).seen(cUp.address) - val g4 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address) - val g5 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(dUp.address) + val g4 = Gossip(members = SortedSet(a51Up, aUp, bUp, cUp)).seen(aUp.address) + val g5 = Gossip(members = SortedSet(a51Up, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(a51Up.address) val g6 = Gossip(members = SortedSet(aLeaving, bUp, cUp)).seen(aUp.address) val g7 = Gossip(members = SortedSet(aExiting, bUp, cUp)).seen(aUp.address) @@ -69,10 +70,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "publish leader changed" in { publisher ! PublishChanges(g4) - memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(MemberUp(a51Up)) memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberUp(cUp)) - memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) + memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) memberSubscriber.expectNoMsg(1 second) } @@ -96,15 +97,25 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "not publish leader changed when same leader" in { publisher ! PublishChanges(g4) - memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(MemberUp(a51Up)) memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberUp(cUp)) - memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) + memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) publisher ! PublishChanges(g5) memberSubscriber.expectNoMsg(1 second) } + "publish role leader changed" in { + val subscriber = TestProbe() + publisher ! Subscribe(subscriber.ref, classOf[RoleLeaderChanged]) + subscriber.expectMsgType[CurrentClusterState] + publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp))) + subscriber.expectMsg(RoleLeaderChanged("GRP", Some(dUp.address))) + publisher ! PublishChanges(Gossip(members = SortedSet(cUp, dUp))) + subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address))) + } + "send CurrentClusterState when subscribe" in { val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) @@ -132,6 +143,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher ! PublishChanges(g3) subscriber.expectMsg(MemberUp(bUp)) subscriber.expectMsg(MemberUp(cUp)) + subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address))) subscriber.expectMsgType[SeenChanged] publisher ! PublishStart diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 21e20d2b6f..f624012307 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -15,19 +15,25 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { import MemberStatus._ import ClusterEvent._ - val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up) - val a2 = Member(Address("akka.tcp", "sys", "a", 2552), Joining) - val a3 = Member(Address("akka.tcp", "sys", "a", 2552), Removed) - val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up) - val b2 = Member(Address("akka.tcp", "sys", "b", 2552), Removed) - val b3 = Member(Address("akka.tcp", "sys", "b", 2552), Down) - val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving) - val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up) - val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving) - val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed) - val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining) - val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up) - val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down) + val aRoles = Set("AA", "AB") + val aJoining = Member(Address("akka.tcp", "sys", "a", 2552), Joining, aRoles) + val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, aRoles) + val aRemoved = Member(Address("akka.tcp", "sys", "a", 2552), Removed, aRoles) + val bRoles = Set("AB", "BB") + val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up, bRoles) + val bDown = Member(Address("akka.tcp", "sys", "b", 2552), Down, bRoles) + val bRemoved = Member(Address("akka.tcp", "sys", "b", 2552), Removed, bRoles) + val cRoles = Set.empty[String] + val cUp = Member(Address("akka.tcp", "sys", "c", 2552), Up, cRoles) + val cLeaving = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, cRoles) + val dRoles = Set("DD", "DE") + val dLeaving = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, dRoles) + val dExiting = Member(Address("akka.tcp", "sys", "d", 2552), Exiting, dRoles) + val dRemoved = Member(Address("akka.tcp", "sys", "d", 2552), Removed, dRoles) + val eRoles = Set("EE", "DE") + val eJoining = Member(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles) + val eUp = Member(Address("akka.tcp", "sys", "e", 2552), Up, eRoles) + val eDown = Member(Address("akka.tcp", "sys", "e", 2552), Down, eRoles) def converge(gossip: Gossip): (Gossip, Set[Address]) = ((gossip, Set.empty[Address]) /: gossip.members) { (gs, m) ⇒ (gs._1.seen(m.address), gs._2 + m.address) } @@ -35,66 +41,83 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { "Domain events" must { "be empty for the same gossip" in { - val g1 = Gossip(members = SortedSet(a1)) + val g1 = Gossip(members = SortedSet(aUp)) diffUnreachable(g1, g1) must be(Seq.empty) } "be produced for new members" in { - val (g1, _) = converge(Gossip(members = SortedSet(a1))) - val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, e1))) + val (g1, _) = converge(Gossip(members = SortedSet(aUp))) + val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining))) - diffMemberEvents(g1, g2) must be(Seq(MemberUp(b1))) + diffMemberEvents(g1, g2) must be(Seq(MemberUp(bUp))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } "be produced for changed status of members" in { - val (g1, _) = converge(Gossip(members = SortedSet(a2, b1, c2))) - val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, c1, e1))) + val (g1, _) = converge(Gossip(members = SortedSet(aJoining, bUp, cUp))) + val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, cLeaving, eJoining))) - diffMemberEvents(g1, g2) must be(Seq(MemberUp(a1))) + diffMemberEvents(g1, g2) must be(Seq(MemberUp(aUp))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } "be produced for members in unreachable" in { - val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2, e2))) - val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2, b3, e3))) + val g1 = Gossip(members = SortedSet(aUp, bUp), overview = GossipOverview(unreachable = Set(cUp, eUp))) + val g2 = Gossip(members = SortedSet(aUp), overview = GossipOverview(unreachable = Set(cUp, bDown, eDown))) - diffUnreachable(g1, g2) must be(Seq(UnreachableMember(b3))) + diffUnreachable(g1, g2) must be(Seq(UnreachableMember(bDown))) diffSeen(g1, g2) must be(Seq.empty) } "be produced for removed members" in { - val (g1, _) = converge(Gossip(members = SortedSet(a1, d1))) - val (g2, s2) = converge(Gossip(members = SortedSet(a1))) + val (g1, _) = converge(Gossip(members = SortedSet(aUp, dLeaving))) + val (g2, s2) = converge(Gossip(members = SortedSet(aUp))) - diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(d2))) + diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(dRemoved))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } "be produced for convergence changes" in { - val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address) - val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address) + val g1 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.address).seen(bUp.address).seen(eJoining.address) + val g2 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.address).seen(bUp.address) diffMemberEvents(g1, g2) must be(Seq.empty) diffUnreachable(g1, g2) must be(Seq.empty) - diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = false, seenBy = Set(aUp.address, bUp.address)))) diffMemberEvents(g2, g1) must be(Seq.empty) diffUnreachable(g2, g1) must be(Seq.empty) - diffSeen(g2, g1) must be(Seq(SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) + diffSeen(g2, g1) must be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address)))) } "be produced for leader changes" in { - val (g1, _) = converge(Gossip(members = SortedSet(a1, b1, e1))) - val (g2, s2) = converge(Gossip(members = SortedSet(b1, e1))) + val (g1, _) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining))) + val (g2, s2) = converge(Gossip(members = SortedSet(bUp, eJoining))) - diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(a3))) + diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(aRemoved))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) - diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(b1.address)))) + diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(bUp.address)))) + } + + "be produced for role leader changes" in { + val g0 = Gossip.empty + val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining)) + val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining)) + diffRolesLeader(g0, g1) must be( + Set(RoleLeaderChanged("AA", Some(aUp.address)), + RoleLeaderChanged("AB", Some(aUp.address)), + RoleLeaderChanged("BB", Some(bUp.address)), + RoleLeaderChanged("DD", Some(dLeaving.address)), + RoleLeaderChanged("DE", Some(dLeaving.address)), + RoleLeaderChanged("EE", Some(eUp.address)))) + diffRolesLeader(g1, g2) must be( + Set(RoleLeaderChanged("AA", None), + RoleLeaderChanged("AB", Some(bUp.address)), + RoleLeaderChanged("DE", Some(eJoining.address)))) } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 9b677141d3..1e646c7bed 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -14,18 +14,18 @@ class GossipSpec extends WordSpec with MustMatchers { import MemberStatus._ - val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up) - val a2 = Member(Address("akka.tcp", "sys", "a", 2552), Joining) - val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up) - val b2 = Member(Address("akka.tcp", "sys", "b", 2552), Removed) - val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving) - val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up) - val c3 = Member(Address("akka.tcp", "sys", "c", 2552), Exiting) - val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving) - val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed) - val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining) - val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up) - val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down) + val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty) + val a2 = a1.copy(status = Joining) + val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty) + val b2 = b1.copy(status = Removed) + val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty) + val c2 = c1.copy(status = Up) + val c3 = c1.copy(status = Exiting) + val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, Set.empty) + val d2 = d1.copy(status = Removed) + val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining, Set.empty) + val e2 = e1.copy(status = Up) + val e3 = e1.copy(status = Down) "A Gossip" must { diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala index 3e19ab1e0c..c8537b2bb0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala @@ -17,29 +17,31 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { import Member.addressOrdering import MemberStatus._ + def m(address: Address, status: MemberStatus): Member = Member(address, status, Set.empty) + "An Ordering[Member]" must { "order members by host:port" in { val members = SortedSet.empty[Member] + - Member(AddressFromURIString("akka://sys@darkstar:1112"), Up) + - Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining) + - Member(AddressFromURIString("akka://sys@darkstar:1111"), Up) + m(AddressFromURIString("akka://sys@darkstar:1112"), Up) + + m(AddressFromURIString("akka://sys@darkstar:1113"), Joining) + + m(AddressFromURIString("akka://sys@darkstar:1111"), Up) val seq = members.toSeq seq.size must equal(3) - seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up)) - seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Up)) - seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining)) + seq(0) must equal(m(AddressFromURIString("akka://sys@darkstar:1111"), Up)) + seq(1) must equal(m(AddressFromURIString("akka://sys@darkstar:1112"), Up)) + seq(2) must equal(m(AddressFromURIString("akka://sys@darkstar:1113"), Joining)) } "be sorted by address correctly" in { import Member.ordering // sorting should be done on host and port, only - val m1 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Up) - val m2 = Member(Address("akka.tcp", "sys1", "host1", 10000), MemberStatus.Up) - val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up) - val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up) - val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up) + val m1 = m(Address("akka.tcp", "sys1", "host1", 9000), Up) + val m2 = m(Address("akka.tcp", "sys1", "host1", 10000), Up) + val m3 = m(Address("cluster", "sys2", "host2", 8000), Up) + val m4 = m(Address("cluster", "sys2", "host2", 9000), Up) + val m5 = m(Address("cluster", "sys1", "host2", 10000), Up) val expected = IndexedSeq(m1, m2, m3, m4, m5) val shuffled = Random.shuffle(expected) @@ -49,9 +51,9 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { "have stable equals and hashCode" in { val address = Address("akka.tcp", "sys1", "host1", 9000) - val m1 = Member(address, MemberStatus.Joining) - val m2 = Member(address, MemberStatus.Up) - val m3 = Member(address.copy(port = Some(10000)), MemberStatus.Up) + val m1 = m(address, Joining) + val m2 = m(address, Up) + val m3 = m(address.copy(port = Some(10000)), Up) m1 must be(m2) m1.hashCode must be(m2.hashCode) @@ -64,9 +66,9 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { val address1 = Address("akka.tcp", "sys1", "host1", 9001) val address2 = address1.copy(port = Some(9002)) - val x = Member(address1, Exiting) - val y = Member(address1, Removed) - val z = Member(address2, Up) + val x = m(address1, Exiting) + val y = m(address1, Removed) + val z = m(address2, Up) Member.ordering.compare(x, y) must be(0) Member.ordering.compare(x, z) must be(Member.ordering.compare(y, z)) } @@ -76,11 +78,11 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { val address2 = address1.copy(port = Some(9002)) val address3 = address1.copy(port = Some(9003)) - (SortedSet(Member(address1, MemberStatus.Joining)) - Member(address1, MemberStatus.Up)) must be(SortedSet.empty[Member]) - (SortedSet(Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(SortedSet.empty[Member]) - (SortedSet(Member(address1, MemberStatus.Up)) - Member(address1, MemberStatus.Exiting)) must be(SortedSet.empty[Member]) - (SortedSet(Member(address2, Up), Member(address3, Joining), Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be( - SortedSet(Member(address2, Up), Member(address3, Joining))) + (SortedSet(m(address1, Joining)) - m(address1, Up)) must be(SortedSet.empty[Member]) + (SortedSet(m(address1, Exiting)) - m(address1, Removed)) must be(SortedSet.empty[Member]) + (SortedSet(m(address1, Up)) - m(address1, Exiting)) must be(SortedSet.empty[Member]) + (SortedSet(m(address2, Up), m(address3, Joining), m(address1, Exiting)) - m(address1, Removed)) must be( + SortedSet(m(address2, Up), m(address3, Joining))) } } @@ -136,14 +138,14 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { "order members with status Joining, Exiting and Down last" in { val address = Address("akka.tcp", "sys1", "host1", 5000) - val m1 = Member(address, MemberStatus.Joining) - val m2 = Member(address.copy(port = Some(7000)), MemberStatus.Joining) - val m3 = Member(address.copy(port = Some(3000)), MemberStatus.Exiting) - val m4 = Member(address.copy(port = Some(6000)), MemberStatus.Exiting) - val m5 = Member(address.copy(port = Some(2000)), MemberStatus.Down) - val m6 = Member(address.copy(port = Some(4000)), MemberStatus.Down) - val m7 = Member(address.copy(port = Some(8000)), MemberStatus.Up) - val m8 = Member(address.copy(port = Some(9000)), MemberStatus.Up) + val m1 = m(address, Joining) + val m2 = m(address.copy(port = Some(7000)), Joining) + val m3 = m(address.copy(port = Some(3000)), Exiting) + val m4 = m(address.copy(port = Some(6000)), Exiting) + val m5 = m(address.copy(port = Some(2000)), Down) + val m6 = m(address.copy(port = Some(4000)), Down) + val m7 = m(address.copy(port = Some(8000)), Up) + val m8 = m(address.copy(port = Some(9000)), Up) val expected = IndexedSeq(m7, m8, m1, m2, m3, m4, m5, m6) val shuffled = Random.shuffle(expected) shuffled.sorted(Member.leaderStatusOrdering) must be(expected) diff --git a/akka-contrib/docs/cluster-singleton.rst b/akka-contrib/docs/cluster-singleton.rst index 47a372115b..5205a5546a 100644 --- a/akka-contrib/docs/cluster-singleton.rst +++ b/akka-contrib/docs/cluster-singleton.rst @@ -19,12 +19,13 @@ such as single-point of bottleneck. Single-point of failure is also a relevant c but for some cases this feature takes care of that by making sure that another singleton instance will eventually be started. -The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSingletonManager``, -which is an actor that is supposed to be started on all nodes in the cluster. -The actual singleton actor is started by the ``ClusterSingletonManager`` on the -leader node of the cluster by creating a child actor from supplied ``Props``. -``ClusterSingletonManager`` makes sure that at most one singleton instance is -running at any point in time. +The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSingletonManager``. +It manages singleton actor instance among all cluster nodes or a group of nodes tagged with +a specific role. ``ClusterSingletonManager`` is an actor that is supposed to be started on +all nodes, or all nodes with specified role, in the cluster. The actual singleton actor is +started by the ``ClusterSingletonManager`` on the leader node by creating a child actor from +supplied ``Props``. ``ClusterSingletonManager`` makes sure that at most one singleton instance +is running at any point in time. The singleton actor is always running on the leader member, which is nothing more than the address currently sorted first in the member ring. This can change when adding @@ -39,9 +40,9 @@ not be a graceful hand-over, but more than one active singletons is prevented by reasonable means. Some corner cases are eventually resolved by configurable timeouts. You access the singleton actor with ``actorFor`` using the names you have specified when -creating the ClusterSingletonManager. You can subscribe to cluster ``LeaderChanged`` events -to keep track of which node it is supposed to be running on. Alternatively the singleton -actor may broadcast its existence when it is started. +creating the ClusterSingletonManager. You can subscribe to cluster ``LeaderChanged`` or +``RoleLeaderChanged`` events to keep track of which node it is supposed to be running on. +Alternatively the singleton actor may broadcast its existence when it is started. An Example ---------- @@ -57,7 +58,12 @@ supply the ``Props`` of the singleton actor, in this case the JMS queue consumer .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#create-singleton-manager +Here we limit the singleton to nodes tagged with the ``"worker"`` role, but all nodes, independent of +role, can be used by specifying ``None`` as ``role`` parameter. + The corresponding Java API for the ``singeltonProps`` function is ``akka.contrib.pattern.ClusterSingletonPropsFactory``. +The Java API constructor takes a plain String for the role parameter and ``null`` means that all nodes, independent of +role, are used. Here we use an application specific ``terminationMessage`` to be able to close the resources before actually stopping the singleton actor. Note that ``PoisonPill`` is a @@ -72,12 +78,15 @@ This message will be sent over to the ``ClusterSingletonManager`` at the new lea will be passed to the ``singletonProps`` factory when creating the new singleton instance. With the names given above the path of singleton actor can be constructed by subscribing to -``LeaderChanged`` cluster event and the actor reference is then looked up using ``actorFor``: +``RoleLeaderChanged`` cluster event and the actor reference is then looked up using ``actorFor``: -.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy2 + +Subscribe to ``LeaderChanged`` instead of ``RoleLeaderChanged`` if you don't limit the singleton to +the group of members tagged with a specific role. Note that the hand-over might still be in progress and the singleton actor might not be started yet -when you receive the ``LeaderChanged`` event. +when you receive the ``LeaderChanged`` / ``RoleLeaderChanged`` event. To test scenarios where the cluster leader node is removed or shut down you can use :ref:`multi-node-testing` and utilize the fact that the leader is supposed to be the first member when sorted by member address. diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index be5dfa4717..e31e84db8c 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -90,6 +90,11 @@ object ClusterSingletonManager { val TakeOverRetryTimer = "take-over-retry" val CleanupTimer = "cleanup" + def roleOption(role: String): Option[String] = role match { + case null | "" ⇒ None + case _ ⇒ Some(role) + } + object LeaderChangedBuffer { /** * Request to deliver one more event. @@ -110,7 +115,7 @@ object ClusterSingletonManager { * `GetNext` request is allowed. Incoming events are buffered and delivered * upon `GetNext` request. */ - class LeaderChangedBuffer extends Actor { + class LeaderChangedBuffer(role: Option[String]) extends Actor { import LeaderChangedBuffer._ import context.dispatcher @@ -119,14 +124,23 @@ object ClusterSingletonManager { var memberCount = 0 // subscribe to LeaderChanged, re-subscribe when restart - override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged]) + override def preStart(): Unit = role match { + case None ⇒ cluster.subscribe(self, classOf[LeaderChanged]) + case Some(_) ⇒ cluster.subscribe(self, classOf[RoleLeaderChanged]) + } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case state: CurrentClusterState ⇒ - changes :+= InitialLeaderState(state.leader, state.members.size) + val initial = role match { + case None ⇒ InitialLeaderState(state.leader, state.members.size) + case Some(r) ⇒ InitialLeaderState(state.roleLeader(r), state.members.count(_.hasRole(r))) + } + changes :+= initial case event: LeaderChanged ⇒ changes :+= event + case RoleLeaderChanged(r, leader) ⇒ + if (role.orNull == r) changes :+= LeaderChanged(leader) case GetNext if changes.isEmpty ⇒ context.become(deliverNext, discardOld = false) case GetNext ⇒ @@ -138,11 +152,20 @@ object ClusterSingletonManager { // the buffer was empty when GetNext was received, deliver next event immediately def deliverNext: Actor.Receive = { case state: CurrentClusterState ⇒ - context.parent ! InitialLeaderState(state.leader, state.members.size) + val initial = role match { + case None ⇒ InitialLeaderState(state.leader, state.members.size) + case Some(r) ⇒ InitialLeaderState(state.roleLeader(r), state.members.count(_.hasRole(r))) + } + context.parent ! initial context.unbecome() case event: LeaderChanged ⇒ context.parent ! event context.unbecome() + case RoleLeaderChanged(r, leader) ⇒ + if (role.orNull == r) { + context.parent ! LeaderChanged(leader) + context.unbecome() + } } } @@ -176,11 +199,13 @@ trait ClusterSingletonPropsFactory extends Serializable { class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(message, null) /** - * Manages a cluster wide singleton actor instance, i.e. - * at most one singleton instance is running at any point in time. - * The ClusterSingletonManager is supposed to be started on all - * nodes in the cluster with `actorOf`. The actual singleton is - * started on the leader node of the cluster by creating a child + * Manages singleton actor instance among all cluster nodes or a group + * of nodes tagged with a specific role. At most one singleton instance + * is running at any point in time. + * + * The ClusterSingletonManager is supposed to be started on all nodes, + * or all nodes with specified role, in the cluster with `actorOf`. + * The actual singleton is started on the leader node by creating a child * actor from the supplied `singletonProps`. * * The singleton actor is always running on the leader member, which is @@ -206,7 +231,8 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * * You access the singleton actor with `actorFor` using the names you have * specified when creating the ClusterSingletonManager. You can subscribe to - * [[akka.cluster.ClusterEvent.LeaderChanged]] to keep track of which node + * [[akka.cluster.ClusterEvent.LeaderChanged]] or + * [[akka.cluster.ClusterEvent.RoleLeaderChanged]] to keep track of which node * it is supposed to be running on. Alternatively the singleton actor may * broadcast its existence when it is started. * @@ -232,6 +258,10 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * Note that [[akka.actor.PoisonPill]] is a perfectly fine * `terminationMessage` if you only need to stop the actor. * + * '''''role''''' Singleton among the nodes tagged with specified role. + * If the role is not specified it's a singleton among all nodes in + * the cluster. + * * '''''maxHandOverRetries''''' When a node is becoming leader it sends * hand-over request to previous leader. This is retried with the * `retryInterval` until the previous leader confirms that the hand @@ -262,6 +292,7 @@ class ClusterSingletonManager( singletonProps: Option[Any] ⇒ Props, singletonName: String, terminationMessage: Any, + role: Option[String], maxHandOverRetries: Int = 20, maxTakeOverRetries: Int = 15, retryInterval: FiniteDuration = 1.second, @@ -278,13 +309,14 @@ class ClusterSingletonManager( def this( singletonName: String, terminationMessage: Any, + role: String, maxHandOverRetries: Int, maxTakeOverRetries: Int, retryInterval: FiniteDuration, loggingEnabled: Boolean, singletonPropsFactory: ClusterSingletonPropsFactory) = this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage, - maxHandOverRetries, maxTakeOverRetries, retryInterval) + ClusterSingletonManager.Internal.roleOption(role), maxHandOverRetries, maxTakeOverRetries, retryInterval) /** * Java API constructor with default values. @@ -292,8 +324,10 @@ class ClusterSingletonManager( def this( singletonName: String, terminationMessage: Any, + role: String, singletonPropsFactory: ClusterSingletonPropsFactory) = - this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage) + this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage, + ClusterSingletonManager.Internal.roleOption(role)) import ClusterSingletonManager._ import ClusterSingletonManager.Internal._ @@ -301,6 +335,12 @@ class ClusterSingletonManager( val cluster = Cluster(context.system) val selfAddressOption = Some(cluster.selfAddress) + + role match { + case None ⇒ + case Some(r) ⇒ require(cluster.selfRoles.contains(r), s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") + } + // started when when self member is Up var leaderChangedBuffer: ActorRef = _ // Previous GetNext request delivered event and new GetNext is to be sent @@ -357,7 +397,7 @@ class ClusterSingletonManager( when(Start) { case Event(StartLeaderChangedBuffer, _) ⇒ - leaderChangedBuffer = context.actorOf(Props[LeaderChangedBuffer].withDispatcher(context.props.dispatcher)) + leaderChangedBuffer = context.actorOf(Props(new LeaderChangedBuffer(role)).withDispatcher(context.props.dispatcher)) getNextLeaderChanged() stay diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala index 6255910ae4..3d0cd3c186 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala @@ -90,7 +90,8 @@ class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonMan system.actorOf(Props(new ClusterSingletonManager( singletonProps = handOverData ⇒ Props(new Echo(testActor)), singletonName = "echo", - terminationMessage = PoisonPill)), + terminationMessage = PoisonPill, + role = None)), name = "singleton") } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 835a78290b..f1ae011c61 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -27,6 +27,7 @@ import akka.actor.Terminated object ClusterSingletonManagerSpec extends MultiNodeConfig { val controller = role("controller") + val observer = role("observer") val first = role("first") val second = role("second") val third = role("third") @@ -42,6 +43,9 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { akka.cluster.auto-down = on """)) + nodeConfig(first, second, third, fourth, fifth, sixth)( + ConfigFactory.parseString("akka.cluster.roles =[worker]")) + object PointToPointChannel { case object RegisterConsumer case object UnregisterConsumer @@ -162,6 +166,30 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { } //#singleton-proxy + // documentation of how to keep track of the role leader address in user land + //#singleton-proxy2 + class ConsumerProxy2 extends Actor { + // subscribe to RoleLeaderChanged, re-subscribe when restart + override def preStart(): Unit = + Cluster(context.system).subscribe(self, classOf[RoleLeaderChanged]) + override def postStop(): Unit = + Cluster(context.system).unsubscribe(self) + + val role = "worker" + var leaderAddress: Option[Address] = None + + def receive = { + case state: CurrentClusterState ⇒ leaderAddress = state.roleLeader(role) + case RoleLeaderChanged(r, leader) ⇒ if (r == role) leaderAddress = leader + case other ⇒ consumer foreach { _ forward other } + } + + def consumer: Option[ActorRef] = + leaderAddress map (a ⇒ context.actorFor(RootActorPath(a) / + "user" / "singleton" / "consumer")) + } + //#singleton-proxy2 + } class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec @@ -171,6 +199,7 @@ class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec +class ClusterSingletonManagerMultiJvmNode8 extends ClusterSingletonManagerSpec class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender { import ClusterSingletonManagerSpec._ @@ -181,13 +210,13 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS //#sort-cluster-roles // Sort the roles in the order used by the cluster. - lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = { + lazy val sortedWorkerNodes: immutable.IndexedSeq[RoleName] = { implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { import Member.addressOrdering def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address) } - roles.filterNot(_ == controller).toVector.sorted + roles.filterNot(r ⇒ r == controller || r == observer).toVector.sorted } //#sort-cluster-roles @@ -196,7 +225,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS def join(from: RoleName, to: RoleName): Unit = { runOn(from) { Cluster(system) join node(to).address - createSingleton() + if (Cluster(system).selfRoles.contains("worker")) createSingleton() } } @@ -206,7 +235,8 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS singletonProps = handOverData ⇒ Props(new Consumer(handOverData, queue, testActor)), singletonName = "consumer", - terminationMessage = End)), + terminationMessage = End, + role = Some("worker"))), name = "singleton") //#create-singleton-manager } @@ -231,7 +261,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS runOn(leader) { expectMsg(msg) } - runOn(sortedClusterRoles.filterNot(_ == leader): _*) { + runOn(sortedWorkerNodes.filterNot(_ == leader): _*) { expectNoMsg(1 second) } enterBarrier(leader.name + "-verified") @@ -251,7 +281,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS "A ClusterSingletonManager" must { "startup in single member cluster" in within(10 seconds) { - log.info("Sorted cluster nodes [{}]", sortedClusterRoles.map(node(_).address).mkString(", ")) + log.info("Sorted cluster nodes [{}]", sortedWorkerNodes.map(node(_).address).mkString(", ")) runOn(controller) { // watch that it is not terminated, which would indicate misbehaviour @@ -259,44 +289,48 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS } enterBarrier("queue-started") - join(sortedClusterRoles.last, sortedClusterRoles.last) - verify(sortedClusterRoles.last, msg = 1, expectedCurrent = 0) + join(sortedWorkerNodes.last, sortedWorkerNodes.last) + verify(sortedWorkerNodes.last, msg = 1, expectedCurrent = 0) + + // join the observer node as well, which should not influence since it doesn't have the "worker" role + join(observer, sortedWorkerNodes.last) + enterBarrier("after-1") } "hand over when new leader joins to 1 node cluster" in within(15 seconds) { - val newLeaderRole = sortedClusterRoles(4) - join(newLeaderRole, sortedClusterRoles.last) + val newLeaderRole = sortedWorkerNodes(4) + join(newLeaderRole, sortedWorkerNodes.last) verify(newLeaderRole, msg = 2, expectedCurrent = 1) } "hand over when new leader joins to 2 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedClusterRoles(3) - join(newLeaderRole, sortedClusterRoles.last) + val newLeaderRole = sortedWorkerNodes(3) + join(newLeaderRole, sortedWorkerNodes.last) verify(newLeaderRole, msg = 3, expectedCurrent = 2) } "hand over when new leader joins to 3 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedClusterRoles(2) - join(newLeaderRole, sortedClusterRoles.last) + val newLeaderRole = sortedWorkerNodes(2) + join(newLeaderRole, sortedWorkerNodes.last) verify(newLeaderRole, msg = 4, expectedCurrent = 3) } "hand over when new leader joins to 4 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedClusterRoles(1) - join(newLeaderRole, sortedClusterRoles.last) + val newLeaderRole = sortedWorkerNodes(1) + join(newLeaderRole, sortedWorkerNodes.last) verify(newLeaderRole, msg = 5, expectedCurrent = 4) } "hand over when new leader joins to 5 nodes cluster" in within(15 seconds) { - val newLeaderRole = sortedClusterRoles(0) - join(newLeaderRole, sortedClusterRoles.last) + val newLeaderRole = sortedWorkerNodes(0) + join(newLeaderRole, sortedWorkerNodes.last) verify(newLeaderRole, msg = 6, expectedCurrent = 5) } "hand over when leader leaves in 6 nodes cluster " in within(30 seconds) { //#test-leave - val leaveRole = sortedClusterRoles(0) - val newLeaderRole = sortedClusterRoles(1) + val leaveRole = sortedWorkerNodes(0) + val newLeaderRole = sortedWorkerNodes(1) runOn(leaveRole) { Cluster(system) leave node(leaveRole).address @@ -320,18 +354,18 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Association failed.*"))) enterBarrier("logs-muted") - crash(sortedClusterRoles(1)) - verify(sortedClusterRoles(2), msg = 8, expectedCurrent = 0) + crash(sortedWorkerNodes(1)) + verify(sortedWorkerNodes(2), msg = 8, expectedCurrent = 0) } "take over when two leaders crash in 3 nodes cluster" in within(60 seconds) { - crash(sortedClusterRoles(2), sortedClusterRoles(3)) - verify(sortedClusterRoles(4), msg = 9, expectedCurrent = 0) + crash(sortedWorkerNodes(2), sortedWorkerNodes(3)) + verify(sortedWorkerNodes(4), msg = 9, expectedCurrent = 0) } "take over when leader crashes in 2 nodes cluster" in within(60 seconds) { - crash(sortedClusterRoles(4)) - verify(sortedClusterRoles(5), msg = 10, expectedCurrent = 0) + crash(sortedWorkerNodes(4)) + verify(sortedWorkerNodes(5), msg = 10, expectedCurrent = 0) } } diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index 38220d1224..a6c1542529 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -85,7 +85,7 @@ In the log output you see that the cluster node has been started and changed sta 2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node -and becomes a member of the cluster. It's status changed to 'Up'. +and becomes a member of the cluster. Its status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined. @@ -237,8 +237,17 @@ frontend nodes and 3 backend nodes:: mvn exec:java \ -Dexec.mainClass="sample.cluster.transformation.japi.TransformationFrontendMain" +Node Roles +^^^^^^^^^^ -.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters `_. +Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end, +one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware +routers—can take node roles into account to achieve this distribution of responsibilities. + +The roles of a node is defined in the configuration property named ``akka.cluster.roles`` +and it is typically defined in the start script as a system property or environment variable. + +The roles of the nodes is part of the membership information in ``MemberEvent`` that you can subscribe to. How To Startup when Cluster Size Reached ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -251,6 +260,11 @@ before the leader changes member status of 'Joining' members to 'Up'. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members +In a similar way you can define required number of members of a certain role +before the leader changes member status of 'Joining' members to 'Up'. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#role-min-nr-of-members + You can start the actors in a ``registerOnMemberUp`` callback, which will be invoked when the current member status is changed tp 'Up', i.e. the cluster has at least the defined number of members. @@ -265,10 +279,10 @@ Cluster Singleton Pattern For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster. -This can be implemented by subscribing to ``LeaderChanged`` events, but there are -several corner cases to consider. Therefore, this specific use case is made easily -accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is, -or adjust to fit your specific needs. +This can be implemented by subscribing to ``LeaderChanged`` or ``RoleLeaderChanged`` +events, but there are several corner cases to consider. Therefore, this specific use +case is made easily accessible by the :ref:`cluster-singleton` in the contrib module. +You can use it as is, or adjust to fit your specific needs. Failure Detector ^^^^^^^^^^^^^^^^ @@ -307,7 +321,7 @@ previous heartbeat. Phi is calculated from the mean and standard deviation of historical inter arrival times. The previous chart is an example for standard deviation of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, -i.e. it's possible to determine failure more quickly. The curve looks like this for +i.e. it is possible to determine failure more quickly. The curve looks like this for a standard deviation of 100 ms. .. image:: images/phi2.png @@ -345,7 +359,9 @@ are already running, the configuration for a router looks like this: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config -It's the relative actor path defined in ``routees-path`` that identify what actor to lookup. +It is the relative actor path defined in ``routees-path`` that identify what actor to lookup. +It is possible to limit the lookup of routees to member nodes tagged with a certain role by +specifying ``use-role``. ``nr-of-instances`` defines total number of routees in the cluster, but there will not be more than one per node. Setting ``nr-of-instances`` to a high value will result in new routees @@ -361,6 +377,9 @@ the configuration for a router looks like this: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config +It is possible to limit the deployment of routees to member nodes tagged with a certain role by +specifying ``use-role``. + ``nr-of-instances`` defines total number of routees in the cluster, but the number of routees per node, ``max-nr-of-instances-per-node``, will not be exceeded. Setting ``nr-of-instances`` to a high value will result in creating and deploying additional routees when new nodes join @@ -373,8 +392,8 @@ The same type of router could also have been defined in code: See :ref:`cluster_configuration_java` section for further descriptions of the settings. -Router Example with Remote Deployed Routees -------------------------------------------- +Router Example with Lookup of Routees +------------------------------------- Let's take a look at how to use cluster aware routers. @@ -411,7 +430,7 @@ or with create and deploy of routees. Remember, routees are the workers in this We start with the router setup with lookup of routees. All nodes start ``StatsService`` and ``StatsWorker`` actors and the router is configured with ``routees-path``: -.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java#start-router-lookup +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-lookup This means that user requests can be sent to ``StatsService`` on any node and it will use ``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily @@ -424,22 +443,22 @@ Run it by starting nodes in different terminal windows. For example, starting 3 service nodes and 1 client:: mvn exec:java \ - -Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" \ + -Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain" \ -Dexec.args="2551" mvn exec:java \ - -Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" \ + -Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain" \ -Dexec.args="2552" mvn exec:java \ - -Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" + -Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain" mvn exec:java \ - -Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" + -Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain" -Router Example with Lookup of Routees -------------------------------------- +Router Example with Remote Deployed Routees +------------------------------------------- The above setup is nice for this example, but we will also take a look at how to use a single master node that creates and deploys workers. To keep track of a single @@ -460,7 +479,7 @@ sorted first in the member ring, i.e. it can change when new nodes join or when All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: -.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#start-router-deploy +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-deploy This example is included in ``akka-samples/akka-sample-cluster`` and you can try it by copying the `source <@github@/akka-samples/akka-sample-cluster>`_ to your @@ -539,11 +558,11 @@ The frontend that receives user jobs and delegates to the backends via the route .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#frontend -As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows: +As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router -It's only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work +It is only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work in the same way as other routers. The same type of router could also have been defined in code: @@ -559,18 +578,18 @@ Run it by starting nodes in different terminal windows. For example, starting 3 one frontend:: mvn exec:java \ - -Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain" \ + -Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain" \ -Dexec.args="2551" mvn exec:java \ - -Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain" \ + -Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain" \ -Dexec.args="2552" mvn exec:java \ - -Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain" + -Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain" mvn exec:java \ - -Dexec.mainClass="sample.cluster.factorial.FactorialFrontendMain" + -Dexec.mainClass="sample.cluster.factorial.japi.FactorialFrontendMain" Press ctrl-c in the terminal window of the frontend to stop the factorial calculations. @@ -578,7 +597,7 @@ Press ctrl-c in the terminal window of the frontend to stop the factorial calcul Subscribe to Metrics Events --------------------------- -It's possible to subscribe to the metrics events directly to implement other functionality. +It is possible to subscribe to the metrics events directly to implement other functionality. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java#metrics-listener diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index e029e57f31..804234a892 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -63,7 +63,7 @@ In the log output you see that the cluster node has been started and changed sta 2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node -and becomes a member of the cluster. It's status changed to 'Up'. +and becomes a member of the cluster. Its status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined. @@ -210,8 +210,17 @@ frontend nodes and 3 backend nodes:: run-main sample.cluster.transformation.TransformationFrontend +Node Roles +^^^^^^^^^^ -.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters `_. +Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end, +one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware +routers—can take node roles into account to achieve this distribution of responsibilities. + +The roles of a node is defined in the configuration property named ``akka.cluster.roles`` +and it is typically defined in the start script as a system property or environment variable. + +The roles of the nodes is part of the membership information in ``MemberEvent`` that you can subscribe to. How To Startup when Cluster Size Reached ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -224,6 +233,11 @@ before the leader changes member status of 'Joining' members to 'Up'. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members +In a similar way you can define required number of members of a certain role +before the leader changes member status of 'Joining' members to 'Up'. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#role-min-nr-of-members + You can start the actors in a ``registerOnMemberUp`` callback, which will be invoked when the current member status is changed tp 'Up', i.e. the cluster has at least the defined number of members. @@ -238,10 +252,10 @@ Cluster Singleton Pattern For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster. -This can be implemented by subscribing to ``LeaderChanged`` events, but there are -several corner cases to consider. Therefore, this specific use case is made easily -accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is, -or adjust to fit your specific needs. +This can be implemented by subscribing to ``LeaderChanged`` or ``RoleLeaderChanged`` +events, but there are several corner cases to consider. Therefore, this specific use +case is made easily accessible by the :ref:`cluster-singleton` in the contrib module. +You can use it as is, or adjust to fit your specific needs. Failure Detector ^^^^^^^^^^^^^^^^ @@ -280,7 +294,7 @@ previous heartbeat. Phi is calculated from the mean and standard deviation of historical inter arrival times. The previous chart is an example for standard deviation of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, -i.e. it's possible to determine failure more quickly. The curve looks like this for +i.e. it is possible to determine failure more quickly. The curve looks like this for a standard deviation of 100 ms. .. image:: images/phi2.png @@ -321,7 +335,9 @@ are already running, the configuration for a router looks like this: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config -It's the relative actor path defined in ``routees-path`` that identify what actor to lookup. +It is the relative actor path defined in ``routees-path`` that identify what actor to lookup. +It is possible to limit the lookup of routees to member nodes tagged with a certain role by +specifying ``use-role``. ``nr-of-instances`` defines total number of routees in the cluster, but there will not be more than one per node. Setting ``nr-of-instances`` to a high value will result in new routees @@ -336,6 +352,8 @@ the configuration for a router looks like this: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config +It is possible to limit the deployment of routees to member nodes tagged with a certain role by +specifying ``use-role``. ``nr-of-instances`` defines total number of routees in the cluster, but the number of routees per node, ``max-nr-of-instances-per-node``, will not be exceeded. Setting ``nr-of-instances`` @@ -349,8 +367,8 @@ The same type of router could also have been defined in code: See :ref:`cluster_configuration_scala` section for further descriptions of the settings. -Router Example with Remote Deployed Routees -------------------------------------------- +Router Example with Lookup of Routees +------------------------------------- Let's take a look at how to use cluster aware routers. @@ -385,7 +403,7 @@ or with create and deploy of routees. Remember, routees are the workers in this We start with the router setup with lookup of routees. All nodes start ``StatsService`` and ``StatsWorker`` actors and the router is configured with ``routees-path``: -.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-lookup +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-lookup This means that user requests can be sent to ``StatsService`` on any node and it will use ``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily @@ -407,8 +425,8 @@ service nodes and 1 client:: run-main sample.cluster.stats.StatsSample -Router Example with Lookup of Routees -------------------------------------- +Router Example with Remote Deployed Routees +------------------------------------------- The above setup is nice for this example, but we will also take a look at how to use a single master node that creates and deploys workers. To keep track of a single @@ -429,7 +447,7 @@ sorted first in the member ring, i.e. it can change when new nodes join or when All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this: -.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-deploy This example is included in ``akka-samples/akka-sample-cluster`` @@ -495,11 +513,11 @@ The frontend that receives user jobs and delegates to the backends via the route .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#frontend -As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows: +As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router -It's only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work +It is only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work in the same way as other routers. The same type of router could also have been defined in code: @@ -528,7 +546,7 @@ Press ctrl-c in the terminal window of the frontend to stop the factorial calcul Subscribe to Metrics Events --------------------------- -It's possible to subscribe to the metrics events directly to implement other functionality. +It is possible to subscribe to the metrics events directly to implement other functionality. .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#metrics-listener @@ -560,7 +578,7 @@ implemented differently, but often they are the same and extend an abstract test .. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#concrete-tests Note the naming convention of these classes. The name of the classes must end with ``MultiJvmNode1``, ``MultiJvmNode2`` -and so on. It's possible to define another suffix to be used by the ``sbt-multi-jvm``, but the default should be +and so on. It is possible to define another suffix to be used by the ``sbt-multi-jvm``, but the default should be fine in most cases. Then the abstract ``MultiNodeSpec``, which takes the ``MultiNodeConfig`` as constructor parameter. diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java index 14bf75091a..7041fd9428 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java @@ -1,5 +1,6 @@ package sample.cluster.factorial.japi; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import akka.actor.ActorSystem; import akka.actor.Props; @@ -7,12 +8,15 @@ import akka.actor.Props; public class FactorialBackendMain { public static void main(String[] args) throws Exception { - // Override the configuration of the port - // when specified as program argument - if (args.length > 0) - System.setProperty("akka.remote.netty.tcp.port", args[0]); + // Override the configuration of the port when specified as program argument + final Config config = + (args.length > 0 ? + ConfigFactory.parseString(String.format("akka.remote.netty.tcp.port=%s", args[0])) : + ConfigFactory.empty()). + withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")). + withFallback(ConfigFactory.load("factorial")); - ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial")); + ActorSystem system = ActorSystem.create("ClusterSystem", config); system.actorOf(new Props(FactorialBackend.class), "factorialBackend"); diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java index 13af688739..0ee5fc0624 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java @@ -62,29 +62,31 @@ public class FactorialFrontend extends UntypedActor { abstract class FactorialFrontend2 extends UntypedActor { //#router-lookup-in-code int totalInstances = 100; - String routeesPath = "/user/statsWorker"; + String routeesPath = "/user/factorialBackend"; boolean allowLocalRoutees = true; + String useRole = "backend"; ActorRef backend = getContext().actorOf( new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig( - new AdaptiveLoadBalancingRouter(HeapMetricsSelector.getInstance(), 0), + new AdaptiveLoadBalancingRouter(HeapMetricsSelector.getInstance(), 0), new ClusterRouterSettings( - totalInstances, routeesPath, allowLocalRoutees))), + totalInstances, routeesPath, allowLocalRoutees, useRole))), "factorialBackendRouter2"); //#router-lookup-in-code } //not used, only for documentation -abstract class StatsService3 extends UntypedActor { +abstract class FactorialFrontend3 extends UntypedActor { //#router-deploy-in-code int totalInstances = 100; int maxInstancesPerNode = 3; boolean allowLocalRoutees = false; + String useRole = "backend"; ActorRef backend = getContext().actorOf( new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig( new AdaptiveLoadBalancingRouter( - SystemLoadAverageMetricsSelector.getInstance(), 0), + SystemLoadAverageMetricsSelector.getInstance(), 0), new ClusterRouterSettings( - totalInstances, maxInstancesPerNode, allowLocalRoutees))), + totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole))), "factorialBackendRouter3"); //#router-deploy-in-code } \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java index e22ad6c2cb..c92c2a3eff 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java @@ -1,5 +1,6 @@ package sample.cluster.factorial.japi; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import akka.actor.ActorSystem; import akka.actor.Props; @@ -12,8 +13,11 @@ public class FactorialFrontendMain { public static void main(String[] args) throws Exception { final int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0])); - final ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial")); - system.log().info("Factorials will start when 3 members in the cluster."); + final Config config = ConfigFactory.parseString("akka.cluster.roles = [frontend]"). + withFallback(ConfigFactory.load("factorial")); + + final ActorSystem system = ActorSystem.create("ClusterSystem", config); + system.log().info("Factorials will start when 2 backend members in the cluster."); //#registerOnUp Cluster.get(system).registerOnMemberUp(new Runnable() { @Override diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java index 9b311835d3..bd80401cf9 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java @@ -9,7 +9,7 @@ import akka.actor.UntypedActor; import akka.dispatch.Recover; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.ClusterEvent.LeaderChanged; +import akka.cluster.ClusterEvent.RoleLeaderChanged; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.util.Timeout; @@ -25,10 +25,10 @@ public class StatsFacade extends UntypedActor { Address currentMaster = null; - //subscribe to cluster changes, MemberEvent + //subscribe to cluster changes, RoleLeaderChanged @Override public void preStart() { - cluster.subscribe(getSelf(), LeaderChanged.class); + cluster.subscribe(getSelf(), RoleLeaderChanged.class); } //re-subscribe when restart @@ -57,11 +57,12 @@ public class StatsFacade extends UntypedActor { } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; - currentMaster = state.getLeader(); + currentMaster = state.getRoleLeader("compute"); - } else if (message instanceof LeaderChanged) { - LeaderChanged leaderChanged = (LeaderChanged) message; - currentMaster = leaderChanged.getLeader(); + } else if (message instanceof RoleLeaderChanged) { + RoleLeaderChanged leaderChanged = (RoleLeaderChanged) message; + if (leaderChanged.role().equals("compute")) + currentMaster = leaderChanged.getLeader(); } else { unhandled(message); diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java index 2a89390677..d0147007af 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java @@ -77,14 +77,15 @@ public class StatsSampleClient extends UntypedActor { CurrentClusterState state = (CurrentClusterState) message; nodes.clear(); for (Member member : state.getMembers()) { - if (member.status().equals(MemberStatus.up())) { + if (member.hasRole("compute") && member.status().equals(MemberStatus.up())) { nodes.add(member.address()); } } } else if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; - nodes.add(mUp.member().address()); + if (mUp.member().hasRole("compute")) + nodes.add(mUp.member().address()); } else if (message instanceof MemberEvent) { MemberEvent other = (MemberEvent) message; diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClientMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClientMain.java index 191a8f264c..7ce373231b 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClientMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClientMain.java @@ -8,6 +8,7 @@ import akka.actor.UntypedActorFactory; public class StatsSampleClientMain { public static void main(String[] args) throws Exception { + // note that client is not a compute node, role not defined ActorSystem system = ActorSystem.create("ClusterSystem"); system.actorOf(new Props(new UntypedActorFactory() { @Override diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java index 9ca02ad644..e8c808f8c4 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java @@ -1,37 +1,25 @@ package sample.cluster.stats.japi; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import akka.actor.ActorSystem; import akka.actor.Props; -import com.typesafe.config.ConfigFactory; - public class StatsSampleMain { public static void main(String[] args) throws Exception { - // Override the configuration of the port - // when specified as program argument - if (args.length > 0) - System.setProperty("akka.remote.netty.tcp.port", args[0]); + // Override the configuration of the port when specified as program argument + final Config config = + (args.length > 0 ? + ConfigFactory.parseString(String.format("akka.remote.netty.tcp.port=%s", args[0])) : + ConfigFactory.empty()). + withFallback(ConfigFactory.parseString("akka.cluster.roles = [compute]")). + withFallback(ConfigFactory.load()); - //#start-router-lookup - ActorSystem system = ActorSystem.create("ClusterSystem", - ConfigFactory.parseString( - "akka.actor.deployment { \n" + - " /statsService/workerRouter { \n" + - " router = consistent-hashing \n" + - " nr-of-instances = 100 \n" + - " cluster { \n" + - " enabled = on \n" + - " routees-path = \"/user/statsWorker\" \n" + - " allow-local-routees = on \n" + - " } \n" + - " } \n" + - "} \n") - .withFallback(ConfigFactory.load())); + ActorSystem system = ActorSystem.create("ClusterSystem", config); system.actorOf(new Props(StatsWorker.class), "statsWorker"); system.actorOf(new Props(StatsService.class), "statsService"); - //#start-router-lookup } } diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java index 3c0101fbb6..942ff37d62 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterClientMain.java @@ -11,22 +11,8 @@ import akka.contrib.pattern.ClusterSingletonPropsFactory; public class StatsSampleOneMasterClientMain { public static void main(String[] args) throws Exception { + // note that client is not a compute node, role not defined ActorSystem system = ActorSystem.create("ClusterSystem"); - - // the client is also part of the cluster - system.actorOf(new Props(new UntypedActorFactory() { - @Override - public ClusterSingletonManager create() { - return new ClusterSingletonManager("statsService", PoisonPill.getInstance(), - new ClusterSingletonPropsFactory() { - @Override - public Props create(Object handOverData) { - return new Props(StatsService.class); - } - }); - } - }), "singleton"); - system.actorOf(new Props(new UntypedActorFactory() { @Override public UntypedActor create() { diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java index aeed8b133f..be4e8789a6 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java @@ -1,5 +1,7 @@ package sample.cluster.stats.japi; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; @@ -7,38 +9,25 @@ import akka.actor.UntypedActorFactory; import akka.contrib.pattern.ClusterSingletonManager; import akka.contrib.pattern.ClusterSingletonPropsFactory; -import com.typesafe.config.ConfigFactory; - public class StatsSampleOneMasterMain { public static void main(String[] args) throws Exception { - // Override the configuration of the port - // when specified as program argument - if (args.length > 0) - System.setProperty("akka.remote.netty.tcp.port", args[0]); + // Override the configuration of the port when specified as program argument + final Config config = + (args.length > 0 ? + ConfigFactory.parseString(String.format("akka.remote.netty.tcp.port=%s", args[0])) : + ConfigFactory.empty()). + withFallback(ConfigFactory.parseString("akka.cluster.roles = [compute]")). + withFallback(ConfigFactory.load()); - //#start-router-deploy - ActorSystem system = ActorSystem.create("ClusterSystem", - ConfigFactory.parseString( - "akka.actor.deployment { \n" + - " /singleton/statsService/workerRouter { \n" + - " router = consistent-hashing \n" + - " nr-of-instances = 100 \n" + - " cluster { \n" + - " enabled = on \n" + - " max-nr-of-instances-per-node = 3 \n" + - " allow-local-routees = off \n" + - " } \n" + - " } \n" + - "} \n") - .withFallback(ConfigFactory.load())); - //#start-router-deploy + ActorSystem system = ActorSystem.create("ClusterSystem", config); //#create-singleton-manager system.actorOf(new Props(new UntypedActorFactory() { @Override public ClusterSingletonManager create() { return new ClusterSingletonManager("statsService", PoisonPill.getInstance(), + "compute", new ClusterSingletonPropsFactory() { @Override public Props create(Object handOverData) { diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java index 6a3c2ca44d..81342c7d13 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java @@ -60,10 +60,11 @@ abstract class StatsService2 extends UntypedActor { int totalInstances = 100; String routeesPath = "/user/statsWorker"; boolean allowLocalRoutees = true; + String useRole = "compute"; ActorRef workerRouter = getContext().actorOf( new Props(StatsWorker.class).withRouter(new ClusterRouterConfig( new ConsistentHashingRouter(0), new ClusterRouterSettings( - totalInstances, routeesPath, allowLocalRoutees))), + totalInstances, routeesPath, allowLocalRoutees, useRole))), "workerRouter2"); //#router-lookup-in-code } @@ -74,10 +75,11 @@ abstract class StatsService3 extends UntypedActor { int totalInstances = 100; int maxInstancesPerNode = 3; boolean allowLocalRoutees = false; + String useRole = "compute"; ActorRef workerRouter = getContext().actorOf( new Props(StatsWorker.class).withRouter(new ClusterRouterConfig( new ConsistentHashingRouter(0), new ClusterRouterSettings( - totalInstances, maxInstancesPerNode, allowLocalRoutees))), + totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole))), "workerRouter3"); //#router-deploy-in-code } diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackend.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackend.java index 9214874cbf..786ae874b7 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackend.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackend.java @@ -54,10 +54,9 @@ public class TransformationBackend extends UntypedActor { } } - //try to register to all nodes, even though there - // might not be any frontend on all nodes void register(Member member) { - getContext().actorFor(member.address() + "/user/frontend").tell( + if (member.hasRole("frontend")) + getContext().actorFor(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); } } diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf index b67eeac829..4c94ac6284 100644 --- a/akka-samples/akka-sample-cluster/src/main/resources/application.conf +++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf @@ -21,6 +21,36 @@ akka { } # //#cluster +# //#config-router-lookup +akka.actor.deployment { + /statsService/workerRouter { + router = consistent-hashing + nr-of-instances = 100 + cluster { + enabled = on + routees-path = "/user/statsWorker" + allow-local-routees = on + use-role = compute + } + } +} +# //#config-router-lookup + +# //#config-router-deploy +akka.actor.deployment { + /singleton/statsService/workerRouter { + router = consistent-hashing + nr-of-instances = 100 + cluster { + enabled = on + max-nr-of-instances-per-node = 3 + allow-local-routees = off + use-role = compute + } + } +} +# //#config-router-deploy + # //#adaptive-router akka.actor.deployment { /factorialFrontend/factorialBackendRouter = { @@ -33,6 +63,7 @@ akka.actor.deployment { cluster { enabled = on routees-path = "/user/factorialBackend" + use-role = backend allow-local-routees = off } } diff --git a/akka-samples/akka-sample-cluster/src/main/resources/factorial.conf b/akka-samples/akka-sample-cluster/src/main/resources/factorial.conf index 17e82db15d..e0c79671b6 100644 --- a/akka-samples/akka-sample-cluster/src/main/resources/factorial.conf +++ b/akka-samples/akka-sample-cluster/src/main/resources/factorial.conf @@ -2,4 +2,11 @@ include "application" # //#min-nr-of-members akka.cluster.min-nr-of-members = 3 -# //#min-nr-of-members \ No newline at end of file +# //#min-nr-of-members + +# //#role-min-nr-of-members +akka.cluster.role { + frontend.min-nr-of-members = 1 + backend.min-nr-of-members = 2 +} +# //#role-min-nr-of-members \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala index 15e5bfa21f..cc6c58fcfe 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala @@ -22,8 +22,11 @@ object FactorialFrontend { def main(args: Array[String]): Unit = { val upToN = if (args.isEmpty) 200 else args(0).toInt - val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial")) - system.log.info("Factorials will start when 3 members in the cluster.") + val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]"). + withFallback(ConfigFactory.load("factorial")) + + val system = ActorSystem("ClusterSystem", config) + system.log.info("Factorials will start when 2 backend members in the cluster.") //#registerOnUp Cluster(system) registerOnMemberUp { system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)), @@ -58,11 +61,14 @@ class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLog object FactorialBackend { def main(args: Array[String]): Unit = { - // Override the configuration of the port - // when specified as program argument - if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0)) + // Override the configuration of the port when specified as program argument + val config = + (if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}") + else ConfigFactory.empty).withFallback( + ConfigFactory.parseString("akka.cluster.roles = [backend]")). + withFallback(ConfigFactory.load("factorial")) - val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial")) + val system = ActorSystem("ClusterSystem", config) system.actorOf(Props[FactorialBackend], name = "factorialBackend") system.actorOf(Props[MetricsListener], name = "metricsListener") @@ -143,8 +149,8 @@ abstract class FactorialFrontend2 extends Actor { val backend = context.actorOf(Props[FactorialBackend].withRouter( ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector), ClusterRouterSettings( - totalInstances = 100, routeesPath = "/user/statsWorker", - allowLocalRoutees = true))), + totalInstances = 100, routeesPath = "/user/factorialBackend", + allowLocalRoutees = true, useRole = Some("backend")))), name = "factorialBackendRouter2") //#router-lookup-in-code } @@ -161,7 +167,7 @@ abstract class FactorialFrontend3 extends Actor { ClusterRouterConfig(AdaptiveLoadBalancingRouter( SystemLoadAverageMetricsSelector), ClusterRouterSettings( totalInstances = 100, maxInstancesPerNode = 3, - allowLocalRoutees = false))), + allowLocalRoutees = false, useRole = Some("backend")))), name = "factorialBackendRouter3") //#router-deploy-in-code } \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 6f157a77d6..41fc1723df 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -95,9 +95,9 @@ class StatsFacade extends Actor with ActorLogging { var currentMaster: Option[Address] = None - // subscribe to cluster changes, LeaderChanged + // subscribe to cluster changes, RoleLeaderChanged // re-subscribe when restart - override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged]) + override def preStart(): Unit = cluster.subscribe(self, classOf[RoleLeaderChanged]) override def postStop(): Unit = cluster.unsubscribe(self) def receive = { @@ -112,8 +112,11 @@ class StatsFacade extends Actor with ActorLogging { case _ ⇒ JobFailed("Service unavailable, try again later") } pipeTo sender } - case state: CurrentClusterState ⇒ currentMaster = state.leader - case LeaderChanged(leader) ⇒ currentMaster = leader + case state: CurrentClusterState ⇒ + currentMaster = state.roleLeader("compute") + case RoleLeaderChanged(role, leader) ⇒ + if (role == "compute") + currentMaster = leader } } @@ -121,58 +124,36 @@ class StatsFacade extends Actor with ActorLogging { object StatsSample { def main(args: Array[String]): Unit = { - // Override the configuration of the port - // when specified as program argument - if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0)) + // Override the configuration of the port when specified as program argument + val config = + (if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}") + else ConfigFactory.empty).withFallback( + ConfigFactory.parseString("akka.cluster.roles = [compute]")). + withFallback(ConfigFactory.load()) - //#start-router-lookup - val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" - akka.actor.deployment { - /statsService/workerRouter { - router = consistent-hashing - nr-of-instances = 100 - cluster { - enabled = on - routees-path = "/user/statsWorker" - allow-local-routees = on - } - } - } - """).withFallback(ConfigFactory.load())) + val system = ActorSystem("ClusterSystem", config) system.actorOf(Props[StatsWorker], name = "statsWorker") system.actorOf(Props[StatsService], name = "statsService") - //#start-router-lookup - } } object StatsSampleOneMaster { def main(args: Array[String]): Unit = { - // Override the configuration of the port - // when specified as program argument - if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0)) + // Override the configuration of the port when specified as program argument + val config = + (if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}") + else ConfigFactory.empty).withFallback( + ConfigFactory.parseString("akka.cluster.roles = [compute]")). + withFallback(ConfigFactory.load()) - //#start-router-deploy - val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" - akka.actor.deployment { - /singleton/statsService/workerRouter { - router = consistent-hashing - nr-of-instances = 100 - cluster { - enabled = on - max-nr-of-instances-per-node = 3 - allow-local-routees = off - } - } - } - """).withFallback(ConfigFactory.load())) - //#start-router-deploy + val system = ActorSystem("ClusterSystem", config) //#create-singleton-manager system.actorOf(Props(new ClusterSingletonManager( singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService", - terminationMessage = PoisonPill)), name = "singleton") + terminationMessage = PoisonPill, role = Some("compute"))), + name = "singleton") //#create-singleton-manager system.actorOf(Props[StatsFacade], name = "statsFacade") } @@ -180,6 +161,7 @@ object StatsSampleOneMaster { object StatsSampleClient { def main(args: Array[String]): Unit = { + // note that client is not a compute node, role not defined val system = ActorSystem("ClusterSystem") system.actorOf(Props(new StatsSampleClient("/user/statsService")), "client") } @@ -187,13 +169,8 @@ object StatsSampleClient { object StatsSampleOneMasterClient { def main(args: Array[String]): Unit = { + // note that client is not a compute node, role not defined val system = ActorSystem("ClusterSystem") - - // the client is also part of the cluster - system.actorOf(Props(new ClusterSingletonManager( - singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService", - terminationMessage = PoisonPill)), name = "singleton") - system.actorOf(Props(new StatsSampleClient("/user/statsFacade")), "client") } } @@ -230,10 +207,12 @@ class StatsSampleClient(servicePath: String) extends Actor { case failed: JobFailed ⇒ println(failed) case state: CurrentClusterState ⇒ - nodes = state.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } - case MemberUp(m) ⇒ nodes += m.address - case other: MemberEvent ⇒ nodes -= other.member.address - case UnreachableMember(m) ⇒ nodes -= m.address + nodes = state.members.collect { + case m if m.hasRole("compute") && m.status == MemberStatus.Up ⇒ m.address + } + case MemberUp(m) if m.hasRole("compute") ⇒ nodes += m.address + case other: MemberEvent ⇒ nodes -= other.member.address + case UnreachableMember(m) ⇒ nodes -= m.address } } @@ -248,7 +227,7 @@ abstract class StatsService2 extends Actor { val workerRouter = context.actorOf(Props[StatsWorker].withRouter( ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings( totalInstances = 100, routeesPath = "/user/statsWorker", - allowLocalRoutees = true))), + allowLocalRoutees = true, useRole = Some("compute")))), name = "workerRouter2") //#router-lookup-in-code } @@ -263,7 +242,7 @@ abstract class StatsService3 extends Actor { val workerRouter = context.actorOf(Props[StatsWorker].withRouter( ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings( totalInstances = 100, maxInstancesPerNode = 3, - allowLocalRoutees = false))), + allowLocalRoutees = false, useRole = None))), name = "workerRouter3") //#router-deploy-in-code } diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala index e0947e04e7..01d309b2cd 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala @@ -3,7 +3,6 @@ package sample.cluster.transformation //#imports import language.postfixOps import scala.concurrent.duration._ - import akka.actor.Actor import akka.actor.ActorRef import akka.actor.ActorSystem @@ -17,6 +16,7 @@ import akka.cluster.Member import akka.cluster.MemberStatus import akka.pattern.ask import akka.util.Timeout +import com.typesafe.config.ConfigFactory //#imports //#messages @@ -28,11 +28,14 @@ case object BackendRegistration object TransformationFrontend { def main(args: Array[String]): Unit = { - // Override the configuration of the port - // when specified as program argument - if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0)) + // Override the configuration of the port when specified as program argument + val config = + (if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}") + else ConfigFactory.empty).withFallback( + ConfigFactory.parseString("akka.cluster.roles = [frontend]")). + withFallback(ConfigFactory.load()) - val system = ActorSystem("ClusterSystem") + val system = ActorSystem("ClusterSystem", config) val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend") import system.dispatcher @@ -41,7 +44,7 @@ object TransformationFrontend { (frontend ? TransformationJob("hello-" + n)) onSuccess { case result ⇒ println(result) } - // wait a while until next request, + // wait a while until next request, // to avoid flooding the console with output Thread.sleep(2000) } @@ -75,11 +78,14 @@ class TransformationFrontend extends Actor { object TransformationBackend { def main(args: Array[String]): Unit = { - // Override the configuration of the port - // when specified as program argument - if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0)) + // Override the configuration of the port when specified as program argument + val config = + (if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}") + else ConfigFactory.empty).withFallback( + ConfigFactory.parseString("akka.cluster.roles = [backend]")). + withFallback(ConfigFactory.load()) - val system = ActorSystem("ClusterSystem") + val system = ActorSystem("ClusterSystem", config) system.actorOf(Props[TransformationBackend], name = "backend") } } @@ -101,10 +107,9 @@ class TransformationBackend extends Actor { case MemberUp(m) ⇒ register(m) } - // try to register to all nodes, even though there - // might not be any frontend on all nodes def register(member: Member): Unit = - context.actorFor(RootActorPath(member.address) / "user" / "frontend") ! - BackendRegistration + if (member.hasRole("frontend")) + context.actorFor(RootActorPath(member.address) / "user" / "frontend") ! + BackendRegistration } //#backend \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index aaa87c9d43..4dda541937 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -31,6 +31,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig { akka.loglevel = INFO akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off + akka.cluster.roles = [compute] akka.cluster.auto-join = off # don't use sigar for tests, native lib not in path akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector @@ -43,6 +44,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig { enabled = on max-nr-of-instances-per-node = 3 allow-local-routees = off + use-role = compute } } } @@ -75,15 +77,15 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing Cluster(system) join node(first).address expectMsgAllOf( - MemberUp(Member(node(first).address, MemberStatus.Up)), - MemberUp(Member(node(second).address, MemberStatus.Up)), - MemberUp(Member(node(third).address, MemberStatus.Up))) + MemberUp(Member(node(first).address, MemberStatus.Up, Set.empty)), + MemberUp(Member(node(second).address, MemberStatus.Up, Set.empty)), + MemberUp(Member(node(third).address, MemberStatus.Up, Set.empty))) Cluster(system).unsubscribe(testActor) system.actorOf(Props(new ClusterSingletonManager( singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService", - terminationMessage = PoisonPill)), name = "singleton") + terminationMessage = PoisonPill, role = Some("compute"))), name = "singleton") system.actorOf(Props[StatsFacade], "statsFacade") diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala index 7d9fbda51b..57aaf1f3b3 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala @@ -26,6 +26,7 @@ object StatsSampleSpecConfig extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off + akka.cluster.roles = [compute] akka.cluster.auto-join = off # don't use sigar for tests, native lib not in path akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector @@ -38,6 +39,7 @@ object StatsSampleSpecConfig extends MultiNodeConfig { enabled = on routees-path = "/user/statsWorker" allow-local-routees = on + use-role = compute } } } @@ -96,9 +98,9 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig) system.actorOf(Props[StatsService], "statsService") expectMsgAllOf( - MemberUp(Member(firstAddress, MemberStatus.Up)), - MemberUp(Member(secondAddress, MemberStatus.Up)), - MemberUp(Member(thirdAddress, MemberStatus.Up))) + MemberUp(Member(firstAddress, MemberStatus.Up, Set.empty)), + MemberUp(Member(secondAddress, MemberStatus.Up, Set.empty)), + MemberUp(Member(thirdAddress, MemberStatus.Up, Set.empty))) Cluster(system).unsubscribe(testActor) diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala index 4583dac90e..8cd068123b 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala @@ -30,6 +30,7 @@ object StatsSampleJapiSpecConfig extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off + akka.cluster.roles = [compute] akka.cluster.auto-join = off # don't use sigar for tests, native lib not in path akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector @@ -41,6 +42,7 @@ object StatsSampleJapiSpecConfig extends MultiNodeConfig { enabled = on routees-path = "/user/statsWorker" allow-local-routees = on + use-role = compute } } } @@ -81,9 +83,9 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf system.actorOf(Props[StatsService], "statsService") expectMsgAllOf( - MemberUp(Member(firstAddress, MemberStatus.Up)), - MemberUp(Member(secondAddress, MemberStatus.Up)), - MemberUp(Member(thirdAddress, MemberStatus.Up))) + MemberUp(Member(firstAddress, MemberStatus.Up, Set.empty)), + MemberUp(Member(secondAddress, MemberStatus.Up, Set.empty)), + MemberUp(Member(thirdAddress, MemberStatus.Up, Set.empty))) Cluster(system).unsubscribe(testActor) diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala index 3393db1a5f..ce6304f3ba 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala @@ -33,6 +33,7 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig { akka.loglevel = INFO akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off + akka.cluster.roles = [compute] akka.cluster.auto-join = off # don't use sigar for tests, native lib not in path akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector @@ -44,6 +45,7 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig { enabled = on max-nr-of-instances-per-node = 3 allow-local-routees = off + use-role = compute } } } @@ -75,15 +77,16 @@ abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSample Cluster(system) join node(first).address expectMsgAllOf( - MemberUp(Member(node(first).address, MemberStatus.Up)), - MemberUp(Member(node(second).address, MemberStatus.Up)), - MemberUp(Member(node(third).address, MemberStatus.Up))) + MemberUp(Member(node(first).address, MemberStatus.Up, Set.empty)), + MemberUp(Member(node(second).address, MemberStatus.Up, Set.empty)), + MemberUp(Member(node(third).address, MemberStatus.Up, Set.empty))) Cluster(system).unsubscribe(testActor) system.actorOf(Props(new ClusterSingletonManager( singletonName = "statsService", terminationMessage = PoisonPill, + role = null, singletonPropsFactory = new ClusterSingletonPropsFactory { def create(handOverData: Any) = Props[StatsService] })), name = "singleton") diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala index 0f18b0ce5b..ef7730d08e 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala @@ -33,6 +33,11 @@ object TransformationSampleSpecConfig extends MultiNodeConfig { akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector """)) + nodeConfig(frontend1, frontend2)( + ConfigFactory.parseString("akka.cluster.roles =[frontend]")) + + nodeConfig(backend1, backend2, backend3)( + ConfigFactory.parseString("akka.cluster.roles =[backend]")) } // need one concrete test class per node diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala index 2fbba499ea..c781ffe809 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala @@ -34,6 +34,12 @@ object TransformationSampleJapiSpecConfig extends MultiNodeConfig { akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector """)) + nodeConfig(frontend1, frontend2)( + ConfigFactory.parseString("akka.cluster.roles =[frontend]")) + + nodeConfig(backend1, backend2, backend3)( + ConfigFactory.parseString("akka.cluster.roles =[backend]")) + } // need one concrete test class per node