From 68383b5001482cc91f86cf54234f85599f2a3dc1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 17 Nov 2016 19:07:22 +0100 Subject: [PATCH] harden cluster leaving, #21847 As documented in the code: // Leader is moving itself from Leaving to Exiting. Let others know (best effort) // before shutdown. Otherwise they will not see the Exiting state change // and there will not be convergence until they have detected this node as // unreachable and the required downing has finished. They will still need to detect // unreachable, but Exiting unreachable will be removed without downing, i.e. // normally the leaving of a leader will be graceful without the need // for downing. However, if those final gossip messages never arrive it is // alright to require the downing, because that is probably caused by a // network failure anyway. That is fine, but this change improves the selection of the nodes to send the final gossip messages to. I could reproduce the failure in ClusterSingletonManagerLeaveSpec, and with additional logging I verified that in the failure case it picked the "first" node 3 times (the selection is random) and that node had already been shut down (it left earlier in the test) but had not been removed yet. --- .../scala/akka/cluster/ClusterDaemon.scala | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index afb87d0253..e44d3f044a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -15,6 +15,8 @@ import akka.cluster.ClusterEvent._ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import scala.collection.breakOut import akka.remote.QuarantinedEvent +import java.util.ArrayList +import java.util.Collections /** * Base trait for all cluster messages. All ClusterMessage's are serializable. 
@@ -247,7 +249,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with protected def selfUniqueAddress = cluster.selfUniqueAddress - val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3 + val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5 val MaxGossipsBeforeShuttingDownMyself = 5 def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}" @@ -573,6 +575,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with logInfo("Marked address [{}] as [{}]", address, Leaving) publish(latestGossip) + // immediate gossip to speed up the leaving process + gossip() } } @@ -775,6 +779,32 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def isGossipSpeedupNeeded: Boolean = (latestGossip.overview.seen.size < latestGossip.members.size / 2) + /** + * Sends full gossip to `n` other random members. + */ + def gossipRandomN(n: Int): Unit = { + if (!isSingletonCluster && n > 0) { + val localGossip = latestGossip + // using ArrayList to be able to shuffle + val possibleTargets = new ArrayList[UniqueAddress](localGossip.members.size) + localGossip.members.foreach { m ⇒ + if (validNodeForGossip(m.uniqueAddress)) + possibleTargets.add(m.uniqueAddress) + } + val randomTargets = + if (possibleTargets.size <= n) + possibleTargets + else { + Collections.shuffle(possibleTargets, ThreadLocalRandom.current()) + possibleTargets.subList(0, n) + } + + val iter = randomTargets.iterator + while (iter.hasNext) + gossipTo(iter.next()) + } + } + /** * Initiates a new round of gossip. 
*/ @@ -878,8 +908,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with logInfo("Shutting down myself") // not crucial to send gossip, but may speedup removal since fallback to failure detection is not needed // if other downed know that this node has seen the version - downed.filterNot(n ⇒ unreachable(n) || n == selfUniqueAddress).take(MaxGossipsBeforeShuttingDownMyself) - .foreach(gossipTo) + gossipRandomN(MaxGossipsBeforeShuttingDownMyself) shutdown() } } @@ -988,7 +1017,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // for downing. However, if those final gossip messages never arrive it is // alright to require the downing, because that is probably caused by a // network failure anyway. - for (_ ← 1 to NumberOfGossipsBeforeShutdownWhenLeaderExits) gossip() + gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits) shutdown() }