Implemented/Fixed Cluster.remove() and state transition from LEAVING -> REMOVED.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-06-08 11:51:34 +02:00
parent 57fadc1f7d
commit 45b2484f62

View file

@ -50,7 +50,7 @@ sealed trait ClusterMessage extends Serializable
/**
* Cluster commands sent by the USER.
*/
object ClusterAction {
object ClusterUserAction {
/**
* Command to join the cluster. Sent when a node (reprsesented by 'address')
@ -72,6 +72,12 @@ object ClusterAction {
* Command to remove a node from the cluster immediately.
*/
case class Remove(address: Address) extends ClusterMessage
}
/**
* Cluster commands sent by the LEADER.
*/
object ClusterLeaderAction {
/**
* Command to mark a node to be removed from the cluster immediately.
@ -197,8 +203,8 @@ case class Gossip(
}
/**
* Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock for the new gossip.
* Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock (version) for the new gossip.
*/
def seen(address: Address): Gossip = {
if (overview.seen.contains(address) && overview.seen(address) == version) this
@ -253,7 +259,8 @@ case class Gossip(
* Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
*/
final class ClusterCommandDaemon extends Actor {
import ClusterAction._
import ClusterUserAction._
import ClusterLeaderAction._
val cluster = Cluster(context.system)
val log = Logging(context.system, this)
@ -331,8 +338,6 @@ trait ClusterNodeMBean {
def leave(address: String)
def down(address: String)
def remove(address: String)
def shutdown()
}
/**
@ -499,10 +504,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*
* INTERNAL API:
* Should not called by the user. The user can issue a LEAVE command which will tell the node
* to go through graceful handoff process LEAVE -> EXITING -> REMOVED -> SHUTDOWN.
*/
def shutdown(): Unit = {
private[akka] def shutdown(): Unit = {
if (isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
log.info("Cluster Node [{}] - Shutting down cluster node...", selfAddress)
gossipCanceller.cancel()
failureDetectorReaperCanceller.cancel()
leaderActionsCanceller.cancel()
@ -512,6 +521,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
} catch {
case e: InstanceNotFoundException // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
}
}
@ -543,7 +553,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
*/
def join(address: Address): Unit = {
val connection = clusterCommandConnectionFor(address)
val command = ClusterAction.Join(selfAddress)
val command = ClusterUserAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
}
@ -552,21 +562,21 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* Send command to issue state transition to LEAVING for the node specified by 'address'.
*/
def leave(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Leave(address)
clusterCommandDaemon ! ClusterUserAction.Leave(address)
}
/**
* Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'.
* Send command to DOWN the node specified by 'address'.
*/
def down(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Down(address)
clusterCommandDaemon ! ClusterUserAction.Down(address)
}
/**
* Send command to issue state transition to REMOVED for the node specified by 'address'.
* Send command to REMOVE the node specified by 'address'.
*/
def remove(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Remove(address)
clusterCommandDaemon ! ClusterUserAction.Remove(address)
}
// ========================================================
@ -642,13 +652,15 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
*/
private[cluster] final def exiting(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address)
// FIXME implement when we implement hand-off
}
/**
* State transition to REMOVED.
*/
private[cluster] final def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address)
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
shutdown()
}
/**
@ -727,6 +739,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val winningGossip =
if (remoteGossip.version <> localGossip.version) {
// concurrent
println("=======>>> CONCURRENT")
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip + vclockNode
@ -737,20 +750,23 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
versionedMergedGossip
} else if (remoteGossip.version < localGossip.version) {
println("=======>>> LOCAL")
// local gossip is newer
localGossip
} else {
println("=======>>> REMOTE")
// remote gossip is newer
remoteGossip
}
println("=======>>> WINNING " + winningGossip.members.mkString(", "))
val newState = localState copy (latestGossip = winningGossip seen selfAddress)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
else {
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address)
log.info("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, sender.address)
if (sender.address != selfAddress) failureDetector heartbeat sender.address
@ -772,8 +788,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
* @param oldState the state to change the member status in
* @return the updated new state with the new member status
*/
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus)
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { // TODO: Removed this method? Currently not used.
log.debug("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus)
val localSelf = self
@ -789,7 +805,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
else member
}
// ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
// NOTE: ugly crap to work around bug in scala colletions ('val ss: SortedSet[Member] = SortedSet.empty[Member] ++ aSet' does not compile)
val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
val newGossip = localGossip copy (members = newMembersSortedSet)
@ -936,8 +952,8 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val localUnreachableMembers = localOverview.unreachable
// Leader actions are as follows:
// 1. Move JOINING => UP -- When a node joins the cluster
// 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence)
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring
// 2. Move JOINING => UP -- When a node joins the cluster
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Updating the vclock version for the changes
@ -951,9 +967,20 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val newMembers =
localMembers map { member
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring
// ----------------------
localMembers filter { member
if (member.status == MemberStatus.Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - Removing node from node ring", selfAddress, member.address)
hasChangedState = true
clusterCommandConnectionFor(member.address) ! ClusterUserAction.Remove(member.address) // tell the removed node to shut himself down
false
} else true
} map { member
// ----------------------
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == MemberStatus.Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
@ -961,16 +988,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
member copy (status = MemberStatus.Up)
} else member
} map { member
// ----------------------
// 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence)
// ----------------------
if (member.status == MemberStatus.Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Removed)
} else member
} map { member
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
@ -978,10 +995,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
hasChangedState = true
clusterCommandConnectionFor(member.address) ! ClusterLeaderAction.Exit(member.address) // FIXME should use ? to await completion of handoff?
member copy (status = MemberStatus.Exiting)
} else member
}
localGossip copy (members = newMembers) // update gossip
} else if (autoDown) {
@ -1045,7 +1064,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// First check that:
// 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or
// 2. all unreachable members in the set have status DOWN
// 2. all unreachable members in the set have status DOWN or REMOVED
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
if (unreachable.isEmpty || !unreachable.exists { m
@ -1055,8 +1074,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val seen = gossip.overview.seen
val views = Set.empty[VectorClock] ++ seen.values
println("=======>>> VIEWS " + views.size)
if (views.size == 1) {
log.debug("Cluster Node [{}] - Cluster convergence reached", selfAddress)
println("=======>>> ----------------------- HAS CONVERGENCE")
Some(gossip)
} else None
} else None
@ -1144,8 +1165,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
def down(address: String) = clusterNode.down(AddressFromURIString(address))
def remove(address: String) = clusterNode.remove(AddressFromURIString(address))
def shutdown() = clusterNode.shutdown()
}
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName)
try {