diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index bcf288dfec..30e425dafd 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -40,9 +40,9 @@ akka { # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? unreachable-nodes-reaper-interval = 1s - # How often the current state (Gossip) should be published for reading from the outside. - # A value of 0 s can be used to always publish the state, when it happens. - publish-state-interval = 1s + # How often the current internal stats should be published. + # A value of 0 s can be used to always publish the stats, when it happens. + publish-stats-interval = 10s # A joining node stops sending heartbeats to the node to join if it hasn't become member # of the cluster within this deadline. diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index ce5b345346..66a778af9d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -89,19 +89,19 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) log.info("Cluster Node [{}] - is starting up...", selfAddress) /** - * Read only view of cluster state, updated periodically by - * ClusterCoreDaemon. Access with `latestGossip`. + * Read view of cluster state, updated via subscription of + * cluster events published on the event bus. */ @volatile - private[cluster] var _latestGossip: Gossip = Gossip() + private var state: CurrentClusterState = CurrentClusterState() /** * INTERNAL API * Read only view of internal cluster stats, updated periodically by - * ClusterCoreDaemon. Access with `latestStats`. + * ClusterCoreDaemon via event bus. Access with `latestStats`. */ @volatile - private[cluster] var _latestStats = ClusterStats() + private var _latestStats = ClusterStats() // ======================================================== // ===================== WORK DAEMONS ===================== @@ -155,20 +155,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - // create actor that subscribes to the cluster eventBus to update current read view state - private val eventBusListener: ActorRef = { - val listener = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { - def receive = { - case MembershipGossipChanged(gossip) ⇒ _latestGossip = gossip - case InternalStatsChanged(stats) ⇒ _latestStats = stats - case _ ⇒ // ignore, not interesting - } - }).withDispatcher(UseDispatcher), name = "clusterEventBusListener") - - subscribe(listener, classOf[ClusterDomainEvent]) - listener - } - // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)). @@ -183,6 +169,24 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration) } + // create actor that subscribes to the cluster eventBus to update current read view state + private val eventBusListener: ActorRef = { + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { + override def preStart(): Unit = subscribe(self, classOf[ClusterDomainEvent]) + override def postStop(): Unit = unsubscribe(self) + + def receive = { + case s: CurrentClusterState ⇒ state = s + case MembersChanged(members) ⇒ state = state.copy(members = members) + case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable) + case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy) + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting + } + }).withDispatcher(UseDispatcher), name = "clusterEventBusListener") + } + system.registerOnTermination(shutdown()) private val clusterJmx = new ClusterJmx(this, log) @@ -194,7 +198,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== PUBLIC API ===================== // ====================================================== - def self: Member = latestGossip.member(selfAddress) + def self: Member = { + state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)). + getOrElse(Member(selfAddress, MemberStatus.Removed)) + } /** * Returns true if the cluster node is up and running, false if it is shut down. @@ -202,9 +209,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) def isRunning: Boolean = _isRunning.get /** - * Latest gossip. + * Current cluster members, sorted with leader first. */ - def latestGossip: Gossip = _latestGossip + def members: SortedSet[Member] = state.members + + /** + * Members that has been detected as unreachable. + */ + def unreachableMembers: Set[Member] = state.unreachable /** * Member status for this node ([[akka.cluster.MemberStatus]]). @@ -218,35 +230,35 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) /** * Is this node the leader? */ - def isLeader: Boolean = latestGossip.isLeader(selfAddress) + def isLeader: Boolean = leader == Some(selfAddress) /** * Get the address of the current leader. */ - def leader: Address = latestGossip.leader match { - case Some(x) ⇒ x - case None ⇒ throw new IllegalStateException("There is no leader in this cluster") - } + def leader: Option[Address] = state.leader /** * Is this node a singleton cluster? */ - def isSingletonCluster: Boolean = latestGossip.isSingletonCluster + def isSingletonCluster: Boolean = members.size == 1 /** * Checks if we have a cluster convergence. - * - * @return Some(convergedGossip) if convergence have been reached and None if not */ - def convergence: Option[Gossip] = latestGossip match { - case gossip if gossip.convergence ⇒ Some(gossip) - case _ ⇒ None - } + def convergence: Boolean = state.convergence + + /** + * The nodes that has seen current version of the Gossip. + */ + def seenBy: Set[Address] = state.seenBy /** * Returns true if the node is UP or JOINING. */ - def isAvailable: Boolean = latestGossip.isAvailable(selfAddress) + def isAvailable: Boolean = { + val myself = self + !unreachableMembers.contains(myself) && !myself.status.isUnavailable + } /** * Make it possible to override/configure seedNodes from tests without @@ -257,14 +269,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) /** * Subscribe to cluster domain events. * The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] - * or subclass. + * or subclass. A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]] + * will also be sent to the subscriber. */ - def subscribe(subscriber: ActorRef, to: Class[_]): Unit = system.eventStream.subscribe(subscriber, to) + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = + clusterCore ! InternalClusterAction.Subscribe(subscriber, to) /** - * Subscribe to cluster domain events. + * Unsubscribe to cluster domain events. */ - def unsubscribe(subscriber: ActorRef): Unit = system.eventStream.unsubscribe(subscriber) + def unsubscribe(subscriber: ActorRef): Unit = + clusterCore ! InternalClusterAction.Unsubscribe(subscriber) /** * Try to join this cluster node with the node specified by 'address'. @@ -289,6 +304,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== INTERNAL API ===================== // ======================================================== + /** + * INTERNAL API + */ + private[cluster] def latestStats: ClusterStats = _latestStats + /** * INTERNAL API. * @@ -301,9 +321,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (_isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) - system.stop(clusterDaemons) - unsubscribe(eventBusListener) system.stop(eventBusListener) + system.stop(clusterDaemons) scheduler.close() @@ -313,32 +332,5 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - /** - * INTERNAL API - */ - private[cluster] def latestStats: ClusterStats = _latestStats - -} - -/** - * Domain events published to the cluster event bus. - */ -object ClusterEvent { - /** - * Marker interface for cluster domain events. - */ - trait ClusterDomainEvent - - /** - * Set of cluster members, or their status has changed. - */ - case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent - - case class MembershipGossipChanged(gossip: Gossip) extends ClusterDomainEvent - /** - * INTERNAL API - */ - private[cluster] case class InternalStatsChanged(stats: ClusterStats) extends ClusterDomainEvent - } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 9016349a84..1d7a2a7514 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -14,6 +14,7 @@ import akka.util.Timeout import akka.pattern.{ AskTimeoutException, ask, pipe } import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ +import language.existentials /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -82,7 +83,7 @@ private[cluster] object InternalClusterAction { case object LeaderActionsTick - case object PublishStateTick + case object PublishStatsTick case class SendClusterMessage(to: Address, msg: ClusterMessage) @@ -90,6 +91,9 @@ private[cluster] object InternalClusterAction { case object GetClusterCoreRef + case class Subscribe(subscriber: ActorRef, to: Class[_]) + case class Unsubscribe(subscriber: ActorRef) + case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage @@ -205,9 +209,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // start periodic publish of current state private val publishStateTask: Option[Cancellable] = - if (PublishStateInterval == Duration.Zero) None - else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStateInterval), PublishStateInterval) { - self ! PublishStateTick + if (PublishStatsInterval == Duration.Zero) None + else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) { + self ! PublishStatsTick }) override def preStart(): Unit = { @@ -229,7 +233,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) case HeartbeatTick ⇒ heartbeat() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() - case PublishStateTick ⇒ publishState() + case PublishStatsTick ⇒ publishInternalStats() case JoinSeedNode ⇒ joinSeedNode() case InitJoin ⇒ initJoin() case InitJoinAck(address) ⇒ join(address) @@ -241,6 +245,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) case Exit(address) ⇒ exiting(address) case Remove(address) ⇒ removing(address) case SendGossipTo(address) ⇒ gossipTo(address) + case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) case p: Ping ⇒ ping(p) } @@ -802,23 +808,63 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) coreSender ! SendClusterMessage(address, gossipMsg) + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { + subscriber ! CurrentClusterState( + members = latestGossip.members, + unreachable = latestGossip.overview.unreachable, + convergence = latestGossip.convergence, + seenBy = latestGossip.seenBy, + leader = latestGossip.leader) + eventStream.subscribe(subscriber, to) + } + + def unsubscribe(subscriber: ActorRef): Unit = + eventStream.unsubscribe(subscriber) + def publish(oldGossip: Gossip): Unit = { - if (PublishStateInterval == Duration.Zero) publishState() - publishMembers(oldGossip.members) + publishMembers(oldGossip) + publishUnreachableMembers(oldGossip) + publishLeader(oldGossip) + publishSeen(oldGossip) + if (PublishStatsInterval == Duration.Zero) publishInternalStats() } - def publishState(): Unit = { - eventStream publish MembershipGossipChanged(latestGossip) - eventStream publish InternalStatsChanged(stats) - } - - def publishMembers(oldMembers: SortedSet[Member]): Unit = { - val oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status)) - val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status)) - if (newMembersStatus != oldMembersStatus) + def publishMembers(oldGossip: Gossip): Unit = { + if (!isSame(oldGossip.members, latestGossip.members)) eventStream publish MembersChanged(latestGossip.members) } + def publishUnreachableMembers(oldGossip: Gossip): Unit = { + if (!isSame(oldGossip.overview.unreachable, latestGossip.overview.unreachable)) + eventStream publish UnreachableMembersChanged(latestGossip.overview.unreachable) + } + + def isSame(oldMembers: Set[Member], newMembers: Set[Member]): Boolean = { + def oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status)) + def newMembersStatus = newMembers.map(m ⇒ (m.address, m.status)) + (newMembers eq oldMembers) || ((newMembers.size == oldMembers.size) && (newMembersStatus == oldMembersStatus)) + } + + def publishLeader(oldGossip: Gossip): Unit = { + if (latestGossip.leader != oldGossip.leader) + eventStream publish LeaderChanged(latestGossip.leader) + } + + def publishSeen(oldGossip: Gossip): Unit = { + val oldConvergence = oldGossip.convergence + val newConvergence = latestGossip.convergence + val oldSeenBy = oldGossip.seenBy + val newSeenBy = latestGossip.seenBy + + if (newConvergence != oldConvergence || newSeenBy != oldSeenBy) { + eventStream publish SeenChanged(newConvergence, newSeenBy) + } + } + + def publishInternalStats(): Unit = { + eventStream publish CurrentInternalStats(stats) + } + def eventStream: EventStream = context.system.eventStream def ping(p: Ping): Unit = sender ! Pong(p) @@ -843,6 +889,49 @@ private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Act } } +/** + * Domain events published to the event bus. + */ +object ClusterEvent { + /** + * Marker interface for cluster domain events. + */ + trait ClusterDomainEvent + + /** + * Current snapshot state of the cluster. Sent to new subscriber. + */ + case class CurrentClusterState( + members: SortedSet[Member] = SortedSet.empty, + unreachable: Set[Member] = Set.empty, + convergence: Boolean = false, + seenBy: Set[Address] = Set.empty, + leader: Option[Address] = None) extends ClusterDomainEvent + + /** + * Set of cluster members or their status have changed. + */ + case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent + + /** + * Set of unreachable cluster members or their status have changed. + */ + case class UnreachableMembersChanged(unreachable: Set[Member]) extends ClusterDomainEvent + + /** + * The nodes that have seen current version of the Gossip. + */ + case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent + + case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent + + /** + * INTERNAL API + */ + private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent + +} + /** * INTERNAL API */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index 6a01057e7d..f0b77808f7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -57,9 +57,8 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { * }}} */ def getClusterStatus: String = { - val gossip = clusterNode.latestGossip - val unreachable = gossip.overview.unreachable - "\nMembers:\n\t" + gossip.members.mkString("\n\t") + + val unreachable = clusterNode.unreachableMembers + "\nMembers:\n\t" + clusterNode.members.mkString("\n\t") + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } } @@ -69,7 +68,7 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { def isSingleton: Boolean = clusterNode.isSingletonCluster - def isConvergence: Boolean = clusterNode.convergence.isDefined + def isConvergence: Boolean = clusterNode.convergence def isAvailable: Boolean = clusterNode.isAvailable diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index b8f5463529..544b48870a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -32,7 +32,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) final val LeaderActionsInterval: Duration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) - final val PublishStateInterval: Duration = Duration(getMilliseconds("akka.cluster.publish-state-interval"), MILLISECONDS) + final val PublishStatsInterval: Duration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS) final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 800fa6b584..b975034c66 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -8,11 +8,16 @@ import akka.actor.Address import scala.collection.immutable.SortedSet import MemberStatus._ -object Gossip { +/** + * Internal API + */ +private[cluster] object Gossip { val emptyMembers: SortedSet[Member] = SortedSet.empty } /** + * INTERNAL API + * * Represents the state of the cluster; cluster ring membership, ring convergence - * all versioned by a vector clock. * @@ -43,7 +48,7 @@ object Gossip { * `Removed` by removing it from the `members` set and sending a `Removed` command to the * removed node telling it to shut itself down. */ -case class Gossip( +private[cluster] case class Gossip( overview: GossipOverview = GossipOverview(), members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address version: VectorClock = VectorClock()) // vector clock version @@ -95,6 +100,15 @@ case class Gossip( else this copy (overview = overview copy (seen = overview.seen + (address -> version))) } + /** + * The nodes that have seen current version of the Gossip. + */ + def seenBy: Set[Address] = { + overview.seen.collect { + case (address, vclock) if vclock == version ⇒ address + }.toSet + } + /** * Merges two Gossip instances including membership tables, and the VectorClock histories. */ @@ -147,8 +161,7 @@ case class Gossip( !hasUnreachable && allMembersInSeen && seenSame } - def isLeader(address: Address): Boolean = - members.nonEmpty && (address == members.head.address) + def isLeader(address: Address): Boolean = leader == Some(address) def leader: Option[Address] = members.headOption.map(_.address) @@ -179,9 +192,10 @@ case class Gossip( } /** + * INTERNAL API * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. */ -case class GossipOverview( +private[cluster] case class GossipOverview( seen: Map[Address, VectorClock] = Map.empty, unreachable: Set[Member] = Set.empty) { @@ -195,13 +209,15 @@ case class GossipOverview( } /** + * INTERNAL API * Envelope adding a sender address to the gossip. */ -case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage +private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage /** + * INTERNAL API * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected * it's forwarded to the leader for conflict resolution. */ -case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage +private[cluster] case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 8112aeab25..d6ac36fd09 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec enterBarrier("down-third-node") awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) - cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) + cluster.members.exists(_.address == thirdAddress) must be(false) } runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 4c65e85054..910d8e25e4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -48,7 +48,7 @@ abstract class ClientDowningNodeThatIsUpSpec markNodeAsUnavailable(thirdAddress) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) - cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) + cluster.members.exists(_.address == thirdAddress) must be(false) } runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index f29bb97f49..aa55cd6162 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -65,15 +65,15 @@ abstract class ConvergenceSpec within(28 seconds) { // third becomes unreachable - awaitCond(cluster.latestGossip.overview.unreachable.size == 1) - awaitCond(cluster.latestGossip.members.size == 2) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(cluster.unreachableMembers.size == 1) + awaitCond(cluster.members.size == 2) + awaitCond(cluster.members.forall(_.status == MemberStatus.Up)) awaitSeenSameState(first, second) // still one unreachable - cluster.latestGossip.overview.unreachable.size must be(1) - cluster.latestGossip.overview.unreachable.head.address must be(thirdAddress) + cluster.unreachableMembers.size must be(1) + cluster.unreachableMembers.head.address must be(thirdAddress) // and therefore no convergence - cluster.convergence.isDefined must be(false) + cluster.convergence must be(false) } } @@ -88,18 +88,18 @@ abstract class ConvergenceSpec } def memberStatus(address: Address): Option[MemberStatus] = - cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } + cluster.members.collectFirst { case m if m.address == address ⇒ m.status } def assertNotMovedUp: Unit = { within(20 seconds) { - awaitCond(cluster.latestGossip.members.size == 3) + awaitCond(cluster.members.size == 3) awaitSeenSameState(first, second, fourth) memberStatus(first) must be(Some(MemberStatus.Up)) memberStatus(second) must be(Some(MemberStatus.Up)) // leader is not allowed to move the new node to Up memberStatus(fourth) must be(Some(MemberStatus.Joining)) // still no convergence - cluster.convergence.isDefined must be(false) + cluster.convergence must be(false) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 6bc8ba0de8..aa8a45751c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -40,7 +40,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { auto-join = off auto-down = on failure-detector.acceptable-heartbeat-pause = 10s - publish-state-interval = 0 s # always, when it happens + publish-stats-interval = 0 s # always, when it happens } akka.loglevel = INFO akka.actor.default-dispatcher.fork-join-executor { @@ -164,7 +164,7 @@ abstract class LargeClusterSpec Await.ready(latch, remaining) - awaitCond(clusterNodes.forall(_.convergence.isDefined)) + awaitCond(clusterNodes.forall(_.convergence)) val counts = clusterNodes.map(gossipCount(_)) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", @@ -276,15 +276,23 @@ abstract class LargeClusterSpec val latch = TestLatch(nodesPerDatacenter) systems foreach { sys ⇒ Cluster(sys).subscribe(sys.actorOf(Props(new Actor { + var gotExpectedLiveNodes = false + var gotExpectedUnreachableNodes = false def receive = { - case MembersChanged(members) ⇒ - if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) { - log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", - unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) - latch.countDown() - } + case MembersChanged(members) if !latch.isOpen ⇒ + gotExpectedLiveNodes = members.size == liveNodes + checkDone() + case UnreachableMembersChanged(unreachable) if !latch.isOpen ⇒ + gotExpectedUnreachableNodes = unreachable.size == unreachableNodes + checkDone() + case _ ⇒ // not interesting } - })), classOf[MembersChanged]) + def checkDone(): Unit = if (gotExpectedLiveNodes && gotExpectedUnreachableNodes) { + log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", + unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) + latch.countDown() + } + })), classOf[ClusterDomainEvent]) } runOn(firstDatacenter) { @@ -295,7 +303,7 @@ abstract class LargeClusterSpec runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { Await.ready(latch, remaining) - awaitCond(systems.forall(Cluster(_).convergence.isDefined)) + awaitCond(systems.forall(Cluster(_).convergence)) val mergeCount = systems.map(sys ⇒ Cluster(sys).latestStats.mergeCount).sum val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 844f1be226..a5cc5f3b4d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -44,7 +44,7 @@ abstract class LeaderLeavingSpec awaitClusterUp(first, second, third) - val oldLeaderAddress = cluster.leader + val oldLeaderAddress = cluster.leader.get within(leaderHandoffWaitingTime) { @@ -90,10 +90,10 @@ abstract class LeaderLeavingSpec exitingLatch.await // verify that the LEADER is no longer part of the 'members' set - awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress)) + awaitCond(cluster.members.forall(_.address != oldLeaderAddress)) // verify that the LEADER is not part of the 'unreachable' set - awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress)) + awaitCond(cluster.unreachableMembers.forall(_.address != oldLeaderAddress)) // verify that we have a new LEADER awaitCond(cluster.leader != oldLeaderAddress) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 2362da8aef..91f0e82939 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -29,7 +29,7 @@ object MultiNodeClusterSpec { leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms - publish-state-interval = 0 s # always, when it happens + publish-stats-interval = 0 s # always, when it happens } akka.test { single-expect-default = 5 s @@ -106,9 +106,9 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu * Use this method for the initial startup of the cluster node. */ def startClusterNode(): Unit = { - if (cluster.latestGossip.members.isEmpty) { + if (cluster.members.isEmpty) { cluster join myself - awaitCond(cluster.latestGossip.members.exists(_.address == address(myself))) + awaitCond(cluster.members.exists(_.address == address(myself))) } else cluster.self } @@ -181,25 +181,20 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], timeout: Duration = 20.seconds): Unit = { within(timeout) { - awaitCond(cluster.latestGossip.members.size == numberOfMembers) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(cluster.convergence.isDefined) + awaitCond(cluster.members.size == numberOfMembers) + awaitCond(cluster.members.forall(_.status == MemberStatus.Up)) + awaitCond(cluster.convergence) if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set awaitCond( - canNotBePartOfMemberRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address)))) + canNotBePartOfMemberRing forall (address ⇒ !(cluster.members exists (_.address == address)))) } } /** * Wait until the specified nodes have seen the same gossip overview. */ - def awaitSeenSameState(addresses: Address*): Unit = { - awaitCond { - val seen = cluster.latestGossip.overview.seen - val seenVectorClocks = addresses.flatMap(seen.get(_)) - seenVectorClocks.size == addresses.size && seenVectorClocks.toSet.size == 1 - } - } + def awaitSeenSameState(addresses: Address*): Unit = + awaitCond((addresses.toSet -- cluster.seenBy).isEmpty) def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index 1a35af6411..0476e52516 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -42,7 +42,7 @@ abstract class NodeJoinSpec cluster.join(first) } - awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining }) + awaitCond(cluster.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining }) enterBarrier("after") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index d5c374ba64..e9730dee69 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -43,10 +43,10 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec runOn(first, third) { // verify that the 'second' node is no longer part of the 'members' set - awaitCond(cluster.latestGossip.members.forall(_.address != address(second)), reaperWaitingTime) + awaitCond(cluster.members.forall(_.address != address(second)), reaperWaitingTime) // verify that the 'second' node is not part of the 'unreachable' set - awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != address(second)), reaperWaitingTime) + awaitCond(cluster.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime) } runOn(second) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 364edca08b..9bb78e3539 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -38,12 +38,12 @@ abstract class NodeMembershipSpec runOn(first, second) { cluster.join(first) - awaitCond(cluster.latestGossip.members.size == 2) - assertMembers(cluster.latestGossip.members, first, second) + awaitCond(cluster.members.size == 2) + assertMembers(cluster.members, first, second) awaitCond { - cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + cluster.members.forall(_.status == MemberStatus.Up) } - awaitCond(cluster.convergence.isDefined) + awaitCond(cluster.convergence) } enterBarrier("after-1") @@ -55,12 +55,12 @@ abstract class NodeMembershipSpec cluster.join(first) } - awaitCond(cluster.latestGossip.members.size == 3) - assertMembers(cluster.latestGossip.members, first, second, third) + awaitCond(cluster.members.size == 3) + assertMembers(cluster.members, first, second, third) awaitCond { - cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + cluster.members.forall(_.status == MemberStatus.Up) } - awaitCond(cluster.convergence.isDefined) + awaitCond(cluster.convergence) enterBarrier("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 1510663784..a07d9059f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -60,7 +60,7 @@ abstract class NodeUpSpec for (n ← 1 to 20) { Thread.sleep(100.millis.dilated.toMillis) unexpected.get must be(SortedSet.empty) - cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true) + cluster.members.forall(_.status == MemberStatus.Up) must be(true) } enterBarrier("after-2") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index c2c517fd16..0a09db0546 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -78,10 +78,10 @@ abstract class SplitBrainSpec } runOn(side1: _*) { - awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds) + awaitCond(cluster.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds) } runOn(side2: _*) { - awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds) + awaitCond(cluster.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds) } enterBarrier("after-2") @@ -91,16 +91,16 @@ abstract class SplitBrainSpec runOn(side1: _*) { // auto-down = on - awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address) + awaitCond(cluster.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.unreachableMembers.map(_.address) must be(side2.toSet map address) awaitUpConvergence(side1.size, side2 map address) assertLeader(side1: _*) } runOn(side2: _*) { // auto-down = on - awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address) + awaitCond(cluster.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.unreachableMembers.map(_.address) must be(side1.toSet map address) awaitUpConvergence(side2.size, side1 map address) assertLeader(side2: _*) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index c86622ce8b..5fac9f3f8f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -42,23 +42,18 @@ abstract class TransitionSpec def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail def memberStatus(address: Address): MemberStatus = { - val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst { + val statusOption = (cluster.members ++ cluster.unreachableMembers).collectFirst { case m if m.address == address ⇒ m.status } statusOption must not be (None) statusOption.get } - def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address) + def memberAddresses: Set[Address] = cluster.members.map(_.address) def members: Set[RoleName] = memberAddresses.flatMap(roleName(_)) - def seenLatestGossip: Set[RoleName] = { - val gossip = cluster.latestGossip - gossip.overview.seen.collect { - case (address, v) if v == gossip.version ⇒ roleName(address) - }.flatten.toSet - } + def seenLatestGossip: Set[RoleName] = cluster.seenBy flatMap roleName def awaitSeen(addresses: Address*): Unit = awaitCond { (seenLatestGossip map address) == addresses.toSet @@ -95,9 +90,11 @@ abstract class TransitionSpec def gossipTo(toRole: RoleName): Unit = { gossipBarrierCounter += 1 runOn(toRole) { - val g = cluster.latestGossip + val oldCount = cluster.latestStats.receivedGossipCount enterBarrier("before-gossip-" + gossipBarrierCounter) - awaitCond(cluster.latestGossip != g) // received gossip + awaitCond { + cluster.latestStats.receivedGossipCount != oldCount // received gossip + } // gossip chat will synchronize the views awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) @@ -125,7 +122,7 @@ abstract class TransitionSpec startClusterNode() cluster.isSingletonCluster must be(true) cluster.status must be(Joining) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) leaderActions() cluster.status must be(Up) } @@ -144,7 +141,7 @@ abstract class TransitionSpec memberStatus(first) must be(Up) memberStatus(second) must be(Joining) awaitCond(seenLatestGossip == Set(first, second)) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) } enterBarrier("convergence-joining-2") @@ -161,7 +158,7 @@ abstract class TransitionSpec awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second)) memberStatus(first) must be(Up) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) } enterBarrier("after-2") @@ -177,7 +174,7 @@ abstract class TransitionSpec awaitMembers(first, second, third) memberStatus(third) must be(Joining) awaitCond(seenLatestGossip == Set(second, third)) - cluster.convergence.isDefined must be(false) + cluster.convergence must be(false) } enterBarrier("third-joined-second") @@ -188,7 +185,7 @@ abstract class TransitionSpec memberStatus(third) must be(Joining) awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) } first gossipTo third @@ -198,7 +195,7 @@ abstract class TransitionSpec memberStatus(second) must be(Up) memberStatus(third) must be(Joining) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) } enterBarrier("convergence-joining-3") @@ -216,7 +213,7 @@ abstract class TransitionSpec runOn(nonLeader(first, second, third).head) { memberStatus(third) must be(Up) seenLatestGossip must be(Set(leader(first, second, third), myself)) - cluster.convergence.isDefined must be(false) + cluster.convergence must be(false) } // first non-leader gossipTo the other non-leader @@ -228,7 +225,7 @@ abstract class TransitionSpec runOn(nonLeader(first, second, third).tail.head) { memberStatus(third) must be(Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) } // first non-leader gossipTo the leader @@ -238,7 +235,7 @@ abstract class TransitionSpec memberStatus(second) must be(Up) memberStatus(third) must be(Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) } enterBarrier("after-3") @@ -248,7 +245,7 @@ abstract class TransitionSpec runOn(third) { markNodeAsUnavailable(second) reapUnreachable() - cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) + cluster.unreachableMembers must contain(Member(second, Up)) seenLatestGossip must be(Set(third)) } @@ -257,8 +254,8 @@ abstract class TransitionSpec third gossipTo first runOn(first, third) { - cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) - cluster.convergence.isDefined must be(false) + cluster.unreachableMembers must contain(Member(second, Up)) + cluster.convergence must be(false) } runOn(first) { @@ -271,10 +268,10 @@ abstract class TransitionSpec first gossipTo third runOn(first, third) { - cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) + cluster.unreachableMembers must contain(Member(second, Down)) memberStatus(second) must be(Down) seenLatestGossip must be(Set(first, third)) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) } enterBarrier("after-6") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 0ef9dd603f..f20173bf4f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -80,13 +80,13 @@ abstract class UnreachableNodeRejoinsClusterSpec within(30 seconds) { // victim becomes all alone awaitCond({ - val gossip = cluster.latestGossip - gossip.overview.unreachable.size == (roles.size - 1) && - gossip.members.size == 1 && - gossip.members.forall(_.status == MemberStatus.Up) + val members = cluster.members + cluster.unreachableMembers.size == (roles.size - 1) && + members.size == 1 && + members.forall(_.status == MemberStatus.Up) }) - cluster.latestGossip.overview.unreachable.map(_.address) must be((allButVictim map address).toSet) - cluster.convergence.isDefined must be(false) + cluster.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) + cluster.convergence must be(false) } } @@ -95,17 +95,17 @@ abstract class UnreachableNodeRejoinsClusterSpec within(30 seconds) { // victim becomes unreachable awaitCond({ - val gossip = cluster.latestGossip - gossip.overview.unreachable.size == 1 && - gossip.members.size == (roles.size - 1) && - gossip.members.forall(_.status == MemberStatus.Up) + val members = cluster.members + cluster.unreachableMembers.size == 1 && + members.size == (roles.size - 1) && + members.forall(_.status == MemberStatus.Up) }) awaitSeenSameState(allButVictim map address: _*) // still one unreachable - cluster.latestGossip.overview.unreachable.size must be(1) - cluster.latestGossip.overview.unreachable.head.address must be(node(victim).address) + cluster.unreachableMembers.size must be(1) + cluster.unreachableMembers.head.address must be(node(victim).address) // and therefore no convergence - cluster.convergence.isDefined must be(false) + cluster.convergence must be(false) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 8d420dc021..2d7565f5f5 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -31,7 +31,7 @@ class ClusterConfigSpec extends AkkaSpec { HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) - PublishStateInterval must be(1 second) + PublishStatsInterval must be(10 second) JoinTimeout must be(60 seconds) AutoJoin must be(true) AutoDown must be(false) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index dd3fe83de9..8e6035e6d1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -25,7 +25,7 @@ object ClusterSpec { auto-join = off auto-down = off periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks - publish-state-interval = 0 s # always, when it happens + publish-stats-interval = 0 s # always, when it happens } akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.remote.netty.port = 0 @@ -70,13 +70,14 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { } "initially become singleton cluster when joining itself and reach convergence" in { - cluster.isSingletonCluster must be(false) // auto-join = off + cluster.members.size must be(0) // auto-join = off cluster.join(selfAddress) + Thread.sleep(5000) awaitCond(cluster.isSingletonCluster) cluster.self.address must be(selfAddress) - cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) + cluster.members.map(_.address) must be(Set(selfAddress)) cluster.status must be(MemberStatus.Joining) - cluster.convergence.isDefined must be(true) + cluster.convergence must be(true) leaderActions() cluster.status must be(MemberStatus.Up) }