From fad4289b1b9908f1faff8af255bb1ec34a27808d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Tue, 5 Mar 2013 12:49:35 +0100 Subject: [PATCH 1/2] Merge gossip seen table when versions are the same. See #3115 --- .../scala/akka/cluster/ClusterDaemon.scala | 65 ++++++++++++++-- .../src/main/scala/akka/cluster/Gossip.scala | 24 +++++- .../scala/akka/cluster/StressSpec.scala | 76 ++++++++++++++----- .../test/scala/akka/cluster/GossipSpec.scala | 27 ++++++- 4 files changed, 162 insertions(+), 30 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 7d0e9408a1..3bad4eb512 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -554,7 +554,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // leader handles merge conflicts, or when they have different views of how is leader val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader - val conflict = remoteGossip.version <> localGossip.version + val comparison = remoteGossip.version tryCompareTo localGossip.version + val conflict = !comparison.isDefined if (conflict && !handleMerge) { // delegate merge resolution to leader to reduce number of simultaneous resolves, @@ -573,10 +574,24 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } else { - val winningGossip = - if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version - else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer - else remoteGossip // remote gossip is newer + val (winningGossip, talkback) = comparison match { + case None ⇒ + // conflicting versions, merge, and new version + ((remoteGossip merge localGossip) :+ vclockNode, true) + case Some(0) ⇒ + // same version + stats = stats.incrementSameCount + // TODO optimize talkback based on how the merged seen differs + (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress)) + case Some(x) if (x < 0) ⇒ + // local is newer + stats = stats.incrementNewerCount + (localGossip, true) + case _ ⇒ + // remote is newer + stats = stats.incrementOlderCount + (remoteGossip, !remoteGossip.hasSeen(selfAddress)) + } latestGossip = winningGossip seen selfAddress @@ -597,8 +612,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto stats = stats.incrementReceivedGossipCount publish(latestGossip) - if (envelope.conversation && - (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) { + if (envelope.conversation && talkback) { // send back gossip to sender when sender had different view, i.e. 
merge, or sender had // older or sender had newer gossipTo(from) @@ -1040,7 +1054,10 @@ private[cluster] case class ClusterStats( receivedGossipCount: Long = 0L, mergeConflictCount: Long = 0L, mergeCount: Long = 0L, - mergeDetectedCount: Long = 0L) { + mergeDetectedCount: Long = 0L, + sameCount: Long = 0L, + newerCount: Long = 0L, + olderCount: Long = 0L) { def incrementReceivedGossipCount(): ClusterStats = copy(receivedGossipCount = receivedGossipCount + 1) @@ -1053,4 +1070,36 @@ private[cluster] case class ClusterStats( def incrementMergeDetectedCount(): ClusterStats = copy(mergeDetectedCount = mergeDetectedCount + 1) + + def incrementSameCount(): ClusterStats = + copy(sameCount = sameCount + 1) + + def incrementNewerCount(): ClusterStats = + copy(newerCount = newerCount + 1) + + def incrementOlderCount(): ClusterStats = + copy(olderCount = olderCount + 1) + + def +(that: ClusterStats): ClusterStats = { + ClusterStats( + this.receivedGossipCount + that.receivedGossipCount, + this.mergeConflictCount + that.mergeConflictCount, + this.mergeCount + that.mergeCount, + this.mergeDetectedCount + that.mergeDetectedCount, + this.sameCount + that.sameCount, + this.newerCount + that.newerCount, + this.olderCount + that.olderCount) + } + + def -(that: ClusterStats): ClusterStats = { + ClusterStats( + this.receivedGossipCount - that.receivedGossipCount, + this.mergeConflictCount - that.mergeConflictCount, + this.mergeCount - that.mergeCount, + this.mergeDetectedCount - that.mergeDetectedCount, + this.sameCount - that.sameCount, + this.newerCount - that.newerCount, + this.olderCount - that.olderCount) + } + } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 0309a8d67a..7fdfe1b198 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -100,7 +100,7 @@ private[cluster] case class Gossip( * Map with the VectorClock (version) for the new gossip. */ def seen(address: Address): Gossip = { - if (overview.seen.contains(address) && overview.seen(address) == version) this + if (hasSeen(address)) this else this copy (overview = overview copy (seen = overview.seen + (address -> version))) } @@ -113,6 +113,28 @@ private[cluster] case class Gossip( }.toSet } + /** + * Has this Gossip been seen by this address. + */ + def hasSeen(address: Address): Boolean = { + overview.seen.get(address).fold { false } { _ == version } + } + + /** + * Merges the seen table of two Gossip instances. + */ + def mergeSeen(that: Gossip): Gossip = { + val mergedSeen = (overview.seen /: that.overview.seen) { + case (merged, (address, version)) ⇒ + val curr = merged.getOrElse(address, version) + if (curr > version) + merged + (address -> curr) + else + merged + (address -> version) + } + this copy (overview = overview copy (seen = mergedSeen)) + } + /** * Merges two Gossip instances including membership tables, and the VectorClock histories. 
*/ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 1875a0d48d..03e4609faa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -215,6 +215,10 @@ object StressMultiJvmSpec extends MultiNodeConfig { import akka.cluster.Member.addressOrdering immutable.SortedMap.empty[Address, Set[PhiValue]] } + var clusterStatsObservedByNode = { + import akka.cluster.Member.addressOrdering + immutable.SortedMap.empty[Address, ClusterStats] + } import context.dispatcher val reportMetricsTask = context.system.scheduler.schedule( @@ -231,13 +235,14 @@ object StressMultiJvmSpec extends MultiNodeConfig { def receive = { case ClusterMetricsChanged(clusterMetrics) ⇒ nodeMetrics = clusterMetrics case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues + case StatsResult(from, stats) => clusterStatsObservedByNode += from -> stats case ReportTick ⇒ - log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}") + log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") case r: ClusterResult ⇒ results :+= r if (results.size == expectedResults) { val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats) - log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}") + log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") reportTo foreach { _ ! aggregated } context stop self } @@ -247,13 +252,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def maxDuration = results.map(_.duration).max - def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()) { (acc, s) ⇒ - ClusterStats( - receivedGossipCount = acc.receivedGossipCount + s.receivedGossipCount, - mergeConflictCount = acc.mergeConflictCount + s.mergeConflictCount, - mergeCount = acc.mergeCount + s.mergeCount, - mergeDetectedCount = acc.mergeDetectedCount + s.mergeDetectedCount) - } + def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()){_ + _} def formatMetrics: String = { import akka.cluster.Member.addressOrdering @@ -300,6 +299,16 @@ object StressMultiJvmSpec extends MultiNodeConfig { def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" + def formatStats: String = { + if (clusterStatsObservedByNode.isEmpty) "" + else { + val lines = + for { + (monitor, stats) ← clusterStatsObservedByNode.toSeq + } yield s"${monitor}\t${stats}" + ("ClusterStats" +: lines).mkString("\n") + } + } } /** @@ -319,10 +328,10 @@ object StressMultiJvmSpec extends MultiNodeConfig { def formatHistory: String = (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n") - def formatHistoryHeader: String = "title\tduration (ms)\tgossip count\tmerge count" + def formatHistoryHeader: String = "title\tduration (ms)\tcluster stats" def formatHistoryLine(result: AggregatedClusterResult): String = - s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats.receivedGossipCount}\t${result.clusterStats.mergeCount}" + s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}" } @@ -381,6 +390,31 @@ object StressMultiJvmSpec extends MultiNodeConfig { } } + class StatsObserver 
extends Actor { + val cluster = Cluster(context.system) + var reportTo: Option[ActorRef] = None + var startStats = cluster.readView.latestStats + + import context.dispatcher + val checkStatsTask = context.system.scheduler.schedule( + 1.second, 1.second, self, StatsTick) + + override def postStop(): Unit = { + checkStatsTask.cancel() + super.postStop() + } + + def receive = { + case StatsTick ⇒ + val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats - startStats) + reportTo foreach { _ ! res } + case ReportTo(ref) ⇒ + reportTo = ref + case Reset ⇒ + startStats = cluster.readView.latestStats + } + } + /** * Master of routers * @@ -568,6 +602,8 @@ object StressMultiJvmSpec extends MultiNodeConfig { case class PhiResult(from: Address, phiValues: Set[PhiValue]) case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) case class ReportTo(ref: Option[ActorRef]) + case object StatsTick + case class StatsResult(from: Address, stats: ClusterStats) type JobId = Int trait Job { def id: JobId } @@ -622,6 +658,7 @@ abstract class StressSpec sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*"))) + sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*ClusterStats.*"))) } val seedNodes = roles.take(numberOfSeedNodes) @@ -645,6 +682,8 @@ abstract class StressSpec enterBarrier("result-aggregator-created-" + step) runOn(roles.take(nbrUsedRoles): _*) { phiObserver ! ReportTo(Some(clusterResultAggregator)) + statsObserver ! Reset + statsObserver ! ReportTo(Some(clusterResultAggregator)) } } @@ -654,6 +693,8 @@ abstract class StressSpec lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver") + lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver") + def awaitClusterResult: Unit = { runOn(roles.head) { val r = clusterResultAggregator @@ -789,14 +830,9 @@ abstract class StressSpec val returnValue = thunk - val duration = (System.nanoTime - startTime).nanos - val latestStats = clusterView.latestStats - val clusterStats = ClusterStats( - receivedGossipCount = latestStats.receivedGossipCount - startStats.receivedGossipCount, - mergeConflictCount = latestStats.mergeConflictCount - startStats.mergeConflictCount, - mergeCount = latestStats.mergeCount - startStats.mergeCount, - mergeDetectedCount = latestStats.mergeDetectedCount - startStats.mergeDetectedCount) - clusterResultAggregator ! ClusterResult(cluster.selfAddress, duration, clusterStats) + clusterResultAggregator ! + ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats - startStats) + returnValue } @@ -813,6 +849,7 @@ abstract class StressSpec val t = title + " round " + counter runOn(usedRoles: _*) { phiObserver ! Reset + statsObserver ! Reset } createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true) val (nextAS, nextAddresses) = within(loopDuration) { @@ -852,6 +889,7 @@ abstract class StressSpec runOn(usedRoles: _*) { awaitUpConvergence(nbrUsedRoles, timeout = remaining) phiObserver ! Reset + statsObserver ! 
Reset } } enterBarrier("join-remove-shutdown-" + step) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index cca7c4213e..339394bb53 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -90,8 +90,8 @@ class GossipSpec extends WordSpec with MustMatchers { } "start with fresh seen table after merge" in { - val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address) - val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address) + val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address) + val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address) val merged1 = g1 merge g2 merged1.overview.seen.isEmpty must be(true) @@ -120,5 +120,28 @@ class GossipSpec extends WordSpec with MustMatchers { Gossip(members = SortedSet(c3)).leader must be(Some(c3.address)) } + "merge seen table correctly" in { + val vclockNode = VectorClock.Node("something") + val g1 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(b1.address) + val g2 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(c1.address) + val g3 = (g1 copy (version = g2.version)).seen(d1.address) + + def checkMerged(merged: Gossip) { + val keys = merged.overview.seen.keys.toSeq + keys.length must be(4) + keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address)) + + merged hasSeen (a1.address) must be(true) + merged hasSeen (b1.address) must be(false) + merged hasSeen (c1.address) must be(true) + merged hasSeen (d1.address) must be(true) + merged hasSeen (e1.address) must be(false) + + merged.overview.seen(b1.address) must be(g1.version) + } + + checkMerged(g3 mergeSeen g2) + checkMerged(g2 mergeSeen g3) + } } } From 78c3ca359a530847e42eb0922569bf7b6278ec9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Wed, 6 Mar 2013 13:11:46 +0100 Subject: [PATCH 2/2] Fixes according to review. 
See #3115 --- .../scala/akka/cluster/ClusterDaemon.scala | 22 +++++------ .../src/main/scala/akka/cluster/Gossip.scala | 2 +- .../scala/akka/cluster/StressSpec.scala | 39 +++++++++---------- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 3bad4eb512..5b9cbab9d3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -555,7 +555,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // leader handles merge conflicts, or when they have different views of how is leader val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader val comparison = remoteGossip.version tryCompareTo localGossip.version - val conflict = !comparison.isDefined + val conflict = comparison.isEmpty if (conflict && !handleMerge) { // delegate merge resolution to leader to reduce number of simultaneous resolves, @@ -574,25 +574,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } else { - val (winningGossip, talkback) = comparison match { + val (winningGossip, talkback, newStats) = comparison match { case None ⇒ // conflicting versions, merge, and new version - ((remoteGossip merge localGossip) :+ vclockNode, true) + ((remoteGossip merge localGossip) :+ vclockNode, true, stats) case Some(0) ⇒ // same version - stats = stats.incrementSameCount // TODO optimize talkback based on how the merged seen differs - (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress)) - case Some(x) if (x < 0) ⇒ + (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementSameCount) + case Some(x) if x < 0 ⇒ // local is newer - stats = stats.incrementNewerCount - (localGossip, true) + (localGossip, true, stats.incrementNewerCount) case _ ⇒ // remote is newer - stats = stats.incrementOlderCount - (remoteGossip, !remoteGossip.hasSeen(selfAddress)) + (remoteGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementOlderCount) } + stats = newStats latestGossip = winningGossip seen selfAddress // for all new joining nodes we remove them from the failure detector @@ -1080,7 +1078,7 @@ private[cluster] case class ClusterStats( def incrementOlderCount(): ClusterStats = copy(olderCount = olderCount + 1) - def +(that: ClusterStats): ClusterStats = { + def :+(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount + that.receivedGossipCount, this.mergeConflictCount + that.mergeConflictCount, @@ -1091,7 +1089,7 @@ private[cluster] case class ClusterStats( this.olderCount + that.olderCount) } - def -(that: ClusterStats): ClusterStats = { + def :-(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount - that.receivedGossipCount, this.mergeConflictCount - that.mergeConflictCount, diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 7fdfe1b198..ea7328f84f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -117,7 +117,7 @@ private[cluster] case class Gossip( * Has this Gossip been seen by this address. 
*/ def hasSeen(address: Address): Boolean = { - overview.seen.get(address).fold { false } { _ == version } + overview.seen.get(address).exists(_ == version) } /** diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 03e4609faa..b77e623682 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -213,7 +213,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { var nodeMetrics = Set.empty[NodeMetrics] var phiValuesObservedByNode = { import akka.cluster.Member.addressOrdering - immutable.SortedMap.empty[Address, Set[PhiValue]] + immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]] } var clusterStatsObservedByNode = { import akka.cluster.Member.addressOrdering @@ -235,7 +235,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def receive = { case ClusterMetricsChanged(clusterMetrics) ⇒ nodeMetrics = clusterMetrics case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues - case StatsResult(from, stats) => clusterStatsObservedByNode += from -> stats + case StatsResult(from, stats) ⇒ clusterStatsObservedByNode += from -> stats case ReportTick ⇒ log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") case r: ClusterResult ⇒ @@ -252,7 +252,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def maxDuration = results.map(_.duration).max - def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()){_ + _} + def totalClusterStats = results.foldLeft(ClusterStats()){_ :+ _.clusterStats} def formatMetrics: String = { import akka.cluster.Member.addressOrdering @@ -286,11 +286,11 @@ object StressMultiJvmSpec extends MultiNodeConfig { import akka.cluster.Member.addressOrdering val lines = for { - (monitor, phiValues) ← phiValuesObservedByNode.toSeq - phi ← phiValues.toSeq.sortBy(_.address) + (monitor, phiValues) ← phiValuesObservedByNode + phi ← phiValues } yield formatPhiLine(monitor, phi.address, phi) - (formatPhiHeader +: lines).mkString("\n") + lines.mkString(formatPhiHeader + "\n", "\n", "") } } @@ -299,16 +299,9 @@ object StressMultiJvmSpec extends MultiNodeConfig { def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" - def formatStats: String = { - if (clusterStatsObservedByNode.isEmpty) "" - else { - val lines = - for { - (monitor, stats) ← clusterStatsObservedByNode.toSeq - } yield s"${monitor}\t${stats}" - ("ClusterStats" +: lines).mkString("\n") - } - } + def formatStats: String = + (clusterStatsObservedByNode map { case (monitor, stats) => s"${monitor}\t${stats}" }). + mkString("ClusterStats\n", "\n", "") } /** @@ -377,7 +370,8 @@ object StressMultiJvmSpec extends MultiNodeConfig { math.max(previous.max, φ)) } } - reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiByNode.values.toSet) } + val phiSet = immutable.SortedSet.empty[PhiValue] ++ phiByNode.values + reportTo foreach { _ ! 
PhiResult(cluster.selfAddress, phiSet) } case state: CurrentClusterState ⇒ nodes = state.members.map(_.address) case memberEvent: MemberEvent ⇒ nodes += memberEvent.member.address case ReportTo(ref) ⇒ reportTo = ref @@ -406,7 +400,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def receive = { case StatsTick ⇒ - val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats - startStats) + val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats) reportTo foreach { _ ! res } case ReportTo(ref) ⇒ reportTo = ref @@ -599,8 +593,11 @@ object StressMultiJvmSpec extends MultiNodeConfig { case object RetryTick case object ReportTick case object PhiTick - case class PhiResult(from: Address, phiValues: Set[PhiValue]) - case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) + case class PhiResult(from: Address, phiValues: immutable.SortedSet[PhiValue]) + case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) extends Ordered[PhiValue] { + import akka.cluster.Member.addressOrdering + def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address) + } case class ReportTo(ref: Option[ActorRef]) case object StatsTick case class StatsResult(from: Address, stats: ClusterStats) @@ -831,7 +828,7 @@ abstract class StressSpec val returnValue = thunk clusterResultAggregator ! - ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats - startStats) + ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats :- startStats) returnValue }
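
Note for readers of this patch series: below is a minimal, self-contained sketch of the decision logic that patch 1/2 introduces in ClusterCoreDaemon's gossip handling and that patch 2/2 folds into a single (winningGossip, talkback, newStats) tuple. The Version, Decision and mergeSeen names here are hypothetical stand-ins, not Akka's actual VectorClock/Gossip API; only the shape of the four-way tryCompareTo match and the seen-table fold are meant to mirror the patch.

    // Illustrative sketch only; names are hypothetical, not the akka.cluster API.
    object GossipMergeSketch {

      // A toy vector clock: a partial order over per-node counters.
      final case class Version(counters: Map[String, Long]) {
        // Some(n) when the clocks are ordered (n < 0: older, 0: same, n > 0: newer),
        // None when they are concurrent, i.e. a genuine conflict.
        def tryCompareTo(that: Version): Option[Int] = {
          val keys = counters.keySet ++ that.counters.keySet
          val diffs = keys.toSeq.map(k => counters.getOrElse(k, 0L) - that.counters.getOrElse(k, 0L))
          (diffs.exists(_ < 0), diffs.exists(_ > 0)) match {
            case (true, true)   => None     // concurrent -> conflict
            case (true, false)  => Some(-1) // this clock is older
            case (false, true)  => Some(1)  // this clock is newer
            case (false, false) => Some(0)  // same version
          }
        }
      }

      sealed trait Decision
      case object FullMerge     extends Decision // concurrent versions: merge everything, bump the clock
      case object MergeSeenOnly extends Decision // same version: only union the seen tables
      case object KeepLocal     extends Decision // local gossip is strictly newer
      case object TakeRemote    extends Decision // remote gossip is strictly newer

      // The four-way match on `comparison` from the patch, reduced to a pure function.
      def decide(remote: Version, local: Version): Decision =
        remote tryCompareTo local match {
          case None             => FullMerge
          case Some(0)          => MergeSeenOnly
          case Some(x) if x < 0 => KeepLocal
          case _                => TakeRemote
        }

      // Mirrors the idea of Gossip.mergeSeen: union the two seen tables, keeping the
      // higher version per address (versions are plain Longs here for brevity).
      def mergeSeen(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
        b.foldLeft(a) { case (merged, (address, v)) =>
          merged + (address -> math.max(merged.getOrElse(address, v), v))
        }

      def main(args: Array[String]): Unit = {
        val local  = Version(Map("a" -> 2L, "b" -> 1L))
        val newer  = Version(Map("a" -> 3L, "b" -> 1L))
        val forked = Version(Map("a" -> 1L, "b" -> 2L))
        println(decide(local, local))  // MergeSeenOnly
        println(decide(newer, local))  // TakeRemote
        println(decide(local, newer))  // KeepLocal
        println(decide(forked, local)) // FullMerge
        println(mergeSeen(Map("n1" -> 3L, "n2" -> 1L), Map("n2" -> 2L, "n3" -> 1L)))
        // Map(n1 -> 3, n2 -> 2, n3 -> 1)
      }
    }

The MergeSeenOnly branch is the point of the series: when the versions are identical there is nothing to resolve, so only the seen tables are unioned and the node talks back only if the sender has not yet seen the local node's view, which is what the new sameCount/newerCount/olderCount statistics measure.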