diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index d42c9306a6..c76b637164 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -69,7 +69,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { +class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { import ClusterEvent._ @@ -88,21 +88,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) log.info("Cluster Node [{}] - is starting up...", selfAddress) - /** - * Read view of cluster state, updated via subscription of - * cluster events published on the event bus. - */ - @volatile - private var state: CurrentClusterState = CurrentClusterState() - - /** - * INTERNAL API - * Read only view of internal cluster stats, updated periodically by - * ClusterCoreDaemon via event bus. Access with `latestStats`. - */ - @volatile - private var _latestStats = ClusterStats() - // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== @@ -169,22 +154,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration) } - // create actor that subscribes to the cluster eventBus to update current read view state - private val eventBusListener: ActorRef = { - system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { - override def preStart(): Unit = subscribe(self, classOf[ClusterDomainEvent]) - override def postStop(): Unit = unsubscribe(self) - - def receive = { - case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy) - case MembersChanged(members) ⇒ state = state.copy(members = members) - case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable) - case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case _ ⇒ // ignore, not interesting - } - }).withDispatcher(UseDispatcher), name = "clusterEventBusListener") + @volatile + private var readViewStarted = false + private[cluster] lazy val readView: ClusterReadView = { + val readView = new ClusterReadView(this) + readViewStarted = true + readView } system.registerOnTermination(shutdown()) @@ -198,74 +173,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== PUBLIC API ===================== // ====================================================== - def self: Member = { - state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)). - getOrElse(Member(selfAddress, MemberStatus.Removed)) - } - /** * Returns true if the cluster node is up and running, false if it is shut down. */ def isRunning: Boolean = _isRunning.get - /** - * Current cluster members, sorted with leader first. - */ - def members: SortedSet[Member] = state.members - - /** - * Members that has been detected as unreachable. - */ - def unreachableMembers: Set[Member] = state.unreachable - - /** - * Member status for this node ([[akka.cluster.MemberStatus]]). - * - * NOTE: If the node has been removed from the cluster (and shut down) then it's status is set to the 'REMOVED' tombstone state - * and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the - * model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`. - */ - def status: MemberStatus = self.status - - /** - * Is this node the leader? - */ - def isLeader: Boolean = leader == Some(selfAddress) - - /** - * Get the address of the current leader. - */ - def leader: Option[Address] = state.leader - - /** - * Is this node a singleton cluster? - */ - def isSingletonCluster: Boolean = members.size == 1 - - /** - * Checks if we have a cluster convergence. - */ - def convergence: Boolean = state.convergence - - /** - * The nodes that has seen current version of the Gossip. - */ - def seenBy: Set[Address] = state.seenBy - - /** - * Returns true if the node is UP or JOINING. - */ - def isAvailable: Boolean = { - val myself = self - !unreachableMembers.contains(myself) && !myself.status.isUnavailable - } - - /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. - */ - def seedNodes: IndexedSeq[Address] = SeedNodes - /** * Subscribe to cluster domain events. * The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] @@ -305,9 +217,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ======================================================== /** - * INTERNAL API + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. */ - private[cluster] def latestStats: ClusterStats = _latestStats + private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes /** * INTERNAL API. @@ -321,8 +234,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (_isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) - system.stop(eventBusListener) system.stop(clusterDaemons) + if (readViewStarted) readView.close() scheduler.close() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index f0b77808f7..4eb27e836e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -33,10 +33,11 @@ trait ClusterNodeMBean { /** * Internal API */ -private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { +private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { private val mBeanServer = ManagementFactory.getPlatformMBeanServer private val clusterMBeanName = new ObjectName("akka:type=Cluster") + private def clusterView = cluster.readView /** * Creates the cluster JMX MBean and registers it in the MBean server. @@ -57,34 +58,34 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { * }}} */ def getClusterStatus: String = { - val unreachable = clusterNode.unreachableMembers - "\nMembers:\n\t" + clusterNode.members.mkString("\n\t") + + val unreachable = clusterView.unreachableMembers + "\nMembers:\n\t" + clusterView.members.mkString("\n\t") + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } } - def getMemberStatus: String = clusterNode.status.toString + def getMemberStatus: String = clusterView.status.toString - def getLeader: String = clusterNode.leader.toString + def getLeader: String = clusterView.leader.toString - def isSingleton: Boolean = clusterNode.isSingletonCluster + def isSingleton: Boolean = clusterView.isSingletonCluster - def isConvergence: Boolean = clusterNode.convergence + def isConvergence: Boolean = clusterView.convergence - def isAvailable: Boolean = clusterNode.isAvailable + def isAvailable: Boolean = clusterView.isAvailable - def isRunning: Boolean = clusterNode.isRunning + def isRunning: Boolean = clusterView.isRunning // JMX commands - def join(address: String) = clusterNode.join(AddressFromURIString(address)) + def join(address: String) = cluster.join(AddressFromURIString(address)) - def leave(address: String) = clusterNode.leave(AddressFromURIString(address)) + def leave(address: String) = cluster.leave(AddressFromURIString(address)) - def down(address: String) = clusterNode.down(AddressFromURIString(address)) + def down(address: String) = cluster.down(AddressFromURIString(address)) } try { mBeanServer.registerMBean(mbean, clusterMBeanName) - log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterNode.selfAddress, clusterMBeanName) + log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterView.selfAddress, clusterMBeanName) } catch { case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) } @@ -94,6 +95,7 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { * Unregisters the cluster JMX MBean from MBean server. */ def unregisterMBean(): Unit = { + clusterView.close() try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala new file mode 100644 index 0000000000..4d3904cbb8 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import java.io.Closeable +import scala.collection.immutable.SortedSet + +import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props } +import akka.cluster.ClusterEvent._ + +/** + * INTERNAL API + * + * Read view of cluster state, updated via subscription of + * cluster events published on the event bus. + */ +private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { + + /** + * Current state + */ + @volatile + private var state: CurrentClusterState = CurrentClusterState() + + /** + * Current internal cluster stats, updated periodically via event bus. + */ + @volatile + private var _latestStats = ClusterStats() + + val selfAddress = cluster.selfAddress + + // create actor that subscribes to the cluster eventBus to update current read view state + private val eventBusListener: ActorRef = { + cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { + override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy) + case MembersChanged(members) ⇒ state = state.copy(members = members) + case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable) + case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting + } + }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") + } + + def self: Member = { + state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)). + getOrElse(Member(selfAddress, MemberStatus.Removed)) + } + + /** + * Returns true if the cluster node is up and running, false if it is shut down. + */ + def isRunning: Boolean = cluster.isRunning + + /** + * Current cluster members, sorted with leader first. + */ + def members: SortedSet[Member] = state.members + + /** + * Members that has been detected as unreachable. + */ + def unreachableMembers: Set[Member] = state.unreachable + + /** + * Member status for this node ([[akka.cluster.MemberStatus]]). + * + * NOTE: If the node has been removed from the cluster (and shut down) then it's status is set to the 'REMOVED' tombstone state + * and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the + * model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`. + */ + def status: MemberStatus = self.status + + /** + * Is this node the leader? + */ + def isLeader: Boolean = leader == Some(selfAddress) + + /** + * Get the address of the current leader. + */ + def leader: Option[Address] = state.leader + + /** + * Is this node a singleton cluster? + */ + def isSingletonCluster: Boolean = members.size == 1 + + /** + * Checks if we have a cluster convergence. + */ + def convergence: Boolean = state.convergence + + /** + * Returns true if the node is UP or JOINING. + */ + def isAvailable: Boolean = { + val myself = self + !unreachableMembers.contains(myself) && !myself.status.isUnavailable + } + + /** + * INTERNAL API + * The nodes that has seen current version of the Gossip. + */ + private[cluster] def seenBy: Set[Address] = state.seenBy + + /** + * INTERNAL API + */ + private[cluster] def latestStats: ClusterStats = _latestStats + + /** + * Unsubscribe to cluster events. + */ + def close(): Unit = { + cluster.system.stop(eventBusListener) + } + +} \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index d6ac36fd09..e0440394a7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec enterBarrier("down-third-node") awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) - cluster.members.exists(_.address == thirdAddress) must be(false) + clusterView.members.exists(_.address == thirdAddress) must be(false) } runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 910d8e25e4..82d90c81b5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -48,7 +48,7 @@ abstract class ClientDowningNodeThatIsUpSpec markNodeAsUnavailable(thirdAddress) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) - cluster.members.exists(_.address == thirdAddress) must be(false) + clusterView.members.exists(_.address == thirdAddress) must be(false) } runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index aa55cd6162..1862b8ea40 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -65,15 +65,15 @@ abstract class ConvergenceSpec within(28 seconds) { // third becomes unreachable - awaitCond(cluster.unreachableMembers.size == 1) - awaitCond(cluster.members.size == 2) - awaitCond(cluster.members.forall(_.status == MemberStatus.Up)) + awaitCond(clusterView.unreachableMembers.size == 1) + awaitCond(clusterView.members.size == 2) + awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) awaitSeenSameState(first, second) // still one unreachable - cluster.unreachableMembers.size must be(1) - cluster.unreachableMembers.head.address must be(thirdAddress) + clusterView.unreachableMembers.size must be(1) + clusterView.unreachableMembers.head.address must be(thirdAddress) // and therefore no convergence - cluster.convergence must be(false) + clusterView.convergence must be(false) } } @@ -88,18 +88,18 @@ abstract class ConvergenceSpec } def memberStatus(address: Address): Option[MemberStatus] = - cluster.members.collectFirst { case m if m.address == address ⇒ m.status } + clusterView.members.collectFirst { case m if m.address == address ⇒ m.status } def assertNotMovedUp: Unit = { within(20 seconds) { - awaitCond(cluster.members.size == 3) + awaitCond(clusterView.members.size == 3) awaitSeenSameState(first, second, fourth) memberStatus(first) must be(Some(MemberStatus.Up)) memberStatus(second) must be(Some(MemberStatus.Up)) // leader is not allowed to move the new node to Up memberStatus(fourth) must be(Some(MemberStatus.Joining)) // still no convergence - cluster.convergence must be(false) + clusterView.convergence must be(false) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index aa8a45751c..99dd2633a4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -137,9 +137,9 @@ abstract class LargeClusterSpec val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet) val startGossipCounts = Map.empty[Cluster, Long] ++ - clusterNodes.map(c ⇒ (c -> c.latestStats.receivedGossipCount)) + clusterNodes.map(c ⇒ (c -> c.readView.latestStats.receivedGossipCount)) def gossipCount(c: Cluster): Long = { - c.latestStats.receivedGossipCount - startGossipCounts(c) + c.readView.latestStats.receivedGossipCount - startGossipCounts(c) } val startTime = System.nanoTime def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" @@ -164,7 +164,7 @@ abstract class LargeClusterSpec Await.ready(latch, remaining) - awaitCond(clusterNodes.forall(_.convergence)) + awaitCond(clusterNodes.forall(_.readView.convergence)) val counts = clusterNodes.map(gossipCount(_)) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", @@ -266,9 +266,9 @@ abstract class LargeClusterSpec within(30.seconds + (3.seconds * liveNodes)) { val startGossipCounts = Map.empty[Cluster, Long] ++ - systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).latestStats.receivedGossipCount)) + systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).readView.latestStats.receivedGossipCount)) def gossipCount(c: Cluster): Long = { - c.latestStats.receivedGossipCount - startGossipCounts(c) + c.readView.latestStats.receivedGossipCount - startGossipCounts(c) } val startTime = System.nanoTime def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" @@ -303,8 +303,8 @@ abstract class LargeClusterSpec runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { Await.ready(latch, remaining) - awaitCond(systems.forall(Cluster(_).convergence)) - val mergeCount = systems.map(sys ⇒ Cluster(sys).latestStats.mergeCount).sum + awaitCond(systems.forall(Cluster(_).readView.convergence)) + val mergeCount = systems.map(sys ⇒ Cluster(sys).readView.latestStats.mergeCount).sum val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node, merged [{}] times", diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 9ed8f27ad4..1a657b3da8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -46,7 +46,7 @@ abstract class LeaderElectionSpec awaitClusterUp(first, second, third, fourth) if (myself != controller) { - cluster.isLeader must be(myself == sortedRoles.head) + clusterView.isLeader must be(myself == sortedRoles.head) assertLeaderIn(sortedRoles) } @@ -87,7 +87,7 @@ abstract class LeaderElectionSpec awaitUpConvergence(currentRoles.size - 1) val nextExpectedLeader = remainingRoles.head - cluster.isLeader must be(myself == nextExpectedLeader) + clusterView.isLeader must be(myself == nextExpectedLeader) assertLeaderIn(remainingRoles) enterBarrier("completed") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index a5cc5f3b4d..6c16a9550a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -44,11 +44,11 @@ abstract class LeaderLeavingSpec awaitClusterUp(first, second, third) - val oldLeaderAddress = cluster.leader.get + val oldLeaderAddress = clusterView.leader.get within(leaderHandoffWaitingTime) { - if (cluster.isLeader) { + if (clusterView.isLeader) { enterBarrier("registered-listener") @@ -56,13 +56,13 @@ abstract class LeaderLeavingSpec enterBarrier("leader-left") // verify that a NEW LEADER have taken over - awaitCond(!cluster.isLeader) + awaitCond(!clusterView.isLeader) // verify that the LEADER is shut down awaitCond(!cluster.isRunning) // verify that the LEADER is REMOVED - awaitCond(cluster.status == MemberStatus.Removed) + awaitCond(clusterView.status == MemberStatus.Removed) } else { @@ -90,13 +90,13 @@ abstract class LeaderLeavingSpec exitingLatch.await // verify that the LEADER is no longer part of the 'members' set - awaitCond(cluster.members.forall(_.address != oldLeaderAddress)) + awaitCond(clusterView.members.forall(_.address != oldLeaderAddress)) // verify that the LEADER is not part of the 'unreachable' set - awaitCond(cluster.unreachableMembers.forall(_.address != oldLeaderAddress)) + awaitCond(clusterView.unreachableMembers.forall(_.address != oldLeaderAddress)) // verify that we have a new LEADER - awaitCond(cluster.leader != oldLeaderAddress) + awaitCond(clusterView.leader != oldLeaderAddress) } enterBarrier("finished") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index f48cb941fb..0eeee4334a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -19,7 +19,7 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP + .withFallback(ConfigFactory.parseString("akka.clusterView.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP .withFallback(MultiNodeClusterSpec.clusterConfig))) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index 20e6a6132c..69ef096613 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -21,7 +21,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.leader-actions-interval = 5 s + akka.clusterView.leader-actions-interval = 5 s akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 91f0e82939..73837ef697 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -97,6 +97,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu } } + def clusterView: ClusterReadView = cluster.readView + /** * Get the cluster node to use. */ @@ -106,11 +108,11 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu * Use this method for the initial startup of the cluster node. */ def startClusterNode(): Unit = { - if (cluster.members.isEmpty) { + if (clusterView.members.isEmpty) { cluster join myself - awaitCond(cluster.members.exists(_.address == address(myself))) + awaitCond(clusterView.members.exists(_.address == address(myself))) } else - cluster.self + clusterView.self } /** @@ -168,8 +170,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { nodesInCluster.length must not be (0) val expectedLeader = roleOfLeader(nodesInCluster) - cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) - cluster.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving)) + clusterView.isLeader must be(ifNode(expectedLeader)(true)(false)) + clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving)) } /** @@ -181,12 +183,12 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], timeout: Duration = 20.seconds): Unit = { within(timeout) { - awaitCond(cluster.members.size == numberOfMembers) - awaitCond(cluster.members.forall(_.status == MemberStatus.Up)) - awaitCond(cluster.convergence) + awaitCond(clusterView.members.size == numberOfMembers) + awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) + awaitCond(clusterView.convergence) if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set awaitCond( - canNotBePartOfMemberRing forall (address ⇒ !(cluster.members exists (_.address == address)))) + canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address)))) } } @@ -194,7 +196,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu * Wait until the specified nodes have seen the same gossip overview. */ def awaitSeenSameState(addresses: Address*): Unit = - awaitCond((addresses.toSet -- cluster.seenBy).isEmpty) + awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty) def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index 0476e52516..de21d714bb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -16,7 +16,7 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval + .withFallback(ConfigFactory.parseString("akka.clusterView.leader-actions-interval = 5 s") // increase the leader action task interval .withFallback(MultiNodeClusterSpec.clusterConfig))) } @@ -42,7 +42,7 @@ abstract class NodeJoinSpec cluster.join(first) } - awaitCond(cluster.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining }) + awaitCond(clusterView.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining }) enterBarrier("after") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index e9730dee69..752857316b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -43,16 +43,16 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec runOn(first, third) { // verify that the 'second' node is no longer part of the 'members' set - awaitCond(cluster.members.forall(_.address != address(second)), reaperWaitingTime) + awaitCond(clusterView.members.forall(_.address != address(second)), reaperWaitingTime) // verify that the 'second' node is not part of the 'unreachable' set - awaitCond(cluster.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime) + awaitCond(clusterView.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime) } runOn(second) { // verify that the second node is shut down and has status REMOVED awaitCond(!cluster.isRunning, reaperWaitingTime) - awaitCond(cluster.status == MemberStatus.Removed, reaperWaitingTime) + awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime) } enterBarrier("finished") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 9bb78e3539..3df642afad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -38,12 +38,12 @@ abstract class NodeMembershipSpec runOn(first, second) { cluster.join(first) - awaitCond(cluster.members.size == 2) - assertMembers(cluster.members, first, second) + awaitCond(clusterView.members.size == 2) + assertMembers(clusterView.members, first, second) awaitCond { - cluster.members.forall(_.status == MemberStatus.Up) + clusterView.members.forall(_.status == MemberStatus.Up) } - awaitCond(cluster.convergence) + awaitCond(clusterView.convergence) } enterBarrier("after-1") @@ -55,12 +55,12 @@ abstract class NodeMembershipSpec cluster.join(first) } - awaitCond(cluster.members.size == 3) - assertMembers(cluster.members, first, second, third) + awaitCond(clusterView.members.size == 3) + assertMembers(clusterView.members, first, second, third) awaitCond { - cluster.members.forall(_.status == MemberStatus.Up) + clusterView.members.forall(_.status == MemberStatus.Up) } - awaitCond(cluster.convergence) + awaitCond(clusterView.convergence) enterBarrier("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index a07d9059f0..8ee02ee197 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -60,7 +60,7 @@ abstract class NodeUpSpec for (n ← 1 to 20) { Thread.sleep(100.millis.dilated.toMillis) unexpected.get must be(SortedSet.empty) - cluster.members.forall(_.status == MemberStatus.Up) must be(true) + clusterView.members.forall(_.status == MemberStatus.Up) must be(true) } enterBarrier("after-2") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index dddeac8816..d044d90f72 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -42,14 +42,14 @@ abstract class SingletonClusterSpec "become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in { startClusterNode() awaitUpConvergence(1) - cluster.isSingletonCluster must be(true) + clusterView.isSingletonCluster must be(true) enterBarrier("after-1") } "not be singleton cluster when joined with other node" taggedAs LongRunningTest in { awaitClusterUp(first, second) - cluster.isSingletonCluster must be(false) + clusterView.isSingletonCluster must be(false) assertLeader(first, second) enterBarrier("after-2") @@ -63,7 +63,7 @@ abstract class SingletonClusterSpec markNodeAsUnavailable(secondAddress) awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) - cluster.isSingletonCluster must be(true) + clusterView.isSingletonCluster must be(true) assertLeader(first) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index 0a09db0546..d030734a71 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -78,10 +78,10 @@ abstract class SplitBrainSpec } runOn(side1: _*) { - awaitCond(cluster.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds) + awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds) } runOn(side2: _*) { - awaitCond(cluster.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds) + awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds) } enterBarrier("after-2") @@ -91,16 +91,16 @@ abstract class SplitBrainSpec runOn(side1: _*) { // auto-down = on - awaitCond(cluster.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - cluster.unreachableMembers.map(_.address) must be(side2.toSet map address) + awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + clusterView.unreachableMembers.map(_.address) must be(side2.toSet map address) awaitUpConvergence(side1.size, side2 map address) assertLeader(side1: _*) } runOn(side2: _*) { // auto-down = on - awaitCond(cluster.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - cluster.unreachableMembers.map(_.address) must be(side1.toSet map address) + awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + clusterView.unreachableMembers.map(_.address) must be(side1.toSet map address) awaitUpConvergence(side2.size, side1 map address) assertLeader(side2: _*) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 5fac9f3f8f..0d5e60b7ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -42,18 +42,18 @@ abstract class TransitionSpec def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail def memberStatus(address: Address): MemberStatus = { - val statusOption = (cluster.members ++ cluster.unreachableMembers).collectFirst { + val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst { case m if m.address == address ⇒ m.status } statusOption must not be (None) statusOption.get } - def memberAddresses: Set[Address] = cluster.members.map(_.address) + def memberAddresses: Set[Address] = clusterView.members.map(_.address) def members: Set[RoleName] = memberAddresses.flatMap(roleName(_)) - def seenLatestGossip: Set[RoleName] = cluster.seenBy flatMap roleName + def seenLatestGossip: Set[RoleName] = clusterView.seenBy flatMap roleName def awaitSeen(addresses: Address*): Unit = awaitCond { (seenLatestGossip map address) == addresses.toSet @@ -90,10 +90,10 @@ abstract class TransitionSpec def gossipTo(toRole: RoleName): Unit = { gossipBarrierCounter += 1 runOn(toRole) { - val oldCount = cluster.latestStats.receivedGossipCount + val oldCount = clusterView.latestStats.receivedGossipCount enterBarrier("before-gossip-" + gossipBarrierCounter) awaitCond { - cluster.latestStats.receivedGossipCount != oldCount // received gossip + clusterView.latestStats.receivedGossipCount != oldCount // received gossip } // gossip chat will synchronize the views awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) @@ -120,11 +120,11 @@ abstract class TransitionSpec runOn(first) { startClusterNode() - cluster.isSingletonCluster must be(true) - cluster.status must be(Joining) - cluster.convergence must be(true) + clusterView.isSingletonCluster must be(true) + clusterView.status must be(Joining) + clusterView.convergence must be(true) leaderActions() - cluster.status must be(Up) + clusterView.status must be(Up) } enterBarrier("after-1") @@ -141,7 +141,7 @@ abstract class TransitionSpec memberStatus(first) must be(Up) memberStatus(second) must be(Joining) awaitCond(seenLatestGossip == Set(first, second)) - cluster.convergence must be(true) + clusterView.convergence must be(true) } enterBarrier("convergence-joining-2") @@ -158,7 +158,7 @@ abstract class TransitionSpec awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second)) memberStatus(first) must be(Up) - cluster.convergence must be(true) + clusterView.convergence must be(true) } enterBarrier("after-2") @@ -174,7 +174,7 @@ abstract class TransitionSpec awaitMembers(first, second, third) memberStatus(third) must be(Joining) awaitCond(seenLatestGossip == Set(second, third)) - cluster.convergence must be(false) + clusterView.convergence must be(false) } enterBarrier("third-joined-second") @@ -185,7 +185,7 @@ abstract class TransitionSpec memberStatus(third) must be(Joining) awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence must be(true) + clusterView.convergence must be(true) } first gossipTo third @@ -195,7 +195,7 @@ abstract class TransitionSpec memberStatus(second) must be(Up) memberStatus(third) must be(Joining) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence must be(true) + clusterView.convergence must be(true) } enterBarrier("convergence-joining-3") @@ -213,7 +213,7 @@ abstract class TransitionSpec runOn(nonLeader(first, second, third).head) { memberStatus(third) must be(Up) seenLatestGossip must be(Set(leader(first, second, third), myself)) - cluster.convergence must be(false) + clusterView.convergence must be(false) } // first non-leader gossipTo the other non-leader @@ -225,7 +225,7 @@ abstract class TransitionSpec runOn(nonLeader(first, second, third).tail.head) { memberStatus(third) must be(Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence must be(true) + clusterView.convergence must be(true) } // first non-leader gossipTo the leader @@ -235,7 +235,7 @@ abstract class TransitionSpec memberStatus(second) must be(Up) memberStatus(third) must be(Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence must be(true) + clusterView.convergence must be(true) } enterBarrier("after-3") @@ -245,7 +245,7 @@ abstract class TransitionSpec runOn(third) { markNodeAsUnavailable(second) reapUnreachable() - cluster.unreachableMembers must contain(Member(second, Up)) + clusterView.unreachableMembers must contain(Member(second, Up)) seenLatestGossip must be(Set(third)) } @@ -254,8 +254,8 @@ abstract class TransitionSpec third gossipTo first runOn(first, third) { - cluster.unreachableMembers must contain(Member(second, Up)) - cluster.convergence must be(false) + clusterView.unreachableMembers must contain(Member(second, Up)) + clusterView.convergence must be(false) } runOn(first) { @@ -268,10 +268,10 @@ abstract class TransitionSpec first gossipTo third runOn(first, third) { - cluster.unreachableMembers must contain(Member(second, Down)) + clusterView.unreachableMembers must contain(Member(second, Down)) memberStatus(second) must be(Down) seenLatestGossip must be(Set(first, third)) - cluster.convergence must be(true) + clusterView.convergence must be(true) } enterBarrier("after-6") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index f20173bf4f..ca5c8f3265 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -80,13 +80,13 @@ abstract class UnreachableNodeRejoinsClusterSpec within(30 seconds) { // victim becomes all alone awaitCond({ - val members = cluster.members - cluster.unreachableMembers.size == (roles.size - 1) && + val members = clusterView.members + clusterView.unreachableMembers.size == (roles.size - 1) && members.size == 1 && members.forall(_.status == MemberStatus.Up) }) - cluster.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) - cluster.convergence must be(false) + clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) + clusterView.convergence must be(false) } } @@ -95,17 +95,17 @@ abstract class UnreachableNodeRejoinsClusterSpec within(30 seconds) { // victim becomes unreachable awaitCond({ - val members = cluster.members - cluster.unreachableMembers.size == 1 && + val members = clusterView.members + clusterView.unreachableMembers.size == 1 && members.size == (roles.size - 1) && members.forall(_.status == MemberStatus.Up) }) awaitSeenSameState(allButVictim map address: _*) // still one unreachable - cluster.unreachableMembers.size must be(1) - cluster.unreachableMembers.head.address must be(node(victim).address) + clusterView.unreachableMembers.size must be(1) + clusterView.unreachableMembers.head.address must be(node(victim).address) // and therefore no convergence - cluster.convergence must be(false) + clusterView.convergence must be(false) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 8e6035e6d1..73364b853e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -44,6 +44,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { val failureDetector = new FailureDetectorPuppet(system) val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + def clusterView = cluster.readView def leaderActions(): Unit = { cluster.clusterCore ! LeaderActionsTick @@ -70,16 +71,16 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { } "initially become singleton cluster when joining itself and reach convergence" in { - cluster.members.size must be(0) // auto-join = off + clusterView.members.size must be(0) // auto-join = off cluster.join(selfAddress) Thread.sleep(5000) - awaitCond(cluster.isSingletonCluster) - cluster.self.address must be(selfAddress) - cluster.members.map(_.address) must be(Set(selfAddress)) - cluster.status must be(MemberStatus.Joining) - cluster.convergence must be(true) + awaitCond(clusterView.isSingletonCluster) + clusterView.self.address must be(selfAddress) + clusterView.members.map(_.address) must be(Set(selfAddress)) + clusterView.status must be(MemberStatus.Joining) + clusterView.convergence must be(true) leaderActions() - cluster.status must be(MemberStatus.Up) + clusterView.status must be(MemberStatus.Up) } }