diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index b233b9bfbf..528672363d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -770,19 +770,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private[cluster] final def leaving(address: Address) { val localState = state.get val localGossip = localState.latestGossip + if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) + val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING + val newGossip = localGossip copy (members = newMembers) - val newMembers = localGossip.members + Member(address, Leaving) // mark node as LEAVING - val newGossip = localGossip copy (members = newMembers) + val versionedGossip = newGossip :+ vclockNode + val seenVersionedGossip = versionedGossip seen selfAddress - val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress + val newState = localState copy (latestGossip = seenVersionedGossip) - val newState = localState copy (latestGossip = seenVersionedGossip) - - if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update - else { - log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) - notifyMembershipChangeListeners(localState, newState) + if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update + else { + log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) + notifyMembershipChangeListeners(localState, newState) + } } } @@ -1082,115 +1083,126 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) - // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully - def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = { - true - } - if (isLeader && isAvailable(localState)) { // only run the leader actions if we are the LEADER and available val localOverview = localGossip.overview val localSeen = localOverview.seen val localUnreachableMembers = localOverview.unreachable + val hasPartionHandoffCompletedSuccessfully: Boolean = { + // FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully + true + } // Leader actions are as follows: // 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table // 2. Move JOINING => UP -- When a node joins the cluster // 3. Move LEAVING => EXITING -- When all partition handoff has completed // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader - // 5. Updating the vclock version for the changes - // 6. Updating the 'seen' table + // 5. Store away all stuff needed for the side-effecting processing in 10. + // 6. Updating the vclock version for the changes + // 7. Updating the 'seen' table + // 8. Try to update the state with the new gossip + // 9. If failure - retry + // 10. If success - run all the side-effecting processing - // store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending - var removedMembers = Set.empty[Member] - var exitingMembers = Set.empty[Member] - - var hasChangedState = false - val newGossip = + val ( + newGossip: Gossip, + hasChangedState: Boolean, + upMembers: Set[Member], + exitingMembers: Set[Member], + removedMembers: Set[Member], + unreachableButNotDownedMembers: Set[Member]) = if (convergence(localGossip).isDefined) { // we have convergence - so we can't have unreachable nodes + // transform the node member ring - filterNot/map/map val newMembers = - // ---------------------- - // 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table - // ---------------------- - localMembers filter { member ⇒ - if (member.status == MemberStatus.Exiting) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, member.address) - hasChangedState = true - removedMembers = removedMembers + member - false - } else true + localMembers filterNot { member ⇒ + // ---------------------- + // 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table + // ---------------------- + member.status == MemberStatus.Exiting } map { member ⇒ // ---------------------- // 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) // ---------------------- - if (member.status == Joining) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) - hasChangedState = true - member copy (status = Up) - } else member + if (member.status == Joining) member copy (status = Up) + else member } map { member ⇒ // ---------------------- // 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) // ---------------------- - if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) { - log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address) - hasChangedState = true - exitingMembers = exitingMembers + member - member copy (status = Exiting) - } else member - + if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting) + else member } + // ---------------------- + // 5. Store away all stuff needed for the side-effecting processing in 10. + // ---------------------- + + // Check for the need to do side-effecting on successful state change + // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED + // to check for state-changes and to store away removed and exiting members for later notification + // 1. check for state-changes to update + // 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending + val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting) + + val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining) + + val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) + + val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty + // removing REMOVED nodes from the 'seen' table - val newSeen = removedMembers.foldLeft(localSeen) { (seen, removed) ⇒ seen - removed.address } + //val newSeen = removedMembers.foldLeft(localSeen) { (seen, removed) ⇒ seen - removed.address } + val newSeen = localSeen -- removedMembers.map(_.address) // removing REMOVED nodes from the 'unreachable' set - val newUnreachableMembers = removedMembers.foldLeft(localUnreachableMembers) { (unreachable, removed) ⇒ unreachable - removed } + //val newUnreachableMembers = removedMembers.foldLeft(localUnreachableMembers) { (unreachable, removed) ⇒ unreachable - removed } + val newUnreachableMembers = localUnreachableMembers -- removedMembers val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview - localGossip copy (members = newMembers, overview = newOverview) // update gossip + val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip + + (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member]) } else if (AutoDown) { // we don't have convergence - so we might have unreachable nodes - // if 'auto-down' is turned on, then try to auto-down any unreachable nodes - // ---------------------- - // 4. Move UNREACHABLE => DOWN (auto-downing by leader) - // ---------------------- - val newUnreachableMembers = - localUnreachableMembers.map { member ⇒ - // no need to DOWN members already DOWN - if (member.status == Down) member - else { - log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) - hasChangedState = true - member copy (status = Down) - } - } + // if 'auto-down' is turned on, then try to auto-down any unreachable nodes + val newUnreachableMembers = localUnreachableMembers.map { member ⇒ + // ---------------------- + // 5. Move UNREACHABLE => DOWN (auto-downing by leader) + // ---------------------- + if (member.status == Down) member // no need to DOWN members already DOWN + else member copy (status = Down) + } + + // Check for the need to do side-effecting on successful state change + val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down) // removing nodes marked as DOWN from the 'seen' table val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address } val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview - localGossip copy (overview = newOverview) // update gossip + val newGossip = localGossip copy (overview = newOverview) // update gossip - } else localGossip + (newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers) + + } else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member]) if (hasChangedState) { // we have a change of state - version it and try to update - // ---------------------- - // 5. Updating the vclock version for the changes + // 6. Updating the vclock version for the changes // ---------------------- val versionedGossip = newGossip :+ vclockNode // ---------------------- - // 6. Updating the 'seen' table + // 7. Updating the 'seen' table // Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED // ---------------------- val seenVersionedGossip = @@ -1199,27 +1211,49 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newState = localState copy (latestGossip = seenVersionedGossip) - // if we won the race then update else try again - if (!state.compareAndSet(localState, newState)) leaderActions() // recur - else { - // do the side-effecting notifications on state-change success + // ---------------------- + // 8. Try to update the state with the new gossip + // ---------------------- + if (!state.compareAndSet(localState, newState)) { - if (removedMembers.exists(_.address == selfAddress)) { - // we now know that this node (the leader) is just about to shut down since it will be moved from EXITING -> REMOVED - // so now let's gossip out this information directly since there will not be any other chance - gossip() - } + // ---------------------- + // 9. Failure - retry + // ---------------------- + leaderActions() // recur + + } else { + // ---------------------- + // 10. Success - run all the side-effecting processing + // ---------------------- + + // if (removedMembers.exists(_.address == selfAddress)) { + // // we now know that this node (the leader) is just about to shut down since it will be moved from EXITING -> REMOVED + // // so now let's gossip out this information directly since there will not be any other chance + // gossip() + // } + + // log the move of members from joining to up + upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) } // tell all removed members to remove and shut down themselves - removedMembers.map(_.address) foreach { address ⇒ + removedMembers foreach { member ⇒ + val address = member.address + log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address) clusterCommandConnectionFor(address) ! ClusterLeaderAction.Remove(address) } // tell all exiting members to exit - exitingMembers.map(_.address) foreach { address ⇒ + exitingMembers foreach { member ⇒ + val address = member.address + log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address) clusterCommandConnectionFor(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff? } + // log the auto-downing of the unreachable nodes + unreachableButNotDownedMembers foreach { member ⇒ + log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) + } + notifyMembershipChangeListeners(localState, newState) } } @@ -1273,13 +1307,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private def isUnavailable(state: State): Boolean = { val localGossip = state.latestGossip val isUnreachable = localGossip.overview.unreachable exists { _.address == selfAddress } - val hasUnavailableMemberStatus = localGossip.members exists { _.status.isUnavailable } + val hasUnavailableMemberStatus = localGossip.members exists { m ⇒ (m == self) && m.status.isUnavailable } isUnreachable || hasUnavailableMemberStatus } private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = { - val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status)) - val newMembersStatus = newState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status)) + val oldMembersStatus = oldState.latestGossip.members.map(m ⇒ (m.address, m.status)) + val newMembersStatus = newState.latestGossip.members.map(m ⇒ (m.address, m.status)) if (newMembersStatus != oldMembersStatus) newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members } }