From 532c98c6cdb87ff6256f6f67a4f29b1dd0162048 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 18 Oct 2013 08:25:52 +0200 Subject: [PATCH] +clu #3458 Adjust biased gossip for large cluster --- .../src/main/resources/reference.conf | 4 +++ .../scala/akka/cluster/ClusterDaemon.scala | 30 ++++++++++++++++++- .../scala/akka/cluster/ClusterSettings.scala | 1 + .../akka/cluster/ClusterConfigSpec.scala | 1 + 4 files changed, 35 insertions(+), 1 deletion(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index ea9a208a9d..bc88a6ef2b 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -89,6 +89,10 @@ akka { # this probability. Otherwise Gossip to any random live node. # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. gossip-different-view-probability = 0.8 + + # Reduced the above probability when the number of nodes in the cluster + # greater than this value. + reduce-gossip-different-view-probability = 400 # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used by the cluster subsystem to detect unreachable diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index e73889d6f6..6c9201c461 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -695,7 +695,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val localGossip = latestGossip val preferredGossipTargets: Vector[UniqueAddress] = - if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view + if (ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability) { + // If it's time to try to gossip to some nodes with a different view // gossip to a random alive member with preference to a member with older gossip version localGossip.members.collect { case m if !localGossip.seenByNode(m.uniqueAddress) && validNodeForGossip(m.uniqueAddress) ⇒ @@ -720,6 +721,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } + /** + * For large clusters we should avoid shooting down individual + * nodes. Therefore the probability is reduced for large clusters. + */ + def adjustedGossipDifferentViewProbability: Double = { + val size = latestGossip.members.size + val low = ReduceGossipDifferentViewProbability + val high = low * 3 + // start reduction when cluster is larger than configured ReduceGossipDifferentViewProbability + if (size <= low) + GossipDifferentViewProbability + else { + // don't go lower than 1/10 of the configured GossipDifferentViewProbability + val minP = GossipDifferentViewProbability / 10 + if (size >= high) + minP + else { + // linear reduction of the probability with increasing number of nodes + // from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes + // to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes + // i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes + val k = (minP - GossipDifferentViewProbability) / (high - low) + GossipDifferentViewProbability + (size - low) * k + } + } + } + /** * Runs periodic leader actions, such as member status transitions, assigning partitions etc. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index e88d9156e6..508731b93c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -86,6 +86,7 @@ final class ClusterSettings(val config: Config, val systemName: String) { case id ⇒ id } val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability") + val ReduceGossipDifferentViewProbability: Int = cc.getInt("reduce-gossip-different-view-probability") val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS) val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel") val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 22b2c8eb83..f2f06f44a2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -44,6 +44,7 @@ class ClusterConfigSpec extends AkkaSpec { JmxEnabled must be(true) UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) + ReduceGossipDifferentViewProbability must be(400) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) MetricsEnabled must be(true)