diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index afb87d0253..e44d3f044a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -15,6 +15,8 @@ import akka.cluster.ClusterEvent._ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import scala.collection.breakOut import akka.remote.QuarantinedEvent +import java.util.ArrayList +import java.util.Collections /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -247,7 +249,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with protected def selfUniqueAddress = cluster.selfUniqueAddress - val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3 + val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5 val MaxGossipsBeforeShuttingDownMyself = 5 def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}" @@ -573,6 +575,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with logInfo("Marked address [{}] as [{}]", address, Leaving) publish(latestGossip) + // immediate gossip to speed up the leaving process + gossip() } } @@ -775,6 +779,32 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def isGossipSpeedupNeeded: Boolean = (latestGossip.overview.seen.size < latestGossip.members.size / 2) + /** + * Sends full gossip to `n` other random members. + */ + def gossipRandomN(n: Int): Unit = { + if (!isSingletonCluster && n > 0) { + val localGossip = latestGossip + // using ArrayList to be able to shuffle + val possibleTargets = new ArrayList[UniqueAddress](localGossip.members.size) + localGossip.members.foreach { m ⇒ + if (validNodeForGossip(m.uniqueAddress)) + possibleTargets.add(m.uniqueAddress) + } + val randomTargets = + if (possibleTargets.size <= n) + possibleTargets + else { + Collections.shuffle(possibleTargets, ThreadLocalRandom.current()) + possibleTargets.subList(0, n) + } + + val iter = randomTargets.iterator + while (iter.hasNext) + gossipTo(iter.next()) + } + } + /** * Initiates a new round of gossip. */ @@ -878,8 +908,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with logInfo("Shutting down myself") // not crucial to send gossip, but may speedup removal since fallback to failure detection is not needed // if other downed know that this node has seen the version - downed.filterNot(n ⇒ unreachable(n) || n == selfUniqueAddress).take(MaxGossipsBeforeShuttingDownMyself) - .foreach(gossipTo) + gossipRandomN(MaxGossipsBeforeShuttingDownMyself) shutdown() } } @@ -988,7 +1017,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // for downing. However, if those final gossip messages never arrive it is // alright to require the downing, because that is probably caused by a // network failure anyway. - for (_ ← 1 to NumberOfGossipsBeforeShutdownWhenLeaderExits) gossip() + gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits) shutdown() }