From 45b2484f62982c19ece9aca1b3301375153db7e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 8 Jun 2012 11:51:34 +0200 Subject: [PATCH] Implemented/Fixed Cluster.remove() and state transition from LEAVING -> REMOVED. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Cluster.scala | 91 +++++++++++-------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c5ad773989..b2fe9c7352 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -50,7 +50,7 @@ sealed trait ClusterMessage extends Serializable /** * Cluster commands sent by the USER. */ -object ClusterAction { +object ClusterUserAction { /** * Command to join the cluster. Sent when a node (reprsesented by 'address') @@ -72,6 +72,12 @@ object ClusterAction { * Command to remove a node from the cluster immediately. */ case class Remove(address: Address) extends ClusterMessage +} + +/** + * Cluster commands sent by the LEADER. + */ +object ClusterLeaderAction { /** * Command to mark a node to be removed from the cluster immediately. @@ -197,8 +203,8 @@ case class Gossip( } /** - * Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen' - * Map with the VectorClock for the new gossip. + * Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen' + * Map with the VectorClock (version) for the new gossip. */ def seen(address: Address): Gossip = { if (overview.seen.contains(address) && overview.seen(address) == version) this @@ -253,7 +259,8 @@ case class Gossip( * Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message. 
*/ final class ClusterCommandDaemon extends Actor { - import ClusterAction._ + import ClusterUserAction._ + import ClusterLeaderAction._ val cluster = Cluster(context.system) val log = Logging(context.system, this) @@ -331,8 +338,6 @@ trait ClusterNodeMBean { def leave(address: String) def down(address: String) def remove(address: String) - - def shutdown() } /** @@ -499,10 +504,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks. + * + * INTERNAL API: + * Should not be called by the user. The user can issue a LEAVE command which will tell the node + * to go through the graceful handoff process LEAVE -> EXITING -> REMOVED -> SHUTDOWN. */ - def shutdown(): Unit = { + private[akka] def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { - log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) + log.info("Cluster Node [{}] - Shutting down cluster node...", selfAddress) gossipCanceller.cancel() failureDetectorReaperCanceller.cancel() leaderActionsCanceller.cancel() @@ -512,6 +521,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } catch { case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) } + log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress) } } @@ -543,7 +553,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ */ def join(address: Address): Unit = { val connection = clusterCommandConnectionFor(address) - val command = ClusterAction.Join(selfAddress) + val command = ClusterUserAction.Join(selfAddress) log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection) connection ! 
command } @@ -552,21 +562,21 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * Send command to issue state transition to LEAVING for the node specified by 'address'. */ def leave(address: Address): Unit = { - clusterCommandDaemon ! ClusterAction.Leave(address) + clusterCommandDaemon ! ClusterUserAction.Leave(address) } /** - * Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'. + * Send command to DOWN the node specified by 'address'. */ def down(address: Address): Unit = { - clusterCommandDaemon ! ClusterAction.Down(address) + clusterCommandDaemon ! ClusterUserAction.Down(address) } /** - * Send command to issue state transition to REMOVED for the node specified by 'address'. + * Send command to REMOVE the node specified by 'address'. */ def remove(address: Address): Unit = { - clusterCommandDaemon ! ClusterAction.Remove(address) + clusterCommandDaemon ! ClusterUserAction.Remove(address) } // ======================================================== @@ -642,13 +652,15 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ */ private[cluster] final def exiting(address: Address): Unit = { log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address) + // FIXME implement when we implement hand-off } /** * State transition to REMOVED. 
*/ private[cluster] final def removing(address: Address): Unit = { - log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address) + log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) + shutdown() } /** @@ -727,6 +739,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val winningGossip = if (remoteGossip.version <> localGossip.version) { // concurrent + println("=======>>> CONCURRENT") val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip + vclockNode @@ -737,20 +750,23 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ versionedMergedGossip } else if (remoteGossip.version < localGossip.version) { + println("=======>>> LOCAL") // local gossip is newer localGossip } else { + println("=======>>> REMOTE") // remote gossip is newer remoteGossip } + println("=======>>> WINNING " + winningGossip.members.mkString(", ")) val newState = localState copy (latestGossip = winningGossip seen selfAddress) // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update else { - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address) + log.info("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address) if (sender.address != selfAddress) failureDetector heartbeat sender.address @@ -772,8 +788,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ * @param oldState the state to change the member status in * @return the updated new state with the new member status */ - private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { - log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus) + private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { // TODO: Removed this 
method? Currently not used. + log.debug("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus) val localSelf = self @@ -789,7 +805,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ else member } - // ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile) + // NOTE: ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile) val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*) val newGossip = localGossip copy (members = newMembersSortedSet) @@ -936,8 +952,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val localUnreachableMembers = localOverview.unreachable // Leader actions are as follows: - // 1. Move JOINING => UP -- When a node joins the cluster - // 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) + // 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring + // 2. Move JOINING => UP -- When a node joins the cluster // 3. Move LEAVING => EXITING -- When all partition handoff has completed // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader // 5. Updating the vclock version for the changes @@ -951,9 +967,20 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val newMembers = - localMembers map { member ⇒ + // ---------------------- + // 1. Move EXITING => REMOVED - e.g. 
remove the nodes from the 'members' set/node ring + // ---------------------- + localMembers filter { member ⇒ + if (member.status == MemberStatus.Exiting) { + log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - Removing node from node ring", selfAddress, member.address) + hasChangedState = true + clusterCommandConnectionFor(member.address) ! ClusterUserAction.Remove(member.address) // tell the removed node to shut itself down + false + } else true + + } map { member ⇒ // ---------------------- - // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) + // 2. Move JOINING => UP (once all nodes have seen that this node is JOINING i.e. we have a convergence) // ---------------------- if (member.status == MemberStatus.Joining) { log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) @@ -961,16 +988,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ member copy (status = MemberStatus.Up) } else member - } map { member ⇒ - // ---------------------- - // 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence) - // ---------------------- - if (member.status == MemberStatus.Exiting) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address) - hasChangedState = true - member copy (status = MemberStatus.Removed) - } else member - } map { member ⇒ // ---------------------- // 3. 
Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) @@ -978,10 +995,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address) hasChangedState = true + clusterCommandConnectionFor(member.address) ! ClusterLeaderAction.Exit(member.address) // FIXME should use ? to await completion of handoff? member copy (status = MemberStatus.Exiting) } else member } + localGossip copy (members = newMembers) // update gossip } else if (autoDown) { @@ -1045,7 +1064,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // First check that: // 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or - // 2. all unreachable members in the set have status DOWN + // 2. 
all unreachable members in the set have status DOWN or REMOVED // Else we can't continue to check for convergence // When that is done we check that all the entries in the 'seen' table have the same vector clock version if (unreachable.isEmpty || !unreachable.exists { m ⇒ @@ -1055,8 +1074,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val seen = gossip.overview.seen val views = Set.empty[VectorClock] ++ seen.values + println("=======>>> VIEWS " + views.size) if (views.size == 1) { log.debug("Cluster Node [{}] - Cluster convergence reached", selfAddress) + println("=======>>> ----------------------- HAS CONVERGENCE") Some(gossip) } else None } else None @@ -1144,8 +1165,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ def down(address: String) = clusterNode.down(AddressFromURIString(address)) def remove(address: String) = clusterNode.remove(AddressFromURIString(address)) - - def shutdown() = clusterNode.shutdown() } log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName) try {