diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 963d1527c1..5c0276ca8b 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -74,7 +74,7 @@ akka { # How often the current internal stats should be published. # A value of 0s can be used to always publish the stats, when it happens. # Disable with "off". - publish-stats-interval = 10s + publish-stats-interval = off # The id of the dispatcher to use for cluster actors. If not specified # default dispatcher is used. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index d2f08d15eb..0e15750823 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -232,7 +232,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // and the Gossip is not versioned for this 'Node' yet var latestGossip: Gossip = Gossip.empty - var stats = ClusterStats() + val statsEnabled = PublishStatsInterval.isFinite + var gossipStats = GossipStats() + var vclockStats = VectorClockStats() var seedNodeProcess: Option[ActorRef] = None @@ -576,22 +578,21 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val comparison = remoteGossip.version tryCompareTo localGossip.version - val (winningGossip, talkback, newStats) = comparison match { + val (winningGossip, talkback) = comparison match { case None ⇒ // conflicting versions, merge - (remoteGossip merge localGossip, true, stats.incrementMergeCount) + (remoteGossip merge localGossip, true) case Some(0) ⇒ // same version - (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), stats.incrementSameCount) + (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress)) case Some(x) if x < 0 ⇒ // local is newer - (localGossip, true, stats.incrementNewerCount) + (localGossip, true) case _ ⇒ // remote is newer - (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), stats.incrementOlderCount) + (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress)) } - stats = newStats latestGossip = winningGossip seen selfUniqueAddress // for all new joining nodes we remove them from the failure detector @@ -607,7 +608,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto remoteGossip, localGossip, winningGossip) } - stats = stats.incrementReceivedGossipCount + if (statsEnabled) { + gossipStats = comparison match { + case None ⇒ gossipStats.incrementMergeCount + case Some(0) ⇒ gossipStats.incrementSameCount + case Some(x) if x < 0 ⇒ gossipStats.incrementNewerCount + case _ ⇒ gossipStats.incrementOlderCount + } + vclockStats = VectorClockStats( + versionSize = latestGossip.version.versions.size, + latestGossip.members.count(m ⇒ latestGossip.seenByNode(m.uniqueAddress))) + } + publish(latestGossip) if (latestGossip.member(selfUniqueAddress).status == Exiting) @@ -904,7 +916,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if (PublishStatsInterval == Duration.Zero) publishInternalStats() } - def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats) + def publishInternalStats(): Unit = publisher ! CurrentInternalStats(gossipStats, vclockStats) } @@ -1065,30 +1077,27 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with * INTERNAL API */ @SerialVersionUID(1L) -private[cluster] case class ClusterStats( +private[cluster] case class GossipStats( receivedGossipCount: Long = 0L, mergeCount: Long = 0L, sameCount: Long = 0L, newerCount: Long = 0L, olderCount: Long = 0L) { - def incrementReceivedGossipCount(): ClusterStats = - copy(receivedGossipCount = receivedGossipCount + 1) + def incrementMergeCount(): GossipStats = + copy(mergeCount = mergeCount + 1, receivedGossipCount = receivedGossipCount + 1) - def incrementMergeCount(): ClusterStats = - copy(mergeCount = mergeCount + 1) + def incrementSameCount(): GossipStats = + copy(sameCount = sameCount + 1, receivedGossipCount = receivedGossipCount + 1) - def incrementSameCount(): ClusterStats = - copy(sameCount = sameCount + 1) + def incrementNewerCount(): GossipStats = + copy(newerCount = newerCount + 1, receivedGossipCount = receivedGossipCount + 1) - def incrementNewerCount(): ClusterStats = - copy(newerCount = newerCount + 1) + def incrementOlderCount(): GossipStats = + copy(olderCount = olderCount + 1, receivedGossipCount = receivedGossipCount + 1) - def incrementOlderCount(): ClusterStats = - copy(olderCount = olderCount + 1) - - def :+(that: ClusterStats): ClusterStats = { - ClusterStats( + def :+(that: GossipStats): GossipStats = { + GossipStats( this.receivedGossipCount + that.receivedGossipCount, this.mergeCount + that.mergeCount, this.sameCount + that.sameCount, @@ -1096,8 +1105,8 @@ private[cluster] case class ClusterStats( this.olderCount + that.olderCount) } - def :-(that: ClusterStats): ClusterStats = { - ClusterStats( + def :-(that: GossipStats): GossipStats = { + GossipStats( this.receivedGossipCount - that.receivedGossipCount, this.mergeCount - that.mergeCount, this.sameCount - that.sameCount, @@ -1106,3 +1115,12 @@ private[cluster] case class ClusterStats( } } + +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[cluster] case class VectorClockStats( + versionSize: Int = 0, + seenLatest: Int = 0) + diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 1721959ae3..32359f1811 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -169,7 +169,9 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent + private[cluster] case class CurrentInternalStats( + gossipStats: GossipStats, + vclockStats: VectorClockStats) extends ClusterDomainEvent /** * INTERNAL API diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 6998fbb6c8..222f7d922c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -29,7 +29,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { * Current internal cluster stats, updated periodically via event bus. */ @volatile - private var _latestStats = ClusterStats() + private var _latestStats = CurrentInternalStats(GossipStats(), VectorClockStats()) /** * Current cluster metrics, updated periodically via event bus. @@ -63,7 +63,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case RoleLeaderChanged(role, leader) ⇒ state = state.copy(roleLeaderMap = state.roleLeaderMap + (role -> leader)) case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats + case stats: CurrentInternalStats ⇒ _latestStats = stats case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes } } @@ -146,7 +146,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { /** * INTERNAL API */ - private[cluster] def latestStats: ClusterStats = _latestStats + private[cluster] def latestStats: CurrentInternalStats = _latestStats /** * Unsubscribe to cluster events. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index cdb9b60792..c72d82efff 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -25,6 +25,7 @@ import akka.actor.SupervisorStrategy._ import akka.actor.Terminated import akka.cluster.ClusterEvent.ClusterMetricsChanged import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.CurrentInternalStats import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.StandardMetrics.Cpu import akka.cluster.StandardMetrics.HeapMemory @@ -120,7 +121,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.cluster { auto-down = on - publish-stats-interval = 0 s # always, when it happens + publish-stats-interval = 1s } akka.loggers = ["akka.testkit.TestEventListener"] akka.loglevel = INFO @@ -228,9 +229,9 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { case class ClusterResult( address: Address, duration: Duration, - clusterStats: ClusterStats) + clusterStats: GossipStats) - case class AggregatedClusterResult(title: String, duration: Duration, clusterStats: ClusterStats) + case class AggregatedClusterResult(title: String, duration: Duration, clusterStats: GossipStats) /** * Central aggregator of cluster statistics and metrics. @@ -251,7 +252,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { } var clusterStatsObservedByNode = { import akka.cluster.Member.addressOrdering - immutable.SortedMap.empty[Address, ClusterStats] + immutable.SortedMap.empty[Address, CurrentInternalStats] } import context.dispatcher @@ -276,7 +277,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { case r: ClusterResult ⇒ results :+= r if (results.size == expectedResults) { - val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats) + val aggregated = AggregatedClusterResult(title, maxDuration, totalGossipStats) if (infolog) log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") reportTo foreach { _ ! aggregated } @@ -288,7 +289,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { def maxDuration = results.map(_.duration).max - def totalClusterStats = results.foldLeft(ClusterStats()) { _ :+ _.clusterStats } + def totalGossipStats = results.foldLeft(GossipStats()) { _ :+ _.clusterStats } def formatMetrics: String = { import akka.cluster.Member.addressOrdering @@ -335,9 +336,16 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" - def formatStats: String = - (clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${stats}" }). - mkString("ClusterStats(gossip, merge, same, newer, older)\n", "\n", "") + def formatStats: String = { + def f(stats: CurrentInternalStats) = { + import stats.gossipStats._ + import stats.vclockStats._ + s"ClusterStats($receivedGossipCount, $mergeCount, $sameCount, $newerCount, $olderCount, $versionSize, $seenLatest)" + } + (clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${f(stats)}" }). + mkString("ClusterStats(gossip, merge, same, newer, older, vclockSize, seenLatest)\n", "\n", "") + } + } /** @@ -357,7 +365,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { def formatHistory: String = (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n") - def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[ClusterStats(gossip, merge, same, newer, older)]" + def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[GossipStats(gossip, merge, same, newer, older)]" def formatHistoryLine(result: AggregatedClusterResult): String = s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}" @@ -431,20 +439,18 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { class StatsObserver extends Actor { val cluster = Cluster(context.system) var reportTo: Option[ActorRef] = None - var startStats = cluster.readView.latestStats + var startStats: Option[GossipStats] = None - import context.dispatcher - val checkStatsTask = context.system.scheduler.schedule( - 1.second, 1.second, self, StatsTick) - - override def postStop(): Unit = { - checkStatsTask.cancel() - super.postStop() - } + override def preStart(): Unit = cluster.subscribe(self, classOf[CurrentInternalStats]) + override def postStop(): Unit = cluster.unsubscribe(self) def receive = { - case StatsTick ⇒ - val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats) + case CurrentInternalStats(gossipStats, vclockStats) ⇒ + val diff = startStats match { + case None ⇒ { startStats = Some(gossipStats); gossipStats } + case Some(start) ⇒ gossipStats :- start + } + val res = StatsResult(cluster.selfAddress, CurrentInternalStats(diff, vclockStats)) reportTo foreach { _ ! res } case ReportTo(ref) ⇒ reportTo foreach context.unwatch @@ -456,7 +462,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { case _ ⇒ } case Reset ⇒ - startStats = cluster.readView.latestStats + startStats = None } } @@ -650,8 +656,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address) } case class ReportTo(ref: Option[ActorRef]) - case object StatsTick - case class StatsResult(from: Address, stats: ClusterStats) + case class StatsResult(from: Address, stats: CurrentInternalStats) type JobId = Int trait Job { def id: JobId } @@ -751,6 +756,8 @@ abstract class StressSpec val seedNodes = roles.take(numberOfSeedNodes) + def latestGossipStats = cluster.readView.latestStats.gossipStats + override def cluster: Cluster = { createWorker super.cluster @@ -928,12 +935,12 @@ abstract class StressSpec def reportResult[T](thunk: ⇒ T): T = { val startTime = System.nanoTime - val startStats = clusterView.latestStats + val startStats = clusterView.latestStats.gossipStats val returnValue = thunk clusterResultAggregator foreach { - _ ! ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats :- startStats) + _ ! ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, latestGossipStats :- startStats) } returnValue diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 92e4be77b1..a3a3d8a9f7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -21,7 +21,10 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { val third = role("third") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). + withFallback(ConfigFactory.parseString(""" + akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks + akka.cluster.publish-stats-interval = 0 s # always, when it happens + """)). withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } @@ -82,10 +85,10 @@ abstract class TransitionSpec def gossipTo(toRole: RoleName): Unit = { gossipBarrierCounter += 1 runOn(toRole) { - val oldCount = clusterView.latestStats.receivedGossipCount + val oldCount = clusterView.latestStats.gossipStats.receivedGossipCount enterBarrier("before-gossip-" + gossipBarrierCounter) awaitCond { - clusterView.latestStats.receivedGossipCount != oldCount // received gossip + clusterView.latestStats.gossipStats.receivedGossipCount != oldCount // received gossip } // gossip chat will synchronize the views awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala index b7fe2c00d6..2e344b5c06 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala @@ -29,7 +29,6 @@ object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig { commonConfig(ConfigFactory.parseString( """ akka.remote.log-remote-lifecycle-events = off - akka.cluster.publish-stats-interval = 0s """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))) testTransport(on = true) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 5294a604c1..74d0601c69 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -37,7 +37,7 @@ class ClusterConfigSpec extends AkkaSpec { HeartbeatRequestTimeToLive must be(1 minute) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) - PublishStatsInterval must be(10 second) + PublishStatsInterval must be(Duration.Undefined) AutoDown must be(false) MinNrOfMembers must be(1) MinNrOfMembersOfRole must be === Map.empty