diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 4ebf64c1c7..3ca4c343b7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -289,7 +289,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def initialized: Actor.Receive = { case msg: GossipEnvelope ⇒ receiveGossip(msg) - case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() @@ -506,35 +505,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto publish(latestGossip) } - /** - * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected - * it's forwarded to the leader for conflict resolution. Trying to simultaneously - * resolving conflicts at several nodes creates new conflicts. Therefore the leader resolves - * conflicts to limit divergence. To avoid overload there is also a configurable rate - * limit of how many conflicts that are handled by second. If the limit is - * exceeded the conflicting gossip messages are dropped and will reappear later. - */ - def receiveGossipMerge(merge: GossipMergeConflict): Unit = { - stats = stats.incrementMergeConflictCount - val rate = mergeRate(stats.mergeConflictCount) - if (rate <= MaxGossipMergeRate) { - receiveGossip(merge.a.copy(conversation = false)) - receiveGossip(merge.b.copy(conversation = false)) - - // use one-way gossip from leader to reduce load of leader - def sendBack(to: Address): Unit = { - if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to)) - oneWayGossipTo(to) - } - - sendBack(merge.a.from) - sendBack(merge.b.from) - - } else { - log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate) - } - } - /** * Receive new gossip. */ @@ -548,70 +518,47 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } else if (localGossip.overview.isNonDownUnreachable(from)) { log.debug("Ignoring received gossip from unreachable [{}] ", from) } else { - - // leader handles merge conflicts, or when they have different views of how is leader - val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader val comparison = remoteGossip.version tryCompareTo localGossip.version val conflict = comparison.isEmpty - if (conflict && !handleMerge) { - // delegate merge resolution to leader to reduce number of simultaneous resolves, - // which will result in new conflicts + val (winningGossip, talkback, newStats) = comparison match { + case None ⇒ + // conflicting versions, merge + (remoteGossip merge localGossip, true, stats.incrementMergeCount) + case Some(0) ⇒ + // same version + (remoteGossip mergeSeen localGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementSameCount) + case Some(x) if x < 0 ⇒ + // local is newer + (localGossip, true, stats.incrementNewerCount) + case _ ⇒ + // remote is newer + (remoteGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementOlderCount) + } - stats = stats.incrementMergeDetectedCount - log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from) + stats = newStats + latestGossip = winningGossip seen selfAddress - stats = stats.incrementMergeConflictCount - val rate = mergeRate(stats.mergeConflictCount) + // for all new joining nodes we remove them from the failure detector + (latestGossip.members -- localGossip.members).foreach { + node ⇒ if (node.status == Joining) failureDetector.remove(node.address) + } - if (rate <= MaxGossipMergeRate) - localGossip.leader foreach { clusterCore(_) ! GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope) } - else - log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate) + log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - } else { + if (conflict) { + log.debug( + """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", + remoteGossip, localGossip, winningGossip) + } - val (winningGossip, talkback, newStats) = comparison match { - case None ⇒ - // conflicting versions, merge, and new version - ((remoteGossip merge localGossip) :+ vclockNode, true, stats) - case Some(0) ⇒ - // same version - // TODO optimize talkback based on how the merged seen differs - (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementSameCount) - case Some(x) if x < 0 ⇒ - // local is newer - (localGossip, true, stats.incrementNewerCount) - case _ ⇒ - // remote is newer - (remoteGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementOlderCount) - } + stats = stats.incrementReceivedGossipCount + publish(latestGossip) - stats = newStats - latestGossip = winningGossip seen selfAddress - - // for all new joining nodes we remove them from the failure detector - (latestGossip.members -- localGossip.members).foreach { - node ⇒ if (node.status == Joining) failureDetector.remove(node.address) - } - - log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - - if (conflict) { - stats = stats.incrementMergeCount - log.debug( - """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", - remoteGossip, localGossip, winningGossip) - } - - stats = stats.incrementReceivedGossipCount - publish(latestGossip) - - if (envelope.conversation && talkback) { - // send back gossip to sender when sender had different view, i.e. merge, or sender had - // older or sender had newer - gossipTo(from) - } + if (envelope.conversation && talkback) { + // send back gossip to sender when sender had different view, i.e. merge, or sender had + // older or sender had newer + gossipTo(from) } } } @@ -622,8 +569,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * Initiates a new round of gossip. */ def gossip(): Unit = { - stats = stats.copy(mergeConflictCount = 0) - log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) if (!isSingletonCluster && isAvailable) { @@ -1049,9 +994,7 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with */ private[cluster] case class ClusterStats( receivedGossipCount: Long = 0L, - mergeConflictCount: Long = 0L, mergeCount: Long = 0L, - mergeDetectedCount: Long = 0L, sameCount: Long = 0L, newerCount: Long = 0L, olderCount: Long = 0L) { @@ -1059,15 +1002,9 @@ private[cluster] case class ClusterStats( def incrementReceivedGossipCount(): ClusterStats = copy(receivedGossipCount = receivedGossipCount + 1) - def incrementMergeConflictCount(): ClusterStats = - copy(mergeConflictCount = mergeConflictCount + 1) - def incrementMergeCount(): ClusterStats = copy(mergeCount = mergeCount + 1) - def incrementMergeDetectedCount(): ClusterStats = - copy(mergeDetectedCount = mergeDetectedCount + 1) - def incrementSameCount(): ClusterStats = copy(sameCount = sameCount + 1) @@ -1080,9 +1017,7 @@ private[cluster] case class ClusterStats( def :+(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount + that.receivedGossipCount, - this.mergeConflictCount + that.mergeConflictCount, this.mergeCount + that.mergeCount, - this.mergeDetectedCount + that.mergeDetectedCount, this.sameCount + that.sameCount, this.newerCount + that.newerCount, this.olderCount + that.olderCount) @@ -1091,9 +1026,7 @@ private[cluster] case class ClusterStats( def :-(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount - that.receivedGossipCount, - this.mergeConflictCount - that.mergeConflictCount, this.mergeCount - that.mergeCount, - this.mergeDetectedCount - that.mergeDetectedCount, this.sameCount - that.sameCount, this.newerCount - that.newerCount, this.olderCount - that.olderCount) diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 46b250101d..23fc6c8885 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -43,7 +43,7 @@ private[cluster] object Gossip { * When a `Gossip` is received the version (vector clock) is used to determine if the * received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip` * and local `Gossip` is merged in case of conflicting version, i.e. vector clocks without - * same history. When merged the seen table is cleared. + * same history. * * When a node is told by the user to leave the cluster the leader will move it to `Leaving` * and then rebalance and repartition the cluster and start hand-off by migrating the actors @@ -100,7 +100,7 @@ private[cluster] case class Gossip( * Map with the VectorClock (version) for the new gossip. */ def seen(address: Address): Gossip = { - if (hasSeen(address)) this + if (seenByAddress(address)) this else this copy (overview = overview copy (seen = overview.seen + (address -> version))) } @@ -116,24 +116,29 @@ private[cluster] case class Gossip( /** * Has this Gossip been seen by this address. */ - def hasSeen(address: Address): Boolean = { + def seenByAddress(address: Address): Boolean = { overview.seen.get(address).exists(_ == version) } + private def mergeSeenTables(allowed: immutable.Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = { + (one.filter { case (a, v) ⇒ allowed.exists(_.address == a) } /: another) { + case (merged, (address, oneVersion)) ⇒ + if (allowed.exists(_.address == address)) { + val anotherVersion = merged.getOrElse(address, oneVersion) + anotherVersion tryCompareTo oneVersion match { + case None ⇒ merged - address + case Some(x) if x > 0 ⇒ merged + (address -> anotherVersion) + case _ ⇒ merged + (address -> oneVersion) + } + } else merged + } + } + /** * Merges the seen table of two Gossip instances. */ - def mergeSeen(that: Gossip): Gossip = { - val mergedSeen = (overview.seen /: that.overview.seen) { - case (merged, (address, version)) ⇒ - val curr = merged.getOrElse(address, version) - if (curr > version) - merged + (address -> curr) - else - merged + (address -> version) - } - this copy (overview = overview copy (seen = mergedSeen)) - } + def mergeSeen(that: Gossip): Gossip = + this copy (overview = overview copy (seen = mergeSeenTables(members, overview.seen, that.overview.seen))) /** * Merges two Gossip instances including membership tables, and the VectorClock histories. @@ -151,8 +156,8 @@ private[cluster] case class Gossip( // and exclude unreachable val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) - // 4. fresh seen table - val mergedSeen = Map.empty[Address, VectorClock] + // 4. merge seen table + val mergedSeen = mergeSeenTables(mergedMembers, overview.seen, that.overview.seen) Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock) } @@ -171,22 +176,12 @@ private[cluster] case class Gossip( // 1. we don't have any members that are unreachable, or // 2. all unreachable members in the set have status DOWN // Else we can't continue to check for convergence - // When that is done we check that all the entries in the 'seen' table have the same vector clock version - // and that all members exists in seen table + // When that is done we check that all members exists in the seen table and + // have the latest vector clock version val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down } - def allMembersInSeen = members.forall(m ⇒ seen.contains(m.address)) + def allMembersInSeenHasLatest = members.forall(m ⇒ seen.get(m.address).exists(_ == version)) - def seenSame: Boolean = - if (seen.isEmpty) { - // if both seen and members are empty, then every(no)body has seen the same thing - members.isEmpty - } else { - val values = seen.values - val seenHead = values.head - values.forall(_ == seenHead) - } - - !hasUnreachable && allMembersInSeen && seenSame + !hasUnreachable && allMembersInSeenHasLatest } def isLeader(address: Address): Boolean = leader == Some(address) @@ -240,11 +235,3 @@ private[cluster] case class GossipOverview( * Envelope adding a sender address to the gossip. */ private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage - -/** - * INTERNAL API - * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected - * it's forwarded to the leader for conflict resolution. - */ -private[cluster] case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage - diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 2978ba6373..9b677141d3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -79,18 +79,6 @@ class GossipSpec extends WordSpec with MustMatchers { } - "start with fresh seen table after merge" in { - val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address) - val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address) - - val merged1 = g1 merge g2 - merged1.overview.seen.isEmpty must be(true) - - val merged2 = g2 merge g1 - merged2.overview.seen.isEmpty must be(true) - - } - "not have node in both members and unreachable" in intercept[IllegalArgumentException] { Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(b2))) } @@ -121,17 +109,17 @@ class GossipSpec extends WordSpec with MustMatchers { keys.length must be(4) keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address)) - merged hasSeen (a1.address) must be(true) - merged hasSeen (b1.address) must be(false) - merged hasSeen (c1.address) must be(true) - merged hasSeen (d1.address) must be(true) - merged hasSeen (e1.address) must be(false) + merged seenByAddress (a1.address) must be(true) + merged seenByAddress (b1.address) must be(false) + merged seenByAddress (c1.address) must be(true) + merged seenByAddress (d1.address) must be(true) + merged seenByAddress (e1.address) must be(false) merged.overview.seen(b1.address) must be(g1.version) } - checkMerged(g3 mergeSeen g2) - checkMerged(g2 mergeSeen g3) + checkMerged(g3 merge g2) + checkMerged(g2 merge g3) } } }