diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index d75ddd0a00..8539e56b1c 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -70,7 +70,7 @@ akka {
failure-detector {
# FQCN of the failure detector implementation.
- # It must implement akka.cluster.akka.cluster and
+ # It must implement akka.cluster.FailureDetector and
# have constructor with akka.actor.ActorSystem and
# akka.cluster.ClusterSettings parameters
implementation-class = "akka.cluster.AccrualFailureDetector"
@@ -118,14 +118,22 @@ akka {
# Enable or disable metrics collector for load-balancing nodes.
enabled = on
+ # FQCN of the metrics collector implementation.
+ # It must implement akka.cluster.MetricsCollector and
+ # have constructor with akka.actor.ActorSystem parameter.
+ collector-class = "akka.cluster.SigarMetricsCollector"
+
# How often metrics are sampled on a node.
- metrics-interval = 3s
+ collect-interval = 3s
# How often a node publishes metrics information.
gossip-interval = 3s
# How quickly the exponential weighting of past data is decayed compared to
- # new data. Set higher to increase the bias toward newer values.
+ # new data. Set lower to increase the bias toward newer values.
+ # Corresponds to 'N time periods' as explained in
+ # http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ # Must be >= 1
rate-of-decay = 10
}
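For intuition, the rate-of-decay value N maps to the EWMA smoothing factor α = 2 / (N + 1) described in the linked article. A minimal sketch of the smoothing (illustrative Scala, not part of the patch):

    // With N = 10, α = 2.0 / 11 ≈ 0.18, so each new sample contributes ~18%.
    def ewma(samples: Seq[Double], rateOfDecay: Int): Double = {
      val alpha = 2.0 / (rateOfDecay + 1)
      samples.reduceLeft((avg, xn) ⇒ alpha * xn + (1 - alpha) * avg)
    }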
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
index 07016b098c..ab3b6f5777 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
@@ -4,29 +4,36 @@
package akka.cluster
-import scala.language.postfixOps
-import scala.concurrent.duration._
-import scala.collection.immutable.{ SortedSet, Map }
-import scala.concurrent.forkjoin.ThreadLocalRandom
-import scala.util.{ Try, Success, Failure }
-import math.{ ScalaNumber, ScalaNumericConversions }
-import scala.runtime.{ RichLong, RichDouble, RichInt }
-
-import akka.actor._
-import akka.event.LoggingAdapter
-import akka.cluster.MemberStatus.Up
-
-import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
-import java.lang.reflect.Method
+import java.io.Closeable
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
+import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
+import java.lang.reflect.InvocationTargetException
+import java.lang.reflect.Method
+
+import scala.collection.immutable.{ SortedSet, Map }
+import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import scala.runtime.{ RichLong, RichDouble, RichInt }
+import scala.util.{ Try, Success, Failure }
+
+import akka.ConfigurationException
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.Address
+import akka.actor.DynamicAccess
+import akka.actor.ExtendedActorSystem
+import akka.cluster.MemberStatus.Up
+import akka.event.Logging
/**
* INTERNAL API.
*
- * This strategy is primarily for load-balancing of nodes. It controls metrics sampling
+ * Cluster metrics are primarily for load-balancing of nodes. This actor controls metrics sampling
* at a regular frequency, prepares highly variable data for further analysis by other entities,
- * and publishes the latest cluster metrics data around the node ring to assist in determining
- * the need to redirect traffic to the least-loaded nodes.
+ * and publishes the latest cluster metrics data around the node ring and to the local eventStream
+ * to assist in determining the need to redirect traffic to the least-loaded nodes.
*
* Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]].
*
@@ -43,8 +50,6 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
import cluster.{ selfAddress, scheduler, settings }
import settings._
- val DefaultRateOfDecay: Int = 10
-
/**
* The node ring gossipped that contains only members that are Up.
*/
@@ -53,12 +58,12 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* The latest metric values with their statistical data.
*/
- var latestGossip: MetricsGossip = MetricsGossip(if (MetricsRateOfDecay <= 0) DefaultRateOfDecay else MetricsRateOfDecay)
+ var latestGossip: MetricsGossip = MetricsGossip()
/**
* The metrics collector that samples data on the node.
*/
- val collector: MetricsCollector = MetricsCollector(selfAddress, log, context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
+ val collector: MetricsCollector = MetricsCollector(context.system.asInstanceOf[ExtendedActorSystem], settings)
/**
* Start periodic gossip to random nodes in cluster
@@ -160,7 +165,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
*
* @param nodes metrics per node
*/
-private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetrics] = Set.empty) {
+private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics] = Set.empty) {
/**
* Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus.Up]]
@@ -194,11 +199,10 @@ private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetri
val names = previous map (_.name)
val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a ⇒ names contains a.name)
- val initialized = unseen.map(_.initialize(rateOfDecay))
val merged = toMerge flatMap (latest ⇒ previous.collect { case peer if latest same peer ⇒ peer :+ latest })
val refreshed = nodes filterNot (_.address == data.address)
- copy(nodes = refreshed + data.copy(metrics = initialized ++ merged))
+ copy(nodes = refreshed + data.copy(metrics = unseen ++ merged))
}
/**
@@ -231,6 +235,8 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics
* INTERNAL API
*
* @param decay sets how quickly the exponential weighting decays for past data compared to new data
+ * Corresponds to 'N time periods' as explained in
+ * http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
*
* @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or,
* the sampled value resulting from the previous smoothing iteration.
@@ -240,36 +246,30 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics
*
* @param startTime the time of initial sampling for this data stream
*/
-private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions, startTime: Long, timestamp: Long)
- extends ClusterMessage with MetricNumericConverter {
+private[cluster] case class DataStream(decay: Int, ewma: Double, startTime: Long, timestamp: Long)
+ extends ClusterMessage {
+
+ require(decay >= 1, "Rate of decay must be >= 1")
/**
* The rate at which the weights of past observations
* decay as they become more distant.
*/
- private val α = 2 / decay + 1
+ private val α = 2.0 / (decay + 1)
/**
* Calculates the exponentially weighted moving average for a given monitored data set.
- * The datam can be too large to fit into an int or long, thus we use ScalaNumericConversions,
- * and defer to BigInt or BigDecimal.
- *
- * FIXME - look into the math: the new ewma becomes the new value
- * value: 416979936, prev ewma: 416581760, new ewma: 416979936
*
* @param xn the new data point
*
* @return a [[akka.cluster.DataStream]] with the updated yn and timestamp
*/
- def :+(xn: ScalaNumericConversions): DataStream = if (xn != ewma) convert(xn) fold (
- nl ⇒ copy(ewma = BigInt((α * nl) + (1 - α) * ewma.longValue()), timestamp = newTimestamp),
- nd ⇒ copy(ewma = BigDecimal((α * nd) + (1 - α) * ewma.doubleValue()), timestamp = newTimestamp))
- else this
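+  // e.g. with decay = 10, α ≈ 0.18: ewma 100.0 :+ 110.0 gives a new ewma ≈ 101.8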
+ def :+(xn: Double): DataStream = copy(ewma = (α * xn) + (1 - α) * ewma, timestamp = newTimestamp)
/**
* The duration of observation for this data stream
*/
- def duration: FiniteDuration = (timestamp - startTime) millis
+ def duration: FiniteDuration = (timestamp - startTime).millis
}
@@ -278,19 +278,17 @@ private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions
*
* @param name the metric name
*
- * @param value the metric value, which may or may not be defined
+ * @param value the metric value, which may or may not be defined, it must be a valid numerical value,
+ * see [[akka.cluster.MetricNumericConverter.defined()]]
*
* @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. as total cores), are not trended.
*/
-private[cluster] case class Metric(name: String, value: Option[ScalaNumericConversions], average: Option[DataStream])
+private[cluster] case class Metric private (name: String, value: Option[Number], average: Option[DataStream],
+ dummy: Boolean) // dummy parameter to avoid overload resolution clash with apply
extends ClusterMessage with MetricNumericConverter {
- /**
- * Returns the metric with a new data stream for data trending if eligible,
- * otherwise returns the unchanged metric.
- */
- def initialize(decay: Int): Metric = if (initializable) copy(average = Some(DataStream(decay, value.get, newTimestamp, newTimestamp))) else this
+ require(value.isEmpty || defined(value.get), "Invalid Metric [%s] value [%s]".format(name, value))
/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
@@ -298,36 +296,27 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumericConve
*/
def :+(latest: Metric): Metric = latest.value match {
case Some(v) if this same latest ⇒ average match {
- case Some(previous) ⇒ copy(value = Some(v), average = Some(previous :+ v))
- case None if latest.average.isDefined ⇒ copy(value = Some(v), average = latest.average)
- case None if !latest.average.isDefined ⇒ copy(value = Some(v))
+ case Some(previous) ⇒ copy(value = latest.value, average = Some(previous :+ v.doubleValue))
+ case None if latest.average.isDefined ⇒ copy(value = latest.value, average = latest.average)
+ case None if latest.average.isEmpty ⇒ copy(value = latest.value)
}
case None ⇒ this
}
+ def isDefined: Boolean = value.isDefined
+
/**
- * @see [[akka.cluster.MetricNumericConverter.defined()]]
+ * The numerical value of the average, if defined; otherwise the latest value,
+ * if that is defined.
*/
- def isDefined: Boolean = value match {
- case Some(a) ⇒ defined(a)
- case None ⇒ false
- }
+ def averageValue: Option[Double] =
+ average map (_.ewma) orElse value.map(_.doubleValue)
/**
* Returns true if that is tracking the same metric as this.
*/
def same(that: Metric): Boolean = name == that.name
- /**
- * Returns true if the metric requires initialization.
- */
- def initializable: Boolean = trendable && isDefined && average.isEmpty
-
- /**
- * Returns true if the metric is a value applicable for trending.
- */
- def trendable: Boolean = !(Metric.noStream contains name)
-
}
/**
@@ -336,65 +325,30 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumericConve
* Companion object of Metric class.
*/
private[cluster] object Metric extends MetricNumericConverter {
+ import NodeMetrics.MetricValues._
- /**
- * The metrics that are already averages or finite are not trended over time.
- */
- private val noStream = Set("system-load-average", "total-cores", "processors")
+ private def definedValue(value: Option[Number]): Option[Number] =
+ if (value.isDefined && defined(value.get)) value else None
/**
* Evaluates validity of value based on whether it is available (SIGAR on classpath)
* or defined for the OS (JMX). If undefined we set the value option to None and do not modify
* the latest sampled metric to avoid skewing the statistical trend.
+ *
+ * @param decay rate of decay for values applicable for trending
*/
- def apply(name: String, value: Option[ScalaNumericConversions]): Metric = value match {
- case Some(v) if defined(v) ⇒ Metric(name, value, None)
- case _ ⇒ Metric(name, None, None)
+ def apply(name: String, value: Option[Number], decay: Option[Int]): Metric = {
+ val dv = definedValue(value)
+ val average =
+ if (decay.isDefined && dv.isDefined) {
+ val now = newTimestamp
+ Some(DataStream(decay.get, dv.get.doubleValue, now, now))
+ } else
+ None
+ new Metric(name, dv, average, true)
}
-}
-
-/**
- * Reusable logic for particular metric categories or to leverage all for routing.
- */
-private[cluster] trait MetricsAwareClusterNodeSelector {
- import NodeMetrics._
- import NodeMetrics.NodeMetricsComparator._
-
- /**
- * Returns the address of the available node with the lowest cumulative difference
- * between heap memory max and used/committed.
- */
- def selectByMemory(nodes: Set[NodeMetrics]): Option[Address] = Try(Some(nodes.map {
- n ⇒
- val (used, committed, max) = MetricValues.unapply(n.heapMemory)
- (n.address, max match {
- case Some(m) ⇒ ((committed - used) + (m - used) + (m - committed))
- case None ⇒ committed - used
- })
- }.min._1)) getOrElse None
-
- // TODO
- def selectByNetworkLatency(nodes: Set[NodeMetrics]): Option[Address] = None
- /* Try(nodes.map {
- n ⇒
- val (rxBytes, txBytes) = MetricValues.unapply(n.networkLatency).get
- (n.address, (rxBytes + txBytes))
- }.min._1) getOrElse None // TODO: min or max
- */
-
- // TODO
- def selectByCpu(nodes: Set[NodeMetrics]): Option[Address] = None
- /* Try(nodes.map {
- n ⇒
- val (loadAverage, processors, combinedCpu, cores) = MetricValues.unapply(n.cpu)
- var cumulativeDifference = 0
- // TODO: calculate
- combinedCpu.get
- cores.get
- (n.address, cumulativeDifference)
- }.min._1) getOrElse None // TODO min or max
- }*/
+ def apply(name: String): Metric = new Metric(name, None, None, true)
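+
+  // e.g. apply("heap-memory-used", Some(256L), Some(10)) starts a DataStream for trending,
+  // while apply("system-load-average", Some(-1), None) yields an undefined Metric (value = None).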
}
/**
@@ -417,6 +371,7 @@ private[cluster] trait MetricsAwareClusterNodeSelector {
*/
private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
import NodeMetrics._
+ import NodeMetrics.MetricValues._
/**
* Returns the most recent data.
@@ -436,13 +391,13 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
/**
* Of all the data streams, this fluctuates the most.
*/
- def heapMemory: HeapMemory = HeapMemory(metric("heap-memory-used"), metric("heap-memory-committed"), metric("heap-memory-max"))
+ def heapMemory: HeapMemory = HeapMemory(metric(HeapMemoryUsed), metric(HeapMemoryCommitted), metric(HeapMemoryMax))
- def networkLatency: NetworkLatency = NetworkLatency(metric("network-max-rx"), metric("network-max-tx"))
+ def networkLatency: NetworkLatency = NetworkLatency(metric(NetworkInboundRate), metric(NetworkOutboundRate))
- def cpu: Cpu = Cpu(metric("system-load-average"), metric("processors"), metric("cpu-combined"), metric("total-cores"))
+ def cpu: Cpu = Cpu(metric(SystemLoadAverage), metric(Processors), metric(CpuCombined), metric(TotalCores))
- def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key ⇒ m } getOrElse Metric(key, None)
+ def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key ⇒ m } getOrElse Metric(key)
}
@@ -456,44 +411,36 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
*/
private[cluster] object NodeMetrics {
- object NodeMetricsComparator extends MetricNumericConverter {
-
- implicit val longMinOrdering: Ordering[Long] = Ordering.fromLessThan[Long] { (a, b) ⇒ (a < b) }
-
- implicit val longMinAddressOrdering: Ordering[(Address, Long)] = new Ordering[(Address, Long)] {
- def compare(a: (Address, Long), b: (Address, Long)): Int = longMinOrdering.compare(a._2, b._2)
- }
-
- def maxAddressLong(seq: Seq[(Address, Long)]): (Address, Long) =
- seq.reduceLeft((a: (Address, Long), b: (Address, Long)) ⇒ if (a._2 > b._2) a else b)
-
- implicit val doubleMinOrdering: Ordering[Double] = Ordering.fromLessThan[Double] { (a, b) ⇒ (a < b) }
-
- implicit val doubleMinAddressOrdering: Ordering[(Address, Double)] = new Ordering[(Address, Double)] {
- def compare(a: (Address, Double), b: (Address, Double)): Int = doubleMinOrdering.compare(a._2, b._2)
- }
-
- def maxAddressDouble(seq: Seq[(Address, Double)]): (Address, Double) =
- seq.reduceLeft((a: (Address, Double), b: (Address, Double)) ⇒ if (a._2 > b._2) a else b)
- }
-
sealed trait MetricValues
object MetricValues {
+ final val HeapMemoryUsed = "heap-memory-used"
+ final val HeapMemoryCommitted = "heap-memory-committed"
+ final val HeapMemoryMax = "heap-memory-max"
+ final val NetworkInboundRate = "network-max-inbound"
+ final val NetworkOutboundRate = "network-max-outbound"
+ final val SystemLoadAverage = "system-load-average"
+ final val Processors = "processors"
+ final val CpuCombined = "cpu-combined"
+ final val TotalCores = "total-cores"
+
def unapply(v: HeapMemory): Tuple3[Long, Long, Option[Long]] =
- (v.used.average.get.ewma.longValue(),
- v.committed.average.get.ewma.longValue(),
- Try(Some(v.max.average.get.ewma.longValue())) getOrElse None)
+ (v.used.averageValue.get.longValue,
+ v.committed.averageValue.get.longValue,
+ v.max.averageValue map (_.longValue))
def unapply(v: NetworkLatency): Option[(Long, Long)] =
- Try(Some(v.maxRxIO.average.get.ewma.longValue(), v.maxTxIO.average.get.ewma.longValue())) getOrElse None
+ (v.inbound.averageValue, v.outbound.averageValue) match {
+ case (Some(a), Some(b)) ⇒ Some((a.longValue, b.longValue))
+ case _ ⇒ None
+ }
- def unapply(v: Cpu): Tuple4[Double, Int, Option[Double], Option[Int]] =
- (v.systemLoadAverage.value.get.doubleValue(),
- v.processors.value.get.intValue(),
- Try(Some(v.combinedCpu.average.get.ewma.doubleValue())) getOrElse None,
- Try(Some(v.cores.value.get.intValue())) getOrElse None)
+ def unapply(v: Cpu): Tuple4[Option[Double], Int, Option[Double], Option[Int]] =
+ (v.systemLoadAverage.averageValue map (_.doubleValue),
+ v.processors.averageValue.get.intValue,
+ v.combinedCpu.averageValue map (_.doubleValue),
+ v.cores.averageValue map (_.intValue))
}
/**
@@ -505,14 +452,17 @@ private[cluster] object NodeMetrics {
* @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
* Can be undefined on some OS.
*/
- case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues
+ case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues {
+ require(used.isDefined, "used must be defined")
+ require(committed.isDefined, "committed must be defined")
+ }
/**
- * @param maxRxIO the max network IO rx value, in bytes
+ * @param inbound the inbound network IO rate, in bytes
*
- * @param maxTxIO the max network IO tx value, in bytes
+ * @param outbound the outbound network IO rate, in bytes
*/
- case class NetworkLatency(maxRxIO: Metric, maxTxIO: Metric) extends MetricValues
+ case class NetworkLatency(inbound: Metric, outbound: Metric) extends MetricValues
/**
* @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute
@@ -524,7 +474,9 @@ private[cluster] object NodeMetrics {
*
* @param cores the number of cores (multi-core: per processor)
*/
- private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues
+ private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues {
+ require(processors.isDefined, "processors must be defined")
+ }
}
@@ -537,44 +489,152 @@ private[cluster] object NodeMetrics {
private[cluster] trait MetricNumericConverter {
/**
- * A defined value is neither a -1 or NaN/Infinite:
+ * A defined value is neither negative nor NaN/Infinite:
*
- JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned
* - SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)
*/
- def defined(value: ScalaNumericConversions): Boolean =
- convert(value) fold (a ⇒ value.underlying != -1, b ⇒ !(b.isNaN || b.isInfinite))
+ def defined(value: Number): Boolean = convertNumber(value) match {
+ case Left(a) ⇒ a >= 0
+ case Right(b) ⇒ !(b.isNaN || b.isInfinite)
+ }
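+  // e.g. defined(-1) == false (JMX 'undefined'), defined(0.3) == true, defined(Double.NaN) == false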
/**
* May involve rounding or truncation.
*/
- def convert(from: ScalaNumericConversions): Either[Long, Double] = from match {
- case n: BigInt ⇒ Left(n.longValue())
- case n: BigDecimal ⇒ Right(n.doubleValue())
+ def convertNumber(from: Any): Either[Long, Double] = from match {
+ case n: Int ⇒ Left(n)
+ case n: Long ⇒ Left(n)
+ case n: Double ⇒ Right(n)
+ case n: Float ⇒ Right(n)
case n: RichInt ⇒ Left(n.abs)
case n: RichLong ⇒ Left(n.self)
case n: RichDouble ⇒ Right(n.self)
+ case n: BigInt ⇒ Left(n.longValue)
+ case n: BigDecimal ⇒ Right(n.doubleValue)
+ case x ⇒ throw new IllegalArgumentException("Not a number [%s]" format x)
}
}
+/**
+ * INTERNAL API
+ */
+private[cluster] trait MetricsCollector extends Closeable {
+ /**
+ * Samples and collects new data points.
+ */
+ def sample: NodeMetrics
+}
+
/**
* INTERNAL API
*
- * Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this
- * loads wider and more accurate range of metrics in combination with SIGAR's native OS library.
- *
- * FIXME switch to Scala reflection
- *
- * @param sigar the optional org.hyperic.Sigar instance
+ * Loads JVM metrics through JMX monitoring beans.
*
* @param address The [[akka.actor.Address]] of the node being sampled
+ * @param decay how quickly the exponential weighting of past data is decayed
*/
-private[cluster] class MetricsCollector private (private val sigar: Option[AnyRef], address: Address) extends MetricNumericConverter {
+private[cluster] class JmxMetricsCollector(address: Address, decay: Int) extends MetricsCollector {
+ import NodeMetrics.MetricValues._
+
+ private def this(cluster: Cluster) =
+ this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay)
+
+ def this(system: ActorSystem) = this(Cluster(system))
+
+ private val decayOption = Some(decay)
private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean
+ /**
+ * Samples and collects new data points.
+ */
+ def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(
+ systemLoadAverage, heapUsed, heapCommitted, heapMax, processors))
+
+ /**
+ * (JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
+ * On some systems the JMX OS system load average may not be available, in which case a -1 is returned,
+ * which means that the returned Metric is undefined.
+ */
+ def systemLoadAverage: Metric = Metric(
+ name = SystemLoadAverage,
+ value = Some(osMBean.getSystemLoadAverage),
+ decay = None)
+
+ /**
+ * (JMX) Returns the number of available processors
+ */
+ def processors: Metric = Metric(
+ name = Processors,
+ value = Some(osMBean.getAvailableProcessors),
+ decay = None)
+
+ // FIXME those three heap calls should be done at once
+
+ /**
+ * (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
+ */
+ def heapUsed: Metric = Metric(
+ name = HeapMemoryUsed,
+ value = Some(memoryMBean.getHeapMemoryUsage.getUsed),
+ decay = decayOption)
+
+ /**
+ * (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
+ * from all heap memory pools (in bytes).
+ */
+ def heapCommitted: Metric = Metric(
+ name = HeapMemoryCommitted,
+ value = Some(memoryMBean.getHeapMemoryUsage.getCommitted),
+ decay = decayOption)
+
+ /**
+ * (JMX) Returns the maximum amount of memory (in bytes) that can be used
+ * for JVM memory management. If not defined, the metric value is None,
+ * i.e. it is never negative.
+ */
+ def heapMax: Metric = Metric(
+ name = HeapMemoryMax,
+ value = Some(memoryMBean.getHeapMemoryUsage.getMax),
+ decay = None)
+
+ def close(): Unit = ()
+
+}
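+
+// Usage sketch (illustrative): new JmxMetricsCollector(system).sample yields a NodeMetrics
+// with heap used/committed/max, available processors and the system load average.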
+
+/**
+ * INTERNAL API
+ *
+ * Loads metrics through Hyperic SIGAR and JMX monitoring beans. This
+ * loads wider and more accurate range of metrics compared to JmxMetricsCollector
+ * by using SIGAR's native OS library.
+ *
+ * The constructor will by design throw an exception if org.hyperic.sigar.Sigar can't be loaded, due
+ * to missing classes or native libraries.
+ *
+ * TODO switch to Scala reflection
+ *
+ * @param address The [[akka.actor.Address]] of the node being sampled
+ * @param decay how quickly the exponential weighting of past data is decayed
+ * @param sigar the org.hyperic.Sigar instance
+ */
+private[cluster] class SigarMetricsCollector(address: Address, decay: Int, sigar: AnyRef)
+ extends JmxMetricsCollector(address, decay) {
+ import NodeMetrics.MetricValues._
+
+ def this(address: Address, decay: Int, dynamicAccess: DynamicAccess) =
+ this(address, decay, dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty).get)
+
+ private def this(cluster: Cluster) =
+ this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay, cluster.system.dynamicAccess)
+
+ def this(system: ActorSystem) = this(Cluster(system))
+
+ private val decayOption = Some(decay)
+
private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList")
@@ -585,48 +645,41 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption
+ private val Pid: Option[Method] = createMethodFrom(sigar, "getPid")
+
+ // Do something initially, in constructor, to make sure that the native library can be loaded.
+ // This will by design throw an exception if sigar isn't usable
+ val pid: Long = Pid match {
+ case Some(method) ⇒
+ try method.invoke(sigar).asInstanceOf[Long] catch {
+ case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError] ⇒
+ // native libraries not in place
+ // don't throw the fatal LinkageError, but something less harmful
+ throw new IllegalArgumentException(e.getCause.toString)
+ case e: InvocationTargetException ⇒ throw e.getCause
+ }
+ case None ⇒ throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method")
+ }
+
/**
* Samples and collects new data points.
- *
- * @return [[akka.cluster.NodeMetrics]]
*/
- def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores,
- systemLoadAverage, used, committed, max, processors, networkMaxRx, networkMaxTx))
+ override def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores,
+ systemLoadAverage, heapUsed, heapCommitted, heapMax, processors, networkMaxRx, networkMaxTx))
/**
* (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
- * On some systems the JMX OS system load average may not be available, in which case a Metric with
- * undefined value is returned.
+ * On some systems the JMX OS system load average may not be available, in which case a -1 is returned,
+ * which means that the returned Metric is undefined.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
*/
- def systemLoadAverage: Metric = Metric("system-load-average",
- Try(LoadAverage.get.invoke(sigar.get).asInstanceOf[Array[Double]].toSeq.head).getOrElse(
- osMBean.getSystemLoadAverage) match {
- case x if x < 0 ⇒ None // load average may be unavailable on some platform
- case x ⇒ Some(BigDecimal(x))
- })
-
- /**
- * (JMX) Returns the number of available processors
- */
- def processors: Metric = Metric("processors", Some(BigInt(osMBean.getAvailableProcessors)))
-
- /**
- * (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
- */
- def used: Metric = Metric("heap-memory-used", Some(BigInt(memoryMBean.getHeapMemoryUsage.getUsed)))
-
- /**
- * (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
- * from all heap memory pools (in bytes).
- */
- def committed: Metric = Metric("heap-memory-committed", Some(BigInt(memoryMBean.getHeapMemoryUsage.getCommitted)))
-
- /**
- * (JMX) Returns the maximum amount of memory (in bytes) that can be used
- * for JVM memory management. If undefined, returns -1.
- */
- def max: Metric = Metric("heap-memory-max", Some(BigInt(memoryMBean.getHeapMemoryUsage.getMax)))
+ override def systemLoadAverage: Metric = {
+ val m = Metric(
+ name = SystemLoadAverage,
+ value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]].head.asInstanceOf[Number]).toOption,
+ decay = None)
+ if (m.isDefined) m else super.systemLoadAverage
+ }
/**
* (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
@@ -636,49 +689,61 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
*/
- def cpuCombined: Metric = Metric("cpu-combined", Try(BigDecimal(CombinedCpu.get.invoke(Cpu.get.invoke(sigar.get)).asInstanceOf[Double])).toOption)
+ def cpuCombined: Metric = Metric(
+ name = CpuCombined,
+ value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]).toOption,
+ decay = decayOption)
/**
* FIXME: Array[Int].head - expose all if cores per processor might differ.
* (SIGAR) Returns the total number of cores.
+ *
+ * FIXME do we need this information, isn't it enough with jmx processors?
*/
- def totalCores: Metric = Metric("total-cores", Try(BigInt(CpuList.get.invoke(sigar.get).asInstanceOf[Array[AnyRef]].map(cpu ⇒
- createMethodFrom(Some(cpu), "getTotalCores").get.invoke(cpu).asInstanceOf[Int]).head)).toOption)
+ def totalCores: Metric = Metric(
+ name = TotalCores,
+ value = Try(CpuList.get.invoke(sigar).asInstanceOf[Array[AnyRef]].map(cpu ⇒
+ createMethodFrom(cpu, "getTotalCores").get.invoke(cpu).asInstanceOf[Number]).head).toOption,
+ decay = None)
+
+ // FIXME those two network calls should be combined into one
/**
* (SIGAR) Returns the max network IO read/write value, in bytes, for network latency evaluation.
*/
- def networkMaxRx: Metric = networkMaxFor("getRxBytes", "network-max-rx")
+ def networkMaxRx: Metric = networkFor("getRxBytes", NetworkInboundRate)
/**
- * (SIGAR) Returns the max network IO tx value, in bytes.
+ * (SIGAR) Returns the max network IO outbound value, in bytes.
*/
- def networkMaxTx: Metric = networkMaxFor("getTxBytes", "network-max-tx")
-
- /**
- * Returns the network stats per interface.
- */
- def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar.get).asInstanceOf[Array[String]].map(arg ⇒
- arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar.get, arg))).toMap) getOrElse Map.empty[String, AnyRef]
-
- /**
- * Returns true if SIGAR is successfully installed on the classpath, otherwise false.
- */
- def isSigar: Boolean = sigar.isDefined
+ def networkMaxTx: Metric = networkFor("getTxBytes", NetworkOutboundRate)
/**
* Releases any native resources associated with this instance.
*/
- def close(): Unit = if (isSigar) Try(createMethodFrom(sigar, "close").get.invoke(sigar.get)) getOrElse Unit
+ override def close(): Unit = Try(createMethodFrom(sigar, "close").get.invoke(sigar))
+
+ // FIXME network metrics needs thought and refactoring
/**
* Returns the max bytes for the given method in metric for metric from the network interface stats.
*/
- private def networkMaxFor(method: String, metric: String): Metric = Metric(metric, Try(Some(BigInt(
- networkStats.collect { case (_, a) ⇒ createMethodFrom(Some(a), method).get.invoke(a).asInstanceOf[Long] }.max))) getOrElse None)
+ private def networkFor(method: String, metric: String): Metric = Metric(
+ name = metric,
+ value = Try(networkStats.collect {
+ case (_, a) ⇒
+ createMethodFrom(a, method).get.invoke(a).asInstanceOf[Long]
+ }.max.asInstanceOf[Number]).toOption,
+ decay = decayOption)
- private def createMethodFrom(ref: Option[AnyRef], method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] =
- Try(ref.get.getClass.getMethod(method, types: _*)).toOption
+ /**
+ * Returns the network stats per interface.
+ */
+ private def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar).asInstanceOf[Array[String]].map(arg ⇒
+ arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar, arg))).toMap) getOrElse Map.empty[String, AnyRef]
+
+ private def createMethodFrom(ref: AnyRef, method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] =
+ Try(ref.getClass.getMethod(method, types: _*)).toOption
}
@@ -687,16 +752,26 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
* Companion object of MetricsCollector class.
*/
private[cluster] object MetricsCollector {
- def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector =
- dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty) match {
- case Success(identity) ⇒ new MetricsCollector(Some(identity), address)
- case Failure(e) ⇒
- log.debug(e.toString)
- log.info("Hyperic SIGAR was not found on the classpath or not installed properly. " +
- "Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
- "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate" +
- "platform-specific native libary to 'java.library.path'.")
- new MetricsCollector(None, address)
+ def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = {
+ import settings.{ MetricsCollectorClass ⇒ fqcn }
+ def log = Logging(system, "MetricsCollector")
+ if (fqcn == classOf[SigarMetricsCollector].getName) {
+ Try(new SigarMetricsCollector(system)) match {
+ case Success(sigarCollector) ⇒ sigarCollector
+ case Failure(e) ⇒
+ log.info("Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
+ "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
+ "platform-specific native libary to 'java.library.path'. Reason: " +
+ e.toString)
+ new JmxMetricsCollector(system)
+ }
+
+ } else {
+ system.dynamicAccess.createInstanceFor[MetricsCollector](
+ fqcn, Seq(classOf[ActorSystem] -> system)).recover({
+ case e ⇒ throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to: " + e.toString)
+ }).get
}
+ }
}
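Since 'collector-class' accepts any FQCN implementing the internal akka.cluster.MetricsCollector trait with a single ActorSystem constructor parameter, a custom collector can be plugged in. A minimal sketch, assuming it is compiled into the akka.cluster package (class name and sampled metric are illustrative):

    package akka.cluster

    import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
    import akka.actor.ActorSystem

    class MyMetricsCollector(system: ActorSystem) extends MetricsCollector {
      private val address = Cluster(system).selfAddress
      // Sample only the JVM heap used, without EWMA trending (decay = None).
      def sample: NodeMetrics = {
        val rt = Runtime.getRuntime
        NodeMetrics(address, newTimestamp,
          Set(Metric("heap-memory-used", Some(rt.totalMemory - rt.freeMemory), decay = None)))
      }
      def close(): Unit = ()
    }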
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index b8fa31fbc3..1b2d11745e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -71,9 +71,16 @@ class ClusterSettings(val config: Config, val systemName: String) {
callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS),
resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS))
final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled")
- final val MetricsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.metrics-interval"), MILLISECONDS)
+ final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class")
+ final val MetricsInterval: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.metrics.collect-interval"), MILLISECONDS)
+ require(d > Duration.Zero, "metrics.collect-interval must be > 0"); d
+ }
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
- final val MetricsRateOfDecay: Int = getInt("akka.cluster.metrics.rate-of-decay")
+ final val MetricsRateOfDecay: Int = {
+ val n = getInt("akka.cluster.metrics.rate-of-decay")
+ require(n >= 1, "metrics.rate-of-decay must be >= 1"); n
+ }
}
case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)
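A configuration excerpt exercising the renamed and validated settings (values are the defaults from reference.conf above):

    akka.cluster.metrics {
      enabled = on
      collector-class = "akka.cluster.SigarMetricsCollector"
      collect-interval = 3s   # must be > 0
      gossip-interval = 3s
      rate-of-decay = 10      # must be >= 1
    }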
diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala
new file mode 100644
index 0000000000..52b63ef5f6
--- /dev/null
+++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala
@@ -0,0 +1,271 @@
+/*
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster.routing
+
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Address
+import akka.actor.OneForOneStrategy
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.ClusterMetricsChanged
+import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.NodeMetrics
+import akka.cluster.NodeMetrics.MetricValues
+import akka.dispatch.Dispatchers
+import akka.event.Logging
+import akka.routing.Broadcast
+import akka.routing.Destination
+import akka.routing.Resizer
+import akka.routing.Route
+import akka.routing.RouteeProvider
+import akka.routing.RouterConfig
+
+/**
+ * INTERNAL API
+ */
+private[cluster] object ClusterLoadBalancingRouter {
+ val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
+ case _ ⇒ SupervisorStrategy.Escalate
+ }
+}
+
+/**
+ * A Router that performs load balancing to cluster nodes based on
+ * cluster metric data.
+ *
+ * It uses random selection of routees, with probabilities derived from
+ * the remaining capacity of the corresponding node.
+ *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical
+ * sense as this means that the router should both create new actors and use the 'routees'
+ * actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
+ *
+ * The configuration parameter trumps the constructor arguments. This means that
+ * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
+ * be ignored if the router is defined in the configuration file for the actor being used.
+ *
+ * Supervision Setup
+ *
+ * The router creates a “head” actor which supervises and/or monitors the
+ * routees. Instances are created as children of this actor, hence the
+ * children are not supervised by the parent of the router. Common choices are
+ * to always escalate (meaning that fault handling is always applied to all
+ * children simultaneously; this is the default) or use the parent’s strategy,
+ * which will result in routed children being treated individually, but it is
+ * possible as well to use Routers to give different supervisor strategies to
+ * different groups of children.
+ *
+ * @param metricsSelector decides what probability to use for selecting a routee, based
+ * on remaining capacity as indicated by the node metrics
+ * @param routees string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
+ */
+@SerialVersionUID(1L)
+case class ClusterLoadBalancingRouter(
+ metricsSelector: MetricsSelector,
+ nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
+ override val resizer: Option[Resizer] = None,
+ val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
+ val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
+ extends RouterConfig with ClusterLoadBalancingRouterLike {
+
+ /**
+ * Constructor that sets nrOfInstances to be created.
+ * Java API
+ * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
+ * @param nr number of routees to create
+ */
+ def this(selector: MetricsSelector, nr: Int) = this(metricsSelector = selector, nrOfInstances = nr)
+
+ /**
+ * Constructor that sets the routees to be used.
+ * Java API
+ * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
+ * @param routeePaths string representation of the actor paths of the routees that will be looked up
+ * using `actorFor` in [[akka.actor.ActorRefProvider]]
+ */
+ def this(selector: MetricsSelector, routeePaths: java.lang.Iterable[String]) =
+ this(metricsSelector = selector, routees = routeePaths.asScala)
+
+ /**
+ * Constructor that sets the resizer to be used.
+ * Java API
+ * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
+ */
+ def this(selector: MetricsSelector, resizer: Resizer) =
+ this(metricsSelector = selector, resizer = Some(resizer))
+
+ /**
+ * Java API for setting routerDispatcher
+ */
+ def withDispatcher(dispatcherId: String): ClusterLoadBalancingRouter =
+ copy(routerDispatcher = dispatcherId)
+
+ /**
+ * Java API for setting the supervisor strategy to be used for the “head”
+ * Router actor.
+ */
+ def withSupervisorStrategy(strategy: SupervisorStrategy): ClusterLoadBalancingRouter =
+ copy(supervisorStrategy = strategy)
+
+}
+
+/**
+ * INTERNAL API.
+ *
+ * This strategy is a metrics-aware router which performs load balancing of
+ * cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterEvent.ClusterMetricsChanged]]
+ * events and the [[akka.cluster.routing.MetricsSelector]] creates a mix of
+ * weighted routees based on the node metrics. Messages are routed randomly to the
+ * weighted routees, i.e. nodes with lower load are more likely to be used than nodes with
+ * higher load.
+ */
+trait ClusterLoadBalancingRouterLike { this: RouterConfig ⇒
+
+ def metricsSelector: MetricsSelector
+
+ def nrOfInstances: Int
+
+ def routees: Iterable[String]
+
+ def routerDispatcher: String
+
+ override def createRoute(routeeProvider: RouteeProvider): Route = {
+ if (resizer.isEmpty) {
+ if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
+ else routeeProvider.registerRouteesFor(routees)
+ }
+
+ val log = Logging(routeeProvider.context.system, routeeProvider.context.self)
+
+ // Function that points to the routees to use, starts with the plain routees
+ // of the routeeProvider and then changes to the current weighted routees
+ // produced by the metricsSelector. The reason for using a function is that
+ // routeeProvider.routees can change.
+ @volatile var weightedRoutees: () ⇒ IndexedSeq[ActorRef] = () ⇒ routeeProvider.routees
+
+ // subscribe to ClusterMetricsChanged and update weightedRoutees
+ val metricsListener = routeeProvider.context.actorOf(Props(new Actor {
+
+ val cluster = Cluster(routeeProvider.context.system)
+
+ override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])
+ override def postStop(): Unit = cluster.unsubscribe(self)
+
+ def receive = {
+ case ClusterMetricsChanged(metrics) ⇒ receiveMetrics(metrics)
+ case _: CurrentClusterState ⇒ // ignore
+ }
+
+ def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
+ val routees = metricsSelector.weightedRefs(routeeProvider.routees, cluster.selfAddress, metrics)
+ weightedRoutees = () ⇒ routees
+ }
+
+ }).withDispatcher(routerDispatcher), name = "metricsListener")
+
+ def getNext(): ActorRef = {
+ val currentRoutees = weightedRoutees.apply
+ if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
+ else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size))
+ }
+
+ {
+ case (sender, message) ⇒
+ message match {
+ case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
+ case msg ⇒ List(Destination(sender, getNext()))
+ }
+ }
+ }
+}
+
+/**
+ * MetricsSelector that uses the heap metrics.
+ * Low heap capacity => lower weight.
+ */
+@SerialVersionUID(1L)
+case object HeapMetricsSelector extends MetricsSelector {
+ /**
+ * Java API: get the singleton instance
+ */
+ def getInstance = this
+
+ override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
+ nodeMetrics.map { n ⇒
+ val (used, committed, max) = MetricValues.unapply(n.heapMemory)
+ val capacity = max match {
+ case None ⇒ (committed - used).toDouble / committed
+ case Some(m) ⇒ (m - used).toDouble / m
+ }
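+          // e.g. used = 256 MB, max = Some(1024 MB) ⇒ capacity = (1024 - 256) / 1024 = 0.75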
+ (n.address, capacity)
+ }.toMap
+ }
+}
+
+// FIXME implement more MetricsSelectors, such as CpuMetricsSelector,
+// LoadAverageMetricsSelector, NetworkMetricsSelector.
+// Also a CompositeMetricsSelector which uses a mix of other
+// selectors.
+
+/**
+ * A MetricsSelector is responsible for producing weighted mix of routees
+ * from the node metrics. The weights are typically proportional to the
+ * remaining capacity.
+ */
+abstract class MetricsSelector {
+
+ /**
+ * Remaining capacity for each node. The value is between
+ * 0.0 and 1.0, where 0.0 means no remaining capacity (full
+ * utilization) and 1.0 means full remaining capacity (zero
+ * utilization).
+ */
+ def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double]
+
+ /**
+   * Converts the capacity values to weights. The node with the lowest
+   * capacity gets weight 1 (lowest usable capacity is 1%) and other
+   * nodes get weights proportional to their capacity compared to
+   * the node with the lowest capacity.
+ */
+ def weights(capacity: Map[Address, Double]): Map[Address, Int] = {
+ if (capacity.isEmpty) Map.empty[Address, Int]
+ else {
+ val (_, min) = capacity.minBy { case (_, c) ⇒ c }
+ // lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
+ val divisor = math.max(0.01, min)
+ capacity mapValues { c ⇒ math.round(c / divisor).toInt }
+ }
+ }
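+  // e.g. capacity Map(a → 0.1, b → 0.3, c → 0.6) ⇒ divisor = 0.1 ⇒ weights Map(a → 1, b → 3, c → 6)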
+
+ /**
+ * Allocates a list of actor refs according to the weight of their node, i.e.
+ * weight 3 of node A will allocate 3 slots for each ref with address A.
+ */
+ def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, weights: Map[Address, Int]): IndexedSeq[ActorRef] = {
+ def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
+ case Address(_, _, None, None) ⇒ selfAddress
+ case a ⇒ a
+ }
+
+ val w = weights.withDefaultValue(1)
+ refs.foldLeft(IndexedSeq.empty[ActorRef]) { (acc, ref) ⇒
+ acc ++ IndexedSeq.fill(w(fullAddress(ref)))(ref)
+ }
+ }
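+  // e.g. weights Map(a → 2, b → 1) over refs Vector(refA, refB) allocates Vector(refA, refA, refB),
+  // so random selection favors the node with more remaining capacity.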
+
+ /**
+ * Combines the different pieces to allocate a list of weighted actor refs
+ * based on the node metrics.
+ */
+ def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, nodeMetrics: Set[NodeMetrics]): IndexedSeq[ActorRef] =
+ weightedRefs(refs, selfAddress, weights(capacity(nodeMetrics)))
+}
\ No newline at end of file
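A usage sketch for the new router (the routee actor class Worker and the router name are illustrative):

    // Create 5 routees; selection is weighted by remaining heap capacity per node.
    val router = system.actorOf(
      Props[Worker].withRouter(
        ClusterLoadBalancingRouter(HeapMetricsSelector, nrOfInstances = 5)),
      name = "heapAwareRouter")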
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala
index e04d3612d3..aa7c10b1ed 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala
@@ -31,6 +31,8 @@ class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
import ClusterMetricsMultiJvmSpec._
+ def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
+
"Cluster metrics" must {
"periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " +
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
@@ -38,9 +40,8 @@ abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSp
enterBarrier("cluster-started")
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
awaitCond(clusterView.clusterMetrics.size == roles.size)
- assertInitialized(cluster.settings.MetricsRateOfDecay, collectNodeMetrics(clusterView.clusterMetrics).toSet)
- val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
- clusterView.clusterMetrics.foreach(n ⇒ assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n))
+ val collector = MetricsCollector(cluster.system, cluster.settings)
+ collector.sample.metrics.size must be > (3)
enterBarrier("after")
}
"reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index 12fc8ebbc6..4c799aec63 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -64,7 +64,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
def muteLog(sys: ActorSystem = system): Unit = {
if (!sys.log.isDebugEnabled) {
Seq(".*Metrics collection has started successfully.*",
- ".*Hyperic SIGAR was not found on the classpath.*",
+ ".*Metrics will be retreived from MBeans.*",
".*Cluster Node.* - registered cluster JMX MBean.*",
".*Cluster Node.* - is starting up.*",
".*Shutting down cluster Node.*",
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterAdaptiveLoadBalancingRouterMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterAdaptiveLoadBalancingRouterMultiJvmSpec.scala
deleted file mode 100644
index b62f60c0ce..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterAdaptiveLoadBalancingRouterMultiJvmSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.cluster.routing
-
-import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig}
-import akka.testkit.{LongRunningTest, DefaultTimeout, ImplicitSender}
-import akka.actor._
-import akka.cluster.{ MemberStatus, MultiNodeClusterSpec }
-import akka.cluster.routing.ClusterRoundRobinRoutedActorMultiJvmSpec.SomeActor
-
-
-object ClusterAdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
-
- val first = role("first")
- val second = role("second")
- val third = role("third")
- val fourth = role("fourth")
- val fifth = role("fifth")
-
- // TODO - config
- commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
-
-}
-
-class ClusterAdaptiveLoadBalancingRouterMultiJvmNode1 extends ClusterAdaptiveLoadBalancingRouterSpec
-class ClusterAdaptiveLoadBalancingRouterMultiJvmNode2 extends ClusterAdaptiveLoadBalancingRouterSpec
-class ClusterAdaptiveLoadBalancingRouterMultiJvmNode3 extends ClusterAdaptiveLoadBalancingRouterSpec
-class ClusterAdaptiveLoadBalancingRouterMultiJvmNode4 extends ClusterAdaptiveLoadBalancingRouterSpec
-class ClusterAdaptiveLoadBalancingRouterMultiJvmNode5 extends ClusterAdaptiveLoadBalancingRouterSpec
-
-abstract class ClusterAdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(ClusterAdaptiveLoadBalancingRouterMultiJvmSpec)
-with MultiNodeClusterSpec
-with ImplicitSender with DefaultTimeout {
- import ClusterAdaptiveLoadBalancingRouterMultiJvmSpec._
-
- // TODO configure properly and leverage the other pending load balancing routers
- lazy val router1 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
- ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router1")
- lazy val router2 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
- ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router2")
- lazy val router3 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
- ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router3")
- lazy val router4 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
- ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router4")
- lazy val router5 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(),
- ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router5")
-
- "A cluster with a ClusterAdaptiveLoadBalancingRouter" must {
- "start cluster with 5 nodes" taggedAs LongRunningTest in {
- awaitClusterUp(roles: _*)
- enterBarrier("cluster-started")
- awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
- awaitCond(clusterView.clusterMetrics.size == roles.size)
- enterBarrier("cluster-metrics-consumer-ready")
- }
- // TODO the rest of the necessary testing. All the work needed for consumption and extraction
- // of the data needed is in ClusterMetricsCollector._
- }
-}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala
deleted file mode 100644
index 969372112f..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.cluster.routing
-
-import language.implicitConversions
-import language.postfixOps
-import akka.actor._
-import akka.cluster._
-import akka.routing._
-import akka.dispatch.Dispatchers
-import akka.event.Logging
-import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
-import akka.routing.Destination
-import akka.cluster.NodeMetrics
-import akka.routing.Broadcast
-import akka.actor.OneForOneStrategy
-import akka.cluster.ClusterEvent.ClusterMetricsChanged
-import akka.cluster.ClusterEvent.CurrentClusterState
-import util.Try
-import akka.cluster.NodeMetrics.{ NodeMetricsComparator, MetricValues }
-import NodeMetricsComparator._
-
-/**
- * INTERNAL API.
- *
- * Trait that embodies the contract for all load balancing implementations.
- */
-private[cluster] trait LoadBalancer {
-
- /**
- * Compares only those nodes that are deemed 'available' by the
- * [[akka.routing.RouteeProvider]]
- */
- def selectNodeByHealth(availableNodes: Set[NodeMetrics]): Option[Address]
-
-}
-
-/**
- * INTERNAL API
- */
-private[cluster] object ClusterLoadBalancingRouter {
- val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case _ ⇒ SupervisorStrategy.Escalate
- }
-}
-
-/**
- * INTERNAL API.
- *
- * The abstract consumer of [[akka.cluster.ClusterMetricsChanged]] events and the primary consumer
- * of cluster metric data. This strategy is a metrics-aware router which performs load balancing of
- * cluster nodes with a fallback strategy of a [[akka.routing.RoundRobinRouter]].
- *
- * Load balancing of nodes is based on .. etc etc desc forthcoming
- */
-trait ClusterAdaptiveLoadBalancingRouterLike extends RoundRobinLike with LoadBalancer { this: RouterConfig ⇒
-
- def routerDispatcher: String
-
- override def createRoute(routeeProvider: RouteeProvider): Route = {
- if (resizer.isEmpty) {
- if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
- else routeeProvider.registerRouteesFor(routees)
- }
-
- val log = Logging(routeeProvider.context.system, routeeProvider.context.self)
-
- val next = new AtomicLong(0)
-
- val nodeMetrics = new AtomicReference[Set[NodeMetrics]](Set.empty)
-
- val metricsListener = routeeProvider.context.actorOf(Props(new Actor {
- def receive = {
- case ClusterMetricsChanged(metrics) ⇒ receiveMetrics(metrics)
- case _: CurrentClusterState ⇒ // ignore
- }
- def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
- val availableNodes = routeeProvider.routees.map(_.path.address).toSet
- val updated: Set[NodeMetrics] = nodeMetrics.get.collect { case node if availableNodes contains node.address ⇒ node }
- nodeMetrics.set(updated)
- }
- override def postStop(): Unit = Cluster(routeeProvider.context.system) unsubscribe self
- }).withDispatcher(routerDispatcher), name = "metricsListener")
- Cluster(routeeProvider.context.system).subscribe(metricsListener, classOf[ClusterMetricsChanged])
-
- def getNext(): ActorRef = {
- // TODO use as/where you will... selects by health category based on the implementation
- val address: Option[Address] = selectNodeByHealth(nodeMetrics.get)
- // TODO actual routee selection. defaults to round robin.
- routeeProvider.routees((next.getAndIncrement % routees.size).asInstanceOf[Int])
- }
-
- def routeTo(): ActorRef = if (routeeProvider.routees.isEmpty) routeeProvider.context.system.deadLetters else getNext()
-
- {
- case (sender, message) ⇒
- message match {
- case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
- case msg ⇒ List(Destination(sender, routeTo()))
- }
- }
- }
-}
-
-/**
- * Selects by all monitored metric types (memory, network latency, cpu...) and
- * chooses the healthiest node to route to.
- */
-@SerialVersionUID(1L)
-private[cluster] case class ClusterAdaptiveMetricsLoadBalancingRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
- override val resizer: Option[Resizer] = None,
- val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
- val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
- extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
-
- // TODO
- def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = {
- val s = Set(selectByMemory(nodes), selectByNetworkLatency(nodes), selectByCpu(nodes))
- s.head // TODO select the Address that appears with the highest or lowest frequency
- }
-}
-
-@SerialVersionUID(1L)
-private[cluster] case class MemoryLoadBalancingRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
- override val resizer: Option[Resizer] = None,
- val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
- val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
- extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
-
- def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = selectByMemory(nodes)
-}
-
-@SerialVersionUID(1L)
-private[cluster] case class CpuLoadBalancer(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
- override val resizer: Option[Resizer] = None,
- val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
- val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
- extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
-
- // TODO
- def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = None
-}
-
-@SerialVersionUID(1L)
-private[cluster] case class NetworkLatencyLoadBalancer(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
- override val resizer: Option[Resizer] = None,
- val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
- val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
- extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector {
-
- // TODO
- def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = None
-}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouterSpec.scala
new file mode 100644
index 0000000000..028693f04c
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouterSpec.scala
@@ -0,0 +1,162 @@
+/*
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster.routing
+
+import language.postfixOps
+import scala.concurrent.duration._
+import java.lang.management.ManagementFactory
+import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
+import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
+import akka.actor._
+import akka.cluster.{ MemberStatus, MultiNodeClusterSpec }
+import scala.concurrent.Await
+import akka.routing.RouterRoutees
+import akka.pattern.ask
+import akka.routing.CurrentRoutees
+import akka.cluster.Cluster
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import com.typesafe.config.ConfigFactory
+
+object ClusterLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
+
+ class Routee extends Actor {
+ var usedMemory: Array[Byte] = _
+ def receive = {
+ case _ ⇒ sender ! Reply(Cluster(context.system).selfAddress)
+ }
+ }
+
+ class Memory extends Actor with ActorLogging {
+ var usedMemory: Array[Byte] = _
+ def receive = {
+ case AllocateMemory ⇒
+ val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
+ // getMax can be undefined (-1)
+ val max = math.max(heap.getMax, heap.getCommitted)
+ val used = heap.getUsed
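+        // allocate roughly 80% of the currently free heap so this node
+        // reports much lower free-heap capacity than its peers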
+ val allocate = (0.8 * (max - used)).toInt
+ usedMemory = Array.fill(allocate)(ThreadLocalRandom.current.nextInt(127).toByte)
+ }
+ }
+
+ case object AllocateMemory
+ case class Reply(address: Address)
+
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+
+ commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
+ akka.cluster.metrics.collect-interval = 1s
+ akka.cluster.metrics.gossip-interval = 1s
+ """)).withFallback(MultiNodeClusterSpec.clusterConfig))
+
+}
+
+class ClusterLoadBalancingRouterMultiJvmNode1 extends ClusterLoadBalancingRouterSpec
+class ClusterLoadBalancingRouterMultiJvmNode2 extends ClusterLoadBalancingRouterSpec
+class ClusterLoadBalancingRouterMultiJvmNode3 extends ClusterLoadBalancingRouterSpec
+
+abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadBalancingRouterMultiJvmSpec)
+ with MultiNodeClusterSpec
+ with ImplicitSender with DefaultTimeout {
+ import ClusterLoadBalancingRouterMultiJvmSpec._
+
+ def currentRoutees(router: ActorRef) =
+ Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees
+
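+  // tally replies per node address, starting every node at zero so that
+  // nodes that never reply still show up in the result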
+ def receiveReplies(expectedReplies: Int): Map[Address, Int] = {
+ val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
+ (receiveWhile(5 seconds, messages = expectedReplies) {
+ case Reply(address) ⇒ address
+ }).foldLeft(zero) {
+ case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1))
+ }
+ }
+
+ /**
+ * Fills in self address for local ActorRef
+ */
+ def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
+ case Address(_, _, None, None) ⇒ cluster.selfAddress
+ case a ⇒ a
+ }
+
+ def startRouter(name: String): ActorRef = {
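+    // deploys the metrics-aware router cluster-wide with at most one routee
+    // per node (totalInstances caps the pool size across the cluster)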
+ val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
+ local = ClusterLoadBalancingRouter(HeapMetricsSelector),
+ settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), name)
+ awaitCond {
+      // it may take some time until the router receives cluster member events
+ currentRoutees(router).size == roles.size
+ }
+ currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet)
+ router
+ }
+
+ "A cluster with a ClusterLoadBalancingRouter" must {
+ "start cluster nodes" taggedAs LongRunningTest in {
+ awaitClusterUp(roles: _*)
+ enterBarrier("after-1")
+ }
+
+ "use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in {
+ runOn(first) {
+ val router1 = startRouter("router1")
+
+ // collect some metrics before we start
+ Thread.sleep(10000)
+
+ val iterationCount = 100
+ for (i ← 0 until iterationCount) {
+ router1 ! "hit"
+ Thread.sleep(10)
+ }
+
+ val replies = receiveReplies(iterationCount)
+
+ replies(first) must be > (0)
+ replies(second) must be > (0)
+ replies(third) must be > (0)
+ replies.values.sum must be(iterationCount)
+
+ }
+
+ enterBarrier("after-2")
+ }
+
+ "prefer node with more free heap capacity" taggedAs LongRunningTest in {
+ System.gc()
+ enterBarrier("gc")
+
+ runOn(second) {
+ system.actorOf(Props[Memory], "memory") ! AllocateMemory
+ }
+ enterBarrier("heap-allocated")
+
+ runOn(first) {
+ val router2 = startRouter("router2")
+
+ // collect some metrics before we start
+ Thread.sleep(10000)
+
+ val iterationCount = 100
+ for (i ← 0 until iterationCount) {
+ router2 ! "hit"
+ Thread.sleep(10)
+ }
+
+ val replies = receiveReplies(iterationCount)
+
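+      // `second` filled most of its free heap above, so it should
+      // receive fewer replies than the unloaded `third`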
+ replies(third) must be > (replies(second))
+ replies.values.sum must be(iterationCount)
+
+ }
+
+ enterBarrier("after-3")
+ }
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index 2d1a6542bd..34513076c0 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -47,6 +47,7 @@ class ClusterConfigSpec extends AkkaSpec {
callTimeout = 2 seconds,
resetTimeout = 30 seconds))
MetricsEnabled must be(true)
+ MetricsCollectorClass must be("akka.cluster.SigarMetricsCollector")
MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds)
MetricsRateOfDecay must be(10)
diff --git a/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala b/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala
index b6357ddc46..f6519f000f 100644
--- a/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala
@@ -7,48 +7,74 @@ package akka.cluster
import language.postfixOps
import scala.concurrent.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec }
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import System.currentTimeMillis
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec {
+class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec with MetricsCollectorFactory {
import system.dispatcher
val collector = createMetricsCollector
"DataStream" must {
+ "calcualate same ewma for constant values" in {
+ val ds = DataStream(decay = 5, ewma = 100.0, currentTimeMillis, currentTimeMillis) :+
+ 100.0 :+ 100.0 :+ 100.0
+ ds.ewma must be(100.0 plusOrMinus 0.001)
+ }
+
+ "calcualate correct ewma for normal decay" in {
+ val d0 = DataStream(decay = 10, ewma = 1000.0, currentTimeMillis, currentTimeMillis)
+ d0.ewma must be(1000.0 plusOrMinus 0.01)
+ val d1 = d0 :+ 10.0
+ d1.ewma must be(820.0 plusOrMinus 0.01)
+ val d2 = d1 :+ 10.0
+ d2.ewma must be(672.73 plusOrMinus 0.01)
+ val d3 = d2 :+ 10.0
+ d3.ewma must be(552.23 plusOrMinus 0.01)
+ val d4 = d3 :+ 10.0
+ d4.ewma must be(453.64 plusOrMinus 0.01)
+
+ val dn = (1 to 100).foldLeft(d0)((d, _) ⇒ d :+ 10.0)
+ dn.ewma must be(10.0 plusOrMinus 0.1)
+ }
+
+ "calculate ewma for decay 1" in {
+ val d0 = DataStream(decay = 1, ewma = 100.0, currentTimeMillis, currentTimeMillis)
+ d0.ewma must be(100.0 plusOrMinus 0.01)
+ val d1 = d0 :+ 1.0
+ d1.ewma must be(1.0 plusOrMinus 0.01)
+ val d2 = d1 :+ 57.0
+ d2.ewma must be(57.0 plusOrMinus 0.01)
+ val d3 = d2 :+ 10.0
+ d3.ewma must be(10.0 plusOrMinus 0.01)
+ }
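+
+    // A sanity check on the expected values above, assuming DataStream uses
+    // the conventional EWMA smoothing factor alpha = 2 / (N + 1), where N is
+    // the configured decay (the 'N time periods' of the Wikipedia article
+    // cited in reference.conf): for decay = 10, alpha = 2/11, so appending
+    // 10.0 to an ewma of 1000.0 yields (2.0 * 10.0 + 9.0 * 1000.0) / 11 = 820.0,
+    // and for decay = 1, alpha = 1, so the ewma simply tracks the latest value.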
+
"calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
- val firstDataSet = collector.sample.metrics.collect { case m if m.trendable && m.isDefined ⇒ m.initialize(DefaultRateOfDecay) }
- var streamingDataSet = firstDataSet
-
- val cancellable = system.scheduler.schedule(0 seconds, 100 millis) {
- streamingDataSet = collector.sample.metrics.flatMap(latest ⇒ streamingDataSet.collect {
- case streaming if (latest.trendable && latest.isDefined) && (latest same streaming)
- && (latest.value.get != streaming.value.get) ⇒ {
- val updatedDataStream = streaming.average.get :+ latest.value.get
- updatedDataStream.timestamp must be > (streaming.average.get.timestamp)
- updatedDataStream.duration.length must be > (streaming.average.get.duration.length)
- updatedDataStream.ewma must not be (streaming.average.get.ewma)
- updatedDataStream.ewma must not be (latest.value.get)
- streaming.copy(value = latest.value, average = Some(updatedDataStream))
+ var streamingDataSet = Map.empty[String, Metric]
+ var usedMemory = Array.empty[Byte]
+ (1 to 50) foreach { _ ⇒
+ Thread.sleep(100)
+ usedMemory = usedMemory ++ Array.fill(1024)(ThreadLocalRandom.current.nextInt(127).toByte)
+ val changes = collector.sample.metrics.flatMap { latest ⇒
+ streamingDataSet.get(latest.name) match {
+ case None ⇒ Some(latest)
+ case Some(previous) ⇒
+ if (latest.average.isDefined && latest.isDefined && latest.value.get != previous.value.get) {
+ val updated = previous :+ latest
+ updated.average.isDefined must be(true)
+ val updatedAverage = updated.average.get
+ updatedAverage.timestamp must be > (previous.average.get.timestamp)
+ updatedAverage.duration.length must be > (previous.average.get.duration.length)
+ updatedAverage.ewma must not be (previous.average.get.ewma)
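+            // the ewma is expected to move in the same direction as the raw value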
+ (previous.value.get.longValue compare latest.value.get.longValue) must be(
+ previous.average.get.ewma.longValue compare updatedAverage.ewma.longValue)
+ Some(updated)
+ } else None
}
- })
- }
- awaitCond(firstDataSet.size == streamingDataSet.size, longDuration)
- cancellable.cancel()
-
- val finalDataSet = streamingDataSet.map(m ⇒ m.name -> m).toMap
- firstDataSet map {
- first ⇒
- val newMetric = finalDataSet(first.name)
- val e1 = first.average.get
- val e2 = newMetric.average.get
-
- if (first.value.get != newMetric.value.get) {
- e2.ewma must not be (first.value.get)
- e2.ewma must not be (newMetric.value.get)
- }
- if (first.value.get.longValue > newMetric.value.get.longValue) e1.ewma.longValue must be > e2.ewma.longValue
- else if (first.value.get.longValue < newMetric.value.get.longValue) e1.ewma.longValue must be < e2.ewma.longValue
+ }
+ streamingDataSet ++= changes.map(m ⇒ m.name -> m).toMap
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
index 1f23da769c..f866056d9f 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
@@ -5,39 +5,43 @@
package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec }
+import akka.cluster.NodeMetrics.MetricValues._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with AbstractClusterMetricsSpec {
+class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with MetricSpec
+ with MetricsCollectorFactory {
"MetricNumericConverter" must {
val collector = createMetricsCollector
"convert " in {
- convert(0).isLeft must be(true)
- convert(1).left.get must be(1)
- convert(1L).isLeft must be(true)
- convert(0.0).isRight must be(true)
+ convertNumber(0).isLeft must be(true)
+ convertNumber(1).left.get must be(1)
+ convertNumber(1L).isLeft must be(true)
+ convertNumber(0.0).isRight must be(true)
}
"define a new metric" in {
- val metric = Metric("heap-memory-used", Some(0L))
- metric.initializable must be(true)
- metric.name must not be (null)
- metric.average.isEmpty must be(true)
- metric.trendable must be(true)
+ val metric = Metric(HeapMemoryUsed, Some(256L), decay = Some(10))
+ metric.name must be(HeapMemoryUsed)
+ metric.isDefined must be(true)
+ metric.value must be(Some(256L))
+ metric.average.isDefined must be(true)
+ metric.average.get.ewma must be(256L)
- if (collector.isSigar) {
- val cores = collector.totalCores
- cores.isDefined must be(true)
- cores.value.get.intValue must be > (0)
- cores.initializable must be(false)
+ collector match {
+ case c: SigarMetricsCollector ⇒
+ val cores = c.totalCores
+ cores.isDefined must be(true)
+ cores.value.get.intValue must be > (0)
+ case _ ⇒
}
}
"define an undefined value with a None " in {
- Metric("x", Some(-1)).value.isDefined must be(false)
- Metric("x", Some(java.lang.Double.NaN)).value.isDefined must be(false)
- Metric("x", None).isDefined must be(false)
+ Metric("x", Some(-1), None).value.isDefined must be(false)
+ Metric("x", Some(java.lang.Double.NaN), None).value.isDefined must be(false)
+ Metric("x", None, None).isDefined must be(false)
}
"recognize whether a metric value is defined" in {
diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsComparatorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala
similarity index 53%
rename from akka-cluster/src/test/scala/akka/cluster/NodeMetricsComparatorSpec.scala
rename to akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala
index 54e540763b..5910d0510f 100644
--- a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsComparatorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala
@@ -4,17 +4,42 @@
package akka.cluster
+import scala.util.Try
import akka.actor.Address
+import akka.cluster.NodeMetrics.MetricValues._
+import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec {
-
+class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec
+ with MetricsCollectorFactory {
import NodeMetrics._
+ val collector = createMetricsCollector
+
+ val node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics)
+ val node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)
+
+ var nodes: Seq[NodeMetrics] = Seq(node1, node2)
+
+  // build up the data streams where applicable
+ for (i ← 1 to 100) {
+ nodes = nodes map {
+ n ⇒
+ n.copy(metrics = collector.sample.metrics.flatMap(latest ⇒ n.metrics.collect {
+ case streaming if latest same streaming ⇒
+ streaming.average match {
+ case Some(e) ⇒ streaming.copy(value = latest.value, average =
+ if (latest.isDefined) Some(e :+ latest.value.get.doubleValue) else None)
+ case None ⇒ streaming.copy(value = latest.value)
+ }
+ }))
+ }
+ }
+
"NodeMetrics.MetricValues" must {
"extract expected metrics for load balancing" in {
- val stream1 = node2.metric("heap-memory-committed").value.get.longValue()
- val stream2 = node1.metric("heap-memory-used").value.get.longValue()
+ val stream1 = node2.metric(HeapMemoryCommitted).value.get.longValue
+ val stream2 = node1.metric(HeapMemoryUsed).value.get.longValue
stream1 must be >= (stream2)
}
@@ -39,8 +64,9 @@ class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec {
}
val (systemLoadAverage, processors, combinedCpu, cores) = MetricValues.unapply(node.cpu)
- systemLoadAverage must be >= (0.0)
processors must be > (0)
+ if (systemLoadAverage.isDefined)
+ systemLoadAverage.get must be >= (0.0)
if (combinedCpu.isDefined) {
combinedCpu.get must be <= (1.0)
combinedCpu.get must be >= (0.0)
@@ -53,29 +79,4 @@ class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec {
}
}
- "NodeMetricsComparator" must {
- val seq = Seq((Address("akka", "sys", "a", 2554), 9L), (Address("akka", "sys", "a", 2555), 8L))
- val seq2 = Seq((Address("akka", "sys", "a", 2554), 9.0), (Address("akka", "sys", "a", 2555), 8.0))
-
- "handle min ordering of a (Address, Long)" in {
- import NodeMetricsComparator.longMinAddressOrdering
- seq.min._1.port.get must be(2555)
- seq.min._2 must be(8L)
- }
- "handle max ordering of a (Address, Long)" in {
- val (address, value) = NodeMetricsComparator.maxAddressLong(seq)
- value must be(9L)
- address.port.get must be(2554)
- }
- "handle min ordering of a (Address, Double)" in {
- import NodeMetricsComparator.doubleMinAddressOrdering
- seq2.min._1.port.get must be(2555)
- seq2.min._2 must be(8.0)
- }
- "handle max ordering of a (Address, Double)" in {
- val (address, value) = NodeMetricsComparator.maxAddressDouble(seq2)
- value must be(9.0)
- address.port.get must be(2554)
- }
- }
}
\ No newline at end of file
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsAwareClusterNodeSelectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsAwareClusterNodeSelectorSpec.scala
deleted file mode 100644
index 01327d65f9..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/MetricsAwareClusterNodeSelectorSpec.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-
-package akka.cluster
-
-import akka.testkit.AkkaSpec
-import akka.actor.Address
-import akka.cluster.NodeMetrics.MetricValues
-import util.control.NonFatal
-import util.Try
-
-@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class MetricsAwareClusterNodeSelectorSpec extends ClusterMetricsRouteeSelectorSpec with MetricsAwareClusterNodeSelector {
- import NodeMetrics.NodeMetricsComparator.longMinAddressOrdering
-
- "MetricsAwareClusterNodeSelector" must {
-
- "select the address of the node with the lowest memory" in {
- for (i ← 0 to 10) { // run enough times to insure we test differing metric values
-
- val map = nodes.map(n ⇒ n.address -> MetricValues.unapply(n.heapMemory)).toMap
- val (used1, committed1, max1) = map.get(node1.address).get
- val (used2, committed2, max2) = map.get(node2.address).get
- val diff1 = max1 match {
- case Some(m) ⇒ ((committed1 - used1) + (m - used1) + (m - committed1))
- case None ⇒ committed1 - used1
- }
- val diff2 = max2 match {
- case Some(m) ⇒ ((committed2 - used2) + (m - used2) + (m - committed2))
- case None ⇒ committed2 - used2
- }
- val testMin = Set(diff1, diff2).min
- val expectedAddress = if (testMin == diff1) node1.address else node2.address
- val address = selectByMemory(nodes.toSet).get
- address must be(expectedAddress)
- }
- }
- "select the address of the node with the lowest network latency" in {
- // TODO
- }
- "select the address of the node with the best CPU health" in {
- // TODO
- }
- "select the address of the node with the best overall health based on all metric categories monitored" in {
- // TODO
- }
- }
-}
-
-abstract class ClusterMetricsRouteeSelectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec {
-
- val collector = createMetricsCollector
-
- var node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics)
- node1 = node1.copy(metrics = node1.metrics.collect { case m ⇒ m.initialize(DefaultRateOfDecay) })
-
- var node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)
- node2 = node2.copy(metrics = node2.metrics.collect { case m ⇒ m.initialize(DefaultRateOfDecay) })
-
- var nodes: Seq[NodeMetrics] = Seq(node1, node2)
-
- // work up the data streams where applicable
- for (i ← 0 to samples) {
- nodes = nodes map {
- n ⇒
- n.copy(metrics = collector.sample.metrics.flatMap(latest ⇒ n.metrics.collect {
- case streaming if latest same streaming ⇒
- streaming.average match {
- case Some(e) ⇒ streaming.copy(value = latest.value, average = Try(Some(e :+ latest.value.get)) getOrElse None)
- case None ⇒ streaming.copy(value = latest.value)
- }
- }))
- }
- }
-}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala
index c7a4ad4c73..ad836bf5f7 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala
@@ -7,17 +7,18 @@ package akka.cluster
import scala.language.postfixOps
import scala.concurrent.duration._
import scala.concurrent.Await
+import scala.util.{ Success, Try, Failure }
import akka.actor._
import akka.testkit._
+import akka.cluster.NodeMetrics.MetricValues._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
-import util.{ Success, Try, Failure }
object MetricsEnabledSpec {
val config = """
akka.cluster.metrics.enabled = on
- akka.cluster.metrics.metrics-interval = 1 s
+ akka.cluster.metrics.collect-interval = 1 s
akka.cluster.metrics.gossip-interval = 1 s
akka.cluster.metrics.rate-of-decay = 10
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
@@ -25,22 +26,16 @@ object MetricsEnabledSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec {
+class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec
+ with MetricsCollectorFactory {
import system.dispatcher
val collector = createMetricsCollector
"Metric must" must {
- "create and initialize a new metric or merge an existing one" in {
- for (i ← 0 to samples) {
- val metrics = collector.sample.metrics
- assertCreatedUninitialized(metrics)
- assertInitialized(DefaultRateOfDecay, metrics map (_.initialize(DefaultRateOfDecay)))
- }
- }
"merge 2 metrics that are tracking the same metric" in {
- for (i ← 0 to samples) {
+ for (i ← 1 to 20) {
val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics
var merged = sample2 flatMap (latest ⇒ sample1 collect {
@@ -51,8 +46,8 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
}
})
- val sample3 = collector.sample.metrics map (_.initialize(DefaultRateOfDecay))
- val sample4 = collector.sample.metrics map (_.initialize(DefaultRateOfDecay))
+ val sample3 = collector.sample.metrics
+ val sample4 = collector.sample.metrics
merged = sample4 flatMap (latest ⇒ sample3 collect {
case peer if latest same peer ⇒ {
val m = peer :+ latest
@@ -74,62 +69,61 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
"collect accurate metrics for a node" in {
val sample = collector.sample
- assertExpectedSampleSize(collector.isSigar, DefaultRateOfDecay, sample)
val metrics = sample.metrics.collect { case m if m.isDefined ⇒ (m.name, m.value.get) }
- val used = metrics collectFirst { case ("heap-memory-used", b) ⇒ b }
- val committed = metrics collectFirst { case ("heap-memory-committed", b) ⇒ b }
+ val used = metrics collectFirst { case (HeapMemoryUsed, b) ⇒ b }
+ val committed = metrics collectFirst { case (HeapMemoryCommitted, b) ⇒ b }
metrics foreach {
- case ("total-cores", b) ⇒ b.intValue must be > (0)
- case ("network-max-rx", b) ⇒ b.longValue must be > (0L)
- case ("network-max-tx", b) ⇒ b.longValue must be > (0L)
- case ("system-load-average", b) ⇒ b.doubleValue must be >= (0.0)
- case ("processors", b) ⇒ b.intValue must be >= (0)
- case ("heap-memory-used", b) ⇒ b.longValue must be >= (0L)
- case ("heap-memory-committed", b) ⇒ b.longValue must be > (0L)
- case ("cpu-combined", b) ⇒
- b.doubleValue must be <= (1.0)
- b.doubleValue must be >= (0.0)
- case ("heap-memory-max", b) ⇒
+ case (TotalCores, b) ⇒ b.intValue must be > (0)
+ case (NetworkInboundRate, b) ⇒ b.longValue must be > (0L)
+ case (NetworkOutboundRate, b) ⇒ b.longValue must be > (0L)
+ case (SystemLoadAverage, b) ⇒ b.doubleValue must be >= (0.0)
+ case (Processors, b) ⇒ b.intValue must be >= (0)
+ case (HeapMemoryUsed, b) ⇒ b.longValue must be >= (0L)
+ case (HeapMemoryCommitted, b) ⇒ b.longValue must be > (0L)
+ case (HeapMemoryMax, b) ⇒
+ b.longValue must be > (0L)
used.get.longValue must be <= (b.longValue)
committed.get.longValue must be <= (b.longValue)
+ case (CpuCombined, b) ⇒
+ b.doubleValue must be <= (1.0)
+ b.doubleValue must be >= (0.0)
+
}
}
"collect SIGAR metrics if it is on the classpath" in {
- if (collector.isSigar) {
- // combined cpu may or may not be defined on a given sampling
- // systemLoadAverage is SIGAR present
- collector.systemLoadAverage.isDefined must be(true)
- collector.networkStats.nonEmpty must be(true)
- collector.networkMaxRx.isDefined must be(true)
- collector.networkMaxTx.isDefined must be(true)
- collector.totalCores.isDefined must be(true)
+ collector match {
+ case c: SigarMetricsCollector ⇒
+ // combined cpu may or may not be defined on a given sampling
+ // systemLoadAverage is not present on all platforms
+ c.networkMaxRx.isDefined must be(true)
+ c.networkMaxTx.isDefined must be(true)
+ c.totalCores.isDefined must be(true)
+ case _ ⇒
}
}
"collect JMX metrics" in {
// heap max may be undefined depending on the OS
- // systemLoadAverage is JMX if SIGAR not present, but not available on all OS
- collector.used.isDefined must be(true)
- collector.committed.isDefined must be(true)
- collector.processors.isDefined must be(true)
+      // systemLoadAverage comes from JMX when SIGAR is not present,
+      // but it is not available on all platforms
+ val c = collector.asInstanceOf[JmxMetricsCollector]
+ c.heapUsed.isDefined must be(true)
+ c.heapCommitted.isDefined must be(true)
+ c.processors.isDefined must be(true)
}
- "collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in {
- val latch = TestLatch(samples)
- val task = system.scheduler.schedule(0 seconds, interval) {
+ "collect 50 node metrics samples in an acceptable duration" taggedAs LongRunningTest in within(7 seconds) {
+ (1 to 50) foreach { _ ⇒
val sample = collector.sample
- assertCreatedUninitialized(sample.metrics)
- assertExpectedSampleSize(collector.isSigar, DefaultRateOfDecay, sample)
- latch.countDown()
+ sample.metrics.size must be >= (3)
+ Thread.sleep(100)
}
- Await.ready(latch, longDuration)
- task.cancel()
}
}
}
-trait MetricSpec extends WordSpec with MustMatchers {
+trait MetricSpec extends WordSpec with MustMatchers { this: { def system: ActorSystem } ⇒
def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: MetricsGossip): Unit = {
val masterMetrics = collectNodeMetrics(master)
@@ -140,52 +134,22 @@ trait MetricSpec extends WordSpec with MustMatchers {
def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit =
gossip.nodes.map(_.address) must be(nodes.map(_.address))
- def assertExpectedSampleSize(isSigar: Boolean, gossip: MetricsGossip): Unit =
- gossip.nodes.foreach(n ⇒ assertExpectedSampleSize(isSigar, gossip.rateOfDecay, n))
-
- def assertCreatedUninitialized(gossip: MetricsGossip): Unit =
- gossip.nodes.foreach(n ⇒ assertCreatedUninitialized(n.metrics.filterNot(_.trendable)))
-
- def assertInitialized(gossip: MetricsGossip): Unit =
- gossip.nodes.foreach(n ⇒ assertInitialized(gossip.rateOfDecay, n.metrics))
-
- def assertCreatedUninitialized(metrics: Set[Metric]): Unit = {
- metrics.size must be > (0)
- metrics foreach { m ⇒
- m.average.isEmpty must be(true)
- if (m.value.isDefined) m.isDefined must be(true)
- if (m.initializable) (m.trendable && m.isDefined && m.average.isEmpty) must be(true)
- }
- }
-
- def assertInitialized(decay: Int, metrics: Set[Metric]): Unit = if (decay > 0) metrics.filter(_.trendable) foreach { m ⇒
- m.initializable must be(false)
- if (m.isDefined) m.average.isDefined must be(true)
- }
-
def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) {
if (latest.isDefined) {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
- if (latest.trendable) {
- if (latest.initializable) merged.average.isEmpty must be(true)
- else merged.average.isDefined must be(true)
- }
+ merged.average.isDefined must be(latest.average.isDefined)
} else {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
- if (latest.average.isDefined) merged.average.get must be(latest.average.get)
- else merged.average.isEmpty must be(true)
+ merged.average.isDefined must be(latest.average.isDefined || peer.average.isDefined)
}
} else {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(peer.value.get)
- if (peer.trendable) {
- if (peer.initializable) merged.average.isEmpty must be(true)
- else merged.average.isDefined must be(true)
- }
+ merged.average.isDefined must be(peer.average.isDefined)
} else {
merged.isDefined must be(false)
merged.average.isEmpty must be(true)
@@ -193,20 +157,6 @@ trait MetricSpec extends WordSpec with MustMatchers {
}
}
- def assertExpectedSampleSize(isSigar: Boolean, decay: Int, node: NodeMetrics): Unit = {
- node.metrics.size must be(9)
- val metrics = node.metrics.filter(_.isDefined)
- if (isSigar) { // combined cpu + jmx max heap
- metrics.size must be >= (7)
- metrics.size must be <= (9)
- } else { // jmx max heap
- metrics.size must be >= (4)
- metrics.size must be <= (5)
- }
-
- if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) ⇒ m }.foreach(_.average.isDefined must be(true))
- }
-
def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = {
var r: Seq[Metric] = Seq.empty
nodes.foreach(n ⇒ r ++= n.metrics.filter(_.isDefined))
@@ -214,19 +164,25 @@ trait MetricSpec extends WordSpec with MustMatchers {
}
}
-trait AbstractClusterMetricsSpec extends DefaultTimeout {
- this: AkkaSpec ⇒
+/**
+ * Used when testing metrics without a full cluster.
+ */
+trait MetricsCollectorFactory { this: AkkaSpec ⇒
- val selfAddress = new Address("akka", "localhost")
+ private def extendedActorSystem = system.asInstanceOf[ExtendedActorSystem]
- val DefaultRateOfDecay = 10
+ def selfAddress = extendedActorSystem.provider.rootPath.address
- val interval: FiniteDuration = 100 millis
+ val defaultRateOfDecay = 10
- val longDuration = 120 seconds // for long running tests
+ def createMetricsCollector: MetricsCollector =
+ Try(new SigarMetricsCollector(selfAddress, defaultRateOfDecay, extendedActorSystem.dynamicAccess)) match {
+ case Success(sigarCollector) ⇒ sigarCollector
+ case Failure(e) ⇒
+ log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " +
+ e.getMessage)
+ new JmxMetricsCollector(selfAddress, defaultRateOfDecay)
+ }
- val samples = 100
-
- def createMetricsCollector: MetricsCollector = MetricsCollector(selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
-
-}
\ No newline at end of file
+ def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
index e44b02e409..3ec10001f2 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
@@ -12,7 +12,8 @@ import akka.actor.Address
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec {
+class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec
+ with MetricsCollectorFactory {
val collector = createMetricsCollector
@@ -21,27 +22,25 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
- var localGossip = MetricsGossip(DefaultRateOfDecay)
+ var localGossip = MetricsGossip()
localGossip :+= m1
localGossip.nodes.size must be(1)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip)
- assertExpectedSampleSize(collector.isSigar, localGossip)
- assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
+ collector.sample.metrics.size must be > (3)
localGossip :+= m2
localGossip.nodes.size must be(2)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip)
- assertExpectedSampleSize(collector.isSigar, localGossip)
- assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
+ collector.sample.metrics.size must be > (3)
}
"merge peer metrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
- var remoteGossip = MetricsGossip(DefaultRateOfDecay)
+ var remoteGossip = MetricsGossip()
remoteGossip :+= m1
remoteGossip :+= m2
remoteGossip.nodes.size must be(2)
@@ -51,7 +50,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
remoteGossip :+= m2Updated // merge peers
remoteGossip.nodes.size must be(2)
assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip)
- assertExpectedSampleSize(collector.isSigar, remoteGossip)
+ remoteGossip.nodes.foreach(_.metrics.size must be > (3))
remoteGossip.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp must be(m2Updated.timestamp) }
}
@@ -61,11 +60,11 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
- var localGossip = MetricsGossip(DefaultRateOfDecay)
+ var localGossip = MetricsGossip()
localGossip :+= m1
localGossip :+= m2
- var remoteGossip = MetricsGossip(DefaultRateOfDecay)
+ var remoteGossip = MetricsGossip()
remoteGossip :+= m3
remoteGossip :+= m2Updated
@@ -76,15 +75,13 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val mergedGossip = localGossip merge remoteGossip
mergedGossip.nodes.size must be(3)
assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3))
- assertExpectedSampleSize(collector.isSigar, mergedGossip)
- assertCreatedUninitialized(mergedGossip)
- assertInitialized(mergedGossip)
+ mergedGossip.nodes.foreach(_.metrics.size must be > (3))
mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp)
}
"get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
- var localGossip = MetricsGossip(DefaultRateOfDecay)
+ var localGossip = MetricsGossip()
localGossip :+= m1
localGossip.metricsFor(m1).nonEmpty must be(true)
}
@@ -93,7 +90,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
- var localGossip = MetricsGossip(DefaultRateOfDecay)
+ var localGossip = MetricsGossip()
localGossip :+= m1
localGossip :+= m2
diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala
index 5d58bc84e5..d6a47db670 100644
--- a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala
@@ -8,7 +8,8 @@ import akka.testkit.AkkaSpec
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class NodeMetricsSpec extends AkkaSpec with AbstractClusterMetricsSpec with MetricSpec {
+class NodeMetricsSpec extends AkkaSpec with MetricSpec
+ with MetricsCollectorFactory {
val collector = createMetricsCollector
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/HeapMetricsSelectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/HeapMetricsSelectorSpec.scala
new file mode 100644
index 0000000000..5a8fecc2eb
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/HeapMetricsSelectorSpec.scala
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+
+package akka.cluster.routing
+
+import akka.testkit.AkkaSpec
+import akka.actor.Address
+import akka.actor.RootActorPath
+import akka.cluster.NodeMetrics
+import akka.cluster.NodeMetrics.MetricValues._
+import akka.cluster.Metric
+import com.typesafe.config.ConfigFactory
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class HeapMetricsSelectorSpec extends AkkaSpec(ConfigFactory.parseString("""
+ akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+ akka.remote.netty.port = 0
+ """)) {
+
+ val selector = HeapMetricsSelector
+
+ val a1 = Address("akka", "sys", "a1", 2551)
+ val b1 = Address("akka", "sys", "b1", 2551)
+ val c1 = Address("akka", "sys", "c1", 2551)
+ val d1 = Address("akka", "sys", "d1", 2551)
+
+ val decay = Some(10)
+
+ val nodeMetricsA = NodeMetrics(a1, System.currentTimeMillis, Set(
+ Metric(HeapMemoryUsed, Some(BigInt(128)), decay),
+ Metric(HeapMemoryCommitted, Some(BigInt(256)), decay),
+ Metric(HeapMemoryMax, Some(BigInt(512)), None)))
+
+ val nodeMetricsB = NodeMetrics(b1, System.currentTimeMillis, Set(
+ Metric(HeapMemoryUsed, Some(BigInt(256)), decay),
+ Metric(HeapMemoryCommitted, Some(BigInt(512)), decay),
+ Metric(HeapMemoryMax, Some(BigInt(1024)), None)))
+
+ val nodeMetricsC = NodeMetrics(c1, System.currentTimeMillis, Set(
+ Metric(HeapMemoryUsed, Some(BigInt(1024)), decay),
+ Metric(HeapMemoryCommitted, Some(BigInt(1024)), decay),
+ Metric(HeapMemoryMax, Some(BigInt(1024)), None)))
+
+ val nodeMetrics = Set(nodeMetricsA, nodeMetricsB, nodeMetricsC)
+
+ "MetricsAwareClusterNodeSelector" must {
+
+ "calculate capacity of heap metrics" in {
+ val capacity = selector.capacity(nodeMetrics)
+ capacity(a1) must be(0.75 plusOrMinus 0.0001)
+ capacity(b1) must be(0.75 plusOrMinus 0.0001)
+ capacity(c1) must be(0.0 plusOrMinus 0.0001)
+ }
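+
+    // The capacities above are consistent with free heap relative to the max,
+    // i.e. capacity = (max - used) / max when the max heap is defined (an
+    // assumption inferred from the expected values, not from the selector
+    // internals): node A gives (512 - 128) / 512 = 0.75 and node C gives
+    // (1024 - 1024) / 1024 = 0.0.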
+
+ "calculate weights from capacity" in {
+ val capacity = Map(a1 -> 0.6, b1 -> 0.3, c1 -> 0.1)
+ val weights = selector.weights(capacity)
+ weights must be(Map(c1 -> 1, b1 -> 3, a1 -> 6))
+ }
+
+ "handle low and zero capacity" in {
+ val capacity = Map(a1 -> 0.0, b1 -> 1.0, c1 -> 0.005, d1 -> 0.004)
+ val weights = selector.weights(capacity)
+ weights must be(Map(a1 -> 0, b1 -> 100, c1 -> 1, d1 -> 0))
+ }
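+
+    // Both weight expectations are consistent with dividing each capacity by
+    // max(0.01, lowest capacity) and rounding: here the divisor is 0.01, so
+    // 0.005 rounds up to weight 1 while 0.004 rounds down to 0. This formula
+    // is inferred from the expected values, not taken from the selector's source.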
+
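+    // The remaining cases exercise weightedRefs which, judging by the
+    // expectations below, repeats each routee according to its node's weight,
+    // treats a missing weight entry as 1, drops refs whose weight is 0, and
+    // resolves local refs against the self address passed as second argument.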
+ "allocate weighted refs" in {
+ val weights = Map(a1 -> 1, b1 -> 3, c1 -> 10)
+ val refs = IndexedSeq(
+ system.actorFor(RootActorPath(a1) / "user" / "a"),
+ system.actorFor(RootActorPath(b1) / "user" / "b"),
+ system.actorFor(RootActorPath(c1) / "user" / "c"))
+ val result = selector.weightedRefs(refs, a1, weights)
+ val grouped = result.groupBy(_.path.address)
+ grouped(a1).size must be(1)
+ grouped(b1).size must be(3)
+ grouped(c1).size must be(10)
+ }
+
+ "allocate refs for undefined weight" in {
+ val weights = Map(a1 -> 1, b1 -> 2)
+ val refs = IndexedSeq(
+ system.actorFor(RootActorPath(a1) / "user" / "a"),
+ system.actorFor(RootActorPath(b1) / "user" / "b"),
+ system.actorFor(RootActorPath(c1) / "user" / "c"))
+ val result = selector.weightedRefs(refs, a1, weights)
+ val grouped = result.groupBy(_.path.address)
+ grouped(a1).size must be(1)
+ grouped(b1).size must be(2)
+ grouped(c1).size must be(1)
+ }
+
+ "allocate weighted local refs" in {
+ val weights = Map(a1 -> 2, b1 -> 1, c1 -> 10)
+ val refs = IndexedSeq(
+ testActor,
+ system.actorFor(RootActorPath(b1) / "user" / "b"),
+ system.actorFor(RootActorPath(c1) / "user" / "c"))
+ val result = selector.weightedRefs(refs, a1, weights)
+ result.filter(_ == testActor).size must be(2)
+ }
+
+ "not allocate ref with weight zero" in {
+ val weights = Map(a1 -> 0, b1 -> 2, c1 -> 10)
+ val refs = IndexedSeq(
+ system.actorFor(RootActorPath(a1) / "user" / "a"),
+ system.actorFor(RootActorPath(b1) / "user" / "b"),
+ system.actorFor(RootActorPath(c1) / "user" / "c"))
+ val result = selector.weightedRefs(refs, a1, weights)
+ result.filter(_ == refs.head).size must be(0)
+ }
+ }
+}
\ No newline at end of file