Incorparate review feedback, see #2547

* case object and case class for MixMetricsSelector
* Rename decay-half-life-duration to moving-average-half-life
* Clarification of decay-half-life-duration and collect-interval
* Removed Fields, Java compatibility issue
* Adapt for-yield variables
* Comment metrics collector constructor that takes system param
* Don't copy EWMA if not needed
* LogOf2 constant 0.69315
* Don't use mapValues
* Remove RichInt conversion
* sigar version replace tag in docs
* createDeployer factory method to make it possible to override
  deployer in subclass
* Improve readability of MetricsListener (in sample)
* Better startup of factorial sample (no sleep)
* Many minor enhancements and cleanups
This commit is contained in:
Patrik Nordwall 2012-11-15 12:48:13 +01:00
parent 1914be7069
commit 5eec693fd0
22 changed files with 334 additions and 194 deletions

View file

@ -32,7 +32,7 @@ object Serialization {
private final def configToMap(path: String): Map[String, String] = {
import scala.collection.JavaConverters._
config.getConfig(path).root.unwrapped.asScala.mapValues(_.toString).toMap
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) (k -> v.toString) }
}
}
}

View file

@ -117,7 +117,8 @@ akka {
# is on the classpath, otherwise only JMX.
collector-class = "akka.cluster.SigarMetricsCollector"
# How often metrics is sampled on a node.
# How often metrics are sampled on a node.
# Shorter interval will collect the metrics more often.
collect-interval = 3s
# How often a node publishes metrics information.
@ -125,9 +126,12 @@ akka {
# How quickly the exponential weighting of past data is decayed compared to
# new data. Set lower to increase the bias toward newer values.
# It takes about 4 half-life to drop below 10% contribution, and 7 to drop
# below 1%.
decay-half-life-duration = 12s
# The relevance of each data sample is halved for every passing half-life duration,
# i.e. after 4 times the half-life, a data samples relevance is reduced to 6% of
# its original relevance. The initial relevance of a data sample is given by
# 1 0.5 ^ (collect-interval / half-life).
# See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
moving-average-half-life = 12s
}
# If the tick-duration of the default scheduler is longer than the

View file

@ -54,7 +54,11 @@ class ClusterActorRefProvider(
remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher")
}
override lazy val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
/**
* Factory method to make it possible to override deployer in subclass
* Creates a new instance every time
*/
override protected def createDeployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
/**
* This method is overridden here to keep track of remote deployed actors to
@ -126,7 +130,7 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami
routerType match {
case "adaptive"
val metricsSelector = deployment.getString("metrics-selector") match {
case "mix" MixMetricsSelector()
case "mix" MixMetricsSelector
case "heap" HeapMetricsSelector
case "cpu" CpuMetricsSelector
case "load" SystemLoadAverageMetricsSelector

View file

@ -12,7 +12,6 @@ import java.lang.reflect.Method
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.runtime.{ RichLong, RichDouble, RichInt }
import scala.util.{ Try, Success, Failure }
import akka.ConfigurationException
import akka.actor.Actor
@ -183,9 +182,10 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
*/
def merge(remoteGossip: MetricsGossip): MetricsGossip = {
val remoteNodes = remoteGossip.nodes.map(n n.address -> n).toMap
val toMerge = nodeKeys intersect remoteNodes.keySet
val onlyInRemote = remoteNodes.keySet -- nodeKeys
val onlyInLocal = nodeKeys -- remoteNodes.keySet
val remoteNodesKeySet = remoteNodes.keySet
val toMerge = nodeKeys intersect remoteNodesKeySet
val onlyInRemote = remoteNodesKeySet -- nodeKeys
val onlyInLocal = nodeKeys -- remoteNodesKeySet
val seen = nodes.collect {
case n if toMerge contains n.address n merge remoteNodes(n.address)
@ -198,7 +198,7 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
}
/**
* Adds new local [[akka.cluster.NodeMetrics]] and initializes the data, or merges an existing.
* Adds new local [[akka.cluster.NodeMetrics]], or merges an existing.
*/
def :+(data: NodeMetrics): MetricsGossip = {
val previous = metricsFor(data.address)
@ -232,16 +232,24 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
object EWMA {
/**
* math.log(2)
*/
private val LogOf2 = 0.69315
/**
* Calculate the alpha (decay factor) used in [[akka.cluster.EWMA]]
* from specified half-life and interval between observations.
* It takes about 4 half-life to drop below 10% contribution, and 7 to drop
* below 1%.
* Half-life is the interval over which the weights decrease by a factor of two.
* The relevance of each data sample is halved for every passing half-life duration,
* i.e. after 4 times the half-life, a data samples relevance is reduced to 6% of
* its original relevance. The initial relevance of a data sample is given by
* 1 0.5 ^ (collect-interval / half-life).
*/
def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double = {
val halfLifeMillis = halfLife.toMillis
require(halfLife.toMillis > 0, "halfLife must be > 0 s")
val decayRate = 0.69315 / halfLifeMillis
val decayRate = LogOf2 / halfLifeMillis
1 - math.exp(-decayRate * collectInterval.toMillis)
}
}
@ -277,7 +285,11 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe
* @param xn the new data point
* @return a new [[akka.cluster.EWMA]] with the updated value
*/
def :+(xn: Double): EWMA = copy(value = (alpha * xn) + (1 - alpha) * value)
def :+(xn: Double): EWMA = {
val newValue = (alpha * xn) + (1 - alpha) * value
if (newValue == value) this // no change
else copy(value = newValue)
}
}
@ -291,7 +303,7 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe
case class Metric private (name: String, value: Number, private val average: Option[EWMA])
extends ClusterMessage with MetricNumericConverter {
require(defined(value), "Invalid Metric [%s] value [%]".format(name, value))
require(defined(value), s"Invalid Metric [$name] value [$value]")
/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
@ -398,16 +410,17 @@ case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] =
*/
object StandardMetrics {
object HeapMemory {
/**
* Constants for the heap related Metric names
*/
object Fields {
// Constants for the heap related Metric names
final val HeapMemoryUsed = "heap-memory-used"
final val HeapMemoryCommitted = "heap-memory-committed"
final val HeapMemoryMax = "heap-memory-max"
}
import Fields._
// Constants for the cpu related Metric names
final val SystemLoadAverage = "system-load-average"
final val Processors = "processors"
final val CpuCombined = "cpu-combined"
object HeapMemory {
/**
* Given a NodeMetrics it returns the HeapMemory data if the nodeMetrics contains
@ -418,9 +431,9 @@ object StandardMetrics {
for {
used nodeMetrics.metric(HeapMemoryUsed)
committed nodeMetrics.metric(HeapMemoryCommitted)
maxOption = nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
used.smoothValue.longValue, committed.smoothValue.longValue, maxOption)
used.smoothValue.longValue, committed.smoothValue.longValue,
nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue))
}
}
@ -431,6 +444,7 @@ object StandardMetrics {
*/
def extractHeapMemory(nodeMetrics: NodeMetrics): HeapMemory = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max)
// note that above extractor returns tuple
HeapMemory(address, timestamp, used, committed, max)
case _ null
}
@ -454,15 +468,6 @@ object StandardMetrics {
}
object Cpu {
/**
* Constants for the cpu related Metric names
*/
object Fields {
final val SystemLoadAverage = "system-load-average"
final val Processors = "processors"
final val CpuCombined = "cpu-combined"
}
import Fields._
/**
* Given a NodeMetrics it returns the Cpu data if the nodeMetrics contains
@ -472,10 +477,9 @@ object StandardMetrics {
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Option[Double], Option[Double], Int)] = {
for {
processors nodeMetrics.metric(Processors)
systemLoadAverageOption = nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue)
cpuCombinedOption = nodeMetrics.metric(CpuCombined).map(_.smoothValue)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
systemLoadAverageOption, cpuCombinedOption, processors.value.intValue)
nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue),
nodeMetrics.metric(CpuCombined).map(_.smoothValue), processors.value.intValue)
}
}
@ -486,6 +490,7 @@ object StandardMetrics {
*/
def extractCpu(nodeMetrics: NodeMetrics): Cpu = nodeMetrics match {
case Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
// note that above extractor returns tuple
Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
case _ null
}
@ -508,7 +513,7 @@ object StandardMetrics {
processors: Int) {
cpuCombined match {
case Some(x) require(0.0 <= x && x <= 1.0, "cpuCombined must be between [0.0 - 1.0], was [%s]" format x)
case Some(x) require(0.0 <= x && x <= 1.0, s"cpuCombined must be between [0.0 - 1.0], was [$x]")
case None
}
@ -542,12 +547,9 @@ private[cluster] trait MetricNumericConverter {
case n: Long Left(n)
case n: Double Right(n)
case n: Float Right(n)
case n: RichInt Left(n.abs)
case n: RichLong Left(n.self)
case n: RichDouble Right(n.self)
case n: BigInt Left(n.longValue)
case n: BigDecimal Right(n.doubleValue)
case x throw new IllegalArgumentException("Not a number [%s]" format x)
case x throw new IllegalArgumentException(s"Not a number [$x]")
}
}
@ -569,13 +571,15 @@ private[cluster] trait MetricsCollector extends Closeable {
* @param decay how quickly the exponential weighting of past data is decayed
*/
class JmxMetricsCollector(address: Address, decayFactor: Double) extends MetricsCollector {
import StandardMetrics.HeapMemory.Fields._
import StandardMetrics.Cpu.Fields._
import StandardMetrics._
private def this(cluster: Cluster) =
this(cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsDecayHalfLifeDuration, cluster.settings.MetricsInterval))
EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval))
/**
* This constructor is used when creating an instance from configured FQCN
*/
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
@ -586,6 +590,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* Samples and collects new data points.
* Creates a new instance each time.
*/
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, metrics)
@ -598,6 +603,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
* JMX Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is
* returned from JMX, and None is returned from this method.
* Creates a new instance each time.
*/
def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
@ -606,6 +612,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* (JMX) Returns the number of available processors
* Creates a new instance each time.
*/
def processors: Option[Metric] = Metric.create(
name = Processors,
@ -619,6 +626,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
* Creates a new instance each time.
*/
def heapUsed(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryUsed,
@ -628,6 +636,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes).
* Creates a new instance each time.
*/
def heapCommitted(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryCommitted,
@ -638,6 +647,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If not defined the metrics value is None, i.e.
* never negative.
* Creates a new instance each time.
*/
def heapMax(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryMax,
@ -665,14 +675,16 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef)
extends JmxMetricsCollector(address, decayFactor) {
import StandardMetrics.HeapMemory.Fields._
import StandardMetrics.Cpu.Fields._
import StandardMetrics._
private def this(cluster: Cluster) =
this(cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsDecayHalfLifeDuration, cluster.settings.MetricsInterval),
EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval),
cluster.system.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)
/**
* This constructor is used when creating an instance from configured FQCN
*/
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
@ -689,7 +701,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef
try method.invoke(sigar).asInstanceOf[Long] catch {
case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError]
// native libraries not in place
// don't throw fatal LinkageError, but something less harmless
// don't throw fatal LinkageError, but something harmless
throw new IllegalArgumentException(e.getCause.toString)
case e: InvocationTargetException throw e.getCause
}
@ -705,6 +717,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned
* from JMX, which means that None is returned from this method.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
* Creates a new instance each time.
*/
override def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
@ -718,6 +731,8 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef
*
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
*
* Creates a new instance each time.
*/
def cpuCombined: Option[Metric] = Metric.create(
name = CpuCombined,
@ -756,10 +771,10 @@ private[cluster] object MetricsCollector {
}
} else {
system.dynamicAccess.createInstanceFor[MetricsCollector](
fqcn, List(classOf[ActorSystem] -> system)).recover({
system.dynamicAccess.createInstanceFor[MetricsCollector](fqcn, List(classOf[ActorSystem] -> system)).
recover {
case e throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to:" + e.toString)
}).get
}.get
}
}
}

View file

@ -77,9 +77,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
require(d > Duration.Zero, "metrics.collect-interval must be > 0"); d
}
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
final val MetricsDecayHalfLifeDuration: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.metrics.decay-half-life-duration"), MILLISECONDS)
require(d > Duration.Zero, "metrics.decay-half-life-duration must be > 0"); d
final val MetricsMovingAverageHalfLife: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.metrics.moving-average-half-life"), MILLISECONDS)
require(d > Duration.Zero, "metrics.moving-average-half-life must be > 0"); d
}
}

View file

@ -33,13 +33,13 @@ import akka.routing.RouteeProvider
import akka.routing.RouterConfig
object AdaptiveLoadBalancingRouter {
private val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
private val escalateStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ SupervisorStrategy.Escalate
}
}
/**
* A Router that performs load balancing to cluster nodes based on
* A Router that performs load balancing of messages to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees based probabilities derived from
@ -71,11 +71,11 @@ object AdaptiveLoadBalancingRouter {
*/
@SerialVersionUID(1L)
case class AdaptiveLoadBalancingRouter(
metricsSelector: MetricsSelector = MixMetricsSelector(),
metricsSelector: MetricsSelector = MixMetricsSelector,
nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = AdaptiveLoadBalancingRouter.defaultSupervisorStrategy)
val supervisorStrategy: SupervisorStrategy = AdaptiveLoadBalancingRouter.escalateStrategy)
extends RouterConfig with AdaptiveLoadBalancingRouterLike {
/**
@ -137,7 +137,7 @@ case class AdaptiveLoadBalancingRouter(
/**
* INTERNAL API.
*
* This strategy is a metrics-aware router which performs load balancing of
* This strategy is a metrics-aware router which performs load balancing of messages to
* cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterMetricsChanged]]
* events and the [[akka.cluster.routing.MetricsSelector]] creates an mix of
* weighted routees based on the node metrics. Messages are routed randomly to the
@ -181,7 +181,7 @@ trait AdaptiveLoadBalancingRouterLike { this: RouterConfig ⇒
}
def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
// update the state outside of the actor, not a recommended practice, but works fine here
// this is the only place from where weightedRoutees is updated
weightedRoutees = Some(new WeightedRoutees(routeeProvider.routees, cluster.selfAddress,
metricsSelector.weights(metrics)))
}
@ -275,6 +275,20 @@ case object SystemLoadAverageMetricsSelector extends CapacityMetricsSelector {
}
}
/**
* Singleton instance of the default MixMetricsSelector, which uses [akka.cluster.routing.HeapMetricsSelector],
* [akka.cluster.routing.CpuMetricsSelector], and [akka.cluster.routing.SystemLoadAverageMetricsSelector]
*/
@SerialVersionUID(1L)
object MixMetricsSelector extends MixMetricsSelectorBase(
Vector(HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector)) {
/**
* Java API: get the default singleton instance
*/
def getInstance = this
}
/**
* MetricsSelector that combines other selectors and aggregates their capacity
* values. By default it uses [akka.cluster.routing.HeapMetricsSelector],
@ -282,8 +296,14 @@ case object SystemLoadAverageMetricsSelector extends CapacityMetricsSelector {
*/
@SerialVersionUID(1L)
case class MixMetricsSelector(
selectors: immutable.IndexedSeq[CapacityMetricsSelector] = Vector(
HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector))
selectors: immutable.IndexedSeq[CapacityMetricsSelector])
extends MixMetricsSelectorBase(selectors)
/**
* Base class for MetricsSelector that combines other selectors and aggregates their capacity.
*/
@SerialVersionUID(1L)
abstract class MixMetricsSelectorBase(selectors: immutable.IndexedSeq[CapacityMetricsSelector])
extends CapacityMetricsSelector {
/**
@ -298,20 +318,13 @@ case class MixMetricsSelector(
case (acc, (address, capacity))
val (sum, count) = acc(address)
acc + (address -> (sum + capacity, count + 1))
}.mapValues {
case (sum, count) sum / count
}.toMap
}.map {
case (addr, (sum, count)) (addr -> sum / count)
}
}
}
case object MixMetricsSelector {
/**
* Java API: get the default singleton instance
*/
def getInstance = MixMetricsSelector()
}
/**
* A MetricsSelector is responsible for producing weights from the node metrics.
*/
@ -349,7 +362,7 @@ abstract class CapacityMetricsSelector extends MetricsSelector {
val (_, min) = capacity.minBy { case (_, c) c }
// lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
val divisor = math.max(0.01, min)
capacity mapValues { c math.round((c) / divisor).toInt }
capacity map { case (addr, c) (addr -> math.round((c) / divisor).toInt) }
}
}
@ -379,12 +392,13 @@ private[cluster] class WeightedRoutees(refs: immutable.IndexedSeq[ActorRef], sel
}
val buckets = Array.ofDim[Int](refs.size)
val meanWeight = if (weights.isEmpty) 1 else weights.values.sum / weights.size
val w = weights.withDefaultValue(meanWeight)
val w = weights.withDefaultValue(meanWeight) // we dont necessarily have metrics for all addresses
var i = 0
var sum = 0
refs.zipWithIndex foreach {
case (ref, i)
refs foreach { ref
sum += w(fullAddress(ref))
buckets(i) = sum
i += 1
}
buckets
}
@ -397,9 +411,15 @@ private[cluster] class WeightedRoutees(refs: immutable.IndexedSeq[ActorRef], sel
* Pick the routee matching a value, from 1 to total.
*/
def apply(value: Int): ActorRef = {
// converts the result of Arrays.binarySearch into a index in the buckets array
// see documentation of Arrays.binarySearch for what it returns
def idx(i: Int): Int = {
require(1 <= value && value <= total, "value must be between [1 - %s]" format total)
refs(idx(Arrays.binarySearch(buckets, value)))
}
/**
* Converts the result of Arrays.binarySearch into a index in the buckets array
* see documentation of Arrays.binarySearch for what it returns
*/
private def idx(i: Int): Int = {
if (i >= 0) i // exact match
else {
val j = math.abs(i + 1)
@ -408,9 +428,4 @@ private[cluster] class WeightedRoutees(refs: immutable.IndexedSeq[ActorRef], sel
else j
}
}
require(1 <= value && value <= total, "value must be between [1 - %s]" format total)
refs(idx(Arrays.binarySearch(buckets, value)))
}
}

View file

@ -131,10 +131,10 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
val router1 = startRouter("router1")
// collect some metrics before we start
Thread.sleep(10000)
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 100
for (i 0 until iterationCount) {
1 to iterationCount foreach { _
router1 ! "hit"
// wait a while between each message, since metrics is collected periodically
Thread.sleep(10)

View file

@ -50,7 +50,7 @@ class ClusterConfigSpec extends AkkaSpec {
MetricsCollectorClass must be(classOf[SigarMetricsCollector].getName)
MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds)
MetricsDecayHalfLifeDuration must be(12 seconds)
MetricsMovingAverageHalfLife must be(12 seconds)
}
}
}

View file

@ -94,7 +94,7 @@ class EWMASpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollector
} else None
}
}
streamingDataSet ++= changes.map(m m.name -> m).toMap
streamingDataSet ++= changes.map(m m.name -> m)
}
}
}

View file

@ -5,8 +5,8 @@
package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.cluster.StandardMetrics.HeapMemory.Fields._
import scala.util.Try
import akka.cluster.StandardMetrics._
import scala.util.Failure
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender
@ -33,7 +33,7 @@ class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) wit
"define an undefined value with a None " in {
Metric.create("x", -1, None).isDefined must be(false)
Metric.create("x", java.lang.Double.NaN, None).isDefined must be(false)
Metric.create("x", Try(throw new RuntimeException), None).isDefined must be(false)
Metric.create("x", Failure(new RuntimeException), None).isDefined must be(false)
}
"recognize whether a metric value is defined" in {

View file

@ -7,8 +7,7 @@ package akka.cluster
import scala.util.Try
import akka.actor.Address
import akka.testkit.AkkaSpec
import akka.cluster.StandardMetrics.HeapMemory
import akka.cluster.StandardMetrics.Cpu
import akka.cluster.StandardMetrics._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
@ -19,21 +18,17 @@ class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsC
val node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)
val nodes: Seq[NodeMetrics] = {
var nodes = Seq(node1, node2)
// work up the data streams where applicable
for (i 1 to 100) {
nodes = nodes map { n
(1 to 100).foldLeft(List(node1, node2)) { (nodes, _)
nodes map { n
n.copy(metrics = collector.sample.metrics.flatMap(latest n.metrics.collect {
case streaming if latest sameAs streaming streaming :+ latest
}))
}
}
nodes
}
"NodeMetrics.MetricValues" must {
"extract expected metrics for load balancing" in {
import HeapMemory.Fields._
val stream1 = node2.metric(HeapMemoryCommitted).get.value.longValue
val stream2 = node1.metric(HeapMemoryUsed).get.value.longValue
stream1 must be >= (stream2)
@ -53,7 +48,6 @@ class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsC
committed must be > (0L)
// extract is the java api
StandardMetrics.extractCpu(node) must not be (null)
case _ fail("no heap")
}
node match {
@ -67,7 +61,6 @@ class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsC
}
// extract is the java api
StandardMetrics.extractCpu(node) must not be (null)
case _ fail("no cpu")
}
}
}

View file

@ -14,8 +14,7 @@ import scala.util.{ Success, Try, Failure }
import akka.actor._
import akka.testkit._
import akka.cluster.StandardMetrics.HeapMemory.Fields._
import akka.cluster.StandardMetrics.Cpu.Fields._
import akka.cluster.StandardMetrics._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -41,23 +40,21 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics
val merged12 = sample2 flatMap (latest sample1 collect {
case peer if latest sameAs peer {
case peer if latest sameAs peer
val m = peer :+ latest
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
}
})
val sample3 = collector.sample.metrics
val sample4 = collector.sample.metrics
val merged34 = sample4 flatMap (latest sample3 collect {
case peer if latest sameAs peer {
case peer if latest sameAs peer
val m = peer :+ latest
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
}
})
}
}
@ -124,12 +121,12 @@ trait MetricsCollectorFactory { this: AkkaSpec ⇒
def createMetricsCollector: MetricsCollector =
Try(new SigarMetricsCollector(selfAddress, defaultDecayFactor,
extendedActorSystem.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)) match {
case Success(sigarCollector) sigarCollector
case Failure(e)
extendedActorSystem.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil))).
recover {
case e
log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " + e)
new JmxMetricsCollector(selfAddress, defaultDecayFactor)
}
}.get
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
}

View file

@ -10,8 +10,7 @@ import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.cluster.Metric
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.HeapMemory.Fields._
import akka.cluster.StandardMetrics.Cpu.Fields._
import akka.cluster.StandardMetrics._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsSelectorSpec extends WordSpec with MustMatchers {
@ -107,7 +106,7 @@ class MetricsSelectorSpec extends WordSpec with MustMatchers {
"MixMetricsSelector" must {
"aggregate capacity of all metrics" in {
val capacity = MixMetricsSelector().capacity(nodeMetrics)
val capacity = MixMetricsSelector.capacity(nodeMetrics)
capacity(a1) must be((0.75 + 0.9 + 0.9375) / 3 plusOrMinus 0.0001)
capacity(b1) must be((0.75 + 0.5 + 0.9375) / 3 plusOrMinus 0.0001)
capacity(c1) must be((0.0 + 0.0 + 0.0) / 3 plusOrMinus 0.0001)

View file

@ -440,8 +440,8 @@ service nodes and 1 client::
Cluster Metrics
^^^^^^^^^^^^^^^
The member nodes of the cluster collects system health metrics and publish that to other nodes and to
registered subscribers. This information is primarily used for load-balancing of nodes.
The member nodes of the cluster collects system health metrics and publishes that to other nodes and to
registered subscribers. This information is primarily used for load-balancing routers.
Hyperic Sigar
-------------
@ -454,7 +454,7 @@ Sigar is using a native OS library. To enable usage of Sigar you need to add the
<dependency>
<groupId>org.hyperic</groupId>
<artifactId>sigar</artifactId>
<version>1.6.4</version>
<version>@sigarVersion@</version>
</dependency>
@ -522,6 +522,8 @@ one frontend::
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialFrontendMain"
Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.
Subscribe to Metrics Events
---------------------------

View file

@ -401,8 +401,8 @@ service nodes and 1 client::
Cluster Metrics
^^^^^^^^^^^^^^^
The member nodes of the cluster collects system health metrics and publish that to other nodes and to
registered subscribers. This information is primarily used for load-balancing of nodes.
The member nodes of the cluster collects system health metrics and publishes that to other nodes and to
registered subscribers. This information is primarily used for load-balancing routers.
Hyperic Sigar
-------------
@ -412,7 +412,7 @@ for a wider and more accurate range of metrics compared to what can be retrieved
Sigar is using a native OS library. To enable usage of Sigar you need to add the directory of the native library to
``-Djava.libarary.path=<path_of_sigar_libs>`` add the following dependency::
"org.hyperic" % "sigar" % "1.6.4"
"org.hyperic" % "sigar" % "@sigarVersion@"
Adaptive Load Balancing
@ -473,6 +473,7 @@ and you can try by starting nodes in different terminal windows. For example, st
run-main sample.cluster.factorial.FactorialFrontend
Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.
Subscribe to Metrics Events
---------------------------

View file

@ -24,8 +24,13 @@ class RemoteActorRefProvider(
val remoteSettings: RemoteSettings = new RemoteSettings(settings.config, systemName)
// this is lazy to be able to override it in subclass
lazy val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
override val deployer: Deployer = createDeployer
/**
* Factory method to make it possible to override deployer in subclass
* Creates a new instance every time
*/
protected def createDeployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer)

View file

@ -14,6 +14,8 @@ import akka.cluster.routing.SystemLoadAverageMetricsSelector;
//#frontend
public class FactorialFrontend extends UntypedActor {
final int upToN;
final boolean repeat;
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@ -21,21 +23,37 @@ public class FactorialFrontend extends UntypedActor {
new Props(FactorialBackend.class).withRouter(FromConfig.getInstance()),
"factorialBackendRouter");
public FactorialFrontend(int upToN, boolean repeat) {
this.upToN = upToN;
this.repeat = repeat;
}
@Override
public void preStart() {
sendJobs();
}
@Override
public void onReceive(Object message) {
if (message instanceof Integer) {
Integer n = (Integer) message;
backend.tell(n, getSelf());
} else if (message instanceof FactorialResult) {
if (message instanceof FactorialResult) {
FactorialResult result = (FactorialResult) message;
log.info("{}! = {}", result.n, result.factorial);
if (result.n == upToN) {
log.debug("{}! = {}", result.n, result.factorial);
if (repeat) sendJobs();
}
} else {
unhandled(message);
}
}
void sendJobs() {
log.info("Starting batch of factorials up to [{}]", upToN);
for (int n = 1; n <= upToN; n++) {
backend.tell(n, getSelf());
}
}
}
//#frontend

View file

@ -1,37 +1,26 @@
package sample.cluster.factorial.japi;
import java.util.concurrent.TimeUnit;
import sample.cluster.transformation.japi.TransformationMessages.TransformationJob;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import static akka.pattern.Patterns.ask;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
public class FactorialFrontendMain {
public static void main(String[] args) throws Exception {
int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0]));
final int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0]));
ActorSystem system = ActorSystem.create("ClusterSystem");
ActorRef frontend = system.actorOf(new Props(
FactorialFrontend.class), "factorialFrontend");
system.log().info("Starting up");
// wait to let cluster converge and gather metrics
Thread.sleep(10000);
system.log().info("Starting many factorials up to [{}]", upToN);
for (int i = 0; i < 1000; i++) {
for (int n = 1; n <= upToN; n++) {
frontend.tell(n, null);
}
// start the calculations when there is at least 2 other members
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new StartupFrontend(upToN);
}
}), "startup");
}

View file

@ -36,15 +36,8 @@ public class MetricsListener extends UntypedActor {
ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message;
for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
if (nodeMetrics.address().equals(cluster.selfAddress())) {
HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
if (heap != null) {
log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
}
Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
if (cpu != null && cpu.systemLoadAverage().isDefined()) {
log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(),
cpu.processors());
}
logHeap(nodeMetrics);
logCpu(nodeMetrics);
}
}
@ -56,5 +49,20 @@ public class MetricsListener extends UntypedActor {
}
}
void logHeap(NodeMetrics nodeMetrics) {
HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
if (heap != null) {
log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
}
}
void logCpu(NodeMetrics nodeMetrics) {
Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
if (cpu != null && cpu.systemLoadAverage().isDefined()) {
log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(),
cpu.processors());
}
}
}
//#metrics-listener

View file

@ -0,0 +1,56 @@
package sample.cluster.factorial.japi;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class StartupFrontend extends UntypedActor {
final int upToN;
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
int memberCount = 0;
public StartupFrontend(int upToN) {
this.upToN = upToN;
}
//subscribe to ClusterMetricsChanged
@Override
public void preStart() {
log.info("Factorials will start when 3 members in the cluster.");
Cluster.get(getContext().system()).subscribe(getSelf(), MemberUp.class);
}
@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
memberCount = state.members().size();
runWhenReady();
} else if (message instanceof MemberUp) {
memberCount++;
runWhenReady();
} else {
unhandled(message);
}
}
void runWhenReady() {
if (memberCount >= 3) {
getContext().system().actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new FactorialFrontend(upToN, true);
}
}), "factorialFrontend");
getContext().stop(getSelf());
}
}
}

View file

@ -13,34 +13,62 @@ import akka.routing.FromConfig
//#imports
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
object FactorialFrontend {
def main(args: Array[String]): Unit = {
val upToN = if (args.isEmpty) 200 else args(0).toInt
val system = ActorSystem("ClusterSystem")
val frontend = system.actorOf(Props[FactorialFrontend], name = "factorialFrontend")
system.log.info("Starting up")
// wait to let cluster converge and gather metrics
Thread.sleep(10000)
// start the calculations when there is at least 2 other members
system.actorOf(Props(new Actor with ActorLogging {
var memberCount = 0
system.log.info("Starting many factorials up to [{}]", upToN)
for (_ 1 to 1000; n 1 to upToN) {
frontend ! n
log.info("Factorials will start when 3 members in the cluster.")
Cluster(context.system).subscribe(self, classOf[MemberUp])
def receive = {
case state: CurrentClusterState
memberCount = state.members.size
runWhenReady()
case MemberUp(member)
memberCount += 1
runWhenReady()
}
def runWhenReady(): Unit = if (memberCount >= 3) {
context.system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)),
name = "factorialFrontend")
context stop self
}
}), name = "startup")
}
}
//#frontend
class FactorialFrontend extends Actor with ActorLogging {
class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {
val backend = context.actorOf(Props[FactorialBackend].withRouter(FromConfig),
name = "factorialBackendRouter")
override def preStart(): Unit = sendJobs()
def receive = {
case n: Int backend ! n
case (n: Int, factorial: BigInt)
log.info("{}! = {}", n, factorial)
if (n == upToN) {
log.debug("{}! = {}", n, factorial)
if (repeat) sendJobs()
}
}
def sendJobs(): Unit = {
log.info("Starting batch of factorials up to [{}]", upToN)
1 to upToN foreach { backend ! _ }
}
}
//#frontend
@ -83,6 +111,7 @@ class FactorialBackend extends Actor with ActorLogging {
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.HeapMemory
import akka.cluster.StandardMetrics.Cpu
@ -97,21 +126,25 @@ class MetricsListener extends Actor with ActorLogging {
Cluster(context.system).unsubscribe(self)
def receive = {
case ClusterMetricsChanged(nodeMetrics)
nodeMetrics.filter(_.address == selfAddress) foreach { n
n match {
case ClusterMetricsChanged(clusterMetrics)
clusterMetrics.filter(_.address == selfAddress) foreach { nodeMetrics
logHeap(nodeMetrics)
logCpu(nodeMetrics)
}
case state: CurrentClusterState // ignore
}
def logHeap(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max)
log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
case _ // no heap info
}
n match {
def logCpu(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, processors)
log.info("Load: {} ({} processors)", systemLoadAverage, processors)
case _ // no cpu info
}
}
case state: CurrentClusterState // ignore
}
}
//#metrics-listener

View file

@ -553,6 +553,7 @@ object AkkaBuild extends Build {
case BinVer(bv) => bv
case _ => s
}),
"sigarVersion" -> Dependencies.Compile.sigar.revision,
"github" -> "http://github.com/akka/akka/tree/%s".format((if (isSnapshot) "master" else "v" + v))
)
},