Don't send Remove command to unreachable, see #3209

This commit is contained in:
Patrik Nordwall 2013-04-09 21:06:48 +02:00
parent 631391b7d6
commit da621c502f

View file

@ -453,7 +453,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* to shut down himself. * to shut down himself.
* *
* In the future we might change this to allow the USER to send a Removed(address) message telling an * In the future we might change this to allow the USER to send a Removed(address) message telling an
* arbitrary node to be moved direcly from UP -> REMOVED. * arbitrary node to be moved directly from UP -> REMOVED.
*/ */
def removing(address: Address): Unit = { def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
@ -641,6 +641,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
upMembers, upMembers,
exitingMembers, exitingMembers,
removedMembers, removedMembers,
removedUnreachableMembers,
unreachableButNotDownedMembers) = unreachableButNotDownedMembers) =
if (localGossip.convergence) { if (localGossip.convergence) {
@ -677,24 +678,21 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// 1. check for state-changes to update // 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes // 2. store away removed and exiting members so we can separate the pure state changes
val (removedMembers, newMembers1) = localMembers partition (m m.status == Exiting || m.status == Down) val (removedMembers, newMembers1) = localMembers partition (m m.status == Exiting || m.status == Down)
val removedMembers2 = removedMembers ++ localUnreachableMembers.filter(_.status == Down) val (removedUnreachable, newUnreachable) = localUnreachableMembers partition (_.status == Down)
val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_)) val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_))
val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers2.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the 'seen' table // removing REMOVED nodes from the 'seen' table
val newSeen = localSeen -- removedMembers2.map(_.address) val newSeen = localSeen -- removedMembers.map(_.address) -- removedUnreachable.map(_.address)
// removing REMOVED nodes from the 'unreachable' set val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
val newUnreachableMembers = localUnreachableMembers -- removedMembers2
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers2, Member.none) (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, removedUnreachable, Member.none)
} else if (AutoDown) { } else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes // we don't have convergence - so we might have unreachable nodes
@ -717,9 +715,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip val newGossip = localGossip copy (overview = newOverview) // update gossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, unreachableButNotDownedMembers) (newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)
} else (localGossip, false, Member.none, Member.none, Member.none, Member.none) } else (localGossip, false, Member.none, Member.none, Member.none, Member.none, Member.none)
if (hasChangedState) { // we have a change of state - version it and try to update if (hasChangedState) { // we have a change of state - version it and try to update
// ---------------------- // ----------------------
@ -771,6 +769,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, member.address, Down) log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, member.address, Down)
} }
// log the auto-downing of the unreachable nodes
removedUnreachableMembers foreach { member
log.info("Cluster Node [{}] - Leader is removing unreachable node [{}]", selfAddress, member.address)
}
publish(latestGossip) publish(latestGossip)
} }
} }