parent
58756be937
commit
a323936299
8 changed files with 90 additions and 61 deletions
|
|
@ -74,7 +74,7 @@ akka {
|
|||
# How often the current internal stats should be published.
|
||||
# A value of 0s publishes the stats immediately whenever they are updated, instead of periodically.
|
||||
# Disable with "off".
|
||||
publish-stats-interval = 10s
|
||||
publish-stats-interval = off
|
||||
|
||||
# The id of the dispatcher to use for cluster actors. If not specified
|
||||
# the default dispatcher is used.
|
||||
|
|
|
|||
|
|
@ -232,7 +232,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
// and the Gossip is not versioned for this 'Node' yet
|
||||
var latestGossip: Gossip = Gossip.empty
|
||||
|
||||
var stats = ClusterStats()
|
||||
val statsEnabled = PublishStatsInterval.isFinite
|
||||
var gossipStats = GossipStats()
|
||||
var vclockStats = VectorClockStats()
|
||||
|
||||
var seedNodeProcess: Option[ActorRef] = None
|
||||
|
||||
|
|
@ -576,22 +578,21 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
|
||||
val comparison = remoteGossip.version tryCompareTo localGossip.version
|
||||
|
||||
val (winningGossip, talkback, newStats) = comparison match {
|
||||
val (winningGossip, talkback) = comparison match {
|
||||
case None ⇒
|
||||
// conflicting versions, merge
|
||||
(remoteGossip merge localGossip, true, stats.incrementMergeCount)
|
||||
(remoteGossip merge localGossip, true)
|
||||
case Some(0) ⇒
|
||||
// same version
|
||||
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), stats.incrementSameCount)
|
||||
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress))
|
||||
case Some(x) if x < 0 ⇒
|
||||
// local is newer
|
||||
(localGossip, true, stats.incrementNewerCount)
|
||||
(localGossip, true)
|
||||
case _ ⇒
|
||||
// remote is newer
|
||||
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), stats.incrementOlderCount)
|
||||
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress))
|
||||
}
|
||||
|
||||
stats = newStats
|
||||
latestGossip = winningGossip seen selfUniqueAddress
|
||||
|
||||
// for all new joining nodes we remove them from the failure detector
|
||||
|
|
@ -607,7 +608,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
remoteGossip, localGossip, winningGossip)
|
||||
}
|
||||
|
||||
stats = stats.incrementReceivedGossipCount
|
||||
if (statsEnabled) {
|
||||
gossipStats = comparison match {
|
||||
case None ⇒ gossipStats.incrementMergeCount
|
||||
case Some(0) ⇒ gossipStats.incrementSameCount
|
||||
case Some(x) if x < 0 ⇒ gossipStats.incrementNewerCount
|
||||
case _ ⇒ gossipStats.incrementOlderCount
|
||||
}
|
||||
vclockStats = VectorClockStats(
|
||||
versionSize = latestGossip.version.versions.size,
|
||||
latestGossip.members.count(m ⇒ latestGossip.seenByNode(m.uniqueAddress)))
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
|
||||
if (latestGossip.member(selfUniqueAddress).status == Exiting)
|
||||
|
|
@ -904,7 +916,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
|
||||
}
|
||||
|
||||
def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats)
|
||||
// Sends the current gossip counters and vector clock stats to the publisher actor.
def publishInternalStats(): Unit = {
  val snapshot = CurrentInternalStats(gossipStats, vclockStats)
  publisher ! snapshot
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -1065,30 +1077,27 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with
|
|||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[cluster] case class ClusterStats(
|
||||
private[cluster] case class GossipStats(
|
||||
receivedGossipCount: Long = 0L,
|
||||
mergeCount: Long = 0L,
|
||||
sameCount: Long = 0L,
|
||||
newerCount: Long = 0L,
|
||||
olderCount: Long = 0L) {
|
||||
|
||||
def incrementReceivedGossipCount(): ClusterStats =
|
||||
copy(receivedGossipCount = receivedGossipCount + 1)
|
||||
def incrementMergeCount(): GossipStats =
|
||||
copy(mergeCount = mergeCount + 1, receivedGossipCount = receivedGossipCount + 1)
|
||||
|
||||
def incrementMergeCount(): ClusterStats =
|
||||
copy(mergeCount = mergeCount + 1)
|
||||
def incrementSameCount(): GossipStats =
|
||||
copy(sameCount = sameCount + 1, receivedGossipCount = receivedGossipCount + 1)
|
||||
|
||||
def incrementSameCount(): ClusterStats =
|
||||
copy(sameCount = sameCount + 1)
|
||||
def incrementNewerCount(): GossipStats =
|
||||
copy(newerCount = newerCount + 1, receivedGossipCount = receivedGossipCount + 1)
|
||||
|
||||
def incrementNewerCount(): ClusterStats =
|
||||
copy(newerCount = newerCount + 1)
|
||||
def incrementOlderCount(): GossipStats =
|
||||
copy(olderCount = olderCount + 1, receivedGossipCount = receivedGossipCount + 1)
|
||||
|
||||
def incrementOlderCount(): ClusterStats =
|
||||
copy(olderCount = olderCount + 1)
|
||||
|
||||
def :+(that: ClusterStats): ClusterStats = {
|
||||
ClusterStats(
|
||||
def :+(that: GossipStats): GossipStats = {
|
||||
GossipStats(
|
||||
this.receivedGossipCount + that.receivedGossipCount,
|
||||
this.mergeCount + that.mergeCount,
|
||||
this.sameCount + that.sameCount,
|
||||
|
|
@ -1096,8 +1105,8 @@ private[cluster] case class ClusterStats(
|
|||
this.olderCount + that.olderCount)
|
||||
}
|
||||
|
||||
def :-(that: ClusterStats): ClusterStats = {
|
||||
ClusterStats(
|
||||
def :-(that: GossipStats): GossipStats = {
|
||||
GossipStats(
|
||||
this.receivedGossipCount - that.receivedGossipCount,
|
||||
this.mergeCount - that.mergeCount,
|
||||
this.sameCount - that.sameCount,
|
||||
|
|
@ -1106,3 +1115,12 @@ private[cluster] case class ClusterStats(
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[cluster] final case class VectorClockStats(
  // size of `version.versions` of the latest gossip at the time the stats were taken
  versionSize: Int = 0,
  // number of members of the latest gossip for which `seenByNode` was true
  seenLatest: Int = 0)
|
||||
|
||||
|
|
|
|||
|
|
@ -169,7 +169,9 @@ object ClusterEvent {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent
|
||||
// Event that carries the gossip counters together with the vector clock stats.
private[cluster] case class CurrentInternalStats(gossipStats: GossipStats, vclockStats: VectorClockStats)
  extends ClusterDomainEvent
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
|||
* Current internal cluster stats, updated periodically via event bus.
|
||||
*/
|
||||
@volatile
|
||||
private var _latestStats = ClusterStats()
|
||||
private var _latestStats = CurrentInternalStats(GossipStats(), VectorClockStats())
|
||||
|
||||
/**
|
||||
* Current cluster metrics, updated periodically via event bus.
|
||||
|
|
@ -63,7 +63,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
|||
case RoleLeaderChanged(role, leader) ⇒
|
||||
state = state.copy(roleLeaderMap = state.roleLeaderMap + (role -> leader))
|
||||
case s: CurrentClusterState ⇒ state = s
|
||||
case CurrentInternalStats(stats) ⇒ _latestStats = stats
|
||||
case stats: CurrentInternalStats ⇒ _latestStats = stats
|
||||
case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes
|
||||
}
|
||||
}
|
||||
|
|
@ -146,7 +146,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[cluster] def latestStats: ClusterStats = _latestStats
|
||||
private[cluster] def latestStats: CurrentInternalStats = _latestStats
|
||||
|
||||
/**
|
||||
* Unsubscribe from cluster events.
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import akka.actor.SupervisorStrategy._
|
|||
import akka.actor.Terminated
|
||||
import akka.cluster.ClusterEvent.ClusterMetricsChanged
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState
|
||||
import akka.cluster.ClusterEvent.CurrentInternalStats
|
||||
import akka.cluster.ClusterEvent.MemberEvent
|
||||
import akka.cluster.StandardMetrics.Cpu
|
||||
import akka.cluster.StandardMetrics.HeapMemory
|
||||
|
|
@ -120,7 +121,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
akka.actor.provider = akka.cluster.ClusterActorRefProvider
|
||||
akka.cluster {
|
||||
auto-down = on
|
||||
publish-stats-interval = 0 s # always, when it happens
|
||||
publish-stats-interval = 1s
|
||||
}
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
akka.loglevel = INFO
|
||||
|
|
@ -228,9 +229,9 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
case class ClusterResult(
|
||||
address: Address,
|
||||
duration: Duration,
|
||||
clusterStats: ClusterStats)
|
||||
clusterStats: GossipStats)
|
||||
|
||||
case class AggregatedClusterResult(title: String, duration: Duration, clusterStats: ClusterStats)
|
||||
case class AggregatedClusterResult(title: String, duration: Duration, clusterStats: GossipStats)
|
||||
|
||||
/**
|
||||
* Central aggregator of cluster statistics and metrics.
|
||||
|
|
@ -251,7 +252,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
}
|
||||
var clusterStatsObservedByNode = {
|
||||
import akka.cluster.Member.addressOrdering
|
||||
immutable.SortedMap.empty[Address, ClusterStats]
|
||||
immutable.SortedMap.empty[Address, CurrentInternalStats]
|
||||
}
|
||||
|
||||
import context.dispatcher
|
||||
|
|
@ -276,7 +277,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
case r: ClusterResult ⇒
|
||||
results :+= r
|
||||
if (results.size == expectedResults) {
|
||||
val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats)
|
||||
val aggregated = AggregatedClusterResult(title, maxDuration, totalGossipStats)
|
||||
if (infolog)
|
||||
log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
|
||||
reportTo foreach { _ ! aggregated }
|
||||
|
|
@ -288,7 +289,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
// Longest duration among the results collected so far; fails if no result has been collected.
def maxDuration: Duration = (results map (_.duration)).max
|
||||
|
||||
def totalClusterStats = results.foldLeft(ClusterStats()) { _ :+ _.clusterStats }
|
||||
// Adds up the gossip stats of every collected result, starting from empty stats.
def totalGossipStats: GossipStats =
  results.foldLeft(GossipStats()) { (sum, result) ⇒ sum :+ result.clusterStats }
|
||||
|
||||
def formatMetrics: String = {
|
||||
import akka.cluster.Member.addressOrdering
|
||||
|
|
@ -335,9 +336,16 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String =
|
||||
s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"
|
||||
|
||||
def formatStats: String =
|
||||
(clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${stats}" }).
|
||||
mkString("ClusterStats(gossip, merge, same, newer, older)\n", "\n", "")
|
||||
// Renders the stats observed per node: a header line followed by one
// tab-separated line per monitoring node.
def formatStats: String = {
  val header = "ClusterStats(gossip, merge, same, newer, older, vclockSize, seenLatest)\n"

  def render(observed: CurrentInternalStats): String = {
    val gossip = observed.gossipStats
    val vclock = observed.vclockStats
    s"ClusterStats(${gossip.receivedGossipCount}, ${gossip.mergeCount}, ${gossip.sameCount}, " +
      s"${gossip.newerCount}, ${gossip.olderCount}, ${vclock.versionSize}, ${vclock.seenLatest})"
  }

  val lines = clusterStatsObservedByNode map { case (monitor, observed) ⇒ s"${monitor}\t${render(observed)}" }
  lines.mkString(header, "\n", "")
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -357,7 +365,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
def formatHistory: String =
|
||||
(formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n")
|
||||
|
||||
def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[ClusterStats(gossip, merge, same, newer, older)]"
|
||||
def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[GossipStats(gossip, merge, same, newer, older)]"
|
||||
|
||||
// One tab-separated history row: title, duration in milliseconds, and the stats.
def formatHistoryLine(result: AggregatedClusterResult): String = {
  val AggregatedClusterResult(title, duration, clusterStats) = result
  s"${title}\t${duration.toMillis}\t${clusterStats}"
}
|
||||
|
|
@ -431,20 +439,18 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
class StatsObserver extends Actor {
|
||||
val cluster = Cluster(context.system)
|
||||
var reportTo: Option[ActorRef] = None
|
||||
var startStats = cluster.readView.latestStats
|
||||
var startStats: Option[GossipStats] = None
|
||||
|
||||
import context.dispatcher
|
||||
val checkStatsTask = context.system.scheduler.schedule(
|
||||
1.second, 1.second, self, StatsTick)
|
||||
|
||||
override def postStop(): Unit = {
|
||||
checkStatsTask.cancel()
|
||||
super.postStop()
|
||||
}
|
||||
// Subscribe this actor to CurrentInternalStats events when it starts.
override def preStart(): Unit = {
  cluster.subscribe(self, classOf[CurrentInternalStats])
}
|
||||
// Remove this actor's cluster subscription when it stops.
override def postStop(): Unit = {
  cluster.unsubscribe(self)
}
|
||||
|
||||
def receive = {
|
||||
case StatsTick ⇒
|
||||
val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats)
|
||||
case CurrentInternalStats(gossipStats, vclockStats) ⇒
|
||||
val diff = startStats match {
|
||||
case None ⇒ { startStats = Some(gossipStats); gossipStats }
|
||||
case Some(start) ⇒ gossipStats :- start
|
||||
}
|
||||
val res = StatsResult(cluster.selfAddress, CurrentInternalStats(diff, vclockStats))
|
||||
reportTo foreach { _ ! res }
|
||||
case ReportTo(ref) ⇒
|
||||
reportTo foreach context.unwatch
|
||||
|
|
@ -456,7 +462,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
case _ ⇒
|
||||
}
|
||||
case Reset ⇒
|
||||
startStats = cluster.readView.latestStats
|
||||
startStats = None
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -650,8 +656,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address)
|
||||
}
|
||||
case class ReportTo(ref: Option[ActorRef])
|
||||
case object StatsTick
|
||||
case class StatsResult(from: Address, stats: ClusterStats)
|
||||
case class StatsResult(from: Address, stats: CurrentInternalStats)
|
||||
|
||||
type JobId = Int
|
||||
trait Job { def id: JobId }
|
||||
|
|
@ -751,6 +756,8 @@ abstract class StressSpec
|
|||
|
||||
val seedNodes = roles.take(numberOfSeedNodes)
|
||||
|
||||
// Gossip counters from the most recent internal stats held by the cluster read view.
def latestGossipStats: GossipStats = {
  val latest = cluster.readView.latestStats
  latest.gossipStats
}
|
||||
|
||||
override def cluster: Cluster = {
|
||||
createWorker
|
||||
super.cluster
|
||||
|
|
@ -928,12 +935,12 @@ abstract class StressSpec
|
|||
|
||||
def reportResult[T](thunk: ⇒ T): T = {
|
||||
val startTime = System.nanoTime
|
||||
val startStats = clusterView.latestStats
|
||||
val startStats = clusterView.latestStats.gossipStats
|
||||
|
||||
val returnValue = thunk
|
||||
|
||||
clusterResultAggregator foreach {
|
||||
_ ! ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats :- startStats)
|
||||
_ ! ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, latestGossipStats :- startStats)
|
||||
}
|
||||
|
||||
returnValue
|
||||
|
|
|
|||
|
|
@ -21,7 +21,10 @@ object TransitionMultiJvmSpec extends MultiNodeConfig {
|
|||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")).
|
||||
withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks
|
||||
akka.cluster.publish-stats-interval = 0 s # always, when it happens
|
||||
""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
|
|
@ -82,10 +85,10 @@ abstract class TransitionSpec
|
|||
def gossipTo(toRole: RoleName): Unit = {
|
||||
gossipBarrierCounter += 1
|
||||
runOn(toRole) {
|
||||
val oldCount = clusterView.latestStats.receivedGossipCount
|
||||
val oldCount = clusterView.latestStats.gossipStats.receivedGossipCount
|
||||
enterBarrier("before-gossip-" + gossipBarrierCounter)
|
||||
awaitCond {
|
||||
clusterView.latestStats.receivedGossipCount != oldCount // received gossip
|
||||
clusterView.latestStats.gossipStats.receivedGossipCount != oldCount // received gossip
|
||||
}
|
||||
// gossip chat will synchronize the views
|
||||
awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty)
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
|
|||
commonConfig(ConfigFactory.parseString(
|
||||
"""
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.cluster.publish-stats-interval = 0s
|
||||
""").withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
|
||||
testTransport(on = true)
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ class ClusterConfigSpec extends AkkaSpec {
|
|||
HeartbeatRequestTimeToLive must be(1 minute)
|
||||
LeaderActionsInterval must be(1 second)
|
||||
UnreachableNodesReaperInterval must be(1 second)
|
||||
PublishStatsInterval must be(10 second)
|
||||
PublishStatsInterval must be(Duration.Undefined)
|
||||
AutoDown must be(false)
|
||||
MinNrOfMembers must be(1)
|
||||
MinNrOfMembersOfRole must be === Map.empty
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue