Fixed all issues from review. In particular fully separated state transformation and preparation for side-effecting processing.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-06-18 13:53:49 +02:00
parent c0dff0050b
commit 8b6652a794

View file

@ -770,19 +770,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private[cluster] final def leaving(address: Address) {
val localState = state.get
val localGossip = localState.latestGossip
if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
val newMembers = localGossip.members map { member if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val newMembers = localGossip.members + Member(address, Leaving) // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
val newState = localState copy (latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
notifyMembershipChangeListeners(localState, newState)
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
notifyMembershipChangeListeners(localState, newState)
}
}
}
@ -1082,115 +1083,126 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = {
true
}
if (isLeader && isAvailable(localState)) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
true
}
// Leader actions are as follows:
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 2. Move JOINING => UP -- When a node joins the cluster
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Updating the vclock version for the changes
// 6. Updating the 'seen' table
// 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table
// 8. Try to update the state with the new gossip
// 9. If failure - retry
// 10. If success - run all the side-effecting processing
// store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
var removedMembers = Set.empty[Member]
var exitingMembers = Set.empty[Member]
var hasChangedState = false
val newGossip =
val (
newGossip: Gossip,
hasChangedState: Boolean,
upMembers: Set[Member],
exitingMembers: Set[Member],
removedMembers: Set[Member],
unreachableButNotDownedMembers: Set[Member]) =
if (convergence(localGossip).isDefined) {
// we have convergence - so we can't have unreachable nodes
// transform the node member ring - filterNot/map/map
val newMembers =
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
localMembers filter { member
if (member.status == MemberStatus.Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, member.address)
hasChangedState = true
removedMembers = removedMembers + member
false
} else true
localMembers filterNot { member
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
member.status == MemberStatus.Exiting
} map { member
// ----------------------
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
hasChangedState = true
member copy (status = Up)
} else member
if (member.status == Joining) member copy (status = Up)
else member
} map { member
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
hasChangedState = true
exitingMembers = exitingMembers + member
member copy (status = Exiting)
} else member
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting)
else member
}
// ----------------------
// 5. Store away all stuff needed for the side-effecting processing in 10.
// ----------------------
// Check for the need to do side-effecting on successful state change
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the 'seen' table
val newSeen = removedMembers.foldLeft(localSeen) { (seen, removed) seen - removed.address }
//val newSeen = removedMembers.foldLeft(localSeen) { (seen, removed) seen - removed.address }
val newSeen = localSeen -- removedMembers.map(_.address)
// removing REMOVED nodes from the 'unreachable' set
val newUnreachableMembers = removedMembers.foldLeft(localUnreachableMembers) { (unreachable, removed) unreachable - removed }
//val newUnreachableMembers = removedMembers.foldLeft(localUnreachableMembers) { (unreachable, removed) unreachable - removed }
val newUnreachableMembers = localUnreachableMembers -- removedMembers
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
localGossip copy (members = newMembers, overview = newOverview) // update gossip
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member])
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
// ----------------------
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
val newUnreachableMembers =
localUnreachableMembers.map { member
// no need to DOWN members already DOWN
if (member.status == Down) member
else {
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
hasChangedState = true
member copy (status = Down)
}
}
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers.map { member
// ----------------------
// 5. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
if (member.status == Down) member // no need to DOWN members already DOWN
else member copy (status = Down)
}
// Check for the need to do side-effecting on successful state change
val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down)
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down m.address }
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
localGossip copy (overview = newOverview) // update gossip
val newGossip = localGossip copy (overview = newOverview) // update gossip
} else localGossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers)
} else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member])
if (hasChangedState) { // we have a change of state - version it and try to update
// ----------------------
// 5. Updating the vclock version for the changes
// 6. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// 6. Updating the 'seen' table
// 7. Updating the 'seen' table
// Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED
// ----------------------
val seenVersionedGossip =
@ -1199,27 +1211,49 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newState = localState copy (latestGossip = seenVersionedGossip)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
else {
// do the side-effecting notifications on state-change success
// ----------------------
// 8. Try to update the state with the new gossip
// ----------------------
if (!state.compareAndSet(localState, newState)) {
if (removedMembers.exists(_.address == selfAddress)) {
// we now know that this node (the leader) is just about to shut down since it will be moved from EXITING -> REMOVED
// so now let's gossip out this information directly since there will not be any other chance
gossip()
}
// ----------------------
// 9. Failure - retry
// ----------------------
leaderActions() // recur
} else {
// ----------------------
// 10. Success - run all the side-effecting processing
// ----------------------
// if (removedMembers.exists(_.address == selfAddress)) {
// // we now know that this node (the leader) is just about to shut down since it will be moved from EXITING -> REMOVED
// // so now let's gossip out this information directly since there will not be any other chance
// gossip()
// }
// log the move of members from joining to up
upMembers foreach { member log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) }
// tell all removed members to remove and shut down themselves
removedMembers.map(_.address) foreach { address
removedMembers foreach { member
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
clusterCommandConnectionFor(address) ! ClusterLeaderAction.Remove(address)
}
// tell all exiting members to exit
exitingMembers.map(_.address) foreach { address
exitingMembers foreach { member
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
clusterCommandConnectionFor(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff?
}
// log the auto-downing of the unreachable nodes
unreachableButNotDownedMembers foreach { member
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
}
notifyMembershipChangeListeners(localState, newState)
}
}
@ -1273,13 +1307,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private def isUnavailable(state: State): Boolean = {
val localGossip = state.latestGossip
val isUnreachable = localGossip.overview.unreachable exists { _.address == selfAddress }
val hasUnavailableMemberStatus = localGossip.members exists { _.status.isUnavailable }
val hasUnavailableMemberStatus = localGossip.members exists { m (m == self) && m.status.isUnavailable }
isUnreachable || hasUnavailableMemberStatus
}
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m (m.address, m.status))
val newMembersStatus = newState.latestGossip.members.toSeq.map(m (m.address, m.status))
val oldMembersStatus = oldState.latestGossip.members.map(m (m.address, m.status))
val newMembersStatus = newState.latestGossip.members.map(m (m.address, m.status))
if (newMembersStatus != oldMembersStatus)
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}