Merge pull request #1324 from akka/wip-3209-remove-unreachable-patriknw
Don't send Remove command to unreachable, see #3209
This commit is contained in:
commit
3cfe8f28a2
1 changed file with 14 additions and 11 deletions
|
|
@ -454,7 +454,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
* to shut down himself.
|
||||
*
|
||||
* In the future we might change this to allow the USER to send a Removed(address) message telling an
|
||||
* arbitrary node to be moved direcly from UP -> REMOVED.
|
||||
* arbitrary node to be moved directly from UP -> REMOVED.
|
||||
*/
|
||||
def removing(address: Address): Unit = {
|
||||
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
|
||||
|
|
@ -642,6 +642,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
upMembers,
|
||||
exitingMembers,
|
||||
removedMembers,
|
||||
removedUnreachableMembers,
|
||||
unreachableButNotDownedMembers) =
|
||||
|
||||
if (localGossip.convergence) {
|
||||
|
|
@ -678,24 +679,21 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
// 1. check for state-changes to update
|
||||
// 2. store away removed and exiting members so we can separate the pure state changes
|
||||
val (removedMembers, newMembers1) = localMembers partition (m ⇒ m.status == Exiting || m.status == Down)
|
||||
val removedMembers2 = removedMembers ++ localUnreachableMembers.filter(_.status == Down)
|
||||
val (removedUnreachable, newUnreachable) = localUnreachableMembers partition (_.status == Down)
|
||||
|
||||
val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_))
|
||||
|
||||
val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
|
||||
|
||||
val hasChangedState = removedMembers2.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
|
||||
val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
|
||||
|
||||
// removing REMOVED nodes from the 'seen' table
|
||||
val newSeen = localSeen -- removedMembers2.map(_.address)
|
||||
val newSeen = localSeen -- removedMembers.map(_.address) -- removedUnreachable.map(_.address)
|
||||
|
||||
// removing REMOVED nodes from the 'unreachable' set
|
||||
val newUnreachableMembers = localUnreachableMembers -- removedMembers2
|
||||
|
||||
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
|
||||
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview
|
||||
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
|
||||
|
||||
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers2, Member.none)
|
||||
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, removedUnreachable, Member.none)
|
||||
|
||||
} else if (AutoDown) {
|
||||
// we don't have convergence - so we might have unreachable nodes
|
||||
|
|
@ -718,9 +716,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
|
||||
val newGossip = localGossip copy (overview = newOverview) // update gossip
|
||||
|
||||
(newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)
|
||||
(newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)
|
||||
|
||||
} else (localGossip, false, Member.none, Member.none, Member.none, Member.none)
|
||||
} else (localGossip, false, Member.none, Member.none, Member.none, Member.none, Member.none)
|
||||
|
||||
if (hasChangedState) { // we have a change of state - version it and try to update
|
||||
// ----------------------
|
||||
|
|
@ -772,6 +770,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, member.address, Down)
|
||||
}
|
||||
|
||||
// log the removal of the downed unreachable nodes
|
||||
removedUnreachableMembers foreach { member ⇒
|
||||
log.info("Cluster Node [{}] - Leader is removing unreachable node [{}]", selfAddress, member.address)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue