From da621c502f50d24489c35e7389f338581cd5368f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 9 Apr 2013 21:06:48 +0200 Subject: [PATCH] Don't send Remove command to unreachable, see #3209 --- .../scala/akka/cluster/ClusterDaemon.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 12f6a90c4b..4eb6ecdea7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -453,7 +453,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * to shut down himself. * * In the future we might change this to allow the USER to send a Removed(address) message telling an - * arbitrary node to be moved direcly from UP -> REMOVED. + * arbitrary node to be moved directly from UP -> REMOVED. */ def removing(address: Address): Unit = { log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) @@ -641,6 +641,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto upMembers, exitingMembers, removedMembers, + removedUnreachableMembers, unreachableButNotDownedMembers) = if (localGossip.convergence) { @@ -677,24 +678,21 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // 1. check for state-changes to update // 2. 
store away removed and exiting members so we can separate the pure state changes val (removedMembers, newMembers1) = localMembers partition (m ⇒ m.status == Exiting || m.status == Down) - val removedMembers2 = removedMembers ++ localUnreachableMembers.filter(_.status == Down) + val (removedUnreachable, newUnreachable) = localUnreachableMembers partition (_.status == Down) val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_)) val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) - val hasChangedState = removedMembers2.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty + val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty // removing REMOVED nodes from the 'seen' table - val newSeen = localSeen -- removedMembers2.map(_.address) + val newSeen = localSeen -- removedMembers.map(_.address) -- removedUnreachable.map(_.address) - // removing REMOVED nodes from the 'unreachable' set - val newUnreachableMembers = localUnreachableMembers -- removedMembers2 - - val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip - (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers2, Member.none) + (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, removedUnreachable, Member.none) } else if (AutoDown) { // we don't have convergence - so we might have unreachable nodes @@ -717,9 +715,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newGossip = localGossip copy (overview = newOverview) 
// update gossip - (newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, unreachableButNotDownedMembers) + (newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, Member.none, unreachableButNotDownedMembers) - } else (localGossip, false, Member.none, Member.none, Member.none, Member.none) + } else (localGossip, false, Member.none, Member.none, Member.none, Member.none, Member.none) if (hasChangedState) { // we have a change of state - version it and try to update // ---------------------- @@ -771,6 +769,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as [{}]", selfAddress, member.address, Down) } + // log the removal of the unreachable nodes + removedUnreachableMembers foreach { member ⇒ + log.info("Cluster Node [{}] - Leader is removing unreachable node [{}]", selfAddress, member.address) + } + publish(latestGossip) } }