ClusterLoadBalancingRouter and refactoring of metrics, see #2547

* MetricsSelector, calculate capacity, weights and allocate weighted
  routee refs
* ClusterLoadBalancingRouterSpec
* Optional heap max
* Constants for the metric fields
* Refactoring of Metric and decay
* Rewrite of DataStreamSpec
* Correction of EWMA and removal of BigInt, BigDecimal
* Separation of MetricsCollector into trait and two classes,
  SigarMetricsCollector and JmxMetricsCollector
* This will reduce cost when sigar is not installed, such as
  avoiding throwing and catching an exception for every call
* Improved error handling for loading sigar
* Made MetricsCollector implementation configurable
* Tested with sigar
This commit is contained in:
Patrik Nordwall 2012-11-07 20:36:24 +01:00
parent f306964fca
commit c9d206764a
18 changed files with 1065 additions and 733 deletions

View file

@ -70,7 +70,7 @@ akka {
failure-detector { failure-detector {
# FQCN of the failure detector implementation. # FQCN of the failure detector implementation.
# It must implement akka.cluster.akka.cluster and # It must implement akka.cluster.FailureDetector and
# have constructor with akka.actor.ActorSystem and # have constructor with akka.actor.ActorSystem and
# akka.cluster.ClusterSettings parameters # akka.cluster.ClusterSettings parameters
implementation-class = "akka.cluster.AccrualFailureDetector" implementation-class = "akka.cluster.AccrualFailureDetector"
@ -118,14 +118,22 @@ akka {
# Enable or disable metrics collector for load-balancing nodes. # Enable or disable metrics collector for load-balancing nodes.
enabled = on enabled = on
# FQCN of the metrics collector implementation.
# It must implement akka.cluster.MetricsCollector and
# have constructor with akka.actor.ActorSystem parameter.
collector-class = "akka.cluster.SigarMetricsCollector"
# How often metrics is sampled on a node. # How often metrics is sampled on a node.
metrics-interval = 3s collect-interval = 3s
# How often a node publishes metrics information. # How often a node publishes metrics information.
gossip-interval = 3s gossip-interval = 3s
# How quickly the exponential weighting of past data is decayed compared to # How quickly the exponential weighting of past data is decayed compared to
# new data. Set higher to increase the bias toward newer values. # new data. Set lower to increase the bias toward newer values.
# Corresponds to 'N time periods' as explained in
# http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
# Must be >= 1
rate-of-decay = 10 rate-of-decay = 10
} }

View file

@ -4,29 +4,36 @@
package akka.cluster package akka.cluster
import scala.language.postfixOps import java.io.Closeable
import scala.concurrent.duration._
import scala.collection.immutable.{ SortedSet, Map }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Try, Success, Failure }
import math.{ ScalaNumber, ScalaNumericConversions }
import scala.runtime.{ RichLong, RichDouble, RichInt }
import akka.actor._
import akka.event.LoggingAdapter
import akka.cluster.MemberStatus.Up
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.Method
import java.lang.System.{ currentTimeMillis newTimestamp } import java.lang.System.{ currentTimeMillis newTimestamp }
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import scala.collection.immutable.{ SortedSet, Map }
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.runtime.{ RichLong, RichDouble, RichInt }
import scala.util.{ Try, Success, Failure }
import akka.ConfigurationException
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.DynamicAccess
import akka.actor.ExtendedActorSystem
import akka.cluster.MemberStatus.Up
import akka.event.Logging
/** /**
* INTERNAL API. * INTERNAL API.
* *
* This strategy is primarily for load-balancing of nodes. It controls metrics sampling * Cluster metrics is primarily for load-balancing of nodes. It controls metrics sampling
* at a regular frequency, prepares highly variable data for further analysis by other entities, * at a regular frequency, prepares highly variable data for further analysis by other entities,
* and publishes the latest cluster metrics data around the node ring to assist in determining * and publishes the latest cluster metrics data around the node ring and local eventStream
* the need to redirect traffic to the least-loaded nodes. * to assist in determining the need to redirect traffic to the least-loaded nodes.
* *
* Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]]. * Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]].
* *
@ -43,8 +50,6 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
import cluster.{ selfAddress, scheduler, settings } import cluster.{ selfAddress, scheduler, settings }
import settings._ import settings._
val DefaultRateOfDecay: Int = 10
/** /**
* The node ring gossipped that contains only members that are Up. * The node ring gossipped that contains only members that are Up.
*/ */
@ -53,12 +58,12 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/** /**
* The latest metric values with their statistical data. * The latest metric values with their statistical data.
*/ */
var latestGossip: MetricsGossip = MetricsGossip(if (MetricsRateOfDecay <= 0) DefaultRateOfDecay else MetricsRateOfDecay) var latestGossip: MetricsGossip = MetricsGossip()
/** /**
* The metrics collector that samples data on the node. * The metrics collector that samples data on the node.
*/ */
val collector: MetricsCollector = MetricsCollector(selfAddress, log, context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess) val collector: MetricsCollector = MetricsCollector(context.system.asInstanceOf[ExtendedActorSystem], settings)
/** /**
* Start periodic gossip to random nodes in cluster * Start periodic gossip to random nodes in cluster
@ -160,7 +165,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
* *
* @param nodes metrics per node * @param nodes metrics per node
*/ */
private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetrics] = Set.empty) { private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics] = Set.empty) {
/** /**
* Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus.Up]] * Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus.Up]]
@ -194,11 +199,10 @@ private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetri
val names = previous map (_.name) val names = previous map (_.name)
val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a names contains a.name) val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a names contains a.name)
val initialized = unseen.map(_.initialize(rateOfDecay))
val merged = toMerge flatMap (latest previous.collect { case peer if latest same peer peer :+ latest }) val merged = toMerge flatMap (latest previous.collect { case peer if latest same peer peer :+ latest })
val refreshed = nodes filterNot (_.address == data.address) val refreshed = nodes filterNot (_.address == data.address)
copy(nodes = refreshed + data.copy(metrics = initialized ++ merged)) copy(nodes = refreshed + data.copy(metrics = unseen ++ merged))
} }
/** /**
@ -231,6 +235,8 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics
* INTERNAL API * INTERNAL API
* *
* @param decay sets how quickly the exponential weighting decays for past data compared to new data * @param decay sets how quickly the exponential weighting decays for past data compared to new data
* Corresponds to 'N time periods' as explained in
* http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
* *
* @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or, * @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or,
* the sampled value resulting from the previous smoothing iteration. * the sampled value resulting from the previous smoothing iteration.
@ -240,36 +246,30 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics
* *
* @param startTime the time of initial sampling for this data stream * @param startTime the time of initial sampling for this data stream
*/ */
private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions, startTime: Long, timestamp: Long) private[cluster] case class DataStream(decay: Int, ewma: Double, startTime: Long, timestamp: Long)
extends ClusterMessage with MetricNumericConverter { extends ClusterMessage {
require(decay >= 1, "Rate of decay must be >= 1")
/** /**
* The rate at which the weights of past observations * The rate at which the weights of past observations
* decay as they become more distant. * decay as they become more distant.
*/ */
private val α = 2 / decay + 1 private val α = 2.0 / (decay + 1)
/** /**
* Calculates the exponentially weighted moving average for a given monitored data set. * Calculates the exponentially weighted moving average for a given monitored data set.
* The datam can be too large to fit into an int or long, thus we use ScalaNumericConversions,
* and defer to BigInt or BigDecimal.
*
* FIXME - look into the math: the new ewma becomes the new value
* value: 416979936, prev ewma: 416581760, new ewma: 416979936
* *
* @param xn the new data point * @param xn the new data point
* *
* @return a [[akka.cluster.DataStream]] with the updated yn and timestamp * @return a [[akka.cluster.DataStream]] with the updated yn and timestamp
*/ */
def :+(xn: ScalaNumericConversions): DataStream = if (xn != ewma) convert(xn) fold ( def :+(xn: Double): DataStream = copy(ewma = (α * xn) + (1 - α) * ewma, timestamp = newTimestamp)
nl copy(ewma = BigInt((α * nl) + (1 - α) * ewma.longValue()), timestamp = newTimestamp),
nd copy(ewma = BigDecimal((α * nd) + (1 - α) * ewma.doubleValue()), timestamp = newTimestamp))
else this
/** /**
* The duration of observation for this data stream * The duration of observation for this data stream
*/ */
def duration: FiniteDuration = (timestamp - startTime) millis def duration: FiniteDuration = (timestamp - startTime).millis
} }
@ -278,19 +278,17 @@ private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions
* *
* @param name the metric name * @param name the metric name
* *
* @param value the metric value, which may or may not be defined * @param value the metric value, which may or may not be defined, it must be a valid numerical value,
* see [[akka.cluster.MetricNumericConverter.defined()]]
* *
* @param average the data stream of the metric value, for trending over time. Metrics that are already * @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. as total cores), are not trended. * averages (e.g. system load average) or finite (e.g. as total cores), are not trended.
*/ */
private[cluster] case class Metric(name: String, value: Option[ScalaNumericConversions], average: Option[DataStream]) private[cluster] case class Metric private (name: String, value: Option[Number], average: Option[DataStream],
dummy: Boolean) // dummy because of overloading clash with apply
extends ClusterMessage with MetricNumericConverter { extends ClusterMessage with MetricNumericConverter {
/** require(value.isEmpty || defined(value.get), "Invalid Metric [%s] value [%]".format(name, value))
* Returns the metric with a new data stream for data trending if eligible,
* otherwise returns the unchanged metric.
*/
def initialize(decay: Int): Metric = if (initializable) copy(average = Some(DataStream(decay, value.get, newTimestamp, newTimestamp))) else this
/** /**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new * If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
@ -298,36 +296,27 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumericConve
*/ */
def :+(latest: Metric): Metric = latest.value match { def :+(latest: Metric): Metric = latest.value match {
case Some(v) if this same latest average match { case Some(v) if this same latest average match {
case Some(previous) copy(value = Some(v), average = Some(previous :+ v)) case Some(previous) copy(value = latest.value, average = Some(previous :+ v.doubleValue))
case None if latest.average.isDefined copy(value = Some(v), average = latest.average) case None if latest.average.isDefined copy(value = latest.value, average = latest.average)
case None if !latest.average.isDefined copy(value = Some(v)) case None if latest.average.isEmpty copy(value = latest.value)
} }
case None this case None this
} }
def isDefined: Boolean = value.isDefined
/** /**
* @see [[akka.cluster.MetricNumericConverter.defined()]] * The numerical value of the average, if defined, otherwise the latest value,
* if defined.
*/ */
def isDefined: Boolean = value match { def averageValue: Option[Double] =
case Some(a) defined(a) average map (_.ewma) orElse value.map(_.doubleValue)
case None false
}
/** /**
* Returns true if <code>that</code> is tracking the same metric as this. * Returns true if <code>that</code> is tracking the same metric as this.
*/ */
def same(that: Metric): Boolean = name == that.name def same(that: Metric): Boolean = name == that.name
/**
* Returns true if the metric requires initialization.
*/
def initializable: Boolean = trendable && isDefined && average.isEmpty
/**
* Returns true if the metric is a value applicable for trending.
*/
def trendable: Boolean = !(Metric.noStream contains name)
} }
/** /**
@ -336,65 +325,30 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumericConve
* Companion object of Metric class. * Companion object of Metric class.
*/ */
private[cluster] object Metric extends MetricNumericConverter { private[cluster] object Metric extends MetricNumericConverter {
import NodeMetrics.MetricValues._
/** private def definedValue(value: Option[Number]): Option[Number] =
* The metrics that are already averages or finite are not trended over time. if (value.isDefined && defined(value.get)) value else None
*/
private val noStream = Set("system-load-average", "total-cores", "processors")
/** /**
* Evaluates validity of <code>value</code> based on whether it is available (SIGAR on classpath) * Evaluates validity of <code>value</code> based on whether it is available (SIGAR on classpath)
* or defined for the OS (JMX). If undefined we set the value option to None and do not modify * or defined for the OS (JMX). If undefined we set the value option to None and do not modify
* the latest sampled metric to avoid skewing the statistical trend. * the latest sampled metric to avoid skewing the statistical trend.
*
* @param decay rate of decay for values applicable for trending
*/ */
def apply(name: String, value: Option[ScalaNumericConversions]): Metric = value match { def apply(name: String, value: Option[Number], decay: Option[Int]): Metric = {
case Some(v) if defined(v) Metric(name, value, None) val dv = definedValue(value)
case _ Metric(name, None, None) val average =
if (decay.isDefined && dv.isDefined) {
val now = newTimestamp
Some(DataStream(decay.get, dv.get.doubleValue, now, now))
} else
None
new Metric(name, dv, average, true)
} }
}
/**
* Reusable logic for particular metric categories or to leverage all for routing.
*/
private[cluster] trait MetricsAwareClusterNodeSelector {
import NodeMetrics._
import NodeMetrics.NodeMetricsComparator._
/**
* Returns the address of the available node with the lowest cumulative difference
* between heap memory max and used/committed.
*/
def selectByMemory(nodes: Set[NodeMetrics]): Option[Address] = Try(Some(nodes.map {
n
val (used, committed, max) = MetricValues.unapply(n.heapMemory)
(n.address, max match {
case Some(m) ((committed - used) + (m - used) + (m - committed))
case None committed - used
})
}.min._1)) getOrElse None
// TODO
def selectByNetworkLatency(nodes: Set[NodeMetrics]): Option[Address] = None
/* Try(nodes.map {
n
val (rxBytes, txBytes) = MetricValues.unapply(n.networkLatency).get
(n.address, (rxBytes + txBytes))
}.min._1) getOrElse None // TODO: min or max
*/
// TODO
def selectByCpu(nodes: Set[NodeMetrics]): Option[Address] = None
/* Try(nodes.map {
n
val (loadAverage, processors, combinedCpu, cores) = MetricValues.unapply(n.cpu)
var cumulativeDifference = 0
// TODO: calculate
combinedCpu.get
cores.get
(n.address, cumulativeDifference)
}.min._1) getOrElse None // TODO min or max
}*/
def apply(name: String): Metric = new Metric(name, None, None, true)
} }
/** /**
@ -417,6 +371,7 @@ private[cluster] trait MetricsAwareClusterNodeSelector {
*/ */
private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage { private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
import NodeMetrics._ import NodeMetrics._
import NodeMetrics.MetricValues._
/** /**
* Returns the most recent data. * Returns the most recent data.
@ -436,13 +391,13 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
/** /**
* Of all the data streams, this fluctuates the most. * Of all the data streams, this fluctuates the most.
*/ */
def heapMemory: HeapMemory = HeapMemory(metric("heap-memory-used"), metric("heap-memory-committed"), metric("heap-memory-max")) def heapMemory: HeapMemory = HeapMemory(metric(HeapMemoryUsed), metric(HeapMemoryCommitted), metric(HeapMemoryMax))
def networkLatency: NetworkLatency = NetworkLatency(metric("network-max-rx"), metric("network-max-tx")) def networkLatency: NetworkLatency = NetworkLatency(metric(NetworkInboundRate), metric(NetworkOutboundRate))
def cpu: Cpu = Cpu(metric("system-load-average"), metric("processors"), metric("cpu-combined"), metric("total-cores")) def cpu: Cpu = Cpu(metric(SystemLoadAverage), metric(Processors), metric(CpuCombined), metric(TotalCores))
def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key m } getOrElse Metric(key, None) def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key m } getOrElse Metric(key)
} }
@ -456,44 +411,36 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
*/ */
private[cluster] object NodeMetrics { private[cluster] object NodeMetrics {
object NodeMetricsComparator extends MetricNumericConverter {
implicit val longMinOrdering: Ordering[Long] = Ordering.fromLessThan[Long] { (a, b) (a < b) }
implicit val longMinAddressOrdering: Ordering[(Address, Long)] = new Ordering[(Address, Long)] {
def compare(a: (Address, Long), b: (Address, Long)): Int = longMinOrdering.compare(a._2, b._2)
}
def maxAddressLong(seq: Seq[(Address, Long)]): (Address, Long) =
seq.reduceLeft((a: (Address, Long), b: (Address, Long)) if (a._2 > b._2) a else b)
implicit val doubleMinOrdering: Ordering[Double] = Ordering.fromLessThan[Double] { (a, b) (a < b) }
implicit val doubleMinAddressOrdering: Ordering[(Address, Double)] = new Ordering[(Address, Double)] {
def compare(a: (Address, Double), b: (Address, Double)): Int = doubleMinOrdering.compare(a._2, b._2)
}
def maxAddressDouble(seq: Seq[(Address, Double)]): (Address, Double) =
seq.reduceLeft((a: (Address, Double), b: (Address, Double)) if (a._2 > b._2) a else b)
}
sealed trait MetricValues sealed trait MetricValues
object MetricValues { object MetricValues {
final val HeapMemoryUsed = "heap-memory-used"
final val HeapMemoryCommitted = "heap-memory-committed"
final val HeapMemoryMax = "heap-memory-max"
final val NetworkInboundRate = "network-max-inbound"
final val NetworkOutboundRate = "network-max-outbound"
final val SystemLoadAverage = "system-load-average"
final val Processors = "processors"
final val CpuCombined = "cpu-combined"
final val TotalCores = "total-cores"
def unapply(v: HeapMemory): Tuple3[Long, Long, Option[Long]] = def unapply(v: HeapMemory): Tuple3[Long, Long, Option[Long]] =
(v.used.average.get.ewma.longValue(), (v.used.averageValue.get.longValue,
v.committed.average.get.ewma.longValue(), v.committed.averageValue.get.longValue,
Try(Some(v.max.average.get.ewma.longValue())) getOrElse None) v.max.averageValue map (_.longValue) orElse None)
def unapply(v: NetworkLatency): Option[(Long, Long)] = def unapply(v: NetworkLatency): Option[(Long, Long)] =
Try(Some(v.maxRxIO.average.get.ewma.longValue(), v.maxTxIO.average.get.ewma.longValue())) getOrElse None (v.inbound.averageValue, v.outbound.averageValue) match {
case (Some(a), Some(b)) Some((a.longValue, b.longValue))
case _ None
}
def unapply(v: Cpu): Tuple4[Double, Int, Option[Double], Option[Int]] = def unapply(v: Cpu): Tuple4[Option[Double], Int, Option[Double], Option[Int]] =
(v.systemLoadAverage.value.get.doubleValue(), (v.systemLoadAverage.averageValue map (_.doubleValue),
v.processors.value.get.intValue(), v.processors.averageValue.get.intValue,
Try(Some(v.combinedCpu.average.get.ewma.doubleValue())) getOrElse None, v.combinedCpu.averageValue map (_.doubleValue),
Try(Some(v.cores.value.get.intValue())) getOrElse None) v.cores.averageValue map (_.intValue))
} }
/** /**
@ -505,14 +452,17 @@ private[cluster] object NodeMetrics {
* @param max the maximum amount of memory (in bytes) that can be used for JVM memory management. * @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
* Can be undefined on some OS. * Can be undefined on some OS.
*/ */
case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues {
require(used.isDefined, "used must be defined")
require(committed.isDefined, "committed must be defined")
}
/** /**
* @param maxRxIO the max network IO rx value, in bytes * @param inbound the inbound network IO rate, in bytes
* *
* @param maxTxIO the max network IO tx value, in bytes * @param outbound the outbound network IO rate, in bytes
*/ */
case class NetworkLatency(maxRxIO: Metric, maxTxIO: Metric) extends MetricValues case class NetworkLatency(inbound: Metric, outbound: Metric) extends MetricValues
/** /**
* @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute * @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute
@ -524,7 +474,9 @@ private[cluster] object NodeMetrics {
* *
* @param cores the number of cores (multi-core: per processor) * @param cores the number of cores (multi-core: per processor)
*/ */
private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues {
require(processors.isDefined, "processors must be defined")
}
} }
@ -537,44 +489,152 @@ private[cluster] object NodeMetrics {
private[cluster] trait MetricNumericConverter { private[cluster] trait MetricNumericConverter {
/** /**
* A defined value is neither a -1 or NaN/Infinite: * A defined value is neither negative (e.g. -1) nor NaN/Infinite:
* <ul><li>JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned</li> * <ul><li>JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned</li>
* <li>SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)</li></ul> * <li>SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)</li></ul>
*/ */
def defined(value: ScalaNumericConversions): Boolean = def defined(value: Number): Boolean = convertNumber(value) match {
convert(value) fold (a value.underlying != -1, b !(b.isNaN || b.isInfinite)) case Left(a) a >= 0
case Right(b) !(b.isNaN || b.isInfinite)
}
/** /**
* May involve rounding or truncation. * May involve rounding or truncation.
*/ */
def convert(from: ScalaNumericConversions): Either[Long, Double] = from match { def convertNumber(from: Any): Either[Long, Double] = from match {
case n: BigInt Left(n.longValue()) case n: Int Left(n)
case n: BigDecimal Right(n.doubleValue()) case n: Long Left(n)
case n: Double Right(n)
case n: Float Right(n)
case n: RichInt Left(n.abs) case n: RichInt Left(n.abs)
case n: RichLong Left(n.self) case n: RichLong Left(n.self)
case n: RichDouble Right(n.self) case n: RichDouble Right(n.self)
case n: BigInt Left(n.longValue)
case n: BigDecimal Right(n.doubleValue)
case x throw new IllegalArgumentException("Not a number [%s]" format x)
} }
} }
/**
* INTERNAL API
*/
private[cluster] trait MetricsCollector extends Closeable {
/**
* Samples and collects new data points.
*/
def sample: NodeMetrics
}
/** /**
* INTERNAL API * INTERNAL API
* *
* Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this * Loads JVM metrics through JMX monitoring beans.
* loads wider and more accurate range of metrics in combination with SIGAR's native OS library.
*
* FIXME switch to Scala reflection
*
* @param sigar the optional org.hyperic.Sigar instance
* *
* @param address The [[akka.actor.Address]] of the node being sampled * @param address The [[akka.actor.Address]] of the node being sampled
* @param decay how quickly the exponential weighting of past data is decayed
*/ */
private[cluster] class MetricsCollector private (private val sigar: Option[AnyRef], address: Address) extends MetricNumericConverter { private[cluster] class JmxMetricsCollector(address: Address, decay: Int) extends MetricsCollector {
import NodeMetrics.MetricValues._
private def this(cluster: Cluster) =
this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay)
def this(system: ActorSystem) = this(Cluster(system))
private val decayOption = Some(decay)
private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean
/**
* Samples and collects new data points.
*/
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(
systemLoadAverage, heapUsed, heapCommitted, heapMax, processors))
/**
* (JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned,
* which means that the returned Metric is undefined.
*/
def systemLoadAverage: Metric = Metric(
name = SystemLoadAverage,
value = Some(osMBean.getSystemLoadAverage),
decay = None)
/**
* (JMX) Returns the number of available processors
*/
def processors: Metric = Metric(
name = Processors,
value = Some(osMBean.getAvailableProcessors),
decay = None)
// FIXME those three heap calls should be done at once
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
*/
def heapUsed: Metric = Metric(
name = HeapMemoryUsed,
value = Some(memoryMBean.getHeapMemoryUsage.getUsed),
decay = decayOption)
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes).
*/
def heapCommitted: Metric = Metric(
name = HeapMemoryCommitted,
value = Some(memoryMBean.getHeapMemoryUsage.getCommitted),
decay = decayOption)
/**
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If not defined the metrics value is None, i.e.
* never negative.
*/
def heapMax: Metric = Metric(
name = HeapMemoryMax,
value = Some(memoryMBean.getHeapMemoryUsage.getMax),
decay = None)
def close(): Unit = ()
}
/**
* INTERNAL API
*
* Loads metrics through Hyperic SIGAR and JMX monitoring beans. This
* loads a wider and more accurate range of metrics compared to JmxMetricsCollector
* by using SIGAR's native OS library.
*
* The constructor will by design throw exception if org.hyperic.sigar.Sigar can't be loaded, due
* to missing classes or native libraries.
*
* TODO switch to Scala reflection
*
* @param address The [[akka.actor.Address]] of the node being sampled
* @param decay how quickly the exponential weighting of past data is decayed
* @param sigar the org.hyperic.Sigar instance
*/
private[cluster] class SigarMetricsCollector(address: Address, decay: Int, sigar: AnyRef)
extends JmxMetricsCollector(address, decay) {
import NodeMetrics.MetricValues._
def this(address: Address, decay: Int, dynamicAccess: DynamicAccess) =
this(address, decay, dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty).get)
private def this(cluster: Cluster) =
this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay, cluster.system.dynamicAccess)
def this(system: ActorSystem) = this(Cluster(system))
private val decayOption = Some(decay)
private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage") private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList").map(m m) private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList").map(m m)
@ -585,48 +645,41 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption
private val Pid: Option[Method] = createMethodFrom(sigar, "getPid")
// Do something initially, in constructor, to make sure that the native library can be loaded.
// This will by design throw exception if sigar isn't usable
val pid: Long = createMethodFrom(sigar, "getPid") match {
case Some(method)
try method.invoke(sigar).asInstanceOf[Long] catch {
case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError]
// native libraries not in place
// don't throw fatal LinkageError, but something less harmful
throw new IllegalArgumentException(e.getCause.toString)
case e: InvocationTargetException throw e.getCause
}
case None throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method")
}
/** /**
* Samples and collects new data points. * Samples and collects new data points.
*
* @return [[akka.cluster.NodeMetrics]]
*/ */
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores, override def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores,
systemLoadAverage, used, committed, max, processors, networkMaxRx, networkMaxTx)) systemLoadAverage, heapUsed, heapCommitted, heapMax, processors, networkMaxRx, networkMaxTx))
/** /**
* (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute. * (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a Metric with * On some systems the JMX OS system load average may not be available, in which case a -1 is returned,
* undefined value is returned. * which means that the returned Metric is undefined.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default. * Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
*/ */
def systemLoadAverage: Metric = Metric("system-load-average", override def systemLoadAverage: Metric = {
Try(LoadAverage.get.invoke(sigar.get).asInstanceOf[Array[Double]].toSeq.head).getOrElse( val m = Metric(
osMBean.getSystemLoadAverage) match { name = SystemLoadAverage,
case x if x < 0 None // load average may be unavailable on some platform value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]].head.asInstanceOf[Number]).toOption,
case x Some(BigDecimal(x)) decay = None)
}) if (m.isDefined) m else super.systemLoadAverage
}
/**
* (JMX) Returns the number of available processors
*/
def processors: Metric = Metric("processors", Some(BigInt(osMBean.getAvailableProcessors)))
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
*/
def used: Metric = Metric("heap-memory-used", Some(BigInt(memoryMBean.getHeapMemoryUsage.getUsed)))
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes).
*/
def committed: Metric = Metric("heap-memory-committed", Some(BigInt(memoryMBean.getHeapMemoryUsage.getCommitted)))
/**
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If undefined, returns -1.
*/
def max: Metric = Metric("heap-memory-max", Some(BigInt(memoryMBean.getHeapMemoryUsage.getMax)))
/** /**
* (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe * (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
@ -636,49 +689,61 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite. * In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others. * Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
*/ */
def cpuCombined: Metric = Metric("cpu-combined", Try(BigDecimal(CombinedCpu.get.invoke(Cpu.get.invoke(sigar.get)).asInstanceOf[Double])).toOption) def cpuCombined: Metric = Metric(
name = CpuCombined,
value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]).toOption,
decay = decayOption)
/** /**
* FIXME: Array[Int].head - expose all if cores per processor might differ. * FIXME: Array[Int].head - expose all if cores per processor might differ.
* (SIGAR) Returns the total number of cores. * (SIGAR) Returns the total number of cores.
*
* FIXME do we need this information, isn't it enough with jmx processors?
*/ */
def totalCores: Metric = Metric("total-cores", Try(BigInt(CpuList.get.invoke(sigar.get).asInstanceOf[Array[AnyRef]].map(cpu def totalCores: Metric = Metric(
createMethodFrom(Some(cpu), "getTotalCores").get.invoke(cpu).asInstanceOf[Int]).head)).toOption) name = TotalCores,
value = Try(CpuList.get.invoke(sigar).asInstanceOf[Array[AnyRef]].map(cpu
createMethodFrom(cpu, "getTotalCores").get.invoke(cpu).asInstanceOf[Number]).head).toOption,
decay = None)
// FIXME those two network calls should be combined into one
/** /**
* (SIGAR) Returns the max network IO read/write value, in bytes, for network latency evaluation. * (SIGAR) Returns the max network IO read/write value, in bytes, for network latency evaluation.
*/ */
def networkMaxRx: Metric = networkMaxFor("getRxBytes", "network-max-rx") def networkMaxRx: Metric = networkFor("getRxBytes", NetworkInboundRate)
/** /**
* (SIGAR) Returns the max network IO tx value, in bytes. * (SIGAR) Returns the max network IO outbound value, in bytes.
*/ */
def networkMaxTx: Metric = networkMaxFor("getTxBytes", "network-max-tx") def networkMaxTx: Metric = networkFor("getTxBytes", NetworkOutboundRate)
/**
* Returns the network stats per interface.
*/
def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar.get).asInstanceOf[Array[String]].map(arg
arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar.get, arg))).toMap) getOrElse Map.empty[String, AnyRef]
/**
* Returns true if SIGAR is successfully installed on the classpath, otherwise false.
*/
def isSigar: Boolean = sigar.isDefined
/** /**
* Releases any native resources associated with this instance. * Releases any native resources associated with this instance.
*/ */
def close(): Unit = if (isSigar) Try(createMethodFrom(sigar, "close").get.invoke(sigar.get)) getOrElse Unit override def close(): Unit = Try(createMethodFrom(sigar, "close").get.invoke(sigar))
// FIXME network metrics needs thought and refactoring
/** /**
* Returns the max bytes for the given <code>method</code> in metric for <code>metric</code> from the network interface stats. * Returns the max bytes for the given <code>method</code> in metric for <code>metric</code> from the network interface stats.
*/ */
private def networkMaxFor(method: String, metric: String): Metric = Metric(metric, Try(Some(BigInt( private def networkFor(method: String, metric: String): Metric = Metric(
networkStats.collect { case (_, a) createMethodFrom(Some(a), method).get.invoke(a).asInstanceOf[Long] }.max))) getOrElse None) name = metric,
value = Try(networkStats.collect {
case (_, a)
createMethodFrom(a, method).get.invoke(a).asInstanceOf[Long]
}.max.asInstanceOf[Number]).toOption.orElse(None),
decay = decayOption)
private def createMethodFrom(ref: Option[AnyRef], method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] = /**
Try(ref.get.getClass.getMethod(method, types: _*)).toOption * Returns the network stats per interface.
*/
private def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar).asInstanceOf[Array[String]].map(arg
arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar, arg))).toMap) getOrElse Map.empty[String, AnyRef]
private def createMethodFrom(ref: AnyRef, method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] =
Try(ref.getClass.getMethod(method, types: _*)).toOption
} }
@ -687,16 +752,26 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
* Companion object of MetricsCollector class. * Companion object of MetricsCollector class.
*/ */
private[cluster] object MetricsCollector { private[cluster] object MetricsCollector {
def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector = def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = {
dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty) match { import settings.{ MetricsCollectorClass fqcn }
case Success(identity) new MetricsCollector(Some(identity), address) def log = Logging(system, "MetricsCollector")
case Failure(e) if (fqcn == classOf[SigarMetricsCollector].getName) {
log.debug(e.toString) Try(new SigarMetricsCollector(system)) match {
log.info("Hyperic SIGAR was not found on the classpath or not installed properly. " + case Success(sigarCollector) sigarCollector
"Metrics will be retreived from MBeans, and may be incorrect on some platforms. " + case Failure(e)
"To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate" + log.info("Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
"platform-specific native libary to 'java.library.path'.") "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
new MetricsCollector(None, address) "platform-specific native libary to 'java.library.path'. Reason: " +
e.toString)
new JmxMetricsCollector(system)
}
} else {
system.dynamicAccess.createInstanceFor[MetricsCollector](
fqcn, Seq(classOf[ActorSystem] -> system)).recover({
case e throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to:" + e.toString)
}).get
} }
}
} }

View file

@ -71,9 +71,16 @@ class ClusterSettings(val config: Config, val systemName: String) {
callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS), callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS),
resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS)) resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS))
final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled") final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled")
final val MetricsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.metrics-interval"), MILLISECONDS) final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class")
final val MetricsInterval: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.metrics.collect-interval"), MILLISECONDS)
require(d > Duration.Zero, "metrics.collect-interval must be > 0"); d
}
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS) final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
final val MetricsRateOfDecay: Int = getInt("akka.cluster.metrics.rate-of-decay") final val MetricsRateOfDecay: Int = {
val n = getInt("akka.cluster.metrics.rate-of-decay")
require(n >= 1, "metrics.rate-of-decay must be >= 1"); n
}
} }
case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)

View file

@ -0,0 +1,271 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.NodeMetrics
import akka.cluster.NodeMetrics.MetricValues
import akka.dispatch.Dispatchers
import akka.event.Logging
import akka.routing.Broadcast
import akka.routing.Destination
import akka.routing.Resizer
import akka.routing.Route
import akka.routing.RouteeProvider
import akka.routing.RouterConfig
/**
 * INTERNAL API
 */
private[cluster] object ClusterLoadBalancingRouter {
  /**
   * Default supervisor strategy of the router: a one-for-one strategy whose
   * decider answers `Escalate` for every failure, whatever its type.
   */
  val defaultSupervisorStrategy: SupervisorStrategy = {
    val escalateEverything: SupervisorStrategy.Decider = {
      case _ => SupervisorStrategy.Escalate
    }
    OneForOneStrategy()(escalateEverything)
  }
}
/**
 * A Router that performs load balancing to cluster nodes based on
 * cluster metric data.
 *
 * It uses random selection of routees based on probabilities derived from
 * the remaining capacity of the corresponding node.
 *
 * Please note that providing both 'nrOfInstances' and 'routees' does not make logical
 * sense as this means that the router should both create new actors and use the 'routees'
 * actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
 * <br>
 * <b>The</b> configuration parameter trumps the constructor arguments. This means that
 * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
 * be ignored if the router is defined in the configuration file for the actor being used.
 *
 * <h1>Supervision Setup</h1>
 *
 * The router creates a head actor which supervises and/or monitors the
 * routees. Instances are created as children of this actor, hence the
 * children are not supervised by the parent of the router. Common choices are
 * to always escalate (meaning that fault handling is always applied to all
 * children simultaneously; this is the default) or use the parent's strategy,
 * which will result in routed children being treated individually, but it is
 * possible as well to use Routers to give different supervisor strategies to
 * different groups of children.
 *
 * @param metricsSelector decides what probability to use for selecting a routee, based
 * on remaining capacity as indicated by the node metrics
 * @param nrOfInstances number of routees to create; only used when 'routees' is empty
 * @param routees string representation of the actor paths of the routees that will be looked up
 * using `actorFor` in [[akka.actor.ActorRefProvider]]
 * @param resizer optional resizer; when it is defined the router does not create or
 * register routees itself in `createRoute`
 * @param routerDispatcher id of the dispatcher; it is applied to the internal
 * metrics listener actor
 * @param supervisorStrategy supervisor strategy to be used for the head Router actor
 */
@SerialVersionUID(1L)
case class ClusterLoadBalancingRouter(
metricsSelector: MetricsSelector,
nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
extends RouterConfig with ClusterLoadBalancingRouterLike {
// NOTE: the auxiliary constructors below delegate with named arguments on purpose;
// that is what selects the primary constructor (with its defaults) instead of
// another two-argument overload.
/**
 * Java API: constructor that sets the number of routees to be created.
 * All other settings keep their default values.
 * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
 * @param nr number of routees to create
 */
def this(selector: MetricsSelector, nr: Int) = this(metricsSelector = selector, nrOfInstances = nr)
/**
 * Java API: constructor that sets the routees to be used.
 * All other settings keep their default values.
 * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
 * @param routeePaths string representation of the actor paths of the routees that will be looked up
 * using `actorFor` in [[akka.actor.ActorRefProvider]]
 */
def this(selector: MetricsSelector, routeePaths: java.lang.Iterable[String]) =
this(metricsSelector = selector, routees = routeePaths.asScala)
/**
 * Java API: constructor that sets the resizer to be used.
 * All other settings keep their default values.
 * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
 * @param resizer the resizer to use; it is wrapped in `Some`
 */
def this(selector: MetricsSelector, resizer: Resizer) =
this(metricsSelector = selector, resizer = Some(resizer))
/**
 * Java API for setting routerDispatcher.
 * Returns a copy of this router configuration; this instance is not modified.
 */
def withDispatcher(dispatcherId: String): ClusterLoadBalancingRouter =
copy(routerDispatcher = dispatcherId)
/**
 * Java API for setting the supervisor strategy to be used for the head
 * Router actor.
 * Returns a copy of this router configuration; this instance is not modified.
 */
def withSupervisorStrategy(strategy: SupervisorStrategy): ClusterLoadBalancingRouter =
copy(supervisorStrategy = strategy)
}
/**
 * INTERNAL API.
 *
 * Metrics-aware routing logic that load balances between cluster nodes based on
 * cluster metric data. A child actor subscribes to [[akka.cluster.ClusterMetricsChanged]]
 * events and lets the [[akka.cluster.routing.MetricsSelector]] build a mix of
 * weighted routees from the node metrics. Messages are routed randomly to the
 * weighted routees, i.e. nodes with lower load are more likely to be used than
 * nodes with higher load.
 */
trait ClusterLoadBalancingRouterLike { this: RouterConfig =>

  def metricsSelector: MetricsSelector

  def nrOfInstances: Int

  def routees: Iterable[String]

  def routerDispatcher: String

  override def createRoute(routeeProvider: RouteeProvider): Route = {
    // Without a resizer the router sets up its routees itself: explicit
    // routee paths win over creating `nrOfInstances` new routees.
    if (resizer.isEmpty) {
      if (routees.nonEmpty) routeeProvider.registerRouteesFor(routees)
      else routeeProvider.createRoutees(nrOfInstances)
    }

    val log = Logging(routeeProvider.context.system, routeeProvider.context.self)

    // Supplier of the routees to pick from. Initially it reads the plain
    // routees of the routeeProvider on every call (they can change); after the
    // first metrics event it hands out the weighted routees computed by the
    // metricsSelector. It is written by the listener actor and read by the
    // route, hence volatile.
    @volatile var weightedRoutees: () => IndexedSeq[ActorRef] = () => routeeProvider.routees

    // Child actor that subscribes to ClusterMetricsChanged and swaps in a new
    // weighted routee allocation for every metrics event.
    val metricsListener = routeeProvider.context.actorOf(Props(new Actor {

      val cluster = Cluster(routeeProvider.context.system)

      override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])

      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case ClusterMetricsChanged(metrics) => receiveMetrics(metrics)
        case _: CurrentClusterState         => // ignore
      }

      def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
        val weighted = metricsSelector.weightedRefs(routeeProvider.routees, cluster.selfAddress, metrics)
        weightedRoutees = () => weighted
      }

    }).withDispatcher(routerDispatcher), name = "metricsListener")

    // Uniform random pick from the (weighted) routees; dead letters when there are none.
    def getNext(): ActorRef = {
      val candidates = weightedRoutees()
      if (candidates.isEmpty) routeeProvider.context.system.deadLetters
      else candidates(ThreadLocalRandom.current.nextInt(candidates.size))
    }

    {
      // a Broadcast goes to every plain routee, anything else to one weighted pick
      case (sender, Broadcast(_)) => toAll(sender, routeeProvider.routees)
      case (sender, _)            => List(Destination(sender, getNext()))
    }
  }
}
/**
 * MetricsSelector based on the heap metrics of each node.
 * Less free heap => lower capacity => lower weight.
 */
@SerialVersionUID(1L)
case object HeapMetricsSelector extends MetricsSelector {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this

  /**
   * Free fraction of the heap per node address: `(limit - used) / limit`,
   * where `limit` is the heap max when it is defined and the committed
   * heap otherwise.
   */
  override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] =
    nodeMetrics.map { node =>
      val (used, committed, max) = MetricValues.unapply(node.heapMemory)
      // heap max is optional; fall back to what is committed
      val limit = max match {
        case Some(m) => m
        case None    => committed
      }
      val free = (limit - used).toDouble / limit
      (node.address, free)
    }.toMap
}
// FIXME implement more MetricsSelectors, such as CpuMetricsSelector,
// LoadAverageMetricsSelector, NetworkMetricsSelector.
// Also a CompositeMetricsSelector which uses a mix of other
// selectors.
/**
 * A MetricsSelector is responsible for producing weighted mix of routees
 * from the node metrics. The weights are typically proportional to the
 * remaining capacity.
 */
abstract class MetricsSelector {

  /**
   * Remaining capacity for each node. The value is between
   * 0.0 and 1.0, where 0.0 means no remaining capacity (full
   * utilization) and 1.0 means full remaining capacity (zero
   * utilization).
   */
  def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double]

  /**
   * Converts the capacity values to weights. The node with lowest
   * capacity gets weight 1 (lowest usable capacity is 1%) and other
   * nodes get weights proportional to their capacity compared to
   * the node with lowest capacity.
   *
   * @param capacity remaining capacity per node address
   * @return a strict (fully evaluated) map of weights; empty when `capacity` is empty
   */
  def weights(capacity: Map[Address, Double]): Map[Address, Int] = {
    if (capacity.isEmpty) Map.empty[Address, Int]
    else {
      val (_, min) = capacity.minBy { case (_, c) => c }
      // lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
      val divisor = math.max(0.01, min)
      // Build a strict map: `mapValues` would only return a lazy view that
      // redoes the rounding on every lookup and is not serializable.
      capacity map { case (address, c) => (address, math.round(c / divisor).toInt) }
    }
  }

  /**
   * Allocates a list of actor refs according to the weight of their node, i.e.
   * weight 3 of node A will allocate 3 slots for each ref with address A.
   * Refs whose node has no entry in `weights` get one slot; a weight of zero
   * or less allocates no slot.
   *
   * @param refs the routees to allocate slots for
   * @param selfAddress address used for refs whose path address has no host and port
   * @param weights number of slots per node address
   */
  def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, weights: Map[Address, Int]): IndexedSeq[ActorRef] = {
    // refs without host/port in their address are resolved to selfAddress
    def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
      case Address(_, _, None, None) => selfAddress
      case a                         => a
    }

    val w = weights.withDefaultValue(1)
    // One builder for the whole result instead of re-copying the accumulated
    // sequence for every ref.
    val slots = Vector.newBuilder[ActorRef]
    refs foreach { ref =>
      var remaining = w(fullAddress(ref))
      while (remaining > 0) {
        slots += ref
        remaining -= 1
      }
    }
    slots.result()
  }

  /**
   * Combines the different pieces to allocate a list of weighted actor refs
   * based on the node metrics.
   */
  def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, nodeMetrics: Set[NodeMetrics]): IndexedSeq[ActorRef] =
    weightedRefs(refs, selfAddress, weights(capacity(nodeMetrics)))
}

View file

@ -31,6 +31,8 @@ class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec { abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
import ClusterMetricsMultiJvmSpec._ import ClusterMetricsMultiJvmSpec._
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
"Cluster metrics" must { "Cluster metrics" must {
"periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " + "periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " +
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) { "and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
@ -38,9 +40,8 @@ abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSp
enterBarrier("cluster-started") enterBarrier("cluster-started")
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size) awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
awaitCond(clusterView.clusterMetrics.size == roles.size) awaitCond(clusterView.clusterMetrics.size == roles.size)
assertInitialized(cluster.settings.MetricsRateOfDecay, collectNodeMetrics(clusterView.clusterMetrics).toSet) val collector = MetricsCollector(cluster.system, cluster.settings)
val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess) collector.sample.metrics.size must be > (3)
clusterView.clusterMetrics.foreach(n assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n))
enterBarrier("after") enterBarrier("after")
} }
"reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) { "reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) {

View file

@ -64,7 +64,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
def muteLog(sys: ActorSystem = system): Unit = { def muteLog(sys: ActorSystem = system): Unit = {
if (!sys.log.isDebugEnabled) { if (!sys.log.isDebugEnabled) {
Seq(".*Metrics collection has started successfully.*", Seq(".*Metrics collection has started successfully.*",
".*Hyperic SIGAR was not found on the classpath.*", ".*Metrics will be retreived from MBeans.*",
".*Cluster Node.* - registered cluster JMX MBean.*", ".*Cluster Node.* - registered cluster JMX MBean.*",
".*Cluster Node.* - is starting up.*", ".*Cluster Node.* - is starting up.*",
".*Shutting down cluster Node.*", ".*Shutting down cluster Node.*",

View file

@ -1,61 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig}
import akka.testkit.{LongRunningTest, DefaultTimeout, ImplicitSender}
import akka.actor._
import akka.cluster.{ MemberStatus, MultiNodeClusterSpec }
import akka.cluster.routing.ClusterRoundRobinRoutedActorMultiJvmSpec.SomeActor
object ClusterAdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
// TODO - config
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode1 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode2 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode3 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode4 extends ClusterAdaptiveLoadBalancingRouterSpec
class ClusterAdaptiveLoadBalancingRouterMultiJvmNode5 extends ClusterAdaptiveLoadBalancingRouterSpec
abstract class ClusterAdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(ClusterAdaptiveLoadBalancingRouterMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import ClusterAdaptiveLoadBalancingRouterMultiJvmSpec._
// TODO configure properly and leverage the other pending load balancing routers
lazy val router1 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router1")
lazy val router2 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router2")
lazy val router3 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router3")
lazy val router4 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router4")
lazy val router5 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router5")
"A cluster with a ClusterAdaptiveLoadBalancingRouter" must {
"start cluster with 5 nodes" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
enterBarrier("cluster-started")
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
awaitCond(clusterView.clusterMetrics.size == roles.size)
enterBarrier("cluster-metrics-consumer-ready")
}
// TODO the rest of the necessary testing. All the work needed for consumption and extraction
// of the data needed is in ClusterMetricsCollector._
}
}

View file

@ -1,155 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import language.implicitConversions
import language.postfixOps
import akka.actor._
import akka.cluster._
import akka.routing._
import akka.dispatch.Dispatchers
import akka.event.Logging
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
import akka.routing.Destination
import akka.cluster.NodeMetrics
import akka.routing.Broadcast
import akka.actor.OneForOneStrategy
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import util.Try
import akka.cluster.NodeMetrics.{ NodeMetricsComparator, MetricValues }
import NodeMetricsComparator._
/**
* INTERNAL API.
*
* Trait that embodies the contract for all load balancing implementations.
*/
private[cluster] trait LoadBalancer {
/**
* Compares only those nodes that are deemed 'available' by the
* [[akka.routing.RouteeProvider]]
*/
def selectNodeByHealth(availableNodes: Set[NodeMetrics]): Option[Address]
}
/**
* INTERNAL API
*/
private[cluster] object ClusterLoadBalancingRouter {
val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ SupervisorStrategy.Escalate
}
}
/**
 * INTERNAL API.
 *
 * Consumer of [[akka.cluster.ClusterMetricsChanged]] events: a child actor keeps the
 * latest metrics of those nodes that currently host routees. The node chosen by
 * `selectNodeByHealth` is not yet used for routee selection, so messages are
 * distributed round-robin over the current routees.
 */
trait ClusterAdaptiveLoadBalancingRouterLike extends RoundRobinLike with LoadBalancer { this: RouterConfig =>

  def routerDispatcher: String

  override def createRoute(routeeProvider: RouteeProvider): Route = {
    if (resizer.isEmpty) {
      if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
      else routeeProvider.registerRouteesFor(routees)
    }

    val log = Logging(routeeProvider.context.system, routeeProvider.context.self)

    // round-robin counter and the most recent metrics of nodes hosting routees
    val next = new AtomicLong(0)
    val nodeMetrics = new AtomicReference[Set[NodeMetrics]](Set.empty)

    val metricsListener = routeeProvider.context.actorOf(Props(new Actor {
      def receive = {
        case ClusterMetricsChanged(metrics) => receiveMetrics(metrics)
        case _: CurrentClusterState         => // ignore
      }

      def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
        val availableNodes = routeeProvider.routees.map(_.path.address).toSet
        // Filter the metrics that just arrived. Filtering the previously
        // stored set (which starts out empty) would leave it empty forever.
        nodeMetrics.set(metrics.filter(node => availableNodes.contains(node.address)))
      }

      override def postStop(): Unit = Cluster(routeeProvider.context.system) unsubscribe self

    }).withDispatcher(routerDispatcher), name = "metricsListener")
    Cluster(routeeProvider.context.system).subscribe(metricsListener, classOf[ClusterMetricsChanged])

    // Picks the next routee out of the given non-empty snapshot.
    def getNext(currentRoutees: IndexedSeq[ActorRef]): ActorRef = {
      // TODO use as/where you will... selects by health category based on the implementation
      val address: Option[Address] = selectNodeByHealth(nodeMetrics.get)
      // TODO actual routee selection. defaults to round robin.
      // The index is taken modulo the size of the live routees, not of the
      // configured `routees` paths: that list is empty when the routees were
      // created from nrOfInstances, which made the modulo divide by zero.
      currentRoutees((next.getAndIncrement % currentRoutees.size).asInstanceOf[Int])
    }

    def routeTo(): ActorRef = {
      // one snapshot, so the emptiness check and the indexing see the same routees
      val currentRoutees = routeeProvider.routees
      if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
      else getNext(currentRoutees)
    }

    {
      case (sender, message) =>
        message match {
          case Broadcast(msg) => toAll(sender, routeeProvider.routees)
          case msg            => List(Destination(sender, routeTo()))
        }
    }
  }
}
/**
* Selects by all monitored metric types (memory, network latency, cpu...) and
* chooses the healthiest node to route to.
*/
@SerialVersionUID(1L)
private[cluster] case class ClusterAdaptiveMetricsLoadBalancingRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
// TODO
def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = {
val s = Set(selectByMemory(nodes), selectByNetworkLatency(nodes), selectByCpu(nodes))
s.head // TODO select the Address that appears with the highest or lowest frequency
}
}
@SerialVersionUID(1L)
private[cluster] case class MemoryLoadBalancingRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = selectByMemory(nodes)
}
@SerialVersionUID(1L)
private[cluster] case class CpuLoadBalancer(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
// TODO
def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = None
}
@SerialVersionUID(1L)
private[cluster] case class NetworkLatencyLoadBalancer(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
// TODO
def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = None
}

View file

@ -0,0 +1,162 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import language.postfixOps
import scala.concurrent.duration._
import java.lang.management.ManagementFactory
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
import akka.actor._
import akka.cluster.{ MemberStatus, MultiNodeClusterSpec }
import scala.concurrent.Await
import akka.routing.RouterRoutees
import akka.pattern.ask
import akka.routing.CurrentRoutees
import akka.cluster.Cluster
import scala.concurrent.forkjoin.ThreadLocalRandom
import com.typesafe.config.ConfigFactory
// Shared multi-node configuration for the ClusterLoadBalancingRouter spec:
// test actors, their messages, the three node roles and the common config.
object ClusterLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
// Routee under test: answers every message with a Reply that carries the
// cluster self address of the node it runs on.
class Routee extends Actor {
// NOTE(review): never assigned or read in the visible code - confirm whether it is needed
var usedMemory: Array[Byte] = _
def receive = {
case _ sender ! Reply(Cluster(context.system).selfAddress)
}
}
// Helper actor that occupies heap on its node when it receives AllocateMemory.
class Memory extends Actor with ActorLogging {
// holds the allocated array so that it stays referenced by this actor
var usedMemory: Array[Byte] = _
def receive = {
case AllocateMemory
val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
// getMax can be undefined (-1)
val max = math.max(heap.getMax, heap.getCommitted)
val used = heap.getUsed
// 80% of the heap that is still free according to the figures above
// NOTE(review): the Double-to-Int conversion saturates at Int.MaxValue for
// very large heaps - confirm the test heaps are small enough
val allocate = (0.8 * (max - used)).toInt
usedMemory = Array.fill(allocate)(ThreadLocalRandom.current.nextInt(127).toByte)
}
}
// message that tells a Memory actor to allocate
case object AllocateMemory
// answer of a Routee, carrying the address of the replying node
case class Reply(address: Address)
// the three node roles of this spec
val first = role("first")
val second = role("second")
val third = role("third")
// metrics are collected and gossiped every second; the shared cluster test
// config is the last fallback
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster.metrics.collect-interval = 1s
akka.cluster.metrics.gossip-interval = 1s
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
}
// Three concrete entry points that all run the same ClusterLoadBalancingRouterSpec body.
// NOTE(review): presumably one class per participating JVM (roles first/second/third),
// picked up by name via the MultiJvmNodeN suffix — confirm against the multi-jvm build setup.
class ClusterLoadBalancingRouterMultiJvmNode1 extends ClusterLoadBalancingRouterSpec
class ClusterLoadBalancingRouterMultiJvmNode2 extends ClusterLoadBalancingRouterSpec
class ClusterLoadBalancingRouterMultiJvmNode3 extends ClusterLoadBalancingRouterSpec
/**
 * Starts a ClusterLoadBalancingRouter on the first node and checks how the replies
 * are distributed over the nodes, first with all nodes idle and then with most of the
 * free heap occupied on the second node.
 */
abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadBalancingRouterMultiJvmSpec)
  with MultiNodeClusterSpec
  with ImplicitSender with DefaultTimeout {
  import ClusterLoadBalancingRouterMultiJvmSpec._

  /** Asks the router for its current routees, waiting at most the remaining test time. */
  def currentRoutees(router: ActorRef) =
    Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees

  /**
   * Collects up to `expectedReplies` [[Reply]] messages (within 5 seconds) and counts
   * them per replying address. Every role's address is present in the result, with
   * count 0 if that node did not reply.
   */
  def receiveReplies(expectedReplies: Int): Map[Address, Int] = {
    val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
    (receiveWhile(5 seconds, messages = expectedReplies) {
      case Reply(address) => address
    }).foldLeft(zero) {
      case (replyMap, address) => replyMap + (address -> (replyMap(address) + 1))
    }
  }

  /**
   * Fills in self address for local ActorRef
   */
  def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
    case Address(_, _, None, None) => cluster.selfAddress
    case a                         => a
  }

  /**
   * Creates a heap-metrics based cluster router named `name` and returns it once it
   * has exactly one routee on every node.
   */
  def startRouter(name: String): ActorRef = {
    val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
      local = ClusterLoadBalancingRouter(HeapMetricsSelector),
      settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), name)
    awaitCond {
      // it may take some time until router receives cluster member events
      currentRoutees(router).size == roles.size
    }
    currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet)
    router
  }

  "A cluster with a ClusterLoadBalancingRouter" must {
    "start cluster nodes" taggedAs LongRunningTest in {
      awaitClusterUp(roles: _*)
      enterBarrier("after-1")
    }

    "use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in {
      runOn(first) {
        val router1 = startRouter("router1")

        // collect some metrics before we start
        Thread.sleep(10000)

        val iterationCount = 100
        for (_ <- 0 until iterationCount) {
          router1 ! "hit"
          Thread.sleep(10)
        }

        val replies = receiveReplies(iterationCount)

        replies(first) must be > (0)
        replies(second) must be > (0)
        replies(third) must be > (0)
        replies.values.sum must be(iterationCount)
      }

      enterBarrier("after-2")
    }

    "prefer node with more free heap capacity" taggedAs LongRunningTest in {
      System.gc()
      enterBarrier("gc")

      // occupy most of the free heap on the second node only
      runOn(second) {
        system.actorOf(Props[Memory], "memory") ! AllocateMemory
      }
      enterBarrier("heap-allocated")

      runOn(first) {
        val router2 = startRouter("router2")

        // collect some metrics before we start
        Thread.sleep(10000)

        val iterationCount = 100
        for (_ <- 0 until iterationCount) {
          router2 ! "hit"
          Thread.sleep(10)
        }

        val replies = receiveReplies(iterationCount)

        replies(third) must be > (replies(second))
        replies.values.sum must be(iterationCount)
      }

      enterBarrier("after-3")
    }
  }
}

View file

@ -47,6 +47,7 @@ class ClusterConfigSpec extends AkkaSpec {
callTimeout = 2 seconds, callTimeout = 2 seconds,
resetTimeout = 30 seconds)) resetTimeout = 30 seconds))
MetricsEnabled must be(true) MetricsEnabled must be(true)
MetricsCollectorClass must be("akka.cluster.SigarMetricsCollector")
MetricsInterval must be(3 seconds) MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds) MetricsGossipInterval must be(3 seconds)
MetricsRateOfDecay must be(10) MetricsRateOfDecay must be(10)

View file

@ -7,48 +7,74 @@ package akka.cluster
import language.postfixOps import language.postfixOps
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec } import akka.testkit.{ LongRunningTest, AkkaSpec }
import scala.concurrent.forkjoin.ThreadLocalRandom
import System.currentTimeMillis
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec { class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec with MetricsCollectorFactory {
import system.dispatcher import system.dispatcher
val collector = createMetricsCollector val collector = createMetricsCollector
"DataStream" must { "DataStream" must {
// Feeding samples equal to the starting average must leave the average unchanged.
"calculate same ewma for constant values" in {
  val ds = DataStream(decay = 5, ewma = 100.0, currentTimeMillis, currentTimeMillis) :+
    100.0 :+ 100.0 :+ 100.0
  ds.ewma must be(100.0 plusOrMinus 0.001)
}
// Starting at 1000 and repeatedly feeding 10 with decay 10: the average moves towards
// the sample step by step and has practically reached it after 100 samples.
"calculate correct ewma for normal decay" in {
  val d0 = DataStream(decay = 10, ewma = 1000.0, currentTimeMillis, currentTimeMillis)
  d0.ewma must be(1000.0 plusOrMinus 0.01)
  val d1 = d0 :+ 10.0
  d1.ewma must be(820.0 plusOrMinus 0.01)
  val d2 = d1 :+ 10.0
  d2.ewma must be(672.73 plusOrMinus 0.01)
  val d3 = d2 :+ 10.0
  d3.ewma must be(552.23 plusOrMinus 0.01)
  val d4 = d3 :+ 10.0
  d4.ewma must be(453.64 plusOrMinus 0.01)

  val dn = (1 to 100).foldLeft(d0)((d, _) => d :+ 10.0)
  dn.ewma must be(10.0 plusOrMinus 0.1)
}
// With decay 1 the expectations are that the average equals the most recent sample.
"calculate ewma for decay 1" in {
  val initial = DataStream(decay = 1, ewma = 100.0, currentTimeMillis, currentTimeMillis)
  initial.ewma must be(100.0 plusOrMinus 0.01)

  // append the samples one after the other, checking the average after each step
  Seq(1.0, 57.0, 10.0).foldLeft(initial) { (stream, sample) =>
    val next = stream :+ sample
    next.ewma must be(sample plusOrMinus 0.01)
    next
  }
}
"calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in { "calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
val firstDataSet = collector.sample.metrics.collect { case m if m.trendable && m.isDefined m.initialize(DefaultRateOfDecay) } var streamingDataSet = Map.empty[String, Metric]
var streamingDataSet = firstDataSet var usedMemory = Array.empty[Byte]
(1 to 50) foreach { _
val cancellable = system.scheduler.schedule(0 seconds, 100 millis) { Thread.sleep(100)
streamingDataSet = collector.sample.metrics.flatMap(latest streamingDataSet.collect { usedMemory = usedMemory ++ Array.fill(1024)(ThreadLocalRandom.current.nextInt(127).toByte)
case streaming if (latest.trendable && latest.isDefined) && (latest same streaming) val changes = collector.sample.metrics.flatMap { latest
&& (latest.value.get != streaming.value.get) { streamingDataSet.get(latest.name) match {
val updatedDataStream = streaming.average.get :+ latest.value.get case None Some(latest)
updatedDataStream.timestamp must be > (streaming.average.get.timestamp) case Some(previous)
updatedDataStream.duration.length must be > (streaming.average.get.duration.length) if (latest.average.isDefined && latest.isDefined && latest.value.get != previous.value.get) {
updatedDataStream.ewma must not be (streaming.average.get.ewma) val updated = previous :+ latest
updatedDataStream.ewma must not be (latest.value.get) updated.average.isDefined must be(true)
streaming.copy(value = latest.value, average = Some(updatedDataStream)) val updatedAverage = updated.average.get
updatedAverage.timestamp must be > (previous.average.get.timestamp)
updatedAverage.duration.length must be > (previous.average.get.duration.length)
updatedAverage.ewma must not be (previous.average.get.ewma)
(previous.value.get.longValue compare latest.value.get.longValue) must be(
previous.average.get.ewma.longValue compare updatedAverage.ewma.longValue)
Some(updated)
} else None
} }
}) }
} streamingDataSet ++= changes.map(m m.name -> m).toMap
awaitCond(firstDataSet.size == streamingDataSet.size, longDuration)
cancellable.cancel()
val finalDataSet = streamingDataSet.map(m m.name -> m).toMap
firstDataSet map {
first
val newMetric = finalDataSet(first.name)
val e1 = first.average.get
val e2 = newMetric.average.get
if (first.value.get != newMetric.value.get) {
e2.ewma must not be (first.value.get)
e2.ewma must not be (newMetric.value.get)
}
if (first.value.get.longValue > newMetric.value.get.longValue) e1.ewma.longValue must be > e2.ewma.longValue
else if (first.value.get.longValue < newMetric.value.get.longValue) e1.ewma.longValue must be < e2.ewma.longValue
} }
} }
} }

View file

@ -5,39 +5,43 @@
package akka.cluster package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec } import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.cluster.NodeMetrics.MetricValues._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with AbstractClusterMetricsSpec { class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with MetricSpec
with MetricsCollectorFactory {
"MetricNumericConverter" must { "MetricNumericConverter" must {
val collector = createMetricsCollector val collector = createMetricsCollector
"convert " in { "convert " in {
convert(0).isLeft must be(true) convertNumber(0).isLeft must be(true)
convert(1).left.get must be(1) convertNumber(1).left.get must be(1)
convert(1L).isLeft must be(true) convertNumber(1L).isLeft must be(true)
convert(0.0).isRight must be(true) convertNumber(0.0).isRight must be(true)
} }
"define a new metric" in { "define a new metric" in {
val metric = Metric("heap-memory-used", Some(0L)) val metric = Metric(HeapMemoryUsed, Some(256L), decay = Some(10))
metric.initializable must be(true) metric.name must be(HeapMemoryUsed)
metric.name must not be (null) metric.isDefined must be(true)
metric.average.isEmpty must be(true) metric.value must be(Some(256L))
metric.trendable must be(true) metric.average.isDefined must be(true)
metric.average.get.ewma must be(256L)
if (collector.isSigar) { collector match {
val cores = collector.totalCores case c: SigarMetricsCollector
cores.isDefined must be(true) val cores = c.totalCores
cores.value.get.intValue must be > (0) cores.isDefined must be(true)
cores.initializable must be(false) cores.value.get.intValue must be > (0)
case _
} }
} }
"define an undefined value with a None " in { "define an undefined value with a None " in {
Metric("x", Some(-1)).value.isDefined must be(false) Metric("x", Some(-1), None).value.isDefined must be(false)
Metric("x", Some(java.lang.Double.NaN)).value.isDefined must be(false) Metric("x", Some(java.lang.Double.NaN), None).value.isDefined must be(false)
Metric("x", None).isDefined must be(false) Metric("x", None, None).isDefined must be(false)
} }
"recognize whether a metric value is defined" in { "recognize whether a metric value is defined" in {

View file

@ -4,17 +4,42 @@
package akka.cluster package akka.cluster
import scala.util.Try
import akka.actor.Address import akka.actor.Address
import akka.cluster.NodeMetrics.MetricValues._
import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec { class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec
with MetricsCollectorFactory {
import NodeMetrics._ import NodeMetrics._
// Fixture: two node metrics built from fresh samples of the same collector, differing
// only in the port of their address.
val collector = createMetricsCollector

val node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics)
val node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)

var nodes: Seq[NodeMetrics] = Seq(node1, node2)

// work up the data streams where applicable
for (_ <- 1 to 100) {
  nodes = nodes map { n =>
    n.copy(metrics = collector.sample.metrics.flatMap(latest => n.metrics.collect {
      case streaming if latest same streaming =>
        streaming.average match {
          // metric already has an average: feed the new value into it, or drop the
          // average when the latest sample carries no value
          case Some(e) => streaming.copy(value = latest.value, average =
            if (latest.isDefined) Some(e :+ latest.value.get.doubleValue) else None)
          // no average tracked for this metric: only the value is refreshed
          case None => streaming.copy(value = latest.value)
        }
    }))
  }
}
"NodeMetrics.MetricValues" must { "NodeMetrics.MetricValues" must {
"extract expected metrics for load balancing" in { "extract expected metrics for load balancing" in {
val stream1 = node2.metric("heap-memory-committed").value.get.longValue() val stream1 = node2.metric(HeapMemoryCommitted).value.get.longValue
val stream2 = node1.metric("heap-memory-used").value.get.longValue() val stream2 = node1.metric(HeapMemoryUsed).value.get.longValue
stream1 must be >= (stream2) stream1 must be >= (stream2)
} }
@ -39,8 +64,9 @@ class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec {
} }
val (systemLoadAverage, processors, combinedCpu, cores) = MetricValues.unapply(node.cpu) val (systemLoadAverage, processors, combinedCpu, cores) = MetricValues.unapply(node.cpu)
systemLoadAverage must be >= (0.0)
processors must be > (0) processors must be > (0)
if (systemLoadAverage.isDefined)
systemLoadAverage.get must be >= (0.0)
if (combinedCpu.isDefined) { if (combinedCpu.isDefined) {
combinedCpu.get must be <= (1.0) combinedCpu.get must be <= (1.0)
combinedCpu.get must be >= (0.0) combinedCpu.get must be >= (0.0)
@ -53,29 +79,4 @@ class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec {
} }
} }
"NodeMetricsComparator" must {
val seq = Seq((Address("akka", "sys", "a", 2554), 9L), (Address("akka", "sys", "a", 2555), 8L))
val seq2 = Seq((Address("akka", "sys", "a", 2554), 9.0), (Address("akka", "sys", "a", 2555), 8.0))
"handle min ordering of a (Address, Long)" in {
import NodeMetricsComparator.longMinAddressOrdering
seq.min._1.port.get must be(2555)
seq.min._2 must be(8L)
}
"handle max ordering of a (Address, Long)" in {
val (address, value) = NodeMetricsComparator.maxAddressLong(seq)
value must be(9L)
address.port.get must be(2554)
}
"handle min ordering of a (Address, Double)" in {
import NodeMetricsComparator.doubleMinAddressOrdering
seq2.min._1.port.get must be(2555)
seq2.min._2 must be(8.0)
}
"handle max ordering of a (Address, Double)" in {
val (address, value) = NodeMetricsComparator.maxAddressDouble(seq2)
value must be(9.0)
address.port.get must be(2554)
}
}
} }

View file

@ -1,76 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit.AkkaSpec
import akka.actor.Address
import akka.cluster.NodeMetrics.MetricValues
import util.control.NonFatal
import util.Try
/**
 * Checks `selectByMemory` against an independent computation done on the same node
 * metrics. The remaining selection strategies only have placeholder tests.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsAwareClusterNodeSelectorSpec extends ClusterMetricsRouteeSelectorSpec with MetricsAwareClusterNodeSelector {
  import NodeMetrics.NodeMetricsComparator.longMinAddressOrdering

  "MetricsAwareClusterNodeSelector" must {

    "select the address of the node with the lowest memory" in {
      for (_ <- 0 to 10) { // run enough times to ensure we test differing metric values
        val heapByAddress = nodes.map(n => n.address -> MetricValues.unapply(n.heapMemory)).toMap

        // (committed - used), plus the distances to max when a max is defined
        def heapDiff(address: Address) = {
          val (used, committed, max) = heapByAddress(address)
          max match {
            case Some(m) => ((committed - used) + (m - used) + (m - committed))
            case None    => committed - used
          }
        }

        val diff1 = heapDiff(node1.address)
        val diff2 = heapDiff(node2.address)

        // the node with the smaller diff is the expected selection; node1 wins a tie
        val testMin = Set(diff1, diff2).min
        val expectedAddress = if (testMin == diff1) node1.address else node2.address
        val address = selectByMemory(nodes.toSet).get
        address must be(expectedAddress)
      }
    }

    "select the address of the node with the lowest network latency" in {
      // TODO
    }

    "select the address of the node with the best CPU health" in {
      // TODO
    }

    "select the address of the node with the best overall health based on all metric categories monitored" in {
      // TODO
    }
  }
}
/**
 * Base fixture providing two node metrics (`node1`, `node2`) and `nodes`, a sequence
 * whose metrics have been updated with repeated collector samples.
 */
abstract class ClusterMetricsRouteeSelectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec {

  val collector = createMetricsCollector

  // each node starts from its own sample, with every metric initialized using the default decay
  var node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics)
  node1 = node1.copy(metrics = node1.metrics.map(_.initialize(DefaultRateOfDecay)))

  var node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)
  node2 = node2.copy(metrics = node2.metrics.map(_.initialize(DefaultRateOfDecay)))

  var nodes: Seq[NodeMetrics] = Seq(node1, node2)

  // work up the data streams where applicable
  for (_ <- 0 to samples) {
    nodes = nodes map { n =>
      n.copy(metrics = collector.sample.metrics.flatMap(latest => n.metrics.collect {
        case streaming if latest same streaming =>
          streaming.average match {
            // any failure while appending (e.g. latest has no value) drops the average
            case Some(e) => streaming.copy(value = latest.value, average = Try(e :+ latest.value.get).toOption)
            case None    => streaming.copy(value = latest.value)
          }
      }))
    }
  }
}

View file

@ -7,17 +7,18 @@ package akka.cluster
import scala.language.postfixOps import scala.language.postfixOps
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await
import scala.util.{ Success, Try, Failure }
import akka.actor._ import akka.actor._
import akka.testkit._ import akka.testkit._
import akka.cluster.NodeMetrics.MetricValues._
import org.scalatest.WordSpec import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import util.{ Success, Try, Failure }
object MetricsEnabledSpec { object MetricsEnabledSpec {
val config = """ val config = """
akka.cluster.metrics.enabled = on akka.cluster.metrics.enabled = on
akka.cluster.metrics.metrics-interval = 1 s akka.cluster.metrics.collect-interval = 1 s
akka.cluster.metrics.gossip-interval = 1 s akka.cluster.metrics.gossip-interval = 1 s
akka.cluster.metrics.rate-of-decay = 10 akka.cluster.metrics.rate-of-decay = 10
akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.actor.provider = "akka.remote.RemoteActorRefProvider"
@ -25,22 +26,16 @@ object MetricsEnabledSpec {
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec { class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec
with MetricsCollectorFactory {
import system.dispatcher import system.dispatcher
val collector = createMetricsCollector val collector = createMetricsCollector
"Metric must" must { "Metric must" must {
"create and initialize a new metric or merge an existing one" in {
for (i 0 to samples) {
val metrics = collector.sample.metrics
assertCreatedUninitialized(metrics)
assertInitialized(DefaultRateOfDecay, metrics map (_.initialize(DefaultRateOfDecay)))
}
}
"merge 2 metrics that are tracking the same metric" in { "merge 2 metrics that are tracking the same metric" in {
for (i 0 to samples) { for (i 1 to 20) {
val sample1 = collector.sample.metrics val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics val sample2 = collector.sample.metrics
var merged = sample2 flatMap (latest sample1 collect { var merged = sample2 flatMap (latest sample1 collect {
@ -51,8 +46,8 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
} }
}) })
val sample3 = collector.sample.metrics map (_.initialize(DefaultRateOfDecay)) val sample3 = collector.sample.metrics
val sample4 = collector.sample.metrics map (_.initialize(DefaultRateOfDecay)) val sample4 = collector.sample.metrics
merged = sample4 flatMap (latest sample3 collect { merged = sample4 flatMap (latest sample3 collect {
case peer if latest same peer { case peer if latest same peer {
val m = peer :+ latest val m = peer :+ latest
@ -74,62 +69,61 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
"collect accurate metrics for a node" in { "collect accurate metrics for a node" in {
val sample = collector.sample val sample = collector.sample
assertExpectedSampleSize(collector.isSigar, DefaultRateOfDecay, sample)
val metrics = sample.metrics.collect { case m if m.isDefined (m.name, m.value.get) } val metrics = sample.metrics.collect { case m if m.isDefined (m.name, m.value.get) }
val used = metrics collectFirst { case ("heap-memory-used", b) b } val used = metrics collectFirst { case (HeapMemoryUsed, b) b }
val committed = metrics collectFirst { case ("heap-memory-committed", b) b } val committed = metrics collectFirst { case (HeapMemoryCommitted, b) b }
metrics foreach { metrics foreach {
case ("total-cores", b) b.intValue must be > (0) case (TotalCores, b) b.intValue must be > (0)
case ("network-max-rx", b) b.longValue must be > (0L) case (NetworkInboundRate, b) b.longValue must be > (0L)
case ("network-max-tx", b) b.longValue must be > (0L) case (NetworkOutboundRate, b) b.longValue must be > (0L)
case ("system-load-average", b) b.doubleValue must be >= (0.0) case (SystemLoadAverage, b) b.doubleValue must be >= (0.0)
case ("processors", b) b.intValue must be >= (0) case (Processors, b) b.intValue must be >= (0)
case ("heap-memory-used", b) b.longValue must be >= (0L) case (HeapMemoryUsed, b) b.longValue must be >= (0L)
case ("heap-memory-committed", b) b.longValue must be > (0L) case (HeapMemoryCommitted, b) b.longValue must be > (0L)
case ("cpu-combined", b) case (HeapMemoryMax, b)
b.doubleValue must be <= (1.0) b.longValue must be > (0L)
b.doubleValue must be >= (0.0)
case ("heap-memory-max", b)
used.get.longValue must be <= (b.longValue) used.get.longValue must be <= (b.longValue)
committed.get.longValue must be <= (b.longValue) committed.get.longValue must be <= (b.longValue)
case (CpuCombined, b)
b.doubleValue must be <= (1.0)
b.doubleValue must be >= (0.0)
} }
} }
"collect SIGAR metrics if it is on the classpath" in { "collect SIGAR metrics if it is on the classpath" in {
if (collector.isSigar) { collector match {
// combined cpu may or may not be defined on a given sampling case c: SigarMetricsCollector
// systemLoadAverage is SIGAR present // combined cpu may or may not be defined on a given sampling
collector.systemLoadAverage.isDefined must be(true) // systemLoadAverage is not present on all platforms
collector.networkStats.nonEmpty must be(true) c.networkMaxRx.isDefined must be(true)
collector.networkMaxRx.isDefined must be(true) c.networkMaxTx.isDefined must be(true)
collector.networkMaxTx.isDefined must be(true) c.totalCores.isDefined must be(true)
collector.totalCores.isDefined must be(true) case _
} }
} }
"collect JMX metrics" in { "collect JMX metrics" in {
// heap max may be undefined depending on the OS // heap max may be undefined depending on the OS
// systemLoadAverage is JMX if SIGAR not present, but not available on all OS // systemLoadAverage is JMX when SIGAR not present, but
collector.used.isDefined must be(true) // it's not present on all platforms
collector.committed.isDefined must be(true) val c = collector.asInstanceOf[JmxMetricsCollector]
collector.processors.isDefined must be(true) c.heapUsed.isDefined must be(true)
c.heapCommitted.isDefined must be(true)
c.processors.isDefined must be(true)
} }
"collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in { "collect 50 node metrics samples in an acceptable duration" taggedAs LongRunningTest in within(7 seconds) {
val latch = TestLatch(samples) (1 to 50) foreach { _
val task = system.scheduler.schedule(0 seconds, interval) {
val sample = collector.sample val sample = collector.sample
assertCreatedUninitialized(sample.metrics) sample.metrics.size must be >= (3)
assertExpectedSampleSize(collector.isSigar, DefaultRateOfDecay, sample) Thread.sleep(100)
latch.countDown()
} }
Await.ready(latch, longDuration)
task.cancel()
} }
} }
} }
trait MetricSpec extends WordSpec with MustMatchers { trait MetricSpec extends WordSpec with MustMatchers { this: { def system: ActorSystem }
def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: MetricsGossip): Unit = { def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: MetricsGossip): Unit = {
val masterMetrics = collectNodeMetrics(master) val masterMetrics = collectNodeMetrics(master)
@ -140,52 +134,22 @@ trait MetricSpec extends WordSpec with MustMatchers {
def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit = def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit =
gossip.nodes.map(_.address) must be(nodes.map(_.address)) gossip.nodes.map(_.address) must be(nodes.map(_.address))
def assertExpectedSampleSize(isSigar: Boolean, gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n assertExpectedSampleSize(isSigar, gossip.rateOfDecay, n))
def assertCreatedUninitialized(gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n assertCreatedUninitialized(n.metrics.filterNot(_.trendable)))
def assertInitialized(gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n assertInitialized(gossip.rateOfDecay, n.metrics))
def assertCreatedUninitialized(metrics: Set[Metric]): Unit = {
metrics.size must be > (0)
metrics foreach { m
m.average.isEmpty must be(true)
if (m.value.isDefined) m.isDefined must be(true)
if (m.initializable) (m.trendable && m.isDefined && m.average.isEmpty) must be(true)
}
}
def assertInitialized(decay: Int, metrics: Set[Metric]): Unit = if (decay > 0) metrics.filter(_.trendable) foreach { m
m.initializable must be(false)
if (m.isDefined) m.average.isDefined must be(true)
}
def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) { def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) {
if (latest.isDefined) { if (latest.isDefined) {
if (peer.isDefined) { if (peer.isDefined) {
merged.isDefined must be(true) merged.isDefined must be(true)
merged.value.get must be(latest.value.get) merged.value.get must be(latest.value.get)
if (latest.trendable) { merged.average.isDefined must be(latest.average.isDefined)
if (latest.initializable) merged.average.isEmpty must be(true)
else merged.average.isDefined must be(true)
}
} else { } else {
merged.isDefined must be(true) merged.isDefined must be(true)
merged.value.get must be(latest.value.get) merged.value.get must be(latest.value.get)
if (latest.average.isDefined) merged.average.get must be(latest.average.get) merged.average.isDefined must be(latest.average.isDefined || peer.average.isDefined)
else merged.average.isEmpty must be(true)
} }
} else { } else {
if (peer.isDefined) { if (peer.isDefined) {
merged.isDefined must be(true) merged.isDefined must be(true)
merged.value.get must be(peer.value.get) merged.value.get must be(peer.value.get)
if (peer.trendable) { merged.average.isDefined must be(peer.average.isDefined)
if (peer.initializable) merged.average.isEmpty must be(true)
else merged.average.isDefined must be(true)
}
} else { } else {
merged.isDefined must be(false) merged.isDefined must be(false)
merged.average.isEmpty must be(true) merged.average.isEmpty must be(true)
@ -193,20 +157,6 @@ trait MetricSpec extends WordSpec with MustMatchers {
} }
} }
def assertExpectedSampleSize(isSigar: Boolean, decay: Int, node: NodeMetrics): Unit = {
node.metrics.size must be(9)
val metrics = node.metrics.filter(_.isDefined)
if (isSigar) { // combined cpu + jmx max heap
metrics.size must be >= (7)
metrics.size must be <= (9)
} else { // jmx max heap
metrics.size must be >= (4)
metrics.size must be <= (5)
}
if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) m }.foreach(_.average.isDefined must be(true))
}
def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = { def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = {
var r: Seq[Metric] = Seq.empty var r: Seq[Metric] = Seq.empty
nodes.foreach(n r ++= n.metrics.filter(_.isDefined)) nodes.foreach(n r ++= n.metrics.filter(_.isDefined))
@ -214,19 +164,25 @@ trait MetricSpec extends WordSpec with MustMatchers {
} }
} }
trait AbstractClusterMetricsSpec extends DefaultTimeout { /**
this: AkkaSpec * Used when testing metrics without full cluster
*/
trait MetricsCollectorFactory { this: AkkaSpec
val selfAddress = new Address("akka", "localhost") private def extendedActorSystem = system.asInstanceOf[ExtendedActorSystem]
val DefaultRateOfDecay = 10 def selfAddress = extendedActorSystem.provider.rootPath.address
val interval: FiniteDuration = 100 millis val defaultRateOfDecay = 10
val longDuration = 120 seconds // for long running tests def createMetricsCollector: MetricsCollector =
Try(new SigarMetricsCollector(selfAddress, defaultRateOfDecay, extendedActorSystem.dynamicAccess)) match {
case Success(sigarCollector) sigarCollector
case Failure(e)
log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " +
e.getMessage)
new JmxMetricsCollector(selfAddress, defaultRateOfDecay)
}
val samples = 100 def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
}
def createMetricsCollector: MetricsCollector = MetricsCollector(selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
}

View file

@ -12,7 +12,8 @@ import akka.actor.Address
import java.lang.System.{ currentTimeMillis newTimestamp } import java.lang.System.{ currentTimeMillis newTimestamp }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec { class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec
with MetricsCollectorFactory {
val collector = createMetricsCollector val collector = createMetricsCollector
@ -21,27 +22,25 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(DefaultRateOfDecay) var localGossip = MetricsGossip()
localGossip :+= m1 localGossip :+= m1
localGossip.nodes.size must be(1) localGossip.nodes.size must be(1)
localGossip.nodeKeys.size must be(localGossip.nodes.size) localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip) assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip)
assertExpectedSampleSize(collector.isSigar, localGossip) collector.sample.metrics.size must be > (3)
assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
localGossip :+= m2 localGossip :+= m2
localGossip.nodes.size must be(2) localGossip.nodes.size must be(2)
localGossip.nodeKeys.size must be(localGossip.nodes.size) localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip) assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip)
assertExpectedSampleSize(collector.isSigar, localGossip) collector.sample.metrics.size must be > (3)
assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
} }
"merge peer metrics" in { "merge peer metrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var remoteGossip = MetricsGossip(DefaultRateOfDecay) var remoteGossip = MetricsGossip()
remoteGossip :+= m1 remoteGossip :+= m1
remoteGossip :+= m2 remoteGossip :+= m2
remoteGossip.nodes.size must be(2) remoteGossip.nodes.size must be(2)
@ -51,7 +50,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
remoteGossip :+= m2Updated // merge peers remoteGossip :+= m2Updated // merge peers
remoteGossip.nodes.size must be(2) remoteGossip.nodes.size must be(2)
assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip) assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip)
assertExpectedSampleSize(collector.isSigar, remoteGossip) remoteGossip.nodes.foreach(_.metrics.size must be > (3))
remoteGossip.nodes collect { case peer if peer.address == m2.address peer.timestamp must be(m2Updated.timestamp) } remoteGossip.nodes collect { case peer if peer.address == m2.address peer.timestamp must be(m2Updated.timestamp) }
} }
@ -61,11 +60,11 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics) val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp) val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
var localGossip = MetricsGossip(DefaultRateOfDecay) var localGossip = MetricsGossip()
localGossip :+= m1 localGossip :+= m1
localGossip :+= m2 localGossip :+= m2
var remoteGossip = MetricsGossip(DefaultRateOfDecay) var remoteGossip = MetricsGossip()
remoteGossip :+= m3 remoteGossip :+= m3
remoteGossip :+= m2Updated remoteGossip :+= m2Updated
@ -76,15 +75,13 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val mergedGossip = localGossip merge remoteGossip val mergedGossip = localGossip merge remoteGossip
mergedGossip.nodes.size must be(3) mergedGossip.nodes.size must be(3)
assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3)) assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3))
assertExpectedSampleSize(collector.isSigar, mergedGossip) mergedGossip.nodes.foreach(_.metrics.size must be > (3))
assertCreatedUninitialized(mergedGossip)
assertInitialized(mergedGossip)
mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp) mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp)
} }
"get the current NodeMetrics if it exists in the local nodes" in { "get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(DefaultRateOfDecay) var localGossip = MetricsGossip()
localGossip :+= m1 localGossip :+= m1
localGossip.metricsFor(m1).nonEmpty must be(true) localGossip.metricsFor(m1).nonEmpty must be(true)
} }
@ -93,7 +90,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(DefaultRateOfDecay) var localGossip = MetricsGossip()
localGossip :+= m1 localGossip :+= m1
localGossip :+= m2 localGossip :+= m2

View file

@ -8,7 +8,8 @@ import akka.testkit.AkkaSpec
import akka.actor.Address import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class NodeMetricsSpec extends AkkaSpec with AbstractClusterMetricsSpec with MetricSpec { class NodeMetricsSpec extends AkkaSpec with MetricSpec
with MetricsCollectorFactory {
val collector = createMetricsCollector val collector = createMetricsCollector

View file

@ -0,0 +1,114 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import akka.testkit.AkkaSpec
import akka.actor.Address
import akka.actor.RootActorPath
import akka.cluster.NodeMetrics
import akka.cluster.NodeMetrics.MetricValues._
import akka.cluster.Metric
import com.typesafe.config.ConfigFactory
/**
 * Spec for [[HeapMetricsSelector]].
 *
 * Covers three steps of the selector: deriving a capacity per node from heap metrics,
 * turning capacities into integer weights, and expanding a sequence of routee refs
 * so that each ref occurs as many times as the weight of its node.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class HeapMetricsSelectorSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.port = 0
""")) {

  val selector = HeapMetricsSelector

  // Node addresses used as keys in every capacity/weight map below.
  val a1 = Address("akka", "sys", "a1", 2551)
  val b1 = Address("akka", "sys", "b1", 2551)
  val c1 = Address("akka", "sys", "c1", 2551)
  val d1 = Address("akka", "sys", "d1", 2551)

  // Decay argument passed to the used/committed metrics; the max metric is created with None.
  val decay = Some(10)

  /**
   * Builds a heap-only `NodeMetrics` sample for `address`, timestamped with the current time.
   * Used and committed heap carry `decay`, heap max is created without it.
   */
  private def heapSample(address: Address, used: Int, committed: Int, max: Int) =
    NodeMetrics(address, System.currentTimeMillis, Set(
      Metric(HeapMemoryUsed, Some(BigInt(used)), decay),
      Metric(HeapMemoryCommitted, Some(BigInt(committed)), decay),
      Metric(HeapMemoryMax, Some(BigInt(max)), None)))

  /** Looks up the ref for `/user/<name>` on the node at `address`. */
  private def userRef(address: Address, name: String) =
    system.actorFor(RootActorPath(address) / "user" / name)

  // a1 and b1 both use a quarter of their max heap, c1 uses all of it.
  val nodeMetricsA = heapSample(a1, used = 128, committed = 256, max = 512)
  val nodeMetricsB = heapSample(b1, used = 256, committed = 512, max = 1024)
  val nodeMetricsC = heapSample(c1, used = 1024, committed = 1024, max = 1024)

  val nodeMetrics = Set(nodeMetricsA, nodeMetricsB, nodeMetricsC)

  "MetricsAwareClusterNodeSelector" must {

    "calculate capacity of heap metrics" in {
      val capacityByNode = selector.capacity(nodeMetrics)
      capacityByNode(a1) must be(0.75 plusOrMinus 0.0001)
      capacityByNode(b1) must be(0.75 plusOrMinus 0.0001)
      capacityByNode(c1) must be(0.0 plusOrMinus 0.0001)
    }

    "calculate weights from capacity" in {
      val capacityByNode = Map(a1 -> 0.6, b1 -> 0.3, c1 -> 0.1)
      val expected = Map(c1 -> 1, b1 -> 3, a1 -> 6)
      selector.weights(capacityByNode) must be(expected)
    }

    "handle low and zero capacity" in {
      val capacityByNode = Map(a1 -> 0.0, b1 -> 1.0, c1 -> 0.005, d1 -> 0.004)
      val expected = Map(a1 -> 0, b1 -> 100, c1 -> 1, d1 -> 0)
      selector.weights(capacityByNode) must be(expected)
    }

    "allocate weighted refs" in {
      val weightByNode = Map(a1 -> 1, b1 -> 3, c1 -> 10)
      val routees = IndexedSeq(userRef(a1, "a"), userRef(b1, "b"), userRef(c1, "c"))
      val allocated = selector.weightedRefs(routees, a1, weightByNode)
      val byNode = allocated.groupBy(_.path.address)
      byNode(a1).size must be(1)
      byNode(b1).size must be(3)
      byNode(c1).size must be(10)
    }

    "allocate refs for undefined weight" in {
      // c1 has no entry in the weight map but is still expected to show up once.
      val weightByNode = Map(a1 -> 1, b1 -> 2)
      val routees = IndexedSeq(userRef(a1, "a"), userRef(b1, "b"), userRef(c1, "c"))
      val allocated = selector.weightedRefs(routees, a1, weightByNode)
      val byNode = allocated.groupBy(_.path.address)
      byNode(a1).size must be(1)
      byNode(b1).size must be(2)
      byNode(c1).size must be(1)
    }

    "allocate weighted local refs" in {
      // testActor is a local ref; it is expected to receive the weight configured for a1,
      // the address passed as the second argument to weightedRefs.
      val weightByNode = Map(a1 -> 2, b1 -> 1, c1 -> 10)
      val routees = IndexedSeq(testActor, userRef(b1, "b"), userRef(c1, "c"))
      val allocated = selector.weightedRefs(routees, a1, weightByNode)
      allocated.count(_ == testActor) must be(2)
    }

    "not allocate ref with weight zero" in {
      val weightByNode = Map(a1 -> 0, b1 -> 2, c1 -> 10)
      val routees = IndexedSeq(userRef(a1, "a"), userRef(b1, "b"), userRef(c1, "c"))
      val allocated = selector.weightedRefs(routees, a1, weightByNode)
      allocated.count(_ == routees.head) must be(0)
    }
  }
}