diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 5458285245..b2126409c1 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -65,10 +65,6 @@ akka { # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. gossip-different-view-probability = 0.8 - # Limit number of merge conflicts per second that are handled. If the limit is - # exceeded the conflicting gossip messages are dropped and will reappear later. - max-gossip-merge-rate = 5.0 - # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf # [Hayashibara et al]) used by the cluster subsystem to detect unreachable members. failure-detector { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index a779f4a2fc..6a7416476e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -289,7 +289,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def initialized: Actor.Receive = { case msg: GossipEnvelope ⇒ receiveGossip(msg) - case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() @@ -505,35 +504,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto publish(latestGossip) } - /** - * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected - * it's forwarded to the leader for conflict resolution. Trying to simultaneously - * resolving conflicts at several nodes creates new conflicts. Therefore the leader resolves - * conflicts to limit divergence. To avoid overload there is also a configurable rate - * limit of how many conflicts that are handled by second. If the limit is - * exceeded the conflicting gossip messages are dropped and will reappear later. - */ - def receiveGossipMerge(merge: GossipMergeConflict): Unit = { - stats = stats.incrementMergeConflictCount - val rate = mergeRate(stats.mergeConflictCount) - if (rate <= MaxGossipMergeRate) { - receiveGossip(merge.a.copy(conversation = false)) - receiveGossip(merge.b.copy(conversation = false)) - - // use one-way gossip from leader to reduce load of leader - def sendBack(to: Address): Unit = { - if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to)) - oneWayGossipTo(to) - } - - sendBack(merge.a.from) - sendBack(merge.b.from) - - } else { - log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate) - } - } - /** * Receive new gossip. */ @@ -547,70 +517,47 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } else if (localGossip.overview.isNonDownUnreachable(from)) { log.debug("Ignoring received gossip from unreachable [{}] ", from) } else { - - // leader handles merge conflicts, or when they have different views of how is leader - val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader val comparison = remoteGossip.version tryCompareTo localGossip.version val conflict = comparison.isEmpty - if (conflict && !handleMerge) { - // delegate merge resolution to leader to reduce number of simultaneous resolves, - // which will result in new conflicts + val (winningGossip, talkback, newStats) = comparison match { + case None ⇒ + // conflicting versions, merge + (remoteGossip merge localGossip, true, stats.incrementMergeCount) + case Some(0) ⇒ + // same version + (remoteGossip mergeSeen localGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementSameCount) + case Some(x) if x < 0 ⇒ + // local is newer + (localGossip, true, stats.incrementNewerCount) + case _ ⇒ + // remote is newer + (remoteGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementOlderCount) + } - stats = stats.incrementMergeDetectedCount - log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from) + stats = newStats + latestGossip = winningGossip seen selfAddress - stats = stats.incrementMergeConflictCount - val rate = mergeRate(stats.mergeConflictCount) + // for all new joining nodes we remove them from the failure detector + latestGossip.members foreach { + node ⇒ if (node.status == Joining && !localGossip.members(node)) failureDetector.remove(node.address) + } - if (rate <= MaxGossipMergeRate) - localGossip.leader foreach { clusterCore(_) ! GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope) } - else - log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate) + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - } else { + if (conflict) { + log.debug( + """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", + remoteGossip, localGossip, winningGossip) + } - val (winningGossip, talkback, newStats) = comparison match { - case None ⇒ - // conflicting versions, merge, and new version - ((remoteGossip merge localGossip) :+ vclockNode, true, stats) - case Some(0) ⇒ - // same version - // TODO optimize talkback based on how the merged seen differs - (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementSameCount) - case Some(x) if x < 0 ⇒ - // local is newer - (localGossip, true, stats.incrementNewerCount) - case _ ⇒ - // remote is newer - (remoteGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementOlderCount) - } + stats = stats.incrementReceivedGossipCount + publish(latestGossip) - stats = newStats - latestGossip = winningGossip seen selfAddress - - // for all new joining nodes we remove them from the failure detector - (latestGossip.members -- localGossip.members).foreach { - node ⇒ if (node.status == Joining) failureDetector.remove(node.address) - } - - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - - if (conflict) { - stats = stats.incrementMergeCount - log.debug( - """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", - remoteGossip, localGossip, winningGossip) - } - - stats = stats.incrementReceivedGossipCount - publish(latestGossip) - - if (envelope.conversation && talkback) { - // send back gossip to sender when sender had different view, i.e. merge, or sender had - // older or sender had newer - gossipTo(from) - } + if (envelope.conversation && talkback) { + // send back gossip to sender when sender had different view, i.e. merge, or sender had + // older or sender had newer + gossipTo(from) } } } @@ -621,8 +568,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * Initiates a new round of gossip. */ def gossip(): Unit = { - stats = stats.copy(mergeConflictCount = 0) - log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) if (!isSingletonCluster && isAvailable) { @@ -1053,9 +998,7 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with */ private[cluster] case class ClusterStats( receivedGossipCount: Long = 0L, - mergeConflictCount: Long = 0L, mergeCount: Long = 0L, - mergeDetectedCount: Long = 0L, sameCount: Long = 0L, newerCount: Long = 0L, olderCount: Long = 0L) { @@ -1063,15 +1006,9 @@ private[cluster] case class ClusterStats( def incrementReceivedGossipCount(): ClusterStats = copy(receivedGossipCount = receivedGossipCount + 1) - def incrementMergeConflictCount(): ClusterStats = - copy(mergeConflictCount = mergeConflictCount + 1) - def incrementMergeCount(): ClusterStats = copy(mergeCount = mergeCount + 1) - def incrementMergeDetectedCount(): ClusterStats = - copy(mergeDetectedCount = mergeDetectedCount + 1) - def incrementSameCount(): ClusterStats = copy(sameCount = sameCount + 1) @@ -1084,9 +1021,7 @@ private[cluster] case class ClusterStats( def :+(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount + that.receivedGossipCount, - this.mergeConflictCount + that.mergeConflictCount, this.mergeCount + that.mergeCount, - this.mergeDetectedCount + that.mergeDetectedCount, this.sameCount + that.sameCount, this.newerCount + that.newerCount, this.olderCount + that.olderCount) @@ -1095,9 +1030,7 @@ private[cluster] case class ClusterStats( def :-(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount - that.receivedGossipCount, - this.mergeConflictCount - that.mergeConflictCount, this.mergeCount - that.mergeCount, - this.mergeDetectedCount - that.mergeDetectedCount, this.sameCount - that.sameCount, this.newerCount - that.newerCount, this.olderCount - that.olderCount) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index e4b36d6f5a..27bde94a7f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -59,7 +59,6 @@ class ClusterSettings(val config: Config, val systemName: String) { case id ⇒ id } final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability") - final val MaxGossipMergeRate: Double = cc.getDouble("max-gossip-merge-rate") final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel") final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled") diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 46b250101d..2d3ba52570 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -43,7 +43,7 @@ private[cluster] object Gossip { * When a `Gossip` is received the version (vector clock) is used to determine if the * received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip` * and local `Gossip` is merged in case of conflicting version, i.e. vector clocks without - * same history. When merged the seen table is cleared. + * same history. * * When a node is told by the user to leave the cluster the leader will move it to `Leaving` * and then rebalance and repartition the cluster and start hand-off by migrating the actors @@ -100,7 +100,7 @@ private[cluster] case class Gossip( * Map with the VectorClock (version) for the new gossip. */ def seen(address: Address): Gossip = { - if (hasSeen(address)) this + if (seenByAddress(address)) this else this copy (overview = overview copy (seen = overview.seen + (address -> version))) } @@ -116,24 +116,33 @@ private[cluster] case class Gossip( /** * Has this Gossip been seen by this address. */ - def hasSeen(address: Address): Boolean = { + def seenByAddress(address: Address): Boolean = { overview.seen.get(address).exists(_ == version) } + private def mergeSeenTables(allowed: Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = { + (Map.empty[Address, VectorClock] /: allowed) { + (merged, member) ⇒ + val address = member.address + (one.get(address), another.get(address)) match { + case (None, None) ⇒ merged + case (Some(v1), None) ⇒ merged.updated(address, v1) + case (None, Some(v2)) ⇒ merged.updated(address, v2) + case (Some(v1), Some(v2)) ⇒ + v1 tryCompareTo v2 match { + case None ⇒ merged + case Some(x) if x > 0 ⇒ merged.updated(address, v1) + case _ ⇒ merged.updated(address, v2) + } + } + } + } + /** * Merges the seen table of two Gossip instances. */ - def mergeSeen(that: Gossip): Gossip = { - val mergedSeen = (overview.seen /: that.overview.seen) { - case (merged, (address, version)) ⇒ - val curr = merged.getOrElse(address, version) - if (curr > version) - merged + (address -> curr) - else - merged + (address -> version) - } - this copy (overview = overview copy (seen = mergedSeen)) - } + def mergeSeen(that: Gossip): Gossip = + this copy (overview = overview copy (seen = mergeSeenTables(members, overview.seen, that.overview.seen))) /** * Merges two Gossip instances including membership tables, and the VectorClock histories. @@ -151,8 +160,8 @@ private[cluster] case class Gossip( // and exclude unreachable val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) - // 4. fresh seen table - val mergedSeen = Map.empty[Address, VectorClock] + // 4. merge seen table + val mergedSeen = mergeSeenTables(mergedMembers, overview.seen, that.overview.seen) Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock) } @@ -164,29 +173,14 @@ private[cluster] case class Gossip( * @return true if convergence have been reached and false if not */ def convergence: Boolean = { - val unreachable = overview.unreachable - val seen = overview.seen - // First check that: // 1. we don't have any members that are unreachable, or // 2. all unreachable members in the set have status DOWN // Else we can't continue to check for convergence - // When that is done we check that all the entries in the 'seen' table have the same vector clock version - // and that all members exists in seen table - val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down } - def allMembersInSeen = members.forall(m ⇒ seen.contains(m.address)) + // When that is done we check that all members exists in the seen table and + // have the latest vector clock version - def seenSame: Boolean = - if (seen.isEmpty) { - // if both seen and members are empty, then every(no)body has seen the same thing - members.isEmpty - } else { - val values = seen.values - val seenHead = values.head - values.forall(_ == seenHead) - } - - !hasUnreachable && allMembersInSeen && seenSame + overview.unreachable.forall(_.status == Down) && members.forall(m ⇒ seenByAddress(m.address)) } def isLeader(address: Address): Boolean = leader == Some(address) @@ -240,11 +234,3 @@ private[cluster] case class GossipOverview( * Envelope adding a sender address to the gossip. */ private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage - -/** - * INTERNAL API - * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected - * it's forwarded to the leader for conflict resolution. - */ -private[cluster] case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage - diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index c0902eeffd..ec851b1594 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -84,7 +84,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { normal-throughput-duration = 30s high-throughput-duration = 10s supervision-duration = 10s - supervision-one-iteration = 1s + supervision-one-iteration = 2.5s expected-test-duration = 600s # actors are created in a tree structure defined # by tree-width (number of children for each actor) and diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 39e273d345..9cdb47aeb2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -42,7 +42,6 @@ class ClusterConfigSpec extends AkkaSpec { JmxEnabled must be(true) UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) - MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) MetricsEnabled must be(true) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 2978ba6373..9b677141d3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -79,18 +79,6 @@ class GossipSpec extends WordSpec with MustMatchers { } - "start with fresh seen table after merge" in { - val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address) - val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address) - - val merged1 = g1 merge g2 - merged1.overview.seen.isEmpty must be(true) - - val merged2 = g2 merge g1 - merged2.overview.seen.isEmpty must be(true) - - } - "not have node in both members and unreachable" in intercept[IllegalArgumentException] { Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(b2))) } @@ -121,17 +109,17 @@ class GossipSpec extends WordSpec with MustMatchers { keys.length must be(4) keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address)) - merged hasSeen (a1.address) must be(true) - merged hasSeen (b1.address) must be(false) - merged hasSeen (c1.address) must be(true) - merged hasSeen (d1.address) must be(true) - merged hasSeen (e1.address) must be(false) + merged seenByAddress (a1.address) must be(true) + merged seenByAddress (b1.address) must be(false) + merged seenByAddress (c1.address) must be(true) + merged seenByAddress (d1.address) must be(true) + merged seenByAddress (e1.address) must be(false) merged.overview.seen(b1.address) must be(g1.version) } - checkMerged(g3 mergeSeen g2) - checkMerged(g2 mergeSeen g3) + checkMerged(g3 merge g2) + checkMerged(g2 merge g3) } } }