From c9d206764a183fe3e1799c1359c2c56978026181 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Nov 2012 20:36:24 +0100 Subject: [PATCH] ClusterLoadBalancingRouter and refactoring of metrics, see #2547 * MetricsSelector, calculate capacity, weights and allocate weighted routee refs * ClusterLoadBalancingRouterSpec * Optional heap max * Constants for the metric fields * Refactoring of Metric and decay * Rewrite of DataStreamSpec * Correction of EWMA and removal of BigInt, BigDecimal * Separation of MetricsCollector into trait and two classes, SigarMetricsCollector and JmxMetricsCollector * This will reduce cost when sigar is not installed, such as avoiding throwing and catching exceptions for every call * Improved error handling for loading sigar * Made MetricsCollector implementation configurable * Tested with sigar --- .../src/main/resources/reference.conf | 14 +- .../cluster/ClusterMetricsCollector.scala | 535 ++++++++++-------- .../scala/akka/cluster/ClusterSettings.scala | 11 +- .../routing/ClusterLoadBalancingRouter.scala | 271 +++++++++ .../akka/cluster/ClusterMetricsSpec.scala | 7 +- .../akka/cluster/MultiNodeClusterSpec.scala | 2 +- ...ptiveLoadBalancingRouterMultiJvmSpec.scala | 61 -- .../routing/ClusterLoadBalancingRouter.scala | 155 ----- .../ClusterLoadBalancingRouterSpec.scala | 162 ++++++ .../akka/cluster/ClusterConfigSpec.scala | 1 + .../scala/akka/cluster/DataStreamSpec.scala | 90 +-- .../cluster/MetricNumericConverterSpec.scala | 40 +- ...ratorSpec.scala => MetricValuesSpec.scala} | 61 +- .../MetricsAwareClusterNodeSelectorSpec.scala | 76 --- .../akka/cluster/MetricsCollectorSpec.scala | 168 ++---- .../akka/cluster/MetricsGossipSpec.scala | 27 +- .../scala/akka/cluster/NodeMetricsSpec.scala | 3 +- .../routing/HeapMetricsSelectorSpec.scala | 114 ++++ 18 files changed, 1065 insertions(+), 733 deletions(-) create mode 100644 akka-cluster/src/main/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterAdaptiveLoadBalancingRouterMultiJvmSpec.scala delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouterSpec.scala rename akka-cluster/src/test/scala/akka/cluster/{NodeMetricsComparatorSpec.scala => MetricValuesSpec.scala} (53%) delete mode 100644 akka-cluster/src/test/scala/akka/cluster/MetricsAwareClusterNodeSelectorSpec.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/routing/HeapMetricsSelectorSpec.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index d75ddd0a00..8539e56b1c 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -70,7 +70,7 @@ akka { failure-detector { # FQCN of the failure detector implementation. - # It must implement akka.cluster.akka.cluster and + # It must implement akka.cluster.FailureDetector and # have constructor with akka.actor.ActorSystem and # akka.cluster.ClusterSettings parameters implementation-class = "akka.cluster.AccrualFailureDetector" @@ -118,14 +118,22 @@ akka { # Enable or disable metrics collector for load-balancing nodes. enabled = on + # FQCN of the metrics collector implementation. + # It must implement akka.cluster.MetricsCollector and + # have a constructor with an akka.actor.ActorSystem parameter.
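+ # A custom implementation could be configured instead, e.g. (hypothetical class name,
+ # shown for illustration only):
+ #   collector-class = "com.example.MyMetricsCollector"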
+ collector-class = "akka.cluster.SigarMetricsCollector" + # How often metrics is sampled on a node. - metrics-interval = 3s + collect-interval = 3s # How often a node publishes metrics information. gossip-interval = 3s # How quickly the exponential weighting of past data is decayed compared to - # new data. Set higher to increase the bias toward newer values. + # new data. Set lower to increase the bias toward newer values. + # Corresponds to 'N time periods' as explained in + # http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + # Must be >= 1 rate-of-decay = 10 } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 07016b098c..ab3b6f5777 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -4,29 +4,36 @@ package akka.cluster -import scala.language.postfixOps -import scala.concurrent.duration._ -import scala.collection.immutable.{ SortedSet, Map } -import scala.concurrent.forkjoin.ThreadLocalRandom -import scala.util.{ Try, Success, Failure } -import math.{ ScalaNumber, ScalaNumericConversions } -import scala.runtime.{ RichLong, RichDouble, RichInt } - -import akka.actor._ -import akka.event.LoggingAdapter -import akka.cluster.MemberStatus.Up - -import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory } -import java.lang.reflect.Method +import java.io.Closeable import java.lang.System.{ currentTimeMillis ⇒ newTimestamp } +import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory } +import java.lang.reflect.InvocationTargetException +import java.lang.reflect.Method + +import scala.collection.immutable.{ SortedSet, Map } +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.runtime.{ RichLong, RichDouble, RichInt } +import scala.util.{ Try, Success, Failure } + +import akka.ConfigurationException +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.DynamicAccess +import akka.actor.ExtendedActorSystem +import akka.cluster.MemberStatus.Up +import akka.event.Logging /** * INTERNAL API. * - * This strategy is primarily for load-balancing of nodes. It controls metrics sampling + * Cluster metrics is primarily for load-balancing of nodes. It controls metrics sampling * at a regular frequency, prepares highly variable data for further analysis by other entities, - * and publishes the latest cluster metrics data around the node ring to assist in determining - * the need to redirect traffic to the least-loaded nodes. + * and publishes the latest cluster metrics data around the node ring and local eventStream + * to assist in determining the need to redirect traffic to the least-loaded nodes. * * Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]]. * @@ -43,8 +50,6 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto import cluster.{ selfAddress, scheduler, settings } import settings._ - val DefaultRateOfDecay: Int = 10 - /** * The node ring gossipped that contains only members that are Up. */ @@ -53,12 +58,12 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto /** * The latest metric values with their statistical data. 
*/ - var latestGossip: MetricsGossip = MetricsGossip(if (MetricsRateOfDecay <= 0) DefaultRateOfDecay else MetricsRateOfDecay) + var latestGossip: MetricsGossip = MetricsGossip() /** * The metrics collector that samples data on the node. */ - val collector: MetricsCollector = MetricsCollector(selfAddress, log, context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess) + val collector: MetricsCollector = MetricsCollector(context.system.asInstanceOf[ExtendedActorSystem], settings) /** * Start periodic gossip to random nodes in cluster @@ -160,7 +165,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto * * @param nodes metrics per node */ -private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetrics] = Set.empty) { +private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics] = Set.empty) { /** * Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus.Up]] @@ -194,11 +199,10 @@ private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetri val names = previous map (_.name) val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a ⇒ names contains a.name) - val initialized = unseen.map(_.initialize(rateOfDecay)) val merged = toMerge flatMap (latest ⇒ previous.collect { case peer if latest same peer ⇒ peer :+ latest }) val refreshed = nodes filterNot (_.address == data.address) - copy(nodes = refreshed + data.copy(metrics = initialized ++ merged)) + copy(nodes = refreshed + data.copy(metrics = unseen ++ merged)) } /** @@ -231,6 +235,8 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics * INTERNAL API * * @param decay sets how quickly the exponential weighting decays for past data compared to new data + * Corresponds to 'N time periods' as explained in + * http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average * * @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or, * the sampled value resulting from the previous smoothing iteration. @@ -240,36 +246,30 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics * * @param startTime the time of initial sampling for this data stream */ -private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions, startTime: Long, timestamp: Long) - extends ClusterMessage with MetricNumericConverter { +private[cluster] case class DataStream(decay: Int, ewma: Double, startTime: Long, timestamp: Long) + extends ClusterMessage { + + require(decay >= 1, "Rate of decay must be >= 1") /** * The rate at which the weights of past observations * decay as they become more distant. */ - private val α = 2 / decay + 1 + private val α = 2.0 / (decay + 1) /** * Calculates the exponentially weighted moving average for a given monitored data set. - * The datam can be too large to fit into an int or long, thus we use ScalaNumericConversions, - * and defer to BigInt or BigDecimal. 
- * - * FIXME - look into the math: the new ewma becomes the new value - * value: 416979936, prev ewma: 416581760, new ewma: 416979936 * * @param xn the new data point * * @return a [[akka.cluster.DataStream]] with the updated yn and timestamp + * For example, with decay = 10 (α ≈ 0.18) an ewma of 100 and a new data point of 200 yield a new ewma of about 118. */ - def :+(xn: ScalaNumericConversions): DataStream = if (xn != ewma) convert(xn) fold ( - nl ⇒ copy(ewma = BigInt((α * nl) + (1 - α) * ewma.longValue()), timestamp = newTimestamp), - nd ⇒ copy(ewma = BigDecimal((α * nd) + (1 - α) * ewma.doubleValue()), timestamp = newTimestamp)) - else this + def :+(xn: Double): DataStream = copy(ewma = (α * xn) + (1 - α) * ewma, timestamp = newTimestamp) /** * The duration of observation for this data stream */ - def duration: FiniteDuration = (timestamp - startTime) millis + def duration: FiniteDuration = (timestamp - startTime).millis } @@ -278,19 +278,17 @@ private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericConversions * * @param name the metric name * - * @param value the metric value, which may or may not be defined + * @param value the metric value, which may or may not be defined; if present it must be a valid numerical value, + * see [[akka.cluster.MetricNumericConverter.defined()]] * * @param average the data stream of the metric value, for trending over time. Metrics that are already * averages (e.g. system load average) or finite (e.g. as total cores), are not trended. */ -private[cluster] case class Metric(name: String, value: Option[ScalaNumericConversions], average: Option[DataStream]) +private[cluster] case class Metric private (name: String, value: Option[Number], average: Option[DataStream], + dummy: Boolean) // dummy because of overloading clash with apply extends ClusterMessage with MetricNumericConverter { - /** - * Returns the metric with a new data stream for data trending if eligible, - * otherwise returns the unchanged metric. - */ - def initialize(decay: Int): Metric = if (initializable) copy(average = Some(DataStream(decay, value.get, newTimestamp, newTimestamp))) else this + require(value.isEmpty || defined(value.get), "Invalid Metric [%s] value [%s]".format(name, value)) /** * If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new * data point, and if defined, the data stream (average). */ def :+(latest: Metric): Metric = latest.value match { case Some(v) if this same latest ⇒ average match { - case Some(previous) ⇒ copy(value = Some(v), average = Some(previous :+ v)) - case None if latest.average.isDefined ⇒ copy(value = Some(v), average = latest.average) - case None if !latest.average.isDefined ⇒ copy(value = Some(v)) + case Some(previous) ⇒ copy(value = latest.value, average = Some(previous :+ v.doubleValue)) + case None if latest.average.isDefined ⇒ copy(value = latest.value, average = latest.average) + case None if latest.average.isEmpty ⇒ copy(value = latest.value) } case None ⇒ this } + def isDefined: Boolean = value.isDefined + /** - * @see [[akka.cluster.MetricNumericConverter.defined()]] + * The numerical value of the average, if defined, otherwise the latest value, + * if defined. */ - def isDefined: Boolean = value match { - case Some(a) ⇒ defined(a) - case None ⇒ false - } + def averageValue: Option[Double] = + average map (_.ewma) orElse value.map(_.doubleValue) /** * Returns true if that is tracking the same metric as this. */ def same(that: Metric): Boolean = name == that.name - /** - * Returns true if the metric requires initialization.
- */ - def initializable: Boolean = trendable && isDefined && average.isEmpty - - /** - * Returns true if the metric is a value applicable for trending. - */ - def trendable: Boolean = !(Metric.noStream contains name) - } /** @@ -336,65 +325,30 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumericConve * Companion object of Metric class. */ private[cluster] object Metric extends MetricNumericConverter { + import NodeMetrics.MetricValues._ - /** - * The metrics that are already averages or finite are not trended over time. - */ - private val noStream = Set("system-load-average", "total-cores", "processors") + private def definedValue(value: Option[Number]): Option[Number] = + if (value.isDefined && defined(value.get)) value else None /** * Evaluates validity of value based on whether it is available (SIGAR on classpath) * or defined for the OS (JMX). If undefined we set the value option to None and do not modify * the latest sampled metric to avoid skewing the statistical trend. + * + * @param decay rate of decay for values applicable for trending */ - def apply(name: String, value: Option[ScalaNumericConversions]): Metric = value match { - case Some(v) if defined(v) ⇒ Metric(name, value, None) - case _ ⇒ Metric(name, None, None) + def apply(name: String, value: Option[Number], decay: Option[Int]): Metric = { + val dv = definedValue(value) + val average = + if (decay.isDefined && dv.isDefined) { + val now = newTimestamp + Some(DataStream(decay.get, dv.get.doubleValue, now, now)) + } else + None + new Metric(name, dv, average, true) } -} - -/** - * Reusable logic for particular metric categories or to leverage all for routing. - */ -private[cluster] trait MetricsAwareClusterNodeSelector { - import NodeMetrics._ - import NodeMetrics.NodeMetricsComparator._ - - /** - * Returns the address of the available node with the lowest cumulative difference - * between heap memory max and used/committed. - */ - def selectByMemory(nodes: Set[NodeMetrics]): Option[Address] = Try(Some(nodes.map { - n ⇒ - val (used, committed, max) = MetricValues.unapply(n.heapMemory) - (n.address, max match { - case Some(m) ⇒ ((committed - used) + (m - used) + (m - committed)) - case None ⇒ committed - used - }) - }.min._1)) getOrElse None - - // TODO - def selectByNetworkLatency(nodes: Set[NodeMetrics]): Option[Address] = None - /* Try(nodes.map { - n ⇒ - val (rxBytes, txBytes) = MetricValues.unapply(n.networkLatency).get - (n.address, (rxBytes + txBytes)) - }.min._1) getOrElse None // TODO: min or max - */ - - // TODO - def selectByCpu(nodes: Set[NodeMetrics]): Option[Address] = None - /* Try(nodes.map { - n ⇒ - val (loadAverage, processors, combinedCpu, cores) = MetricValues.unapply(n.cpu) - var cumulativeDifference = 0 - // TODO: calculate - combinedCpu.get - cores.get - (n.address, cumulativeDifference) - }.min._1) getOrElse None // TODO min or max - }*/ + def apply(name: String): Metric = new Metric(name, None, None, true) } /** @@ -417,6 +371,7 @@ private[cluster] trait MetricsAwareClusterNodeSelector { */ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage { import NodeMetrics._ + import NodeMetrics.MetricValues._ /** * Returns the most recent data. @@ -436,13 +391,13 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri /** * Of all the data streams, this fluctuates the most. 
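 * For example (an illustrative sketch; this is the same extractor pattern used by
 * HeapMetricsSelector in this patch, with `nodeMetrics` assumed in scope):
 * {{{
 * val (used, committed, max) = MetricValues.unapply(nodeMetrics.heapMemory)
 * }}}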
*/ - def heapMemory: HeapMemory = HeapMemory(metric("heap-memory-used"), metric("heap-memory-committed"), metric("heap-memory-max")) + def heapMemory: HeapMemory = HeapMemory(metric(HeapMemoryUsed), metric(HeapMemoryCommitted), metric(HeapMemoryMax)) - def networkLatency: NetworkLatency = NetworkLatency(metric("network-max-rx"), metric("network-max-tx")) + def networkLatency: NetworkLatency = NetworkLatency(metric(NetworkInboundRate), metric(NetworkOutboundRate)) - def cpu: Cpu = Cpu(metric("system-load-average"), metric("processors"), metric("cpu-combined"), metric("total-cores")) + def cpu: Cpu = Cpu(metric(SystemLoadAverage), metric(Processors), metric(CpuCombined), metric(TotalCores)) - def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key ⇒ m } getOrElse Metric(key, None) + def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key ⇒ m } getOrElse Metric(key) } @@ -456,44 +411,36 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri */ private[cluster] object NodeMetrics { - object NodeMetricsComparator extends MetricNumericConverter { - - implicit val longMinOrdering: Ordering[Long] = Ordering.fromLessThan[Long] { (a, b) ⇒ (a < b) } - - implicit val longMinAddressOrdering: Ordering[(Address, Long)] = new Ordering[(Address, Long)] { - def compare(a: (Address, Long), b: (Address, Long)): Int = longMinOrdering.compare(a._2, b._2) - } - - def maxAddressLong(seq: Seq[(Address, Long)]): (Address, Long) = - seq.reduceLeft((a: (Address, Long), b: (Address, Long)) ⇒ if (a._2 > b._2) a else b) - - implicit val doubleMinOrdering: Ordering[Double] = Ordering.fromLessThan[Double] { (a, b) ⇒ (a < b) } - - implicit val doubleMinAddressOrdering: Ordering[(Address, Double)] = new Ordering[(Address, Double)] { - def compare(a: (Address, Double), b: (Address, Double)): Int = doubleMinOrdering.compare(a._2, b._2) - } - - def maxAddressDouble(seq: Seq[(Address, Double)]): (Address, Double) = - seq.reduceLeft((a: (Address, Double), b: (Address, Double)) ⇒ if (a._2 > b._2) a else b) - } - sealed trait MetricValues object MetricValues { + final val HeapMemoryUsed = "heap-memory-used" + final val HeapMemoryCommitted = "heap-memory-committed" + final val HeapMemoryMax = "heap-memory-max" + final val NetworkInboundRate = "network-max-inbound" + final val NetworkOutboundRate = "network-max-outbound" + final val SystemLoadAverage = "system-load-average" + final val Processors = "processors" + final val CpuCombined = "cpu-combined" + final val TotalCores = "total-cores" + def unapply(v: HeapMemory): Tuple3[Long, Long, Option[Long]] = - (v.used.average.get.ewma.longValue(), - v.committed.average.get.ewma.longValue(), - Try(Some(v.max.average.get.ewma.longValue())) getOrElse None) + (v.used.averageValue.get.longValue, + v.committed.averageValue.get.longValue, + v.max.averageValue map (_.longValue) orElse None) def unapply(v: NetworkLatency): Option[(Long, Long)] = - Try(Some(v.maxRxIO.average.get.ewma.longValue(), v.maxTxIO.average.get.ewma.longValue())) getOrElse None + (v.inbound.averageValue, v.outbound.averageValue) match { + case (Some(a), Some(b)) ⇒ Some((a.longValue, b.longValue)) + case _ ⇒ None + } - def unapply(v: Cpu): Tuple4[Double, Int, Option[Double], Option[Int]] = - (v.systemLoadAverage.value.get.doubleValue(), - v.processors.value.get.intValue(), - Try(Some(v.combinedCpu.average.get.ewma.doubleValue())) getOrElse None, - Try(Some(v.cores.value.get.intValue())) getOrElse None) + def unapply(v: Cpu): Tuple4[Option[Double], 
Int, Option[Double], Option[Int]] = + (v.systemLoadAverage.averageValue map (_.doubleValue), + v.processors.averageValue.get.intValue, + v.combinedCpu.averageValue map (_.doubleValue), + v.cores.averageValue map (_.intValue)) } /** @@ -505,14 +452,17 @@ private[cluster] object NodeMetrics { * @param max the maximum amount of memory (in bytes) that can be used for JVM memory management. * Can be undefined on some OS. */ - case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues + case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues { + require(used.isDefined, "used must be defined") + require(committed.isDefined, "committed must be defined") + } /** - * @param maxRxIO the max network IO rx value, in bytes + * @param inbound the inbound network IO rate, in bytes * - * @param maxTxIO the max network IO tx value, in bytes + * @param outbound the outbound network IO rate, in bytes */ - case class NetworkLatency(maxRxIO: Metric, maxTxIO: Metric) extends MetricValues + case class NetworkLatency(inbound: Metric, outbound: Metric) extends MetricValues /** * @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute * @@ -524,7 +474,9 @@ private[cluster] object NodeMetrics { * * @param cores the number of cores (multi-core: per processor) */ - private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues + private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues { + require(processors.isDefined, "processors must be defined") + } } @@ -537,44 +489,152 @@ private[cluster] object NodeMetrics { private[cluster] trait MetricNumericConverter { /** - * A defined value is neither a -1 or NaN/Infinite: + * A defined value is neither negative (-1) nor NaN/Infinite: + * */ - def defined(value: ScalaNumericConversions): Boolean = - convert(value) fold (a ⇒ value.underlying != -1, b ⇒ !(b.isNaN || b.isInfinite)) + def defined(value: Number): Boolean = convertNumber(value) match { + case Left(a) ⇒ a >= 0 + case Right(b) ⇒ !(b.isNaN || b.isInfinite) + } /** * May involve rounding or truncation. */ - def convert(from: ScalaNumericConversions): Either[Long, Double] = from match { - case n: BigInt ⇒ Left(n.longValue()) - case n: BigDecimal ⇒ Right(n.doubleValue()) + def convertNumber(from: Any): Either[Long, Double] = from match { + case n: Int ⇒ Left(n) + case n: Long ⇒ Left(n) + case n: Double ⇒ Right(n) + case n: Float ⇒ Right(n) case n: RichInt ⇒ Left(n.abs) case n: RichLong ⇒ Left(n.self) case n: RichDouble ⇒ Right(n.self) + case n: BigInt ⇒ Left(n.longValue) + case n: BigDecimal ⇒ Right(n.doubleValue) + case x ⇒ throw new IllegalArgumentException("Not a number [%s]" format x) } } +/** + * INTERNAL API + */ +private[cluster] trait MetricsCollector extends Closeable { + /** + * Samples and collects new data points. + */ + def sample: NodeMetrics +} /** * INTERNAL API * - * Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this - * loads wider and more accurate range of metrics in combination with SIGAR's native OS library. - * - * FIXME switch to Scala reflection - * - * @param sigar the optional org.hyperic.Sigar instance + * Loads JVM metrics through JMX monitoring beans.
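+ *
+ * A usage sketch (illustrative only, not part of this patch; `system` is an
+ * ActorSystem assumed to be in scope):
+ * {{{
+ * val collector: MetricsCollector = new JmxMetricsCollector(system)
+ * try collector.sample.metrics foreach println
+ * finally collector.close()
+ * }}}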
* * @param address The [[akka.actor.Address]] of the node being sampled + * @param decay how quickly the exponential weighting of past data is decayed */ -private[cluster] class MetricsCollector private (private val sigar: Option[AnyRef], address: Address) extends MetricNumericConverter { +private[cluster] class JmxMetricsCollector(address: Address, decay: Int) extends MetricsCollector { + import NodeMetrics.MetricValues._ + + private def this(cluster: Cluster) = + this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay) + + def this(system: ActorSystem) = this(Cluster(system)) + + private val decayOption = Some(decay) private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean + /** + * Samples and collects new data points. + */ + def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set( + systemLoadAverage, heapUsed, heapCommitted, heapMax, processors)) + + /** + * (JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute. + * On some systems the JMX OS system load average may not be available, in which case a -1 is returned, + * which means that the returned Metric is undefined. + */ + def systemLoadAverage: Metric = Metric( + name = SystemLoadAverage, + value = Some(osMBean.getSystemLoadAverage), + decay = None) + + /** + * (JMX) Returns the number of available processors + */ + def processors: Metric = Metric( + name = Processors, + value = Some(osMBean.getAvailableProcessors), + decay = None) + + // FIXME those three heap calls should be done at once + + /** + * (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes). + */ + def heapUsed: Metric = Metric( + name = HeapMemoryUsed, + value = Some(memoryMBean.getHeapMemoryUsage.getUsed), + decay = decayOption) + + /** + * (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM + * from all heap memory pools (in bytes). + */ + def heapCommitted: Metric = Metric( + name = HeapMemoryCommitted, + value = Some(memoryMBean.getHeapMemoryUsage.getCommitted), + decay = decayOption) + + /** + * (JMX) Returns the maximum amount of memory (in bytes) that can be used + * for JVM memory management. If not defined the metric value is None, i.e. + * never negative. + */ + def heapMax: Metric = Metric( + name = HeapMemoryMax, + value = Some(memoryMBean.getHeapMemoryUsage.getMax), + decay = None) + + def close(): Unit = () + +} + +/** + * INTERNAL API + * + * Loads metrics through Hyperic SIGAR and JMX monitoring beans. This + * loads a wider and more accurate range of metrics compared to JmxMetricsCollector + * by using SIGAR's native OS library. + * + * The constructor will by design throw an exception if org.hyperic.sigar.Sigar can't be loaded, due + * to missing classes or native libraries.
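+ *
+ * The [[akka.cluster.MetricsCollector]] factory relies on this to fall back to
+ * JMX-only collection, roughly (an illustrative sketch of that factory logic,
+ * using scala.util.Try):
+ * {{{
+ * val collector: MetricsCollector =
+ *   Try(new SigarMetricsCollector(system)) getOrElse new JmxMetricsCollector(system)
+ * }}}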
+ * + * TODO switch to Scala reflection + * + * @param address The [[akka.actor.Address]] of the node being sampled + * @param decay how quickly the exponential weighting of past data is decayed + * @param sigar the org.hyperic.Sigar instance + */ +private[cluster] class SigarMetricsCollector(address: Address, decay: Int, sigar: AnyRef) + extends JmxMetricsCollector(address, decay) { + import NodeMetrics.MetricValues._ + + def this(address: Address, decay: Int, dynamicAccess: DynamicAccess) = + this(address, decay, dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty).get) + + private def this(cluster: Cluster) = + this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay, cluster.system.dynamicAccess) + + def this(system: ActorSystem) = this(Cluster(system)) + + private val decayOption = Some(decay) + private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage") private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList").map(m ⇒ m) @@ -585,48 +645,41 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption + private val Pid: Option[Method] = createMethodFrom(sigar, "getPid") + + // Do something initially, in constructor, to make sure that the native library can be loaded. + // This will by design throw exception if sigar isn't usable + val pid: Long = createMethodFrom(sigar, "getPid") match { + case Some(method) ⇒ + try method.invoke(sigar).asInstanceOf[Long] catch { + case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError] ⇒ + // native libraries not in place + // don't throw fatal LinkageError, but something less harmless + throw new IllegalArgumentException(e.getCause.toString) + case e: InvocationTargetException ⇒ throw e.getCause + } + case None ⇒ throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method") + } + /** * Samples and collects new data points. - * - * @return [[akka.cluster.NodeMetrics]] */ - def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores, - systemLoadAverage, used, committed, max, processors, networkMaxRx, networkMaxTx)) + override def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores, + systemLoadAverage, heapUsed, heapCommitted, heapMax, processors, networkMaxRx, networkMaxTx)) /** * (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute. - * On some systems the JMX OS system load average may not be available, in which case a Metric with - * undefined value is returned. + * On some systems the JMX OS system load average may not be available, in which case a -1 is returned, + * which means that the returned Metric is undefined. * Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default. */ - def systemLoadAverage: Metric = Metric("system-load-average", - Try(LoadAverage.get.invoke(sigar.get).asInstanceOf[Array[Double]].toSeq.head).getOrElse( - osMBean.getSystemLoadAverage) match { - case x if x < 0 ⇒ None // load average may be unavailable on some platform - case x ⇒ Some(BigDecimal(x)) - }) - - /** - * (JMX) Returns the number of available processors - */ - def processors: Metric = Metric("processors", Some(BigInt(osMBean.getAvailableProcessors))) - - /** - * (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes). 
- */ - def used: Metric = Metric("heap-memory-used", Some(BigInt(memoryMBean.getHeapMemoryUsage.getUsed))) - - /** - * (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM - * from all heap memory pools (in bytes). - */ - def committed: Metric = Metric("heap-memory-committed", Some(BigInt(memoryMBean.getHeapMemoryUsage.getCommitted))) - - /** - * (JMX) Returns the maximum amount of memory (in bytes) that can be used - * for JVM memory management. If undefined, returns -1. - */ - def max: Metric = Metric("heap-memory-max", Some(BigInt(memoryMBean.getHeapMemoryUsage.getMax))) + override def systemLoadAverage: Metric = { + val m = Metric( + name = SystemLoadAverage, + value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]].head.asInstanceOf[Number]).toOption, + decay = None) + if (m.isDefined) m else super.systemLoadAverage + } /** * (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe @@ -636,49 +689,61 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe * In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite. * Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others. */ - def cpuCombined: Metric = Metric("cpu-combined", Try(BigDecimal(CombinedCpu.get.invoke(Cpu.get.invoke(sigar.get)).asInstanceOf[Double])).toOption) + def cpuCombined: Metric = Metric( + name = CpuCombined, + value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]).toOption, + decay = decayOption) /** * FIXME: Array[Int].head - expose all if cores per processor might differ. * (SIGAR) Returns the total number of cores. + * + * FIXME do we need this information, isn't it enough with jmx processors? */ - def totalCores: Metric = Metric("total-cores", Try(BigInt(CpuList.get.invoke(sigar.get).asInstanceOf[Array[AnyRef]].map(cpu ⇒ - createMethodFrom(Some(cpu), "getTotalCores").get.invoke(cpu).asInstanceOf[Int]).head)).toOption) + def totalCores: Metric = Metric( + name = TotalCores, + value = Try(CpuList.get.invoke(sigar).asInstanceOf[Array[AnyRef]].map(cpu ⇒ + createMethodFrom(cpu, "getTotalCores").get.invoke(cpu).asInstanceOf[Number]).head).toOption, + decay = None) + + // FIXME those two network calls should be combined into one /** * (SIGAR) Returns the max network IO read/write value, in bytes, for network latency evaluation. */ - def networkMaxRx: Metric = networkMaxFor("getRxBytes", "network-max-rx") + def networkMaxRx: Metric = networkFor("getRxBytes", NetworkInboundRate) /** - * (SIGAR) Returns the max network IO tx value, in bytes. + * (SIGAR) Returns the max network IO outbound value, in bytes. */ - def networkMaxTx: Metric = networkMaxFor("getTxBytes", "network-max-tx") - - /** - * Returns the network stats per interface. - */ - def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar.get).asInstanceOf[Array[String]].map(arg ⇒ - arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar.get, arg))).toMap) getOrElse Map.empty[String, AnyRef] - - /** - * Returns true if SIGAR is successfully installed on the classpath, otherwise false. - */ - def isSigar: Boolean = sigar.isDefined + def networkMaxTx: Metric = networkFor("getTxBytes", NetworkOutboundRate) /** * Releases any native resources associated with this instance. 
*/ - def close(): Unit = if (isSigar) Try(createMethodFrom(sigar, "close").get.invoke(sigar.get)) getOrElse Unit + override def close(): Unit = Try(createMethodFrom(sigar, "close").get.invoke(sigar)) + + // FIXME network metrics need thought and refactoring /** * Returns the max bytes for the given method from the network interface stats. */ - private def networkMaxFor(method: String, metric: String): Metric = Metric(metric, Try(Some(BigInt( - networkStats.collect { case (_, a) ⇒ createMethodFrom(Some(a), method).get.invoke(a).asInstanceOf[Long] }.max))) getOrElse None) + private def networkFor(method: String, metric: String): Metric = Metric( + name = metric, + value = Try(networkStats.collect { + case (_, a) ⇒ + createMethodFrom(a, method).get.invoke(a).asInstanceOf[Long] + }.max.asInstanceOf[Number]).toOption.orElse(None), + decay = decayOption) - private def createMethodFrom(ref: Option[AnyRef], method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] = - Try(ref.get.getClass.getMethod(method, types: _*)).toOption + /** + * Returns the network stats per interface. + */ + private def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar).asInstanceOf[Array[String]].map(arg ⇒ + arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar, arg))).toMap) getOrElse Map.empty[String, AnyRef] + + private def createMethodFrom(ref: AnyRef, method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] = + Try(ref.getClass.getMethod(method, types: _*)).toOption } /** @@ -687,16 +752,26 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe * Companion object of MetricsCollector class. */ private[cluster] object MetricsCollector { - def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector = - dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty) match { - case Success(identity) ⇒ new MetricsCollector(Some(identity), address) - case Failure(e) ⇒ - log.debug(e.toString) - log.info("Hyperic SIGAR was not found on the classpath or not installed properly. " + - "Metrics will be retreived from MBeans, and may be incorrect on some platforms. " + - "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate" + - "platform-specific native libary to 'java.library.path'.") - new MetricsCollector(None, address) + def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = { + import settings.{ MetricsCollectorClass ⇒ fqcn } + def log = Logging(system, "MetricsCollector") + if (fqcn == classOf[SigarMetricsCollector].getName) { + Try(new SigarMetricsCollector(system)) match { + case Success(sigarCollector) ⇒ sigarCollector + case Failure(e) ⇒ + log.info("Metrics will be retrieved from MBeans, and may be incorrect on some platforms. " + + "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " + + "platform-specific native library to 'java.library.path'. 
Reason: " + + e.toString) + new JmxMetricsCollector(system) + } + + } else { + system.dynamicAccess.createInstanceFor[MetricsCollector]( + fqcn, Seq(classOf[ActorSystem] -> system)).recover({ + case e ⇒ throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to:" + e.toString) + }).get } + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index b8fa31fbc3..1b2d11745e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -71,9 +71,16 @@ class ClusterSettings(val config: Config, val systemName: String) { callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS), resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS)) final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled") - final val MetricsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.metrics-interval"), MILLISECONDS) + final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class") + final val MetricsInterval: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.metrics.collect-interval"), MILLISECONDS) + require(d > Duration.Zero, "metrics.collect-interval must be > 0"); d + } final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS) - final val MetricsRateOfDecay: Int = getInt("akka.cluster.metrics.rate-of-decay") + final val MetricsRateOfDecay: Int = { + val n = getInt("akka.cluster.metrics.rate-of-decay") + require(n >= 1, "metrics.rate-of-decay must be >= 1"); n + } } case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala new file mode 100644 index 0000000000..52b63ef5f6 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala @@ -0,0 +1,271 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster.routing + +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.collection.JavaConverters.iterableAsScalaIterableConverter + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.OneForOneStrategy +import akka.actor.Props +import akka.actor.SupervisorStrategy +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.ClusterMetricsChanged +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.NodeMetrics +import akka.cluster.NodeMetrics.MetricValues +import akka.dispatch.Dispatchers +import akka.event.Logging +import akka.routing.Broadcast +import akka.routing.Destination +import akka.routing.Resizer +import akka.routing.Route +import akka.routing.RouteeProvider +import akka.routing.RouterConfig + +/** + * INTERNAL API + */ +private[cluster] object ClusterLoadBalancingRouter { + val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case _ ⇒ SupervisorStrategy.Escalate + } +} + +/** + * A Router that performs load balancing to cluster nodes based on + * cluster metric data. 
+ * + * It uses random selection of routees based on probabilities derived from + * the remaining capacity of the corresponding node. + * + * Please note that providing both 'nrOfInstances' and 'routees' does not make logical + * sense as this means that the router should both create new actors and use the 'routees' + * actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
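+ * A usage sketch (illustrative; `Worker` is a hypothetical routee actor type):
+ * {{{
+ * val router = system.actorOf(Props[Worker].withRouter(
+ *   ClusterLoadBalancingRouter(HeapMetricsSelector, nrOfInstances = 10)))
+ * }}}
+ *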
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + *

+ * <h1>Supervision Setup</h1>

+ * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * @param metricsSelector decides what probability to use for selecting a routee, based + * on remaining capacity as indicated by the node metrics + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ +@SerialVersionUID(1L) +case class ClusterLoadBalancingRouter( + metricsSelector: MetricsSelector, + nrOfInstances: Int = 0, routees: Iterable[String] = Nil, + override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy) + extends RouterConfig with ClusterLoadBalancingRouterLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + * @param selector the selector is responsible for producing a weighted mix of routees from the node metrics + * @param nr number of routees to create + */ + def this(selector: MetricsSelector, nr: Int) = this(metricsSelector = selector, nrOfInstances = nr) + + /** + * Constructor that sets the routees to be used. + * Java API + * @param selector the selector is responsible for producing a weighted mix of routees from the node metrics + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ + def this(selector: MetricsSelector, routeePaths: java.lang.Iterable[String]) = + this(metricsSelector = selector, routees = routeePaths.asScala) + + /** + * Constructor that sets the resizer to be used. + * Java API + * @param selector the selector is responsible for producing a weighted mix of routees from the node metrics + */ + def this(selector: MetricsSelector, resizer: Resizer) = + this(metricsSelector = selector, resizer = Some(resizer)) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String): ClusterLoadBalancingRouter = + copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy): ClusterLoadBalancingRouter = + copy(supervisorStrategy = strategy) + +} + +/** + * INTERNAL API. + * + * This strategy is a metrics-aware router which performs load balancing of + * cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterMetricsChanged]] + * events and the [[akka.cluster.routing.MetricsSelector]] creates a mix of + * weighted routees based on the node metrics. Messages are routed randomly to the + * weighted routees, i.e. 
nodes with lower load are more likely to be used than nodes with + * higher load. + */ +trait ClusterLoadBalancingRouterLike { this: RouterConfig ⇒ + + def metricsSelector: MetricsSelector + + def nrOfInstances: Int + + def routees: Iterable[String] + + def routerDispatcher: String + + override def createRoute(routeeProvider: RouteeProvider): Route = { + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } + + val log = Logging(routeeProvider.context.system, routeeProvider.context.self) + + // Function that points to the routees to use, starts with the plain routees + // of the routeeProvider and then changes to the current weighted routees + // produced by the metricsSelector. The reason for using a function is that + // routeeProvider.routees can change. + @volatile var weightedRoutees: () ⇒ IndexedSeq[ActorRef] = () ⇒ routeeProvider.routees + + // subscribe to ClusterMetricsChanged and update weightedRoutees + val metricsListener = routeeProvider.context.actorOf(Props(new Actor { + + val cluster = Cluster(routeeProvider.context.system) + + override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case ClusterMetricsChanged(metrics) ⇒ receiveMetrics(metrics) + case _: CurrentClusterState ⇒ // ignore + } + + def receiveMetrics(metrics: Set[NodeMetrics]): Unit = { + val routees = metricsSelector.weightedRefs(routeeProvider.routees, cluster.selfAddress, metrics) + weightedRoutees = () ⇒ routees + } + + }).withDispatcher(routerDispatcher), name = "metricsListener") + + def getNext(): ActorRef = { + val currentRoutees = weightedRoutees.apply + if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters + else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size)) + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case msg ⇒ List(Destination(sender, getNext())) + } + } + } +} + +/** + * MetricsSelector that uses the heap metrics. + * Low heap capacity => lower weight. + */ +@SerialVersionUID(1L) +case object HeapMetricsSelector extends MetricsSelector { + /** + * Java API: get the singleton instance + */ + def getInstance = this + + override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = { + nodeMetrics.map { n ⇒ + val (used, committed, max) = MetricValues.unapply(n.heapMemory) + val capacity = max match { + case None ⇒ (committed - used).toDouble / committed + case Some(m) ⇒ (m - used).toDouble / m + } + (n.address, capacity) + }.toMap + } +} + +// FIXME implement more MetricsSelectors, such as CpuMetricsSelector, +// LoadAverageMetricsSelector, NetworkMetricsSelector. +// Also a CompositeMetricsSelector which uses a mix of other +// selectors. + +/** + * A MetricsSelector is responsible for producing a weighted mix of routees + * from the node metrics. The weights are typically proportional to the + * remaining capacity. + */ +abstract class MetricsSelector { + + /** + * Remaining capacity for each node. The value is between + * 0.0 and 1.0, where 0.0 means no remaining capacity (full + * utilization) and 1.0 means full remaining capacity (zero + * utilization). + */ + def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] + + /** + * Converts the capacity values to weights. 
The node with lowest + * capacity gets weight 1 (lowest usable capacity is 1%) and other + * nodes get weights proportional to their capacity compared to + * the node with lowest capacity. For example, capacities 0.1, 0.3 and 0.6 + * yield weights 1, 3 and 6. + */ + def weights(capacity: Map[Address, Double]): Map[Address, Int] = { + if (capacity.isEmpty) Map.empty[Address, Int] + else { + val (_, min) = capacity.minBy { case (_, c) ⇒ c } + // lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero + val divisor = math.max(0.01, min) + capacity mapValues { c ⇒ math.round((c) / divisor).toInt } + } + } + + /** + * Allocates a list of actor refs according to the weight of their node, i.e. + * weight 3 of node A will allocate 3 slots for each ref with address A. + */ + def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, weights: Map[Address, Int]): IndexedSeq[ActorRef] = { + def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + case Address(_, _, None, None) ⇒ selfAddress + case a ⇒ a + } + + val w = weights.withDefaultValue(1) + refs.foldLeft(IndexedSeq.empty[ActorRef]) { (acc, ref) ⇒ + acc ++ IndexedSeq.fill(w(fullAddress(ref)))(ref) + } + } + + /** + * Combines the different pieces to allocate a list of weighted actor refs + * based on the node metrics. + */ + def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, nodeMetrics: Set[NodeMetrics]): IndexedSeq[ActorRef] = + weightedRefs(refs, selfAddress, weights(capacity(nodeMetrics))) +} \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala index e04d3612d3..aa7c10b1ed 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala @@ -31,6 +31,8 @@ class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec { import ClusterMetricsMultiJvmSpec._ + def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector] + "Cluster metrics" must { "periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " + "and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) { @@ -38,9 +40,8 @@ enterBarrier("cluster-started") awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size) awaitCond(clusterView.clusterMetrics.size == roles.size) - assertInitialized(cluster.settings.MetricsRateOfDecay, collectNodeMetrics(clusterView.clusterMetrics).toSet) - val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess) - clusterView.clusterMetrics.foreach(n ⇒ assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n)) + val collector = MetricsCollector(cluster.system, cluster.settings) + collector.sample.metrics.size must be > (3) enterBarrier("after") } "reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 12fc8ebbc6..4c799aec63 100644 --- 
a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -64,7 +64,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS def muteLog(sys: ActorSystem = system): Unit = { if (!sys.log.isDebugEnabled) { Seq(".*Metrics collection has started successfully.*", - ".*Hyperic SIGAR was not found on the classpath.*", + ".*Metrics will be retrieved from MBeans.*", ".*Cluster Node.* - registered cluster JMX MBean.*", ".*Cluster Node.* - is starting up.*", ".*Shutting down cluster Node.*", diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterAdaptiveLoadBalancingRouterMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterAdaptiveLoadBalancingRouterMultiJvmSpec.scala deleted file mode 100644 index b62f60c0ce..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterAdaptiveLoadBalancingRouterMultiJvmSpec.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster.routing - -import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig} -import akka.testkit.{LongRunningTest, DefaultTimeout, ImplicitSender} -import akka.actor._ -import akka.cluster.{ MemberStatus, MultiNodeClusterSpec } -import akka.cluster.routing.ClusterRoundRobinRoutedActorMultiJvmSpec.SomeActor - - -object ClusterAdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig { - - val first = role("first") - val second = role("second") - val third = role("third") - val fourth = role("fourth") - val fifth = role("fifth") - - // TODO - config - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - -} - -class ClusterAdaptiveLoadBalancingRouterMultiJvmNode1 extends ClusterAdaptiveLoadBalancingRouterSpec -class ClusterAdaptiveLoadBalancingRouterMultiJvmNode2 extends ClusterAdaptiveLoadBalancingRouterSpec -class ClusterAdaptiveLoadBalancingRouterMultiJvmNode3 extends ClusterAdaptiveLoadBalancingRouterSpec -class ClusterAdaptiveLoadBalancingRouterMultiJvmNode4 extends ClusterAdaptiveLoadBalancingRouterSpec -class ClusterAdaptiveLoadBalancingRouterMultiJvmNode5 extends ClusterAdaptiveLoadBalancingRouterSpec - -abstract class ClusterAdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(ClusterAdaptiveLoadBalancingRouterMultiJvmSpec) -with MultiNodeClusterSpec -with ImplicitSender with DefaultTimeout { - import ClusterAdaptiveLoadBalancingRouterMultiJvmSpec._ - - // TODO configure properly and leverage the other pending load balancing routers - lazy val router1 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(), - ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router1") - lazy val router2 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(), - ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router2") - lazy val router3 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(), - ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router3") - lazy val router4 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(), - ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1))), "router4") - lazy val router5 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(MemoryLoadBalancingRouter(), - ClusterRouterSettings(totalInstances 
= 3, maxInstancesPerNode = 1))), "router5") - - "A cluster with a ClusterAdaptiveLoadBalancingRouter" must { - "start cluster with 5 nodes" taggedAs LongRunningTest in { - awaitClusterUp(roles: _*) - enterBarrier("cluster-started") - awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size) - awaitCond(clusterView.clusterMetrics.size == roles.size) - enterBarrier("cluster-metrics-consumer-ready") - } - // TODO the rest of the necessary testing. All the work needed for consumption and extraction - // of the data needed is in ClusterMetricsCollector._ - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala deleted file mode 100644 index 969372112f..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouter.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster.routing - -import language.implicitConversions -import language.postfixOps -import akka.actor._ -import akka.cluster._ -import akka.routing._ -import akka.dispatch.Dispatchers -import akka.event.Logging -import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } -import akka.routing.Destination -import akka.cluster.NodeMetrics -import akka.routing.Broadcast -import akka.actor.OneForOneStrategy -import akka.cluster.ClusterEvent.ClusterMetricsChanged -import akka.cluster.ClusterEvent.CurrentClusterState -import util.Try -import akka.cluster.NodeMetrics.{ NodeMetricsComparator, MetricValues } -import NodeMetricsComparator._ - -/** - * INTERNAL API. - * - * Trait that embodies the contract for all load balancing implementations. - */ -private[cluster] trait LoadBalancer { - - /** - * Compares only those nodes that are deemed 'available' by the - * [[akka.routing.RouteeProvider]] - */ - def selectNodeByHealth(availableNodes: Set[NodeMetrics]): Option[Address] - -} - -/** - * INTERNAL API - */ -private[cluster] object ClusterLoadBalancingRouter { - val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case _ ⇒ SupervisorStrategy.Escalate - } -} - -/** - * INTERNAL API. - * - * The abstract consumer of [[akka.cluster.ClusterMetricsChanged]] events and the primary consumer - * of cluster metric data. This strategy is a metrics-aware router which performs load balancing of - * cluster nodes with a fallback strategy of a [[akka.routing.RoundRobinRouter]]. - * - * Load balancing of nodes is based on .. 
etc etc desc forthcoming - */ -trait ClusterAdaptiveLoadBalancingRouterLike extends RoundRobinLike with LoadBalancer { this: RouterConfig ⇒ - - def routerDispatcher: String - - override def createRoute(routeeProvider: RouteeProvider): Route = { - if (resizer.isEmpty) { - if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) - else routeeProvider.registerRouteesFor(routees) - } - - val log = Logging(routeeProvider.context.system, routeeProvider.context.self) - - val next = new AtomicLong(0) - - val nodeMetrics = new AtomicReference[Set[NodeMetrics]](Set.empty) - - val metricsListener = routeeProvider.context.actorOf(Props(new Actor { - def receive = { - case ClusterMetricsChanged(metrics) ⇒ receiveMetrics(metrics) - case _: CurrentClusterState ⇒ // ignore - } - def receiveMetrics(metrics: Set[NodeMetrics]): Unit = { - val availableNodes = routeeProvider.routees.map(_.path.address).toSet - val updated: Set[NodeMetrics] = nodeMetrics.get.collect { case node if availableNodes contains node.address ⇒ node } - nodeMetrics.set(updated) - } - override def postStop(): Unit = Cluster(routeeProvider.context.system) unsubscribe self - }).withDispatcher(routerDispatcher), name = "metricsListener") - Cluster(routeeProvider.context.system).subscribe(metricsListener, classOf[ClusterMetricsChanged]) - - def getNext(): ActorRef = { - // TODO use as/where you will... selects by health category based on the implementation - val address: Option[Address] = selectNodeByHealth(nodeMetrics.get) - // TODO actual routee selection. defaults to round robin. - routeeProvider.routees((next.getAndIncrement % routees.size).asInstanceOf[Int]) - } - - def routeTo(): ActorRef = if (routeeProvider.routees.isEmpty) routeeProvider.context.system.deadLetters else getNext() - - { - case (sender, message) ⇒ - message match { - case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) - case msg ⇒ List(Destination(sender, routeTo())) - } - } - } -} - -/** - * Selects by all monitored metric types (memory, network latency, cpu...) and - * chooses the healthiest node to route to. 
- */ -@SerialVersionUID(1L) -private[cluster] case class ClusterAdaptiveMetricsLoadBalancingRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, - override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId, - val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy) - extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector { - - // TODO - def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = { - val s = Set(selectByMemory(nodes), selectByNetworkLatency(nodes), selectByCpu(nodes)) - s.head // TODO select the Address that appears with the highest or lowest frequency - } -} - -@SerialVersionUID(1L) -private[cluster] case class MemoryLoadBalancingRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, - override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId, - val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy) - extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector { - - def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = selectByMemory(nodes) -} - -@SerialVersionUID(1L) -private[cluster] case class CpuLoadBalancer(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, - override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId, - val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy) - extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector { - - // TODO - def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = None -} - -@SerialVersionUID(1L) -private[cluster] case class NetworkLatencyLoadBalancer(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, - override val resizer: Option[Resizer] = None, - val routerDispatcher: String = Dispatchers.DefaultDispatcherId, - val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy) - extends RouterConfig with ClusterAdaptiveLoadBalancingRouterLike with MetricsAwareClusterNodeSelector { - - // TODO - def selectNodeByHealth(nodes: Set[NodeMetrics]): Option[Address] = None -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouterSpec.scala new file mode 100644 index 0000000000..028693f04c --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterLoadBalancingRouterSpec.scala @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */
+
+package akka.cluster.routing
+
+import language.postfixOps
+import scala.concurrent.duration._
+import java.lang.management.ManagementFactory
+import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
+import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
+import akka.actor._
+import akka.cluster.{ MemberStatus, MultiNodeClusterSpec }
+import scala.concurrent.Await
+import akka.routing.RouterRoutees
+import akka.pattern.ask
+import akka.routing.CurrentRoutees
+import akka.cluster.Cluster
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import com.typesafe.config.ConfigFactory
+
+object ClusterLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
+
+  class Routee extends Actor {
+    def receive = {
+      case _ ⇒ sender ! Reply(Cluster(context.system).selfAddress)
+    }
+  }
+
+  class Memory extends Actor with ActorLogging {
+    var usedMemory: Array[Byte] = _
+    def receive = {
+      case AllocateMemory ⇒
+        val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
+        // getMax can be undefined (-1)
+        val max = math.max(heap.getMax, heap.getCommitted)
+        val used = heap.getUsed
+        // fill up 80% of the remaining heap so that this node reports low capacity
+        val allocate = (0.8 * (max - used)).toInt
+        usedMemory = Array.fill(allocate)(ThreadLocalRandom.current.nextInt(127).toByte)
+    }
+  }
+
+  case object AllocateMemory
+  case class Reply(address: Address)
+
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
+      akka.cluster.metrics.collect-interval = 1s
+      akka.cluster.metrics.gossip-interval = 1s
+      """)).withFallback(MultiNodeClusterSpec.clusterConfig))
+
+}
+
+class ClusterLoadBalancingRouterMultiJvmNode1 extends ClusterLoadBalancingRouterSpec
+class ClusterLoadBalancingRouterMultiJvmNode2 extends ClusterLoadBalancingRouterSpec
+class ClusterLoadBalancingRouterMultiJvmNode3 extends ClusterLoadBalancingRouterSpec
+
+abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadBalancingRouterMultiJvmSpec)
+  with MultiNodeClusterSpec
+  with ImplicitSender with DefaultTimeout {
+  import ClusterLoadBalancingRouterMultiJvmSpec._
+
+  def currentRoutees(router: ActorRef) =
+    Await.result(router ?
CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees
+
+  def receiveReplies(expectedReplies: Int): Map[Address, Int] = {
+    val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
+    (receiveWhile(5 seconds, messages = expectedReplies) {
+      case Reply(address) ⇒ address
+    }).foldLeft(zero) {
+      case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1))
+    }
+  }
+
+  /**
+   * Fills in self address for local ActorRef
+   */
+  def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
+    case Address(_, _, None, None) ⇒ cluster.selfAddress
+    case a ⇒ a
+  }
+
+  def startRouter(name: String): ActorRef = {
+    val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
+      local = ClusterLoadBalancingRouter(HeapMetricsSelector),
+      settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), name)
+    awaitCond {
+      // it may take some time until the router receives cluster member events
+      currentRoutees(router).size == roles.size
+    }
+    currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet)
+    router
+  }
+
+  "A cluster with a ClusterLoadBalancingRouter" must {
+    "start cluster nodes" taggedAs LongRunningTest in {
+      awaitClusterUp(roles: _*)
+      enterBarrier("after-1")
+    }
+
+    "use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in {
+      runOn(first) {
+        val router1 = startRouter("router1")
+
+        // collect some metrics before we start
+        Thread.sleep(10000)
+
+        val iterationCount = 100
+        for (i ← 0 until iterationCount) {
+          router1 ! "hit"
+          Thread.sleep(10)
+        }
+
+        val replies = receiveReplies(iterationCount)
+
+        replies(first) must be > (0)
+        replies(second) must be > (0)
+        replies(third) must be > (0)
+        replies.values.sum must be(iterationCount)
+
+      }
+
+      enterBarrier("after-2")
+    }
+
+    "prefer node with more free heap capacity" taggedAs LongRunningTest in {
+      System.gc()
+      enterBarrier("gc")
+
+      runOn(second) {
+        system.actorOf(Props[Memory], "memory") ! AllocateMemory
+      }
+      enterBarrier("heap-allocated")
+
+      runOn(first) {
+        val router2 = startRouter("router2")
+
+        // collect some metrics before we start
+        Thread.sleep(10000)
+
+        val iterationCount = 100
+        for (i ← 0 until iterationCount) {
+          router2 !
"hit" + Thread.sleep(10) + } + + val replies = receiveReplies(iterationCount) + + replies(third) must be > (replies(second)) + replies.values.sum must be(iterationCount) + + } + + enterBarrier("after-3") + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 2d1a6542bd..34513076c0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -47,6 +47,7 @@ class ClusterConfigSpec extends AkkaSpec { callTimeout = 2 seconds, resetTimeout = 30 seconds)) MetricsEnabled must be(true) + MetricsCollectorClass must be("akka.cluster.SigarMetricsCollector") MetricsInterval must be(3 seconds) MetricsGossipInterval must be(3 seconds) MetricsRateOfDecay must be(10) diff --git a/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala b/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala index b6357ddc46..f6519f000f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala @@ -7,48 +7,74 @@ package akka.cluster import language.postfixOps import scala.concurrent.duration._ import akka.testkit.{ LongRunningTest, AkkaSpec } +import scala.concurrent.forkjoin.ThreadLocalRandom +import System.currentTimeMillis @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec { +class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec with MetricsCollectorFactory { import system.dispatcher val collector = createMetricsCollector "DataStream" must { + "calcualate same ewma for constant values" in { + val ds = DataStream(decay = 5, ewma = 100.0, currentTimeMillis, currentTimeMillis) :+ + 100.0 :+ 100.0 :+ 100.0 + ds.ewma must be(100.0 plusOrMinus 0.001) + } + + "calcualate correct ewma for normal decay" in { + val d0 = DataStream(decay = 10, ewma = 1000.0, currentTimeMillis, currentTimeMillis) + d0.ewma must be(1000.0 plusOrMinus 0.01) + val d1 = d0 :+ 10.0 + d1.ewma must be(820.0 plusOrMinus 0.01) + val d2 = d1 :+ 10.0 + d2.ewma must be(672.73 plusOrMinus 0.01) + val d3 = d2 :+ 10.0 + d3.ewma must be(552.23 plusOrMinus 0.01) + val d4 = d3 :+ 10.0 + d4.ewma must be(453.64 plusOrMinus 0.01) + + val dn = (1 to 100).foldLeft(d0)((d, _) ⇒ d :+ 10.0) + dn.ewma must be(10.0 plusOrMinus 0.1) + } + + "calculate ewma for decay 1" in { + val d0 = DataStream(decay = 1, ewma = 100.0, currentTimeMillis, currentTimeMillis) + d0.ewma must be(100.0 plusOrMinus 0.01) + val d1 = d0 :+ 1.0 + d1.ewma must be(1.0 plusOrMinus 0.01) + val d2 = d1 :+ 57.0 + d2.ewma must be(57.0 plusOrMinus 0.01) + val d3 = d2 :+ 10.0 + d3.ewma must be(10.0 plusOrMinus 0.01) + } + "calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in { - val firstDataSet = collector.sample.metrics.collect { case m if m.trendable && m.isDefined ⇒ m.initialize(DefaultRateOfDecay) } - var streamingDataSet = firstDataSet - - val cancellable = system.scheduler.schedule(0 seconds, 100 millis) { - streamingDataSet = collector.sample.metrics.flatMap(latest ⇒ streamingDataSet.collect { - case streaming if (latest.trendable && latest.isDefined) && (latest same streaming) - && (latest.value.get != streaming.value.get) ⇒ { - val updatedDataStream = streaming.average.get :+ latest.value.get - updatedDataStream.timestamp must be > 
(streaming.average.get.timestamp) - updatedDataStream.duration.length must be > (streaming.average.get.duration.length) - updatedDataStream.ewma must not be (streaming.average.get.ewma) - updatedDataStream.ewma must not be (latest.value.get) - streaming.copy(value = latest.value, average = Some(updatedDataStream)) + var streamingDataSet = Map.empty[String, Metric] + var usedMemory = Array.empty[Byte] + (1 to 50) foreach { _ ⇒ + Thread.sleep(100) + usedMemory = usedMemory ++ Array.fill(1024)(ThreadLocalRandom.current.nextInt(127).toByte) + val changes = collector.sample.metrics.flatMap { latest ⇒ + streamingDataSet.get(latest.name) match { + case None ⇒ Some(latest) + case Some(previous) ⇒ + if (latest.average.isDefined && latest.isDefined && latest.value.get != previous.value.get) { + val updated = previous :+ latest + updated.average.isDefined must be(true) + val updatedAverage = updated.average.get + updatedAverage.timestamp must be > (previous.average.get.timestamp) + updatedAverage.duration.length must be > (previous.average.get.duration.length) + updatedAverage.ewma must not be (previous.average.get.ewma) + (previous.value.get.longValue compare latest.value.get.longValue) must be( + previous.average.get.ewma.longValue compare updatedAverage.ewma.longValue) + Some(updated) + } else None } - }) - } - awaitCond(firstDataSet.size == streamingDataSet.size, longDuration) - cancellable.cancel() - - val finalDataSet = streamingDataSet.map(m ⇒ m.name -> m).toMap - firstDataSet map { - first ⇒ - val newMetric = finalDataSet(first.name) - val e1 = first.average.get - val e2 = newMetric.average.get - - if (first.value.get != newMetric.value.get) { - e2.ewma must not be (first.value.get) - e2.ewma must not be (newMetric.value.get) - } - if (first.value.get.longValue > newMetric.value.get.longValue) e1.ewma.longValue must be > e2.ewma.longValue - else if (first.value.get.longValue < newMetric.value.get.longValue) e1.ewma.longValue must be < e2.ewma.longValue + } + streamingDataSet ++= changes.map(m ⇒ m.name -> m).toMap } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala index 1f23da769c..f866056d9f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala @@ -5,39 +5,43 @@ package akka.cluster import akka.testkit.{ ImplicitSender, AkkaSpec } +import akka.cluster.NodeMetrics.MetricValues._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with AbstractClusterMetricsSpec { +class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with MetricSpec + with MetricsCollectorFactory { "MetricNumericConverter" must { val collector = createMetricsCollector "convert " in { - convert(0).isLeft must be(true) - convert(1).left.get must be(1) - convert(1L).isLeft must be(true) - convert(0.0).isRight must be(true) + convertNumber(0).isLeft must be(true) + convertNumber(1).left.get must be(1) + convertNumber(1L).isLeft must be(true) + convertNumber(0.0).isRight must be(true) } "define a new metric" in { - val metric = Metric("heap-memory-used", Some(0L)) - metric.initializable must be(true) - metric.name must not be (null) - metric.average.isEmpty must be(true) - metric.trendable must 
be(true) + val metric = Metric(HeapMemoryUsed, Some(256L), decay = Some(10)) + metric.name must be(HeapMemoryUsed) + metric.isDefined must be(true) + metric.value must be(Some(256L)) + metric.average.isDefined must be(true) + metric.average.get.ewma must be(256L) - if (collector.isSigar) { - val cores = collector.totalCores - cores.isDefined must be(true) - cores.value.get.intValue must be > (0) - cores.initializable must be(false) + collector match { + case c: SigarMetricsCollector ⇒ + val cores = c.totalCores + cores.isDefined must be(true) + cores.value.get.intValue must be > (0) + case _ ⇒ } } "define an undefined value with a None " in { - Metric("x", Some(-1)).value.isDefined must be(false) - Metric("x", Some(java.lang.Double.NaN)).value.isDefined must be(false) - Metric("x", None).isDefined must be(false) + Metric("x", Some(-1), None).value.isDefined must be(false) + Metric("x", Some(java.lang.Double.NaN), None).value.isDefined must be(false) + Metric("x", None, None).isDefined must be(false) } "recognize whether a metric value is defined" in { diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsComparatorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala similarity index 53% rename from akka-cluster/src/test/scala/akka/cluster/NodeMetricsComparatorSpec.scala rename to akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala index 54e540763b..5910d0510f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsComparatorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala @@ -4,17 +4,42 @@ package akka.cluster +import scala.util.Try import akka.actor.Address +import akka.cluster.NodeMetrics.MetricValues._ +import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec { - +class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec + with MetricsCollectorFactory { import NodeMetrics._ + val collector = createMetricsCollector + + val node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics) + val node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics) + + var nodes: Seq[NodeMetrics] = Seq(node1, node2) + + // work up the data streams where applicable + for (i ← 1 to 100) { + nodes = nodes map { + n ⇒ + n.copy(metrics = collector.sample.metrics.flatMap(latest ⇒ n.metrics.collect { + case streaming if latest same streaming ⇒ + streaming.average match { + case Some(e) ⇒ streaming.copy(value = latest.value, average = + if (latest.isDefined) Some(e :+ latest.value.get.doubleValue) else None) + case None ⇒ streaming.copy(value = latest.value) + } + })) + } + } + "NodeMetrics.MetricValues" must { "extract expected metrics for load balancing" in { - val stream1 = node2.metric("heap-memory-committed").value.get.longValue() - val stream2 = node1.metric("heap-memory-used").value.get.longValue() + val stream1 = node2.metric(HeapMemoryCommitted).value.get.longValue + val stream2 = node1.metric(HeapMemoryUsed).value.get.longValue stream1 must be >= (stream2) } @@ -39,8 +64,9 @@ class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec { } val (systemLoadAverage, processors, combinedCpu, cores) = MetricValues.unapply(node.cpu) - systemLoadAverage must be >= (0.0) processors must be > (0) + if (systemLoadAverage.isDefined) + systemLoadAverage.get must be >= (0.0) if (combinedCpu.isDefined) { combinedCpu.get must be <= (1.0) 
combinedCpu.get must be >= (0.0) @@ -53,29 +79,4 @@ class NodeMetricsComparatorSpec extends ClusterMetricsRouteeSelectorSpec { } } - "NodeMetricsComparator" must { - val seq = Seq((Address("akka", "sys", "a", 2554), 9L), (Address("akka", "sys", "a", 2555), 8L)) - val seq2 = Seq((Address("akka", "sys", "a", 2554), 9.0), (Address("akka", "sys", "a", 2555), 8.0)) - - "handle min ordering of a (Address, Long)" in { - import NodeMetricsComparator.longMinAddressOrdering - seq.min._1.port.get must be(2555) - seq.min._2 must be(8L) - } - "handle max ordering of a (Address, Long)" in { - val (address, value) = NodeMetricsComparator.maxAddressLong(seq) - value must be(9L) - address.port.get must be(2554) - } - "handle min ordering of a (Address, Double)" in { - import NodeMetricsComparator.doubleMinAddressOrdering - seq2.min._1.port.get must be(2555) - seq2.min._2 must be(8.0) - } - "handle max ordering of a (Address, Double)" in { - val (address, value) = NodeMetricsComparator.maxAddressDouble(seq2) - value must be(9.0) - address.port.get must be(2554) - } - } } \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsAwareClusterNodeSelectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsAwareClusterNodeSelectorSpec.scala deleted file mode 100644 index 01327d65f9..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/MetricsAwareClusterNodeSelectorSpec.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit.AkkaSpec -import akka.actor.Address -import akka.cluster.NodeMetrics.MetricValues -import util.control.NonFatal -import util.Try - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class MetricsAwareClusterNodeSelectorSpec extends ClusterMetricsRouteeSelectorSpec with MetricsAwareClusterNodeSelector { - import NodeMetrics.NodeMetricsComparator.longMinAddressOrdering - - "MetricsAwareClusterNodeSelector" must { - - "select the address of the node with the lowest memory" in { - for (i ← 0 to 10) { // run enough times to insure we test differing metric values - - val map = nodes.map(n ⇒ n.address -> MetricValues.unapply(n.heapMemory)).toMap - val (used1, committed1, max1) = map.get(node1.address).get - val (used2, committed2, max2) = map.get(node2.address).get - val diff1 = max1 match { - case Some(m) ⇒ ((committed1 - used1) + (m - used1) + (m - committed1)) - case None ⇒ committed1 - used1 - } - val diff2 = max2 match { - case Some(m) ⇒ ((committed2 - used2) + (m - used2) + (m - committed2)) - case None ⇒ committed2 - used2 - } - val testMin = Set(diff1, diff2).min - val expectedAddress = if (testMin == diff1) node1.address else node2.address - val address = selectByMemory(nodes.toSet).get - address must be(expectedAddress) - } - } - "select the address of the node with the lowest network latency" in { - // TODO - } - "select the address of the node with the best CPU health" in { - // TODO - } - "select the address of the node with the best overall health based on all metric categories monitored" in { - // TODO - } - } -} - -abstract class ClusterMetricsRouteeSelectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec { - - val collector = createMetricsCollector - - var node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics) - node1 = node1.copy(metrics = node1.metrics.collect { case m ⇒ m.initialize(DefaultRateOfDecay) }) - - var node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, 
collector.sample.metrics) - node2 = node2.copy(metrics = node2.metrics.collect { case m ⇒ m.initialize(DefaultRateOfDecay) }) - - var nodes: Seq[NodeMetrics] = Seq(node1, node2) - - // work up the data streams where applicable - for (i ← 0 to samples) { - nodes = nodes map { - n ⇒ - n.copy(metrics = collector.sample.metrics.flatMap(latest ⇒ n.metrics.collect { - case streaming if latest same streaming ⇒ - streaming.average match { - case Some(e) ⇒ streaming.copy(value = latest.value, average = Try(Some(e :+ latest.value.get)) getOrElse None) - case None ⇒ streaming.copy(value = latest.value) - } - })) - } - } -} diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala index c7a4ad4c73..ad836bf5f7 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala @@ -7,17 +7,18 @@ package akka.cluster import scala.language.postfixOps import scala.concurrent.duration._ import scala.concurrent.Await +import scala.util.{ Success, Try, Failure } import akka.actor._ import akka.testkit._ +import akka.cluster.NodeMetrics.MetricValues._ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import util.{ Success, Try, Failure } object MetricsEnabledSpec { val config = """ akka.cluster.metrics.enabled = on - akka.cluster.metrics.metrics-interval = 1 s + akka.cluster.metrics.collect-interval = 1 s akka.cluster.metrics.gossip-interval = 1 s akka.cluster.metrics.rate-of-decay = 10 akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -25,22 +26,16 @@ object MetricsEnabledSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec { +class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec + with MetricsCollectorFactory { import system.dispatcher val collector = createMetricsCollector "Metric must" must { - "create and initialize a new metric or merge an existing one" in { - for (i ← 0 to samples) { - val metrics = collector.sample.metrics - assertCreatedUninitialized(metrics) - assertInitialized(DefaultRateOfDecay, metrics map (_.initialize(DefaultRateOfDecay))) - } - } "merge 2 metrics that are tracking the same metric" in { - for (i ← 0 to samples) { + for (i ← 1 to 20) { val sample1 = collector.sample.metrics val sample2 = collector.sample.metrics var merged = sample2 flatMap (latest ⇒ sample1 collect { @@ -51,8 +46,8 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl } }) - val sample3 = collector.sample.metrics map (_.initialize(DefaultRateOfDecay)) - val sample4 = collector.sample.metrics map (_.initialize(DefaultRateOfDecay)) + val sample3 = collector.sample.metrics + val sample4 = collector.sample.metrics merged = sample4 flatMap (latest ⇒ sample3 collect { case peer if latest same peer ⇒ { val m = peer :+ latest @@ -74,62 +69,61 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl "collect accurate metrics for a node" in { val sample = collector.sample - assertExpectedSampleSize(collector.isSigar, DefaultRateOfDecay, sample) val metrics = sample.metrics.collect { case m if m.isDefined ⇒ (m.name, m.value.get) } - val used = metrics collectFirst { case ("heap-memory-used", b) ⇒ b } - val committed = metrics collectFirst { 
case ("heap-memory-committed", b) ⇒ b } + val used = metrics collectFirst { case (HeapMemoryUsed, b) ⇒ b } + val committed = metrics collectFirst { case (HeapMemoryCommitted, b) ⇒ b } metrics foreach { - case ("total-cores", b) ⇒ b.intValue must be > (0) - case ("network-max-rx", b) ⇒ b.longValue must be > (0L) - case ("network-max-tx", b) ⇒ b.longValue must be > (0L) - case ("system-load-average", b) ⇒ b.doubleValue must be >= (0.0) - case ("processors", b) ⇒ b.intValue must be >= (0) - case ("heap-memory-used", b) ⇒ b.longValue must be >= (0L) - case ("heap-memory-committed", b) ⇒ b.longValue must be > (0L) - case ("cpu-combined", b) ⇒ - b.doubleValue must be <= (1.0) - b.doubleValue must be >= (0.0) - case ("heap-memory-max", b) ⇒ + case (TotalCores, b) ⇒ b.intValue must be > (0) + case (NetworkInboundRate, b) ⇒ b.longValue must be > (0L) + case (NetworkOutboundRate, b) ⇒ b.longValue must be > (0L) + case (SystemLoadAverage, b) ⇒ b.doubleValue must be >= (0.0) + case (Processors, b) ⇒ b.intValue must be >= (0) + case (HeapMemoryUsed, b) ⇒ b.longValue must be >= (0L) + case (HeapMemoryCommitted, b) ⇒ b.longValue must be > (0L) + case (HeapMemoryMax, b) ⇒ + b.longValue must be > (0L) used.get.longValue must be <= (b.longValue) committed.get.longValue must be <= (b.longValue) + case (CpuCombined, b) ⇒ + b.doubleValue must be <= (1.0) + b.doubleValue must be >= (0.0) + } } "collect SIGAR metrics if it is on the classpath" in { - if (collector.isSigar) { - // combined cpu may or may not be defined on a given sampling - // systemLoadAverage is SIGAR present - collector.systemLoadAverage.isDefined must be(true) - collector.networkStats.nonEmpty must be(true) - collector.networkMaxRx.isDefined must be(true) - collector.networkMaxTx.isDefined must be(true) - collector.totalCores.isDefined must be(true) + collector match { + case c: SigarMetricsCollector ⇒ + // combined cpu may or may not be defined on a given sampling + // systemLoadAverage is not present on all platforms + c.networkMaxRx.isDefined must be(true) + c.networkMaxTx.isDefined must be(true) + c.totalCores.isDefined must be(true) + case _ ⇒ } } "collect JMX metrics" in { // heap max may be undefined depending on the OS - // systemLoadAverage is JMX if SIGAR not present, but not available on all OS - collector.used.isDefined must be(true) - collector.committed.isDefined must be(true) - collector.processors.isDefined must be(true) + // systemLoadAverage is JMX when SIGAR not present, but + // it's not present on all platforms + val c = collector.asInstanceOf[JmxMetricsCollector] + c.heapUsed.isDefined must be(true) + c.heapCommitted.isDefined must be(true) + c.processors.isDefined must be(true) } - "collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in { - val latch = TestLatch(samples) - val task = system.scheduler.schedule(0 seconds, interval) { + "collect 50 node metrics samples in an acceptable duration" taggedAs LongRunningTest in within(7 seconds) { + (1 to 50) foreach { _ ⇒ val sample = collector.sample - assertCreatedUninitialized(sample.metrics) - assertExpectedSampleSize(collector.isSigar, DefaultRateOfDecay, sample) - latch.countDown() + sample.metrics.size must be >= (3) + Thread.sleep(100) } - Await.ready(latch, longDuration) - task.cancel() } } } -trait MetricSpec extends WordSpec with MustMatchers { +trait MetricSpec extends WordSpec with MustMatchers { this: { def system: ActorSystem } ⇒ def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: 
MetricsGossip): Unit = { val masterMetrics = collectNodeMetrics(master) @@ -140,52 +134,22 @@ trait MetricSpec extends WordSpec with MustMatchers { def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit = gossip.nodes.map(_.address) must be(nodes.map(_.address)) - def assertExpectedSampleSize(isSigar: Boolean, gossip: MetricsGossip): Unit = - gossip.nodes.foreach(n ⇒ assertExpectedSampleSize(isSigar, gossip.rateOfDecay, n)) - - def assertCreatedUninitialized(gossip: MetricsGossip): Unit = - gossip.nodes.foreach(n ⇒ assertCreatedUninitialized(n.metrics.filterNot(_.trendable))) - - def assertInitialized(gossip: MetricsGossip): Unit = - gossip.nodes.foreach(n ⇒ assertInitialized(gossip.rateOfDecay, n.metrics)) - - def assertCreatedUninitialized(metrics: Set[Metric]): Unit = { - metrics.size must be > (0) - metrics foreach { m ⇒ - m.average.isEmpty must be(true) - if (m.value.isDefined) m.isDefined must be(true) - if (m.initializable) (m.trendable && m.isDefined && m.average.isEmpty) must be(true) - } - } - - def assertInitialized(decay: Int, metrics: Set[Metric]): Unit = if (decay > 0) metrics.filter(_.trendable) foreach { m ⇒ - m.initializable must be(false) - if (m.isDefined) m.average.isDefined must be(true) - } - def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) { if (latest.isDefined) { if (peer.isDefined) { merged.isDefined must be(true) merged.value.get must be(latest.value.get) - if (latest.trendable) { - if (latest.initializable) merged.average.isEmpty must be(true) - else merged.average.isDefined must be(true) - } + merged.average.isDefined must be(latest.average.isDefined) } else { merged.isDefined must be(true) merged.value.get must be(latest.value.get) - if (latest.average.isDefined) merged.average.get must be(latest.average.get) - else merged.average.isEmpty must be(true) + merged.average.isDefined must be(latest.average.isDefined || peer.average.isDefined) } } else { if (peer.isDefined) { merged.isDefined must be(true) merged.value.get must be(peer.value.get) - if (peer.trendable) { - if (peer.initializable) merged.average.isEmpty must be(true) - else merged.average.isDefined must be(true) - } + merged.average.isDefined must be(peer.average.isDefined) } else { merged.isDefined must be(false) merged.average.isEmpty must be(true) @@ -193,20 +157,6 @@ trait MetricSpec extends WordSpec with MustMatchers { } } - def assertExpectedSampleSize(isSigar: Boolean, decay: Int, node: NodeMetrics): Unit = { - node.metrics.size must be(9) - val metrics = node.metrics.filter(_.isDefined) - if (isSigar) { // combined cpu + jmx max heap - metrics.size must be >= (7) - metrics.size must be <= (9) - } else { // jmx max heap - metrics.size must be >= (4) - metrics.size must be <= (5) - } - - if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) ⇒ m }.foreach(_.average.isDefined must be(true)) - } - def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = { var r: Seq[Metric] = Seq.empty nodes.foreach(n ⇒ r ++= n.metrics.filter(_.isDefined)) @@ -214,19 +164,25 @@ trait MetricSpec extends WordSpec with MustMatchers { } } -trait AbstractClusterMetricsSpec extends DefaultTimeout { - this: AkkaSpec ⇒ +/** + * Used when testing metrics without full cluster + */ +trait MetricsCollectorFactory { this: AkkaSpec ⇒ - val selfAddress = new Address("akka", "localhost") + private def extendedActorSystem = system.asInstanceOf[ExtendedActorSystem] - val DefaultRateOfDecay = 10 + def selfAddress = 
extendedActorSystem.provider.rootPath.address
 
-  val interval: FiniteDuration = 100 millis
+  val defaultRateOfDecay = 10
 
-  val longDuration = 120 seconds // for long running tests
+  def createMetricsCollector: MetricsCollector =
+    Try(new SigarMetricsCollector(selfAddress, defaultRateOfDecay, extendedActorSystem.dynamicAccess)) match {
+      case Success(sigarCollector) ⇒ sigarCollector
+      case Failure(e) ⇒
+        log.debug("Metrics will be retrieved from MBeans, Sigar failed to load. Reason: " +
+          e.getMessage)
+        new JmxMetricsCollector(selfAddress, defaultRateOfDecay)
+    }
 
-  val samples = 100
-
-  def createMetricsCollector: MetricsCollector = MetricsCollector(selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
-
-}
\ No newline at end of file
+  def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
index e44b02e409..3ec10001f2 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
@@ -12,7 +12,8 @@ import akka.actor.Address
 import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
 
 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec {
+class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec
+  with MetricsCollectorFactory {
 
   val collector = createMetricsCollector
 
@@ -21,27 +22,25 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
     val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
     val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
 
-    var localGossip = MetricsGossip(DefaultRateOfDecay)
+    var localGossip = MetricsGossip()
     localGossip :+= m1
     localGossip.nodes.size must be(1)
     localGossip.nodeKeys.size must be(localGossip.nodes.size)
     assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip)
-    assertExpectedSampleSize(collector.isSigar, localGossip)
-    assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
+    collector.sample.metrics.size must be > (3)
 
     localGossip :+= m2
     localGossip.nodes.size must be(2)
     localGossip.nodeKeys.size must be(localGossip.nodes.size)
     assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip)
-    assertExpectedSampleSize(collector.isSigar, localGossip)
-    assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
+    collector.sample.metrics.size must be > (3)
   }
 
   "merge peer metrics" in {
     val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
     val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
 
-    var remoteGossip = MetricsGossip(DefaultRateOfDecay)
+    var remoteGossip = MetricsGossip()
     remoteGossip :+= m1
     remoteGossip :+= m2
     remoteGossip.nodes.size must be(2)
@@ -51,7 +50,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
     remoteGossip :+= m2Updated // merge peers
     remoteGossip.nodes.size must be(2)
     assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip)
-    assertExpectedSampleSize(collector.isSigar, remoteGossip)
+    remoteGossip.nodes.foreach(_.metrics.size must be >
(3)) remoteGossip.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp must be(m2Updated.timestamp) } } @@ -61,11 +60,11 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics) val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp) - var localGossip = MetricsGossip(DefaultRateOfDecay) + var localGossip = MetricsGossip() localGossip :+= m1 localGossip :+= m2 - var remoteGossip = MetricsGossip(DefaultRateOfDecay) + var remoteGossip = MetricsGossip() remoteGossip :+= m3 remoteGossip :+= m2Updated @@ -76,15 +75,13 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici val mergedGossip = localGossip merge remoteGossip mergedGossip.nodes.size must be(3) assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3)) - assertExpectedSampleSize(collector.isSigar, mergedGossip) - assertCreatedUninitialized(mergedGossip) - assertInitialized(mergedGossip) + mergedGossip.nodes.foreach(_.metrics.size must be > (3)) mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp) } "get the current NodeMetrics if it exists in the local nodes" in { val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) - var localGossip = MetricsGossip(DefaultRateOfDecay) + var localGossip = MetricsGossip() localGossip :+= m1 localGossip.metricsFor(m1).nonEmpty must be(true) } @@ -93,7 +90,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) - var localGossip = MetricsGossip(DefaultRateOfDecay) + var localGossip = MetricsGossip() localGossip :+= m1 localGossip :+= m2 diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala index 5d58bc84e5..d6a47db670 100644 --- a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala @@ -8,7 +8,8 @@ import akka.testkit.AkkaSpec import akka.actor.Address @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class NodeMetricsSpec extends AkkaSpec with AbstractClusterMetricsSpec with MetricSpec { +class NodeMetricsSpec extends AkkaSpec with MetricSpec + with MetricsCollectorFactory { val collector = createMetricsCollector diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/HeapMetricsSelectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/HeapMetricsSelectorSpec.scala new file mode 100644 index 0000000000..5a8fecc2eb --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/routing/HeapMetricsSelectorSpec.scala @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */
+
+package akka.cluster.routing
+
+import akka.testkit.AkkaSpec
+import akka.actor.Address
+import akka.actor.RootActorPath
+import akka.cluster.NodeMetrics
+import akka.cluster.NodeMetrics.MetricValues._
+import akka.cluster.Metric
+import com.typesafe.config.ConfigFactory
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class HeapMetricsSelectorSpec extends AkkaSpec(ConfigFactory.parseString("""
+    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+    akka.remote.netty.port = 0
+    """)) {
+
+  val selector = HeapMetricsSelector
+
+  val a1 = Address("akka", "sys", "a1", 2551)
+  val b1 = Address("akka", "sys", "b1", 2551)
+  val c1 = Address("akka", "sys", "c1", 2551)
+  val d1 = Address("akka", "sys", "d1", 2551)
+
+  val decay = Some(10)
+
+  val nodeMetricsA = NodeMetrics(a1, System.currentTimeMillis, Set(
+    Metric(HeapMemoryUsed, Some(BigInt(128)), decay),
+    Metric(HeapMemoryCommitted, Some(BigInt(256)), decay),
+    Metric(HeapMemoryMax, Some(BigInt(512)), None)))
+
+  val nodeMetricsB = NodeMetrics(b1, System.currentTimeMillis, Set(
+    Metric(HeapMemoryUsed, Some(BigInt(256)), decay),
+    Metric(HeapMemoryCommitted, Some(BigInt(512)), decay),
+    Metric(HeapMemoryMax, Some(BigInt(1024)), None)))
+
+  val nodeMetricsC = NodeMetrics(c1, System.currentTimeMillis, Set(
+    Metric(HeapMemoryUsed, Some(BigInt(1024)), decay),
+    Metric(HeapMemoryCommitted, Some(BigInt(1024)), decay),
+    Metric(HeapMemoryMax, Some(BigInt(1024)), None)))
+
+  val nodeMetrics = Set(nodeMetricsA, nodeMetricsB, nodeMetricsC)
+
+  "HeapMetricsSelector" must {
+
+    // remaining capacity is the free fraction of max heap, (max - used) / max,
+    // e.g. (512 - 128) / 512 = 0.75 for node a1
+    "calculate capacity of heap metrics" in {
+      val capacity = selector.capacity(nodeMetrics)
+      capacity(a1) must be(0.75 plusOrMinus 0.0001)
+      capacity(b1) must be(0.75 plusOrMinus 0.0001)
+      capacity(c1) must be(0.0 plusOrMinus 0.0001)
+    }
+
+    // weights are expected to be capacities divided by the smallest capacity,
+    // floored at 0.01, and rounded (a sketch of this scaling follows the patch)
+    "calculate weights from capacity" in {
+      val capacity = Map(a1 -> 0.6, b1 -> 0.3, c1 -> 0.1)
+      val weights = selector.weights(capacity)
+      weights must be(Map(c1 -> 1, b1 -> 3, a1 -> 6))
+    }
+
+    "handle low and zero capacity" in {
+      val capacity = Map(a1 -> 0.0, b1 -> 1.0, c1 -> 0.005, d1 -> 0.004)
+      val weights = selector.weights(capacity)
+      weights must be(Map(a1 -> 0, b1 -> 100, c1 -> 1, d1 -> 0))
+    }
+
+    "allocate weighted refs" in {
+      val weights = Map(a1 -> 1, b1 -> 3, c1 -> 10)
+      val refs = IndexedSeq(
+        system.actorFor(RootActorPath(a1) / "user" / "a"),
+        system.actorFor(RootActorPath(b1) / "user" / "b"),
+        system.actorFor(RootActorPath(c1) / "user" / "c"))
+      val result = selector.weightedRefs(refs, a1, weights)
+      val grouped = result.groupBy(_.path.address)
+      grouped(a1).size must be(1)
+      grouped(b1).size must be(3)
+      grouped(c1).size must be(10)
+    }
+
+    "allocate refs for undefined weight" in {
+      val weights = Map(a1 -> 1, b1 -> 2)
+      val refs = IndexedSeq(
+        system.actorFor(RootActorPath(a1) / "user" / "a"),
+        system.actorFor(RootActorPath(b1) / "user" / "b"),
+        system.actorFor(RootActorPath(c1) / "user" / "c"))
+      val result = selector.weightedRefs(refs, a1, weights)
+      val grouped = result.groupBy(_.path.address)
+      grouped(a1).size must be(1)
+      grouped(b1).size must be(2)
+      grouped(c1).size must be(1)
+    }
+
+    "allocate weighted local refs" in {
+      val weights = Map(a1 -> 2, b1 -> 1, c1 -> 10)
+      val refs = IndexedSeq(
+        testActor,
+        system.actorFor(RootActorPath(b1) / "user" / "b"),
+        system.actorFor(RootActorPath(c1) / "user" / "c"))
+      val result = selector.weightedRefs(refs, a1, weights)
+      result.filter(_ == testActor).size must be(2)
+    }
+
+    "not allocate ref with weight zero" in {
+ val weights = Map(a1 -> 0, b1 -> 2, c1 -> 10) + val refs = IndexedSeq( + system.actorFor(RootActorPath(a1) / "user" / "a"), + system.actorFor(RootActorPath(b1) / "user" / "b"), + system.actorFor(RootActorPath(c1) / "user" / "c")) + val result = selector.weightedRefs(refs, a1, weights) + result.filter(_ == refs.head).size must be(0) + } + } +} \ No newline at end of file
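
A note on the expected values in the new DataStreamSpec: they are consistent with
the standard exponentially weighted moving average using the smoothing constant
alpha = 2 / (N + 1), where N is the configured rate-of-decay (the "N time periods"
of the Wikipedia article referenced from reference.conf). Below is a minimal,
runnable Scala sketch of that recurrence, an illustration under this assumption
rather than the patch's actual DataStream implementation:

    object EwmaSketch extends App {
      // one EWMA step: alpha weights the latest observation, (1 - alpha) the history
      def ewmaStep(previous: Double, latest: Double, decay: Int): Double = {
        val alpha = 2.0 / (decay + 1) // decay = 10 gives alpha of about 0.1818
        alpha * latest + (1 - alpha) * previous
      }

      // reproduces "calculate correct ewma for normal decay":
      // prints 1000.00, 820.00, 672.73, 552.23, 453.64 for a constant input of 10.0
      Iterator.iterate(1000.0)(ewmaStep(_, 10.0, decay = 10)).take(5).foreach(v ⇒ println(f"$v%.2f"))

      // decay = 1 gives alpha = 1, so the ewma tracks the latest value exactly,
      // as asserted in "calculate ewma for decay 1"
      println(ewmaStep(100.0, 1.0, decay = 1)) // prints 1.0
    }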
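
Similarly, the HeapMetricsSelectorSpec expectations are consistent with
capacity = (max - used) / max and with weights formed by dividing each capacity
by the smallest capacity, floored at 0.01, then rounding. The sketch below
reproduces the spec's numbers under those assumptions; heapCapacity and weights
here are illustrative stand-ins, not the patch's MetricsSelector API:

    object WeightsSketch extends App {
      // free heap as a fraction of the maximum heap
      def heapCapacity(used: Long, max: Long): Double = (max - used).toDouble / max

      // scale capacities to integer weights; anything below 0.5% rounds to 0
      def weights(capacity: Map[String, Double]): Map[String, Int] =
        if (capacity.isEmpty) Map.empty
        else {
          val divisor = math.max(0.01, capacity.values.min)
          capacity.map { case (node, c) ⇒ node -> math.round(c / divisor).toInt }
        }

      println(heapCapacity(used = 128, max = 512)) // 0.75, as asserted for node a1
      println(weights(Map("a1" -> 0.6, "b1" -> 0.3, "c1" -> 0.1)))
      // Map(a1 -> 6, b1 -> 3, c1 -> 1)
      println(weights(Map("a1" -> 0.0, "b1" -> 1.0, "c1" -> 0.005, "d1" -> 0.004)))
      // Map(a1 -> 0, b1 -> 100, c1 -> 1, d1 -> 0)
    }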