Cluster metrics internal API and cluster-wide transport of metrics data.

* Create Cluster Metrics API
* Create transport of relevant metrics data
Does not include load-balancing routers.
Helena Edelson 2012-09-24 13:07:11 -06:00
parent 9a20baa831
commit dbce1c8b85
16 changed files with 1295 additions and 7 deletions

View file

@@ -100,6 +100,23 @@ akka {
max-sample-size = 1000
}
# Uses JMX and Hyperic SIGAR, if SIGAR is on the classpath.
metrics {
# Enable or disable metrics collector for load-balancing nodes.
enabled = on
# How often metrics are sampled on a node.
metrics-interval = 3s
# How often a node publishes metrics information.
gossip-interval = 3s
# How quickly the exponential weighting of past data decays compared to new data.
# If set to 0, data streaming over time is turned off.
# Set higher to increase the bias toward newer values.
rate-of-decay = 10
}
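For reference, these settings can be overridden per node through the regular Typesafe Config mechanism; a minimal sketch (system name and values are illustrative, not part of this commit):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Tune the cluster metrics settings for this node; everything else
// falls back to the reference configuration.
val config = ConfigFactory.parseString("""
  akka.cluster.metrics.enabled = on
  akka.cluster.metrics.metrics-interval = 1s
  akka.cluster.metrics.gossip-interval = 1s
  akka.cluster.metrics.rate-of-decay = 20
""").withFallback(ConfigFactory.load())

val system = ActorSystem("ClusterSystem", config)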
# If the tick-duration of the default scheduler is longer than the tick-duration
# configured here a dedicated scheduler will be used for periodic tasks of the cluster,
# otherwise the default scheduler is used.

View file

@@ -7,7 +7,7 @@ import scala.collection.immutable.SortedSet
import scala.concurrent.util.{ Deadline, Duration }
import scala.concurrent.util.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
-import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler }
+import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.Status.Failure
import akka.event.EventStream
import akka.pattern.ask
@@ -94,6 +94,8 @@ private[cluster] object InternalClusterAction {
case object ReapUnreachableTick extends Tick
case object MetricsTick extends Tick
case object LeaderActionsTick extends Tick
case object PublishStatsTick extends Tick
@@ -152,6 +154,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
withDispatcher(context.props.dispatcher), name = "core")
val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon].
withDispatcher(context.props.dispatcher), name = "heartbeat")
if (settings.MetricsEnabled) context.actorOf(Props[ClusterMetricsCollector].
withDispatcher(context.props.dispatcher), name = "metrics")
def receive = {
case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core
@@ -214,8 +218,8 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
self ! LeaderActionsTick
}
-// start periodic publish of current state
-private val publishStateTask: Option[Cancellable] =
+// start periodic publish of current stats
+private val publishStatsTask: Option[Cancellable] =
if (PublishStatsInterval == Duration.Zero) None
else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) {
self ! PublishStatsTick
@@ -230,7 +234,7 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
-publishStateTask foreach { _.cancel() }
+publishStatsTask foreach { _.cancel() }
}
def uninitialized: Actor.Receive = {
@@ -875,7 +879,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address])
case JoinSeedNode ⇒
// send InitJoin to all seed nodes (except myself)
seedNodes.collect {
-case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a))
+case a if a != selfAddress ⇒ context.actorFor(context.parent.path.toStringWithAddress(a))
} foreach { _ ! InitJoin }
case InitJoinAck(address) ⇒
// first InitJoinAck reply
@@ -904,7 +908,7 @@ private[cluster] final class ClusterCoreSender extends Actor with ActorLogging {
* Looks up and returns the remote cluster command connection for the specific address.
*/
private def clusterCoreConnectionFor(address: Address): ActorRef =
-context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
+context.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
def receive = {
case SendClusterMessage(to, msg) ⇒

View file

@@ -88,6 +88,11 @@ object ClusterEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}
/**
* Current snapshot of cluster member metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent
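Because the snapshot is published to the system event stream (see publish() in the collector below), any actor can subscribe to it; a minimal sketch, assuming it is defined in the akka.cluster package so the internal types are visible (MetricsListener is a hypothetical name):

import akka.actor.Actor

class MetricsListener extends Actor {
  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[ClusterEvent.ClusterMetricsChanged])
  override def postStop(): Unit =
    context.system.eventStream.unsubscribe(self)
  def receive = {
    case ClusterEvent.ClusterMetricsChanged(nodes) ⇒
      // the latest gossiped snapshot, one NodeMetrics entry per cluster member
      nodes foreach (n ⇒ println(n.address + ": " + n.metrics.size + " metrics"))
  }
}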
/**
* Cluster convergence state changed.
*/

View file

@@ -62,7 +62,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
-context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
+context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
val digester = MessageDigest.getInstance("MD5")

View file

@@ -0,0 +1,581 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.language.postfixOps
import scala.concurrent.util.duration._
import scala.concurrent.util.FiniteDuration
import scala.collection.immutable.{ SortedSet, Map }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Try, Success, Failure }
import scala.math.ScalaNumber
import scala.runtime.{ RichLong, RichDouble, RichInt }
import akka.actor._
import akka.event.LoggingAdapter
import akka.cluster.MemberStatus.Up
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.Method
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
/**
* This strategy is primarily for load-balancing of nodes. It controls metrics sampling
* at a regular frequency, prepares highly variable data for further analysis by other entities,
* and publishes the latest cluster metrics data around the node ring to assist in determining
* the need to redirect traffic to the least-loaded nodes.
*
* Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]].
*
* Calculation of statistical data for each monitored process is delegated to the
* [[akka.cluster.DataStream]] for exponential smoothing, with an additional decay factor.
*
* INTERNAL API.
*
* @author Helena Edelson
*/
private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging {
import InternalClusterAction._
import ClusterEvent._
import Member.addressOrdering
import context.dispatcher
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, settings }
import settings._
/**
* The node ring gossiped to, containing only members that are Up.
*/
var nodes: SortedSet[Address] = SortedSet.empty
/**
* The latest metric values with their statistical data.
*/
var latestGossip: MetricsGossip = MetricsGossip(MetricsRateOfDecay)
/**
* The metrics collector that samples data on the node.
*/
val collector: MetricsCollector = MetricsCollector(selfAddress, log, context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
/**
* Start periodic gossip to random nodes in cluster
*/
val gossipTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], MetricsGossipInterval) {
self ! GossipTick
}
/**
* Start periodic metrics collection
*/
val metricsTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], MetricsInterval) {
self ! MetricsTick
}
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
log.info("Metrics collection has started successfully on node [{}]", selfAddress)
}
def receive = {
case GossipTick ⇒ gossip()
case MetricsTick ⇒ collect()
case state: CurrentClusterState ⇒ receiveState(state)
case MemberUp(m) ⇒ receiveMember(m)
case e: MemberEvent ⇒ removeMember(e)
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
}
override def postStop: Unit = {
cluster unsubscribe self
gossipTask.cancel()
metricsTask.cancel()
collector.close()
}
/**
* Adds a member to the node ring.
*/
def receiveMember(member: Member): Unit = nodes += member.address
/**
* Removes a member from the member node ring.
*/
def removeMember(event: MemberEvent): Unit = {
nodes -= event.member.address
latestGossip = latestGossip remove event.member.address
}
/**
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]].
*/
def receiveState(state: CurrentClusterState): Unit = nodes = state.members collect { case m if m.status == Up ⇒ m.address }
/**
* Samples the latest metrics for the node, updates metrics statistics in
* [[akka.cluster.MetricsGossip]], and publishes the change to the event bus.
*
* INTERNAL API
*
* @see [[akka.cluster.ClusterMetricsCollector.collect()]]
*/
def collect(): Unit = {
latestGossip :+= collector.sample
publish()
}
/**
* Receives changes from peer nodes, merges remote with local gossip nodes, then publishes
* changes to the event stream for load balancing router consumption, and gossips to peers.
*/
def receiveGossip(envelope: MetricsGossipEnvelope): Unit = {
val remoteGossip = envelope.gossip
if (remoteGossip != latestGossip) {
latestGossip = latestGossip merge remoteGossip
publish()
gossipTo(envelope.from)
}
}
/**
* Gossip to peer nodes.
*/
def gossip(): Unit = selectRandomNode((nodes - selfAddress).toIndexedSeq) foreach gossipTo
def gossipTo(address: Address): Unit =
context.actorFor(self.path.toStringWithAddress(address)) ! MetricsGossipEnvelope(selfAddress, latestGossip)
def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
/**
* Publishes to the event stream.
*/
def publish(): Unit = context.system.eventStream publish ClusterMetricsChanged(latestGossip.nodes)
}
/**
* INTERNAL API
*
* @param rateOfDecay how quickly the exponential weighting of past data decays compared to new data
* @param nodes metrics per node
* @author Helena Edelson
*/
private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetrics] = Set.empty) {
/**
* Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus.Up]]
*/
def remove(node: Address): MetricsGossip = copy(nodes = nodes filterNot (_.address == node))
/**
* Adds new remote [[akka.cluster.NodeMetrics]] and merges existing from a remote gossip.
*/
def merge(remoteGossip: MetricsGossip): MetricsGossip = {
val remoteNodes = remoteGossip.nodes.map(n ⇒ n.address -> n).toMap
val toMerge = nodeKeys intersect remoteNodes.keySet
val onlyInRemote = remoteNodes.keySet -- nodeKeys
val onlyInLocal = nodeKeys -- remoteNodes.keySet
val seen = nodes.collect {
case n if toMerge contains n.address ⇒ n merge remoteNodes(n.address)
case n if onlyInLocal contains n.address ⇒ n
}
val unseen = remoteGossip.nodes.collect { case n if onlyInRemote contains n.address ⇒ n }
copy(nodes = seen ++ unseen)
}
/**
* Adds new local [[akka.cluster.NodeMetrics]] and initializes the data, or merges an existing.
*/
def :+(data: NodeMetrics): MetricsGossip = {
val previous = metricsFor(data)
val names = previous map (_.name)
val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a ⇒ names contains a.name)
val initialized = unseen.map(_.initialize(rateOfDecay))
val merged = toMerge flatMap (latest ⇒ previous.collect { case peer if latest same peer ⇒ peer :+ latest })
val refreshed = nodes filterNot (_.address == data.address)
copy(nodes = refreshed + data.copy(metrics = initialized ++ merged))
}
/**
* Returns a set of [[akka.actor.Address]] for a given node set.
*/
def nodeKeys: Set[Address] = nodes map (_.address)
/**
* Returns metrics for a node if exists.
*/
def metricsFor(node: NodeMetrics): Set[Metric] = nodes flatMap (n ⇒ if (n same node) n.metrics else Set.empty[Metric])
}
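As a worked example of merge's three-way split, with hypothetical addresses a, b, c:

// local:  { a(v1), b(v1) }    remote: { b(v2), c(v1) }
// toMerge      = {b} ⇒ b(v1) merge b(v2); the entry with the newer timestamp wins
// onlyInLocal  = {a} ⇒ kept as-is
// onlyInRemote = {c} ⇒ adopted from the remote gossip
// result: { a(v1), b(v2 if newer), c(v1) }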
/**
* Envelope adding a sender address to the gossip.
* INTERNAL API
*/
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
/**
* The exponentially weighted moving average (EWMA) approach captures short-term
* movements in volatility for a conditional volatility forecasting model. By virtue
* of its alpha, or decay factor, this provides a statistical streaming data model
* that is exponentially biased towards newer entries.
*
* An EWMA only needs the most recent forecast value to be kept, as opposed to a standard
* moving average model.
*
* INTERNAL API
*
* @param decay sets how quickly the exponential weighting decays for past data compared to new data
*
* @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or,
* the sampled value resulting from the previous smoothing iteration.
* This value is always used as the previous EWMA to calculate the new EWMA.
*
* @param timestamp the most recent time of sampling
*
* @param startTime the time of initial sampling for this data stream
*
* @author Helena Edelson
*/
private[cluster] case class DataStream(decay: Int, ewma: ScalaNumber, startTime: Long, timestamp: Long)
extends ClusterMessage with MetricNumericConverter {
/**
* The rate at which the weights of past observations
* decay as they become more distant.
*/
private val α: Double = 2.0 / (decay + 1)
/**
* Calculates the exponentially weighted moving average for a given monitored data set.
* The data point can be too large to fit into an Int or Long, thus we use ScalaNumber,
* and defer to BigInt or BigDecimal.
*
* @param xn the new data point
* @return a new [[akka.cluster.DataStream]] with the updated yn and timestamp
*/
def :+(xn: ScalaNumber): DataStream = convert(xn) fold (
nl ⇒ copy(ewma = BigInt((α * nl + (1 - α) * ewma.longValue()).round), timestamp = newTimestamp),
nd ⇒ copy(ewma = BigDecimal(α * nd + (1 - α) * ewma.doubleValue()), timestamp = newTimestamp))
/**
* The duration of observation for this data stream
*/
def duration: FiniteDuration = (timestamp - startTime) millis
}
/**
* Companion object of DataStream class.
*
* @author Helena Edelson
*/
private[cluster] object DataStream {
def apply(decay: Int, data: ScalaNumber): Option[DataStream] = if (decay > 0)
Some(DataStream(decay, data, newTimestamp, newTimestamp)) else None
}
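To make the smoothing concrete: with decay N the stream uses α = 2 / (N + 1), and each new data point x updates the average as ewma' = α·x + (1 − α)·ewma. A standalone sketch of that recurrence, reduced to plain Doubles for illustration:

// The EWMA recurrence used by DataStream; xs must be non-empty.
def ewmaSeries(decay: Int, xs: Seq[Double]): Double = {
  val α = 2.0 / (decay + 1)
  xs.tail.foldLeft(xs.head)((ewma, x) ⇒ α * x + (1 - α) * ewma)
}

// With decay = 10, α ≈ 0.18: after a steady 1.0, a jump to 2.0 moves the
// average only part of the way toward the new value:
// ewmaSeries(10, Seq(1.0, 1.0, 2.0)) ≈ 1.18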
/**
* INTERNAL API
*
* @param name the metric name
*
* @param value the metric value, which may or may not be defined
*
* @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. total cores) are not trended.
*
* @author Helena Edelson
*/
private[cluster] case class Metric(name: String, value: Option[ScalaNumber], average: Option[DataStream])
extends ClusterMessage with MetricNumericConverter {
/**
* Returns the metric with a new data stream for data trending if eligible,
* otherwise returns the unchanged metric.
*/
def initialize(decay: Int): Metric = if (initializable) copy(average = DataStream(decay, value.get)) else this
/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
* data point, and if defined, updates the data stream. Returns the updated metric.
*/
def :+(latest: Metric): Metric = latest.value match {
case Some(v) if this same latest ⇒ average match {
case Some(previous) ⇒ copy(value = Some(v), average = Some(previous :+ v))
case None if latest.average.isDefined ⇒ copy(value = Some(v), average = latest.average)
case None if !latest.average.isDefined ⇒ copy(value = Some(v))
}
case None ⇒ this
}
/**
* @see [[akka.cluster.MetricNumericConverter.defined()]]
*/
def isDefined: Boolean = value match {
case Some(a) ⇒ defined(a)
case None ⇒ false
}
/**
* Returns true if <code>that</code> is tracking the same metric as this.
*/
def same(that: Metric): Boolean = name == that.name
/**
* Returns true if the metric requires initialization.
*/
def initializable: Boolean = trendable && isDefined && average.isEmpty
/**
* Returns true if the metric is a value applicable for trending.
*/
def trendable: Boolean = !(Metric.noStream contains name)
}
/**
* Companion object of Metric class.
*
* @author Helena Edelson
*/
private[cluster] object Metric extends MetricNumericConverter {
/**
* The metrics that are already averages or finite are not trended over time.
*/
private val noStream = Set("system-load-average", "total-cores", "processors")
/**
* Evaluates validity of <code>value</code> based on whether it is available (SIGAR on classpath)
* or defined for the OS (JMX). If undefined we set the value option to None and do not modify
* the latest sampled metric to avoid skewing the statistical trend.
*/
def apply(name: String, value: Option[ScalaNumber]): Metric = value match {
case Some(v) if defined(v) ⇒ Metric(name, value, None)
case _ ⇒ Metric(name, None, None)
}
}
/**
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
* For JVM memory: the amount of used and committed memory will always be <= max, if max is defined.
* A memory allocation may fail if it attempts to increase the used memory such that used > committed,
* even if used <= max is true (e.g. when the system virtual memory is low).
*
* The system is possibly nearing a bottleneck if the system load average approaches the number of cpus/cores.
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
*
* @param timestamp the time of sampling
*
* @param metrics the set of sampled [[akka.cluster.Metric]]
*
* @author Helena Edelson
*/
private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
/**
* Returns the most recent data.
*/
def merge(that: NodeMetrics): NodeMetrics = if (this updatable that) copy(metrics = that.metrics, timestamp = that.timestamp) else this
/**
* Returns true if <code>that</code> address is the same as this and its metric set is more recent.
*/
def updatable(that: NodeMetrics): Boolean = (this same that) && (that.timestamp > timestamp)
/**
* Returns true if <code>that</code> address is the same as this
*/
def same(that: NodeMetrics): Boolean = address == that.address
}
/**
* Encapsulates evaluation of validity of metric values, conversion of an actual metric value to
* a [[akka.cluster.Metric]] for consumption by subscribed cluster entities.
*
* INTERNAL API
*
* @author Helena Edelson
*/
private[cluster] trait MetricNumericConverter {
/**
* A defined value is neither -1 nor NaN/Infinite:
* <ul><li>JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned</li>
* <li>SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)</li></ul>
*/
def defined(value: ScalaNumber): Boolean = convert(value) fold (a ⇒ value != -1, b ⇒ !(b.isNaN || b.isInfinite))
/**
* May involve rounding or truncation.
*/
def convert(from: ScalaNumber): Either[Long, Double] = from match {
case n: BigInt ⇒ Left(n.longValue())
case n: BigDecimal ⇒ Right(n.doubleValue())
case n: RichInt ⇒ Left(n.abs)
case n: RichLong ⇒ Left(n.self)
case n: RichDouble ⇒ Right(n.self)
}
}
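For a quick sense of the rules above, the expected results (illustrative; asserted properly in the specs below):

// defined(BigInt(-1))             ⇒ false  (the JMX 'unavailable' sentinel)
// defined(BigDecimal(Double.NaN)) ⇒ false  (the SIGAR NaN bug)
// defined(BigInt(42))             ⇒ true
// convert(BigInt(42))             ⇒ Left(42L)
// convert(BigDecimal(0.25))       ⇒ Right(0.25)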
/**
* Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this
* loads a wider and more accurate range of metrics, in combination with SIGAR's native OS library.
*
* FIXME switch to Scala reflection
*
* INTERNAL API
*
* @param sigar the optional org.hyperic.Sigar instance
*
* @param address The [[akka.actor.Address]] of the node being sampled
*
* @author Helena Edelson
*/
private[cluster] class MetricsCollector private (private val sigar: Option[AnyRef], address: Address) extends MetricNumericConverter {
private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean
private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList").map(m ⇒ m)
private val NetInterfaces: Option[Method] = createMethodFrom(sigar, "getNetInterfaceList")
private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc")
private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption
/**
* Samples and collects new data points.
*
* @return [[akka.cluster.NodeMetrics]]
*/
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores,
systemLoadAverage, used, committed, max, processors, networkMaxRx, networkMaxTx))
/**
* (SIGAR / JMX) Returns the OS-specific average system load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
*/
def systemLoadAverage: Metric = Metric("system-load-average", Some(BigDecimal(Try(
LoadAverage.get.invoke(sigar.get).asInstanceOf[Array[Double]].toSeq.head) getOrElse osMBean.getSystemLoadAverage)))
/**
* (JMX) Returns the number of available processors
*/
def processors: Metric = Metric("processors", Some(BigInt(osMBean.getAvailableProcessors)))
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
*/
def used: Metric = Metric("heap-memory-used", Some(BigInt(memoryMBean.getHeapMemoryUsage.getUsed)))
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes). Committed will always be greater
* than or equal to used.
*/
def committed: Metric = Metric("heap-memory-committed", Some(BigInt(memoryMBean.getHeapMemoryUsage.getCommitted)))
/**
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If undefined, returns -1.
*/
def max: Metric = Metric("heap-memory-max", Some(BigInt(memoryMBean.getHeapMemoryUsage.getMax)))
/**
* (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
* the amount of time the CPU spent executing code during the sampling interval, and how much more it could
* theoretically. Note that 99% CPU utilization can be optimal or indicative of failure.
*
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
*/
def cpuCombined: Metric = Metric("cpu-combined", Try(BigDecimal(CombinedCpu.get.invoke(Cpu.get.invoke(sigar.get)).asInstanceOf[Double])).toOption)
/**
* (SIGAR) Returns the total number of cores.
*/
def totalCores: Metric = Metric("total-cores", Try(BigInt(CpuList.get.invoke(sigar.get).asInstanceOf[Array[AnyRef]].map(cpu ⇒
createMethodFrom(Some(cpu), "getTotalCores").get.invoke(cpu).asInstanceOf[Int]).head)).toOption)
//Array[Int].head - if this would differ on some servers, expose all. In testing each int was always equal.
/**
* (SIGAR) Returns the max network IO rx (bytes received) value, in bytes, for network latency evaluation.
*/
def networkMaxRx: Metric = networkMaxFor("getRxBytes", "network-max-rx")
/**
* (SIGAR) Returns the max network IO tx value, in bytes.
*/
def networkMaxTx: Metric = networkMaxFor("getTxBytes", "network-max-tx")
/**
* Returns the network stats per interface.
*/
def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar.get).asInstanceOf[Array[String]].map(arg ⇒
arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar.get, arg))).toMap) getOrElse Map.empty[String, AnyRef]
/**
* Returns true if SIGAR is successfully installed on the classpath, otherwise false.
*/
def isSigar: Boolean = sigar.isDefined
/**
* Releases any native resources associated with this instance.
*/
def close(): Unit = if (isSigar) Try(createMethodFrom(sigar, "close").get.invoke(sigar.get)) getOrElse Unit
/**
* Returns the max bytes, as metric <code>metric</code>, found by invoking <code>method</code> on each network interface's stats.
*/
private def networkMaxFor(method: String, metric: String): Metric = Metric(metric, Try(Some(BigInt(
networkStats.collect { case (_, a) ⇒ createMethodFrom(Some(a), method).get.invoke(a).asInstanceOf[Long] }.max))) getOrElse None)
private def createMethodFrom(ref: Option[AnyRef], method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] =
Try(ref.get.getClass.getMethod(method, types: _*)).toOption
}
/**
* Companion object of MetricsCollector class.
*
* @author Helena Edelson
*/
private[cluster] object MetricsCollector {
def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector =
dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty) match {
case Success(identity) ⇒ new MetricsCollector(Some(identity), address)
case Failure(e) ⇒
log.debug(e.toString)
log.info("Hyperic SIGAR was not found on the classpath or not installed properly. " +
"Metrics will be retrieved from MBeans, and may be incorrect on some platforms. " +
"To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
"platform-specific native library to 'java.library.path'.")
new MetricsCollector(None, address)
}
}
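End to end, a collector can also be exercised outside the cluster daemon; a minimal sketch, assuming an ActorSystem is in scope (it mirrors how the specs below construct one):

import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
import akka.event.Logging

val system = ActorSystem("metrics-sample")
val collector = MetricsCollector(Address("akka", "metrics-sample"),
  Logging(system, "metrics-sample"), system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
// print the metrics that are defined on this platform
collector.sample.metrics filter (_.isDefined) foreach (m ⇒ println(m.name + " = " + m.value.get))
collector.close()
system.shutdown()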

View file

@@ -30,6 +30,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
@volatile
private var _latestStats = ClusterStats()
/**
* Current cluster metrics, updated periodically via event bus.
*/
@volatile
private var _clusterMetrics: Set[NodeMetrics] = Set.empty
val selfAddress = cluster.selfAddress
// create actor that subscribes to the cluster eventBus to update current read view state
@@ -56,6 +62,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence)
case s: CurrentClusterState ⇒ state = s
case CurrentInternalStats(stats) ⇒ _latestStats = stats
case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes
case _ ⇒ // ignore, not interesting
}
}).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener")
@@ -118,6 +125,11 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
!unreachableMembers.contains(myself) && !myself.status.isUnavailable
}
/**
* Current cluster metrics.
*/
def clusterMetrics: Set[NodeMetrics] = _clusterMetrics
/**
* INTERNAL API
* The nodes that have seen the current version of the Gossip.

View file

@@ -50,6 +50,10 @@ class ClusterSettings(val config: Config, val systemName: String) {
maxFailures = getInt("akka.cluster.send-circuit-breaker.max-failures"),
callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS),
resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS))
final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled")
final val MetricsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.metrics-interval"), MILLISECONDS)
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
final val MetricsRateOfDecay: Int = getInt("akka.cluster.metrics.rate-of-decay")
}
case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)

View file

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.language.postfixOps
import scala.concurrent.util.duration._
import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig}
import com.typesafe.config.ConfigFactory
import akka.testkit.LongRunningTest
object ClusterMetricsDataStreamingOffMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("akka.cluster.metrics.rate-of-decay = 0")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class ClusterMetricsDataStreamingMultiJvmNode1 extends ClusterMetricsDataStreamingOffSpec
class ClusterMetricsDataStreamingMultiJvmNode2 extends ClusterMetricsDataStreamingOffSpec
abstract class ClusterMetricsDataStreamingOffSpec extends MultiNodeSpec(ClusterMetricsDataStreamingOffMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
"Cluster metrics" must {
"not collect stream metric data" taggedAs LongRunningTest in within(30 seconds) {
awaitClusterUp(roles: _*)
enterBarrier("cluster-started")
runOn(roles: _*) {
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
awaitCond(clusterView.clusterMetrics.size == roles.size)
awaitCond(clusterView.clusterMetrics.flatMap(_.metrics).filter(_.trendable).forall(_.average.isEmpty))
}
enterBarrier("after")
}
}
}

View file

@@ -0,0 +1,34 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig}
import com.typesafe.config.ConfigFactory
import akka.testkit.LongRunningTest
object ClusterMetricsDisabledMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("akka.cluster.metrics.enabled = off")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class ClusterMetricsDisabledMultiJvmNode1 extends ClusterMetricsDisabledSpec
class ClusterMetricsDisabledMultiJvmNode2 extends ClusterMetricsDisabledSpec
abstract class ClusterMetricsDisabledSpec extends MultiNodeSpec(ClusterMetricsDisabledMultiJvmSpec) with MultiNodeClusterSpec {
"Cluster metrics" must {
"not collect metrics, not publish ClusterMetricsChanged, and not gossip metrics" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
enterBarrier("cluster-started")
runOn(roles: _*) {
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
awaitCond(clusterView.clusterMetrics.isEmpty)
}
enterBarrier("after")
}
}
}

View file

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.language.postfixOps
import scala.concurrent.util.duration._
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.ExtendedActorSystem
object ClusterMetricsMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(ConfigFactory.parseString("""
akka.cluster.auto-join = on
akka.cluster.metrics.enabled = on
akka.cluster.metrics.metrics-interval = 3 s
akka.cluster.metrics.gossip-interval = 3 s
akka.cluster.metrics.rate-of-decay = 10
akka.loglevel = INFO
akka.remote.log-sent-messages = off
akka.remote.log-received-messages = off
akka.actor.debug.receive = off
akka.actor.debug.unhandled = off
akka.actor.debug.lifecycle = off
akka.actor.debug.autoreceive = off
akka.actor.debug.fsm = off""")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class ClusterMetricsMultiJvmNode1 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode2 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode3 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode4 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
import ClusterMetricsMultiJvmSpec._
val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
"Cluster metrics" must {
"periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " +
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
awaitClusterUp(roles: _*)
enterBarrier("cluster-started")
runOn(roles: _*) {
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
awaitCond(clusterView.clusterMetrics.size == roles.size)
assertInitialized(cluster.settings.MetricsRateOfDecay, collectNodeMetrics(clusterView.clusterMetrics).toSet)
clusterView.clusterMetrics.foreach(n ⇒ assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n))
}
enterBarrier("after")
}
"reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) {
runOn(second) {
cluster.leave(first)
}
enterBarrier("first-left")
runOn(second, third, fourth, fifth) {
awaitCond(clusterView.clusterMetrics.size == (roles.size - 1))
}
enterBarrier("finished")
}
}
}

View file

@@ -45,6 +45,10 @@ class ClusterConfigSpec extends AkkaSpec {
maxFailures = 3,
callTimeout = 2 seconds,
resetTimeout = 30 seconds))
MetricsEnabled must be(true)
MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds)
MetricsRateOfDecay must be(10)
}
}
}

View file

@@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.concurrent.util.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec with MetricNumericConverter {
import system.dispatcher
val collector = createMetricsCollector
val DefaultRateOfDecay = 10
"DataStream" must {
"calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
val firstDataSet = collector.sample.metrics.collect { case m if m.trendable && m.isDefined ⇒ m.initialize(DefaultRateOfDecay) }
var streamingDataSet = firstDataSet
val cancellable = system.scheduler.schedule(0 seconds, 100 millis) {
streamingDataSet = collector.sample.metrics.flatMap(latest ⇒ streamingDataSet.collect {
case streaming if (latest.trendable && latest.isDefined) && (latest same streaming)
&& (latest.value.get != streaming.value.get) ⇒ {
val updatedDataStream = streaming.average.get :+ latest.value.get
updatedDataStream.timestamp must be > (streaming.average.get.timestamp)
updatedDataStream.duration.length must be > (streaming.average.get.duration.length)
updatedDataStream.ewma must not be (streaming.average.get.ewma)
updatedDataStream.ewma must not be (latest.value.get)
streaming.copy(value = latest.value, average = Some(updatedDataStream))
}
})
}
awaitCond(firstDataSet.size == streamingDataSet.size, longDuration)
cancellable.cancel()
val finalDataSet = streamingDataSet.map(m ⇒ m.name -> m).toMap
firstDataSet map {
first ⇒
val newMetric = finalDataSet(first.name)
val e1 = first.average.get
val e2 = newMetric.average.get
if (first.value.get != newMetric.value.get) {
e2.ewma must not be (first.value.get)
e2.ewma must not be (newMetric.value.get)
}
if (first.value.get.longValue > newMetric.value.get.longValue) e1.ewma.longValue must be > e2.ewma.longValue
else if (first.value.get.longValue < newMetric.value.get.longValue) e1.ewma.longValue must be < e2.ewma.longValue
}
}
"data streaming is disabled if the decay is set to 0" in {
val data = collector.sample.metrics map (_.initialize(0))
data foreach (_.average.isEmpty must be(true))
}
}
}

View file

@@ -0,0 +1,53 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with AbstractClusterMetricsSpec {
"MetricNumericConverter" must {
val collector = createMetricsCollector
"convert " in {
convert(0).isLeft must be(true)
convert(1).left.get must be(1)
convert(1L).isLeft must be(true)
convert(0.0).isRight must be(true)
}
"define a new metric" in {
val metric = Metric("heap-memory-used", Some(0L))
metric.initializable must be(true)
metric.name must not be (null)
metric.average.isEmpty must be(true)
metric.trendable must be(true)
if (collector.isSigar) {
val cores = collector.totalCores
cores.isDefined must be(true)
cores.value.get.intValue must be > (0)
cores.initializable must be(false)
}
}
"define an undefined value with a None " in {
Metric("x", Some(-1)).value.isDefined must be(false)
Metric("x", Some(java.lang.Double.NaN)).value.isDefined must be(false)
Metric("x", None).isDefined must be(false)
}
"recognize whether a metric value is defined" in {
defined(0) must be(true)
defined(0.0) must be(true)
}
"recognize whether a metric value is not defined" in {
defined(-1) must be(false)
defined(Double.NaN) must be(false)
}
}
}

View file

@@ -0,0 +1,240 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.language.postfixOps
import scala.concurrent.util.duration._
import scala.concurrent.util.FiniteDuration
import scala.concurrent.Await
import scala.util.{ Try, Failure }
import akka.actor._
import akka.testkit._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
object MetricsEnabledSpec {
val config = """
akka.cluster.metrics.enabled = on
akka.cluster.metrics.metrics-interval = 1 s
akka.cluster.metrics.gossip-interval = 1 s
akka.cluster.metrics.rate-of-decay = 10
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.loglevel = INFO"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec {
import system.dispatcher
val collector = createMetricsCollector
"Metric must" must {
"create and initialize a new metric or merge an existing one" in {
for (i ← 0 to samples) {
val metrics = collector.sample.metrics
assertCreatedUninitialized(metrics)
assertInitialized(window, metrics map (_.initialize(window)))
}
}
"merge 2 metrics that are tracking the same metric" in {
for (i ← 0 to samples) {
val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics
var merged = sample2 flatMap (latest ⇒ sample1 collect {
case peer if latest same peer ⇒ {
val m = peer :+ latest
assertMerged(latest, peer, m)
m
}
})
val sample3 = collector.sample.metrics map (_.initialize(window))
val sample4 = collector.sample.metrics map (_.initialize(window))
merged = sample4 flatMap (latest ⇒ sample3 collect {
case peer if latest same peer ⇒ {
val m = peer :+ latest
assertMerged(latest, peer, m)
m
}
})
merged.size must be(sample3.size)
merged.size must be(sample4.size)
}
}
}
"MetricsCollector" must {
"not raise errors when attempting reflective code in apply" in {
Try(createMetricsCollector must not be null) match {
case Failure(e) ⇒ fail("No error should have been raised creating 'createMetricsCollector'.")
case _ ⇒ //
}
}
"collect accurate metrics for a node" in {
val sample = collector.sample
assertExpectedSampleSize(collector.isSigar, window, sample)
val metrics = sample.metrics.collect { case m if m.isDefined ⇒ (m.name, m.value.get) }
val used = metrics collectFirst { case (a, b) if a == "heap-memory-used" ⇒ b }
val committed = metrics collectFirst { case (a, b) if a == "heap-memory-committed" ⇒ b }
metrics collect {
case (a, b) if a == "cpu-combined" ⇒
b.doubleValue must be <= (1.0)
b.doubleValue must be >= (0.0)
b
case (a, b) if a == "total-cores" ⇒ b.intValue must be > (0); b
case (a, b) if a == "network-max-rx" ⇒ b.longValue must be > (0L); b
case (a, b) if a == "network-max-tx" ⇒ b.longValue must be > (0L); b
case (a, b) if a == "system-load-average" ⇒ b.doubleValue must be >= (0.0); b
case (a, b) if a == "processors" ⇒ b.intValue must be >= (0); b
case (a, b) if a == "heap-memory-used" ⇒ b.longValue must be >= (0L); b
case (a, b) if a == "heap-memory-committed" ⇒ b.longValue must be > (0L); b
case (a, b) if a == "heap-memory-max" ⇒
used.get.longValue must be < (b.longValue)
committed.get.longValue must be < (b.longValue)
used.get.longValue + committed.get.longValue must be <= (b.longValue)
b
}
}
"collect SIGAR metrics if it is on the classpath" in {
if (collector.isSigar) {
// combined cpu may or may not be defined on a given sampling
// systemLoadAverage is from SIGAR, if present
collector.systemLoadAverage.isDefined must be(true)
collector.networkStats.nonEmpty must be(true)
collector.networkMaxRx.isDefined must be(true)
collector.networkMaxTx.isDefined must be(true)
collector.totalCores.isDefined must be(true)
}
}
"collect JMX metrics" in {
// heap max may be undefined depending on the OS
// systemLoadAverage is from JMX if SIGAR is not present
collector.systemLoadAverage.isDefined must be(true)
collector.used.isDefined must be(true)
collector.committed.isDefined must be(true)
collector.processors.isDefined must be(true)
}
"collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in {
val latch = TestLatch(samples)
val task = FixedRateTask(system.scheduler, 0 seconds, interval) {
val sample = collector.sample
assertCreatedUninitialized(sample.metrics)
assertExpectedSampleSize(collector.isSigar, window, sample)
latch.countDown()
}
Await.ready(latch, longDuration)
task.cancel()
}
}
}
trait MetricSpec extends WordSpec with MustMatchers {
def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: MetricsGossip): Unit = {
val masterMetrics = collectNodeMetrics(master)
val gossipMetrics = collectNodeMetrics(gossip.nodes)
gossipMetrics.size must be(masterMetrics.size plusOrMinus 1) // combined cpu
}
def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit =
gossip.nodes.map(_.address) must be(nodes.map(_.address))
def assertExpectedSampleSize(isSigar: Boolean, gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n ⇒ assertExpectedSampleSize(isSigar, gossip.rateOfDecay, n))
def assertCreatedUninitialized(gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n ⇒ assertCreatedUninitialized(n.metrics.filterNot(_.trendable)))
def assertInitialized(gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n ⇒ assertInitialized(gossip.rateOfDecay, n.metrics))
def assertCreatedUninitialized(metrics: Set[Metric]): Unit = {
metrics.size must be > (0)
metrics foreach { m ⇒
m.average.isEmpty must be(true)
if (m.value.isDefined) m.isDefined must be(true)
if (m.initializable) (m.trendable && m.isDefined && m.average.isEmpty) must be(true)
}
}
def assertInitialized(decay: Int, metrics: Set[Metric]): Unit = if (decay > 0) metrics.filter(_.trendable) foreach { m ⇒
m.initializable must be(false)
if (m.isDefined) m.average.isDefined must be(true)
}
def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) {
if (latest.isDefined) {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
if (latest.trendable) {
if (latest.initializable) merged.average.isEmpty must be(true)
else merged.average.isDefined must be(true)
}
} else {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
if (latest.average.isDefined) merged.average.get must be(latest.average.get)
else merged.average.isEmpty must be(true)
}
} else {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(peer.value.get)
if (peer.trendable) {
if (peer.initializable) merged.average.isEmpty must be(true)
else merged.average.isDefined must be(true)
}
} else {
merged.isDefined must be(false)
merged.average.isEmpty must be(true)
}
}
}
def assertExpectedSampleSize(isSigar: Boolean, decay: Int, node: NodeMetrics): Unit = {
node.metrics.size must be(9)
val metrics = node.metrics.filter(_.isDefined)
if (isSigar) { // combined cpu + jmx max heap
metrics.size must be >= (7)
metrics.size must be <= (9)
} else { // jmx max heap
metrics.size must be >= (4)
metrics.size must be <= (5)
}
if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) ⇒ m }.foreach(_.average.isDefined must be(true))
}
def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = {
var r: Seq[Metric] = Seq.empty
nodes.foreach(n ⇒ r ++= n.metrics.filter(_.isDefined))
r
}
}
trait AbstractClusterMetricsSpec extends DefaultTimeout {
this: AkkaSpec ⇒
val selfAddress = new Address("akka", "localhost")
val window = 49
val interval: FiniteDuration = 100 millis
val longDuration = 120 seconds // for long running tests
val samples = 100
def createMetricsCollector: MetricsCollector = MetricsCollector(selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
}

View file

@@ -0,0 +1,107 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.concurrent.util.duration._
import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.actor.Address
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec {
val collector = createMetricsCollector
"A MetricsGossip" must {
"add and initialize new NodeMetrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip.nodes.size must be(1)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip)
assertExpectedSampleSize(collector.isSigar, localGossip)
assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
localGossip :+= m2
localGossip.nodes.size must be(2)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip)
assertExpectedSampleSize(collector.isSigar, localGossip)
assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
}
"merge peer metrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var remoteGossip = MetricsGossip(window)
remoteGossip :+= m1
remoteGossip :+= m2
remoteGossip.nodes.size must be(2)
val beforeMergeNodes = remoteGossip.nodes
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
remoteGossip :+= m2Updated // merge peers
remoteGossip.nodes.size must be(2)
assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip)
assertExpectedSampleSize(collector.isSigar, remoteGossip)
remoteGossip.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp must be(m2Updated.timestamp) }
}
"merge an existing metric set for a node and update node ring" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip :+= m2
var remoteGossip = MetricsGossip(window)
remoteGossip :+= m3
remoteGossip :+= m2Updated
localGossip.nodeKeys.contains(m1.address) must be(true)
remoteGossip.nodeKeys.contains(m3.address) must be(true)
// must contain nodes 1,3, and the most recent version of 2
val mergedGossip = localGossip merge remoteGossip
mergedGossip.nodes.size must be(3)
assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3))
assertExpectedSampleSize(collector.isSigar, mergedGossip)
assertCreatedUninitialized(mergedGossip)
assertInitialized(mergedGossip)
mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp)
}
"get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip.metricsFor(m1).nonEmpty must be(true)
}
"remove a node if it is no longer Up" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip :+= m2
localGossip.nodes.size must be(2)
localGossip = localGossip remove m1.address
localGossip.nodes.size must be(1)
localGossip.nodes.exists(_.address == m1.address) must be(false)
}
}
}

View file

@@ -0,0 +1,55 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit.AkkaSpec
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class NodeMetricsSpec extends AkkaSpec with AbstractClusterMetricsSpec with MetricSpec {
val collector = createMetricsCollector
val node1 = Address("akka", "sys", "a", 2554)
val node2 = Address("akka", "sys", "a", 2555)
"NodeMetrics must" must {
"recognize updatable nodes" in {
(NodeMetrics(node1, 0) updatable NodeMetrics(node1, 1)) must be(true)
}
"recognize non-updatable nodes" in {
(NodeMetrics(node1, 1) updatable NodeMetrics(node2, 0)) must be(false)
}
"return correct result for 2 'same' nodes" in {
(NodeMetrics(node1, 0) same NodeMetrics(node1, 0)) must be(true)
}
"return correct result for 2 not 'same' nodes" in {
(NodeMetrics(node1, 0) same NodeMetrics(node2, 0)) must be(false)
}
"merge 2 NodeMetrics by most recent" in {
val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
val sample2 = NodeMetrics(node1, 2, collector.sample.metrics)
val merged = sample1 merge sample2
merged.timestamp must be(sample2.timestamp)
merged.metrics must be(sample2.metrics)
}
"not merge 2 NodeMetrics if master is more recent" in {
val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
val sample2 = NodeMetrics(node2, 0, sample1.metrics)
val merged = sample2 merge sample1 // older and not same
merged.timestamp must be(sample2.timestamp)
merged.metrics must be(sample2.metrics)
}
}
}