AdaptiveLoadBalancingRouter and more refactoring of metrics, see #2547

* Refactoring of standard metrics extractors and data structures
* Removed optional value in Metric, simplified a lot
* Configuration of EWMA by using half-life duration
* Renamed DataStream to EWMA
* Incorporate review feedback
* Use binarySearch for selecting weighted routees
* More metrics selectors for the router
* Removed network metrics, since not supported on linux
* Configuration of router
* Rename to AdaptiveLoadBalancingRouter
* Remove total cores metrics, since it's the same as jmx getAvailableProcessors,
  tested on intel 24 core server and amd 48 core server, and MBP
* API cleanup
* Java API additions
* Documentation of metrics and AdaptiveLoadBalancingRouter
* New cluster sample to illustrate metrics in the documentation,
  and play around with (factorial)
This commit is contained in:
Patrik Nordwall 2012-11-08 18:49:54 +01:00
parent c9d206764a
commit dcde7d3594
61 changed files with 1885 additions and 975 deletions

View file

@ -138,16 +138,24 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
}
def parseConfig(key: String, config: Config): Option[Deploy] = {
val deployment = config.withFallback(default)
val router = createRouterConfig(deployment.getString("router"), key, config, deployment)
Some(Deploy(key, deployment, router, NoScopeGiven))
}
/**
* Factory method for creating `RouterConfig`
* @param routerType the configured name of the router, or FQCN
* @param key the full configuration key of the deployment section
* @param config the user defined config of the deployment, without defaults
* @param deployment the deployment config, with defaults
*/
protected def createRouterConfig(routerType: String, key: String, config: Config, deployment: Config): RouterConfig = {
val routees = Vector() ++ deployment.getStringList("routees.paths").asScala
val nrOfInstances = deployment.getInt("nr-of-instances")
val resizer = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
val router: RouterConfig = deployment.getString("router") match {
routerType match {
case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, resizer)
@ -169,7 +177,6 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
.format(fqn, key), exception)
}).get
}
Some(Deploy(key, deployment, router, NoScopeGiven))
}
}

View file

@ -70,7 +70,7 @@ akka {
failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.cluster.cluster.FailureDetector and
# It must implement akka.cluster.FailureDetector and
# have constructor with akka.actor.ActorSystem and
# akka.cluster.ClusterSettings parameters
implementation-class = "akka.cluster.AccrualFailureDetector"
@ -106,14 +106,6 @@ akka {
max-sample-size = 1000
}
# Uses JMX and Hyperic SIGAR, if SIGAR is on the classpath.
# Metrics that are only available with SIGAR for load balancing of nodes are:
# 1. Network latency
# 2. CPU
# A. Combined CPU (sum of User + Sys + Nice + Wait, in percentage)
# B. System load average: on some OS the JMX load average may not be available,
# however if SIGAR is on the classpath it is available on any OS.
# C. The number of cores per processor
metrics {
# Enable or disable metrics collector for load-balancing nodes.
enabled = on
@ -121,6 +113,8 @@ akka {
# FQCN of the metrics collector implementation.
# It must implement akka.cluster.MetricsCollector and
# have constructor with akka.actor.ActorSystem parameter.
# The default SigarMetricsCollector uses JMX and Hyperic SIGAR, if SIGAR
# is on the classpath, otherwise only JMX.
collector-class = "akka.cluster.SigarMetricsCollector"
# How often metrics are sampled on a node.
@ -131,10 +125,9 @@ akka {
# How quickly the exponential weighting of past data is decayed compared to
# new data. Set lower to increase the bias toward newer values.
# Corresponds to 'N time periods' as explained in
# http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
# Must be >= 1
rate-of-decay = 10
# It takes about 4 half-lives to drop below 10% contribution, and 7 to drop
# below 1%.
decay-half-life-duration = 12s
}
# If the tick-duration of the default scheduler is longer than the
@ -156,6 +149,16 @@ akka {
}
# Default configuration for routers
actor.deployment.default {
# MetricsSelector to use
# - available: "mix", "heap", "cpu", "load"
# - or: Fully qualified class name of the MetricsSelector class.
# The class must extend akka.cluster.routing.MetricsSelector
# and have a constructor with com.typesafe.config.Config
# parameter.
# - default is "mix"
metrics-selector = mix
}
actor.deployment.default.cluster {
# enable cluster aware router that deploys to nodes in the cluster
enabled = off
@ -182,4 +185,5 @@ akka {
routees-path = ""
}
}

View file

@ -3,6 +3,7 @@
*/
package akka.cluster
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import com.typesafe.config.Config
import akka.ConfigurationException
import akka.actor.Actor
@ -25,6 +26,14 @@ import akka.event.EventStream
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteDeployer
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterConfig
import akka.routing.DefaultResizer
import akka.cluster.routing.AdaptiveLoadBalancingRouter
import akka.cluster.routing.MixMetricsSelector
import akka.cluster.routing.HeapMetricsSelector
import akka.cluster.routing.SystemLoadAverageMetricsSelector
import akka.cluster.routing.CpuMetricsSelector
import akka.cluster.routing.MetricsSelector
/**
* INTERNAL API
@ -45,7 +54,7 @@ class ClusterActorRefProvider(
remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher")
}
override val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
override lazy val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
/**
* This method is overridden here to keep track of remote deployed actors to
@ -108,6 +117,36 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami
case None None
}
}
override protected def createRouterConfig(routerType: String, key: String, config: Config, deployment: Config): RouterConfig = {
val routees = Vector() ++ deployment.getStringList("routees.paths").asScala
val nrOfInstances = deployment.getInt("nr-of-instances")
val resizer = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
routerType match {
case "adaptive"
val metricsSelector = deployment.getString("metrics-selector") match {
case "mix" MixMetricsSelector()
case "heap" HeapMetricsSelector
case "cpu" CpuMetricsSelector
case "load" SystemLoadAverageMetricsSelector
case fqn
val args = Seq(classOf[Config] -> deployment)
dynamicAccess.createInstanceFor[MetricsSelector](fqn, args).recover({
case exception throw new IllegalArgumentException(
("Cannot instantiate metrics-selector [%s], defined in [%s], " +
"make sure it extends [akka.cluster.routing.MetricsSelector] and " +
"has constructor with [com.typesafe.config.Config] parameter")
.format(fqn, key), exception)
}).get
}
AdaptiveLoadBalancingRouter(metricsSelector, nrOfInstances, routees, resizer)
case _ super.createRouterConfig(routerType, key, config, deployment)
}
}
}
@SerialVersionUID(1L)

View file

@ -10,6 +10,8 @@ import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.actor.AddressTerminated
import java.lang.Iterable
import scala.collection.JavaConverters
/**
* Domain events published to the event bus.
@ -139,11 +141,18 @@ object ClusterEvent {
}
/**
* INTERNAL API
*
* Current snapshot of cluster member metrics. Published to subscribers.
* Current snapshot of cluster node metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent
case class ClusterMetricsChanged(nodeMetrics: Set[NodeMetrics]) extends ClusterDomainEvent {
/**
* Java API
*/
def getNodeMetrics: java.lang.Iterable[NodeMetrics] = {
import scala.collection.JavaConverters._
nodeMetrics.asJava
}
}
/**
* INTERNAL API

View file

@ -9,13 +9,11 @@ import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import scala.collection.immutable.{ SortedSet, Map }
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.runtime.{ RichLong, RichDouble, RichInt }
import scala.util.{ Try, Success, Failure }
import akka.ConfigurationException
import akka.actor.Actor
import akka.actor.ActorLogging
@ -26,6 +24,7 @@ import akka.actor.DynamicAccess
import akka.actor.ExtendedActorSystem
import akka.cluster.MemberStatus.Up
import akka.event.Logging
import java.lang.management.MemoryUsage
/**
* INTERNAL API.
@ -37,8 +36,8 @@ import akka.event.Logging
*
* Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]].
*
* Calculation of statistical data for each monitored process is delegated to the
* [[akka.cluster.DataStream]] for exponential smoothing, with additional decay factor.
* Smoothing of the data for each monitored process is delegated to the
* [[akka.cluster.EWMA]] for exponential weighted moving average.
*/
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
@ -58,7 +57,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* The latest metric values with their statistical data.
*/
var latestGossip: MetricsGossip = MetricsGossip()
var latestGossip: MetricsGossip = MetricsGossip.empty
/**
* The metrics collector that samples data on the node.
@ -160,6 +159,13 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
}
/**
* INTERNAL API
*/
private[cluster] object MetricsGossip {
val empty = MetricsGossip()
}
/**
* INTERNAL API
*
@ -195,11 +201,11 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics] = Set.empty) {
* Adds new local [[akka.cluster.NodeMetrics]] and initializes the data, or merges an existing.
*/
def :+(data: NodeMetrics): MetricsGossip = {
val previous = metricsFor(data)
val previous = metricsFor(data.address)
val names = previous map (_.name)
val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a names contains a.name)
val merged = toMerge flatMap (latest previous.collect { case peer if latest same peer peer :+ latest })
val merged = toMerge flatMap (latest previous.collect { case peer if latest sameAs peer peer :+ latest })
val refreshed = nodes filterNot (_.address == data.address)
copy(nodes = refreshed + data.copy(metrics = unseen ++ merged))
@ -213,7 +219,9 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics] = Set.empty) {
/**
* Returns metrics for a node if exists.
*/
def metricsFor(node: NodeMetrics): Set[Metric] = nodes flatMap (n if (n same node) n.metrics else Set.empty[Metric])
def metricsFor(address: Address): Set[Metric] = nodes collectFirst {
case n if (n.address == address) n.metrics
} getOrElse Set.empty[Metric]
}
@ -223,155 +231,139 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics] = Set.empty) {
*/
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
object EWMA {
  /**
   * Derives the alpha (decay factor) of an [[akka.cluster.EWMA]] from the
   * desired half-life and the interval between two observations.
   * It takes about 4 half-lives for an observation to drop below 10%
   * contribution, and 7 to drop below 1%.
   *
   * @param halfLife the half-life of an observation; must be at least one
   *   millisecond, because the check is done on `toMillis`
   * @param collectInterval the interval between two observations
   * @return the decay factor to use for each new observation
   * @throws IllegalArgumentException if `halfLife.toMillis` is not > 0
   */
  def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double = {
    val halfLifeMs = halfLife.toMillis
    require(halfLifeMs > 0, "halfLife must be > 0 s")
    val intervalMs = collectInterval.toMillis
    // 0.69315 approximates ln(2), which turns a half-life into a decay rate per millisecond
    val ratePerMs = 0.69315 / halfLifeMs
    1 - math.exp(-ratePerMs * intervalMs)
  }
}
/**
* The exponentially weighted moving average (EWMA) approach captures short-term
* movements in volatility for a conditional volatility forecasting model. By virtue
* of its alpha, or decay factor, this provides a statistical streaming data model
* that is exponentially biased towards newer entries.
*
* http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
*
* An EWMA only needs the most recent forecast value to be kept, as opposed to a standard
* moving average model.
*
* INTERNAL API
*
* @param decay sets how quickly the exponential weighting decays for past data compared to new data
* Corresponds to 'N time periods' as explained in
* http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
* @param alpha decay factor, sets how quickly the exponential weighting decays for past data compared to new data,
* see http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
*
* @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or,
* @param value the current exponentially weighted moving average, e.g. Y(n - 1), or,
* the sampled value resulting from the previous smoothing iteration.
* This value is always used as the previous EWMA to calculate the new EWMA.
*
* @param timestamp the most recent time of sampling
*
* @param startTime the time of initial sampling for this data stream
*/
private[cluster] case class DataStream(decay: Int, ewma: Double, startTime: Long, timestamp: Long)
extends ClusterMessage {
private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMessage {
require(decay >= 1, "Rate of decay must be >= 1")
/**
* The rate at which the weights of past observations
* decay as they become more distant.
*/
private val α = 2.0 / (decay + 1)
require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0")
/**
* Calculates the exponentially weighted moving average for a given monitored data set.
*
* @param xn the new data point
*
* @return a [[akka.cluster.DataStream]] with the updated yn and timestamp
* @return a new [[akka.cluster.EWMA]] with the updated value
*/
def :+(xn: Double): DataStream = copy(ewma = (α * xn) + (1 - α) * ewma, timestamp = newTimestamp)
/**
* The duration of observation for this data stream
*/
def duration: FiniteDuration = (timestamp - startTime).millis
def :+(xn: Double): EWMA = copy(value = (alpha * xn) + (1 - alpha) * value)
}
/**
* INTERNAL API
*
* @param name the metric name
*
* @param value the metric value, which may or may not be defined, it must be a valid numerical value,
* see [[akka.cluster.MetricNumericConverter.defined()]]
*
* @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. as total cores), are not trended.
* averages (e.g. system load average) or finite (e.g. as number of processors), are not trended.
*/
private[cluster] case class Metric private (name: String, value: Option[Number], average: Option[DataStream],
dummy: Boolean) // dummy because of overloading clash with apply
case class Metric private (name: String, value: Number, private val average: Option[EWMA])
extends ClusterMessage with MetricNumericConverter {
require(value.isEmpty || defined(value.get), "Invalid Metric [%s] value [%]".format(name, value))
// The message is only built when the check fails, so both placeholders must be
// valid `%s` conversions; a bare `%]` makes `format` throw instead of reporting the value.
require(defined(value), "Invalid Metric [%s] value [%s]".format(name, value))
/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
* data point, and if defined, updates the data stream. Returns the updated metric.
*/
def :+(latest: Metric): Metric = latest.value match {
case Some(v) if this same latest average match {
case Some(previous) copy(value = latest.value, average = Some(previous :+ v.doubleValue))
def :+(latest: Metric): Metric = if (this sameAs latest) average match {
case Some(avg) copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue))
case None if latest.average.isDefined copy(value = latest.value, average = latest.average)
case None if latest.average.isEmpty copy(value = latest.value)
case _ copy(value = latest.value)
}
case None this
}
def isDefined: Boolean = value.isDefined
else this
/**
* The numerical value of the average, if defined, otherwise the latest value,
* if defined.
* The numerical value of the average, if defined, otherwise the latest value
*/
def averageValue: Option[Double] =
average map (_.ewma) orElse value.map(_.doubleValue)
def smoothValue: Double = average match {
case Some(avg) avg.value
case None value.doubleValue
}
/**
* @return true if this value is smoothed
*/
def isSmooth: Boolean = average.isDefined
/**
* Returns true if <code>that</code> is tracking the same metric as this.
*/
def same(that: Metric): Boolean = name == that.name
def sameAs(that: Metric): Boolean = name == that.name
}
/**
* INTERNAL API
*
* Companion object of Metric class.
* Factory for creating valid Metric instances.
*/
private[cluster] object Metric extends MetricNumericConverter {
import NodeMetrics.MetricValues._
private def definedValue(value: Option[Number]): Option[Number] =
if (value.isDefined && defined(value.get)) value else None
object Metric extends MetricNumericConverter {
/**
* Evaluates validity of <code>value</code> based on whether it is available (SIGAR on classpath)
* or defined for the OS (JMX). If undefined we set the value option to None and do not modify
* the latest sampled metric to avoid skewing the statistical trend.
*
* @param decay rate of decay for values applicable for trending
* Creates a new Metric instance if the value is valid, otherwise None
* is returned. Invalid numeric values are negative Int/Long and NaN/Infinite
* Double values.
*/
def apply(name: String, value: Option[Number], decay: Option[Int]): Metric = {
val dv = definedValue(value)
val average =
if (decay.isDefined && dv.isDefined) {
val now = newTimestamp
Some(DataStream(decay.get, dv.get.doubleValue, now, now))
} else
None
new Metric(name, dv, average, true)
def create(name: String, value: Number, decayFactor: Option[Double]): Option[Metric] =
  // Only values accepted by `defined` yield a Metric; the average is seeded with the first value.
  if (defined(value)) Some(new Metric(name, value, createEWMA(value.doubleValue, decayFactor)))
  else None

/**
 * Creates a new Metric instance if the Try is successful and the value is valid,
 * otherwise None is returned. Invalid numeric values are negative Int/Long and
 * NaN/Infinite Double values.
 */
def create(name: String, value: Try[Number], decayFactor: Option[Double]): Option[Metric] = value match {
  case Success(v) ⇒ create(name, v, decayFactor)
  case Failure(_) ⇒ None
}

/**
 * Builds the initial moving average for a metric.
 *
 * @param value the first observed value, used as the starting average
 * @param decayFactor the alpha to smooth with, or None if the metric is not smoothed
 * @return an EWMA seeded with `value` when a decay factor is given, otherwise None
 */
private def createEWMA(value: Double, decayFactor: Option[Double]): Option[EWMA] =
  decayFactor map (alpha ⇒ EWMA(value, alpha))
def apply(name: String): Metric = new Metric(name, None, None, true)
}
/**
* INTERNAL API
*
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
* For the JVM memory. The amount of used and committed memory will always be <= max if max is defined.
* A memory allocation may fail if it attempts to increase the used memory such that used > committed
* even if used <= max is true (e.g. when the system virtual memory is low).
*
* The system is possibly nearing a bottleneck if the system load average is nearing in cpus/cores.
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
*
* @param timestamp the time of sampling
*
* @param metrics the array of sampled [[akka.actor.Metric]]
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param metrics the set of sampled [[akka.cluster.Metric]]
*/
private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
import NodeMetrics._
import NodeMetrics.MetricValues._
case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
/**
* Returns the most recent data.
@ -381,101 +373,147 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
/**
* Returns true if <code>that</code> address is the same as this and its metric set is more recent.
*/
def updatable(that: NodeMetrics): Boolean = (this same that) && (that.timestamp > timestamp)
def updatable(that: NodeMetrics): Boolean = (this sameAs that) && (that.timestamp > timestamp)
/**
* Returns true if <code>that</code> address is the same as this
*/
def same(that: NodeMetrics): Boolean = address == that.address
def sameAs(that: NodeMetrics): Boolean = address == that.address
def metric(key: String): Option[Metric] = metrics.collectFirst { case m if m.name == key m }
/**
* Of all the data streams, this fluctuates the most.
* Java API
*/
def heapMemory: HeapMemory = HeapMemory(metric(HeapMemoryUsed), metric(HeapMemoryCommitted), metric(HeapMemoryMax))
def networkLatency: NetworkLatency = NetworkLatency(metric(NetworkInboundRate), metric(NetworkOutboundRate))
def cpu: Cpu = Cpu(metric(SystemLoadAverage), metric(Processors), metric(CpuCombined), metric(TotalCores))
def metric(key: String): Metric = metrics.collectFirst { case m if m.name == key m } getOrElse Metric(key)
def getMetrics: java.lang.Iterable[Metric] = {
import scala.collection.JavaConverters._
metrics.asJava
}
}
/**
* INTERNAL API
* Definitions of the built-in standard metrics.
*
* Companion object of Metric class - used by metrics consumers such as the load balancing routers.
*
* The following extractors and orderings hide the implementation from cluster metric consumers
* such as load balancers.
* The following extractors and data structures make it easy to consume the
* [[akka.cluster.NodeMetrics]] in, for example, load balancers.
*/
private[cluster] object NodeMetrics {
sealed trait MetricValues
object MetricValues {
object StandardMetrics {
object HeapMemory {
/**
* Constants for the heap related Metric names
*/
object Fields {
final val HeapMemoryUsed = "heap-memory-used"
final val HeapMemoryCommitted = "heap-memory-committed"
final val HeapMemoryMax = "heap-memory-max"
final val NetworkInboundRate = "network-max-inbound"
final val NetworkOutboundRate = "network-max-outbound"
final val SystemLoadAverage = "system-load-average"
final val Processors = "processors"
final val CpuCombined = "cpu-combined"
final val TotalCores = "total-cores"
}
import Fields._
def unapply(v: HeapMemory): Tuple3[Long, Long, Option[Long]] =
(v.used.averageValue.get.longValue,
v.committed.averageValue.get.longValue,
v.max.averageValue map (_.longValue) orElse None)
def unapply(v: NetworkLatency): Option[(Long, Long)] =
(v.inbound.averageValue, v.outbound.averageValue) match {
case (Some(a), Some(b)) Some((a.longValue, b.longValue))
case _ None
/**
* Given a NodeMetrics it returns the HeapMemory data if the nodeMetrics contains
* necessary heap metrics.
* @return if possible a tuple matching the HeapMemory constructor parameters
*/
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Long, Long, Option[Long])] = {
for {
used nodeMetrics.metric(HeapMemoryUsed)
committed nodeMetrics.metric(HeapMemoryCommitted)
maxOption = nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
used.smoothValue.longValue, committed.smoothValue.longValue, maxOption)
}
def unapply(v: Cpu): Tuple4[Option[Double], Int, Option[Double], Option[Int]] =
(v.systemLoadAverage.averageValue map (_.doubleValue),
v.processors.averageValue.get.intValue,
v.combinedCpu.averageValue map (_.doubleValue),
v.cores.averageValue map (_.intValue))
}
/**
* @param used the current sum of heap memory used from all heap memory pools (in bytes)
* Java API to extract HeapMemory data from nodeMetrics, if the nodeMetrics
* contains necessary heap metrics, otherwise it returns null.
*/
def extractHeapMemory(nodeMetrics: NodeMetrics): HeapMemory = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max)
HeapMemory(address, timestamp, used, committed, max)
case _ null
}
/**
* The amount of used and committed memory will always be <= max if max is defined.
* A memory allocation may fail if it attempts to increase the used memory such that used > committed
* even if used <= max is true (e.g. when the system virtual memory is low).
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param used the current sum of heap memory used from all heap memory pools (in bytes)
* @param committed the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes). Committed will always be greater than or equal to used.
*
* @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
* Can be undefined on some OS.
*/
case class HeapMemory(used: Metric, committed: Metric, max: Metric) extends MetricValues {
require(used.isDefined, "used must be defined")
require(committed.isDefined, "committed must be defined")
case class HeapMemory(address: Address, timestamp: Long, used: Long, committed: Long, max: Option[Long]) {
require(committed > 0L, "committed heap expected to be > 0 bytes")
require(max.isEmpty || max.get > 0L, "max heap expected to be > 0 bytes")
}
object Cpu {
/**
* Constants for the cpu related Metric names
*/
object Fields {
final val SystemLoadAverage = "system-load-average"
final val Processors = "processors"
final val CpuCombined = "cpu-combined"
}
import Fields._
/**
* Given a NodeMetrics it returns the Cpu data if the nodeMetrics contains
* necessary cpu metrics.
* @return if possible a tuple matching the Cpu constructor parameters
*/
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Option[Double], Option[Double], Int)] = {
for {
processors nodeMetrics.metric(Processors)
systemLoadAverageOption = nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue)
cpuCombinedOption = nodeMetrics.metric(CpuCombined).map(_.smoothValue)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
systemLoadAverageOption, cpuCombinedOption, processors.value.intValue)
}
}
/**
* @param inbound the inbound network IO rate, in bytes
*
* @param outbound the outbound network IO rate, in bytes
* Java API to extract Cpu data from nodeMetrics, if the nodeMetrics
* contains necessary cpu metrics, otherwise it returns null.
*/
case class NetworkLatency(inbound: Metric, outbound: Metric) extends MetricValues
def extractCpu(nodeMetrics: NodeMetrics): Cpu = nodeMetrics match {
case Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
case _ null
}
/**
* @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute,
* The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores.
* @param cpuCombined combined CPU sum of User + Sys + Nice + Wait, in percentage ([0.0 - 1.0]). This
* metric can describe the amount of time the CPU spent executing code during n-interval and how
* much more it could theoretically.
* @param processors the number of available processors
*
* @param combinedCpu combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
* the amount of time the CPU spent executing code during n-interval and how much more it could theoretically.
*
* @param cores the number of cores (multi-core: per processor)
*/
private[cluster] case class Cpu(systemLoadAverage: Metric, processors: Metric, combinedCpu: Metric, cores: Metric) extends MetricValues {
require(processors.isDefined, "processors must be defined")
case class Cpu(
address: Address,
timestamp: Long,
systemLoadAverage: Option[Double],
cpuCombined: Option[Double],
processors: Int) {
cpuCombined match {
case Some(x) require(0.0 <= x && x <= 1.0, "cpuCombined must be between [0.0 - 1.0], was [%s]" format x)
case None
}
}
}
@ -527,22 +565,22 @@ private[cluster] trait MetricsCollector extends Closeable {
}
/**
* INTERNAL API
*
* Loads JVM metrics through JMX monitoring beans.
* Loads JVM and system metrics through JMX monitoring beans.
*
* @param address The [[akka.actor.Address]] of the node being sampled
* @param decayFactor how quickly the exponential weighting of past data is decayed
*/
private[cluster] class JmxMetricsCollector(address: Address, decay: Int) extends MetricsCollector {
import NodeMetrics.MetricValues._
class JmxMetricsCollector(address: Address, decayFactor: Double) extends MetricsCollector {
import StandardMetrics.HeapMemory.Fields._
import StandardMetrics.Cpu.Fields._
private def this(cluster: Cluster) =
this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay)
this(cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsDecayHalfLifeDuration, cluster.settings.MetricsInterval))
def this(system: ActorSystem) = this(Cluster(system))
private val decayOption = Some(decay)
private val decayFactorOption = Some(decayFactor)
private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
@ -551,63 +589,68 @@ private[cluster] class JmxMetricsCollector(address: Address, decay: Int) extends
/**
* Samples and collects new data points.
*/
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(
systemLoadAverage, heapUsed, heapCommitted, heapMax, processors))
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, metrics)
def metrics: Set[Metric] = {
val heap = heapMemoryUsage
Set(systemLoadAverage, heapUsed(heap), heapCommitted(heap), heapMax(heap), processors).flatten
}
/**
* (JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned,
* which means that the returned Metric is undefined.
* On some systems the JMX OS system load average may not be available, in which case a -1 is
* returned from JMX, and None is returned from this method.
*/
def systemLoadAverage: Metric = Metric(
def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
value = Some(osMBean.getSystemLoadAverage),
decay = None)
value = osMBean.getSystemLoadAverage,
decayFactor = None)
/**
* (JMX) Returns the number of available processors
*/
def processors: Metric = Metric(
def processors: Option[Metric] = Metric.create(
name = Processors,
value = Some(osMBean.getAvailableProcessors),
decay = None)
value = osMBean.getAvailableProcessors,
decayFactor = None)
// FIXME those three heap calls should be done at once
/**
* Current heap to be passed in to heapUsed, heapCommitted and heapMax
*/
def heapMemoryUsage: MemoryUsage = memoryMBean.getHeapMemoryUsage
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
*/
def heapUsed: Metric = Metric(
def heapUsed(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryUsed,
value = Some(memoryMBean.getHeapMemoryUsage.getUsed),
decay = decayOption)
value = heap.getUsed,
decayFactor = decayFactorOption)
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes).
*/
def heapCommitted: Metric = Metric(
def heapCommitted(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryCommitted,
value = Some(memoryMBean.getHeapMemoryUsage.getCommitted),
decay = decayOption)
value = heap.getCommitted,
decayFactor = decayFactorOption)
/**
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If not defined the metrics value is None, i.e.
* never negative.
*/
def heapMax: Metric = Metric(
def heapMax(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryMax,
value = Some(memoryMBean.getHeapMemoryUsage.getMax),
decay = None)
value = heap.getMax,
decayFactor = None)
def close(): Unit = ()
override def close(): Unit = ()
}
/**
* INTERNAL API
*
* Loads metrics through Hyperic SIGAR and JMX monitoring beans. This
* loads wider and more accurate range of metrics compared to JmxMetricsCollector
* by using SIGAR's native OS library.
@ -621,32 +664,25 @@ private[cluster] class JmxMetricsCollector(address: Address, decay: Int) extends
* @param decay how quickly the exponential weighting of past data is decayed
* @param sigar the org.hyperic.Sigar instance
*/
private[cluster] class SigarMetricsCollector(address: Address, decay: Int, sigar: AnyRef)
extends JmxMetricsCollector(address, decay) {
import NodeMetrics.MetricValues._
class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef)
extends JmxMetricsCollector(address, decayFactor) {
def this(address: Address, decay: Int, dynamicAccess: DynamicAccess) =
this(address, decay, dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty).get)
import StandardMetrics.HeapMemory.Fields._
import StandardMetrics.Cpu.Fields._
private def this(cluster: Cluster) =
this(cluster.selfAddress, cluster.settings.MetricsRateOfDecay, cluster.system.dynamicAccess)
this(cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsDecayHalfLifeDuration, cluster.settings.MetricsInterval),
cluster.system.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty).get)
def this(system: ActorSystem) = this(Cluster(system))
private val decayOption = Some(decay)
private val decayFactorOption = Some(decayFactor)
private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList").map(m m)
private val NetInterfaces: Option[Method] = createMethodFrom(sigar, "getNetInterfaceList")
private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc")
private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption
private val Pid: Option[Method] = createMethodFrom(sigar, "getPid")
// Do something initially, in constructor, to make sure that the native library can be loaded.
// This will by design throw exception if sigar isn't usable
val pid: Long = createMethodFrom(sigar, "getPid") match {
@ -661,25 +697,20 @@ private[cluster] class SigarMetricsCollector(address: Address, decay: Int, sigar
case None throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method")
}
/**
* Samples and collects new data points.
*/
override def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores,
systemLoadAverage, heapUsed, heapCommitted, heapMax, processors, networkMaxRx, networkMaxTx))
override def metrics: Set[Metric] = {
super.metrics.filterNot(_.name == SystemLoadAverage) ++ Set(systemLoadAverage, cpuCombined).flatten
}
/**
* (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned,
* which means that the returned Metric is undefined.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned
* from JMX, which means that None is returned from this method.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
*/
override def systemLoadAverage: Metric = {
val m = Metric(
override def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]].head.asInstanceOf[Number]).toOption,
decay = None)
if (m.isDefined) m else super.systemLoadAverage
}
value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]](0).asInstanceOf[Number]),
decayFactor = None) orElse super.systemLoadAverage
/**
* (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
@ -689,59 +720,16 @@ private[cluster] class SigarMetricsCollector(address: Address, decay: Int, sigar
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
*/
def cpuCombined: Metric = Metric(
def cpuCombined: Option[Metric] = Metric.create(
name = CpuCombined,
value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]).toOption,
decay = decayOption)
/**
* FIXME: Array[Int].head - expose all if cores per processor might differ.
* (SIGAR) Returns the total number of cores.
*
* FIXME do we need this information, isn't it enough with jmx processors?
*/
def totalCores: Metric = Metric(
name = TotalCores,
value = Try(CpuList.get.invoke(sigar).asInstanceOf[Array[AnyRef]].map(cpu
createMethodFrom(cpu, "getTotalCores").get.invoke(cpu).asInstanceOf[Number]).head).toOption,
decay = None)
// FIXME those two network calls should be combined into one
/**
* (SIGAR) Returns the max network IO read/write value, in bytes, for network latency evaluation.
*/
def networkMaxRx: Metric = networkFor("getRxBytes", NetworkInboundRate)
/**
* (SIGAR) Returns the max network IO outbound value, in bytes.
*/
def networkMaxTx: Metric = networkFor("getTxBytes", NetworkOutboundRate)
value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]),
decayFactor = decayFactorOption)
/**
* Releases any native resources associated with this instance.
*/
override def close(): Unit = Try(createMethodFrom(sigar, "close").get.invoke(sigar))
// FIXME network metrics needs thought and refactoring
/**
* Returns the max bytes for the given <code>method</code> in metric for <code>metric</code> from the network interface stats.
*/
private def networkFor(method: String, metric: String): Metric = Metric(
name = metric,
value = Try(networkStats.collect {
case (_, a)
createMethodFrom(a, method).get.invoke(a).asInstanceOf[Long]
}.max.asInstanceOf[Number]).toOption.orElse(None),
decay = decayOption)
/**
* Returns the network stats per interface.
*/
private def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar).asInstanceOf[Array[String]].map(arg
arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar, arg))).toMap) getOrElse Map.empty[String, AnyRef]
private def createMethodFrom(ref: AnyRef, method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] =
Try(ref.getClass.getMethod(method, types: _*)).toOption
@ -749,7 +737,9 @@ private[cluster] class SigarMetricsCollector(address: Address, decay: Int, sigar
/**
* INTERNAL API
* Companion object of MetricsCollector class.
* Factory to create configured MetricsCollector.
* If instantiation of SigarMetricsCollector fails (missing class or native library)
* it falls back to use JmxMetricsCollector.
*/
private[cluster] object MetricsCollector {
def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = {

View file

@ -77,9 +77,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
require(d > Duration.Zero, "metrics.collect-interval must be > 0"); d
}
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
final val MetricsRateOfDecay: Int = {
val n = getInt("akka.cluster.metrics.rate-of-decay")
require(n >= 1, "metrics.rate-of-decay must be >= 1"); n
final val MetricsDecayHalfLifeDuration: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.metrics.decay-half-life-duration"), MILLISECONDS)
require(d > Duration.Zero, "metrics.decay-half-life-duration must be > 0"); d
}
}

View file

@ -0,0 +1,414 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import java.util.Arrays
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.dispatch.Dispatchers
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.Cpu
import akka.cluster.StandardMetrics.HeapMemory
import akka.event.Logging
import akka.routing.Broadcast
import akka.routing.Destination
import akka.routing.FromConfig
import akka.routing.NoRouter
import akka.routing.Resizer
import akka.routing.Route
import akka.routing.RouteeProvider
import akka.routing.RouterConfig
object AdaptiveLoadBalancingRouter {
  /**
   * Supervisor strategy used when none is given to the router:
   * every failure is escalated.
   */
  private val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.Escalate
  }
}
/**
* A Router that performs load balancing to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees based on probabilities derived from
* the remaining capacity of the corresponding node.
*
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
* sense as this means that the router should both create new actors and use the 'routees'
* actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
case class AdaptiveLoadBalancingRouter(
  metricsSelector: MetricsSelector = MixMetricsSelector(),
  nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
  override val resizer: Option[Resizer] = None,
  val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
  val supervisorStrategy: SupervisorStrategy = AdaptiveLoadBalancingRouter.defaultSupervisorStrategy)
  extends RouterConfig with AdaptiveLoadBalancingRouterLike {

  /**
   * Constructor that sets nrOfInstances to be created.
   * Java API
   * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
   * @param nr number of routees to create
   */
  def this(selector: MetricsSelector, nr: Int) = this(metricsSelector = selector, nrOfInstances = nr)

  /**
   * Constructor that sets the routees to be used.
   * Java API
   * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
   * @param routeePaths string representation of the actor paths of the routees that will be looked up
   *   using `actorFor` in [[akka.actor.ActorRefProvider]]
   */
  def this(selector: MetricsSelector, routeePaths: java.lang.Iterable[String]) =
    this(metricsSelector = selector, routees = routeePaths.asScala)

  /**
   * Constructor that sets the resizer to be used.
   * Java API
   * @param selector the selector is responsible for producing weighted mix of routees from the node metrics
   * @param resizer the resizer to use for this router
   */
  def this(selector: MetricsSelector, resizer: Resizer) =
    this(metricsSelector = selector, resizer = Some(resizer))

  /**
   * Java API for setting routerDispatcher
   */
  def withDispatcher(dispatcherId: String): AdaptiveLoadBalancingRouter =
    copy(routerDispatcher = dispatcherId)

  /**
   * Java API for setting the supervisor strategy to be used for the head
   * Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): AdaptiveLoadBalancingRouter =
    copy(supervisorStrategy = strategy)

  /**
   * Uses the resizer of the given RouterConfig if this RouterConfig
   * doesn't have one, i.e. the resizer defined in code is used if
   * resizer was not defined in config.
   *
   * `FromConfig` and `NoRouter` contribute nothing and leave this router unchanged.
   *
   * @throws IllegalArgumentException if `other` is any other kind of router
   */
  override def withFallback(other: RouterConfig): RouterConfig = other match {
    case _: FromConfig | _: NoRouter => this
    case otherRouter: AdaptiveLoadBalancingRouter =>
      // own resizer wins, the other router's resizer is only a fallback
      copy(resizer = this.resizer orElse otherRouter.resizer)
    case _ => throw new IllegalArgumentException("Expected AdaptiveLoadBalancingRouter, got [%s]".format(other))
  }
}
/**
 * INTERNAL API.
 *
 * This strategy is a metrics-aware router which performs load balancing of
 * cluster nodes based on cluster metric data. It consumes
 * [[akka.cluster.ClusterEvent.ClusterMetricsChanged]] events and the
 * [[akka.cluster.routing.MetricsSelector]] creates a mix of weighted routees
 * based on the node metrics. Messages are routed randomly to the weighted
 * routees, i.e. nodes with lower load are more likely to be used than nodes
 * with higher load.
 */
trait AdaptiveLoadBalancingRouterLike { this: RouterConfig =>

  def metricsSelector: MetricsSelector

  def nrOfInstances: Int

  def routees: Iterable[String]

  def routerDispatcher: String

  /**
   * Creates the routees (unless a resizer is defined), starts the
   * `metricsListener` child actor and returns the routing function.
   */
  override def createRoute(routeeProvider: RouteeProvider): Route = {
    if (resizer.isEmpty) {
      if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
      else routeeProvider.registerRouteesFor(routees)
    }

    // The current weighted routees, if any. Weights are produced by the metricsSelector
    // via the metricsListener Actor. It's only updated by the actor, but accessed from
    // the threads of the senders.
    @volatile var weightedRoutees: Option[WeightedRoutees] = None

    // subscribe to ClusterMetricsChanged and update weightedRoutees
    val metricsListener = routeeProvider.context.actorOf(Props(new Actor {

      val cluster = Cluster(context.system)

      override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])

      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case ClusterMetricsChanged(metrics) => receiveMetrics(metrics)
        case _: CurrentClusterState         => // ignore
      }

      def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
        // update the state outside of the actor, not a recommended practice, but works fine here
        weightedRoutees = Some(new WeightedRoutees(routeeProvider.routees, cluster.selfAddress,
          metricsSelector.weights(metrics)))
      }

    }).withDispatcher(routerDispatcher), name = "metricsListener")

    // Weighted random pick once weights are known, plain random pick before that,
    // deadLetters when there is nothing to pick from.
    def getNext(): ActorRef = weightedRoutees match {
      case Some(weighted) =>
        if (weighted.total == 0) routeeProvider.context.system.deadLetters
        else weighted(ThreadLocalRandom.current.nextInt(weighted.total) + 1)
      case None =>
        val currentRoutees = routeeProvider.routees
        if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
        else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size))
    }

    {
      case (sender, message) =>
        message match {
          case Broadcast(msg) => toAll(sender, routeeProvider.routees)
          case msg            => List(Destination(sender, getNext()))
        }
    }
  }
}
/**
 * MetricsSelector that uses the heap metrics.
 * Low heap capacity => small weight.
 *
 * The capacity of a node is the unused fraction of the max heap when max
 * is defined, otherwise the unused fraction of the committed heap.
 */
@SerialVersionUID(1L)
case object HeapMetricsSelector extends CapacityMetricsSelector {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this

  override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
    nodeMetrics.collect {
      case HeapMemory(address, _, used, committed, max) =>
        val capacity = max match {
          case None    => (committed - used).toDouble / committed
          case Some(m) => (m - used).toDouble / m
        }
        (address, capacity)
    }.toMap
  }
}
/**
 * MetricsSelector that uses the combined CPU metrics.
 * Combined CPU is sum of User + Sys + Nice + Wait, in percentage.
 * Low cpu capacity => small weight.
 *
 * Nodes whose metrics carry no combined CPU value are left out of the result.
 */
@SerialVersionUID(1L)
case object CpuMetricsSelector extends CapacityMetricsSelector {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this

  override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
    nodeMetrics.collect {
      case Cpu(address, _, _, Some(cpuCombined), _) =>
        val capacity = 1.0 - cpuCombined
        (address, capacity)
    }.toMap
  }
}
/**
 * MetricsSelector that uses the system load average metrics.
 * System load average is OS-specific average load on the CPUs in the system,
 * for the past 1 minute. The system is possibly nearing a bottleneck if the
 * system load average is nearing number of cpus/cores.
 * Low load average capacity => small weight.
 *
 * The load per processor is capped at 1.0, so the capacity is never negative.
 * Nodes whose metrics carry no system load average are left out of the result.
 */
@SerialVersionUID(1L)
case object SystemLoadAverageMetricsSelector extends CapacityMetricsSelector {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this

  override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
    nodeMetrics.collect {
      case Cpu(address, _, Some(systemLoadAverage), _, processors) =>
        val capacity = 1.0 - math.min(1.0, systemLoadAverage / processors)
        (address, capacity)
    }.toMap
  }
}
/**
 * MetricsSelector that combines other selectors and aggregates their capacity
 * values. By default it uses [[akka.cluster.routing.HeapMetricsSelector]],
 * [[akka.cluster.routing.CpuMetricsSelector]], and
 * [[akka.cluster.routing.SystemLoadAverageMetricsSelector]].
 *
 * @param selectors the selectors whose capacity values are averaged per address
 */
@SerialVersionUID(1L)
case class MixMetricsSelector(
  selectors: IndexedSeq[CapacityMetricsSelector] = Vector(
    HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector))
  extends CapacityMetricsSelector {

  /**
   * Java API
   */
  def this(selectors: java.lang.Iterable[CapacityMetricsSelector]) = this(selectors.asScala.toIndexedSeq)

  /**
   * The capacity of an address is the average of the values reported for it
   * by the selectors; a selector that reports nothing for an address does
   * not take part in that address' average.
   */
  override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
    val combined: IndexedSeq[(Address, Double)] = selectors.flatMap(_.capacity(nodeMetrics).toSeq)
    // aggregated average of the capacities by address
    combined.foldLeft(Map.empty[Address, (Double, Int)].withDefaultValue((0.0, 0))) {
      case (acc, (address, capacity)) =>
        val (sum, count) = acc(address)
        acc + (address -> (sum + capacity, count + 1))
    }.mapValues {
      case (sum, count) => sum / count
    }.toMap
  }
}
case object MixMetricsSelector {
  // created once, so that getInstance really hands out a single shared instance
  private val defaultInstance: MixMetricsSelector = MixMetricsSelector()

  /**
   * Java API: get the default singleton instance
   */
  def getInstance: MixMetricsSelector = defaultInstance
}
/**
 * Turns node metrics into routing weights, one weight per node address.
 */
@SerialVersionUID(1L)
trait MetricsSelector extends Serializable {

  /**
   * The weights per address, based on the nodeMetrics.
   *
   * @param nodeMetrics the metrics to derive the weights from
   * @return the weight of each address
   */
  def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int]

}
/**
 * A MetricsSelector producing weights from remaining capacity.
 * The weights are typically proportional to the remaining capacity.
 */
abstract class CapacityMetricsSelector extends MetricsSelector {

  /**
   * Remaining capacity for each node. The value is between
   * 0.0 and 1.0, where 0.0 means no remaining capacity (full
   * utilization) and 1.0 means full remaining capacity (zero
   * utilization).
   */
  def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double]

  /**
   * Converts the capacity values to weights. The node with lowest
   * capacity gets weight 1 (lowest usable capacity is 1%) and other
   * nodes gets weights proportional to their capacity compared to
   * the node with lowest capacity.
   *
   * @return a strict map of weights; empty when `capacity` is empty
   */
  def weights(capacity: Map[Address, Double]): Map[Address, Int] = {
    if (capacity.isEmpty) Map.empty[Address, Int]
    else {
      val (_, min) = capacity.minBy { case (_, c) => c }
      // lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
      val divisor = math.max(0.01, min)
      // map instead of mapValues: mapValues is a view that would redo the
      // rounding on every lookup of the returned map
      capacity map { case (address, c) => (address, math.round(c / divisor).toInt) }
    }
  }

  /**
   * The weights per address, based on the capacity produced by
   * the nodeMetrics.
   */
  override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] =
    weights(capacity(nodeMetrics))

}
/**
 * INTERNAL API
 *
 * Pick routee based on its weight. Higher weight, higher probability.
 *
 * @param refs the routees to pick from
 * @param selfAddress address used for routees whose path address has neither host nor port
 * @param weights weight per node address; routees on an address without a weight
 *   get the (integer) mean of the given weights, or 1 if no weights are given
 */
private[cluster] class WeightedRoutees(refs: IndexedSeq[ActorRef], selfAddress: Address, weights: Map[Address, Int]) {

  // fill an array of same size as the refs with accumulated weights,
  // binarySearch is used to pick the right bucket from a requested value
  // from 1 to the total sum of the used weights.
  private val buckets: Array[Int] = {
    def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
      case Address(_, _, None, None) => selfAddress
      case a                         => a
    }
    val buckets = Array.ofDim[Int](refs.size)
    val meanWeight = if (weights.isEmpty) 1 else weights.values.sum / weights.size
    val w = weights.withDefaultValue(meanWeight)
    var sum = 0
    refs.zipWithIndex foreach {
      case (ref, i) =>
        sum += w(fullAddress(ref))
        buckets(i) = sum
    }
    buckets
  }

  /**
   * The sum of the weights of all routees, 0 if there are no routees.
   */
  def total: Int =
    if (buckets.length == 0) 0
    else buckets(buckets.length - 1)

  /**
   * Pick the routee matching a value, from 1 to total.
   *
   * @throws IllegalArgumentException if `value` is outside of [1, total]
   */
  def apply(value: Int): ActorRef = {
    // converts the result of Arrays.binarySearch into a index in the buckets array
    // see documentation of Arrays.binarySearch for what it returns
    def idx(i: Int): Int = {
      if (i >= 0) {
        // Exact match. A routee with weight 0 repeats the accumulated value of its
        // predecessor and binarySearch may hit any of the equal entries, so step
        // back to the first one, which is the routee that owns the value.
        var first = i
        while (first > 0 && buckets(first - 1) == buckets(first)) first -= 1
        first
      } else {
        val j = math.abs(i + 1)
        if (j >= buckets.length) throw new IndexOutOfBoundsException(
          "Requested index [%s] is > max index [%s]".format(i, buckets.length))
        else j
      }
    }

    require(1 <= value && value <= total, "value must be between [1 - %s]" format total)
    refs(idx(Arrays.binarySearch(buckets, value)))
  }
}

View file

@ -1,271 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.NodeMetrics
import akka.cluster.NodeMetrics.MetricValues
import akka.dispatch.Dispatchers
import akka.event.Logging
import akka.routing.Broadcast
import akka.routing.Destination
import akka.routing.Resizer
import akka.routing.Route
import akka.routing.RouteeProvider
import akka.routing.RouterConfig
/**
 * INTERNAL API
 */
private[cluster] object ClusterLoadBalancingRouter {
  /**
   * Supervisor strategy used when none is given to the router:
   * every failure is escalated.
   */
  val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.Escalate
  }
}
/**
* A Router that performs load balancing to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees based on probabilities derived from
* the remaining capacity of the corresponding node.
*
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
* sense as this means that the router should both create new actors and use the 'routees'
* actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
case class ClusterLoadBalancingRouter(
  metricsSelector: MetricsSelector,
  nrOfInstances: Int = 0,
  routees: Iterable[String] = Nil,
  override val resizer: Option[Resizer] = None,
  val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
  val supervisorStrategy: SupervisorStrategy = ClusterLoadBalancingRouter.defaultSupervisorStrategy)
  extends RouterConfig with ClusterLoadBalancingRouterLike {

  /**
   * Java API: router that creates `nr` routees.
   * @param selector produces the weighted mix of routees from the node metrics
   * @param nr number of routees to create
   */
  def this(selector: MetricsSelector, nr: Int) =
    this(metricsSelector = selector, nrOfInstances = nr)

  /**
   * Java API: router that uses already existing routees.
   * @param selector produces the weighted mix of routees from the node metrics
   * @param routeePaths string representation of the actor paths of the routees that will be looked up
   *   using `actorFor` in [[akka.actor.ActorRefProvider]]
   */
  def this(selector: MetricsSelector, routeePaths: java.lang.Iterable[String]) =
    this(metricsSelector = selector, routees = routeePaths.asScala)

  /**
   * Java API: router whose routees are managed by the given resizer.
   * @param selector produces the weighted mix of routees from the node metrics
   * @param resizer the resizer to use
   */
  def this(selector: MetricsSelector, resizer: Resizer) =
    this(metricsSelector = selector, resizer = Some(resizer))

  /**
   * Java API: copy of this router with the given supervisor strategy for the
   * head Router actor.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): ClusterLoadBalancingRouter =
    copy(supervisorStrategy = strategy)

  /**
   * Java API: copy of this router with the given routerDispatcher.
   */
  def withDispatcher(dispatcherId: String): ClusterLoadBalancingRouter =
    copy(routerDispatcher = dispatcherId)

}
/**
 * INTERNAL API.
 *
 * This strategy is a metrics-aware router which performs load balancing of
 * cluster nodes based on cluster metric data. It consumes
 * [[akka.cluster.ClusterEvent.ClusterMetricsChanged]] events and the
 * [[akka.cluster.routing.MetricsSelector]] creates a mix of weighted routees
 * based on the node metrics. Messages are routed randomly to the weighted
 * routees, i.e. nodes with lower load are more likely to be used than nodes
 * with higher load.
 */
trait ClusterLoadBalancingRouterLike { this: RouterConfig =>

  def metricsSelector: MetricsSelector

  def nrOfInstances: Int

  def routees: Iterable[String]

  def routerDispatcher: String

  /**
   * Creates the routees (unless a resizer is defined), starts the
   * `metricsListener` child actor and returns the routing function.
   */
  override def createRoute(routeeProvider: RouteeProvider): Route = {
    if (resizer.isEmpty) {
      if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
      else routeeProvider.registerRouteesFor(routees)
    }

    // Function that points to the routees to use, starts with the plain routees
    // of the routeeProvider and then changes to the current weighted routees
    // produced by the metricsSelector. The reason for using a function is that
    // routeeProvider.routees can change.
    @volatile var weightedRoutees: () => IndexedSeq[ActorRef] = () => routeeProvider.routees

    // subscribe to ClusterMetricsChanged and update weightedRoutees
    val metricsListener = routeeProvider.context.actorOf(Props(new Actor {

      val cluster = Cluster(routeeProvider.context.system)

      override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])

      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case ClusterMetricsChanged(metrics) => receiveMetrics(metrics)
        case _: CurrentClusterState         => // ignore
      }

      def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
        val routees = metricsSelector.weightedRefs(routeeProvider.routees, cluster.selfAddress, metrics)
        weightedRoutees = () => routees
      }

    }).withDispatcher(routerDispatcher), name = "metricsListener")

    // uniform random pick from the current (possibly weighted) routees,
    // deadLetters when there are none
    def getNext(): ActorRef = {
      val currentRoutees = weightedRoutees.apply
      if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
      else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size))
    }

    {
      case (sender, message) =>
        message match {
          case Broadcast(msg) => toAll(sender, routeeProvider.routees)
          case msg            => List(Destination(sender, getNext()))
        }
    }
  }
}
/**
 * MetricsSelector that uses the heap metrics.
 * Low heap capacity => lower weight.
 *
 * The capacity of a node is the unused fraction of the max heap when max
 * is defined, otherwise the unused fraction of the committed heap.
 */
@SerialVersionUID(1L)
case object HeapMetricsSelector extends MetricsSelector {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this

  override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
    nodeMetrics.map { n =>
      val (used, committed, max) = MetricValues.unapply(n.heapMemory)
      val capacity = max match {
        case None    => (committed - used).toDouble / committed
        case Some(m) => (m - used).toDouble / m
      }
      (n.address, capacity)
    }.toMap
  }
}
// FIXME implement more MetricsSelectors, such as CpuMetricsSelector,
// LoadAverageMetricsSelector, NetworkMetricsSelector.
// Also a CompositeMetricsSelector which uses a mix of other
// selectors.
/**
 * A MetricsSelector is responsible for producing weighted mix of routees
 * from the node metrics. The weights are typically proportional to the
 * remaining capacity.
 */
abstract class MetricsSelector {

  /**
   * Remaining capacity for each node. The value is between
   * 0.0 and 1.0, where 0.0 means no remaining capacity (full
   * utilization) and 1.0 means full remaining capacity (zero
   * utilization).
   */
  def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double]

  /**
   * Converts the capacity values to weights. The node with lowest
   * capacity gets weight 1 (lowest usable capacity is 1%) and other
   * nodes gets weights proportional to their capacity compared to
   * the node with lowest capacity.
   *
   * @return a strict map of weights; empty when `capacity` is empty
   */
  def weights(capacity: Map[Address, Double]): Map[Address, Int] = {
    if (capacity.isEmpty) Map.empty[Address, Int]
    else {
      val (_, min) = capacity.minBy { case (_, c) => c }
      // lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
      val divisor = math.max(0.01, min)
      // map instead of mapValues: mapValues is a view that would redo the
      // rounding on every lookup of the returned map
      capacity map { case (address, c) => (address, math.round(c / divisor).toInt) }
    }
  }

  /**
   * Allocates a list of actor refs according to the weight of their node, i.e.
   * weight 3 of node A will allocate 3 slots for each ref with address A.
   * Refs on an address without a weight get 1 slot.
   */
  def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, weights: Map[Address, Int]): IndexedSeq[ActorRef] = {
    def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
      case Address(_, _, None, None) => selfAddress
      case a                         => a
    }
    val w = weights.withDefaultValue(1)
    // flatMap builds the result in one pass, instead of copying the
    // accumulated sequence for every ref
    refs flatMap { ref => IndexedSeq.fill(w(fullAddress(ref)))(ref) }
  }

  /**
   * Combines the different pieces to allocate a list of weighted actor refs
   * based on the node metrics.
   */
  def weightedRefs(refs: IndexedSeq[ActorRef], selfAddress: Address, nodeMetrics: Set[NodeMetrics]): IndexedSeq[ActorRef] =
    weightedRefs(refs, selfAddress, weights(capacity(nodeMetrics)))

}

View file

@ -28,7 +28,7 @@ class ClusterMetricsMultiJvmNode3 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode4 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec {
import ClusterMetricsMultiJvmSpec._
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]

View file

@ -5,21 +5,25 @@
package akka.cluster.routing
import language.postfixOps
import scala.concurrent.duration._
import java.lang.management.ManagementFactory
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
import akka.actor._
import akka.cluster.{ MemberStatus, MultiNodeClusterSpec }
import scala.concurrent.Await
import akka.routing.RouterRoutees
import akka.pattern.ask
import akka.routing.CurrentRoutees
import akka.cluster.Cluster
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
object ClusterLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.NodeMetrics
import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import akka.routing.CurrentRoutees
import akka.routing.FromConfig
import akka.routing.RouterRoutees
import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
class Routee extends Actor {
var usedMemory: Array[Byte] = _
@ -51,18 +55,38 @@ object ClusterLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster.metrics.collect-interval = 1s
akka.cluster.metrics.gossip-interval = 1s
akka.actor.deployment {
/router3 = {
router = adaptive
metrics-selector = cpu
nr-of-instances = 9
}
/router4 = {
router = adaptive
metrics-selector = "akka.cluster.routing.TestCustomMetricsSelector"
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 2
}
}
}
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class ClusterLoadBalancingRouterMultiJvmNode1 extends ClusterLoadBalancingRouterSpec
class ClusterLoadBalancingRouterMultiJvmNode2 extends ClusterLoadBalancingRouterSpec
class ClusterLoadBalancingRouterMultiJvmNode3 extends ClusterLoadBalancingRouterSpec
/**
 * Selector referenced by its fully qualified class name from the
 * `metrics-selector` setting of the `/router4` deployment in this spec's config.
 * It accepts a `Config` constructor argument (unused here) and never assigns
 * an explicit weight to any node.
 */
class TestCustomMetricsSelector(config: Config) extends MetricsSelector {
  override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] =
    Map.empty[Address, Int]
}
abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadBalancingRouterMultiJvmSpec)
class AdaptiveLoadBalancingRouterMultiJvmNode1 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec
abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoadBalancingRouterMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import ClusterLoadBalancingRouterMultiJvmSpec._
import AdaptiveLoadBalancingRouterMultiJvmSpec._
def currentRoutees(router: ActorRef) =
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees
@ -86,7 +110,7 @@ abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadB
def startRouter(name: String): ActorRef = {
val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
local = ClusterLoadBalancingRouter(HeapMetricsSelector),
local = AdaptiveLoadBalancingRouter(HeapMetricsSelector),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), name)
awaitCond {
// it may take some time until router receives cluster member events
@ -96,7 +120,7 @@ abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadB
router
}
"A cluster with a ClusterLoadBalancingRouter" must {
"A cluster with a AdaptiveLoadBalancingRouter" must {
"start cluster nodes" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
enterBarrier("after-1")
@ -112,6 +136,7 @@ abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadB
val iterationCount = 100
for (i 0 until iterationCount) {
router1 ! "hit"
// wait a while between each message, since metrics are collected periodically
Thread.sleep(10)
}
@ -146,6 +171,7 @@ abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadB
val iterationCount = 100
for (i 0 until iterationCount) {
router2 ! "hit"
// wait a while between each message, since metrics are collected periodically
Thread.sleep(10)
}
@ -158,5 +184,30 @@ abstract class ClusterLoadBalancingRouterSpec extends MultiNodeSpec(ClusterLoadB
enterBarrier("after-3")
}
"create routees from configuration" taggedAs LongRunningTest in {
runOn(first) {
val router3 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router3")
awaitCond {
// it may take some time until router receives cluster member events
currentRoutees(router3).size == 9
}
currentRoutees(router3).map(fullAddress).toSet must be(Set(address(first)))
}
enterBarrier("after-4")
}
"create routees from cluster.enabled configuration" taggedAs LongRunningTest in {
runOn(first) {
val router4 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router4")
awaitCond {
// it may take some time until router receives cluster member events
currentRoutees(router4).size == 6
}
currentRoutees(router4).map(fullAddress).toSet must be(Set(
address(first), address(second), address(third)))
}
enterBarrier("after-5")
}
}
}

View file

@ -47,10 +47,10 @@ class ClusterConfigSpec extends AkkaSpec {
callTimeout = 2 seconds,
resetTimeout = 30 seconds))
MetricsEnabled must be(true)
MetricsCollectorClass must be("akka.cluster.SigarMetricsCollector")
MetricsCollectorClass must be(classOf[SigarMetricsCollector].getName)
MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds)
MetricsRateOfDecay must be(10)
MetricsDecayHalfLifeDuration must be(12 seconds)
}
}
}

View file

@ -1,81 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.concurrent.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec }
import scala.concurrent.forkjoin.ThreadLocalRandom
import System.currentTimeMillis
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec with MetricsCollectorFactory {
import system.dispatcher
val collector = createMetricsCollector
"DataStream" must {
"calcualate same ewma for constant values" in {
val ds = DataStream(decay = 5, ewma = 100.0, currentTimeMillis, currentTimeMillis) :+
100.0 :+ 100.0 :+ 100.0
ds.ewma must be(100.0 plusOrMinus 0.001)
}
"calcualate correct ewma for normal decay" in {
val d0 = DataStream(decay = 10, ewma = 1000.0, currentTimeMillis, currentTimeMillis)
d0.ewma must be(1000.0 plusOrMinus 0.01)
val d1 = d0 :+ 10.0
d1.ewma must be(820.0 plusOrMinus 0.01)
val d2 = d1 :+ 10.0
d2.ewma must be(672.73 plusOrMinus 0.01)
val d3 = d2 :+ 10.0
d3.ewma must be(552.23 plusOrMinus 0.01)
val d4 = d3 :+ 10.0
d4.ewma must be(453.64 plusOrMinus 0.01)
val dn = (1 to 100).foldLeft(d0)((d, _) d :+ 10.0)
dn.ewma must be(10.0 plusOrMinus 0.1)
}
"calculate ewma for decay 1" in {
val d0 = DataStream(decay = 1, ewma = 100.0, currentTimeMillis, currentTimeMillis)
d0.ewma must be(100.0 plusOrMinus 0.01)
val d1 = d0 :+ 1.0
d1.ewma must be(1.0 plusOrMinus 0.01)
val d2 = d1 :+ 57.0
d2.ewma must be(57.0 plusOrMinus 0.01)
val d3 = d2 :+ 10.0
d3.ewma must be(10.0 plusOrMinus 0.01)
}
"calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
var streamingDataSet = Map.empty[String, Metric]
var usedMemory = Array.empty[Byte]
(1 to 50) foreach { _
Thread.sleep(100)
usedMemory = usedMemory ++ Array.fill(1024)(ThreadLocalRandom.current.nextInt(127).toByte)
val changes = collector.sample.metrics.flatMap { latest
streamingDataSet.get(latest.name) match {
case None Some(latest)
case Some(previous)
if (latest.average.isDefined && latest.isDefined && latest.value.get != previous.value.get) {
val updated = previous :+ latest
updated.average.isDefined must be(true)
val updatedAverage = updated.average.get
updatedAverage.timestamp must be > (previous.average.get.timestamp)
updatedAverage.duration.length must be > (previous.average.get.duration.length)
updatedAverage.ewma must not be (previous.average.get.ewma)
(previous.value.get.longValue compare latest.value.get.longValue) must be(
previous.average.get.ewma.longValue compare updatedAverage.ewma.longValue)
Some(updated)
} else None
}
}
streamingDataSet ++= changes.map(m m.name -> m).toMap
}
}
}
}

View file

@ -0,0 +1,101 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.concurrent.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec }
import scala.concurrent.forkjoin.ThreadLocalRandom
/**
 * Exercises `EWMA`: folding new samples in with `:+`, deriving `alpha` from a
 * half-life and a collect interval, and smoothing of live metrics obtained
 * from the collector created by `createMetricsCollector`.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class EWMASpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
  import system.dispatcher

  val collector = createMetricsCollector

  "EWMA" must {

    "calculate same ewma for constant values" in {
      val ds = EWMA(value = 100.0, alpha = 0.18) :+
        100.0 :+ 100.0 :+ 100.0
      ds.value must be(100.0 plusOrMinus 0.001)
    }

    "calculate correct ewma for normal decay" in {
      val d0 = EWMA(value = 1000.0, alpha = 2.0 / (1 + 10))
      d0.value must be(1000.0 plusOrMinus 0.01)
      val d1 = d0 :+ 10.0
      d1.value must be(820.0 plusOrMinus 0.01)
      val d2 = d1 :+ 10.0
      d2.value must be(672.73 plusOrMinus 0.01)
      val d3 = d2 :+ 10.0
      d3.value must be(552.23 plusOrMinus 0.01)
      val d4 = d3 :+ 10.0
      d4.value must be(453.64 plusOrMinus 0.01)

      // after many identical samples the average converges to the sample value
      val dn = (1 to 100).foldLeft(d0)((d, _) => d :+ 10.0)
      dn.value must be(10.0 plusOrMinus 0.1)
    }

    "calculate ewma for alpha 1.0, max bias towards latest value" in {
      val d0 = EWMA(value = 100.0, alpha = 1.0)
      d0.value must be(100.0 plusOrMinus 0.01)
      val d1 = d0 :+ 1.0
      d1.value must be(1.0 plusOrMinus 0.01)
      val d2 = d1 :+ 57.0
      d2.value must be(57.0 plusOrMinus 0.01)
      val d3 = d2 :+ 10.0
      d3.value must be(10.0 plusOrMinus 0.01)
    }

    "calculate alpha from half-life and collect interval" in {
      // according to http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
      val expectedAlpha = 0.1
      // alpha = 2.0 / (1 + N)
      val n = 19
      // N = 2.8854 * half-life (2.8854 is approximately 2 / ln 2), solved for the half-life
      val halfLife = n.toDouble / 2.8854
      val collectInterval = 1.second
      val halfLifeDuration = (halfLife * 1000).millis
      EWMA.alpha(halfLifeDuration, collectInterval) must be(expectedAlpha plusOrMinus 0.001)
    }

    "calculate sane alpha from short half-life" in {
      val alpha = EWMA.alpha(1.millis, 3.seconds)
      alpha must be <= (1.0)
      alpha must be >= (0.0)
      alpha must be(1.0 plusOrMinus 0.001)
    }

    "calculate sane alpha from long half-life" in {
      val alpha = EWMA.alpha(1.day, 3.seconds)
      alpha must be <= (1.0)
      alpha must be >= (0.0)
      alpha must be(0.0 plusOrMinus 0.001)
    }

    "calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
      // latest known metric per metric name
      var streamingDataSet = Map.empty[String, Metric]
      // grows on every iteration so that heap usage keeps changing between samples
      var usedMemory = Array.empty[Byte]
      (1 to 50) foreach { _ =>
        // wait a while between each sample to give the metrics a chance to change
        Thread.sleep(100)
        usedMemory = usedMemory ++ Array.fill(1024)(ThreadLocalRandom.current.nextInt(127).toByte)
        val changes = collector.sample.metrics.flatMap { latest =>
          streamingDataSet.get(latest.name) match {
            case None => Some(latest)
            case Some(previous) =>
              // only smoothed metrics whose raw value changed are expected to move the average
              if (latest.isSmooth && latest.value != previous.value) {
                val updated = previous :+ latest
                updated.isSmooth must be(true)
                updated.smoothValue must not be (previous.smoothValue)
                Some(updated)
              } else None
          }
        }
        streamingDataSet ++= changes.map(m => m.name -> m).toMap
      }
    }
  }
}

View file

@ -5,10 +5,11 @@
package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.cluster.NodeMetrics.MetricValues._
import akka.cluster.StandardMetrics.HeapMemory.Fields._
import scala.util.Try
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with MetricSpec
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender
with MetricsCollectorFactory {
"MetricNumericConverter" must {
@ -22,26 +23,17 @@ class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) wit
}
"define a new metric" in {
val metric = Metric(HeapMemoryUsed, Some(256L), decay = Some(10))
val Some(metric) = Metric.create(HeapMemoryUsed, 256L, decayFactor = Some(0.18))
metric.name must be(HeapMemoryUsed)
metric.isDefined must be(true)
metric.value must be(Some(256L))
metric.average.isDefined must be(true)
metric.average.get.ewma must be(256L)
collector match {
case c: SigarMetricsCollector
val cores = c.totalCores
cores.isDefined must be(true)
cores.value.get.intValue must be > (0)
case _
}
metric.value must be(256L)
metric.isSmooth must be(true)
metric.smoothValue must be(256.0 plusOrMinus 0.0001)
}
"define an undefined value with a None " in {
Metric("x", Some(-1), None).value.isDefined must be(false)
Metric("x", Some(java.lang.Double.NaN), None).value.isDefined must be(false)
Metric("x", None, None).isDefined must be(false)
Metric.create("x", -1, None).isDefined must be(false)
Metric.create("x", java.lang.Double.NaN, None).isDefined must be(false)
Metric.create("x", Try(throw new RuntimeException), None).isDefined must be(false)
}
"recognize whether a metric value is defined" in {

View file

@ -6,74 +6,68 @@ package akka.cluster
import scala.util.Try
import akka.actor.Address
import akka.cluster.NodeMetrics.MetricValues._
import akka.testkit.AkkaSpec
import akka.cluster.StandardMetrics.HeapMemory
import akka.cluster.StandardMetrics.Cpu
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricSpec
with MetricsCollectorFactory {
import NodeMetrics._
class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
val collector = createMetricsCollector
val node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics)
val node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)
var nodes: Seq[NodeMetrics] = Seq(node1, node2)
val nodes: Seq[NodeMetrics] = {
var nodes = Seq(node1, node2)
// work up the data streams where applicable
for (i 1 to 100) {
nodes = nodes map {
n
nodes = nodes map { n
n.copy(metrics = collector.sample.metrics.flatMap(latest n.metrics.collect {
case streaming if latest same streaming
streaming.average match {
case Some(e) streaming.copy(value = latest.value, average =
if (latest.isDefined) Some(e :+ latest.value.get.doubleValue) else None)
case None streaming.copy(value = latest.value)
}
case streaming if latest sameAs streaming streaming :+ latest
}))
}
}
nodes
}
"NodeMetrics.MetricValues" must {
"extract expected metrics for load balancing" in {
val stream1 = node2.metric(HeapMemoryCommitted).value.get.longValue
val stream2 = node1.metric(HeapMemoryUsed).value.get.longValue
import HeapMemory.Fields._
val stream1 = node2.metric(HeapMemoryCommitted).get.value.longValue
val stream2 = node1.metric(HeapMemoryUsed).get.value.longValue
stream1 must be >= (stream2)
}
"extract expected MetricValue types for load balancing" in {
nodes foreach {
node
val (used, committed, max) = MetricValues.unapply(node.heapMemory)
nodes foreach { node
node match {
case HeapMemory(address, _, used, committed, Some(max))
committed must be >= (used)
max match {
case Some(m)
used must be <= (m)
committed must be <= (m)
case None
used must be <= (max)
committed must be <= (max)
// extract is the java api
StandardMetrics.extractHeapMemory(node) must not be (null)
case HeapMemory(address, _, used, committed, None)
used must be > (0L)
committed must be > (0L)
// extract is the java api
StandardMetrics.extractCpu(node) must not be (null)
case _ fail("no heap")
}
val network = MetricValues.unapply(node.networkLatency)
if (network.isDefined) {
network.get._1 must be > (0L)
network.get._2 must be > (0L)
}
val (systemLoadAverage, processors, combinedCpu, cores) = MetricValues.unapply(node.cpu)
node match {
case Cpu(address, _, systemLoadAverageOption, cpuCombinedOption, processors)
processors must be > (0)
if (systemLoadAverage.isDefined)
systemLoadAverage.get must be >= (0.0)
if (combinedCpu.isDefined) {
combinedCpu.get must be <= (1.0)
combinedCpu.get must be >= (0.0)
if (systemLoadAverageOption.isDefined)
systemLoadAverageOption.get must be >= (0.0)
if (cpuCombinedOption.isDefined) {
cpuCombinedOption.get must be <= (1.0)
cpuCombinedOption.get must be >= (0.0)
}
if (cores.isDefined) {
cores.get must be > (0)
cores.get must be >= (processors)
// extract is the java api
StandardMetrics.extractCpu(node) must not be (null)
case _ fail("no cpu")
}
}
}

View file

@ -1,4 +1,5 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
@ -11,7 +12,8 @@ import scala.util.{ Success, Try, Failure }
import akka.actor._
import akka.testkit._
import akka.cluster.NodeMetrics.MetricValues._
import akka.cluster.StandardMetrics.HeapMemory.Fields._
import akka.cluster.StandardMetrics.Cpu.Fields._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -20,14 +22,12 @@ object MetricsEnabledSpec {
akka.cluster.metrics.enabled = on
akka.cluster.metrics.collect-interval = 1 s
akka.cluster.metrics.gossip-interval = 1 s
akka.cluster.metrics.rate-of-decay = 10
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec
with MetricsCollectorFactory {
class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory {
import system.dispatcher
val collector = createMetricsCollector
@ -38,25 +38,25 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
for (i 1 to 20) {
val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics
var merged = sample2 flatMap (latest sample1 collect {
case peer if latest same peer {
val merged12 = sample2 flatMap (latest sample1 collect {
case peer if latest sameAs peer {
val m = peer :+ latest
assertMerged(latest, peer, m)
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
}
})
val sample3 = collector.sample.metrics
val sample4 = collector.sample.metrics
merged = sample4 flatMap (latest sample3 collect {
case peer if latest same peer {
val merged34 = sample4 flatMap (latest sample3 collect {
case peer if latest sameAs peer {
val m = peer :+ latest
assertMerged(latest, peer, m)
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
}
})
merged.size must be(sample3.size)
merged.size must be(sample4.size)
}
}
}
@ -69,13 +69,10 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
"collect accurate metrics for a node" in {
val sample = collector.sample
val metrics = sample.metrics.collect { case m if m.isDefined (m.name, m.value.get) }
val metrics = sample.metrics.collect { case m (m.name, m.value) }
val used = metrics collectFirst { case (HeapMemoryUsed, b) b }
val committed = metrics collectFirst { case (HeapMemoryCommitted, b) b }
metrics foreach {
case (TotalCores, b) b.intValue must be > (0)
case (NetworkInboundRate, b) b.longValue must be > (0L)
case (NetworkOutboundRate, b) b.longValue must be > (0L)
case (SystemLoadAverage, b) b.doubleValue must be >= (0.0)
case (Processors, b) b.intValue must be >= (0)
case (HeapMemoryUsed, b) b.longValue must be >= (0L)
@ -91,25 +88,14 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
}
}
"collect SIGAR metrics if it is on the classpath" in {
collector match {
case c: SigarMetricsCollector
// combined cpu may or may not be defined on a given sampling
// systemLoadAverage is not present on all platforms
c.networkMaxRx.isDefined must be(true)
c.networkMaxTx.isDefined must be(true)
c.totalCores.isDefined must be(true)
case _
}
}
"collect JMX metrics" in {
// heap max may be undefined depending on the OS
// systemLoadAverage is JMX when SIGAR not present, but
// it's not present on all platforms
val c = collector.asInstanceOf[JmxMetricsCollector]
c.heapUsed.isDefined must be(true)
c.heapCommitted.isDefined must be(true)
val heap = c.heapMemoryUsage
c.heapUsed(heap).isDefined must be(true)
c.heapCommitted(heap).isDefined must be(true)
c.processors.isDefined must be(true)
}
@ -123,47 +109,6 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
}
}
trait MetricSpec extends WordSpec with MustMatchers { this: { def system: ActorSystem }
def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: MetricsGossip): Unit = {
val masterMetrics = collectNodeMetrics(master)
val gossipMetrics = collectNodeMetrics(gossip.nodes)
gossipMetrics.size must be(masterMetrics.size plusOrMinus 1) // combined cpu
}
def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit =
gossip.nodes.map(_.address) must be(nodes.map(_.address))
def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) {
if (latest.isDefined) {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
merged.average.isDefined must be(latest.average.isDefined)
} else {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
merged.average.isDefined must be(latest.average.isDefined || peer.average.isDefined)
}
} else {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(peer.value.get)
merged.average.isDefined must be(peer.average.isDefined)
} else {
merged.isDefined must be(false)
merged.average.isEmpty must be(true)
}
}
}
def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = {
var r: Seq[Metric] = Seq.empty
nodes.foreach(n r ++= n.metrics.filter(_.isDefined))
r
}
}
/**
* Used when testing metrics without full cluster
*/
@ -173,15 +118,15 @@ trait MetricsCollectorFactory { this: AkkaSpec ⇒
def selfAddress = extendedActorSystem.provider.rootPath.address
val defaultRateOfDecay = 10
val defaultDecayFactor = 2.0 / (1 + 10)
def createMetricsCollector: MetricsCollector =
Try(new SigarMetricsCollector(selfAddress, defaultRateOfDecay, extendedActorSystem.dynamicAccess)) match {
Try(new SigarMetricsCollector(selfAddress, defaultDecayFactor,
extendedActorSystem.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty).get)) match {
case Success(sigarCollector) sigarCollector
case Failure(e)
log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " +
e.getMessage)
new JmxMetricsCollector(selfAddress, defaultRateOfDecay)
log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " + e)
new JmxMetricsCollector(selfAddress, defaultDecayFactor)
}
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]

View file

@ -12,8 +12,7 @@ import akka.actor.Address
import java.lang.System.{ currentTimeMillis newTimestamp }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricSpec
with MetricsCollectorFactory {
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory {
val collector = createMetricsCollector
@ -22,82 +21,76 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip()
localGossip :+= m1
localGossip.nodes.size must be(1)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip)
collector.sample.metrics.size must be > (3)
m1.metrics.size must be > (3)
m2.metrics.size must be > (3)
localGossip :+= m2
localGossip.nodes.size must be(2)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip)
collector.sample.metrics.size must be > (3)
val g1 = MetricsGossip.empty :+ m1
g1.nodes.size must be(1)
g1.nodeKeys.size must be(g1.nodes.size)
g1.metricsFor(m1.address).size must be(m1.metrics.size)
val g2 = g1 :+ m2
g2.nodes.size must be(2)
g2.nodeKeys.size must be(g2.nodes.size)
g2.metricsFor(m1.address).size must be(m1.metrics.size)
g2.metricsFor(m2.address).size must be(m2.metrics.size)
}
"merge peer metrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var remoteGossip = MetricsGossip()
remoteGossip :+= m1
remoteGossip :+= m2
remoteGossip.nodes.size must be(2)
val beforeMergeNodes = remoteGossip.nodes
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size must be(2)
val beforeMergeNodes = g1.nodes
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
remoteGossip :+= m2Updated // merge peers
remoteGossip.nodes.size must be(2)
assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip)
remoteGossip.nodes.foreach(_.metrics.size must be > (3))
remoteGossip.nodes collect { case peer if peer.address == m2.address peer.timestamp must be(m2Updated.timestamp) }
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
val g2 = g1 :+ m2Updated // merge peers
g2.nodes.size must be(2)
g2.metricsFor(m1.address).size must be(m1.metrics.size)
g2.metricsFor(m2.address).size must be(m2Updated.metrics.size)
g2.nodes collect { case peer if peer.address == m2.address peer.timestamp must be(m2Updated.timestamp) }
}
"merge an existing metric set for a node and update node ring" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
var localGossip = MetricsGossip()
localGossip :+= m1
localGossip :+= m2
val g1 = MetricsGossip.empty :+ m1 :+ m2
val g2 = MetricsGossip.empty :+ m3 :+ m2Updated
var remoteGossip = MetricsGossip()
remoteGossip :+= m3
remoteGossip :+= m2Updated
localGossip.nodeKeys.contains(m1.address) must be(true)
remoteGossip.nodeKeys.contains(m3.address) must be(true)
g1.nodeKeys.contains(m1.address) must be(true)
g2.nodeKeys.contains(m3.address) must be(true)
// must contain nodes 1,3, and the most recent version of 2
val mergedGossip = localGossip merge remoteGossip
val mergedGossip = g1 merge g2
mergedGossip.nodes.size must be(3)
assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3))
mergedGossip.metricsFor(m1.address).size must be(m1.metrics.size)
mergedGossip.metricsFor(m2.address).size must be(m2Updated.metrics.size)
mergedGossip.metricsFor(m3.address).size must be(m3.metrics.size)
mergedGossip.nodes.foreach(_.metrics.size must be > (3))
mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp)
}
"get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip()
localGossip :+= m1
localGossip.metricsFor(m1).nonEmpty must be(true)
val g1 = MetricsGossip.empty :+ m1
g1.metricsFor(m1.address).nonEmpty must be(true)
}
"remove a node if it is no longer Up" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip()
localGossip :+= m1
localGossip :+= m2
localGossip.nodes.size must be(2)
localGossip = localGossip remove m1.address
localGossip.nodes.size must be(1)
localGossip.nodes.exists(_.address == m1.address) must be(false)
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size must be(2)
val g2 = g1 remove m1.address
g2.nodes.size must be(1)
g2.nodes.exists(_.address == m1.address) must be(false)
g2.metricsFor(m1.address).size must be(0)
g2.metricsFor(m2.address).size must be(m2.metrics.size)
}
}
}

View file

@ -8,8 +8,7 @@ import akka.testkit.AkkaSpec
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class NodeMetricsSpec extends AkkaSpec with MetricSpec
with MetricsCollectorFactory {
class NodeMetricsSpec extends AkkaSpec with MetricsCollectorFactory {
val collector = createMetricsCollector
@ -27,11 +26,11 @@ class NodeMetricsSpec extends AkkaSpec with MetricSpec
}
"return correct result for 2 'same' nodes" in {
(NodeMetrics(node1, 0) same NodeMetrics(node1, 0)) must be(true)
(NodeMetrics(node1, 0) sameAs NodeMetrics(node1, 0)) must be(true)
}
"return correct result for 2 not 'same' nodes" in {
(NodeMetrics(node1, 0) same NodeMetrics(node2, 0)) must be(false)
(NodeMetrics(node1, 0) sameAs NodeMetrics(node2, 0)) must be(false)
}
"merge 2 NodeMetrics by most recent" in {

View file

@ -1,114 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import akka.testkit.AkkaSpec
import akka.actor.Address
import akka.actor.RootActorPath
import akka.cluster.NodeMetrics
import akka.cluster.NodeMetrics.MetricValues._
import akka.cluster.Metric
import com.typesafe.config.ConfigFactory
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class HeapMetricsSelectorSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.port = 0
""")) {
val selector = HeapMetricsSelector
val a1 = Address("akka", "sys", "a1", 2551)
val b1 = Address("akka", "sys", "b1", 2551)
val c1 = Address("akka", "sys", "c1", 2551)
val d1 = Address("akka", "sys", "d1", 2551)
val decay = Some(10)
val nodeMetricsA = NodeMetrics(a1, System.currentTimeMillis, Set(
Metric(HeapMemoryUsed, Some(BigInt(128)), decay),
Metric(HeapMemoryCommitted, Some(BigInt(256)), decay),
Metric(HeapMemoryMax, Some(BigInt(512)), None)))
val nodeMetricsB = NodeMetrics(b1, System.currentTimeMillis, Set(
Metric(HeapMemoryUsed, Some(BigInt(256)), decay),
Metric(HeapMemoryCommitted, Some(BigInt(512)), decay),
Metric(HeapMemoryMax, Some(BigInt(1024)), None)))
val nodeMetricsC = NodeMetrics(c1, System.currentTimeMillis, Set(
Metric(HeapMemoryUsed, Some(BigInt(1024)), decay),
Metric(HeapMemoryCommitted, Some(BigInt(1024)), decay),
Metric(HeapMemoryMax, Some(BigInt(1024)), None)))
val nodeMetrics = Set(nodeMetricsA, nodeMetricsB, nodeMetricsC)
"MetricsAwareClusterNodeSelector" must {
"calculate capacity of heap metrics" in {
val capacity = selector.capacity(nodeMetrics)
capacity(a1) must be(0.75 plusOrMinus 0.0001)
capacity(b1) must be(0.75 plusOrMinus 0.0001)
capacity(c1) must be(0.0 plusOrMinus 0.0001)
}
"calculate weights from capacity" in {
val capacity = Map(a1 -> 0.6, b1 -> 0.3, c1 -> 0.1)
val weights = selector.weights(capacity)
weights must be(Map(c1 -> 1, b1 -> 3, a1 -> 6))
}
"handle low and zero capacity" in {
val capacity = Map(a1 -> 0.0, b1 -> 1.0, c1 -> 0.005, d1 -> 0.004)
val weights = selector.weights(capacity)
weights must be(Map(a1 -> 0, b1 -> 100, c1 -> 1, d1 -> 0))
}
"allocate weighted refs" in {
val weights = Map(a1 -> 1, b1 -> 3, c1 -> 10)
val refs = IndexedSeq(
system.actorFor(RootActorPath(a1) / "user" / "a"),
system.actorFor(RootActorPath(b1) / "user" / "b"),
system.actorFor(RootActorPath(c1) / "user" / "c"))
val result = selector.weightedRefs(refs, a1, weights)
val grouped = result.groupBy(_.path.address)
grouped(a1).size must be(1)
grouped(b1).size must be(3)
grouped(c1).size must be(10)
}
"allocate refs for undefined weight" in {
val weights = Map(a1 -> 1, b1 -> 2)
val refs = IndexedSeq(
system.actorFor(RootActorPath(a1) / "user" / "a"),
system.actorFor(RootActorPath(b1) / "user" / "b"),
system.actorFor(RootActorPath(c1) / "user" / "c"))
val result = selector.weightedRefs(refs, a1, weights)
val grouped = result.groupBy(_.path.address)
grouped(a1).size must be(1)
grouped(b1).size must be(2)
grouped(c1).size must be(1)
}
"allocate weighted local refs" in {
val weights = Map(a1 -> 2, b1 -> 1, c1 -> 10)
val refs = IndexedSeq(
testActor,
system.actorFor(RootActorPath(b1) / "user" / "b"),
system.actorFor(RootActorPath(c1) / "user" / "c"))
val result = selector.weightedRefs(refs, a1, weights)
result.filter(_ == testActor).size must be(2)
}
"not allocate ref with weight zero" in {
val weights = Map(a1 -> 0, b1 -> 2, c1 -> 10)
val refs = IndexedSeq(
system.actorFor(RootActorPath(a1) / "user" / "a"),
system.actorFor(RootActorPath(b1) / "user" / "b"),
system.actorFor(RootActorPath(c1) / "user" / "c"))
val result = selector.weightedRefs(refs, a1, weights)
result.filter(_ == refs.head).size must be(0)
}
}
}

View file

@ -0,0 +1,119 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.cluster.Metric
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.HeapMemory.Fields._
import akka.cluster.StandardMetrics.Cpu.Fields._
/**
 * Checks the capacity and weight calculations of the metrics selectors against
 * four hand-crafted nodes. Node d1 carries heap and processors metrics only,
 * i.e. no cpu and no system load average.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsSelectorSpec extends WordSpec with MustMatchers {

  // selector with no capacity of its own, only used to exercise the inherited `weights`
  val abstractSelector = new CapacityMetricsSelector {
    override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = Map.empty
  }

  val a1 = Address("akka", "sys", "a1", 2551)
  val b1 = Address("akka", "sys", "b1", 2551)
  val c1 = Address("akka", "sys", "c1", 2551)
  val d1 = Address("akka", "sys", "d1", 2551)

  val decayFactor = Some(0.18)

  val nodeMetricsA = NodeMetrics(a1, System.currentTimeMillis, Set(
    Metric.create(HeapMemoryUsed, 128, decayFactor),
    Metric.create(HeapMemoryCommitted, 256, decayFactor),
    Metric.create(HeapMemoryMax, 512, None),
    Metric.create(CpuCombined, 0.1, decayFactor),
    Metric.create(SystemLoadAverage, 0.5, None),
    Metric.create(Processors, 8, None)).flatten)

  val nodeMetricsB = NodeMetrics(b1, System.currentTimeMillis, Set(
    Metric.create(HeapMemoryUsed, 256, decayFactor),
    Metric.create(HeapMemoryCommitted, 512, decayFactor),
    Metric.create(HeapMemoryMax, 1024, None),
    Metric.create(CpuCombined, 0.5, decayFactor),
    Metric.create(SystemLoadAverage, 1.0, None),
    Metric.create(Processors, 16, None)).flatten)

  // fully loaded node: heap, cpu and load are all at their maximum
  val nodeMetricsC = NodeMetrics(c1, System.currentTimeMillis, Set(
    Metric.create(HeapMemoryUsed, 1024, decayFactor),
    Metric.create(HeapMemoryCommitted, 1024, decayFactor),
    Metric.create(HeapMemoryMax, 1024, None),
    Metric.create(CpuCombined, 1.0, decayFactor),
    Metric.create(SystemLoadAverage, 16.0, None),
    Metric.create(Processors, 16, None)).flatten)

  // no CpuCombined and no SystemLoadAverage for this node
  val nodeMetricsD = NodeMetrics(d1, System.currentTimeMillis, Set(
    Metric.create(HeapMemoryUsed, 511, decayFactor),
    Metric.create(HeapMemoryCommitted, 512, decayFactor),
    Metric.create(HeapMemoryMax, 512, None),
    Metric.create(Processors, 2, decayFactor)).flatten)

  val nodeMetrics = Set(nodeMetricsA, nodeMetricsB, nodeMetricsC, nodeMetricsD)

  "CapacityMetricsSelector" must {

    "calculate weights from capacity" in {
      val remaining = Map(a1 -> 0.6, b1 -> 0.3, c1 -> 0.1)
      val expected = Map(c1 -> 1, b1 -> 3, a1 -> 6)
      abstractSelector.weights(remaining) must be(expected)
    }

    "handle low and zero capacity" in {
      val remaining = Map(a1 -> 0.0, b1 -> 1.0, c1 -> 0.005, d1 -> 0.004)
      val expected = Map(a1 -> 0, b1 -> 100, c1 -> 1, d1 -> 0)
      abstractSelector.weights(remaining) must be(expected)
    }

  }

  "HeapMetricsSelector" must {
    "calculate capacity of heap metrics" in {
      // expected values correspond to (max - used) / max of the nodes above
      val heapCapacity = HeapMetricsSelector.capacity(nodeMetrics)
      heapCapacity(a1) must be(0.75 plusOrMinus 0.0001)
      heapCapacity(b1) must be(0.75 plusOrMinus 0.0001)
      heapCapacity(c1) must be(0.0 plusOrMinus 0.0001)
      heapCapacity(d1) must be(0.001953125 plusOrMinus 0.0001)
    }
  }

  "CpuMetricsSelector" must {
    "calculate capacity of cpuCombined metrics" in {
      // expected values correspond to 1 - cpuCombined
      val cpuCapacity = CpuMetricsSelector.capacity(nodeMetrics)
      cpuCapacity(a1) must be(0.9 plusOrMinus 0.0001)
      cpuCapacity(b1) must be(0.5 plusOrMinus 0.0001)
      cpuCapacity(c1) must be(0.0 plusOrMinus 0.0001)
      // d1 has no cpu metrics
      cpuCapacity.contains(d1) must be(false)
    }
  }

  "SystemLoadAverageMetricsSelector" must {
    "calculate capacity of systemLoadAverage metrics" in {
      // expected values correspond to 1 - (load / processors)
      val loadCapacity = SystemLoadAverageMetricsSelector.capacity(nodeMetrics)
      loadCapacity(a1) must be(0.9375 plusOrMinus 0.0001)
      loadCapacity(b1) must be(0.9375 plusOrMinus 0.0001)
      loadCapacity(c1) must be(0.0 plusOrMinus 0.0001)
      // d1 has no load average metrics
      loadCapacity.contains(d1) must be(false)
    }
  }

  "MixMetricsSelector" must {
    "aggregate capacity of all metrics" in {
      // mean of the heap, cpu and load capacities that are available for each node
      val mixCapacity = MixMetricsSelector().capacity(nodeMetrics)
      mixCapacity(a1) must be((0.75 + 0.9 + 0.9375) / 3 plusOrMinus 0.0001)
      mixCapacity(b1) must be((0.75 + 0.5 + 0.9375) / 3 plusOrMinus 0.0001)
      mixCapacity(c1) must be((0.0 + 0.0 + 0.0) / 3 plusOrMinus 0.0001)
      mixCapacity(d1) must be((0.001953125) / 1 plusOrMinus 0.0001)
    }
  }

}

View file

@ -0,0 +1,84 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.actor.RootActorPath
import akka.testkit.AkkaSpec
/**
 * Checks how `WeightedRoutees` maps the 1-based positions `1 to total` to routees,
 * given per-address weights.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.port = 0
""")) {

  val a1 = Address("akka", "sys", "a1", 2551)
  val b1 = Address("akka", "sys", "b1", 2551)
  val c1 = Address("akka", "sys", "c1", 2551)
  val d1 = Address("akka", "sys", "d1", 2551)

  val refA = system.actorFor(RootActorPath(a1) / "user" / "a")
  val refB = system.actorFor(RootActorPath(b1) / "user" / "b")
  val refC = system.actorFor(RootActorPath(c1) / "user" / "c")

  "WeightedRoutees" must {

    "allocate weighted refs" in {
      val routees = new WeightedRoutees(IndexedSeq(refA, refB, refC), a1, Map(a1 -> 1, b1 -> 3, c1 -> 10))

      routees(1) must be(refA)
      for (i <- 2 to 4) routees(i) must be(refB)
      for (i <- 5 to 14) routees(i) must be(refC)
      routees.total must be(14)
    }

    "check boundaries" in {
      val noRoutees = new WeightedRoutees(Vector(), a1, Map.empty)
      noRoutees.total must be(0)

      val unweighted = new WeightedRoutees(IndexedSeq(refA, refB, refC), a1, Map.empty)
      unweighted.total must be(3)

      // valid positions are 1 to total
      intercept[IllegalArgumentException] {
        unweighted(0)
      }
      intercept[IllegalArgumentException] {
        unweighted(4)
      }
    }

    "allocate refs for undefined weight" in {
      val routees = new WeightedRoutees(IndexedSeq(refA, refB, refC), a1, Map(a1 -> 1, b1 -> 7))

      routees(1) must be(refA)
      for (i <- 2 to 8) routees(i) must be(refB)
      // undefined, uses the mean of the weights, i.e. 4
      for (i <- 9 to 12) routees(i) must be(refC)
      routees.total must be(12)
    }

    "allocate weighted local refs" in {
      val routees = new WeightedRoutees(IndexedSeq(testActor, refB, refC), a1, Map(a1 -> 2, b1 -> 1, c1 -> 10))

      for (i <- 1 to 2) routees(i) must be(testActor)
      for (i <- 3 to routees.total) routees(i) must not be (testActor)
    }

    "not allocate ref with weight zero" in {
      val routees = new WeightedRoutees(IndexedSeq(refA, refB, refC), a1, Map(a1 -> 0, b1 -> 2, c1 -> 10))

      for (i <- 1 to routees.total) routees(i) must not be (refA)
    }

  }
}

View file

@ -38,8 +38,7 @@ Try it out:
1. Add the following ``application.conf`` in your project, place it in ``src/main/resources``:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
:language: none
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#cluster
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-java`
settings, but with ``akka.cluster.ClusterActorRefProvider``.
@ -438,6 +437,105 @@ service nodes and 1 client::
.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning.
Cluster Metrics
^^^^^^^^^^^^^^^
The member nodes of the cluster collect system health metrics and publish them to other nodes and to
registered subscribers. This information is primarily used for load-balancing of nodes.
Hyperic Sigar
-------------
The built-in metrics are gathered from JMX MBeans, and optionally you can use `Hyperic Sigar <http://www.hyperic.com/products/sigar>`_
for a wider and more accurate range of metrics compared to what can be retrieved from ordinary MBeans.
Sigar is using a native OS library. To enable usage of Sigar you need to add the directory of the native library to
``-Djava.library.path=<path_of_sigar_libs>`` and add the following dependency::
<dependency>
<groupId>org.hyperic</groupId>
<artifactId>sigar</artifactId>
<version>1.6.4</version>
</dependency>
Adaptive Load Balancing
-----------------------
The ``AdaptiveLoadBalancingRouter`` performs load balancing of messages to cluster nodes based on the cluster metrics data.
It uses random selection of routees with probabilities derived from the remaining capacity of corresponding node.
It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights:
* ``heap`` / ``HeapMetricsSelector`` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
* ``load`` / ``SystemLoadAverageMetricsSelector`` - System load average for the past 1 minute, corresponding value can be found in ``top`` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
* ``cpu`` / ``CpuMetricsSelector`` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
* ``mix`` / ``MixMetricsSelector`` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
* Any custom implementation of ``akka.cluster.routing.MetricsSelector``
The collected metrics values are smoothed with `exponential weighted moving average <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>`_. In the :ref:`cluster_configuration_java` you can adjust how quickly past data is decayed compared to new data.
Let's take a look at this router in action.
In this example the following imports are used:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackend.java#imports
The backend worker that performs the factorial calculation:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackend.java#backend
The frontend that receives user jobs and delegates to the backends via the router:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#frontend
As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router
It is only the router type ``adaptive`` and the ``metrics-selector`` that are specific to this router; other things work
in the same way as other routers.
The same type of router could also have been defined in code:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#router-lookup-in-code
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#router-deploy-in-code
This example is included in ``akka-samples/akka-sample-cluster`` and you can try it by copying the
`source <@github@/akka-samples/akka-sample-cluster>`_ to your
maven project, defined as in :ref:`cluster_simple_example_java`.
Run it by starting nodes in different terminal windows. For example, starting 3 backend nodes and
one frontend::
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain" \
-Dexec.args="2551"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain" \
-Dexec.args="2552"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialBackendMain"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.japi.FactorialFrontendMain"
Subscribe to Metrics Events
---------------------------
It's possible to subscribe to the metrics events directly to implement other functionality.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java#metrics-listener
Custom Metrics Collector
------------------------
You can plug-in your own metrics collector instead of
``akka.cluster.SigarMetricsCollector`` or ``akka.cluster.JmxMetricsCollector``. Look at those two implementations
for inspiration. The implementation class can be defined in the :ref:`cluster_configuration_java`.
.. _cluster_jmx_java:

View file

@ -32,8 +32,7 @@ Try it out:
1. Add the following ``application.conf`` in your project, place it in ``src/main/resources``:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
:language: none
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#cluster
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala`
settings, but with ``akka.cluster.ClusterActorRefProvider``.
@ -265,6 +264,8 @@ This is how the curve looks like for ``acceptable-heartbeat-pause`` configured t
.. image:: images/phi3.png
.. _cluster_aware_routers_scala:
Cluster Aware Routers
^^^^^^^^^^^^^^^^^^^^^
@ -397,6 +398,96 @@ service nodes and 1 client::
.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning.
Cluster Metrics
^^^^^^^^^^^^^^^
The member nodes of the cluster collect system health metrics and publish them to other nodes and to
registered subscribers. This information is primarily used for load-balancing of nodes.
Hyperic Sigar
-------------
The built-in metrics are gathered from JMX MBeans, and optionally you can use `Hyperic Sigar <http://www.hyperic.com/products/sigar>`_
for a wider and more accurate range of metrics compared to what can be retrieved from ordinary MBeans.
Sigar is using a native OS library. To enable usage of Sigar you need to add the directory of the native library to
``-Djava.library.path=<path_of_sigar_libs>`` and add the following dependency::
"org.hyperic" % "sigar" % "1.6.4"
Adaptive Load Balancing
-----------------------
The ``AdaptiveLoadBalancingRouter`` performs load balancing of messages to cluster nodes based on the cluster metrics data.
It uses random selection of routees with probabilities derived from the remaining capacity of corresponding node.
It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights:
* ``heap`` / ``HeapMetricsSelector`` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
* ``load`` / ``SystemLoadAverageMetricsSelector`` - System load average for the past 1 minute, corresponding value can be found in ``top`` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
* ``cpu`` / ``CpuMetricsSelector`` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
* ``mix`` / ``MixMetricsSelector`` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
* Any custom implementation of ``akka.cluster.routing.MetricsSelector``
The collected metrics values are smoothed with `exponential weighted moving average <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>`_. In the :ref:`cluster_configuration_scala` you can adjust how quickly past data is decayed compared to new data.
Let's take a look at this router in action.
In this example the following imports are used:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#imports
The backend worker that performs the factorial calculation:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#backend
The frontend that receives user jobs and delegates to the backends via the router:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#frontend
As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router
It is only the router type ``adaptive`` and the ``metrics-selector`` that are specific to this router; other things work
in the same way as other routers.
The same type of router could also have been defined in code:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#router-lookup-in-code
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#router-deploy-in-code
This example is included in ``akka-samples/akka-sample-cluster``
and you can try it by starting nodes in different terminal windows. For example, starting 3 backend nodes and one frontend::
sbt
project akka-sample-cluster-experimental
run-main sample.cluster.factorial.FactorialBackend 2551
run-main sample.cluster.factorial.FactorialBackend 2552
run-main sample.cluster.factorial.FactorialBackend
run-main sample.cluster.factorial.FactorialFrontend
Subscribe to Metrics Events
---------------------------
It's possible to subscribe to the metrics events directly to implement other functionality.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#metrics-listener
Custom Metrics Collector
------------------------
You can plug-in your own metrics collector instead of
``akka.cluster.SigarMetricsCollector`` or ``akka.cluster.JmxMetricsCollector``. Look at those two implementations
for inspiration. The implementation class can be defined in the :ref:`cluster_configuration_scala`.
How to Test
^^^^^^^^^^^

View file

@ -24,7 +24,8 @@ class RemoteActorRefProvider(
val remoteSettings: RemoteSettings = new RemoteSettings(settings.config, systemName)
val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
// this is lazy to be able to override it in subclass
lazy val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer)

View file

@ -0,0 +1,49 @@
package sample.cluster.factorial.japi;
//#imports
import java.math.BigInteger;
import java.util.concurrent.Callable;
import scala.concurrent.Future;
import akka.actor.UntypedActor;
import akka.dispatch.Mapper;
import static akka.dispatch.Futures.future;
import static akka.pattern.Patterns.pipe;
//#imports
//#backend
/**
 * Worker actor of the factorial sample. For every received Integer n it
 * computes n! in a Future running on the context dispatcher and pipes the
 * resulting FactorialResult to the sender. Other messages are unhandled.
 */
public class FactorialBackend extends UntypedActor {

  @Override
  public void onReceive(Object message) {
    if (!(message instanceof Integer)) {
      unhandled(message);
      return;
    }

    final Integer n = (Integer) message;

    // the calculation itself, executed asynchronously
    Callable<BigInteger> calculation = new Callable<BigInteger>() {
      public BigInteger call() {
        return factorial(n);
      }
    };
    Future<BigInteger> f = future(calculation, getContext().dispatcher());

    // pair the value with the input so the receiver knows which n it belongs to
    Mapper<BigInteger, FactorialResult> toResult =
      new Mapper<BigInteger, FactorialResult>() {
        public FactorialResult apply(BigInteger factorial) {
          return new FactorialResult(n, factorial);
        }
      };
    Future<FactorialResult> result = f.map(toResult, getContext().dispatcher());

    pipe(result, getContext().dispatcher()).to(getSender());
  }

  /**
   * Iteratively computes n!; the result is 1 for every n below 2.
   */
  BigInteger factorial(int n) {
    BigInteger product = BigInteger.ONE;
    for (int factor = n; factor >= 2; factor--) {
      product = product.multiply(BigInteger.valueOf(factor));
    }
    return product;
  }
}
//#backend

View file

@ -0,0 +1,22 @@
package sample.cluster.factorial.japi;
import akka.actor.ActorSystem;
import akka.actor.Props;
/**
 * Entry point of a backend node: starts the "ClusterSystem" actor system
 * with a FactorialBackend and a MetricsListener actor.
 */
public class FactorialBackendMain {

  /**
   * @param args optional first element: the port to use for
   *   akka.remote.netty.port instead of the configured one
   */
  public static void main(String[] args) throws Exception {
    // a port given as program argument overrides the configuration
    boolean portGiven = args.length != 0;
    if (portGiven) {
      System.setProperty("akka.remote.netty.port", args[0]);
    }

    ActorSystem system = ActorSystem.create("ClusterSystem");

    system.actorOf(new Props(FactorialBackend.class), "factorialBackend");
    system.actorOf(new Props(MetricsListener.class), "metricsListener");
  }

}

View file

@ -0,0 +1,72 @@
package sample.cluster.factorial.japi;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.FromConfig;
import akka.cluster.routing.AdaptiveLoadBalancingRouter;
import akka.cluster.routing.ClusterRouterConfig;
import akka.cluster.routing.ClusterRouterSettings;
import akka.cluster.routing.HeapMetricsSelector;
import akka.cluster.routing.SystemLoadAverageMetricsSelector;
//#frontend
/**
 * Frontend actor: forwards each Integer job to the backends via the router
 * configured for "factorialBackendRouter" and logs every FactorialResult
 * that comes back.
 */
public class FactorialFrontend extends UntypedActor {

  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  // the router definition is taken from the deployment configuration
  ActorRef backend = getContext().actorOf(
    new Props(FactorialBackend.class).withRouter(FromConfig.getInstance()),
    "factorialBackendRouter");

  @Override
  public void onReceive(Object message) {
    if (message instanceof Integer) {
      // sent with this actor as sender, so the reply arrives here
      backend.tell((Integer) message, getSelf());
      return;
    }

    if (message instanceof FactorialResult) {
      FactorialResult answer = (FactorialResult) message;
      log.info("{}! = {}", answer.n, answer.factorial);
      return;
    }

    unhandled(message);
  }

}
//#frontend
//not used, only for documentation
/*
 * Documentation-only variant of the frontend that defines the adaptive router
 * in code, looking up already running routees by path on the cluster nodes.
 * NOTE(review): the second constructor argument of AdaptiveLoadBalancingRouter
 * (0) is presumably the number of instances - confirm against the router API.
 */
abstract class FactorialFrontend2 extends UntypedActor {
  //#router-lookup-in-code
  int totalInstances = 100;
  // path of the backend actor, as started by FactorialBackendMain
  String routeesPath = "/user/factorialBackend";
  boolean allowLocalRoutees = true;
  ActorRef backend = getContext().actorOf(
    new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig(
      new AdaptiveLoadBalancingRouter(HeapMetricsSelector.getInstance(), 0),
      new ClusterRouterSettings(
        totalInstances, routeesPath, allowLocalRoutees))),
    "factorialBackendRouter2");
  //#router-lookup-in-code
}
//not used, only for documentation
/*
 * Documentation-only variant that defines the adaptive router in code, with
 * routees limited to maxInstancesPerNode per node and weights taken from
 * SystemLoadAverageMetricsSelector.
 * NOTE(review): the class name looks like a leftover from the stats sample;
 * the Scala counterpart of this snippet is called FactorialFrontend3 - consider renaming.
 * NOTE(review): the second constructor argument of AdaptiveLoadBalancingRouter
 * (0) is presumably the number of instances - confirm against the router API.
 */
abstract class StatsService3 extends UntypedActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
ActorRef backend = getContext().actorOf(
new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig(
new AdaptiveLoadBalancingRouter(
SystemLoadAverageMetricsSelector.getInstance(), 0),
new ClusterRouterSettings(
totalInstances, maxInstancesPerNode, allowLocalRoutees))),
"factorialBackendRouter3");
//#router-deploy-in-code
}

View file

@ -0,0 +1,38 @@
package sample.cluster.factorial.japi;
import java.util.concurrent.TimeUnit;
import sample.cluster.transformation.japi.TransformationMessages.TransformationJob;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import static akka.pattern.Patterns.ask;
/**
 * Entry point of the frontend node: starts the "ClusterSystem" actor system
 * with a FactorialFrontend actor and floods it with factorial jobs.
 */
public class FactorialFrontendMain {

  /**
   * @param args optional first element: the largest n to calculate the
   *   factorial of, 200 when not given
   */
  public static void main(String[] args) throws Exception {
    int upToN = 200;
    if (args.length != 0) {
      upToN = Integer.parseInt(args[0]);
    }

    ActorSystem system = ActorSystem.create("ClusterSystem");

    ActorRef frontend = system.actorOf(
      new Props(FactorialFrontend.class), "factorialFrontend");

    system.log().info("Starting up");
    // wait to let cluster converge and gather metrics
    Thread.sleep(10000);

    system.log().info("Starting many factorials up to [{}]", upToN);
    // 1000 rounds, each sending the jobs 1..upToN
    int round = 0;
    while (round < 1000) {
      for (int n = 1; n <= upToN; n++) {
        frontend.tell(n, null);
      }
      round++;
    }
  }

}

View file

@ -0,0 +1,14 @@
package sample.cluster.factorial.japi;
import java.math.BigInteger;
import java.io.Serializable;
public class FactorialResult implements Serializable {
public final int n;
public final BigInteger factorial;
FactorialResult(int n, BigInteger factorial) {
this.n = n;
this.factorial = factorial;
}
}

View file

@ -0,0 +1,60 @@
package sample.cluster.factorial.japi;
//#metrics-listener
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.ClusterMetricsChanged;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.NodeMetrics;
import akka.cluster.StandardMetrics;
import akka.cluster.StandardMetrics.HeapMemory;
import akka.cluster.StandardMetrics.Cpu;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MetricsListener extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  Cluster cluster = Cluster.get(getContext().system());

  //subscribe to ClusterMetricsChanged
  //re-subscribe when restart
  @Override
  public void preStart() {
    cluster.subscribe(getSelf(), ClusterMetricsChanged.class);
  }

  //unsubscribe when stopped
  @Override
  public void postStop() {
    cluster.unsubscribe(getSelf());
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof ClusterMetricsChanged) {
      logOwnMetrics((ClusterMetricsChanged) message);
    } else if (message instanceof CurrentClusterState) {
      // ignore
    } else {
      unhandled(message);
    }
  }

  //logs heap and load of this node only, metrics of other nodes are skipped
  private void logOwnMetrics(ClusterMetricsChanged clusterMetrics) {
    for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
      if (!nodeMetrics.address().equals(cluster.selfAddress())) {
        continue;
      }

      //null means that the heap metrics could not be extracted
      HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
      if (heap != null) {
        log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
      }

      //the load average is optional, log only when it is defined
      Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
      boolean loadAvailable = cpu != null && cpu.systemLoadAverage().isDefined();
      if (loadAvailable) {
        log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(),
          cpu.processors());
      }
    }
  }

}
//#metrics-listener

View file

@ -1,3 +1,4 @@
# //#cluster
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
@ -21,3 +22,22 @@ akka {
auto-down = on
}
}
# //#cluster
# //#adaptive-router
akka.actor.deployment {
/factorialFrontend/factorialBackendRouter = {
router = adaptive
# metrics-selector = heap
# metrics-selector = load
# metrics-selector = cpu
metrics-selector = mix
nr-of-instances = 100
cluster {
enabled = on
routees-path = "/user/factorialBackend"
allow-local-routees = off
}
}
}
# //#adaptive-router

View file

@ -0,0 +1,151 @@
package sample.cluster.factorial
//#imports
import scala.annotation.tailrec
import scala.concurrent.Future
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.pipe
import akka.routing.FromConfig
//#imports
object FactorialFrontend {

  /**
   * Starts the frontend node and sends many factorial jobs to the frontend actor.
   *
   * @param args optional first element: the largest n to calculate the factorial of,
   *   200 when not given
   */
  def main(args: Array[String]): Unit = {
    val upToN = if (args.isEmpty) 200 else args(0).toInt

    val system = ActorSystem("ClusterSystem")
    val frontend = system.actorOf(Props[FactorialFrontend], name = "factorialFrontend")

    system.log.info("Starting up")
    // wait to let cluster converge and gather metrics
    Thread.sleep(10000)

    system.log.info("Starting many factorials up to [{}]", upToN)
    // 1000 rounds, each sending the jobs 1 to upToN
    for (_ <- 1 to 1000; n <- 1 to upToN) {
      frontend ! n
    }
  }
}
//#frontend
/**
 * Frontend actor: sends each `Int` job to the backends via the router configured
 * for "factorialBackendRouter" and logs every `(n, factorial)` reply.
 */
class FactorialFrontend extends Actor with ActorLogging {

  // the router definition is taken from the deployment configuration
  val backend = context.actorOf(Props[FactorialBackend].withRouter(FromConfig),
    name = "factorialBackendRouter")

  def receive = {
    case n: Int => backend ! n
    case (n: Int, factorial: BigInt) =>
      log.info("{}! = {}", n, factorial)
  }
}
//#frontend
object FactorialBackend {

  /**
   * Starts a backend node with a `FactorialBackend` and a `MetricsListener` actor.
   *
   * @param args optional first element: the port to use for `akka.remote.netty.port`
   *   instead of the configured one
   */
  def main(args: Array[String]): Unit = {
    // a port given as program argument overrides the configuration
    args.headOption foreach { port =>
      System.setProperty("akka.remote.netty.port", port)
    }

    val system = ActorSystem("ClusterSystem")
    system.actorOf(Props[FactorialBackend], name = "factorialBackend")
    system.actorOf(Props[MetricsListener], name = "metricsListener")
  }
}
//#backend
/**
 * Worker actor: for every received `Int` n it calculates n! in a `Future` on the
 * context dispatcher and pipes the tuple `(n, result)` to the sender.
 */
class FactorialBackend extends Actor with ActorLogging {

  import context.dispatcher

  def receive = {
    case (n: Int) =>
      Future(factorial(n)) map { result => (n, result) } pipeTo sender
  }

  /**
   * Tail recursive calculation of n!, the result is 1 for every n <= 1.
   */
  def factorial(n: Int): BigInt = {
    @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
      if (n <= 1) acc
      else factorialAcc(acc * n, n - 1)
    }
    factorialAcc(BigInt(1), n)
  }

}
//#backend
//#metrics-listener
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.StandardMetrics.HeapMemory
import akka.cluster.StandardMetrics.Cpu
class MetricsListener extends Actor with ActorLogging {
  val selfAddress = Cluster(context.system).selfAddress

  // subscribe to ClusterMetricsChanged
  // re-subscribe when restart
  override def preStart(): Unit =
    Cluster(context.system).subscribe(self, classOf[ClusterMetricsChanged])
  override def postStop(): Unit =
    Cluster(context.system).unsubscribe(self)

  def receive = {
    case ClusterMetricsChanged(nodeMetrics) =>
      // only the metrics of this node are logged
      nodeMetrics.filter(_.address == selfAddress) foreach { n =>
        n match {
          case HeapMemory(address, timestamp, used, committed, max) =>
            log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
          case _ => // no heap info
        }
        n match {
          case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, processors) =>
            log.info("Load: {} ({} processors)", systemLoadAverage, processors)
          case _ => // no cpu info
        }
      }
    case state: CurrentClusterState => // ignore
  }
}
//#metrics-listener
// not used, only for documentation
abstract class FactorialFrontend2 extends Actor {
  //#router-lookup-in-code
  import akka.cluster.routing.ClusterRouterConfig
  import akka.cluster.routing.ClusterRouterSettings
  import akka.cluster.routing.AdaptiveLoadBalancingRouter
  import akka.cluster.routing.HeapMetricsSelector

  // routeesPath is the path of the backend actor, as started by FactorialBackend.main
  val backend = context.actorOf(Props[FactorialBackend].withRouter(
    ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
      ClusterRouterSettings(
        totalInstances = 100, routeesPath = "/user/factorialBackend",
        allowLocalRoutees = true))),
    name = "factorialBackendRouter2")
  //#router-lookup-in-code
}
// not used, only for documentation
abstract class FactorialFrontend3 extends Actor {
  //#router-deploy-in-code
  import akka.cluster.routing.ClusterRouterConfig
  import akka.cluster.routing.ClusterRouterSettings
  import akka.cluster.routing.AdaptiveLoadBalancingRouter
  import akka.cluster.routing.SystemLoadAverageMetricsSelector

  val backend = {
    // at most 3 routees per node, none on the local node
    val clusterSettings = ClusterRouterSettings(
      totalInstances = 100, maxInstancesPerNode = 3,
      allowLocalRoutees = false)
    // weights are based on the system load average
    val adaptiveRouter = AdaptiveLoadBalancingRouter(SystemLoadAverageMetricsSelector)

    context.actorOf(
      Props[FactorialBackend].withRouter(ClusterRouterConfig(adaptiveRouter, clusterSettings)),
      name = "factorialBackendRouter3")
  }
  //#router-deploy-in-code
}

View file

@ -33,6 +33,8 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
#//#router-deploy-config
akka.actor.deployment {
/statsFacade/statsService/workerRouter {

View file

@ -27,6 +27,8 @@ object StatsSampleSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
#//#router-lookup-config
akka.actor.deployment {
/statsService/workerRouter {

View file

@ -31,6 +31,8 @@ object StatsSampleJapiSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing

View file

@ -34,6 +34,8 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
akka.actor.deployment {
/statsFacade/statsService/workerRouter {
router = consistent-hashing

View file

@ -29,6 +29,8 @@ object TransformationSampleSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
"""))
}

View file

@ -30,6 +30,8 @@ object TransformationSampleJapiSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
"""))
}

View file

@ -327,6 +327,10 @@ object AkkaBuild extends Build {
dependencies = Seq(cluster, remoteTests % "test", testkit % "test"),
settings = sampleSettings ++ multiJvmSettings ++ experimentalSettings ++ Seq(
libraryDependencies ++= Dependencies.clusterSample,
javaOptions in run ++= Seq(
"-Djava.library.path=./sigar",
"-Xms128m", "-Xmx1024m"),
Keys.fork in run := true,
// disable parallel tests
parallelExecution in Test := false,
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
@ -673,6 +677,9 @@ object Dependencies {
// Camel Sample
val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2
// Cluster Sample
val sigar = "org.hyperic" % "sigar" % "1.6.4" // ApacheV2
// Test
object Test {
@ -729,7 +736,7 @@ object Dependencies {
val zeroMQ = Seq(protobuf, zeroMQClient, Test.scalatest, Test.junit)
val clusterSample = Seq(Test.scalatest)
val clusterSample = Seq(Test.scalatest, sigar)
val contrib = Seq(Test.junitIntf)