Move Cluster query methods to ClusterReadView, see #2202

* Better separation of concerns
* Internal API (could be made public if requested)
Patrik Nordwall 2012-08-16 18:28:01 +02:00
parent 331cd7fca3
commit 4e2d7b0495
21 changed files with 257 additions and 211 deletions
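In practice, callers that used to query the Cluster extension directly now go through the read view. A minimal before/after sketch, assuming a started ActorSystem `system` (illustrative only; `readView` is private[cluster] in this commit, so outside callers would need it made public, as the commit message notes):

  val cluster = Cluster(system)
  // before: query methods lived directly on the extension
  // if (cluster.isLeader) { ... }
  // after: the same query goes through the eventually consistent
  // read view, which is updated from published cluster events
  if (cluster.readView.isLeader) { /* ... */ }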

View file

@@ -69,7 +69,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
* if (Cluster(system).isLeader) { ... }
* }}}
*/
class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment {
class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment {
import ClusterEvent._
@@ -88,21 +88,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
log.info("Cluster Node [{}] - is starting up...", selfAddress)
/**
* Read view of cluster state, updated via subscription of
* cluster events published on the event bus.
*/
@volatile
private var state: CurrentClusterState = CurrentClusterState()
/**
* INTERNAL API
* Read-only view of internal cluster stats, updated periodically by
* ClusterCoreDaemon via event bus. Access with `latestStats`.
*/
@volatile
private var _latestStats = ClusterStats()
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================
@@ -169,22 +154,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
}
// create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
override def preStart(): Unit = subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = unsubscribe(self)
def receive = {
case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy)
case MembersChanged(members) ⇒ state = state.copy(members = members)
case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable)
case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence)
case s: CurrentClusterState ⇒ state = s
case CurrentInternalStats(stats) ⇒ _latestStats = stats
case _ ⇒ // ignore, not interesting
}
}).withDispatcher(UseDispatcher), name = "clusterEventBusListener")
@volatile
private var readViewStarted = false
private[cluster] lazy val readView: ClusterReadView = {
val readView = new ClusterReadView(this)
readViewStarted = true
readView
}
system.registerOnTermination(shutdown())
@@ -198,74 +173,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ===================== PUBLIC API =====================
// ======================================================
def self: Member = {
state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)).
getOrElse(Member(selfAddress, MemberStatus.Removed))
}
/**
* Returns true if the cluster node is up and running, false if it is shut down.
*/
def isRunning: Boolean = _isRunning.get
/**
* Current cluster members, sorted with leader first.
*/
def members: SortedSet[Member] = state.members
/**
* Members that have been detected as unreachable.
*/
def unreachableMembers: Set[Member] = state.unreachable
/**
* Member status for this node ([[akka.cluster.MemberStatus]]).
*
* NOTE: If the node has been removed from the cluster (and shut down) then its status is set to the 'REMOVED' tombstone state
* and it is no longer present in the node ring or any other part of the gossiping state. However, in order to maintain the
* model and the semantics the user would expect, this method will in that situation return `MemberStatus.Removed`.
*/
def status: MemberStatus = self.status
/**
* Is this node the leader?
*/
def isLeader: Boolean = leader == Some(selfAddress)
/**
* Get the address of the current leader.
*/
def leader: Option[Address] = state.leader
/**
* Is this node a singleton cluster?
*/
def isSingletonCluster: Boolean = members.size == 1
/**
* Checks if we have a cluster convergence.
*/
def convergence: Boolean = state.convergence
/**
* The nodes that have seen the current version of the gossip.
*/
def seenBy: Set[Address] = state.seenBy
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable: Boolean = {
val myself = self
!unreachableMembers.contains(myself) && !myself.status.isUnavailable
}
/**
* Make it possible to override/configure seedNodes from tests without
* specifying in config. Addresses are unknown before startup time.
*/
def seedNodes: IndexedSeq[Address] = SeedNodes
/**
* Subscribe to cluster domain events.
* The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
@@ -305,9 +217,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ========================================================
/**
* INTERNAL API
* Make it possible to override/configure seedNodes from tests without
* specifying in config. Addresses are unknown before startup time.
*/
private[cluster] def latestStats: ClusterStats = _latestStats
private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes
/**
* INTERNAL API.
@@ -321,8 +234,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (_isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
system.stop(eventBusListener)
system.stop(clusterDaemons)
if (readViewStarted) readView.close()
scheduler.close()

View file

@@ -33,10 +33,11 @@ trait ClusterNodeMBean {
/**
* Internal API
*/
private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) {
private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")
private def clusterView = cluster.readView
/**
* Creates the cluster JMX MBean and registers it in the MBean server.
@@ -57,34 +58,34 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) {
* }}}
*/
def getClusterStatus: String = {
val unreachable = clusterNode.unreachableMembers
"\nMembers:\n\t" + clusterNode.members.mkString("\n\t") +
val unreachable = clusterView.unreachableMembers
"\nMembers:\n\t" + clusterView.members.mkString("\n\t") +
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" }
}
def getMemberStatus: String = clusterNode.status.toString
def getMemberStatus: String = clusterView.status.toString
def getLeader: String = clusterNode.leader.toString
def getLeader: String = clusterView.leader.toString
def isSingleton: Boolean = clusterNode.isSingletonCluster
def isSingleton: Boolean = clusterView.isSingletonCluster
def isConvergence: Boolean = clusterNode.convergence
def isConvergence: Boolean = clusterView.convergence
def isAvailable: Boolean = clusterNode.isAvailable
def isAvailable: Boolean = clusterView.isAvailable
def isRunning: Boolean = clusterNode.isRunning
def isRunning: Boolean = clusterView.isRunning
// JMX commands
def join(address: String) = clusterNode.join(AddressFromURIString(address))
def join(address: String) = cluster.join(AddressFromURIString(address))
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
def leave(address: String) = cluster.leave(AddressFromURIString(address))
def down(address: String) = clusterNode.down(AddressFromURIString(address))
def down(address: String) = cluster.down(AddressFromURIString(address))
}
try {
mBeanServer.registerMBean(mbean, clusterMBeanName)
log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterNode.selfAddress, clusterMBeanName)
log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterView.selfAddress, clusterMBeanName)
} catch {
case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
@@ -94,6 +95,7 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) {
* Unregisters the cluster JMX MBean from MBean server.
*/
def unregisterMBean(): Unit = {
clusterView.close()
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {

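As an aside, once registered the MBean can be queried in-process through the platform MBean server. A hedged sketch (assuming the standard JavaBean mapping of the ClusterNodeMBean getters, so `getClusterStatus` surfaces as the `ClusterStatus` attribute):

  import java.lang.management.ManagementFactory
  import javax.management.ObjectName

  val server = ManagementFactory.getPlatformMBeanServer
  // same ObjectName as registered by ClusterJmx above
  val name = new ObjectName("akka:type=Cluster")
  val status = server.getAttribute(name, "ClusterStatus").asInstanceOf[String]
  println(status) // members and unreachable listing from getClusterStatus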
View file

@@ -0,0 +1,128 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.io.Closeable
import scala.collection.immutable.SortedSet
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props }
import akka.cluster.ClusterEvent._
/**
* INTERNAL API
*
* Read view of cluster state, updated via subscription of
* cluster events published on the event bus.
*/
private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
/**
* Current state
*/
@volatile
private var state: CurrentClusterState = CurrentClusterState()
/**
* Current internal cluster stats, updated periodically via event bus.
*/
@volatile
private var _latestStats = ClusterStats()
val selfAddress = cluster.selfAddress
// create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = {
cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy)
case MembersChanged(members) ⇒ state = state.copy(members = members)
case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable)
case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence)
case s: CurrentClusterState ⇒ state = s
case CurrentInternalStats(stats) ⇒ _latestStats = stats
case _ ⇒ // ignore, not interesting
}
}).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener")
}
def self: Member = {
state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)).
getOrElse(Member(selfAddress, MemberStatus.Removed))
}
/**
* Returns true if the cluster node is up and running, false if it is shut down.
*/
def isRunning: Boolean = cluster.isRunning
/**
* Current cluster members, sorted with leader first.
*/
def members: SortedSet[Member] = state.members
/**
* Members that have been detected as unreachable.
*/
def unreachableMembers: Set[Member] = state.unreachable
/**
* Member status for this node ([[akka.cluster.MemberStatus]]).
*
* NOTE: If the node has been removed from the cluster (and shut down) then its status is set to the 'REMOVED' tombstone state
* and it is no longer present in the node ring or any other part of the gossiping state. However, in order to maintain the
* model and the semantics the user would expect, this method will in that situation return `MemberStatus.Removed`.
*/
def status: MemberStatus = self.status
/**
* Is this node the leader?
*/
def isLeader: Boolean = leader == Some(selfAddress)
/**
* Get the address of the current leader.
*/
def leader: Option[Address] = state.leader
/**
* Is this node a singleton cluster?
*/
def isSingletonCluster: Boolean = members.size == 1
/**
* Checks if we have a cluster convergence.
*/
def convergence: Boolean = state.convergence
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable: Boolean = {
val myself = self
!unreachableMembers.contains(myself) && !myself.status.isUnavailable
}
/**
* INTERNAL API
* The nodes that have seen the current version of the gossip.
*/
private[cluster] def seenBy: Set[Address] = state.seenBy
/**
* INTERNAL API
*/
private[cluster] def latestStats: ClusterStats = _latestStats
/**
* Unsubscribe from cluster events.
*/
def close(): Unit = {
cluster.system.stop(eventBusListener)
}
}
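A short usage sketch from inside the akka.cluster package (for example a test), assuming a running node; `awaitCond` is the test-kit helper the specs below use:

  val view: ClusterReadView = Cluster(system).readView // private[cluster] in this commit
  // the view is eventually consistent: poll rather than assert immediately
  awaitCond(view.members.forall(_.status == MemberStatus.Up))
  if (view.isLeader)
    println("leader: " + view.selfAddress)
  view.close() // stops the event bus listener when the view is no longer needed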

View file

@@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec
enterBarrier("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
cluster.members.exists(_.address == thirdAddress) must be(false)
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
runOn(third) {

View file

@@ -48,7 +48,7 @@ abstract class ClientDowningNodeThatIsUpSpec
markNodeAsUnavailable(thirdAddress)
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
cluster.members.exists(_.address == thirdAddress) must be(false)
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
runOn(third) {

View file

@@ -65,15 +65,15 @@ abstract class ConvergenceSpec
within(28 seconds) {
// third becomes unreachable
awaitCond(cluster.unreachableMembers.size == 1)
awaitCond(cluster.members.size == 2)
awaitCond(cluster.members.forall(_.status == MemberStatus.Up))
awaitCond(clusterView.unreachableMembers.size == 1)
awaitCond(clusterView.members.size == 2)
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))
awaitSeenSameState(first, second)
// still one unreachable
cluster.unreachableMembers.size must be(1)
cluster.unreachableMembers.head.address must be(thirdAddress)
clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(thirdAddress)
// and therefore no convergence
cluster.convergence must be(false)
clusterView.convergence must be(false)
}
}
@@ -88,18 +88,18 @@ abstract class ConvergenceSpec
}
def memberStatus(address: Address): Option[MemberStatus] =
cluster.members.collectFirst { case m if m.address == address ⇒ m.status }
clusterView.members.collectFirst { case m if m.address == address ⇒ m.status }
def assertNotMovedUp: Unit = {
within(20 seconds) {
awaitCond(cluster.members.size == 3)
awaitCond(clusterView.members.size == 3)
awaitSeenSameState(first, second, fourth)
memberStatus(first) must be(Some(MemberStatus.Up))
memberStatus(second) must be(Some(MemberStatus.Up))
// leader is not allowed to move the new node to Up
memberStatus(fourth) must be(Some(MemberStatus.Joining))
// still no convergence
cluster.convergence must be(false)
clusterView.convergence must be(false)
}
}

View file

@@ -137,9 +137,9 @@ abstract class LargeClusterSpec
val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet)
val startGossipCounts = Map.empty[Cluster, Long] ++
clusterNodes.map(c ⇒ (c -> c.latestStats.receivedGossipCount))
clusterNodes.map(c ⇒ (c -> c.readView.latestStats.receivedGossipCount))
def gossipCount(c: Cluster): Long = {
c.latestStats.receivedGossipCount - startGossipCounts(c)
c.readView.latestStats.receivedGossipCount - startGossipCounts(c)
}
val startTime = System.nanoTime
def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
@@ -164,7 +164,7 @@ abstract class LargeClusterSpec
Await.ready(latch, remaining)
awaitCond(clusterNodes.forall(_.convergence))
awaitCond(clusterNodes.forall(_.readView.convergence))
val counts = clusterNodes.map(gossipCount(_))
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max)
log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node",
@@ -266,9 +266,9 @@ abstract class LargeClusterSpec
within(30.seconds + (3.seconds * liveNodes)) {
val startGossipCounts = Map.empty[Cluster, Long] ++
systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).latestStats.receivedGossipCount))
systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).readView.latestStats.receivedGossipCount))
def gossipCount(c: Cluster): Long = {
c.latestStats.receivedGossipCount - startGossipCounts(c)
c.readView.latestStats.receivedGossipCount - startGossipCounts(c)
}
val startTime = System.nanoTime
def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
@@ -303,8 +303,8 @@ abstract class LargeClusterSpec
runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) {
Await.ready(latch, remaining)
awaitCond(systems.forall(Cluster(_).convergence))
val mergeCount = systems.map(sys ⇒ Cluster(sys).latestStats.mergeCount).sum
awaitCond(systems.forall(Cluster(_).readView.convergence))
val mergeCount = systems.map(sys ⇒ Cluster(sys).readView.latestStats.mergeCount).sum
val counts = systems.map(sys ⇒ gossipCount(Cluster(sys)))
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max)
log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node, merged [{}] times",

View file

@@ -46,7 +46,7 @@ abstract class LeaderElectionSpec
awaitClusterUp(first, second, third, fourth)
if (myself != controller) {
cluster.isLeader must be(myself == sortedRoles.head)
clusterView.isLeader must be(myself == sortedRoles.head)
assertLeaderIn(sortedRoles)
}
@@ -87,7 +87,7 @@ abstract class LeaderElectionSpec
awaitUpConvergence(currentRoles.size - 1)
val nextExpectedLeader = remainingRoles.head
cluster.isLeader must be(myself == nextExpectedLeader)
clusterView.isLeader must be(myself == nextExpectedLeader)
assertLeaderIn(remainingRoles)
enterBarrier("completed")

View file

@@ -44,11 +44,11 @@ abstract class LeaderLeavingSpec
awaitClusterUp(first, second, third)
val oldLeaderAddress = cluster.leader.get
val oldLeaderAddress = clusterView.leader.get
within(leaderHandoffWaitingTime) {
if (cluster.isLeader) {
if (clusterView.isLeader) {
enterBarrier("registered-listener")
@@ -56,13 +56,13 @@ abstract class LeaderLeavingSpec
enterBarrier("leader-left")
// verify that a NEW LEADER has taken over
awaitCond(!cluster.isLeader)
awaitCond(!clusterView.isLeader)
// verify that the LEADER is shut down
awaitCond(!cluster.isRunning)
// verify that the LEADER is REMOVED
awaitCond(cluster.status == MemberStatus.Removed)
awaitCond(clusterView.status == MemberStatus.Removed)
} else {
@@ -90,13 +90,13 @@ abstract class LeaderLeavingSpec
exitingLatch.await
// verify that the LEADER is no longer part of the 'members' set
awaitCond(cluster.members.forall(_.address != oldLeaderAddress))
awaitCond(clusterView.members.forall(_.address != oldLeaderAddress))
// verify that the LEADER is not part of the 'unreachable' set
awaitCond(cluster.unreachableMembers.forall(_.address != oldLeaderAddress))
awaitCond(clusterView.unreachableMembers.forall(_.address != oldLeaderAddress))
// verify that we have a new LEADER
awaitCond(cluster.leader != oldLeaderAddress)
awaitCond(clusterView.leader != Some(oldLeaderAddress))
}
enterBarrier("finished")

View file

@@ -19,7 +19,7 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}

View file

@@ -21,7 +21,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster.leader-actions-interval = 5 s
akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off"
"""))
.withFallback(MultiNodeClusterSpec.clusterConfig))

View file

@@ -97,6 +97,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
}
}
def clusterView: ClusterReadView = cluster.readView
/**
* Get the cluster node to use.
*/
@@ -106,11 +108,11 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
* Use this method for the initial startup of the cluster node.
*/
def startClusterNode(): Unit = {
if (cluster.members.isEmpty) {
if (clusterView.members.isEmpty) {
cluster join myself
awaitCond(cluster.members.exists(_.address == address(myself)))
awaitCond(clusterView.members.exists(_.address == address(myself)))
} else
cluster.self
clusterView.self
}
/**
@@ -168,8 +170,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) {
nodesInCluster.length must not be (0)
val expectedLeader = roleOfLeader(nodesInCluster)
cluster.isLeader must be(ifNode(expectedLeader)(true)(false))
cluster.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving))
clusterView.isLeader must be(ifNode(expectedLeader)(true)(false))
clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving))
}
/**
@@ -181,12 +183,12 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address],
timeout: Duration = 20.seconds): Unit = {
within(timeout) {
awaitCond(cluster.members.size == numberOfMembers)
awaitCond(cluster.members.forall(_.status == MemberStatus.Up))
awaitCond(cluster.convergence)
awaitCond(clusterView.members.size == numberOfMembers)
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))
awaitCond(clusterView.convergence)
if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
awaitCond(
canNotBePartOfMemberRing forall (address ⇒ !(cluster.members exists (_.address == address))))
canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address))))
}
}
@@ -194,7 +196,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
* Wait until the specified nodes have seen the same gossip overview.
*/
def awaitSeenSameState(addresses: Address*): Unit =
awaitCond((addresses.toSet -- cluster.seenBy).isEmpty)
awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty)
def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = {
nodesInCluster.length must not be (0)

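Taken together, these helpers let a spec drive and observe membership through the read view. A hedged fragment (role names and flow illustrative, based on the specs in this commit):

  startClusterNode() // joins itself if this node has no members yet
  awaitClusterUp(first, second) // convenience used by the specs below
  awaitUpConvergence(numberOfMembers = 2)
  clusterView.isLeader must be(myself == roleOfLeader())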
View file

@@ -16,7 +16,7 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
@@ -42,7 +42,7 @@ abstract class NodeJoinSpec
cluster.join(first)
}
awaitCond(cluster.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining })
awaitCond(clusterView.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining })
enterBarrier("after")
}

View file

@@ -43,16 +43,16 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
runOn(first, third) {
// verify that the 'second' node is no longer part of the 'members' set
awaitCond(cluster.members.forall(_.address != address(second)), reaperWaitingTime)
awaitCond(clusterView.members.forall(_.address != address(second)), reaperWaitingTime)
// verify that the 'second' node is not part of the 'unreachable' set
awaitCond(cluster.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime)
awaitCond(clusterView.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime)
}
runOn(second) {
// verify that the second node is shut down and has status REMOVED
awaitCond(!cluster.isRunning, reaperWaitingTime)
awaitCond(cluster.status == MemberStatus.Removed, reaperWaitingTime)
awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime)
}
enterBarrier("finished")

View file

@@ -38,12 +38,12 @@ abstract class NodeMembershipSpec
runOn(first, second) {
cluster.join(first)
awaitCond(cluster.members.size == 2)
assertMembers(cluster.members, first, second)
awaitCond(clusterView.members.size == 2)
assertMembers(clusterView.members, first, second)
awaitCond {
cluster.members.forall(_.status == MemberStatus.Up)
clusterView.members.forall(_.status == MemberStatus.Up)
}
awaitCond(cluster.convergence)
awaitCond(clusterView.convergence)
}
enterBarrier("after-1")
@@ -55,12 +55,12 @@ abstract class NodeMembershipSpec
cluster.join(first)
}
awaitCond(cluster.members.size == 3)
assertMembers(cluster.members, first, second, third)
awaitCond(clusterView.members.size == 3)
assertMembers(clusterView.members, first, second, third)
awaitCond {
cluster.members.forall(_.status == MemberStatus.Up)
clusterView.members.forall(_.status == MemberStatus.Up)
}
awaitCond(cluster.convergence)
awaitCond(clusterView.convergence)
enterBarrier("after-2")
}

View file

@@ -60,7 +60,7 @@ abstract class NodeUpSpec
for (n ← 1 to 20) {
Thread.sleep(100.millis.dilated.toMillis)
unexpected.get must be(SortedSet.empty)
cluster.members.forall(_.status == MemberStatus.Up) must be(true)
clusterView.members.forall(_.status == MemberStatus.Up) must be(true)
}
enterBarrier("after-2")

View file

@@ -42,14 +42,14 @@ abstract class SingletonClusterSpec
"become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in {
startClusterNode()
awaitUpConvergence(1)
cluster.isSingletonCluster must be(true)
clusterView.isSingletonCluster must be(true)
enterBarrier("after-1")
}
"not be singleton cluster when joined with other node" taggedAs LongRunningTest in {
awaitClusterUp(first, second)
cluster.isSingletonCluster must be(false)
clusterView.isSingletonCluster must be(false)
assertLeader(first, second)
enterBarrier("after-2")
@@ -63,7 +63,7 @@ abstract class SingletonClusterSpec
markNodeAsUnavailable(secondAddress)
awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds)
cluster.isSingletonCluster must be(true)
clusterView.isSingletonCluster must be(true)
assertLeader(first)
}

View file

@@ -78,10 +78,10 @@ abstract class SplitBrainSpec
}
runOn(side1: _*) {
awaitCond(cluster.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds)
awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds)
}
runOn(side2: _*) {
awaitCond(cluster.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds)
awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds)
}
enterBarrier("after-2")
@@ -91,16 +91,16 @@ abstract class SplitBrainSpec
runOn(side1: _*) {
// auto-down = on
awaitCond(cluster.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
cluster.unreachableMembers.map(_.address) must be(side2.toSet map address)
awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
clusterView.unreachableMembers.map(_.address) must be(side2.toSet map address)
awaitUpConvergence(side1.size, side2 map address)
assertLeader(side1: _*)
}
runOn(side2: _*) {
// auto-down = on
awaitCond(cluster.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
cluster.unreachableMembers.map(_.address) must be(side1.toSet map address)
awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
clusterView.unreachableMembers.map(_.address) must be(side1.toSet map address)
awaitUpConvergence(side2.size, side1 map address)
assertLeader(side2: _*)
}

View file

@@ -42,18 +42,18 @@ abstract class TransitionSpec
def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail
def memberStatus(address: Address): MemberStatus = {
val statusOption = (cluster.members ++ cluster.unreachableMembers).collectFirst {
val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst {
case m if m.address == address ⇒ m.status
}
statusOption must not be (None)
statusOption.get
}
def memberAddresses: Set[Address] = cluster.members.map(_.address)
def memberAddresses: Set[Address] = clusterView.members.map(_.address)
def members: Set[RoleName] = memberAddresses.flatMap(roleName(_))
def seenLatestGossip: Set[RoleName] = cluster.seenBy flatMap roleName
def seenLatestGossip: Set[RoleName] = clusterView.seenBy flatMap roleName
def awaitSeen(addresses: Address*): Unit = awaitCond {
(seenLatestGossip map address) == addresses.toSet
@@ -90,10 +90,10 @@ abstract class TransitionSpec
def gossipTo(toRole: RoleName): Unit = {
gossipBarrierCounter += 1
runOn(toRole) {
val oldCount = cluster.latestStats.receivedGossipCount
val oldCount = clusterView.latestStats.receivedGossipCount
enterBarrier("before-gossip-" + gossipBarrierCounter)
awaitCond {
cluster.latestStats.receivedGossipCount != oldCount // received gossip
clusterView.latestStats.receivedGossipCount != oldCount // received gossip
}
// gossip chat will synchronize the views
awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty)
@@ -120,11 +120,11 @@ abstract class TransitionSpec
runOn(first) {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence must be(true)
clusterView.isSingletonCluster must be(true)
clusterView.status must be(Joining)
clusterView.convergence must be(true)
leaderActions()
cluster.status must be(Up)
clusterView.status must be(Up)
}
enterBarrier("after-1")
@@ -141,7 +141,7 @@ abstract class TransitionSpec
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
awaitCond(seenLatestGossip == Set(first, second))
cluster.convergence must be(true)
clusterView.convergence must be(true)
}
enterBarrier("convergence-joining-2")
@@ -158,7 +158,7 @@ abstract class TransitionSpec
awaitCond(memberStatus(second) == Up)
seenLatestGossip must be(Set(first, second))
memberStatus(first) must be(Up)
cluster.convergence must be(true)
clusterView.convergence must be(true)
}
enterBarrier("after-2")
@@ -174,7 +174,7 @@ abstract class TransitionSpec
awaitMembers(first, second, third)
memberStatus(third) must be(Joining)
awaitCond(seenLatestGossip == Set(second, third))
cluster.convergence must be(false)
clusterView.convergence must be(false)
}
enterBarrier("third-joined-second")
@@ -185,7 +185,7 @@ abstract class TransitionSpec
memberStatus(third) must be(Joining)
awaitCond(memberStatus(second) == Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence must be(true)
clusterView.convergence must be(true)
}
first gossipTo third
@@ -195,7 +195,7 @@ abstract class TransitionSpec
memberStatus(second) must be(Up)
memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence must be(true)
clusterView.convergence must be(true)
}
enterBarrier("convergence-joining-3")
@@ -213,7 +213,7 @@ abstract class TransitionSpec
runOn(nonLeader(first, second, third).head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(leader(first, second, third), myself))
cluster.convergence must be(false)
clusterView.convergence must be(false)
}
// first non-leader gossipTo the other non-leader
@@ -225,7 +225,7 @@ abstract class TransitionSpec
runOn(nonLeader(first, second, third).tail.head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence must be(true)
clusterView.convergence must be(true)
}
// first non-leader gossipTo the leader
@@ -235,7 +235,7 @@ abstract class TransitionSpec
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence must be(true)
clusterView.convergence must be(true)
}
enterBarrier("after-3")
@@ -245,7 +245,7 @@ abstract class TransitionSpec
runOn(third) {
markNodeAsUnavailable(second)
reapUnreachable()
cluster.unreachableMembers must contain(Member(second, Up))
clusterView.unreachableMembers must contain(Member(second, Up))
seenLatestGossip must be(Set(third))
}
@@ -254,8 +254,8 @@ abstract class TransitionSpec
third gossipTo first
runOn(first, third) {
cluster.unreachableMembers must contain(Member(second, Up))
cluster.convergence must be(false)
clusterView.unreachableMembers must contain(Member(second, Up))
clusterView.convergence must be(false)
}
runOn(first) {
@@ -268,10 +268,10 @@ abstract class TransitionSpec
first gossipTo third
runOn(first, third) {
cluster.unreachableMembers must contain(Member(second, Down))
clusterView.unreachableMembers must contain(Member(second, Down))
memberStatus(second) must be(Down)
seenLatestGossip must be(Set(first, third))
cluster.convergence must be(true)
clusterView.convergence must be(true)
}
enterBarrier("after-6")

View file

@@ -80,13 +80,13 @@ abstract class UnreachableNodeRejoinsClusterSpec
within(30 seconds) {
// victim becomes all alone
awaitCond({
val members = cluster.members
cluster.unreachableMembers.size == (roles.size - 1) &&
val members = clusterView.members
clusterView.unreachableMembers.size == (roles.size - 1) &&
members.size == 1 &&
members.forall(_.status == MemberStatus.Up)
})
cluster.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
cluster.convergence must be(false)
clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
clusterView.convergence must be(false)
}
}
@@ -95,17 +95,17 @@ abstract class UnreachableNodeRejoinsClusterSpec
within(30 seconds) {
// victim becomes unreachable
awaitCond({
val members = cluster.members
cluster.unreachableMembers.size == 1 &&
val members = clusterView.members
clusterView.unreachableMembers.size == 1 &&
members.size == (roles.size - 1) &&
members.forall(_.status == MemberStatus.Up)
})
awaitSeenSameState(allButVictim map address: _*)
// still one unreachable
cluster.unreachableMembers.size must be(1)
cluster.unreachableMembers.head.address must be(node(victim).address)
clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(node(victim).address)
// and therefore no convergence
cluster.convergence must be(false)
clusterView.convergence must be(false)
}
}

View file

@@ -44,6 +44,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
val failureDetector = new FailureDetectorPuppet(system)
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector)
def clusterView = cluster.readView
def leaderActions(): Unit = {
cluster.clusterCore ! LeaderActionsTick
@@ -70,16 +71,16 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
}
"initially become singleton cluster when joining itself and reach convergence" in {
cluster.members.size must be(0) // auto-join = off
clusterView.members.size must be(0) // auto-join = off
cluster.join(selfAddress)
Thread.sleep(5000)
awaitCond(cluster.isSingletonCluster)
cluster.self.address must be(selfAddress)
cluster.members.map(_.address) must be(Set(selfAddress))
cluster.status must be(MemberStatus.Joining)
cluster.convergence must be(true)
awaitCond(clusterView.isSingletonCluster)
clusterView.self.address must be(selfAddress)
clusterView.members.map(_.address) must be(Set(selfAddress))
clusterView.status must be(MemberStatus.Joining)
clusterView.convergence must be(true)
leaderActions()
cluster.status must be(MemberStatus.Up)
clusterView.status must be(MemberStatus.Up)
}
}