Hardening of cluster member leaving path, see #3309

* Removed leader commands for Shutdown and Exit
* Member shutdown itself  when it sees itself as Exiting
* Singleton cluster with status Exiting will shutdown itself,
  in case the Exiting gossip never arrives
* Exiting member not part convergence check
* Exiting member is removed by leader (on convergence) when the
  exiting member is in the unreachable set, i.e. sucessfully shutdown
* Reverted the change made for #3266, i.e. Exiting is
  detected as unreachable again.
* Adjust ClusterSingletonManager to new Exiting behaviour
* Fix bug in HeartbeatSender, which caused it to continue to
  send heartbeats to removed nodes, instead of rebalancing
* Refactoring of leaderActions method
* Leaving section in docs
This commit is contained in:
Patrik Nordwall 2013-05-09 09:49:59 +02:00
parent 85954621ef
commit a0a0f39613
20 changed files with 338 additions and 340 deletions

View file

@ -148,29 +148,6 @@ private[cluster] object InternalClusterAction {
case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
}
/**
* INTERNAL API.
*
* Cluster commands sent by the LEADER.
*/
private[cluster] object ClusterLeaderAction {
/**
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
* @param node the node to exit, i.e. destination of the message
*/
@SerialVersionUID(1L)
case class Exit(node: UniqueAddress) extends ClusterMessage
/**
* Command to remove a node from the cluster immediately.
* @param node the node to shutdown, i.e. destination of the message
*/
@SerialVersionUID(1L)
case class Shutdown(node: UniqueAddress) extends ClusterMessage
}
/**
* INTERNAL API.
*
@ -239,13 +216,14 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
*/
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import ClusterLeaderAction._
import InternalClusterAction._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector }
import cluster.settings._
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
@ -263,7 +241,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
private def clusterCore(address: Address): ActorSelection =
context.actorSelection(RootActorPath(address) / "system" / "cluster" / "core" / "daemon")
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
import context.dispatcher
@ -334,8 +312,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case Join(node, roles) joining(node, roles)
case ClusterUserAction.Down(address) downing(address)
case ClusterUserAction.Leave(address) leaving(address)
case Exit(node) exiting(node)
case Shutdown(node) shutdown(node)
case SendGossipTo(address) sendGossipTo(address)
case msg: SubscriptionMessage publisher forward msg
case ClusterUserAction.JoinTo(address)
@ -451,10 +427,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
latestGossip = seenVersionedGossip
updateLatestGossip(newGossip)
log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", "))
if (node != selfUniqueAddress) {
@ -494,10 +467,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newMembers = latestGossip.members map { m if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING
val newGossip = latestGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
latestGossip = seenVersionedGossip
updateLatestGossip(newGossip)
log.info("Cluster Node [{}] - Marked address [{}] as [{}]", selfAddress, address, Leaving)
publish(latestGossip)
@ -505,23 +475,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
/**
* State transition to EXITING.
* This method is called when a member sees itself as Exiting.
*/
def exiting(node: UniqueAddress): Unit =
if (node == selfUniqueAddress) {
log.info("Cluster Node [{}] - Marked as [{}]", selfAddress, Exiting)
// TODO implement when we need hand-off
}
/**
* This method is only called after the LEADER has sent a Shutdown message - telling the node
* to shut down himself.
*/
def shutdown(node: UniqueAddress): Unit =
if (node == selfUniqueAddress) {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
cluster.shutdown()
}
def shutdown(): Unit = {
log.info("Cluster Node [{}] - Node shutting down...", latestGossip.member(selfUniqueAddress))
cluster.shutdown()
}
/**
* State transition to DOW.
@ -568,8 +527,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip :+ vclockNode
latestGossip = versionedGossip seen selfUniqueAddress
updateLatestGossip(newGossip)
publish(latestGossip)
}
@ -645,7 +603,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
stats = stats.incrementReceivedGossipCount
publish(latestGossip)
if (talkback) {
if (latestGossip.member(selfUniqueAddress).status == Exiting)
shutdown()
else if (talkback) {
// send back gossip to sender when sender had different view, i.e. merge, or sender had
// older or sender had newer
gossipTo(from, sender)
@ -698,189 +658,154 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* Runs periodic leader actions, such as member status transitions, auto-downing unreachable nodes,
* assigning partitions etc.
*/
def leaderActions(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val isLeader = localGossip.isLeader(selfUniqueAddress)
if (isLeader && isAvailable) {
def leaderActions(): Unit =
if (latestGossip.isLeader(selfUniqueAddress) && isAvailable) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
true
if (AutoDown)
leaderAutoDownActions()
if (latestGossip.convergence)
leaderActionsOnConvergence()
}
/**
* Leader actions are as follows:
* 1. Move JOINING => UP -- When a node joins the cluster
* 2. Move LEAVING => EXITING -- When all partition handoff has completed
* 3. Non-exiting remain -- When all partition handoff has completed
* 4. Move unreachable EXITING => REMOVED -- When all nodes have seen the EXITING node as unreachable (convergence) -
* remove the node from the node ring and seen table
* 5. Move unreachable DOWN/EXITING => REMOVED -- When all nodes have seen that the node is DOWN/EXITING (convergence) -
* remove the node from the node ring and seen table
* 7. Updating the vclock version for the changes
* 8. Updating the `seen` table
* 9. Update the state with the new gossip
*/
def leaderActionsOnConvergence(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
true
}
def enoughMembers: Boolean = {
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
case (role, threshold) localMembers.count(_.hasRole(role)) >= threshold
}
}
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
val (removedUnreachable, newUnreachable) = localUnreachableMembers partition { m
Gossip.removeUnreachableWithMemberStatus(m.status)
}
val changedMembers = localMembers collect {
var upNumber = 0
{
case m if isJoiningToUp(m)
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = localGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
m.copyUp(upNumber)
case m if m.status == Leaving && hasPartionHandoffCompletedSuccessfully
// Move LEAVING => EXITING (once we have a convergence on LEAVING
// *and* if we have a successful partition handoff)
m copy (status = Exiting)
}
}
if (removedUnreachable.nonEmpty || changedMembers.nonEmpty) {
// handle changes
// replace changed members
val newMembers = localMembers -- changedMembers ++ changedMembers
// removing REMOVED nodes from the `seen` table
val newSeen = localSeen -- removedUnreachable.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
updateLatestGossip(newGossip)
// log status changes
changedMembers foreach { m
log.info("Cluster Node [{}] - Leader is moving node [{}] to [{}]",
selfAddress, m.address, m.status)
}
// Leader actions are as follows:
// 1. Move JOINING => UP -- When a node joins the cluster
// 2. Move LEAVING => EXITING -- When all partition handoff has completed
// 3. Non-exiting remain -- When all partition handoff has completed
// 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 6. Move DOWN => REMOVED -- When all nodes have seen that the node is DOWN (convergence) - remove the nodes from the node ring and seen table
// 7. Updating the vclock version for the changes
// 8. Updating the `seen` table
// 9. Try to update the state with the new gossip
// 10. If success - run all the side-effecting processing
val (
newGossip: Gossip,
hasChangedState: Boolean,
upMembers,
exitingMembers,
removedMembers,
removedUnreachableMembers,
unreachableButNotDownedMembers) =
if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes
def enoughMembers: Boolean = {
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
case (role, threshold) localMembers.count(_.hasRole(role)) >= threshold
}
}
def isJoiningToUp(m: Member): Boolean = m.status == Joining && enoughMembers
// transform the node member ring
val newMembers = localMembers collect {
var upNumber = 0
{
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
case member if isJoiningToUp(member)
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = localGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
member.copyUp(upNumber)
// Move LEAVING => EXITING (once we have a convergence on LEAVING
// *and* if we have a successful partition handoff)
case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully
member copy (status = Exiting)
// Everyone else that is not Exiting stays as they are
case member if member.status != Exiting && member.status != Down member
// Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the `members` set/node ring and seen table
}
}
// ----------------------
// Store away all stuff needed for the side-effecting processing
// ----------------------
// Check for the need to do side-effecting on successful state change
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED, DOWN -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes
val (removedMembers, newMembers1) = localMembers partition (m m.status == Exiting || m.status == Down)
val (removedUnreachable, newUnreachable) = localUnreachableMembers partition (_.status == Down)
val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_))
val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the `seen` table
val newSeen = localSeen -- removedMembers.map(_.uniqueAddress) -- removedUnreachable.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, removedUnreachable, Member.none)
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers collect {
// ----------------------
// Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
case member if member.status != Down member copy (status = Down)
case downMember downMember // no need to DOWN members already DOWN
}
// Check for the need to do side-effecting on successful state change
val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down)
// removing nodes marked as DOWN from the `seen` table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down m.uniqueAddress }
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)
} else (localGossip, false, Member.none, Member.none, Member.none, Member.none, Member.none)
if (hasChangedState) { // we have a change of state - version it and try to update
// ----------------------
// Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// Updating the `seen` table
// Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED
// ----------------------
val seenVersionedGossip =
if (removedMembers.exists(_.uniqueAddress == selfUniqueAddress)) versionedGossip
else versionedGossip seen selfUniqueAddress
// ----------------------
// Update the state with the new gossip
// ----------------------
latestGossip = seenVersionedGossip
// ----------------------
// Run all the side-effecting processing
// ----------------------
// log the move of members from joining to up
upMembers foreach { member
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]",
selfAddress, member.address, member.status, Up)
}
// tell all removed members to remove and shut down themselves
removedMembers foreach { member
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}] - and removing node from node ring",
selfAddress, address, member.status, Removed)
clusterCore(address) ! ClusterLeaderAction.Shutdown(member.uniqueAddress)
}
// tell all exiting members to exit
exitingMembers foreach { member
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]",
selfAddress, address, member.status, Exiting)
clusterCore(address) ! ClusterLeaderAction.Exit(member.uniqueAddress) // FIXME should wait for completion of handoff?
}
// log the auto-downing of the unreachable nodes
unreachableButNotDownedMembers foreach { member
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, member.address, Down)
}
// log the auto-downing of the unreachable nodes
removedUnreachableMembers foreach { member
log.info("Cluster Node [{}] - Leader is removing unreachable node [{}]", selfAddress, member.address)
}
publish(latestGossip)
// log the removal of the unreachable nodes
removedUnreachable foreach { m
val status = if (m.status == Exiting) "exiting" else "unreachable"
log.info("Cluster Node [{}] - Leader is removing {} node [{}]", selfAddress, status, m.address)
}
publish(latestGossip)
if (latestGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting. Let others know (best effort)
// before shutdown. Otherwise they will not see the Exiting state change
// and there will not be convergence until they have detected this node as
// unreachable and the required downing has finished. They will still need to detect
// unreachable, but Exiting unreachable will be removed without downing, i.e.
// normally the leaving of a leader will be graceful without the need
// for downing. However, if those final gossip messages never arrive it is
// alright to require the downing, because that is probably caused by a
// network failure anyway.
for (_ 1 to NumberOfGossipsBeforeShutdownWhenLeaderExits) gossip()
shutdown()
}
}
}
/**
* When the node is in the UNREACHABLE set it can be auto-down by leader
*/
def leaderAutoDownActions(): Unit = {
val localGossip = latestGossip
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val changedUnreachableMembers = localUnreachableMembers collect {
case m if !Gossip.convergenceSkipUnreachableWithMemberStatus(m.status) m copy (status = Down)
}
if (changedUnreachableMembers.nonEmpty) {
// handle changes
// replace changed unreachable
val newUnreachableMembers = localUnreachableMembers -- changedUnreachableMembers ++ changedUnreachableMembers
// removing nodes marked as Down/Exiting from the `seen` table
val newSeen = localSeen -- changedUnreachableMembers.map(_.uniqueAddress)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
updateLatestGossip(newGossip)
// log the auto-downing of the unreachable nodes
changedUnreachableMembers foreach { m
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, m.address, m.status)
}
publish(latestGossip)
}
}
@ -897,7 +822,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localUnreachableMembers = localGossip.overview.unreachable
val newlyDetectedUnreachableMembers = localMembers filterNot { member
member.uniqueAddress == selfUniqueAddress || member.status == Exiting || failureDetector.isAvailable(member.address)
member.uniqueAddress == selfUniqueAddress || failureDetector.isAvailable(member.address)
}
if (newlyDetectedUnreachableMembers.nonEmpty) {
@ -908,13 +833,14 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// updating vclock and `seen` table
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
updateLatestGossip(newGossip)
latestGossip = seenVersionedGossip
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting)
if (nonExiting.nonEmpty)
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, nonExiting.mkString(", "))
if (exiting.nonEmpty)
log.info("Cluster Node [{}] - Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
selfAddress, exiting.mkString(", "))
publish(latestGossip)
}
@ -958,6 +884,15 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
def validNodeForGossip(node: UniqueAddress): Boolean =
(node != selfUniqueAddress && latestGossip.members.exists(_.uniqueAddress == node))
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes
val versionedGossip = newGossip :+ vclockNode
// Updating the `seen` table
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
// Update the state with the new gossip
latestGossip = seenVersionedGossip
}
def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(newGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()