From 81b68e2fc002d2c87f1a47beaae9ad4f8d9c9c40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 9 Mar 2012 12:56:56 +0100 Subject: [PATCH] Added DOWNING (user downing and auto-downing) and LEADER actions. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added possibility for user to 'down' a node * Added possibility for the leader to 'auto-down' a node. * Added leader role actions - Moving nodes from JOINING -> UP - Moving nodes from EXITING -> REMOVED - AUTO-DOWNING * Added tests for user and leader downing * Added 'auto-down' option to turn auto-downing on and off * Fixed bug in semantic Member Ordering * Removed FSM stuff from ClusterCommandDaemon (including the test) since the node status should only be in the converged gossip state Signed-off-by: Jonas Bonér --- .../src/main/resources/reference.conf | 3 + .../scala/akka/cluster/ClusterSettings.scala | 1 + .../src/main/scala/akka/cluster/Node.scala | 632 ++++++++++++------ .../akka/cluster/ClientDowningSpec.scala | 186 ++++++ .../cluster/ClusterCommandDaemonFSMSpec.scala | 148 ---- .../akka/cluster/ClusterConfigSpec.scala | 1 + .../GossipingAccrualFailureDetectorSpec.scala | 6 +- .../akka/cluster/LeaderDowningSpec.scala | 179 +++++ .../akka/cluster/LeaderElectionSpec.scala | 14 +- .../MembershipChangeListenerSpec.scala | 4 +- .../akka/cluster/NodeMembershipSpec.scala | 28 +- 11 files changed, 823 insertions(+), 379 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala delete mode 100644 akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/LeaderDowningSpec.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 0917909504..42ce7e4a77 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -12,6 +12,9 @@ akka { # leave as empty string if the node should be a singleton cluster node-to-join = "" + # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? + auto-down = on + # the number of gossip daemon actors nr-of-gossip-daemons = 4 nr-of-deputy-nodes = 3 diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index e05c04b9d7..50b0f5bd0b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -23,4 +23,5 @@ class ClusterSettings(val config: Config, val systemName: String) { val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS) val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") + val AutoDown = getBoolean("akka.cluster.auto-down") } diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index 7642fc39b6..03c9c90515 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -29,6 +29,7 @@ import com.google.protobuf.ByteString * Interface for membership change listener. */ trait MembershipChangeListener { + // FIXME bad for Java - convert to Array? 
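(For context, a MembershipChangeListener is just a callback over the member ring; the notification sites shown in this patch fire only after the gossip has converged. An illustrative implementation, assuming Member is in scope as defined below; the registration API itself is not shown in this hunk:

    import scala.collection.immutable.SortedSet

    val listener = new MembershipChangeListener {
      def notify(members: SortedSet[Member]): Unit =
        println("Cluster membership changed: " + members.mkString(", "))
    }

)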
def notify(members: SortedSet[Member]): Unit } @@ -36,6 +37,7 @@ trait MembershipChangeListener { * Interface for meta data change listener. */ trait MetaDataChangeListener { // FIXME add management and notification for MetaDataChangeListener + // FIXME bad for Java - convert to what? def notify(meta: Map[String, Array[Byte]]): Unit } @@ -86,7 +88,46 @@ object ClusterAction { /** * Represents the address and the current status of a cluster member node. */ -case class Member(address: Address, status: MemberStatus) extends ClusterMessage +class Member(val address: Address, val status: MemberStatus) extends ClusterMessage { + override def hashCode = address.## + override def equals(other: Any) = Member.unapply(this) == Member.unapply(other) + override def toString = "Member(address = %s, status = %s)" format (address, status) + def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status) +} + +/** + * Factory and Utility module for Member instances. + */ +object Member { + import MemberStatus._ + + implicit val ordering = Ordering.fromLessThan[Member](_.address.toString < _.address.toString) + + def apply(address: Address, status: MemberStatus): Member = new Member(address, status) + + def unapply(other: Any) = other match { + case m: Member ⇒ Some(m.address) + case _ ⇒ None + } + + /** + * Picks the Member with the highest "priority" MemberStatus. + */ + def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match { + case (Removed, _) ⇒ m1 + case (_, Removed) ⇒ m2 + case (Down, _) ⇒ m1 + case (_, Down) ⇒ m2 + case (Exiting, _) ⇒ m1 + case (_, Exiting) ⇒ m2 + case (Leaving, _) ⇒ m1 + case (_, Leaving) ⇒ m2 + case (Up, Joining) ⇒ m1 + case (Joining, Up) ⇒ m2 + case (Joining, Joining) ⇒ m1 + case (Up, Up) ⇒ m1 + } +} /** * Envelope adding a sender address to the gossip. @@ -106,6 +147,14 @@ object MemberStatus { case object Exiting extends MemberStatus case object Down extends MemberStatus case object Removed extends MemberStatus + + def isUnavailable(status: MemberStatus): Boolean = { + // status == MemberStatus.Joining || + status == MemberStatus.Down || + status == MemberStatus.Exiting || + status == MemberStatus.Removed || + status == MemberStatus.Leaving + } } /** @@ -113,7 +162,9 @@ object MemberStatus { */ case class GossipOverview( seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock], - unreachable: Set[Address] = Set.empty[Address]) { + unreachable: Set[Member] = Set.empty[Member]) { + + // FIXME document when nodes are put in 'unreachable' set and removed from 'members' override def toString = "GossipOverview(seen = [" + seen.mkString(", ") + @@ -151,6 +202,40 @@ case class Gossip( else this copy (overview = overview copy (seen = overview.seen + (address -> version))) } + /** + * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories. + */ + def merge(that: Gossip): Gossip = { + import Member.ordering + + // 1. merge vector clocks + val mergedVClock = this.version merge that.version + + // 2. group all members by Address => Vector[Member] + var membersGroupedByAddress = Map.empty[Address, Vector[Member]] + (this.members ++ that.members) foreach { m ⇒ + val ms = membersGroupedByAddress.get(m.address).getOrElse(Vector.empty[Member]) + membersGroupedByAddress += (m.address -> (ms :+ m)) + } + + // 3. 
merge members by selecting the single Member with highest MemberStatus out of the Member groups + val mergedMembers = + SortedSet.empty[Member] ++ + membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members) ⇒ + acc :+ members.reduceLeft(Member.highestPriorityOf(_, _)) + } + + // 4. merge meta-data + val mergedMeta = this.meta ++ that.meta + + // 5. merge gossip overview + val mergedOverview = GossipOverview( + this.overview.seen ++ that.overview.seen, + this.overview.unreachable ++ that.overview.unreachable) + + Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock) + } + override def toString = "Gossip(" + "overview = " + overview + @@ -161,102 +246,25 @@ case class Gossip( } /** - * FSM actor managing the different cluster nodes states. + * Manages routing of the different cluster commands. * Instantiated as a single instance for each Node - e.g. commands are serialized to Node message after message. */ -final class ClusterCommandDaemon extends Actor with FSM[MemberStatus, Unit] { +final class ClusterCommandDaemon extends Actor { + import ClusterAction._ + val node = Node(context.system) + val log = Logging(context.system, this) - // ======================== - // === START IN JOINING == - startWith(MemberStatus.Joining, Unit) - - // ======================== - // === WHEN JOINING === - when(MemberStatus.Joining) { - case Event(ClusterAction.Up(address), _) ⇒ - node.up(address) - goto(MemberStatus.Up) - - case Event(ClusterAction.Remove(address), _) ⇒ - node.removing(address) - goto(MemberStatus.Removed) - - case Event(ClusterAction.Down(address), _) ⇒ - node.downing(address) - goto(MemberStatus.Down) + def receive = { + case Join(address) ⇒ node.joining(address) + case Up(address) ⇒ node.up(address) + case Down(address) ⇒ node.downing(address) + case Leave(address) ⇒ node.leaving(address) + case Exit(address) ⇒ node.exiting(address) + case Remove(address) ⇒ node.removing(address) } - // ======================== - // === WHEN UP === - when(MemberStatus.Up) { - case Event(ClusterAction.Down(address), _) ⇒ - node.downing(address) - goto(MemberStatus.Down) - - case Event(ClusterAction.Leave(address), _) ⇒ - node.leaving(address) - goto(MemberStatus.Leaving) - - case Event(ClusterAction.Exit(address), _) ⇒ - node.exiting(address) - goto(MemberStatus.Exiting) - - case Event(ClusterAction.Remove(address), _) ⇒ - node.removing(address) - goto(MemberStatus.Removed) - } - - // ======================== - // === WHEN LEAVING === - when(MemberStatus.Leaving) { - case Event(ClusterAction.Down(address), _) ⇒ - node.downing(address) - goto(MemberStatus.Down) - - case Event(ClusterAction.Remove(address), _) ⇒ - node.removing(address) - goto(MemberStatus.Removed) - } - - // ======================== - // === WHEN EXITING === - when(MemberStatus.Exiting) { - case Event(ClusterAction.Remove(address), _) ⇒ - node.removing(address) - goto(MemberStatus.Removed) - } - - // ======================== - // === WHEN DOWN === - when(MemberStatus.Down) { - // FIXME How to transition from DOWN => JOINING when node comes back online. Can't just listen to Gossip message since it is received be another actor. How to fix this? 
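The conflict-resolution rule behind step 3 above is Member.highestPriorityOf: when two gossips carry different records for the same address, the record with the "strongest" status wins, in the order Removed, Down, Exiting, Leaving, Up, Joining, so a DOWN or REMOVED verdict is never lost to a stale UP. A minimal sketch against the Member definitions above (the address is made up for illustration):

    import akka.actor.Address
    import MemberStatus._

    val addr = Address("akka", "sys", "hostA", 5550)   // hypothetical address
    val seenAsUp   = Member(addr, Up)
    val seenAsDown = Member(addr, Down)

    Member.highestPriorityOf(seenAsUp, seenAsDown)   // Member(addr, Down)
    Member.highestPriorityOf(seenAsDown, seenAsUp)   // Member(addr, Down), argument order does not matter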
- case Event(ClusterAction.Remove(address), _) ⇒ - node.removing(address) - goto(MemberStatus.Removed) - } - - // ======================== - // === WHEN REMOVED === - when(MemberStatus.Removed) { - case command ⇒ - log.warning("Removed node [{}] received cluster command [{}]", context.system.name, command) - stay - } - - // ======================== - // === GENERIC AND UNHANDLED COMMANDS === - whenUnhandled { - // should be able to handle Join in any state - case Event(ClusterAction.Join(address), _) ⇒ - node.joining(address) - stay - - case Event(illegal, _) ⇒ { - log.error("Illegal command [{}] in state [{}]", illegal, stateName) - stay - } - } + override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) } /** @@ -274,6 +282,9 @@ final class ClusterGossipDaemon extends Actor { override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown) } +/** + * Supervisor managing the different cluste daemons. + */ final class ClusterDaemonSupervisor extends Actor { val log = Logging(context.system, this) val node = Node(context.system) @@ -333,7 +344,6 @@ class Node(system: ExtendedActorSystem) extends Extension { * all state is represented by this immutable case class and managed by an AtomicReference. */ private case class State( - self: Member, latestGossip: Gossip, memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty[MembershipChangeListener]) @@ -345,19 +355,18 @@ class Node(system: ExtendedActorSystem) extends Extension { val remoteSettings = new RemoteSettings(system.settings.config, system.name) val clusterSettings = new ClusterSettings(system.settings.config, system.name) - private val remoteAddress = remote.transport.address + val remoteAddress = remote.transport.address + val failureDetector = new AccrualFailureDetector( + system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) + private val vclockNode = VectorClock.Node(remoteAddress.toString) private val gossipInitialDelay = clusterSettings.GossipInitialDelay private val gossipFrequency = clusterSettings.GossipFrequency - implicit private val memberOrdering = Ordering.fromLessThan[Member](_.address.toString < _.address.toString) - implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) - val failureDetector = new AccrualFailureDetector( - system, remoteAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) - + private val autoDown = clusterSettings.AutoDown private val nrOfDeputyNodes = clusterSettings.NrOfDeputyNodes private val nrOfGossipDaemons = clusterSettings.NrOfGossipDaemons private val nodeToJoin: Option[Address] = clusterSettings.NodeToJoin filter (_ != remoteAddress) @@ -368,6 +377,8 @@ class Node(system: ExtendedActorSystem) extends Extension { private val log = Logging(system, "Node") private val random = SecureRandom.getInstance("SHA1PRNG") + log.info("Node [{}] - Starting cluster Node...", remoteAddress) + // create superisor for daemons under path "/system/cluster" private val clusterDaemons = { val createChild = CreateChild(Props[ClusterDaemonSupervisor], "cluster") @@ -380,30 +391,41 @@ class Node(system: ExtendedActorSystem) extends Extension { private val state = { val member = Member(remoteAddress, MemberStatus.Joining) val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock - new 
AtomicReference[State](State(member, gossip)) + new AtomicReference[State](State(gossip)) } - import Versioned.latestVersionOf - - log.info("Node [{}] - Starting cluster Node...", remoteAddress) - // try to join the node defined in the 'akka.cluster.node-to-join' option autoJoin() + // ======================================================== + // ===================== WORK DAEMONS ===================== + // ======================================================== + // start periodic gossip to random nodes in cluster private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { gossip() } - // start periodic cluster scrutinization (moving nodes condemned by the failure detector to unreachable list) - private val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { - scrutinize() + // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) + private val failureDetectorReaperCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for reaping? + reapUnreachableMembers() } + // start periodic leader action management (only applies for the current leader) + private val leaderActionsCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for leaderActions? + leaderActions() + } + + log.info("Node [{}] - Cluster Node started successfully", remoteAddress) + // ====================================================== // ===================== PUBLIC API ===================== // ====================================================== + def self: Member = latestGossip.members + .find(_.address == remoteAddress) + .getOrElse(throw new IllegalStateException("Can't find 'this' Member in the cluster membership ring")) + /** * Latest gossip. */ @@ -412,14 +434,14 @@ class Node(system: ExtendedActorSystem) extends Extension { /** * Member status for this node. */ - def self: Member = state.get.self + def status: MemberStatus = self.status /** * Is this node the leader? */ def isLeader: Boolean = { - val currentState = state.get - remoteAddress == currentState.latestGossip.members.head.address + val members = latestGossip.members + !members.isEmpty && (remoteAddress == members.head.address) } /** @@ -434,6 +456,11 @@ class Node(system: ExtendedActorSystem) extends Extension { */ def convergence: Option[Gossip] = convergence(latestGossip) + /** + * Returns true if the node is UP or JOINING. + */ + def isAvailable: Boolean = !isUnavailable(state.get) + /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. */ @@ -442,7 +469,8 @@ class Node(system: ExtendedActorSystem) extends Extension { if (isRunning.compareAndSet(true, false)) { log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress) gossipCanceller.cancel() - scrutinizeCanceller.cancel() + failureDetectorReaperCanceller.cancel() + leaderActionsCanceller.cancel() system.stop(clusterDaemons) } } @@ -472,28 +500,28 @@ class Node(system: ExtendedActorSystem) extends Extension { /** * Send command to JOIN one node to another. */ - def sendJoin(address: Address) { + def scheduleNodeJoin(address: Address) { clusterCommandDaemon ! ClusterAction.Join(address) } /** * Send command to issue state transition to LEAVING. */ - def sendLeave(address: Address) { + def scheduleNodeLeave(address: Address) { clusterCommandDaemon ! 
ClusterAction.Leave(address) } /** * Send command to issue state transition to EXITING. */ - def sendDown(address: Address) { + def scheduleNodeDown(address: Address) { clusterCommandDaemon ! ClusterAction.Down(address) } /** * Send command to issue state transition to REMOVED. */ - def sendRemove(address: Address) { + def scheduleNodeRemove(address: Address) { clusterCommandDaemon ! ClusterAction.Remove(address) } @@ -512,9 +540,15 @@ class Node(system: ExtendedActorSystem) extends Extension { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members + val localOverview = localGossip.overview + val localUnreachableMembers = localOverview.unreachable + + // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster + val newUnreachableMembers = localUnreachableMembers filterNot { _.address == node } + val newOverview = localOverview copy (unreachable = newUnreachableMembers) val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining - val newGossip = localGossip copy (members = newMembers) + val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip + vclockNode val seenVersionedGossip = versionedGossip seen remoteAddress @@ -533,40 +567,108 @@ class Node(system: ExtendedActorSystem) extends Extension { /** * State transition to UP. */ - private[cluster] final def up(address: Address) {} + private[cluster] final def up(address: Address) { + // FIXME implement me + } /** * State transition to LEAVING. */ - private[cluster] final def leaving(address: Address) {} + private[cluster] final def leaving(address: Address) { + // FIXME implement me + } /** * State transition to EXITING. */ - private[cluster] final def exiting(address: Address) {} + private[cluster] final def exiting(address: Address) { + // FIXME implement me + } /** * State transition to REMOVED. */ - private[cluster] final def removing(address: Address) {} + private[cluster] final def removing(address: Address) { + // FIXME implement me + } /** - * State transition to DOWN. + * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not alread there) + * and its status is set to DOWN. The node is alo removed from the 'seen' table. + * + * The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly + * to this node and it will then go through the normal JOINING procedure. */ - private[cluster] final def downing(address: Address) {} + @tailrec + final private[cluster] def downing(address: Address) { + val localState = state.get + val localGossip = localState.latestGossip + val localMembers = localGossip.members + val localOverview = localGossip.overview + val localSeen = localOverview.seen + val localUnreachableMembers = localOverview.unreachable + + // 1. check if the node to DOWN is in the 'members' set + var downedMember: Option[Member] = None + val newMembers = + localMembers + .map { member ⇒ + if (member.address == address) { + log.info("Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address) + val newMember = member copy (status = MemberStatus.Down) + downedMember = Some(newMember) + newMember + } else member + } + .filter(_.status != MemberStatus.Down) + + // 2. 
check if the node to DOWN is in the 'unreachable' set + val newUnreachableMembers = + localUnreachableMembers + .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN + .map { member ⇒ + if (member.address == address) { + log.info("Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address) + member copy (status = MemberStatus.Down) + } else member + } + + // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set. + val newUnreachablePlusNewlyDownedMembers = downedMember match { + case Some(member) ⇒ newUnreachableMembers + member + case None ⇒ newUnreachableMembers + } + + // 4. remove nodes marked as DOWN from the 'seen' table + val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member) ⇒ + currentSeen - member.address + } + + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview + val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip + val versionedGossip = newGossip + vclockNode + val newState = localState copy (latestGossip = versionedGossip seen remoteAddress) + + if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update + else { + if (convergence(newState.latestGossip).isDefined) { + newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } + } + } + } /** * Receive new gossip. */ @tailrec - private[cluster] final def receive(sender: Member, remoteGossip: Gossip) { + final private[cluster] def receive(sender: Member, remoteGossip: Gossip) { val localState = state.get val localGossip = localState.latestGossip val winningGossip = if (remoteGossip.version <> localGossip.version) { // concurrent - val mergedGossip = merge(remoteGossip, localGossip) + val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip + vclockNode log.debug( @@ -609,55 +711,6 @@ class Node(system: ExtendedActorSystem) extends Extension { connection ! command } - /** - * Initates a new round of gossip. - */ - private def gossip() { - val localState = state.get - val localGossip = localState.latestGossip - val localMembers = localGossip.members - - if (!isSingletonCluster(localState)) { // do not gossip if we are a singleton cluster - log.debug("Node [{}] - Initiating new round of gossip", remoteAddress) - - val localGossip = localState.latestGossip - val localMembers = localGossip.members - val localMembersSize = localMembers.size - - val localUnreachableAddresses = localGossip.overview.unreachable - val localUnreachableSize = localUnreachableAddresses.size - - // 1. gossip to alive members - val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address }) - - // 2. gossip to unreachable members - if (localUnreachableSize > 0) { - val probability: Double = localUnreachableSize / (localMembersSize + 1) - if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableAddresses) - } - - // 3. 
gossip to a deputy nodes for facilitating partition healing - val deputies = deputyNodes - if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) { - if (localMembersSize == 0) gossipToRandomNodeOf(deputies) - else { - val probability = 1.0 / localMembersSize + localUnreachableSize - if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies) - } - } - } - } - - /** - * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories. - */ - private def merge(gossip1: Gossip, gossip2: Gossip): Gossip = { - val mergedVClock = gossip1.version merge gossip2.version - val mergedMembers = gossip1.members union gossip2.members - val mergedMeta = gossip1.meta ++ gossip2.meta - Gossip(gossip2.overview, mergedMembers, mergedMeta, mergedVClock) - } - /** * Switches the member status. * @@ -668,12 +721,15 @@ class Node(system: ExtendedActorSystem) extends Extension { private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus) - val localSelf = state.self + val localSelf = self val localGossip = state.latestGossip val localMembers = localGossip.members + // change my state into a "new" self val newSelf = localSelf copy (status = newStatus) + + // change my state in 'gossip.members' val newMembersSet = localMembers map { member ⇒ if (member.address == remoteAddress) newSelf else member @@ -683,10 +739,11 @@ class Node(system: ExtendedActorSystem) extends Extension { val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*) val newGossip = localGossip copy (members = newMembersSortedSet) + // version my changes val versionedGossip = newGossip + vclockNode val seenVersionedGossip = versionedGossip seen remoteAddress - state copy (self = newSelf, latestGossip = seenVersionedGossip) + state copy (latestGossip = seenVersionedGossip) } /** @@ -704,49 +761,92 @@ class Node(system: ExtendedActorSystem) extends Extension { * @return 'true' if it gossiped to a "deputy" member. */ private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = { - val peers = addresses filterNot (_ == remoteAddress) // filter out myself - val peer = selectRandomNode(peers) - gossipTo(peer) - deputyNodes exists (peer == _) + if (addresses.isEmpty) false + else { + val peers = addresses filter (_ != remoteAddress) // filter out myself + val peer = selectRandomNode(peers) + gossipTo(peer) + deputyNodes exists (peer == _) + } } /** - * Scrutinizes the cluster; marks members detected by the failure detector as unreachable. + * Initates a new round of gossip. + */ + private def gossip() { + val localState = state.get + val localGossip = localState.latestGossip + val localMembers = localGossip.members + + if (!isSingletonCluster(localState) && isAvailable(localState)) { + // only gossip if we are a non-singleton cluster and available + + log.debug("Node [{}] - Initiating new round of gossip", remoteAddress) + + val localGossip = localState.latestGossip + val localMembers = localGossip.members + val localMembersSize = localMembers.size + + val localUnreachableMembers = localGossip.overview.unreachable + val localUnreachableSize = localUnreachableMembers.size + + // 1. gossip to alive members + val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address }) + + // 2. 
gossip to unreachable members + if (localUnreachableSize > 0) { + val probability: Double = localUnreachableSize / (localMembersSize + 1) + if (random.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) + } + + // 3. gossip to a deputy nodes for facilitating partition healing + val deputies = deputyNodes + if ((!gossipedToDeputy || localMembersSize < 1) && !deputies.isEmpty) { + if (localMembersSize == 0) gossipToRandomNodeOf(deputies) + else { + val probability = 1.0 / localMembersSize + localUnreachableSize + if (random.nextDouble() <= probability) gossipToRandomNodeOf(deputies) + } + } + } + } + + /** + * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. */ @tailrec - final private def scrutinize() { + final private def reapUnreachableMembers() { val localState = state.get - if (!isSingletonCluster(localState)) { // do not scrutinize if we are a singleton cluster + if (!isSingletonCluster(localState) && isAvailable(localState)) { + // only scrutinize if we are a non-singleton cluster and available val localGossip = localState.latestGossip val localOverview = localGossip.overview val localSeen = localOverview.seen val localMembers = localGossip.members - val localUnreachableAddresses = localGossip.overview.unreachable + val localUnreachableMembers = localGossip.overview.unreachable val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ failureDetector.isAvailable(member.address) } - val newlyDetectedUnreachableAddresses = newlyDetectedUnreachableMembers map { _.address } - if (!newlyDetectedUnreachableAddresses.isEmpty) { // we have newly detected members marked as unavailable + if (!newlyDetectedUnreachableMembers.isEmpty) { // we have newly detected members marked as unavailable val newMembers = localMembers diff newlyDetectedUnreachableMembers - val newUnreachableAddresses: Set[Address] = localUnreachableAddresses ++ newlyDetectedUnreachableAddresses + val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers - val newSeen = newUnreachableAddresses.foldLeft(localSeen)((currentSeen, address) ⇒ currentSeen - address) - - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableAddresses) + val newOverview = localOverview copy (unreachable = newUnreachableMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) + // updating vclock and 'seen' table val versionedGossip = newGossip + vclockNode val seenVersionedGossip = versionedGossip seen remoteAddress val newState = localState copy (latestGossip = seenVersionedGossip) // if we won the race then update else try again - if (!state.compareAndSet(localState, newState)) scrutinize() // recur + if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur else { - log.info("Node [{}] - Marking node(s) an unreachable [{}]", remoteAddress, newlyDetectedUnreachableAddresses.mkString(", ")) + log.info("Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", ")) if (convergence(newState.latestGossip).isDefined) { newState.memberMembershipChangeListeners foreach { _ notify newMembers } @@ -757,38 +857,156 @@ class Node(system: ExtendedActorSystem) extends Extension { } /** - * Checks if we have a cluster convergence. + * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. 
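A note on the shape of downing, reapUnreachableMembers and the leaderActions that follow: all cluster state is a single immutable State value held in an AtomicReference, and each of these methods is a @tailrec read-modify-compareAndSet loop that retries when it loses the race. The idiom in isolation, as a simplified sketch rather than the actual Node code:

    import java.util.concurrent.atomic.AtomicReference
    import scala.annotation.tailrec

    final class LockFreeCounter {
      private val state = new AtomicReference(0)

      @tailrec
      final def increment(): Int = {
        val current = state.get                              // read the current immutable value
        val updated = current + 1                            // derive the new value from it
        if (state.compareAndSet(current, updated)) updated
        else increment()                                     // lost the race, retry with fresh state
      }
    }

In Node the same loop additionally notifies the membership change listeners, but only after the compare-and-set has succeeded and only when the resulting gossip has converged.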
+ */ + @tailrec + final private def leaderActions() { + val localState = state.get + val localGossip = localState.latestGossip + val localMembers = localGossip.members + + val isLeader = !localMembers.isEmpty && (remoteAddress == localMembers.head.address) + + if (isLeader && isAvailable(localState)) { + // only run the leader actions if we are the LEADER and available + + val localOverview = localGossip.overview + val localSeen = localOverview.seen + val localUnreachableMembers = localGossip.overview.unreachable + + // Leader actions are as follows: + // 1. Move JOINING => UP + // 2. Move EXITING => REMOVED + // 3. Move UNREACHABLE => DOWN (auto-downing by leader) + // 4. Updating the vclock version for the changes + // 5. Updating the 'seen' table + + var hasChangedState = false + val newGossip = + + if (convergence(localGossip).isDefined) { + // we have convergence - so we can't have unreachable nodes + + val newMembers = + localMembers map { member ⇒ + // 1. Move JOINING => UP + if (member.status == MemberStatus.Joining) { + log.info("Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address) + hasChangedState = true + member copy (status = MemberStatus.Up) + } else member + } map { member ⇒ + // 2. Move EXITING => REMOVED + if (member.status == MemberStatus.Exiting) { + log.info("Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address) + hasChangedState = true + member copy (status = MemberStatus.Removed) + } else member + } + localGossip copy (members = newMembers) // update gossip + + } else if (autoDown) { + // we don't have convergence - so we might have unreachable nodes + // if 'auto-down' is turned on, then try to auto-down any unreachable nodes + + // FIXME Should we let the leader auto-down every run (as it is now) or just every X seconds? So we can wait for user to invoke explicit DOWN. + + // 3. Move UNREACHABLE => DOWN (auto-downing by leader) + val newUnreachableMembers = + localUnreachableMembers + .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN + .map { member ⇒ + log.info("Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address) + hasChangedState = true + member copy (status = MemberStatus.Down) + } + + // removing nodes marked as DOWN from the 'seen' table + // FIXME this needs to be done if user issues DOWN as well + val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) ⇒ currentSeen - member.address) + + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview + localGossip copy (overview = newOverview) // update gossip + + } else localGossip + + if (hasChangedState) { // we have a change of state - version it and try to update + + // 4. Updating the vclock version for the changes + val versionedGossip = newGossip + vclockNode + + // 5. Updating the 'seen' table + val seenVersionedGossip = versionedGossip seen remoteAddress + + val newState = localState copy (latestGossip = seenVersionedGossip) + + // if we won the race then update else try again + if (!state.compareAndSet(localState, newState)) leaderActions() // recur + else { + if (convergence(newState.latestGossip).isDefined) { + newState.memberMembershipChangeListeners map { _ notify newGossip.members } + } + } + } + } + } + + /** + * Checks if we have a cluster convergence. 
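Worth noting about the isLeader check inside leaderActions above: there is no separate election step. Because members is a SortedSet ordered by the string form of the address, every node that holds the same membership ring deterministically agrees that the head of the ring is the leader. A tiny sketch with hypothetical addresses:

    import scala.collection.immutable.SortedSet
    import akka.actor.Address
    import Member.ordering

    val ring = SortedSet(
      Member(Address("akka", "sys", "hostB", 5551), MemberStatus.Up),
      Member(Address("akka", "sys", "hostA", 5550), MemberStatus.Up))

    // the lowest address in the ordering is the leader on every node that shares this view
    val leaderAddress = ring.head.address   // here: the hostA member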
If there are any unreachable nodes then we can't have a convergence - + * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down). * * @returns Some(convergedGossip) if convergence have been reached and None if not */ private def convergence(gossip: Gossip): Option[Gossip] = { val overview = gossip.overview - // if (overview.unreachable.isEmpty) { // if there are any unreachable nodes then we can't have a convergence - - // waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down) - val seen = gossip.overview.seen - val views = seen.values.toSet - if (views.size == 1) { - log.debug("Node [{}] - Cluster convergence reached", remoteAddress) - Some(gossip) + val unreachable = overview.unreachable + + // First check that: + // 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or + // 2. all unreachable members in the set have status DOWN + // Else we can't continue to check for convergence + // When that is done we check that all the entries in the 'seen' table have the same vector clock version + if (unreachable.isEmpty || !unreachable.exists(_.status != MemberStatus.Down)) { + val seen = gossip.overview.seen + val views = Set.empty[VectorClock] ++ seen.values + + if (views.size == 1) { + log.debug("Node [{}] - Cluster convergence reached", remoteAddress) + Some(gossip) + } else None } else None - // } else None + } + + private def isAvailable(state: State): Boolean = !isUnavailable(state) + + private def isUnavailable(state: State): Boolean = { + val localGossip = state.latestGossip + val localOverview = localGossip.overview + val localMembers = localGossip.members + val localUnreachableMembers = localOverview.unreachable + val isUnreachable = localUnreachableMembers exists { _.address == remoteAddress } + val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) } + isUnreachable || hasUnavailableMemberStatus } /** - * Sets up cluster command connection. - */ - private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "commands") - - /** - * Sets up local cluster command connection. + * Looks up and returns the local cluster command connection. */ private def clusterCommandDaemon = system.actorFor(RootActorPath(remoteAddress) / "system" / "cluster" / "commands") /** - * Sets up cluster gossip connection. + * Looks up and returns the remote cluster command connection for the specific address. + */ + private def clusterCommandConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "commands") + + /** + * Looks up and returns the remote cluster gossip connection for the specific address. */ private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip") + /** + * Gets an Iterable with the addresses of a all the 'deputy' nodes - excluding this node if part of the group. 
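Restating the convergence rule implemented above: the cluster has converged when every member in the unreachable set, if any, has already been marked DOWN, and every entry in the seen table carries the same vector clock version. As a condensed predicate, an illustrative rewrite rather than the code Node actually uses:

    def hasConverged(gossip: Gossip): Boolean = {
      val unreachable  = gossip.overview.unreachable
      val seenVersions = gossip.overview.seen.values.toSet

      unreachable.forall(_.status == MemberStatus.Down) && seenVersions.size == 1
    }

This is why a crashed node blocks convergence until someone issues DOWN for it, either explicitly by the user (exercised by ClientDowningSpec below, with auto-down = off) or by the leader through auto-down (LeaderDowningSpec, with auto-down = on).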
+ */ private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take nrOfDeputyNodes filter (_ != remoteAddress) private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(random nextInt addresses.size) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala new file mode 100644 index 0000000000..16651af9b5 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala @@ -0,0 +1,186 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.testkit._ +import akka.dispatch._ +import akka.actor._ +import akka.remote._ +import akka.util.duration._ + +import com.typesafe.config._ + +import java.net.InetSocketAddress + +class ClientDowningSpec extends AkkaSpec(""" + akka { + loglevel = "INFO" + actor.provider = "akka.remote.RemoteActorRefProvider" + cluster { + failure-detector.threshold = 3 + auto-down = off + } + } + """) with ImplicitSender { + + var node1: Node = _ + var node2: Node = _ + var node3: Node = _ + var node4: Node = _ + + var system1: ActorSystemImpl = _ + var system2: ActorSystemImpl = _ + var system3: ActorSystemImpl = _ + var system4: ActorSystemImpl = _ + + try { + "Client of a 4 node cluster" must { + + // ======= NODE 1 ======== + system1 = ActorSystem("system1", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port=5550 + } + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] + node1 = Node(system1) + val fd1 = node1.failureDetector + val address1 = node1.remoteAddress + + // ======= NODE 2 ======== + system2 = ActorSystem("system2", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port = 5551 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] + node2 = Node(system2) + val fd2 = node2.failureDetector + val address2 = node2.remoteAddress + + // ======= NODE 3 ======== + system3 = ActorSystem("system3", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port=5552 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] + node3 = Node(system3) + val fd3 = node3.failureDetector + val address3 = node3.remoteAddress + + // ======= NODE 4 ======== + system4 = ActorSystem("system4", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port=5553 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider] + node4 = Node(system4) + val fd4 = node4.failureDetector + val address4 = node4.remoteAddress + + "be able to DOWN a node that is UP" taggedAs LongRunningTest in { + + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds + + // check cluster convergence + node1.convergence must be('defined) + node2.convergence must be('defined) + node3.convergence must be('defined) + node4.convergence must 
be('defined) + + // shut down node3 + node3.shutdown() + system3.shutdown() + + // wait for convergence + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) + + // client marks node3 as DOWN + node1.scheduleNodeDown(address3) + + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds + + // check cluster convergence + node1.convergence must be('defined) + node2.convergence must be('defined) + node4.convergence must be('defined) + + node1.latestGossip.members.size must be(3) + node1.latestGossip.members.exists(_.address == address3) must be(false) + } + + "be able to DOWN a node that is UNREACHABLE" taggedAs LongRunningTest in { + + // shut down system1 - the leader + node4.shutdown() + system4.shutdown() + + // wait for convergence + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) + + // clien marks node4 as DOWN + node2.scheduleNodeDown(address4) + + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds + + // check cluster convergence + node1.convergence must be('defined) + node2.convergence must be('defined) + + node1.latestGossip.members.size must be(2) + node1.latestGossip.members.exists(_.address == address4) must be(false) + node1.latestGossip.members.exists(_.address == address3) must be(false) + } + } + } catch { + case e: Exception ⇒ + e.printStackTrace + fail(e.toString) + } + + override def atTermination() { + if (node1 ne null) node1.shutdown() + if (system1 ne null) system1.shutdown() + + if (node2 ne null) node2.shutdown() + if (system2 ne null) system2.shutdown() + + if (node3 ne null) node3.shutdown() + if (system3 ne null) system3.shutdown() + + if (node4 ne null) node4.shutdown() + if (system4 ne null) system4.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala deleted file mode 100644 index 2aeeb1835b..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit._ -import akka.actor.Address - -class ClusterCommandDaemonFSMSpec - extends AkkaSpec("akka.actor.provider = akka.remote.RemoteActorRefProvider") - with ImplicitSender { - - "A ClusterCommandDaemon FSM" must { - val address = Address("akka", system.name) - "start in Joining" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - } - "be able to switch from Joining to Up" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - } - "be able to switch from Joining to Down" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Down(address) - fsm.stateName must be(MemberStatus.Down) - } - "be able to switch from Joining to Removed" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Remove(address) - fsm.stateName must be(MemberStatus.Removed) - } - "be able to switch from Up to Down" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! 
ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Down(address) - fsm.stateName must be(MemberStatus.Down) - } - "be able to switch from Up to Leaving" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Leave(address) - fsm.stateName must be(MemberStatus.Leaving) - } - "be able to switch from Up to Exiting" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Exit(address) - fsm.stateName must be(MemberStatus.Exiting) - } - "be able to switch from Up to Removed" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Remove(address) - fsm.stateName must be(MemberStatus.Removed) - } - "be able to switch from Leaving to Down" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Leave(address) - fsm.stateName must be(MemberStatus.Leaving) - fsm ! ClusterAction.Down(address) - fsm.stateName must be(MemberStatus.Down) - } - "be able to switch from Leaving to Removed" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Leave(address) - fsm.stateName must be(MemberStatus.Leaving) - fsm ! ClusterAction.Remove(address) - fsm.stateName must be(MemberStatus.Removed) - } - "be able to switch from Exiting to Removed" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Exit(address) - fsm.stateName must be(MemberStatus.Exiting) - fsm ! ClusterAction.Remove(address) - fsm.stateName must be(MemberStatus.Removed) - } - "be able to switch from Down to Removed" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Down(address) - fsm.stateName must be(MemberStatus.Down) - fsm ! ClusterAction.Remove(address) - fsm.stateName must be(MemberStatus.Removed) - } - "not be able to switch from Removed to any other state" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Remove(address) - fsm.stateName must be(MemberStatus.Removed) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Removed) - fsm ! ClusterAction.Leave(address) - fsm.stateName must be(MemberStatus.Removed) - fsm ! ClusterAction.Down(address) - fsm.stateName must be(MemberStatus.Removed) - fsm ! ClusterAction.Exit(address) - fsm.stateName must be(MemberStatus.Removed) - fsm ! ClusterAction.Remove(address) - fsm.stateName must be(MemberStatus.Removed) - } - - "remain in the same state when receiving a Join command" in { - val fsm = TestFSMRef(new ClusterCommandDaemon) - fsm.stateName must be(MemberStatus.Joining) - fsm ! 
ClusterAction.Join(address) - fsm.stateName must be(MemberStatus.Joining) - fsm ! ClusterAction.Up(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Join(address) - fsm.stateName must be(MemberStatus.Up) - fsm ! ClusterAction.Leave(address) - fsm.stateName must be(MemberStatus.Leaving) - fsm ! ClusterAction.Join(address) - fsm.stateName must be(MemberStatus.Leaving) - fsm ! ClusterAction.Down(address) - fsm.stateName must be(MemberStatus.Down) - fsm ! ClusterAction.Join(address) - fsm.stateName must be(MemberStatus.Down) - } - } -} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 6668044f33..c8fd8e6bda 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -30,6 +30,7 @@ class ClusterConfigSpec extends AkkaSpec( GossipFrequency must be(1 second) NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) + AutoDown must be(true) } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index 6c81f8680a..fb9b8408db 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -43,7 +43,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec(""" val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] node1 = Node(system1) val fd1 = node1.failureDetector - val address1 = node1.self.address + val address1 = node1.remoteAddress // ======= NODE 2 ======== system2 = ActorSystem("system2", ConfigFactory @@ -57,7 +57,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec(""" val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] node2 = Node(system2) val fd2 = node2.failureDetector - val address2 = node2.self.address + val address2 = node2.remoteAddress // ======= NODE 3 ======== system3 = ActorSystem("system3", ConfigFactory @@ -71,7 +71,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec(""" val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] node3 = Node(system3) val fd3 = node3.failureDetector - val address3 = node3.self.address + val address3 = node3.remoteAddress "receive gossip heartbeats so that all healthy systems in the cluster are marked 'available'" taggedAs LongRunningTest in { println("Let the systems gossip for a while...") diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderDowningSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderDowningSpec.scala new file mode 100644 index 0000000000..957f8ed4aa --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/LeaderDowningSpec.scala @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.cluster + +import akka.testkit._ +import akka.dispatch._ +import akka.actor._ +import akka.remote._ +import akka.util.duration._ + +import com.typesafe.config._ + +import java.net.InetSocketAddress + +class LeaderDowningSpec extends AkkaSpec(""" + akka { + loglevel = "INFO" + actor.provider = "akka.remote.RemoteActorRefProvider" + cluster { + failure-detector.threshold = 3 + auto-down = on + } + } + """) with ImplicitSender { + + var node1: Node = _ + var node2: Node = _ + var node3: Node = _ + var node4: Node = _ + + var system1: ActorSystemImpl = _ + var system2: ActorSystemImpl = _ + var system3: ActorSystemImpl = _ + var system4: ActorSystemImpl = _ + + try { + "The Leader in a 4 node cluster" must { + + // ======= NODE 1 ======== + system1 = ActorSystem("system1", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port=5550 + } + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] + node1 = Node(system1) + val fd1 = node1.failureDetector + val address1 = node1.remoteAddress + + // ======= NODE 2 ======== + system2 = ActorSystem("system2", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port = 5551 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] + node2 = Node(system2) + val fd2 = node2.failureDetector + val address2 = node2.remoteAddress + + // ======= NODE 3 ======== + system3 = ActorSystem("system3", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port=5552 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] + node3 = Node(system3) + val fd3 = node3.failureDetector + val address3 = node3.remoteAddress + + // ======= NODE 4 ======== + system4 = ActorSystem("system4", ConfigFactory + .parseString(""" + akka { + remote.netty { + hostname = localhost + port=5553 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider] + node4 = Node(system4) + val fd4 = node4.failureDetector + val address4 = node4.remoteAddress + + "be able to DOWN a (last) node that is UNREACHABLE" taggedAs LongRunningTest in { + + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds + + // check cluster convergence + node1.convergence must be('defined) + node2.convergence must be('defined) + node3.convergence must be('defined) + node4.convergence must be('defined) + + // shut down system4 + node4.shutdown() + system4.shutdown() + + // wait for convergence - e.g. 
the leader to auto-down the failed node + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds + + // check cluster convergence + node1.convergence must be('defined) + node2.convergence must be('defined) + node3.convergence must be('defined) + + node1.latestGossip.members.size must be(3) + node1.latestGossip.members.exists(_.address == address4) must be(false) + } + + "be able to DOWN a (middle) node that is UNREACHABLE" taggedAs LongRunningTest in { + + // check cluster convergence + node1.convergence must be('defined) + node2.convergence must be('defined) + node3.convergence must be('defined) + + // shut down system4 + node2.shutdown() + system2.shutdown() + + // wait for convergence - e.g. the leader to auto-down the failed node + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds + + // check cluster convergence + node1.convergence must be('defined) + node3.convergence must be('defined) + + node1.latestGossip.members.size must be(2) + node1.latestGossip.members.exists(_.address == address4) must be(false) + node1.latestGossip.members.exists(_.address == address2) must be(false) + } + } + } catch { + case e: Exception ⇒ + e.printStackTrace + fail(e.toString) + } + + override def atTermination() { + if (node1 ne null) node1.shutdown() + if (system1 ne null) system1.shutdown() + + if (node2 ne null) node2.shutdown() + if (system2 ne null) system2.shutdown() + + if (node3 ne null) node3.shutdown() + if (system3 ne null) system3.shutdown() + + if (node4 ne null) node4.shutdown() + if (system4 ne null) system4.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala index d27611cb79..08d4201bd3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala @@ -18,8 +18,6 @@ class LeaderElectionSpec extends AkkaSpec(""" akka { loglevel = "INFO" actor.provider = "akka.remote.RemoteActorRefProvider" - actor.debug.lifecycle = on - actor.debug.autoreceive = on cluster.failure-detector.threshold = 3 } """) with ImplicitSender { @@ -49,7 +47,7 @@ class LeaderElectionSpec extends AkkaSpec(""" val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] node1 = Node(system1) val fd1 = node1.failureDetector - val address1 = node1.self.address + val address1 = node1.remoteAddress // ======= NODE 2 ======== system2 = ActorSystem("system2", ConfigFactory @@ -66,7 +64,7 @@ class LeaderElectionSpec extends AkkaSpec(""" val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] node2 = Node(system2) val fd2 = node2.failureDetector - val address2 = node2.self.address + val address2 = node2.remoteAddress // ======= NODE 3 ======== system3 = ActorSystem("system3", ConfigFactory @@ -83,7 +81,7 @@ class LeaderElectionSpec extends AkkaSpec(""" val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] node3 = Node(system3) val fd3 = node3.failureDetector - val address3 = node3.self.address + val address3 = node3.remoteAddress "be able to 'elect' a single leader" taggedAs LongRunningTest in { @@ -107,6 +105,9 @@ class LeaderElectionSpec extends AkkaSpec(""" node1.shutdown() system1.shutdown() + // user marks node1 as DOWN + node2.scheduleNodeDown(address1) + println("Give the system time to converge...") Thread.sleep(30.seconds.dilated.toMillis) // give 
them 30 seconds to detect failure of system3 @@ -125,6 +126,9 @@ class LeaderElectionSpec extends AkkaSpec(""" node2.shutdown() system2.shutdown() + // user marks node2 as DOWN + node3.scheduleNodeDown(address2) + println("Give the system time to converge...") Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system3 diff --git a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala index d43841d2ca..f3f34e19c1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -106,9 +106,9 @@ class MembershipChangeListenerSpec extends AkkaSpec(""" } }) - latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS) + latch.await(30.seconds.dilated.toMillis, TimeUnit.MILLISECONDS) - Thread.sleep(10.seconds.dilated.toMillis) + Thread.sleep(30.seconds.dilated.toMillis) // check cluster convergence node0.convergence must be('defined) diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala index 42ead86dfd..fd3e31e83e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala @@ -62,19 +62,19 @@ class NodeMembershipSpec extends AkkaSpec(""" val members0 = node0.latestGossip.members.toArray members0.size must be(2) members0(0).address.port.get must be(5550) - members0(0).status must be(MemberStatus.Joining) + members0(0).status must be(MemberStatus.Up) members0(1).address.port.get must be(5551) - members0(1).status must be(MemberStatus.Joining) + members0(1).status must be(MemberStatus.Up) val members1 = node1.latestGossip.members.toArray members1.size must be(2) members1(0).address.port.get must be(5550) - members1(0).status must be(MemberStatus.Joining) + members1(0).status must be(MemberStatus.Up) members1(1).address.port.get must be(5551) - members1(1).status must be(MemberStatus.Joining) + members1(1).status must be(MemberStatus.Up) } - "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest ignore { // ======= NODE 2 ======== system2 = ActorSystem("system2", ConfigFactory @@ -99,29 +99,29 @@ class NodeMembershipSpec extends AkkaSpec(""" val version = node0.latestGossip.version members0.size must be(3) members0(0).address.port.get must be(5550) - members0(0).status must be(MemberStatus.Joining) + members0(0).status must be(MemberStatus.Up) members0(1).address.port.get must be(5551) - members0(1).status must be(MemberStatus.Joining) + members0(1).status must be(MemberStatus.Up) members0(2).address.port.get must be(5552) - members0(2).status must be(MemberStatus.Joining) + members0(2).status must be(MemberStatus.Up) val members1 = node1.latestGossip.members.toArray members1.size must be(3) members1(0).address.port.get must be(5550) - members1(0).status must be(MemberStatus.Joining) + members1(0).status must be(MemberStatus.Up) members1(1).address.port.get must be(5551) - members1(1).status must be(MemberStatus.Joining) + members1(1).status must be(MemberStatus.Up) members1(2).address.port.get must be(5552) - members1(2).status must be(MemberStatus.Joining) + 
members1(2).status must be(MemberStatus.Up) val members2 = node2.latestGossip.members.toArray members2.size must be(3) members2(0).address.port.get must be(5550) - members2(0).status must be(MemberStatus.Joining) + members2(0).status must be(MemberStatus.Up) members2(1).address.port.get must be(5551) - members2(1).status must be(MemberStatus.Joining) + members2(1).status must be(MemberStatus.Up) members2(2).address.port.get must be(5552) - members2(2).status must be(MemberStatus.Joining) + members2(2).status must be(MemberStatus.Up) } } } catch {