harden cluster leaving, #21847

As documented in the code:

// Leader is moving itself from Leaving to Exiting. Let others know (best effort)
// before shutdown. Otherwise they will not see the Exiting state change
// and there will not be convergence until they have detected this node as
// unreachable and the required downing has finished. They will still need to detect
// unreachable, but Exiting unreachable will be removed without downing, i.e.
// normally the leaving of a leader will be graceful without the need
// for downing. However, if those final gossip messages never arrive it is
// alright to require the downing, because that is probably caused by a
// network failure anyway.

That is fine, but this change improves the selection of the nodes to
send the final gossip messages to.

I could reproduce the failure in ClusterSingletonManagerLeaveSpec, and with
additional logging I verified that in the failure case it picked the "first"
node 3 times (the selection is random), and that node had already been shut down
(it left earlier in the test) but had not yet been removed.
This commit is contained in:
Patrik Nordwall 2016-11-17 19:07:22 +01:00
parent bffa9384b6
commit 68383b5001

View file

@@ -15,6 +15,8 @@ import akka.cluster.ClusterEvent._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import scala.collection.breakOut
import akka.remote.QuarantinedEvent
import java.util.ArrayList
import java.util.Collections
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@@ -247,7 +249,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
protected def selfUniqueAddress = cluster.selfUniqueAddress
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
val MaxGossipsBeforeShuttingDownMyself = 5
def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
@@ -573,6 +575,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
logInfo("Marked address [{}] as [{}]", address, Leaving)
publish(latestGossip)
// immediate gossip to speed up the leaving process
gossip()
}
}
@@ -775,6 +779,32 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
/** True when fewer than half of the members have seen the current gossip version. */
def isGossipSpeedupNeeded: Boolean = {
  val g = latestGossip
  // note: integer division — intentionally rounds the threshold down
  g.overview.seen.size < g.members.size / 2
}
/**
 * Sends full gossip to `n` other random members.
 * Targets are drawn uniformly (without replacement) from the members that
 * are currently valid gossip destinations; if there are `n` or fewer such
 * members, all of them receive the gossip.
 */
def gossipRandomN(n: Int): Unit = {
  if (!isSingletonCluster && n > 0) {
    val localGossip = latestGossip
    // a java.util.ArrayList is used so that Collections.shuffle can be applied
    val candidates = new ArrayList[UniqueAddress](localGossip.members.size)
    localGossip.members.foreach { member ⇒
      if (validNodeForGossip(member.uniqueAddress))
        candidates.add(member.uniqueAddress)
    }
    val selected =
      if (candidates.size <= n)
        candidates
      else {
        Collections.shuffle(candidates, ThreadLocalRandom.current())
        candidates.subList(0, n)
      }
    val it = selected.iterator
    while (it.hasNext)
      gossipTo(it.next())
  }
}
/**
* Initiates a new round of gossip.
*/
@@ -878,8 +908,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
logInfo("Shutting down myself")
// not crucial to send gossip, but may speedup removal since fallback to failure detection is not needed
// if other downed know that this node has seen the version
downed.filterNot(n ⇒ unreachable(n) || n == selfUniqueAddress).take(MaxGossipsBeforeShuttingDownMyself)
.foreach(gossipTo)
gossipRandomN(MaxGossipsBeforeShuttingDownMyself)
shutdown()
}
}
@@ -988,7 +1017,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// for downing. However, if those final gossip messages never arrive it is
// alright to require the downing, because that is probably caused by a
// network failure anyway.
for (_ ← 1 to NumberOfGossipsBeforeShutdownWhenLeaderExits) gossip()
gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits)
shutdown()
}