diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 7d0e9408a1..3bad4eb512 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -554,7 +554,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // leader handles merge conflicts, or when they have different views of how is leader val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader - val conflict = remoteGossip.version <> localGossip.version + val comparison = remoteGossip.version tryCompareTo localGossip.version + val conflict = !comparison.isDefined if (conflict && !handleMerge) { // delegate merge resolution to leader to reduce number of simultaneous resolves, @@ -573,10 +574,24 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } else { - val winningGossip = - if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version - else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer - else remoteGossip // remote gossip is newer + val (winningGossip, talkback) = comparison match { + case None ⇒ + // conflicting versions, merge, and new version + ((remoteGossip merge localGossip) :+ vclockNode, true) + case Some(0) ⇒ + // same version + stats = stats.incrementSameCount + // TODO optimize talkback based on how the merged seen differs + (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress)) + case Some(x) if (x < 0) ⇒ + // local is newer + stats = stats.incrementNewerCount + (localGossip, true) + case _ ⇒ + // remote is newer + stats = stats.incrementOlderCount + (remoteGossip, !remoteGossip.hasSeen(selfAddress)) + } latestGossip = winningGossip seen selfAddress @@ -597,8 +612,7 @@ private[cluster] final class 
ClusterCoreDaemon(publisher: ActorRef) extends Acto
         stats = stats.incrementReceivedGossipCount
         publish(latestGossip)
 
-        if (envelope.conversation &&
-          (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
+        if (envelope.conversation && talkback) {
           // send back gossip to sender when sender had different view, i.e. merge, or sender had
           // older or sender had newer
           gossipTo(from)
@@ -1040,7 +1054,10 @@ private[cluster] case class ClusterStats(
   receivedGossipCount: Long = 0L,
   mergeConflictCount: Long = 0L,
   mergeCount: Long = 0L,
-  mergeDetectedCount: Long = 0L) {
+  mergeDetectedCount: Long = 0L,
+  sameCount: Long = 0L,
+  newerCount: Long = 0L,
+  olderCount: Long = 0L) {
 
   def incrementReceivedGossipCount(): ClusterStats =
     copy(receivedGossipCount = receivedGossipCount + 1)
@@ -1053,4 +1070,41 @@ private[cluster] case class ClusterStats(
 
   def incrementMergeDetectedCount(): ClusterStats =
     copy(mergeDetectedCount = mergeDetectedCount + 1)
+
+  /** Returns a copy with `sameCount` increased by one. */
+  def incrementSameCount(): ClusterStats =
+    copy(sameCount = sameCount + 1)
+
+  /** Returns a copy with `newerCount` increased by one. */
+  def incrementNewerCount(): ClusterStats =
+    copy(newerCount = newerCount + 1)
+
+  /** Returns a copy with `olderCount` increased by one. */
+  def incrementOlderCount(): ClusterStats =
+    copy(olderCount = olderCount + 1)
+
+  /**
+   * Field-wise sum of the counters of `this` and `that`.
+   */
+  def +(that: ClusterStats): ClusterStats = combine(that)(_ + _)
+
+  /**
+   * Field-wise difference of the counters, `this` minus `that`.
+   */
+  def -(that: ClusterStats): ClusterStats = combine(that)(_ - _)
+
+  /**
+   * Applies `op` to each pair of corresponding counters. Named arguments keep
+   * the pairing correct even if the constructor parameter order changes.
+   */
+  private def combine(that: ClusterStats)(op: (Long, Long) ⇒ Long): ClusterStats =
+    ClusterStats(
+      receivedGossipCount = op(receivedGossipCount, that.receivedGossipCount),
+      mergeConflictCount = op(mergeConflictCount, that.mergeConflictCount),
+      mergeCount = op(mergeCount, that.mergeCount),
+      mergeDetectedCount = op(mergeDetectedCount, that.mergeDetectedCount),
+      sameCount = op(sameCount, that.sameCount),
+      newerCount = op(newerCount, that.newerCount),
+      olderCount = op(olderCount, that.olderCount))
+
 }
diff --git
a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
index 0309a8d67a..7fdfe1b198 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
@@ -100,7 +100,7 @@ private[cluster] case class Gossip(
    * Map with the VectorClock (version) for the new gossip.
    */
   def seen(address: Address): Gossip = {
-    if (overview.seen.contains(address) && overview.seen(address) == version) this
+    if (hasSeen(address)) this
     else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
   }
 
@@ -113,6 +113,32 @@ private[cluster] case class Gossip(
     }.toSet
   }
 
+  /**
+   * Has this Gossip been seen by this address, i.e. does the seen table map
+   * the address to the current `version` of this Gossip.
+   */
+  def hasSeen(address: Address): Boolean =
+    overview.seen.get(address).exists(_ == version)
+
+  /**
+   * Merges the seen table of two Gossip instances. Entries only present in one
+   * table are kept; for an address present in both, the entry of this Gossip
+   * is kept only when its version is strictly greater (`>`) than the one in
+   * `that`, otherwise the entry from `that` is used. All other state is taken
+   * from this Gossip.
+   */
+  def mergeSeen(that: Gossip): Gossip = {
+    val mergedSeen = (overview.seen /: that.overview.seen) {
+      case (merged, (address, thatVersion)) ⇒
+        merged.get(address) match {
+          // current entry already dominates the incoming one, nothing to update
+          case Some(current) if current > thatVersion ⇒ merged
+          case _                                      ⇒ merged + (address -> thatVersion)
+        }
+    }
+    this copy (overview = overview copy (seen = mergedSeen))
+  }
+
   /**
    * Merges two Gossip instances including membership tables, and the VectorClock histories.
*/ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 1875a0d48d..03e4609faa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -215,6 +215,10 @@ object StressMultiJvmSpec extends MultiNodeConfig { import akka.cluster.Member.addressOrdering immutable.SortedMap.empty[Address, Set[PhiValue]] } + var clusterStatsObservedByNode = { + import akka.cluster.Member.addressOrdering + immutable.SortedMap.empty[Address, ClusterStats] + } import context.dispatcher val reportMetricsTask = context.system.scheduler.schedule( @@ -231,13 +235,14 @@ object StressMultiJvmSpec extends MultiNodeConfig { def receive = { case ClusterMetricsChanged(clusterMetrics) ⇒ nodeMetrics = clusterMetrics case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues + case StatsResult(from, stats) => clusterStatsObservedByNode += from -> stats case ReportTick ⇒ - log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}") + log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") case r: ClusterResult ⇒ results :+= r if (results.size == expectedResults) { val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats) - log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}") + log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") reportTo foreach { _ ! 
aggregated } context stop self } @@ -247,13 +252,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def maxDuration = results.map(_.duration).max - def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()) { (acc, s) ⇒ - ClusterStats( - receivedGossipCount = acc.receivedGossipCount + s.receivedGossipCount, - mergeConflictCount = acc.mergeConflictCount + s.mergeConflictCount, - mergeCount = acc.mergeCount + s.mergeCount, - mergeDetectedCount = acc.mergeDetectedCount + s.mergeDetectedCount) - } + def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()){_ + _} def formatMetrics: String = { import akka.cluster.Member.addressOrdering @@ -300,6 +299,16 @@ object StressMultiJvmSpec extends MultiNodeConfig { def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" + def formatStats: String = { + if (clusterStatsObservedByNode.isEmpty) "" + else { + val lines = + for { + (monitor, stats) ← clusterStatsObservedByNode.toSeq + } yield s"${monitor}\t${stats}" + ("ClusterStats" +: lines).mkString("\n") + } + } } /** @@ -319,10 +328,10 @@ object StressMultiJvmSpec extends MultiNodeConfig { def formatHistory: String = (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n") - def formatHistoryHeader: String = "title\tduration (ms)\tgossip count\tmerge count" + def formatHistoryHeader: String = "title\tduration (ms)\tcluster stats" def formatHistoryLine(result: AggregatedClusterResult): String = - s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats.receivedGossipCount}\t${result.clusterStats.mergeCount}" + s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}" } @@ -381,6 +390,31 @@ object StressMultiJvmSpec extends MultiNodeConfig { } } + class StatsObserver extends Actor { + val cluster = Cluster(context.system) + var reportTo: Option[ActorRef] = None + var startStats = 
cluster.readView.latestStats + + import context.dispatcher + val checkStatsTask = context.system.scheduler.schedule( + 1.second, 1.second, self, StatsTick) + + override def postStop(): Unit = { + checkStatsTask.cancel() + super.postStop() + } + + def receive = { + case StatsTick ⇒ + val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats - startStats) + reportTo foreach { _ ! res } + case ReportTo(ref) ⇒ + reportTo = ref + case Reset ⇒ + startStats = cluster.readView.latestStats + } + } + /** * Master of routers * @@ -568,6 +602,8 @@ object StressMultiJvmSpec extends MultiNodeConfig { case class PhiResult(from: Address, phiValues: Set[PhiValue]) case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) case class ReportTo(ref: Option[ActorRef]) + case object StatsTick + case class StatsResult(from: Address, stats: ClusterStats) type JobId = Int trait Job { def id: JobId } @@ -622,6 +658,7 @@ abstract class StressSpec sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*"))) + sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*ClusterStats.*"))) } val seedNodes = roles.take(numberOfSeedNodes) @@ -645,6 +682,8 @@ abstract class StressSpec enterBarrier("result-aggregator-created-" + step) runOn(roles.take(nbrUsedRoles): _*) { phiObserver ! ReportTo(Some(clusterResultAggregator)) + statsObserver ! Reset + statsObserver ! 
ReportTo(Some(clusterResultAggregator)) } } @@ -654,6 +693,8 @@ abstract class StressSpec lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver") + lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver") + def awaitClusterResult: Unit = { runOn(roles.head) { val r = clusterResultAggregator @@ -789,14 +830,9 @@ abstract class StressSpec val returnValue = thunk - val duration = (System.nanoTime - startTime).nanos - val latestStats = clusterView.latestStats - val clusterStats = ClusterStats( - receivedGossipCount = latestStats.receivedGossipCount - startStats.receivedGossipCount, - mergeConflictCount = latestStats.mergeConflictCount - startStats.mergeConflictCount, - mergeCount = latestStats.mergeCount - startStats.mergeCount, - mergeDetectedCount = latestStats.mergeDetectedCount - startStats.mergeDetectedCount) - clusterResultAggregator ! ClusterResult(cluster.selfAddress, duration, clusterStats) + clusterResultAggregator ! + ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats - startStats) + returnValue } @@ -813,6 +849,7 @@ abstract class StressSpec val t = title + " round " + counter runOn(usedRoles: _*) { phiObserver ! Reset + statsObserver ! Reset } createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true) val (nextAS, nextAddresses) = within(loopDuration) { @@ -852,6 +889,7 @@ abstract class StressSpec runOn(usedRoles: _*) { awaitUpConvergence(nbrUsedRoles, timeout = remaining) phiObserver ! Reset + statsObserver ! 
Reset
       }
     }
     enterBarrier("join-remove-shutdown-" + step)
diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
index cca7c4213e..339394bb53 100644
--- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
@@ -90,8 +90,8 @@ class GossipSpec extends WordSpec with MustMatchers {
     }
 
     "start with fresh seen table after merge" in {
-      val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address)
-      val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address)
+      val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address)
+      val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address)
 
       val merged1 = g1 merge g2
       merged1.overview.seen.isEmpty must be(true)
@@ -120,5 +120,31 @@ class GossipSpec extends WordSpec with MustMatchers {
       Gossip(members = SortedSet(c3)).leader must be(Some(c3.address))
     }
 
+    "merge seen table correctly" in {
+      val vclockNode = VectorClock.Node("something")
+      val g1 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(b1.address)
+      val g2 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(c1.address)
+      // g3 keeps g1's seen entries but carries g2's version, and is then seen by d1
+      val g3 = (g1 copy (version = g2.version)).seen(d1.address)
+
+      // the expectations must hold regardless of the merge direction
+      def checkMerged(merged: Gossip): Unit = {
+        val keys = merged.overview.seen.keys.toSeq
+        keys.length must be(4)
+        keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address))
+
+        merged hasSeen (a1.address) must be(true)
+        merged hasSeen (b1.address) must be(false)
+        merged hasSeen (c1.address) must be(true)
+        merged hasSeen (d1.address) must be(true)
+        merged hasSeen (e1.address) must be(false)
+
+        // b1 is still recorded against g1's version, hence not counted as seen above
+        merged.overview.seen(b1.address) must be(g1.version)
+      }
+
+      checkMerged(g3 mergeSeen g2)
+      checkMerged(g2 mergeSeen g3)
+    }
   }
 }