Added implementation of the LEAVE command for a cluster node.

* Added implementation of the LEAVE command for a cluster node
* Changed the meaning of Member.isUnavailable to only DOWN and REMOVED
* Removed EXIT and UP as user commands
* Fixed Cluster.self to fall back to checking for itself in the gossip.overview.unreachable set.
* Added Leader action transitioning from LEAVING -> EXITING
* Improved comments

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-06-01 16:49:50 +02:00
parent 3b06c0a7c2
commit 0449f85a86
3 changed files with 88 additions and 42 deletions

View file

@ -58,11 +58,6 @@ object ClusterAction {
*/ */
case class Join(address: Address) extends ClusterMessage case class Join(address: Address) extends ClusterMessage
/**
* Command to set a node to Up (from Joining).
*/
case class Up(address: Address) extends ClusterMessage
/** /**
* Command to leave the cluster. * Command to leave the cluster.
*/ */
@ -73,15 +68,16 @@ object ClusterAction {
*/ */
case class Down(address: Address) extends ClusterMessage case class Down(address: Address) extends ClusterMessage
/**
* Command to mark a node to be removed from the cluster immediately.
*/
case class Exit(address: Address) extends ClusterMessage
/** /**
* Command to remove a node from the cluster immediately. * Command to remove a node from the cluster immediately.
*/ */
case class Remove(address: Address) extends ClusterMessage case class Remove(address: Address) extends ClusterMessage
/**
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
*/
private[akka] case class Exit(address: Address) extends ClusterMessage
} }
/** /**
@ -158,12 +154,10 @@ object MemberStatus {
case object Down extends MemberStatus case object Down extends MemberStatus
case object Removed extends MemberStatus case object Removed extends MemberStatus
def isUnavailable(status: MemberStatus): Boolean = { /**
status == MemberStatus.Down || * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED.
status == MemberStatus.Exiting || */
status == MemberStatus.Removed || def isUnavailable(status: MemberStatus): Boolean = status == MemberStatus.Down || status == MemberStatus.Removed
status == MemberStatus.Leaving
}
} }
/** /**
@ -266,7 +260,6 @@ final class ClusterCommandDaemon extends Actor {
def receive = { def receive = {
case Join(address) cluster.joining(address) case Join(address) cluster.joining(address)
case Up(address) cluster.up(address)
case Down(address) cluster.downing(address) case Down(address) cluster.downing(address)
case Leave(address) cluster.leaving(address) case Leave(address) cluster.leaving(address)
case Exit(address) cluster.exiting(address) case Exit(address) cluster.exiting(address)
@ -453,9 +446,16 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// ===================== PUBLIC API ===================== // ===================== PUBLIC API =====================
// ====================================================== // ======================================================
def self: Member = latestGossip.members def self: Member = {
val gossip = latestGossip
gossip.members
.find(_.address == selfAddress) .find(_.address == selfAddress)
.getOrElse(throw new IllegalStateException("Can't find 'this' Member (" + selfAddress + ") in the cluster membership ring")) .getOrElse {
gossip.overview.unreachable
.find(_.address == selfAddress)
.getOrElse(throw new IllegalStateException("Can't find 'this' Member [" + selfAddress + "] in the cluster membership ring or in the unreachable set"))
}
}
/** /**
* Latest gossip. * Latest gossip.
@ -609,18 +609,32 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
} }
} }
/**
* State transition to UP.
*/
private[cluster] final def up(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as UP", selfAddress, address)
}
/** /**
* State transition to LEAVING. * State transition to LEAVING.
*/ */
private[cluster] final def leaving(address: Address): Unit = { @tailrec
log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", selfAddress, address) private[cluster] final def leaving(address: Address) {
log.info("Cluster Node [{}] - Marking address [{}] as LEAVING", selfAddress, address)
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val newMembers = localMembers + Member(address, MemberStatus.Leaving) // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
failureDetector heartbeat address // update heartbeat in failure detector
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
}
}
} }
/** /**
@ -909,6 +923,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
// FIXME implement partition handoff and a check that it has completed.
// Until then this always reports TRUE (i.e. handoff is treated as having completed successfully);
// the 'gossip' argument is not inspected yet.
def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = true
if (isLeader && isAvailable(localState)) { if (isLeader && isAvailable(localState)) {
// only run the leader actions if we are the LEADER and available // only run the leader actions if we are the LEADER and available
@ -917,11 +936,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val localUnreachableMembers = localOverview.unreachable val localUnreachableMembers = localOverview.unreachable
// Leader actions are as follows: // Leader actions are as follows:
// 1. Move JOINING => UP // 1. Move JOINING => UP -- When a node joins the cluster
// 2. Move EXITING => REMOVED // 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence)
// 3. Move UNREACHABLE => DOWN (auto-downing by leader) // 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Updating the vclock version for the changes // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Updating the 'seen' table // 5. Updating the vclock version for the changes
// 6. Updating the 'seen' table
var hasChangedState = false var hasChangedState = false
val newGossip = val newGossip =
@ -930,20 +950,37 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// we have convergence - so we can't have unreachable nodes // we have convergence - so we can't have unreachable nodes
val newMembers = val newMembers =
localMembers map { member localMembers map { member
// 1. Move JOINING => UP // ----------------------
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have convergence)
// ----------------------
if (member.status == MemberStatus.Joining) { if (member.status == MemberStatus.Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
hasChangedState = true hasChangedState = true
member copy (status = MemberStatus.Up) member copy (status = MemberStatus.Up)
} else member } else member
} map { member } map { member
// 2. Move EXITING => REMOVED // ----------------------
// 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING, i.e. we have convergence)
// ----------------------
if (member.status == MemberStatus.Exiting) { if (member.status == MemberStatus.Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address) log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
hasChangedState = true hasChangedState = true
member copy (status = MemberStatus.Removed) member copy (status = MemberStatus.Removed)
} else member } else member
} map { member
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == MemberStatus.Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Exiting)
} else member
} }
localGossip copy (members = newMembers) // update gossip localGossip copy (members = newMembers) // update gossip
@ -951,7 +988,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// we don't have convergence - so we might have unreachable nodes // we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes // if 'auto-down' is turned on, then try to auto-down any unreachable nodes
// 3. Move UNREACHABLE => DOWN (auto-downing by leader) // ----------------------
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
val newUnreachableMembers = val newUnreachableMembers =
localUnreachableMembers localUnreachableMembers
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN .filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
@ -971,10 +1010,14 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
if (hasChangedState) { // we have a change of state - version it and try to update if (hasChangedState) { // we have a change of state - version it and try to update
// 4. Updating the vclock version for the changes // ----------------------
// 5. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip + vclockNode val versionedGossip = newGossip + vclockNode
// 5. Updating the 'seen' table // ----------------------
// 6. Updating the 'seen' table
// ----------------------
val seenVersionedGossip = versionedGossip seen selfAddress val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip) val newState = localState copy (latestGossip = seenVersionedGossip)
@ -1005,7 +1048,10 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// 2. all unreachable members in the set have status DOWN // 2. all unreachable members in the set have status DOWN
// Else we can't continue to check for convergence // Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version // When that is done we check that all the entries in the 'seen' table have the same vector clock version
if (unreachable.isEmpty || !unreachable.exists(m (m.status != MemberStatus.Down) && (m.status != MemberStatus.Removed))) { if (unreachable.isEmpty || !unreachable.exists { m
m.status != MemberStatus.Down &&
m.status != MemberStatus.Removed
}) {
val seen = gossip.overview.seen val seen = gossip.overview.seen
val views = Set.empty[VectorClock] ++ seen.values val views = Set.empty[VectorClock] ++ seen.values

View file

@ -18,7 +18,6 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
val c2 = role("c2") val c2 = role("c2")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec

View file

@ -5,7 +5,8 @@
Cluster Specification Cluster Specification
###################### ######################
.. note:: *This document describes the new clustering coming in Akka 2.1 (not 2.0)* .. note:: *This document describes the new clustering coming in Akka Coltrane and
is not available in the latest stable release*
Intro Intro
===== =====