diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 5458285245..b2126409c1 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -65,10 +65,6 @@ akka { # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. gossip-different-view-probability = 0.8 - # Limit number of merge conflicts per second that are handled. If the limit is - # exceeded the conflicting gossip messages are dropped and will reappear later. - max-gossip-merge-rate = 5.0 - # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used by the cluster subsystem to detect unreachable members. failure-detector { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 3ca4c343b7..21e423c974 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -540,8 +540,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto latestGossip = winningGossip seen selfAddress // for all new joining nodes we remove them from the failure detector - (latestGossip.members -- localGossip.members).foreach { - node ⇒ if (node.status == Joining) failureDetector.remove(node.address) + latestGossip.members foreach { + node ⇒ if (node.status == Joining && !localGossip.members(node)) failureDetector.remove(node.address) } log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index e4b36d6f5a..27bde94a7f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -59,7 +59,6 @@ class ClusterSettings(val config: Config, val systemName: String) { case id ⇒ id } final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability") - final val MaxGossipMergeRate: Double = cc.getDouble("max-gossip-merge-rate") final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel") final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled") diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 23fc6c8885..2d3ba52570 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -120,17 +120,21 @@ private[cluster] case class Gossip( overview.seen.get(address).exists(_ == version) } - private def mergeSeenTables(allowed: immutable.Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = { - (one.filter { case (a, v) ⇒ allowed.exists(_.address == a) } /: another) { - case (merged, (address, oneVersion)) ⇒ - if (allowed.exists(_.address == address)) { - val anotherVersion = merged.getOrElse(address, oneVersion) - anotherVersion tryCompareTo oneVersion match { - case None ⇒ merged - address - case Some(x) if x > 0 ⇒ merged + (address -> anotherVersion) - case _ ⇒ merged + (address -> oneVersion) - } - } else merged + private def mergeSeenTables(allowed: Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = { + (Map.empty[Address, VectorClock] /: allowed) { + (merged, member) ⇒ + val address = member.address + (one.get(address), another.get(address)) match { + case (None, None) ⇒ merged + case (Some(v1), None) ⇒ merged.updated(address, v1) + case (None, Some(v2)) ⇒ merged.updated(address, v2) + case (Some(v1), Some(v2)) ⇒ + v1 tryCompareTo v2 match { + case None ⇒ merged + case Some(x) if x > 0 ⇒ merged.updated(address, v1) + case _ ⇒ merged.updated(address, v2) + } + } } } @@ -169,19 +173,14 @@ private[cluster] case class Gossip( * @return true if convergence have been reached and false if not */ def convergence: Boolean = { - val unreachable = overview.unreachable - val seen = overview.seen - // First check that: // 1. we don't have any members that are unreachable, or // 2. all unreachable members in the set have status DOWN // Else we can't continue to check for convergence // When that is done we check that all members exists in the seen table and // have the latest vector clock version - val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down } - def allMembersInSeenHasLatest = members.forall(m ⇒ seen.get(m.address).exists(_ == version)) - !hasUnreachable && allMembersInSeenHasLatest + overview.unreachable.forall(_.status == Down) && members.forall(m ⇒ seenByAddress(m.address)) } def isLeader(address: Address): Boolean = leader == Some(address) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index c0902eeffd..ec851b1594 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -84,7 +84,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { normal-throughput-duration = 30s high-throughput-duration = 10s supervision-duration = 10s - supervision-one-iteration = 1s + supervision-one-iteration = 2.5s expected-test-duration = 600s # actors are created in a tree structure defined # by tree-width (number of children for each actor) and diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 39e273d345..9cdb47aeb2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -42,7 +42,6 @@ class ClusterConfigSpec extends AkkaSpec { JmxEnabled must be(true) UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) - MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) MetricsEnabled must be(true)