Cluster node roles, see #3049

* Config of node roles cluster.role
* Cluster router configurable with use-role
* RoleLeaderChanged event
* Cluster singleton per role
* Cluster only starts once all required per-role node
  counts are reached,
  role.<role-name>.min-nr-of-members config
*  Update documentation and make use of the roles in the examples
This commit is contained in:
Patrik Nordwall 2013-03-14 20:32:43 +01:00
parent 6e8125a46e
commit 7eac88f372
49 changed files with 870 additions and 481 deletions

View file

@ -28,6 +28,23 @@ akka {
# formed in case of network partition.
auto-down = off
# The roles of this member. List of strings, e.g. roles = ["A", "B"].
# The roles are part of the membership information and can be used by
# routers or other services to distribute work to certain member types,
# e.g. front-end and back-end nodes.
roles = []
role {
# Minimum required number of members of a certain role before the leader changes
# member status of 'Joining' members to 'Up'. Typically used together with
# 'Cluster.registerOnMemberUp' to defer some action, such as starting actors,
# until the cluster has reached a certain size.
# E.g. to require 2 nodes with role 'frontend' and 3 nodes with role 'backend':
# frontend.min-nr-of-members = 2
# backend.min-nr-of-members = 3
#<role-name>.min-nr-of-members = 1
}
# Minimum required number of members before the leader changes member status
# of 'Joining' members to 'Up'. Typically used together with
# 'Cluster.registerOnMemberUp' to defer some action, such as starting actors,
@ -201,6 +218,9 @@ akka {
# when routees-path is defined.
routees-path = ""
# Use members with specified role, or all members if undefined.
use-role = ""
}
}

View file

@ -73,6 +73,17 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
format(system, other.getClass.getName))
}
/**
* roles that this member has
*/
def selfRoles: Set[String] = settings.Roles
/**
* Java API: roles that this member has
*/
def getSelfRoles: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(selfRoles).asJava
private val _isTerminated = new AtomicBoolean(false)
private val log = Logging(system, "Cluster")

View file

@ -116,11 +116,13 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami
if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig])
throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig))
import ClusterRouterSettings.useRoleOption
val clusterRouterSettings = ClusterRouterSettings(
totalInstances = deploy.config.getInt("nr-of-instances"),
maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node"),
allowLocalRoutees = deploy.config.getBoolean("cluster.allow-local-routees"),
routeesPath = deploy.config.getString("cluster.routees-path"))
routeesPath = deploy.config.getString("cluster.routees-path"),
useRole = useRoleOption(deploy.config.getString("cluster.use-role")))
Some(deploy.copy(
routerConfig = ClusterRouterConfig(deploy.routerConfig, clusterRouterSettings), scope = ClusterScope))

View file

@ -37,7 +37,7 @@ object ClusterUserAction {
* Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver).
*/
case class Join(address: Address) extends ClusterMessage
case class Join(address: Address, roles: Set[String]) extends ClusterMessage
/**
* Command to leave the cluster.
@ -288,20 +288,20 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
def initialized: Actor.Receive = {
case msg: GossipEnvelope receiveGossip(msg)
case GossipTick gossip()
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats()
case InitJoin initJoin()
case JoinTo(address) join(address)
case ClusterUserAction.Join(address) joining(address)
case ClusterUserAction.Down(address) downing(address)
case ClusterUserAction.Leave(address) leaving(address)
case Exit(address) exiting(address)
case Remove(address) removing(address)
case SendGossipTo(address) gossipTo(address)
case msg: SubscriptionMessage publisher forward msg
case msg: GossipEnvelope receiveGossip(msg)
case GossipTick gossip()
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats()
case InitJoin initJoin()
case JoinTo(address) join(address)
case ClusterUserAction.Join(address, roles) joining(address, roles)
case ClusterUserAction.Down(address) downing(address)
case ClusterUserAction.Leave(address) leaving(address)
case Exit(address) exiting(address)
case Remove(address) removing(address)
case SendGossipTo(address) gossipTo(address)
case msg: SubscriptionMessage publisher forward msg
}
@ -366,16 +366,16 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
context.become(initialized)
if (address == selfAddress)
joining(address)
joining(address, cluster.selfRoles)
else
clusterCore(address) ! ClusterUserAction.Join(selfAddress)
clusterCore(address) ! ClusterUserAction.Join(selfAddress, cluster.selfRoles)
}
}
/**
* State transition to JOINING - new node joining.
*/
def joining(node: Address): Unit = {
def joining(node: Address, roles: Set[String]): Unit = {
if (node.protocol != selfAddress.protocol)
log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol, node.protocol)
@ -396,7 +396,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
val newMembers = localMembers + Member(node, Joining, roles) + Member(selfAddress, Joining, cluster.selfRoles)
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
@ -404,7 +404,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
latestGossip = seenVersionedGossip
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node, roles.mkString(", "))
if (node != selfAddress) {
gossipTo(node)
}
@ -419,7 +419,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
*/
def leaving(address: Address): Unit = {
if (latestGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
val newMembers = latestGossip.members map { member if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
val newMembers = latestGossip.members map { m if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
@ -636,8 +636,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes
val numberOfMembers = localMembers.size
def isJoiningToUp(m: Member): Boolean = m.status == Joining && numberOfMembers >= MinNrOfMembers
def enoughMembers: Boolean = {
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
case (role, threshold) localMembers.count(_.hasRole(role)) >= threshold
}
}
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
// transform the node member ring
val newMembers = localMembers collect {

View file

@ -35,7 +35,8 @@ object ClusterEvent {
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
unreachable: Set[Member] = Set.empty,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None) extends ClusterDomainEvent {
leader: Option[Address] = None,
roleLeaderMap: Map[String, Option[Address]] = Map.empty) extends ClusterDomainEvent {
/**
* Java API: get current member list.
@ -61,6 +62,28 @@ object ClusterEvent {
* Java API: get address of current leader, or null if none
*/
def getLeader: Address = leader orNull
/**
* All node roles in the cluster
*/
def allRoles: Set[String] = roleLeaderMap.keySet
/**
* Java API: All node roles in the cluster
*/
def getAllRoles: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava
/**
* get address of current leader, if any, within the role set
*/
def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)
/**
* Java API: get address of current leader within the role set,
* or null if no node with that role
*/
def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
}
/**
@ -107,6 +130,18 @@ object ClusterEvent {
def getLeader: Address = leader orNull
}
/**
* First member (leader) of the members within a role set changed.
* Published when the state change is first seen on a node.
*/
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
/**
* Java API
* @return address of current leader, or null if none
*/
def getLeader: Address = leader orNull
}
/**
* A member is considered as unreachable by the failure detector.
*/
@ -184,9 +219,22 @@ object ClusterEvent {
/**
* INTERNAL API
*/
private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] =
if (newGossip.leader != oldGossip.leader) List(LeaderChanged(newGossip.leader))
private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = {
val newLeader = newGossip.leader
if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader))
else Nil
}
/**
* INTERNAL API
*/
private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip): Set[RoleLeaderChanged] = {
for {
role (oldGossip.allRoles ++ newGossip.allRoles)
newLeader = newGossip.roleLeader(role)
if newLeader != oldGossip.roleLeader(role)
} yield RoleLeaderChanged(role, newLeader)
}
/**
* INTERNAL API
@ -242,7 +290,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
members = latestGossip.members,
unreachable = latestGossip.overview.unreachable,
seenBy = latestGossip.seenBy,
leader = latestGossip.leader)
leader = latestGossip.leader,
roleLeaderMap = latestGossip.allRoles.map(r r -> latestGossip.roleLeader(r))(collection.breakOut))
receiver match {
case Some(ref) ref ! state
case None publish(state)
@ -275,6 +324,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
}
}
diffLeader(oldGossip, newGossip) foreach publish
diffRolesLeader(oldGossip, newGossip) foreach publish
// publish internal SeenState for testing purposes
diffSeen(oldGossip, newGossip) foreach publish
}

View file

@ -57,7 +57,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
// replace current member with new member (might have different status, only address is used in equals)
state = state.copy(members = state.members - event.member + event.member,
unreachable = state.unreachable - event.member)
case LeaderChanged(leader) state = state.copy(leader = leader)
case LeaderChanged(leader)
state = state.copy(leader = leader)
case RoleLeaderChanged(role, leader)
state = state.copy(roleLeaderMap = state.roleLeaderMap + (role -> leader))
case s: CurrentClusterState state = s
case CurrentInternalStats(stats) _latestStats = stats
case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
@ -68,7 +71,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def self: Member = {
state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)).
getOrElse(Member(selfAddress, MemberStatus.Removed))
getOrElse(Member(selfAddress, MemberStatus.Removed, cluster.selfRoles))
}
/**

View file

@ -5,6 +5,7 @@ package akka.cluster
import scala.collection.immutable
import com.typesafe.config.Config
import com.typesafe.config.ConfigObject
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.ConfigurationException
@ -50,9 +51,16 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val PublishStatsInterval: FiniteDuration = Duration(cc.getMilliseconds("publish-stats-interval"), MILLISECONDS)
final val AutoJoin: Boolean = cc.getBoolean("auto-join")
final val AutoDown: Boolean = cc.getBoolean("auto-down")
final val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet
final val MinNrOfMembers: Int = {
cc.getInt("min-nr-of-members")
} requiring (_ > 0, "min-nr-of-members must be > 0")
final val MinNrOfMembersOfRole: Map[String, Int] = {
import scala.collection.JavaConverters._
cc.getConfig("role").root.asScala.collect {
case (key, value: ConfigObject) (key -> value.toConfig.getInt("min-nr-of-members"))
}.toMap
}
final val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
final val UseDispatcher: String = cc.getString("use-dispatcher") match {
case "" Dispatchers.DefaultDispatcherId

View file

@ -185,12 +185,18 @@ private[cluster] case class Gossip(
def isLeader(address: Address): Boolean = leader == Some(address)
def leader: Option[Address] = {
if (members.isEmpty) None
else members.find(m m.status != Joining && m.status != Exiting && m.status != Down).
orElse(Some(members.min(Member.leaderStatusOrdering))).map(_.address)
def leader: Option[Address] = leaderOf(members)
def roleLeader(role: String): Option[Address] = leaderOf(members.filter(_.hasRole(role)))
private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[Address] = {
if (mbrs.isEmpty) None
else mbrs.find(m m.status != Joining && m.status != Exiting && m.status != Down).
orElse(Some(mbrs.min(Member.leaderStatusOrdering))).map(_.address)
}
def allRoles: Set[String] = members.flatMap(_.roles)
def isSingletonCluster: Boolean = members.size == 1
/**
@ -201,7 +207,7 @@ private[cluster] case class Gossip(
def member(address: Address): Member = {
members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)).
getOrElse(Member(address, Removed))
getOrElse(Member(address, Removed, Set.empty))
}
override def toString =

View file

@ -12,15 +12,26 @@ import akka.actor.Address
import MemberStatus._
/**
* Represents the address and the current status of a cluster member node.
* Represents the address, current status, and roles of a cluster member node.
*
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`
* and roles.
*/
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
case class Member(val address: Address, val status: MemberStatus, roles: Set[String]) extends ClusterMessage {
override def hashCode = address.##
override def equals(other: Any) = Member.unapply(this) == Member.unapply(other)
override def equals(other: Any) = other match {
case m: Member address == m.address
case _ false
}
override def toString = "Member(address = %s, status = %s)" format (address, status)
def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status)
def hasRole(role: String): Boolean = roles.contains(role)
/**
* Java API
*/
def getRoles: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(roles).asJava
}
/**
@ -65,13 +76,6 @@ object Member {
def compare(a: Member, b: Member): Int = addressOrdering.compare(a.address, b.address)
}
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
def unapply(other: Any) = other match {
case m: Member Some(m.address)
case _ None
}
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)

View file

@ -74,26 +74,31 @@ object ClusterRouterSettings {
/**
* Settings for create and deploy of the routees
*/
def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees)
def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees, useRole)
/**
* Settings for remote deployment of the routees, allowed to use routees on own node
*/
def apply(totalInstances: Int, maxInstancesPerNode: Int): ClusterRouterSettings =
apply(totalInstances, maxInstancesPerNode, allowLocalRoutees = true)
def apply(totalInstances: Int, maxInstancesPerNode: Int, useRole: Option[String]): ClusterRouterSettings =
apply(totalInstances, maxInstancesPerNode, allowLocalRoutees = true, useRole)
/**
* Settings for lookup of the routees
*/
def apply(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, routeesPath, allowLocalRoutees)
def apply(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees, useRole)
/**
* Settings for lookup of the routees, allowed to use routees on own node
*/
def apply(totalInstances: Int, routeesPath: String): ClusterRouterSettings =
apply(totalInstances, routeesPath, allowLocalRoutees = true)
def apply(totalInstances: Int, routeesPath: String, useRole: Option[String]): ClusterRouterSettings =
apply(totalInstances, routeesPath, allowLocalRoutees = true, useRole)
def useRoleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
}
/**
@ -106,19 +111,22 @@ case class ClusterRouterSettings private[akka] (
totalInstances: Int,
maxInstancesPerNode: Int,
routeesPath: String,
allowLocalRoutees: Boolean) {
allowLocalRoutees: Boolean,
useRole: Option[String]) {
/**
* Java API: Settings for create and deploy of the routees
*/
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean) =
this(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees)
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees,
ClusterRouterSettings.useRoleOption(useRole))
/**
* Java API: Settings for lookup of the routees
*/
def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean) =
this(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees)
def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees,
ClusterRouterSettings.useRoleOption(useRole))
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be > 0")
@ -220,7 +228,7 @@ private[akka] class ClusterRouteeProvider(
private[routing] def availableNodes: immutable.SortedSet[Address] = {
import Member.addressOrdering
val currentNodes = nodes
if (currentNodes.isEmpty && settings.allowLocalRoutees)
if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles))
//use my own node, cluster information not updated yet
immutable.SortedSet(cluster.selfAddress)
else
@ -236,7 +244,14 @@ private[akka] class ClusterRouteeProvider(
}
private[routing] def isAvailable(m: Member): Boolean =
m.status == MemberStatus.Up && (settings.allowLocalRoutees || m.address != cluster.selfAddress)
m.status == MemberStatus.Up &&
satisfiesRole(m.roles) &&
(settings.allowLocalRoutees || m.address != cluster.selfAddress)
private def satisfiesRole(memberRoles: Set[String]): Boolean = settings.useRole match {
case None true
case Some(r) memberRoles.contains(r)
}
}

View file

@ -7,6 +7,7 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import scala.collection.immutable.SortedSet
import scala.concurrent.duration._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
@ -25,64 +26,110 @@ object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig {
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
object MinMembersOfRoleBeforeUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
"akka.cluster.role.backend.min-nr-of-members = 2")).
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
nodeConfig(first)(
ConfigFactory.parseString("akka.cluster.roles =[frontend]"))
nodeConfig(second, third)(
ConfigFactory.parseString("akka.cluster.roles =[backend]"))
}
class MinMembersBeforeUpMultiJvmNode1 extends MinMembersBeforeUpSpec
class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec
class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec
abstract class MinMembersBeforeUpSpec
extends MultiNodeSpec(MinMembersBeforeUpMultiJvmSpec)
with MultiNodeClusterSpec {
class MinMembersOfRoleBeforeUpMultiJvmNode1 extends MinMembersOfRoleBeforeUpSpec
class MinMembersOfRoleBeforeUpMultiJvmNode2 extends MinMembersOfRoleBeforeUpSpec
class MinMembersOfRoleBeforeUpMultiJvmNode3 extends MinMembersOfRoleBeforeUpSpec
import MinMembersBeforeUpMultiJvmSpec._
import ClusterEvent._
abstract class MinMembersBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersBeforeUpMultiJvmSpec) {
override def first: RoleName = MinMembersBeforeUpMultiJvmSpec.first
override def second: RoleName = MinMembersBeforeUpMultiJvmSpec.second
override def third: RoleName = MinMembersBeforeUpMultiJvmSpec.third
"Cluster leader" must {
"wait with moving members to UP until minimum number of members have joined" taggedAs LongRunningTest in {
val onUpLatch = TestLatch(1)
cluster.registerOnMemberUp(onUpLatch.countDown())
runOn(first) {
cluster join myself
awaitCond {
val result = clusterView.status == Joining
clusterView.refreshCurrentState()
result
}
}
enterBarrier("first-started")
onUpLatch.isOpen must be(false)
runOn(second) {
cluster.join(first)
}
runOn(first, second) {
val expectedAddresses = Set(first, second) map address
awaitCond {
val result = clusterView.members.map(_.address) == expectedAddresses
clusterView.refreshCurrentState()
result
}
clusterView.members.map(_.status) must be(Set(Joining))
// and it should not change
1 to 5 foreach { _
Thread.sleep(1000)
clusterView.members.map(_.address) must be(expectedAddresses)
clusterView.members.map(_.status) must be(Set(Joining))
}
}
enterBarrier("second-joined")
runOn(third) {
cluster.join(first)
}
awaitClusterUp(first, second, third)
onUpLatch.await
enterBarrier("after-1")
testWaitMovingMembersToUp()
}
}
}
abstract class MinMembersOfRoleBeforeUpSpec extends MinMembersBeforeUpBase(MinMembersOfRoleBeforeUpMultiJvmSpec) {
override def first: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.first
override def second: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.second
override def third: RoleName = MinMembersOfRoleBeforeUpMultiJvmSpec.third
"Cluster leader" must {
"wait with moving members to UP until minimum number of members with specific role have joined" taggedAs LongRunningTest in {
testWaitMovingMembersToUp()
}
}
}
abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig)
extends MultiNodeSpec(multiNodeConfig)
with MultiNodeClusterSpec {
import ClusterEvent._
def first: RoleName
def second: RoleName
def third: RoleName
def testWaitMovingMembersToUp(): Unit = {
val onUpLatch = TestLatch(1)
cluster.registerOnMemberUp(onUpLatch.countDown())
runOn(first) {
cluster join myself
awaitCond {
val result = clusterView.status == Joining
clusterView.refreshCurrentState()
result
}
}
enterBarrier("first-started")
onUpLatch.isOpen must be(false)
runOn(second) {
cluster.join(first)
}
runOn(first, second) {
val expectedAddresses = Set(first, second) map address
awaitCond {
val result = clusterView.members.map(_.address) == expectedAddresses
clusterView.refreshCurrentState()
result
}
clusterView.members.map(_.status) must be(Set(Joining))
// and it should not change
1 to 5 foreach { _
Thread.sleep(1000)
clusterView.members.map(_.address) must be(expectedAddresses)
clusterView.members.map(_.status) must be(Set(Joining))
}
}
enterBarrier("second-joined")
runOn(third) {
cluster.join(first)
}
awaitClusterUp(first, second, third)
onUpLatch.await
enterBarrier("after-1")
}
}

View file

@ -230,7 +230,7 @@ abstract class TransitionSpec
runOn(third) {
markNodeAsUnavailable(second)
reapUnreachable()
awaitCond(clusterView.unreachableMembers.contains(Member(second, Up)))
awaitCond(clusterView.unreachableMembers.contains(Member(second, Up, Set.empty)))
awaitCond(seenLatestGossip == Set(third))
}
@ -239,7 +239,7 @@ abstract class TransitionSpec
third gossipTo first
runOn(first, third) {
awaitCond(clusterView.unreachableMembers.contains(Member(second, Up)))
awaitCond(clusterView.unreachableMembers.contains(Member(second, Up, Set.empty)))
}
runOn(first) {
@ -251,7 +251,7 @@ abstract class TransitionSpec
first gossipTo third
runOn(first, third) {
awaitCond(clusterView.unreachableMembers.contains(Member(second, Down)))
awaitCond(clusterView.unreachableMembers.contains(Member(second, Down, Set.empty)))
awaitMemberStatus(second, Down)
awaitCond(seenLatestGossip == Set(first, third))
}

View file

@ -115,7 +115,7 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
def startRouter(name: String): ActorRef = {
val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
local = AdaptiveLoadBalancingRouter(HeapMetricsSelector),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), name)
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), name)
awaitCond {
// it may take some time until router receives cluster member events
currentRoutees(router).size == roles.size

View file

@ -124,7 +124,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
"deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2))), "router2")
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2, useRole = None))), "router2")
awaitCond {
// it may take some time until router receives cluster member events
currentRoutees(router2).size == 6
@ -157,7 +157,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
val router4 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(
local = ConsistentHashingRouter(hashMapping = hashMapping),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), "router4")
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), "router4")
assertHashMapping(router4)
}

View file

@ -71,10 +71,21 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig {
routees-path = "/user/myservice"
}
}
/router5 {
router = round-robin
nr-of-instances = 10
cluster {
enabled = on
use-role = a
}
}
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]"""))
nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]"""))
}
class ClusterRoundRobinRoutedActorMultiJvmNode1 extends ClusterRoundRobinRoutedActorSpec
@ -89,9 +100,10 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
lazy val router1 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router1")
lazy val router2 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(RoundRobinRouter(),
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router2")
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1, useRole = None))), "router2")
lazy val router3 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router3")
lazy val router4 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router4")
lazy val router5 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router5")
def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
@ -240,6 +252,28 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
enterBarrier("after-6")
}
"deploy routees to specified node role" taggedAs LongRunningTest in {
runOn(first) {
awaitCond(currentRoutees(router5).size == 2)
val iterationCount = 10
for (i 0 until iterationCount) {
router5 ! "hit"
}
val replies = receiveReplies(DeployRoutee, iterationCount)
replies(first) must be > (0)
replies(second) must be > (0)
replies(third) must be(0)
replies(fourth) must be(0)
replies.values.sum must be(iterationCount)
}
enterBarrier("after-7")
}
"deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
@ -263,7 +297,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
replies.values.sum must be(iterationCount)
}
enterBarrier("after-7")
enterBarrier("after-8")
}
"deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in {
@ -292,7 +326,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
replies.values.sum must be(iterationCount)
}
enterBarrier("after-8")
enterBarrier("after-9")
}
}

View file

@ -39,6 +39,8 @@ class ClusterConfigSpec extends AkkaSpec {
AutoJoin must be(true)
AutoDown must be(false)
MinNrOfMembers must be(1)
MinNrOfMembersOfRole must be === Map.empty
Roles must be === Set.empty
JmxEnabled must be(true)
UseDispatcher must be(Dispatchers.DefaultDispatcherId)
GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001)

View file

@ -53,7 +53,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
service,
deployment.get.config,
ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(
totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false)),
totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)),
ClusterScope)))
}
@ -67,7 +67,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
service,
deployment.get.config,
ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(
totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false)),
totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false, useRole = None)),
ClusterScope)))
}

View file

@ -24,23 +24,24 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
with BeforeAndAfterEach with ImplicitSender {
var publisher: ActorRef = _
val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up)
val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty)
val aLeaving = aUp.copy(status = Leaving)
val aExiting = aUp.copy(status = Exiting)
val aRemoved = aUp.copy(status = Removed)
val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up)
val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty)
val bRemoved = bUp.copy(status = Removed)
val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining)
val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP"))
val cUp = cJoining.copy(status = Up)
val cRemoved = cUp.copy(status = Removed)
val dUp = Member(Address("akka.tcp", "sys", "a", 2551), Up)
val a51Up = Member(Address("akka.tcp", "sys", "a", 2551), Up, Set.empty)
val dUp = Member(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP"))
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address)
val g1 = Gossip(members = SortedSet(aUp, bUp, cJoining)).seen(aUp.address).seen(bUp.address).seen(cJoining.address)
val g2 = Gossip(members = SortedSet(aUp, bUp, cUp)).seen(aUp.address)
val g3 = g2.seen(bUp.address).seen(cUp.address)
val g4 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address)
val g5 = Gossip(members = SortedSet(dUp, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(dUp.address)
val g4 = Gossip(members = SortedSet(a51Up, aUp, bUp, cUp)).seen(aUp.address)
val g5 = Gossip(members = SortedSet(a51Up, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(a51Up.address)
val g6 = Gossip(members = SortedSet(aLeaving, bUp, cUp)).seen(aUp.address)
val g7 = Gossip(members = SortedSet(aExiting, bUp, cUp)).seen(aUp.address)
@ -69,10 +70,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"publish leader changed" in {
publisher ! PublishChanges(g4)
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(MemberUp(a51Up))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
memberSubscriber.expectNoMsg(1 second)
}
@ -96,15 +97,25 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"not publish leader changed when same leader" in {
publisher ! PublishChanges(g4)
memberSubscriber.expectMsg(MemberUp(dUp))
memberSubscriber.expectMsg(MemberUp(a51Up))
memberSubscriber.expectMsg(MemberUp(bUp))
memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address)))
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
publisher ! PublishChanges(g5)
memberSubscriber.expectNoMsg(1 second)
}
"publish role leader changed" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[RoleLeaderChanged])
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp)))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(dUp.address)))
publisher ! PublishChanges(Gossip(members = SortedSet(cUp, dUp)))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address)))
}
"send CurrentClusterState when subscribe" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
@ -132,6 +143,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! PublishChanges(g3)
subscriber.expectMsg(MemberUp(bUp))
subscriber.expectMsg(MemberUp(cUp))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address)))
subscriber.expectMsgType[SeenChanged]
publisher ! PublishStart

View file

@ -15,19 +15,25 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
import MemberStatus._
import ClusterEvent._
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up)
val a2 = Member(Address("akka.tcp", "sys", "a", 2552), Joining)
val a3 = Member(Address("akka.tcp", "sys", "a", 2552), Removed)
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up)
val b2 = Member(Address("akka.tcp", "sys", "b", 2552), Removed)
val b3 = Member(Address("akka.tcp", "sys", "b", 2552), Down)
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving)
val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up)
val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving)
val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed)
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining)
val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up)
val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down)
val aRoles = Set("AA", "AB")
val aJoining = Member(Address("akka.tcp", "sys", "a", 2552), Joining, aRoles)
val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, aRoles)
val aRemoved = Member(Address("akka.tcp", "sys", "a", 2552), Removed, aRoles)
val bRoles = Set("AB", "BB")
val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up, bRoles)
val bDown = Member(Address("akka.tcp", "sys", "b", 2552), Down, bRoles)
val bRemoved = Member(Address("akka.tcp", "sys", "b", 2552), Removed, bRoles)
val cRoles = Set.empty[String]
val cUp = Member(Address("akka.tcp", "sys", "c", 2552), Up, cRoles)
val cLeaving = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, cRoles)
val dRoles = Set("DD", "DE")
val dLeaving = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, dRoles)
val dExiting = Member(Address("akka.tcp", "sys", "d", 2552), Exiting, dRoles)
val dRemoved = Member(Address("akka.tcp", "sys", "d", 2552), Removed, dRoles)
val eRoles = Set("EE", "DE")
val eJoining = Member(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles)
val eUp = Member(Address("akka.tcp", "sys", "e", 2552), Up, eRoles)
val eDown = Member(Address("akka.tcp", "sys", "e", 2552), Down, eRoles)
def converge(gossip: Gossip): (Gossip, Set[Address]) =
((gossip, Set.empty[Address]) /: gossip.members) { (gs, m) (gs._1.seen(m.address), gs._2 + m.address) }
@ -35,66 +41,83 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
"Domain events" must {
"be empty for the same gossip" in {
val g1 = Gossip(members = SortedSet(a1))
val g1 = Gossip(members = SortedSet(aUp))
diffUnreachable(g1, g1) must be(Seq.empty)
}
"be produced for new members" in {
val (g1, _) = converge(Gossip(members = SortedSet(a1)))
val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, e1)))
val (g1, _) = converge(Gossip(members = SortedSet(aUp)))
val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining)))
diffMemberEvents(g1, g2) must be(Seq(MemberUp(b1)))
diffMemberEvents(g1, g2) must be(Seq(MemberUp(bUp)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
}
"be produced for changed status of members" in {
val (g1, _) = converge(Gossip(members = SortedSet(a2, b1, c2)))
val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, c1, e1)))
val (g1, _) = converge(Gossip(members = SortedSet(aJoining, bUp, cUp)))
val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, cLeaving, eJoining)))
diffMemberEvents(g1, g2) must be(Seq(MemberUp(a1)))
diffMemberEvents(g1, g2) must be(Seq(MemberUp(aUp)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
}
"be produced for members in unreachable" in {
val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2, e2)))
val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2, b3, e3)))
val g1 = Gossip(members = SortedSet(aUp, bUp), overview = GossipOverview(unreachable = Set(cUp, eUp)))
val g2 = Gossip(members = SortedSet(aUp), overview = GossipOverview(unreachable = Set(cUp, bDown, eDown)))
diffUnreachable(g1, g2) must be(Seq(UnreachableMember(b3)))
diffUnreachable(g1, g2) must be(Seq(UnreachableMember(bDown)))
diffSeen(g1, g2) must be(Seq.empty)
}
"be produced for removed members" in {
val (g1, _) = converge(Gossip(members = SortedSet(a1, d1)))
val (g2, s2) = converge(Gossip(members = SortedSet(a1)))
val (g1, _) = converge(Gossip(members = SortedSet(aUp, dLeaving)))
val (g2, s2) = converge(Gossip(members = SortedSet(aUp)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(d2)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(dRemoved)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
}
"be produced for convergence changes" in {
val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address)
val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address)
val g1 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.address).seen(bUp.address).seen(eJoining.address)
val g2 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.address).seen(bUp.address)
diffMemberEvents(g1, g2) must be(Seq.empty)
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address))))
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = false, seenBy = Set(aUp.address, bUp.address))))
diffMemberEvents(g2, g1) must be(Seq.empty)
diffUnreachable(g2, g1) must be(Seq.empty)
diffSeen(g2, g1) must be(Seq(SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address))))
diffSeen(g2, g1) must be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))
}
"be produced for leader changes" in {
val (g1, _) = converge(Gossip(members = SortedSet(a1, b1, e1)))
val (g2, s2) = converge(Gossip(members = SortedSet(b1, e1)))
val (g1, _) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining)))
val (g2, s2) = converge(Gossip(members = SortedSet(bUp, eJoining)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(a3)))
diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(aRemoved)))
diffUnreachable(g1, g2) must be(Seq.empty)
diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2)))
diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(b1.address))))
diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(bUp.address))))
}
"be produced for role leader changes" in {
val g0 = Gossip.empty
val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining))
val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))
diffRolesLeader(g0, g1) must be(
Set(RoleLeaderChanged("AA", Some(aUp.address)),
RoleLeaderChanged("AB", Some(aUp.address)),
RoleLeaderChanged("BB", Some(bUp.address)),
RoleLeaderChanged("DD", Some(dLeaving.address)),
RoleLeaderChanged("DE", Some(dLeaving.address)),
RoleLeaderChanged("EE", Some(eUp.address))))
diffRolesLeader(g1, g2) must be(
Set(RoleLeaderChanged("AA", None),
RoleLeaderChanged("AB", Some(bUp.address)),
RoleLeaderChanged("DE", Some(eJoining.address))))
}
}
}

View file

@ -14,18 +14,18 @@ class GossipSpec extends WordSpec with MustMatchers {
import MemberStatus._
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up)
val a2 = Member(Address("akka.tcp", "sys", "a", 2552), Joining)
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up)
val b2 = Member(Address("akka.tcp", "sys", "b", 2552), Removed)
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving)
val c2 = Member(Address("akka.tcp", "sys", "c", 2552), Up)
val c3 = Member(Address("akka.tcp", "sys", "c", 2552), Exiting)
val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving)
val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed)
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining)
val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up)
val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down)
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty)
val a2 = a1.copy(status = Joining)
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty)
val b2 = b1.copy(status = Removed)
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty)
val c2 = c1.copy(status = Up)
val c3 = c1.copy(status = Exiting)
val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, Set.empty)
val d2 = d1.copy(status = Removed)
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining, Set.empty)
val e2 = e1.copy(status = Up)
val e3 = e1.copy(status = Down)
"A Gossip" must {

View file

@ -17,29 +17,31 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
import Member.addressOrdering
import MemberStatus._
def m(address: Address, status: MemberStatus): Member = Member(address, status, Set.empty)
"An Ordering[Member]" must {
"order members by host:port" in {
val members = SortedSet.empty[Member] +
Member(AddressFromURIString("akka://sys@darkstar:1112"), Up) +
Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining) +
Member(AddressFromURIString("akka://sys@darkstar:1111"), Up)
m(AddressFromURIString("akka://sys@darkstar:1112"), Up) +
m(AddressFromURIString("akka://sys@darkstar:1113"), Joining) +
m(AddressFromURIString("akka://sys@darkstar:1111"), Up)
val seq = members.toSeq
seq.size must equal(3)
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Up))
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining))
seq(0) must equal(m(AddressFromURIString("akka://sys@darkstar:1111"), Up))
seq(1) must equal(m(AddressFromURIString("akka://sys@darkstar:1112"), Up))
seq(2) must equal(m(AddressFromURIString("akka://sys@darkstar:1113"), Joining))
}
"be sorted by address correctly" in {
import Member.ordering
// sorting should be done on host and port, only
val m1 = Member(Address("akka.tcp", "sys1", "host1", 9000), MemberStatus.Up)
val m2 = Member(Address("akka.tcp", "sys1", "host1", 10000), MemberStatus.Up)
val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
val m1 = m(Address("akka.tcp", "sys1", "host1", 9000), Up)
val m2 = m(Address("akka.tcp", "sys1", "host1", 10000), Up)
val m3 = m(Address("cluster", "sys2", "host2", 8000), Up)
val m4 = m(Address("cluster", "sys2", "host2", 9000), Up)
val m5 = m(Address("cluster", "sys1", "host2", 10000), Up)
val expected = IndexedSeq(m1, m2, m3, m4, m5)
val shuffled = Random.shuffle(expected)
@ -49,9 +51,9 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
"have stable equals and hashCode" in {
val address = Address("akka.tcp", "sys1", "host1", 9000)
val m1 = Member(address, MemberStatus.Joining)
val m2 = Member(address, MemberStatus.Up)
val m3 = Member(address.copy(port = Some(10000)), MemberStatus.Up)
val m1 = m(address, Joining)
val m2 = m(address, Up)
val m3 = m(address.copy(port = Some(10000)), Up)
m1 must be(m2)
m1.hashCode must be(m2.hashCode)
@ -64,9 +66,9 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
val address1 = Address("akka.tcp", "sys1", "host1", 9001)
val address2 = address1.copy(port = Some(9002))
val x = Member(address1, Exiting)
val y = Member(address1, Removed)
val z = Member(address2, Up)
val x = m(address1, Exiting)
val y = m(address1, Removed)
val z = m(address2, Up)
Member.ordering.compare(x, y) must be(0)
Member.ordering.compare(x, z) must be(Member.ordering.compare(y, z))
}
@ -76,11 +78,11 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
val address2 = address1.copy(port = Some(9002))
val address3 = address1.copy(port = Some(9003))
(SortedSet(Member(address1, MemberStatus.Joining)) - Member(address1, MemberStatus.Up)) must be(SortedSet.empty[Member])
(SortedSet(Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(SortedSet.empty[Member])
(SortedSet(Member(address1, MemberStatus.Up)) - Member(address1, MemberStatus.Exiting)) must be(SortedSet.empty[Member])
(SortedSet(Member(address2, Up), Member(address3, Joining), Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(
SortedSet(Member(address2, Up), Member(address3, Joining)))
(SortedSet(m(address1, Joining)) - m(address1, Up)) must be(SortedSet.empty[Member])
(SortedSet(m(address1, Exiting)) - m(address1, Removed)) must be(SortedSet.empty[Member])
(SortedSet(m(address1, Up)) - m(address1, Exiting)) must be(SortedSet.empty[Member])
(SortedSet(m(address2, Up), m(address3, Joining), m(address1, Exiting)) - m(address1, Removed)) must be(
SortedSet(m(address2, Up), m(address3, Joining)))
}
}
@ -136,14 +138,14 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
"order members with status Joining, Exiting and Down last" in {
val address = Address("akka.tcp", "sys1", "host1", 5000)
val m1 = Member(address, MemberStatus.Joining)
val m2 = Member(address.copy(port = Some(7000)), MemberStatus.Joining)
val m3 = Member(address.copy(port = Some(3000)), MemberStatus.Exiting)
val m4 = Member(address.copy(port = Some(6000)), MemberStatus.Exiting)
val m5 = Member(address.copy(port = Some(2000)), MemberStatus.Down)
val m6 = Member(address.copy(port = Some(4000)), MemberStatus.Down)
val m7 = Member(address.copy(port = Some(8000)), MemberStatus.Up)
val m8 = Member(address.copy(port = Some(9000)), MemberStatus.Up)
val m1 = m(address, Joining)
val m2 = m(address.copy(port = Some(7000)), Joining)
val m3 = m(address.copy(port = Some(3000)), Exiting)
val m4 = m(address.copy(port = Some(6000)), Exiting)
val m5 = m(address.copy(port = Some(2000)), Down)
val m6 = m(address.copy(port = Some(4000)), Down)
val m7 = m(address.copy(port = Some(8000)), Up)
val m8 = m(address.copy(port = Some(9000)), Up)
val expected = IndexedSeq(m7, m8, m1, m2, m3, m4, m5, m6)
val shuffled = Random.shuffle(expected)
shuffled.sorted(Member.leaderStatusOrdering) must be(expected)

View file

@ -19,12 +19,13 @@ such as single-point of bottleneck. Single-point of failure is also a relevant c
but for some cases this feature takes care of that by making sure that another singleton
instance will eventually be started.
The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSingletonManager``,
which is an actor that is supposed to be started on all nodes in the cluster.
The actual singleton actor is started by the ``ClusterSingletonManager`` on the
leader node of the cluster by creating a child actor from supplied ``Props``.
``ClusterSingletonManager`` makes sure that at most one singleton instance is
running at any point in time.
The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSingletonManager``.
It manages singleton actor instance among all cluster nodes or a group of nodes tagged with
a specific role. ``ClusterSingletonManager`` is an actor that is supposed to be started on
all nodes, or all nodes with specified role, in the cluster. The actual singleton actor is
started by the ``ClusterSingletonManager`` on the leader node by creating a child actor from
supplied ``Props``. ``ClusterSingletonManager`` makes sure that at most one singleton instance
is running at any point in time.
The singleton actor is always running on the leader member, which is nothing more than
the address currently sorted first in the member ring. This can change when adding
@ -39,9 +40,9 @@ not be a graceful hand-over, but more than one active singletons is prevented by
reasonable means. Some corner cases are eventually resolved by configurable timeouts.
You access the singleton actor with ``actorFor`` using the names you have specified when
creating the ClusterSingletonManager. You can subscribe to cluster ``LeaderChanged`` events
to keep track of which node it is supposed to be running on. Alternatively the singleton
actor may broadcast its existence when it is started.
creating the ClusterSingletonManager. You can subscribe to cluster ``LeaderChanged`` or
``RoleLeaderChanged`` events to keep track of which node it is supposed to be running on.
Alternatively the singleton actor may broadcast its existence when it is started.
An Example
----------
@ -57,7 +58,12 @@ supply the ``Props`` of the singleton actor, in this case the JMS queue consumer
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#create-singleton-manager
Here we limit the singleton to nodes tagged with the ``"worker"`` role, but all nodes, independent of
role, can be used by specifying ``None`` as ``role`` parameter.
The corresponding Java API for the ``singeltonProps`` function is ``akka.contrib.pattern.ClusterSingletonPropsFactory``.
The Java API constructor takes a plain String for the role parameter and ``null`` means that all nodes, independent of
role, are used.
Here we use an application specific ``terminationMessage`` to be able to close the
resources before actually stopping the singleton actor. Note that ``PoisonPill`` is a
@ -72,12 +78,15 @@ This message will be sent over to the ``ClusterSingletonManager`` at the new lea
will be passed to the ``singletonProps`` factory when creating the new singleton instance.
With the names given above the path of singleton actor can be constructed by subscribing to
``LeaderChanged`` cluster event and the actor reference is then looked up using ``actorFor``:
``RoleLeaderChanged`` cluster event and the actor reference is then looked up using ``actorFor``:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy2
Subscribe to ``LeaderChanged`` instead of ``RoleLeaderChanged`` if you don't limit the singleton to
the group of members tagged with a specific role.
Note that the hand-over might still be in progress and the singleton actor might not be started yet
when you receive the ``LeaderChanged`` event.
when you receive the ``LeaderChanged`` / ``RoleLeaderChanged`` event.
To test scenarios where the cluster leader node is removed or shut down you can use :ref:`multi-node-testing` and
utilize the fact that the leader is supposed to be the first member when sorted by member address.

View file

@ -90,6 +90,11 @@ object ClusterSingletonManager {
val TakeOverRetryTimer = "take-over-retry"
val CleanupTimer = "cleanup"
def roleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
object LeaderChangedBuffer {
/**
* Request to deliver one more event.
@ -110,7 +115,7 @@ object ClusterSingletonManager {
* `GetNext` request is allowed. Incoming events are buffered and delivered
* upon `GetNext` request.
*/
class LeaderChangedBuffer extends Actor {
class LeaderChangedBuffer(role: Option[String]) extends Actor {
import LeaderChangedBuffer._
import context.dispatcher
@ -119,14 +124,23 @@ object ClusterSingletonManager {
var memberCount = 0
// subscribe to LeaderChanged, re-subscribe when restart
override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged])
override def preStart(): Unit = role match {
case None cluster.subscribe(self, classOf[LeaderChanged])
case Some(_) cluster.subscribe(self, classOf[RoleLeaderChanged])
}
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case state: CurrentClusterState
changes :+= InitialLeaderState(state.leader, state.members.size)
val initial = role match {
case None InitialLeaderState(state.leader, state.members.size)
case Some(r) InitialLeaderState(state.roleLeader(r), state.members.count(_.hasRole(r)))
}
changes :+= initial
case event: LeaderChanged
changes :+= event
case RoleLeaderChanged(r, leader)
if (role.orNull == r) changes :+= LeaderChanged(leader)
case GetNext if changes.isEmpty
context.become(deliverNext, discardOld = false)
case GetNext
@ -138,11 +152,20 @@ object ClusterSingletonManager {
// the buffer was empty when GetNext was received, deliver next event immediately
def deliverNext: Actor.Receive = {
case state: CurrentClusterState
context.parent ! InitialLeaderState(state.leader, state.members.size)
val initial = role match {
case None InitialLeaderState(state.leader, state.members.size)
case Some(r) InitialLeaderState(state.roleLeader(r), state.members.count(_.hasRole(r)))
}
context.parent ! initial
context.unbecome()
case event: LeaderChanged
context.parent ! event
context.unbecome()
case RoleLeaderChanged(r, leader)
if (role.orNull == r) {
context.parent ! LeaderChanged(leader)
context.unbecome()
}
}
}
@ -176,11 +199,13 @@ trait ClusterSingletonPropsFactory extends Serializable {
class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(message, null)
/**
* Manages a cluster wide singleton actor instance, i.e.
* at most one singleton instance is running at any point in time.
* The ClusterSingletonManager is supposed to be started on all
* nodes in the cluster with `actorOf`. The actual singleton is
* started on the leader node of the cluster by creating a child
* Manages singleton actor instance among all cluster nodes or a group
* of nodes tagged with a specific role. At most one singleton instance
* is running at any point in time.
*
* The ClusterSingletonManager is supposed to be started on all nodes,
* or all nodes with specified role, in the cluster with `actorOf`.
* The actual singleton is started on the leader node by creating a child
* actor from the supplied `singletonProps`.
*
* The singleton actor is always running on the leader member, which is
@ -206,7 +231,8 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
*
* You access the singleton actor with `actorFor` using the names you have
* specified when creating the ClusterSingletonManager. You can subscribe to
* [[akka.cluster.ClusterEvent.LeaderChanged]] to keep track of which node
* [[akka.cluster.ClusterEvent.LeaderChanged]] or
* [[akka.cluster.ClusterEvent.RoleLeaderChanged]] to keep track of which node
* it is supposed to be running on. Alternatively the singleton actor may
* broadcast its existence when it is started.
*
@ -232,6 +258,10 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
* Note that [[akka.actor.PoisonPill]] is a perfectly fine
* `terminationMessage` if you only need to stop the actor.
*
* '''''role''''' Singleton among the nodes tagged with specified role.
* If the role is not specified it's a singleton among all nodes in
* the cluster.
*
* '''''maxHandOverRetries''''' When a node is becoming leader it sends
* hand-over request to previous leader. This is retried with the
* `retryInterval` until the previous leader confirms that the hand
@ -262,6 +292,7 @@ class ClusterSingletonManager(
singletonProps: Option[Any] Props,
singletonName: String,
terminationMessage: Any,
role: Option[String],
maxHandOverRetries: Int = 20,
maxTakeOverRetries: Int = 15,
retryInterval: FiniteDuration = 1.second,
@ -278,13 +309,14 @@ class ClusterSingletonManager(
def this(
singletonName: String,
terminationMessage: Any,
role: String,
maxHandOverRetries: Int,
maxTakeOverRetries: Int,
retryInterval: FiniteDuration,
loggingEnabled: Boolean,
singletonPropsFactory: ClusterSingletonPropsFactory) =
this(handOverData singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage,
maxHandOverRetries, maxTakeOverRetries, retryInterval)
ClusterSingletonManager.Internal.roleOption(role), maxHandOverRetries, maxTakeOverRetries, retryInterval)
/**
* Java API constructor with default values.
@ -292,8 +324,10 @@ class ClusterSingletonManager(
def this(
singletonName: String,
terminationMessage: Any,
role: String,
singletonPropsFactory: ClusterSingletonPropsFactory) =
this(handOverData singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage)
this(handOverData singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage,
ClusterSingletonManager.Internal.roleOption(role))
import ClusterSingletonManager._
import ClusterSingletonManager.Internal._
@ -301,6 +335,12 @@ class ClusterSingletonManager(
val cluster = Cluster(context.system)
val selfAddressOption = Some(cluster.selfAddress)
role match {
case None
case Some(r) require(cluster.selfRoles.contains(r), s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
}
// started when when self member is Up
var leaderChangedBuffer: ActorRef = _
// Previous GetNext request delivered event and new GetNext is to be sent
@ -357,7 +397,7 @@ class ClusterSingletonManager(
when(Start) {
case Event(StartLeaderChangedBuffer, _)
leaderChangedBuffer = context.actorOf(Props[LeaderChangedBuffer].withDispatcher(context.props.dispatcher))
leaderChangedBuffer = context.actorOf(Props(new LeaderChangedBuffer(role)).withDispatcher(context.props.dispatcher))
getNextLeaderChanged()
stay

View file

@ -90,7 +90,8 @@ class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonMan
system.actorOf(Props(new ClusterSingletonManager(
singletonProps = handOverData Props(new Echo(testActor)),
singletonName = "echo",
terminationMessage = PoisonPill)),
terminationMessage = PoisonPill,
role = None)),
name = "singleton")
}

View file

@ -27,6 +27,7 @@ import akka.actor.Terminated
object ClusterSingletonManagerSpec extends MultiNodeConfig {
val controller = role("controller")
val observer = role("observer")
val first = role("first")
val second = role("second")
val third = role("third")
@ -42,6 +43,9 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
akka.cluster.auto-down = on
"""))
nodeConfig(first, second, third, fourth, fifth, sixth)(
ConfigFactory.parseString("akka.cluster.roles =[worker]"))
object PointToPointChannel {
case object RegisterConsumer
case object UnregisterConsumer
@ -162,6 +166,30 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
}
//#singleton-proxy
// documentation of how to keep track of the role leader address in user land
//#singleton-proxy2
class ConsumerProxy2 extends Actor {
// subscribe to RoleLeaderChanged, re-subscribe when restart
override def preStart(): Unit =
Cluster(context.system).subscribe(self, classOf[RoleLeaderChanged])
override def postStop(): Unit =
Cluster(context.system).unsubscribe(self)
val role = "worker"
var leaderAddress: Option[Address] = None
def receive = {
case state: CurrentClusterState leaderAddress = state.roleLeader(role)
case RoleLeaderChanged(r, leader) if (r == role) leaderAddress = leader
case other consumer foreach { _ forward other }
}
def consumer: Option[ActorRef] =
leaderAddress map (a context.actorFor(RootActorPath(a) /
"user" / "singleton" / "consumer"))
}
//#singleton-proxy2
}
class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec
@ -171,6 +199,7 @@ class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode8 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerSpec._
@ -181,13 +210,13 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
//#sort-cluster-roles
// Sort the roles in the order used by the cluster.
lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = {
lazy val sortedWorkerNodes: immutable.IndexedSeq[RoleName] = {
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
import Member.addressOrdering
def compare(x: RoleName, y: RoleName) =
addressOrdering.compare(node(x).address, node(y).address)
}
roles.filterNot(_ == controller).toVector.sorted
roles.filterNot(r r == controller || r == observer).toVector.sorted
}
//#sort-cluster-roles
@ -196,7 +225,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
createSingleton()
if (Cluster(system).selfRoles.contains("worker")) createSingleton()
}
}
@ -206,7 +235,8 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
singletonProps = handOverData
Props(new Consumer(handOverData, queue, testActor)),
singletonName = "consumer",
terminationMessage = End)),
terminationMessage = End,
role = Some("worker"))),
name = "singleton")
//#create-singleton-manager
}
@ -231,7 +261,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
runOn(leader) {
expectMsg(msg)
}
runOn(sortedClusterRoles.filterNot(_ == leader): _*) {
runOn(sortedWorkerNodes.filterNot(_ == leader): _*) {
expectNoMsg(1 second)
}
enterBarrier(leader.name + "-verified")
@ -251,7 +281,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
"A ClusterSingletonManager" must {
"startup in single member cluster" in within(10 seconds) {
log.info("Sorted cluster nodes [{}]", sortedClusterRoles.map(node(_).address).mkString(", "))
log.info("Sorted cluster nodes [{}]", sortedWorkerNodes.map(node(_).address).mkString(", "))
runOn(controller) {
// watch that it is not terminated, which would indicate misbehaviour
@ -259,44 +289,48 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
}
enterBarrier("queue-started")
join(sortedClusterRoles.last, sortedClusterRoles.last)
verify(sortedClusterRoles.last, msg = 1, expectedCurrent = 0)
join(sortedWorkerNodes.last, sortedWorkerNodes.last)
verify(sortedWorkerNodes.last, msg = 1, expectedCurrent = 0)
// join the observer node as well, which should not influence since it doesn't have the "worker" role
join(observer, sortedWorkerNodes.last)
enterBarrier("after-1")
}
"hand over when new leader joins to 1 node cluster" in within(15 seconds) {
val newLeaderRole = sortedClusterRoles(4)
join(newLeaderRole, sortedClusterRoles.last)
val newLeaderRole = sortedWorkerNodes(4)
join(newLeaderRole, sortedWorkerNodes.last)
verify(newLeaderRole, msg = 2, expectedCurrent = 1)
}
"hand over when new leader joins to 2 nodes cluster" in within(15 seconds) {
val newLeaderRole = sortedClusterRoles(3)
join(newLeaderRole, sortedClusterRoles.last)
val newLeaderRole = sortedWorkerNodes(3)
join(newLeaderRole, sortedWorkerNodes.last)
verify(newLeaderRole, msg = 3, expectedCurrent = 2)
}
"hand over when new leader joins to 3 nodes cluster" in within(15 seconds) {
val newLeaderRole = sortedClusterRoles(2)
join(newLeaderRole, sortedClusterRoles.last)
val newLeaderRole = sortedWorkerNodes(2)
join(newLeaderRole, sortedWorkerNodes.last)
verify(newLeaderRole, msg = 4, expectedCurrent = 3)
}
"hand over when new leader joins to 4 nodes cluster" in within(15 seconds) {
val newLeaderRole = sortedClusterRoles(1)
join(newLeaderRole, sortedClusterRoles.last)
val newLeaderRole = sortedWorkerNodes(1)
join(newLeaderRole, sortedWorkerNodes.last)
verify(newLeaderRole, msg = 5, expectedCurrent = 4)
}
"hand over when new leader joins to 5 nodes cluster" in within(15 seconds) {
val newLeaderRole = sortedClusterRoles(0)
join(newLeaderRole, sortedClusterRoles.last)
val newLeaderRole = sortedWorkerNodes(0)
join(newLeaderRole, sortedWorkerNodes.last)
verify(newLeaderRole, msg = 6, expectedCurrent = 5)
}
"hand over when leader leaves in 6 nodes cluster " in within(30 seconds) {
//#test-leave
val leaveRole = sortedClusterRoles(0)
val newLeaderRole = sortedClusterRoles(1)
val leaveRole = sortedWorkerNodes(0)
val newLeaderRole = sortedWorkerNodes(1)
runOn(leaveRole) {
Cluster(system) leave node(leaveRole).address
@ -320,18 +354,18 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
system.eventStream.publish(Mute(EventFilter.error(pattern = ".*Association failed.*")))
enterBarrier("logs-muted")
crash(sortedClusterRoles(1))
verify(sortedClusterRoles(2), msg = 8, expectedCurrent = 0)
crash(sortedWorkerNodes(1))
verify(sortedWorkerNodes(2), msg = 8, expectedCurrent = 0)
}
"take over when two leaders crash in 3 nodes cluster" in within(60 seconds) {
crash(sortedClusterRoles(2), sortedClusterRoles(3))
verify(sortedClusterRoles(4), msg = 9, expectedCurrent = 0)
crash(sortedWorkerNodes(2), sortedWorkerNodes(3))
verify(sortedWorkerNodes(4), msg = 9, expectedCurrent = 0)
}
"take over when leader crashes in 2 nodes cluster" in within(60 seconds) {
crash(sortedClusterRoles(4))
verify(sortedClusterRoles(5), msg = 10, expectedCurrent = 0)
crash(sortedWorkerNodes(4))
verify(sortedWorkerNodes(5), msg = 10, expectedCurrent = 0)
}
}

View file

@ -85,7 +85,7 @@ In the log output you see that the cluster node has been started and changed sta
2552 corresponds to the port of the second seed-nodes element in the configuration.
In the log output you see that the cluster node has been started and joins the other seed node
and becomes a member of the cluster. It's status changed to 'Up'.
and becomes a member of the cluster. Its status changed to 'Up'.
Switch over to the first terminal window and see in the log output that the member joined.
@ -237,8 +237,17 @@ frontend nodes and 3 backend nodes::
mvn exec:java \
-Dexec.mainClass="sample.cluster.transformation.japi.TransformationFrontendMain"
Node Roles
^^^^^^^^^^
.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_.
Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end,
one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware
routers—can take node roles into account to achieve this distribution of responsibilities.
The roles of a node is defined in the configuration property named ``akka.cluster.roles``
and it is typically defined in the start script as a system property or environment variable.
The roles of the nodes is part of the membership information in ``MemberEvent`` that you can subscribe to.
How To Startup when Cluster Size Reached
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -251,6 +260,11 @@ before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members
In a similar way you can define required number of members of a certain role
before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#role-min-nr-of-members
You can start the actors in a ``registerOnMemberUp`` callback, which will
be invoked when the current member status is changed tp 'Up', i.e. the cluster
has at least the defined number of members.
@ -265,10 +279,10 @@ Cluster Singleton Pattern
For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.
This can be implemented by subscribing to ``LeaderChanged`` events, but there are
several corner cases to consider. Therefore, this specific use case is made easily
accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is,
or adjust to fit your specific needs.
This can be implemented by subscribing to ``LeaderChanged`` or ``RoleLeaderChanged``
events, but there are several corner cases to consider. Therefore, this specific use
case is made easily accessible by the :ref:`cluster-singleton` in the contrib module.
You can use it as is, or adjust to fit your specific needs.
Failure Detector
^^^^^^^^^^^^^^^^
@ -307,7 +321,7 @@ previous heartbeat.
Phi is calculated from the mean and standard deviation of historical
inter arrival times. The previous chart is an example for standard deviation
of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper,
i.e. it's possible to determine failure more quickly. The curve looks like this for
i.e. it is possible to determine failure more quickly. The curve looks like this for
a standard deviation of 100 ms.
.. image:: images/phi2.png
@ -345,7 +359,9 @@ are already running, the configuration for a router looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config
It's the relative actor path defined in ``routees-path`` that identify what actor to lookup.
It is the relative actor path defined in ``routees-path`` that identify what actor to lookup.
It is possible to limit the lookup of routees to member nodes tagged with a certain role by
specifying ``use-role``.
``nr-of-instances`` defines total number of routees in the cluster, but there will not be
more than one per node. Setting ``nr-of-instances`` to a high value will result in new routees
@ -361,6 +377,9 @@ the configuration for a router looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config
It is possible to limit the deployment of routees to member nodes tagged with a certain role by
specifying ``use-role``.
``nr-of-instances`` defines total number of routees in the cluster, but the number of routees
per node, ``max-nr-of-instances-per-node``, will not be exceeded. Setting ``nr-of-instances``
to a high value will result in creating and deploying additional routees when new nodes join
@ -373,8 +392,8 @@ The same type of router could also have been defined in code:
See :ref:`cluster_configuration_java` section for further descriptions of the settings.
Router Example with Remote Deployed Routees
-------------------------------------------
Router Example with Lookup of Routees
-------------------------------------
Let's take a look at how to use cluster aware routers.
@ -411,7 +430,7 @@ or with create and deploy of routees. Remember, routees are the workers in this
We start with the router setup with lookup of routees. All nodes start ``StatsService`` and
``StatsWorker`` actors and the router is configured with ``routees-path``:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java#start-router-lookup
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-lookup
This means that user requests can be sent to ``StatsService`` on any node and it will use
``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily
@ -424,22 +443,22 @@ Run it by starting nodes in different terminal windows. For example, starting 3
service nodes and 1 client::
mvn exec:java \
-Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" \
-Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain" \
-Dexec.args="2551"
mvn exec:java \
-Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain" \
-Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain" \
-Dexec.args="2552"
mvn exec:java \
-Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain"
-Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain"
mvn exec:java \
-Dexec.mainClass="run-main sample.cluster.stats.japi.StatsSampleMain"
-Dexec.mainClass="sample.cluster.stats.japi.StatsSampleMain"
Router Example with Lookup of Routees
-------------------------------------
Router Example with Remote Deployed Routees
-------------------------------------------
The above setup is nice for this example, but we will also take a look at how to use
a single master node that creates and deploys workers. To keep track of a single
@ -460,7 +479,7 @@ sorted first in the member ring, i.e. it can change when new nodes join or when
All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#start-router-deploy
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-deploy
This example is included in ``akka-samples/akka-sample-cluster`` and you can try it by copying the
`source <@github@/akka-samples/akka-sample-cluster>`_ to your
@ -539,11 +558,11 @@ The frontend that receives user jobs and delegates to the backends via the route
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#frontend
As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows:
As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router
It's only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work
It is only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work
in the same way as other routers.
The same type of router could also have been defined in code:
@ -559,18 +578,18 @@ Run it by starting nodes in different terminal windows. For example, starting 3
one frontend::
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain" \
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain" \
-Dexec.args="2551"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain" \
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain" \
-Dexec.args="2552"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain"
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialFrontendMain"
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialFrontendMain"
Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.
@ -578,7 +597,7 @@ Press ctrl-c in the terminal window of the frontend to stop the factorial calcul
Subscribe to Metrics Events
---------------------------
It's possible to subscribe to the metrics events directly to implement other functionality.
It is possible to subscribe to the metrics events directly to implement other functionality.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java#metrics-listener

View file

@ -63,7 +63,7 @@ In the log output you see that the cluster node has been started and changed sta
2552 corresponds to the port of the second seed-nodes element in the configuration.
In the log output you see that the cluster node has been started and joins the other seed node
and becomes a member of the cluster. It's status changed to 'Up'.
and becomes a member of the cluster. Its status changed to 'Up'.
Switch over to the first terminal window and see in the log output that the member joined.
@ -210,8 +210,17 @@ frontend nodes and 3 backend nodes::
run-main sample.cluster.transformation.TransformationFrontend
Node Roles
^^^^^^^^^^
.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_.
Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end,
one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware
routers—can take node roles into account to achieve this distribution of responsibilities.
The roles of a node is defined in the configuration property named ``akka.cluster.roles``
and it is typically defined in the start script as a system property or environment variable.
The roles of the nodes is part of the membership information in ``MemberEvent`` that you can subscribe to.
How To Startup when Cluster Size Reached
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -224,6 +233,11 @@ before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members
In a similar way you can define required number of members of a certain role
before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#role-min-nr-of-members
You can start the actors in a ``registerOnMemberUp`` callback, which will
be invoked when the current member status is changed tp 'Up', i.e. the cluster
has at least the defined number of members.
@ -238,10 +252,10 @@ Cluster Singleton Pattern
For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.
This can be implemented by subscribing to ``LeaderChanged`` events, but there are
several corner cases to consider. Therefore, this specific use case is made easily
accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is,
or adjust to fit your specific needs.
This can be implemented by subscribing to ``LeaderChanged`` or ``RoleLeaderChanged``
events, but there are several corner cases to consider. Therefore, this specific use
case is made easily accessible by the :ref:`cluster-singleton` in the contrib module.
You can use it as is, or adjust to fit your specific needs.
Failure Detector
^^^^^^^^^^^^^^^^
@ -280,7 +294,7 @@ previous heartbeat.
Phi is calculated from the mean and standard deviation of historical
inter arrival times. The previous chart is an example for standard deviation
of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper,
i.e. it's possible to determine failure more quickly. The curve looks like this for
i.e. it is possible to determine failure more quickly. The curve looks like this for
a standard deviation of 100 ms.
.. image:: images/phi2.png
@ -321,7 +335,9 @@ are already running, the configuration for a router looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config
It's the relative actor path defined in ``routees-path`` that identify what actor to lookup.
It is the relative actor path defined in ``routees-path`` that identify what actor to lookup.
It is possible to limit the lookup of routees to member nodes tagged with a certain role by
specifying ``use-role``.
``nr-of-instances`` defines total number of routees in the cluster, but there will not be
more than one per node. Setting ``nr-of-instances`` to a high value will result in new routees
@ -336,6 +352,8 @@ the configuration for a router looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config
It is possible to limit the deployment of routees to member nodes tagged with a certain role by
specifying ``use-role``.
``nr-of-instances`` defines total number of routees in the cluster, but the number of routees
per node, ``max-nr-of-instances-per-node``, will not be exceeded. Setting ``nr-of-instances``
@ -349,8 +367,8 @@ The same type of router could also have been defined in code:
See :ref:`cluster_configuration_scala` section for further descriptions of the settings.
Router Example with Remote Deployed Routees
-------------------------------------------
Router Example with Lookup of Routees
-------------------------------------
Let's take a look at how to use cluster aware routers.
@ -385,7 +403,7 @@ or with create and deploy of routees. Remember, routees are the workers in this
We start with the router setup with lookup of routees. All nodes start ``StatsService`` and
``StatsWorker`` actors and the router is configured with ``routees-path``:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-lookup
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-lookup
This means that user requests can be sent to ``StatsService`` on any node and it will use
``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily
@ -407,8 +425,8 @@ service nodes and 1 client::
run-main sample.cluster.stats.StatsSample
Router Example with Lookup of Routees
-------------------------------------
Router Example with Remote Deployed Routees
-------------------------------------------
The above setup is nice for this example, but we will also take a look at how to use
a single master node that creates and deploys workers. To keep track of a single
@ -429,7 +447,7 @@ sorted first in the member ring, i.e. it can change when new nodes join or when
All nodes start ``StatsFacade`` and the ``ClusterSingletonManager``. The router is now configured like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-deploy
This example is included in ``akka-samples/akka-sample-cluster``
@ -495,11 +513,11 @@ The frontend that receives user jobs and delegates to the backends via the route
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#frontend
As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows:
As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router
It's only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work
It is only router type ``adaptive`` and the ``metrics-selector`` that is specific to this router, other things work
in the same way as other routers.
The same type of router could also have been defined in code:
@ -528,7 +546,7 @@ Press ctrl-c in the terminal window of the frontend to stop the factorial calcul
Subscribe to Metrics Events
---------------------------
It's possible to subscribe to the metrics events directly to implement other functionality.
It is possible to subscribe to the metrics events directly to implement other functionality.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#metrics-listener
@ -560,7 +578,7 @@ implemented differently, but often they are the same and extend an abstract test
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#concrete-tests
Note the naming convention of these classes. The name of the classes must end with ``MultiJvmNode1``, ``MultiJvmNode2``
and so on. It's possible to define another suffix to be used by the ``sbt-multi-jvm``, but the default should be
and so on. It is possible to define another suffix to be used by the ``sbt-multi-jvm``, but the default should be
fine in most cases.
Then the abstract ``MultiNodeSpec``, which takes the ``MultiNodeConfig`` as constructor parameter.

View file

@ -1,5 +1,6 @@
package sample.cluster.factorial.japi;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
@ -7,12 +8,15 @@ import akka.actor.Props;
public class FactorialBackendMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.tcp.port", args[0]);
// Override the configuration of the port when specified as program argument
final Config config =
(args.length > 0 ?
ConfigFactory.parseString(String.format("akka.remote.netty.tcp.port=%s", args[0])) :
ConfigFactory.empty()).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("factorial"));
ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial"));
ActorSystem system = ActorSystem.create("ClusterSystem", config);
system.actorOf(new Props(FactorialBackend.class), "factorialBackend");

View file

@ -62,29 +62,31 @@ public class FactorialFrontend extends UntypedActor {
abstract class FactorialFrontend2 extends UntypedActor {
//#router-lookup-in-code
int totalInstances = 100;
String routeesPath = "/user/statsWorker";
String routeesPath = "/user/factorialBackend";
boolean allowLocalRoutees = true;
String useRole = "backend";
ActorRef backend = getContext().actorOf(
new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig(
new AdaptiveLoadBalancingRouter(HeapMetricsSelector.getInstance(), 0),
new AdaptiveLoadBalancingRouter(HeapMetricsSelector.getInstance(), 0),
new ClusterRouterSettings(
totalInstances, routeesPath, allowLocalRoutees))),
totalInstances, routeesPath, allowLocalRoutees, useRole))),
"factorialBackendRouter2");
//#router-lookup-in-code
}
//not used, only for documentation
abstract class StatsService3 extends UntypedActor {
abstract class FactorialFrontend3 extends UntypedActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
String useRole = "backend";
ActorRef backend = getContext().actorOf(
new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig(
new AdaptiveLoadBalancingRouter(
SystemLoadAverageMetricsSelector.getInstance(), 0),
SystemLoadAverageMetricsSelector.getInstance(), 0),
new ClusterRouterSettings(
totalInstances, maxInstancesPerNode, allowLocalRoutees))),
totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole))),
"factorialBackendRouter3");
//#router-deploy-in-code
}

View file

@ -1,5 +1,6 @@
package sample.cluster.factorial.japi;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
@ -12,8 +13,11 @@ public class FactorialFrontendMain {
public static void main(String[] args) throws Exception {
final int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0]));
final ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial"));
system.log().info("Factorials will start when 3 members in the cluster.");
final Config config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
withFallback(ConfigFactory.load("factorial"));
final ActorSystem system = ActorSystem.create("ClusterSystem", config);
system.log().info("Factorials will start when 2 backend members in the cluster.");
//#registerOnUp
Cluster.get(system).registerOnMemberUp(new Runnable() {
@Override

View file

@ -9,7 +9,7 @@ import akka.actor.UntypedActor;
import akka.dispatch.Recover;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.LeaderChanged;
import akka.cluster.ClusterEvent.RoleLeaderChanged;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.util.Timeout;
@ -25,10 +25,10 @@ public class StatsFacade extends UntypedActor {
Address currentMaster = null;
//subscribe to cluster changes, MemberEvent
//subscribe to cluster changes, RoleLeaderChanged
@Override
public void preStart() {
cluster.subscribe(getSelf(), LeaderChanged.class);
cluster.subscribe(getSelf(), RoleLeaderChanged.class);
}
//re-subscribe when restart
@ -57,11 +57,12 @@ public class StatsFacade extends UntypedActor {
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
currentMaster = state.getLeader();
currentMaster = state.getRoleLeader("compute");
} else if (message instanceof LeaderChanged) {
LeaderChanged leaderChanged = (LeaderChanged) message;
currentMaster = leaderChanged.getLeader();
} else if (message instanceof RoleLeaderChanged) {
RoleLeaderChanged leaderChanged = (RoleLeaderChanged) message;
if (leaderChanged.role().equals("compute"))
currentMaster = leaderChanged.getLeader();
} else {
unhandled(message);

View file

@ -77,14 +77,15 @@ public class StatsSampleClient extends UntypedActor {
CurrentClusterState state = (CurrentClusterState) message;
nodes.clear();
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
if (member.hasRole("compute") && member.status().equals(MemberStatus.up())) {
nodes.add(member.address());
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
nodes.add(mUp.member().address());
if (mUp.member().hasRole("compute"))
nodes.add(mUp.member().address());
} else if (message instanceof MemberEvent) {
MemberEvent other = (MemberEvent) message;

View file

@ -8,6 +8,7 @@ import akka.actor.UntypedActorFactory;
public class StatsSampleClientMain {
public static void main(String[] args) throws Exception {
// note that client is not a compute node, role not defined
ActorSystem system = ActorSystem.create("ClusterSystem");
system.actorOf(new Props(new UntypedActorFactory() {
@Override

View file

@ -1,37 +1,25 @@
package sample.cluster.stats.japi;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.ConfigFactory;
public class StatsSampleMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.tcp.port", args[0]);
// Override the configuration of the port when specified as program argument
final Config config =
(args.length > 0 ?
ConfigFactory.parseString(String.format("akka.remote.netty.tcp.port=%s", args[0])) :
ConfigFactory.empty()).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [compute]")).
withFallback(ConfigFactory.load());
//#start-router-lookup
ActorSystem system = ActorSystem.create("ClusterSystem",
ConfigFactory.parseString(
"akka.actor.deployment { \n" +
" /statsService/workerRouter { \n" +
" router = consistent-hashing \n" +
" nr-of-instances = 100 \n" +
" cluster { \n" +
" enabled = on \n" +
" routees-path = \"/user/statsWorker\" \n" +
" allow-local-routees = on \n" +
" } \n" +
" } \n" +
"} \n")
.withFallback(ConfigFactory.load()));
ActorSystem system = ActorSystem.create("ClusterSystem", config);
system.actorOf(new Props(StatsWorker.class), "statsWorker");
system.actorOf(new Props(StatsService.class), "statsService");
//#start-router-lookup
}
}

View file

@ -11,22 +11,8 @@ import akka.contrib.pattern.ClusterSingletonPropsFactory;
public class StatsSampleOneMasterClientMain {
public static void main(String[] args) throws Exception {
// note that client is not a compute node, role not defined
ActorSystem system = ActorSystem.create("ClusterSystem");
// the client is also part of the cluster
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public ClusterSingletonManager create() {
return new ClusterSingletonManager("statsService", PoisonPill.getInstance(),
new ClusterSingletonPropsFactory() {
@Override
public Props create(Object handOverData) {
return new Props(StatsService.class);
}
});
}
}), "singleton");
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {

View file

@ -1,5 +1,7 @@
package sample.cluster.stats.japi;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
@ -7,38 +9,25 @@ import akka.actor.UntypedActorFactory;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.ClusterSingletonPropsFactory;
import com.typesafe.config.ConfigFactory;
public class StatsSampleOneMasterMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.tcp.port", args[0]);
// Override the configuration of the port when specified as program argument
final Config config =
(args.length > 0 ?
ConfigFactory.parseString(String.format("akka.remote.netty.tcp.port=%s", args[0])) :
ConfigFactory.empty()).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [compute]")).
withFallback(ConfigFactory.load());
//#start-router-deploy
ActorSystem system = ActorSystem.create("ClusterSystem",
ConfigFactory.parseString(
"akka.actor.deployment { \n" +
" /singleton/statsService/workerRouter { \n" +
" router = consistent-hashing \n" +
" nr-of-instances = 100 \n" +
" cluster { \n" +
" enabled = on \n" +
" max-nr-of-instances-per-node = 3 \n" +
" allow-local-routees = off \n" +
" } \n" +
" } \n" +
"} \n")
.withFallback(ConfigFactory.load()));
//#start-router-deploy
ActorSystem system = ActorSystem.create("ClusterSystem", config);
//#create-singleton-manager
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public ClusterSingletonManager create() {
return new ClusterSingletonManager("statsService", PoisonPill.getInstance(),
"compute",
new ClusterSingletonPropsFactory() {
@Override
public Props create(Object handOverData) {

View file

@ -60,10 +60,11 @@ abstract class StatsService2 extends UntypedActor {
int totalInstances = 100;
String routeesPath = "/user/statsWorker";
boolean allowLocalRoutees = true;
String useRole = "compute";
ActorRef workerRouter = getContext().actorOf(
new Props(StatsWorker.class).withRouter(new ClusterRouterConfig(
new ConsistentHashingRouter(0), new ClusterRouterSettings(
totalInstances, routeesPath, allowLocalRoutees))),
totalInstances, routeesPath, allowLocalRoutees, useRole))),
"workerRouter2");
//#router-lookup-in-code
}
@ -74,10 +75,11 @@ abstract class StatsService3 extends UntypedActor {
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
String useRole = "compute";
ActorRef workerRouter = getContext().actorOf(
new Props(StatsWorker.class).withRouter(new ClusterRouterConfig(
new ConsistentHashingRouter(0), new ClusterRouterSettings(
totalInstances, maxInstancesPerNode, allowLocalRoutees))),
totalInstances, maxInstancesPerNode, allowLocalRoutees, useRole))),
"workerRouter3");
//#router-deploy-in-code
}

View file

@ -54,10 +54,9 @@ public class TransformationBackend extends UntypedActor {
}
}
//try to register to all nodes, even though there
// might not be any frontend on all nodes
void register(Member member) {
getContext().actorFor(member.address() + "/user/frontend").tell(
if (member.hasRole("frontend"))
getContext().actorFor(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, getSelf());
}
}

View file

@ -21,6 +21,36 @@ akka {
}
# //#cluster
# //#config-router-lookup
akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing
nr-of-instances = 100
cluster {
enabled = on
routees-path = "/user/statsWorker"
allow-local-routees = on
use-role = compute
}
}
}
# //#config-router-lookup
# //#config-router-deploy
akka.actor.deployment {
/singleton/statsService/workerRouter {
router = consistent-hashing
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = off
use-role = compute
}
}
}
# //#config-router-deploy
# //#adaptive-router
akka.actor.deployment {
/factorialFrontend/factorialBackendRouter = {
@ -33,6 +63,7 @@ akka.actor.deployment {
cluster {
enabled = on
routees-path = "/user/factorialBackend"
use-role = backend
allow-local-routees = off
}
}

View file

@ -2,4 +2,11 @@ include "application"
# //#min-nr-of-members
akka.cluster.min-nr-of-members = 3
# //#min-nr-of-members
# //#min-nr-of-members
# //#role-min-nr-of-members
akka.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
# //#role-min-nr-of-members

View file

@ -22,8 +22,11 @@ object FactorialFrontend {
def main(args: Array[String]): Unit = {
val upToN = if (args.isEmpty) 200 else args(0).toInt
val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial"))
system.log.info("Factorials will start when 3 members in the cluster.")
val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
withFallback(ConfigFactory.load("factorial"))
val system = ActorSystem("ClusterSystem", config)
system.log.info("Factorials will start when 2 backend members in the cluster.")
//#registerOnUp
Cluster(system) registerOnMemberUp {
system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)),
@ -58,11 +61,14 @@ class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLog
object FactorialBackend {
def main(args: Array[String]): Unit = {
// Override the configuration of the port
// when specified as program argument
if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0))
// Override the configuration of the port when specified as program argument
val config =
(if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}")
else ConfigFactory.empty).withFallback(
ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("factorial"))
val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial"))
val system = ActorSystem("ClusterSystem", config)
system.actorOf(Props[FactorialBackend], name = "factorialBackend")
system.actorOf(Props[MetricsListener], name = "metricsListener")
@ -143,8 +149,8 @@ abstract class FactorialFrontend2 extends Actor {
val backend = context.actorOf(Props[FactorialBackend].withRouter(
ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
ClusterRouterSettings(
totalInstances = 100, routeesPath = "/user/statsWorker",
allowLocalRoutees = true))),
totalInstances = 100, routeesPath = "/user/factorialBackend",
allowLocalRoutees = true, useRole = Some("backend")))),
name = "factorialBackendRouter2")
//#router-lookup-in-code
}
@ -161,7 +167,7 @@ abstract class FactorialFrontend3 extends Actor {
ClusterRouterConfig(AdaptiveLoadBalancingRouter(
SystemLoadAverageMetricsSelector), ClusterRouterSettings(
totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false))),
allowLocalRoutees = false, useRole = Some("backend")))),
name = "factorialBackendRouter3")
//#router-deploy-in-code
}

View file

@ -95,9 +95,9 @@ class StatsFacade extends Actor with ActorLogging {
var currentMaster: Option[Address] = None
// subscribe to cluster changes, LeaderChanged
// subscribe to cluster changes, RoleLeaderChanged
// re-subscribe when restart
override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged])
override def preStart(): Unit = cluster.subscribe(self, classOf[RoleLeaderChanged])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
@ -112,8 +112,11 @@ class StatsFacade extends Actor with ActorLogging {
case _ JobFailed("Service unavailable, try again later")
} pipeTo sender
}
case state: CurrentClusterState currentMaster = state.leader
case LeaderChanged(leader) currentMaster = leader
case state: CurrentClusterState
currentMaster = state.roleLeader("compute")
case RoleLeaderChanged(role, leader)
if (role == "compute")
currentMaster = leader
}
}
@ -121,58 +124,36 @@ class StatsFacade extends Actor with ActorLogging {
object StatsSample {
def main(args: Array[String]): Unit = {
// Override the configuration of the port
// when specified as program argument
if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0))
// Override the configuration of the port when specified as program argument
val config =
(if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}")
else ConfigFactory.empty).withFallback(
ConfigFactory.parseString("akka.cluster.roles = [compute]")).
withFallback(ConfigFactory.load())
//#start-router-lookup
val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing
nr-of-instances = 100
cluster {
enabled = on
routees-path = "/user/statsWorker"
allow-local-routees = on
}
}
}
""").withFallback(ConfigFactory.load()))
val system = ActorSystem("ClusterSystem", config)
system.actorOf(Props[StatsWorker], name = "statsWorker")
system.actorOf(Props[StatsService], name = "statsService")
//#start-router-lookup
}
}
object StatsSampleOneMaster {
def main(args: Array[String]): Unit = {
// Override the configuration of the port
// when specified as program argument
if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0))
// Override the configuration of the port when specified as program argument
val config =
(if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}")
else ConfigFactory.empty).withFallback(
ConfigFactory.parseString("akka.cluster.roles = [compute]")).
withFallback(ConfigFactory.load())
//#start-router-deploy
val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
akka.actor.deployment {
/singleton/statsService/workerRouter {
router = consistent-hashing
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = off
}
}
}
""").withFallback(ConfigFactory.load()))
//#start-router-deploy
val system = ActorSystem("ClusterSystem", config)
//#create-singleton-manager
system.actorOf(Props(new ClusterSingletonManager(
singletonProps = _ Props[StatsService], singletonName = "statsService",
terminationMessage = PoisonPill)), name = "singleton")
terminationMessage = PoisonPill, role = Some("compute"))),
name = "singleton")
//#create-singleton-manager
system.actorOf(Props[StatsFacade], name = "statsFacade")
}
@ -180,6 +161,7 @@ object StatsSampleOneMaster {
object StatsSampleClient {
def main(args: Array[String]): Unit = {
// note that client is not a compute node, role not defined
val system = ActorSystem("ClusterSystem")
system.actorOf(Props(new StatsSampleClient("/user/statsService")), "client")
}
@ -187,13 +169,8 @@ object StatsSampleClient {
object StatsSampleOneMasterClient {
def main(args: Array[String]): Unit = {
// note that client is not a compute node, role not defined
val system = ActorSystem("ClusterSystem")
// the client is also part of the cluster
system.actorOf(Props(new ClusterSingletonManager(
singletonProps = _ Props[StatsService], singletonName = "statsService",
terminationMessage = PoisonPill)), name = "singleton")
system.actorOf(Props(new StatsSampleClient("/user/statsFacade")), "client")
}
}
@ -230,10 +207,12 @@ class StatsSampleClient(servicePath: String) extends Actor {
case failed: JobFailed
println(failed)
case state: CurrentClusterState
nodes = state.members.collect { case m if m.status == MemberStatus.Up m.address }
case MemberUp(m) nodes += m.address
case other: MemberEvent nodes -= other.member.address
case UnreachableMember(m) nodes -= m.address
nodes = state.members.collect {
case m if m.hasRole("compute") && m.status == MemberStatus.Up m.address
}
case MemberUp(m) if m.hasRole("compute") nodes += m.address
case other: MemberEvent nodes -= other.member.address
case UnreachableMember(m) nodes -= m.address
}
}
@ -248,7 +227,7 @@ abstract class StatsService2 extends Actor {
val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
totalInstances = 100, routeesPath = "/user/statsWorker",
allowLocalRoutees = true))),
allowLocalRoutees = true, useRole = Some("compute")))),
name = "workerRouter2")
//#router-lookup-in-code
}
@ -263,7 +242,7 @@ abstract class StatsService3 extends Actor {
val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false))),
allowLocalRoutees = false, useRole = None))),
name = "workerRouter3")
//#router-deploy-in-code
}

View file

@ -3,7 +3,6 @@ package sample.cluster.transformation
//#imports
import language.postfixOps
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
@ -17,6 +16,7 @@ import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
//#imports
//#messages
@ -28,11 +28,14 @@ case object BackendRegistration
object TransformationFrontend {
def main(args: Array[String]): Unit = {
// Override the configuration of the port
// when specified as program argument
if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0))
// Override the configuration of the port when specified as program argument
val config =
(if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}")
else ConfigFactory.empty).withFallback(
ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem")
val system = ActorSystem("ClusterSystem", config)
val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
import system.dispatcher
@ -41,7 +44,7 @@ object TransformationFrontend {
(frontend ? TransformationJob("hello-" + n)) onSuccess {
case result println(result)
}
// wait a while until next request,
// wait a while until next request,
// to avoid flooding the console with output
Thread.sleep(2000)
}
@ -75,11 +78,14 @@ class TransformationFrontend extends Actor {
object TransformationBackend {
def main(args: Array[String]): Unit = {
// Override the configuration of the port
// when specified as program argument
if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0))
// Override the configuration of the port when specified as program argument
val config =
(if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}")
else ConfigFactory.empty).withFallback(
ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem")
val system = ActorSystem("ClusterSystem", config)
system.actorOf(Props[TransformationBackend], name = "backend")
}
}
@ -101,10 +107,9 @@ class TransformationBackend extends Actor {
case MemberUp(m) register(m)
}
// try to register to all nodes, even though there
// might not be any frontend on all nodes
def register(member: Member): Unit =
context.actorFor(RootActorPath(member.address) / "user" / "frontend") !
BackendRegistration
if (member.hasRole("frontend"))
context.actorFor(RootActorPath(member.address) / "user" / "frontend") !
BackendRegistration
}
//#backend

View file

@ -31,6 +31,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.roles = [compute]
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
@ -43,6 +44,7 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = off
use-role = compute
}
}
}
@ -75,15 +77,15 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
Cluster(system) join node(first).address
expectMsgAllOf(
MemberUp(Member(node(first).address, MemberStatus.Up)),
MemberUp(Member(node(second).address, MemberStatus.Up)),
MemberUp(Member(node(third).address, MemberStatus.Up)))
MemberUp(Member(node(first).address, MemberStatus.Up, Set.empty)),
MemberUp(Member(node(second).address, MemberStatus.Up, Set.empty)),
MemberUp(Member(node(third).address, MemberStatus.Up, Set.empty)))
Cluster(system).unsubscribe(testActor)
system.actorOf(Props(new ClusterSingletonManager(
singletonProps = _ Props[StatsService], singletonName = "statsService",
terminationMessage = PoisonPill)), name = "singleton")
terminationMessage = PoisonPill, role = Some("compute"))), name = "singleton")
system.actorOf(Props[StatsFacade], "statsFacade")

View file

@ -26,6 +26,7 @@ object StatsSampleSpecConfig extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.roles = [compute]
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
@ -38,6 +39,7 @@ object StatsSampleSpecConfig extends MultiNodeConfig {
enabled = on
routees-path = "/user/statsWorker"
allow-local-routees = on
use-role = compute
}
}
}
@ -96,9 +98,9 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
system.actorOf(Props[StatsService], "statsService")
expectMsgAllOf(
MemberUp(Member(firstAddress, MemberStatus.Up)),
MemberUp(Member(secondAddress, MemberStatus.Up)),
MemberUp(Member(thirdAddress, MemberStatus.Up)))
MemberUp(Member(firstAddress, MemberStatus.Up, Set.empty)),
MemberUp(Member(secondAddress, MemberStatus.Up, Set.empty)),
MemberUp(Member(thirdAddress, MemberStatus.Up, Set.empty)))
Cluster(system).unsubscribe(testActor)

View file

@ -30,6 +30,7 @@ object StatsSampleJapiSpecConfig extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.roles = [compute]
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
@ -41,6 +42,7 @@ object StatsSampleJapiSpecConfig extends MultiNodeConfig {
enabled = on
routees-path = "/user/statsWorker"
allow-local-routees = on
use-role = compute
}
}
}
@ -81,9 +83,9 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf
system.actorOf(Props[StatsService], "statsService")
expectMsgAllOf(
MemberUp(Member(firstAddress, MemberStatus.Up)),
MemberUp(Member(secondAddress, MemberStatus.Up)),
MemberUp(Member(thirdAddress, MemberStatus.Up)))
MemberUp(Member(firstAddress, MemberStatus.Up, Set.empty)),
MemberUp(Member(secondAddress, MemberStatus.Up, Set.empty)),
MemberUp(Member(thirdAddress, MemberStatus.Up, Set.empty)))
Cluster(system).unsubscribe(testActor)

View file

@ -33,6 +33,7 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.roles = [compute]
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
@ -44,6 +45,7 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = off
use-role = compute
}
}
}
@ -75,15 +77,16 @@ abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSample
Cluster(system) join node(first).address
expectMsgAllOf(
MemberUp(Member(node(first).address, MemberStatus.Up)),
MemberUp(Member(node(second).address, MemberStatus.Up)),
MemberUp(Member(node(third).address, MemberStatus.Up)))
MemberUp(Member(node(first).address, MemberStatus.Up, Set.empty)),
MemberUp(Member(node(second).address, MemberStatus.Up, Set.empty)),
MemberUp(Member(node(third).address, MemberStatus.Up, Set.empty)))
Cluster(system).unsubscribe(testActor)
system.actorOf(Props(new ClusterSingletonManager(
singletonName = "statsService",
terminationMessage = PoisonPill,
role = null,
singletonPropsFactory = new ClusterSingletonPropsFactory {
def create(handOverData: Any) = Props[StatsService]
})), name = "singleton")

View file

@ -33,6 +33,11 @@ object TransformationSampleSpecConfig extends MultiNodeConfig {
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
"""))
nodeConfig(frontend1, frontend2)(
ConfigFactory.parseString("akka.cluster.roles =[frontend]"))
nodeConfig(backend1, backend2, backend3)(
ConfigFactory.parseString("akka.cluster.roles =[backend]"))
}
// need one concrete test class per node

View file

@ -34,6 +34,12 @@ object TransformationSampleJapiSpecConfig extends MultiNodeConfig {
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
"""))
nodeConfig(frontend1, frontend2)(
ConfigFactory.parseString("akka.cluster.roles =[frontend]"))
nodeConfig(backend1, backend2, backend3)(
ConfigFactory.parseString("akka.cluster.roles =[backend]"))
}
// need one concrete test class per node