diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 3bad4eb512..5b9cbab9d3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -555,7 +555,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // leader handles merge conflicts, or when they have different views of how is leader val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader val comparison = remoteGossip.version tryCompareTo localGossip.version - val conflict = !comparison.isDefined + val conflict = comparison.isEmpty if (conflict && !handleMerge) { // delegate merge resolution to leader to reduce number of simultaneous resolves, @@ -574,25 +574,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } else { - val (winningGossip, talkback) = comparison match { + val (winningGossip, talkback, newStats) = comparison match { case None ⇒ // conflicting versions, merge, and new version - ((remoteGossip merge localGossip) :+ vclockNode, true) + ((remoteGossip merge localGossip) :+ vclockNode, true, stats) case Some(0) ⇒ // same version - stats = stats.incrementSameCount // TODO optimize talkback based on how the merged seen differs - (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress)) - case Some(x) if (x < 0) ⇒ + (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementSameCount) + case Some(x) if x < 0 ⇒ // local is newer - stats = stats.incrementNewerCount - (localGossip, true) + (localGossip, true, stats.incrementNewerCount) case _ ⇒ // remote is newer - stats = stats.incrementOlderCount - (remoteGossip, !remoteGossip.hasSeen(selfAddress)) + (remoteGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementOlderCount) } + stats = newStats latestGossip = winningGossip seen selfAddress // for all new joining nodes we remove them from the failure detector @@ -1080,7 +1078,7 @@ private[cluster] case class ClusterStats( def incrementOlderCount(): ClusterStats = copy(olderCount = olderCount + 1) - def +(that: ClusterStats): ClusterStats = { + def :+(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount + that.receivedGossipCount, this.mergeConflictCount + that.mergeConflictCount, @@ -1091,7 +1089,7 @@ private[cluster] case class ClusterStats( this.olderCount + that.olderCount) } - def -(that: ClusterStats): ClusterStats = { + def :-(that: ClusterStats): ClusterStats = { ClusterStats( this.receivedGossipCount - that.receivedGossipCount, this.mergeConflictCount - that.mergeConflictCount, diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 7fdfe1b198..ea7328f84f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -117,7 +117,7 @@ private[cluster] case class Gossip( * Has this Gossip been seen by this address. */ def hasSeen(address: Address): Boolean = { - overview.seen.get(address).fold { false } { _ == version } + overview.seen.get(address).exists(_ == version) } /** diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 03e4609faa..b77e623682 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -213,7 +213,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { var nodeMetrics = Set.empty[NodeMetrics] var phiValuesObservedByNode = { import akka.cluster.Member.addressOrdering - immutable.SortedMap.empty[Address, Set[PhiValue]] + immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]] } var clusterStatsObservedByNode = { import akka.cluster.Member.addressOrdering @@ -235,7 +235,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def receive = { case ClusterMetricsChanged(clusterMetrics) ⇒ nodeMetrics = clusterMetrics case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues - case StatsResult(from, stats) => clusterStatsObservedByNode += from -> stats + case StatsResult(from, stats) ⇒ clusterStatsObservedByNode += from -> stats case ReportTick ⇒ log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") case r: ClusterResult ⇒ @@ -252,7 +252,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def maxDuration = results.map(_.duration).max - def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()){_ + _} + def totalClusterStats = results.foldLeft(ClusterStats()){_ :+ _.clusterStats} def formatMetrics: String = { import akka.cluster.Member.addressOrdering @@ -286,11 +286,11 @@ object StressMultiJvmSpec extends MultiNodeConfig { import akka.cluster.Member.addressOrdering val lines = for { - (monitor, phiValues) ← phiValuesObservedByNode.toSeq - phi ← phiValues.toSeq.sortBy(_.address) + (monitor, phiValues) ← phiValuesObservedByNode + phi ← phiValues } yield formatPhiLine(monitor, phi.address, phi) - (formatPhiHeader +: lines).mkString("\n") + lines.mkString(formatPhiHeader + "\n", "\n", "") } } @@ -299,16 +299,9 @@ object StressMultiJvmSpec extends MultiNodeConfig { def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" - def formatStats: String = { - if (clusterStatsObservedByNode.isEmpty) "" - else { - val lines = - for { - (monitor, stats) ← clusterStatsObservedByNode.toSeq - } yield s"${monitor}\t${stats}" - ("ClusterStats" +: lines).mkString("\n") - } - } + def formatStats: String = + (clusterStatsObservedByNode map { case (monitor, stats) => s"${monitor}\t${stats}" }). + mkString("ClusterStats\n", "\n", "") } /** @@ -377,7 +370,8 @@ object StressMultiJvmSpec extends MultiNodeConfig { math.max(previous.max, φ)) } } - reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiByNode.values.toSet) } + val phiSet = immutable.SortedSet.empty[PhiValue] ++ phiByNode.values + reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiSet) } case state: CurrentClusterState ⇒ nodes = state.members.map(_.address) case memberEvent: MemberEvent ⇒ nodes += memberEvent.member.address case ReportTo(ref) ⇒ reportTo = ref @@ -406,7 +400,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def receive = { case StatsTick ⇒ - val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats - startStats) + val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats) reportTo foreach { _ ! res } case ReportTo(ref) ⇒ reportTo = ref @@ -599,8 +593,11 @@ object StressMultiJvmSpec extends MultiNodeConfig { case object RetryTick case object ReportTick case object PhiTick - case class PhiResult(from: Address, phiValues: Set[PhiValue]) - case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) + case class PhiResult(from: Address, phiValues: immutable.SortedSet[PhiValue]) + case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) extends Ordered[PhiValue] { + import akka.cluster.Member.addressOrdering + def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address) + } case class ReportTo(ref: Option[ActorRef]) case object StatsTick case class StatsResult(from: Address, stats: ClusterStats) @@ -831,7 +828,7 @@ abstract class StressSpec val returnValue = thunk clusterResultAggregator ! - ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats - startStats) + ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats :- startStats) returnValue }