Merge pull request #858 from akka/wip-2547-metrics-router-patriknw

AdaptiveLoadBalancingRouter and refactoring of metrics, see #2547
Patrik Nordwall 2012-11-30 23:37:30 -08:00
commit 4761feb071
73 changed files with 2576 additions and 700 deletions

View file

@@ -139,16 +139,24 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
}
def parseConfig(key: String, config: Config): Option[Deploy] = {
val deployment = config.withFallback(default)
val router = createRouterConfig(deployment.getString("router"), key, config, deployment)
Some(Deploy(key, deployment, router, NoScopeGiven))
}
/**
* Factory method for creating `RouterConfig`
* @param routerType the configured name of the router, or FQCN
* @param key the full configuration key of the deployment section
* @param config the user defined config of the deployment, without defaults
* @param deployment the deployment config, with defaults
*/
protected def createRouterConfig(routerType: String, key: String, config: Config, deployment: Config): RouterConfig = {
val routees = immutableSeq(deployment.getStringList("routees.paths"))
val nrOfInstances = deployment.getInt("nr-of-instances")
val resizer = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
val router: RouterConfig = deployment.getString("router") match {
routerType match {
case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, resizer)
@@ -170,7 +178,6 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
.format(fqn, key), exception)
}).get
}
Some(Deploy(key, deployment, router, NoScopeGiven))
}
}
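Since createRouterConfig is now a protected factory, a provider-specific deployer can add router types without re-parsing the shared settings; the cluster module further down in this PR does exactly that. A minimal sketch of such a subclass (the "my-router" alias is made up for illustration, and since Deployer is private[akka] this only compiles inside the akka.actor package):

package akka.actor

import akka.routing.{ RouterConfig, RoundRobinRouter }
import com.typesafe.config.Config

class MyDeployer(settings: ActorSystem.Settings, dynamicAccess: DynamicAccess)
  extends Deployer(settings, dynamicAccess) {

  override protected def createRouterConfig(routerType: String, key: String,
                                            config: Config, deployment: Config): RouterConfig =
    routerType match {
      // hypothetical extra router type; everything else falls back to the base parsing
      case "my-router" ⇒ RoundRobinRouter(deployment.getInt("nr-of-instances"))
      case _           ⇒ super.createRouterConfig(routerType, key, config, deployment)
    }
}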

View file

@@ -32,7 +32,7 @@ object Serialization {
private final def configToMap(path: String): Map[String, String] = {
import scala.collection.JavaConverters._
config.getConfig(path).root.unwrapped.asScala.mapValues(_.toString).toMap
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ (k -> v.toString) }
}
}
}
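The motivation for this change: mapValues returns a lazy view that re-applies the function on every lookup and retains the underlying map, while the explicit map builds a strict result once. A small illustration with made-up values:

var calls = 0
val view = Map("a" -> 1).mapValues { v ⇒ calls += 1; v.toString }
view("a"); view("a")
// calls == 2: the function ran on every access
val strict = Map("a" -> 1) map { case (k, v) ⇒ (k, v.toString) }
// evaluated once; holds no reference to the source map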

View file

@@ -70,7 +70,7 @@ akka {
failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.cluster.akka.cluster and
# It must implement akka.cluster.FailureDetector and
# have constructor with akka.actor.ActorSystem and
# akka.cluster.ClusterSettings parameters
implementation-class = "akka.cluster.AccrualFailureDetector"
@@ -106,22 +106,32 @@ akka {
max-sample-size = 1000
}
# Uses JMX and Hyperic SIGAR, if SIGAR is on the classpath.
metrics {
# Enable or disable metrics collector for load-balancing nodes.
enabled = on
# How often metrics is sampled on a node.
metrics-interval = 3s
# FQCN of the metrics collector implementation.
# It must implement akka.cluster.MetricsCollector and
# have constructor with akka.actor.ActorSystem parameter.
# The default SigarMetricsCollector uses JMX and Hyperic SIGAR, if SIGAR
# is on the classpath, otherwise only JMX.
collector-class = "akka.cluster.SigarMetricsCollector"
# How often metrics are sampled on a node.
# Shorter interval will collect the metrics more often.
collect-interval = 3s
# How often a node publishes metrics information.
gossip-interval = 3s
# How quickly the exponential weighting of past data is decayed compared to
# new data.
# If set to 0 data streaming over time will be turned off.
# Set higher to increase the bias toward newer values
rate-of-decay = 10
# new data. Set lower to increase the bias toward newer values.
# The relevance of each data sample is halved for every passing half-life duration,
# i.e. after 4 times the half-life, a data sample's relevance is reduced to 6% of
# its original relevance. The initial relevance of a data sample is given by
# 1 - 0.5 ^ (collect-interval / half-life).
# See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
moving-average-half-life = 12s
}
# If the tick-duration of the default scheduler is longer than the
@@ -143,6 +153,16 @@ akka {
}
# Default configuration for routers
actor.deployment.default {
# MetricsSelector to use
# - available: "mix", "heap", "cpu", "load"
# - or: Fully qualified class name of the MetricsSelector class.
# The class must extend akka.cluster.routing.MetricsSelector
# and have a constructor with com.typesafe.config.Config
# parameter.
# - default is "mix"
metrics-selector = mix
}
actor.deployment.default.cluster {
# enable cluster aware router that deploys to nodes in the cluster
enabled = off
@@ -169,4 +189,5 @@ akka {
routees-path = ""
}
}
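Tying the new settings together, a deployment section for the adaptive router could look like the following sketch (the /myService path and the counts are hypothetical; parsed as a Config here only for illustration):

import com.typesafe.config.ConfigFactory

val deploymentConfig = ConfigFactory.parseString("""
  akka.actor.deployment {
    /myService {
      router = adaptive          # the AdaptiveLoadBalancingRouter added by this PR
      metrics-selector = heap    # or mix, cpu, load, or a custom FQCN
      nr-of-instances = 10
      cluster.enabled = on
    }
  }
""")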

View file

@@ -6,7 +6,7 @@ package akka.cluster
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
import akka.event.Logging
import scala.collection.immutable.Map
import scala.collection.immutable
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit.NANOSECONDS
@@ -233,7 +233,7 @@ private[cluster] object HeartbeatHistory {
*/
def apply(maxSampleSize: Int): HeartbeatHistory = HeartbeatHistory(
maxSampleSize = maxSampleSize,
intervals = IndexedSeq.empty,
intervals = immutable.IndexedSeq.empty,
intervalSum = 0L,
squaredIntervalSum = 0L)
@@ -248,7 +248,7 @@ private[cluster] object HeartbeatHistory {
*/
private[cluster] case class HeartbeatHistory private (
maxSampleSize: Int,
intervals: IndexedSeq[Long],
intervals: immutable.IndexedSeq[Long],
intervalSum: Long,
squaredIntervalSum: Long) {

View file

@@ -17,7 +17,7 @@ import akka.util._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.annotation.tailrec
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@@ -241,7 +241,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* in config. Especially useful from tests when Addresses are unknown
* before startup time.
*/
private[cluster] def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit =
private[cluster] def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit =
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes)
/**

View file

@@ -18,13 +18,22 @@ import akka.actor.Props
import akka.actor.Scheduler
import akka.actor.Scope
import akka.actor.Terminated
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.dispatch.ChildTerminated
import akka.event.EventStream
import akka.japi.Util.immutableSeq
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteDeployer
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterConfig
import akka.routing.DefaultResizer
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.cluster.routing.AdaptiveLoadBalancingRouter
import akka.cluster.routing.MixMetricsSelector
import akka.cluster.routing.HeapMetricsSelector
import akka.cluster.routing.SystemLoadAverageMetricsSelector
import akka.cluster.routing.CpuMetricsSelector
import akka.cluster.routing.MetricsSelector
/**
* INTERNAL API
@@ -45,7 +54,11 @@ class ClusterActorRefProvider(
remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher")
}
override val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
/**
* Factory method to make it possible to override the deployer in a subclass.
* Creates a new instance every time.
*/
override protected def createDeployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
/**
* This method is overridden here to keep track of remote deployed actors to
@@ -108,6 +121,36 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami
case None ⇒ None
}
}
override protected def createRouterConfig(routerType: String, key: String, config: Config, deployment: Config): RouterConfig = {
val routees = immutableSeq(deployment.getStringList("routees.paths"))
val nrOfInstances = deployment.getInt("nr-of-instances")
val resizer = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
routerType match {
case "adaptive"
val metricsSelector = deployment.getString("metrics-selector") match {
case "mix" MixMetricsSelector
case "heap" HeapMetricsSelector
case "cpu" CpuMetricsSelector
case "load" SystemLoadAverageMetricsSelector
case fqn
val args = List(classOf[Config] -> deployment)
dynamicAccess.createInstanceFor[MetricsSelector](fqn, args).recover({
case exception throw new IllegalArgumentException(
("Cannot instantiate metrics-selector [%s], defined in [%s], " +
"make sure it extends [akka.cluster.routing.MetricsSelector] and " +
"has constructor with [com.typesafe.config.Config] parameter")
.format(fqn, key), exception)
}).get
}
AdaptiveLoadBalancingRouter(metricsSelector, nrOfInstances, routees, resizer)
case _ super.createRouterConfig(routerType, key, config, deployment)
}
}
}
@SerialVersionUID(1L)
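The FQCN branch above expects a selector class with a Config constructor. A sketch of a custom selector; the class name is hypothetical, and it assumes the CapacityMetricsSelector base class with a capacity method from this commit's router file, which is not shown in full here:

import akka.actor.Address
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.HeapMemory
import akka.cluster.routing.CapacityMetricsSelector
import com.typesafe.config.Config

// weights routees by the fraction of committed heap that is still free,
// wired in via: metrics-selector = "com.example.FreeHeapSelector"
class FreeHeapSelector(config: Config) extends CapacityMetricsSelector {
  override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] =
    nodeMetrics.collect {
      case HeapMemory(address, _, used, committed, _) ⇒
        address -> ((committed - used).toDouble / committed)
    }.toMap
}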

View file

@@ -3,7 +3,7 @@
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler }
@@ -61,7 +61,7 @@ private[cluster] object InternalClusterAction {
* Command to initiate the process to join the specified
* seed nodes.
*/
case class JoinSeedNodes(seedNodes: IndexedSeq[Address])
case class JoinSeedNodes(seedNodes: immutable.IndexedSeq[Address])
/**
* Start message of the process to join one of the seed nodes.
@@ -256,7 +256,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = {
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
// only the node which is named first in the list of seed nodes will join itself
if (seedNodes.isEmpty || seedNodes.head == selfAddress)
self ! JoinTo(selfAddress)
@@ -770,7 +770,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
*
* @return the used [[akka.actor.Address]] if any
*/
private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = {
private def gossipToRandomNodeOf(addresses: immutable.IndexedSeq[Address]): Option[Address] = {
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
// filter out myself
val peer = selectRandomNode(addresses filterNot (_ == selfAddress))
@@ -823,7 +823,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2
*
*/
private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) extends Actor with ActorLogging {
private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address]) extends Actor with ActorLogging {
import InternalClusterAction._
def selfAddress = Cluster(context.system).selfAddress

View file

@@ -4,12 +4,15 @@
package akka.cluster
import language.postfixOps
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.actor.AddressTerminated
import java.lang.Iterable
import akka.japi.Util.immutableSeq
import akka.util.Collections.EmptyImmutableSeq
/**
* Domain events published to the event bus.
@@ -28,7 +31,7 @@ object ClusterEvent {
* Current snapshot state of the cluster. Sent to new subscriber.
*/
case class CurrentClusterState(
members: SortedSet[Member] = SortedSet.empty,
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
unreachable: Set[Member] = Set.empty,
convergence: Boolean = false,
seenBy: Set[Address] = Set.empty,
@@ -47,19 +50,15 @@ object ClusterEvent {
* Java API
* Read only
*/
def getUnreachable: java.util.Set[Member] = {
import scala.collection.JavaConverters._
unreachable.asJava
}
def getUnreachable: java.util.Set[Member] =
scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava
/**
* Java API
* Read only
*/
def getSeenBy: java.util.Set[Address] = {
import scala.collection.JavaConverters._
seenBy.asJava
}
def getSeenBy: java.util.Set[Address] =
scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava
/**
* Java API
@@ -139,11 +138,16 @@ object ClusterEvent {
}
/**
* INTERNAL API
*
* Current snapshot of cluster member metrics. Published to subscribers.
* Current snapshot of cluster node metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent
case class ClusterMetricsChanged(nodeMetrics: Set[NodeMetrics]) extends ClusterDomainEvent {
/**
* Java API
*/
def getNodeMetrics: java.lang.Iterable[NodeMetrics] =
scala.collection.JavaConverters.asJavaIterableConverter(nodeMetrics).asJava
}
/**
* INTERNAL API
@@ -159,7 +163,7 @@ object ClusterEvent {
/**
* INTERNAL API
*/
private[cluster] def diff(oldGossip: Gossip, newGossip: Gossip): IndexedSeq[ClusterDomainEvent] = {
private[cluster] def diff(oldGossip: Gossip, newGossip: Gossip): immutable.IndexedSeq[ClusterDomainEvent] = {
val newMembers = newGossip.members -- oldGossip.members
val membersGroupedByAddress = (newGossip.members.toList ++ oldGossip.members.toList).groupBy(_.address)
@@ -194,18 +198,18 @@ object ClusterEvent {
val newConvergence = newGossip.convergence
val convergenceChanged = newConvergence != oldGossip.convergence
val convergenceEvents = if (convergenceChanged) Seq(ConvergenceChanged(newConvergence)) else Seq.empty
val convergenceEvents = if (convergenceChanged) List(ConvergenceChanged(newConvergence)) else EmptyImmutableSeq
val leaderEvents =
if (newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader))
else Seq.empty
if (newGossip.leader != oldGossip.leader) List(LeaderChanged(newGossip.leader))
else EmptyImmutableSeq
val newSeenBy = newGossip.seenBy
val seenEvents =
if (convergenceChanged || newSeenBy != oldGossip.seenBy) Seq(SeenChanged(newConvergence, newSeenBy))
else Seq.empty
if (convergenceChanged || newSeenBy != oldGossip.seenBy) List(SeenChanged(newConvergence, newSeenBy))
else EmptyImmutableSeq
memberEvents.toIndexedSeq ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++
memberEvents.toVector ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++
leaderEvents ++ convergenceEvents ++ seenEvents
}

View file

@@ -5,7 +5,7 @@ package akka.cluster
import language.postfixOps
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import scala.annotation.tailrec
import scala.concurrent.duration._
import java.net.URLEncoder

View file

@@ -4,34 +4,39 @@
package akka.cluster
import scala.language.postfixOps
import java.io.Closeable
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import scala.collection.immutable
import scala.concurrent.duration._
import scala.collection.immutable.{ SortedSet, Map }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Try, Success, Failure }
import scala.math.ScalaNumericAnyConversions
import runtime.{ ScalaNumberProxy, RichLong, RichDouble, RichInt }
import akka.actor._
import akka.event.LoggingAdapter
import akka.ConfigurationException
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.DynamicAccess
import akka.actor.ExtendedActorSystem
import akka.cluster.MemberStatus.Up
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.Method
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
import akka.event.Logging
import java.lang.management.MemoryUsage
/**
* INTERNAL API.
*
* This strategy is primarily for load-balancing of nodes. It controls metrics sampling
* Cluster metrics is primarily for load-balancing of nodes. It controls metrics sampling
* at a regular frequency, prepares highly variable data for further analysis by other entities,
* and publishes the latest cluster metrics data around the node ring to assist in determining
* the need to redirect traffic to the least-loaded nodes.
* and publishes the latest cluster metrics data around the node ring and local eventStream
* to assist in determining the need to redirect traffic to the least-loaded nodes.
*
* Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]].
*
* Calculation of statistical data for each monitored process is delegated to the
* [[akka.cluster.DataStream]] for exponential smoothing, with additional decay factor.
* Smoothing of the data for each monitored process is delegated to the
* [[akka.cluster.EWMA]] for exponential weighted moving average.
*/
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
@@ -46,17 +51,17 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* The node ring gossipped that contains only members that are Up.
*/
var nodes: SortedSet[Address] = SortedSet.empty
var nodes: immutable.SortedSet[Address] = immutable.SortedSet.empty
/**
* The latest metric values with their statistical data.
*/
var latestGossip: MetricsGossip = MetricsGossip(MetricsRateOfDecay)
var latestGossip: MetricsGossip = MetricsGossip.empty
/**
* The metrics collector that samples data on the node.
*/
val collector: MetricsCollector = MetricsCollector(selfAddress, log, context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
val collector: MetricsCollector = MetricsCollector(context.system.asInstanceOf[ExtendedActorSystem], settings)
/**
* Start periodic gossip to random nodes in cluster
@@ -79,7 +84,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
case GossipTick ⇒ gossip()
case MetricsTick ⇒ collect()
case state: CurrentClusterState ⇒ receiveState(state)
case MemberUp(m) ⇒ receiveMember(m)
case MemberUp(m) ⇒ addMember(m)
case e: MemberEvent ⇒ removeMember(e)
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
}
@@ -94,7 +99,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Adds a member to the node ring.
*/
def receiveMember(member: Member): Unit = nodes += member.address
def addMember(member: Member): Unit = nodes += member.address
/**
* Removes a member from the member node ring.
@@ -108,7 +113,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]].
*/
def receiveState(state: CurrentClusterState): Unit = nodes = state.members collect { case m if m.status == Up ⇒ m.address }
def receiveState(state: CurrentClusterState): Unit =
nodes = state.members collect { case m if m.status == Up ⇒ m.address }
/**
* Samples the latest metrics for the node, updates metrics statistics in
@@ -123,27 +129,33 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Receives changes from peer nodes, merges remote with local gossip nodes, then publishes
* changes to the event stream for load balancing router consumption, and gossips to peers.
* changes to the event stream for load balancing router consumption, and gossip back.
*/
def receiveGossip(envelope: MetricsGossipEnvelope): Unit = {
val remoteGossip = envelope.gossip
if (remoteGossip != latestGossip) {
latestGossip = latestGossip merge remoteGossip
publish()
gossipTo(envelope.from)
}
// remote node might not have same view of member nodes, this side should only care
// about nodes that are known here, otherwise removed nodes can come back
val otherGossip = envelope.gossip.filter(nodes)
latestGossip = latestGossip merge otherGossip
publish()
if (!envelope.reply)
replyGossipTo(envelope.from)
}
/**
* Gossip to peer nodes.
*/
def gossip(): Unit = selectRandomNode((nodes - selfAddress).toIndexedSeq) foreach gossipTo
def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo
def gossipTo(address: Address): Unit =
context.actorFor(self.path.toStringWithAddress(address)) ! MetricsGossipEnvelope(selfAddress, latestGossip)
sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = false))
def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
def replyGossipTo(address: Address): Unit =
sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = true))
def sendGossip(address: Address, envelope: MetricsGossipEnvelope): Unit =
context.actorFor(self.path.toStringWithAddress(address)) ! envelope
def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
/**
@@ -153,61 +165,50 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
}
/**
* INTERNAL API
*/
private[cluster] object MetricsGossip {
val empty = MetricsGossip(Set.empty[NodeMetrics])
}
/**
* INTERNAL API
*
* @param nodes metrics per node
*/
private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetrics] = Set.empty) {
private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
/**
* Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus.Up]]
*/
def remove(node: Address): MetricsGossip = copy(nodes = nodes filterNot (_.address == node))
/**
* Only the nodes that are in the `includeNodes` Set.
*/
def filter(includeNodes: Set[Address]): MetricsGossip =
copy(nodes = nodes filter { includeNodes contains _.address })
/**
* Adds new remote [[akka.cluster.NodeMetrics]] and merges existing from a remote gossip.
*/
def merge(remoteGossip: MetricsGossip): MetricsGossip = {
val remoteNodes = remoteGossip.nodes.map(n ⇒ n.address -> n).toMap
val toMerge = nodeKeys intersect remoteNodes.keySet
val onlyInRemote = remoteNodes.keySet -- nodeKeys
val onlyInLocal = nodeKeys -- remoteNodes.keySet
def merge(otherGossip: MetricsGossip): MetricsGossip =
otherGossip.nodes.foldLeft(this) { (gossip, nodeMetrics) ⇒ gossip :+ nodeMetrics }
val seen = nodes.collect {
case n if toMerge contains n.address ⇒ n merge remoteNodes(n.address)
case n if onlyInLocal contains n.address ⇒ n
}
val unseen = remoteGossip.nodes.collect { case n if onlyInRemote contains n.address ⇒ n }
copy(nodes = seen ++ unseen)
/**
* Adds new local [[akka.cluster.NodeMetrics]], or merges an existing.
*/
def :+(newNodeMetrics: NodeMetrics): MetricsGossip = nodeMetricsFor(newNodeMetrics.address) match {
case Some(existingNodeMetrics) ⇒
copy(nodes = nodes - existingNodeMetrics + (existingNodeMetrics merge newNodeMetrics))
case None ⇒ copy(nodes = nodes + newNodeMetrics)
}
/**
* Adds new local [[akka.cluster.NodeMetrics]] and initializes the data, or merges an existing.
* Returns [[akka.cluster.NodeMetrics]] for a node if exists.
*/
def :+(data: NodeMetrics): MetricsGossip = {
val previous = metricsFor(data)
val names = previous map (_.name)
val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a ⇒ names contains a.name)
val initialized = unseen.map(_.initialize(rateOfDecay))
val merged = toMerge flatMap (latest ⇒ previous.collect { case peer if latest same peer ⇒ peer :+ latest })
val refreshed = nodes filterNot (_.address == data.address)
copy(nodes = refreshed + data.copy(metrics = initialized ++ merged))
}
/**
* Returns a set of [[akka.actor.Address]] for a given node set.
*/
def nodeKeys: Set[Address] = nodes map (_.address)
/**
* Returns metrics for a node if exists.
*/
def metricsFor(node: NodeMetrics): Set[Metric] = nodes flatMap (n ⇒ if (n same node) n.metrics else Set.empty[Metric])
def nodeMetricsFor(address: Address): Option[NodeMetrics] = nodes find { n ⇒ n.address == address }
}
@@ -215,7 +216,31 @@ private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetri
* INTERNAL API
* Envelope adding a sender address to the gossip.
*/
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip, reply: Boolean)
extends ClusterMessage
object EWMA {
/**
* math.log(2)
*/
private val LogOf2 = 0.69315
/**
* Calculate the alpha (decay factor) used in [[akka.cluster.EWMA]]
* from specified half-life and interval between observations.
* Half-life is the interval over which the weights decrease by a factor of two.
* The relevance of each data sample is halved for every passing half-life duration,
* i.e. after 4 times the half-life, a data sample's relevance is reduced to 6% of
* its original relevance. The initial relevance of a data sample is given by
* 1 - 0.5 ^ (collect-interval / half-life).
*/
def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double = {
val halfLifeMillis = halfLife.toMillis
require(halfLife.toMillis > 0, "halfLife must be > 0 s")
val decayRate = LogOf2 / halfLifeMillis
1 - math.exp(-decayRate * collectInterval.toMillis)
}
}
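With the reference.conf defaults above (moving-average-half-life 12 s, collect-interval 3 s) this works out to alpha ≈ 0.16; a quick sanity check:

import scala.concurrent.duration._

// 1 - 0.5 ^ (3s / 12s) = 1 - exp(-(ln 2 / 12000) * 3000) ≈ 0.159
val alpha = EWMA.alpha(halfLife = 12.seconds, collectInterval = 3.seconds)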
/**
* The exponentially weighted moving average (EWMA) approach captures short-term
@@ -223,176 +248,282 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics
* of its alpha, or decay factor, this provides a statistical streaming data model
* that is exponentially biased towards newer entries.
*
* http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
*
* An EWMA only needs the most recent forecast value to be kept, as opposed to a standard
* moving average model.
*
* INTERNAL API
*
* @param decay sets how quickly the exponential weighting decays for past data compared to new data
* @param alpha decay factor, sets how quickly the exponential weighting decays for past data compared to new data,
* see http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
*
* @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or,
* @param value the current exponentially weighted moving average, e.g. Y(n - 1), or,
* the sampled value resulting from the previous smoothing iteration.
* This value is always used as the previous EWMA to calculate the new EWMA.
*
* @param timestamp the most recent time of sampling
*
* @param startTime the time of initial sampling for this data stream
*/
private[cluster] case class DataStream(decay: Int, ewma: ScalaNumericAnyConversions, startTime: Long, timestamp: Long)
extends ClusterMessage with MetricNumericConverter {
private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMessage {
/**
* The rate at which the weights of past observations
* decay as they become more distant.
*/
private val α = 2 / decay + 1
require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0")
/**
* Calculates the exponentially weighted moving average for a given monitored data set.
* The data can be too large to fit into an int or long, thus we use ScalaNumericAnyConversions,
* and defer to BigInt or BigDecimal.
*
* @param xn the new data point
* @return a new [[akka.cluster.DataStream]] with the updated yn and timestamp
* @return a new [[akka.cluster.EWMA]] with the updated value
*/
def :+(xn: ScalaNumericAnyConversions): DataStream = convert(xn) fold (
nl ⇒ copy(ewma = BigInt(α * nl + 1 - α * ewma.longValue()), timestamp = newTimestamp),
nd ⇒ copy(ewma = BigDecimal(α * nd + 1 - α * ewma.doubleValue()), timestamp = newTimestamp))
/**
* The duration of observation for this data stream
*/
def duration: FiniteDuration = (timestamp - startTime) millis
def :+(xn: Double): EWMA = {
val newValue = (alpha * xn) + (1 - alpha) * value
if (newValue == value) this // no change
else copy(value = newValue)
}
}
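Smoothing in action, assuming the alpha computed above (EWMA is private[cluster], so this runs inside the akka.cluster package):

val s0 = EWMA(value = 100.0, alpha = 0.159)
val s1 = s0 :+ 200.0 // (0.159 * 200) + (0.841 * 100) ≈ 115.9
val s2 = s1 :+ 200.0 // ≈ 129.3, drifting toward the new level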
/**
* INTERNAL API
* Metrics key/value.
*
* Companion object of DataStream class.
*/
private[cluster] object DataStream {
def apply(decay: Int, data: ScalaNumericAnyConversions): Option[DataStream] = if (decay > 0)
Some(DataStream(decay, data, newTimestamp, newTimestamp)) else None
}
/**
* INTERNAL API
* Equality of Metric is based on its name.
*
* @param name the metric name
*
* @param value the metric value, which may or may not be defined
*
* @param value the metric value, which may or may not be defined, it must be a valid numerical value,
* see [[akka.cluster.MetricNumericConverter.defined()]]
* @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. as total cores), are not trended.
* averages (e.g. system load average) or finite (e.g. as number of processors), are not trended.
*/
private[cluster] case class Metric(name: String, value: Option[ScalaNumericAnyConversions], average: Option[DataStream])
case class Metric private (name: String, value: Number, private val average: Option[EWMA])
extends ClusterMessage with MetricNumericConverter {
/**
* Returns the metric with a new data stream for data trending if eligible,
* otherwise returns the unchanged metric.
*/
def initialize(decay: Int): Metric = if (initializable) copy(average = DataStream(decay, value.get)) else this
require(defined(value), s"Invalid Metric [$name] value [$value]")
/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
* data point, and if defined, updates the data stream. Returns the updated metric.
*/
def :+(latest: Metric): Metric = latest.value match {
case Some(v) if this same latest ⇒ average match {
case Some(previous) ⇒ copy(value = Some(v), average = Some(previous :+ v))
case None if latest.average.isDefined ⇒ copy(value = Some(v), average = latest.average)
case None if !latest.average.isDefined ⇒ copy(value = Some(v))
}
case None ⇒ this
def :+(latest: Metric): Metric = if (this sameAs latest) average match {
case Some(avg) ⇒ copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue))
case None if latest.average.isDefined ⇒ copy(value = latest.value, average = latest.average)
case _ ⇒ copy(value = latest.value)
}
else this
/**
* The numerical value of the average, if defined, otherwise the latest value
*/
def smoothValue: Double = average match {
case Some(avg) ⇒ avg.value
case None ⇒ value.doubleValue
}
/**
* @see [[akka.cluster.MetricNumericConverter.defined()]]
* @return true if this value is smoothed
*/
def isDefined: Boolean = value match {
case Some(a) ⇒ defined(a)
case None ⇒ false
}
def isSmooth: Boolean = average.isDefined
/**
* Returns true if <code>that</code> is tracking the same metric as this.
*/
def same(that: Metric): Boolean = name == that.name
def sameAs(that: Metric): Boolean = name == that.name
/**
* Returns true if the metric requires initialization.
*/
def initializable: Boolean = trendable && isDefined && average.isEmpty
/**
* Returns true if the metric is a value applicable for trending.
*/
def trendable: Boolean = !(Metric.noStream contains name)
}
/**
* INTERNAL API
*
* Companion object of Metric class.
*/
private[cluster] object Metric extends MetricNumericConverter {
/**
* The metrics that are already averages or finite are not trended over time.
*/
private val noStream = Set("system-load-average", "total-cores", "processors")
/**
* Evaluates validity of <code>value</code> based on whether it is available (SIGAR on classpath)
* or defined for the OS (JMX). If undefined we set the value option to None and do not modify
* the latest sampled metric to avoid skewing the statistical trend.
*/
def apply(name: String, value: Option[ScalaNumericAnyConversions]): Metric = value match {
case Some(v) if defined(v) ⇒ Metric(name, value, None)
case _ ⇒ Metric(name, None, None)
override def hashCode = name.##
override def equals(obj: Any) = obj match {
case other: Metric ⇒ sameAs(other)
case _ ⇒ false
}
}
/**
* Factory for creating valid Metric instances.
*/
object Metric extends MetricNumericConverter {
/**
* Creates a new Metric instance if the value is valid, otherwise None
* is returned. Invalid numeric values are negative and NaN/Infinite.
*/
def create(name: String, value: Number, decayFactor: Option[Double]): Option[Metric] =
if (defined(value)) Some(new Metric(name, value, createEWMA(value.doubleValue, decayFactor)))
else None
/**
* Creates a new Metric instance if the Try is successful and the value is valid,
* otherwise None is returned. Invalid numeric values are negative and NaN/Infinite.
*/
def create(name: String, value: Try[Number], decayFactor: Option[Double]): Option[Metric] = value match {
case Success(v) ⇒ create(name, v, decayFactor)
case Failure(_) ⇒ None
}
private def createEWMA(value: Double, decayFactor: Option[Double]): Option[EWMA] = decayFactor match {
case Some(alpha) ⇒ Some(EWMA(value, alpha))
case None ⇒ None
}
}
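The validation rules in practice, with hypothetical values:

val trended = Metric.create("heap-memory-used", 512L, decayFactor = Some(0.159))
// Some(Metric(...)) with an EWMA seeded at 512.0
val plain = Metric.create("processors", 4, decayFactor = None)
// Some(Metric(...)) without smoothing
val undefined = Metric.create("system-load-average", -1, decayFactor = None)
// None: -1 is the JMX marker for "not available"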
/**
* INTERNAL API
*
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
* For the JVM memory: the amount of used and committed memory will always be <= max if max is defined.
* A memory allocation may fail if it attempts to increase the used memory such that used > committed
* even if used <= max is true (e.g. when the system virtual memory is low).
*
* The system is possibly nearing a bottleneck if the system load average is nearing the number of cpus/cores.
* Equality of NodeMetrics is based on its address.
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
*
* @param timestamp the time of sampling
*
* @param metrics the array of sampled [[akka.actor.Metric]]
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param metrics the set of sampled [[akka.actor.Metric]]
*/
private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
/**
* Returns the most recent data.
*/
def merge(that: NodeMetrics): NodeMetrics = if (this updatable that) copy(metrics = that.metrics, timestamp = that.timestamp) else this
def merge(that: NodeMetrics): NodeMetrics = {
require(address == that.address, s"merge only allowed for same address, [$address] != [${that.address}]")
if (timestamp >= that.timestamp) this // that is older
else {
// equality is based on the name of the Metric and Set doesn't replace existing element
copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp)
}
}
def metric(key: String): Option[Metric] = metrics.collectFirst { case m if m.name == key ⇒ m }
/**
* Returns true if <code>that</code> address is the same as this and its metric set is more recent.
* Java API
*/
def updatable(that: NodeMetrics): Boolean = (this same that) && (that.timestamp > timestamp)
def getMetrics: java.lang.Iterable[Metric] =
scala.collection.JavaConverters.asJavaIterableConverter(metrics).asJava
/**
* Returns true if <code>that</code> address is the same as this
*/
def same(that: NodeMetrics): Boolean = address == that.address
def sameAs(that: NodeMetrics): Boolean = address == that.address
override def hashCode = address.##
override def equals(obj: Any) = obj match {
case other: NodeMetrics ⇒ sameAs(other)
case _ ⇒ false
}
}
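A worked example of the merge semantics, using the StandardMetrics names defined below (addresses and values are hypothetical). Because Metric equality is by name and a Set keeps the element already present, metrics from the newer snapshot win while metrics known only locally survive:

import akka.actor.Address
import akka.cluster.{ Metric, NodeMetrics }
import akka.cluster.StandardMetrics.{ HeapMemoryUsed, Processors }

val addr = Address("akka", "sys", "host1", 2552)
val older = NodeMetrics(addr, 1,
  Set(Metric.create(HeapMemoryUsed, 100L, None), Metric.create(Processors, 4, None)).flatten)
val newer = NodeMetrics(addr, 2,
  Set(Metric.create(HeapMemoryUsed, 150L, None)).flatten)

val merged = older merge newer
// merged.timestamp == 2, heap-memory-used == 150, processors is retained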
/**
* Definitions of the built-in standard metrics.
*
* The following extractors and data structures make it easy to consume the
* [[akka.cluster.NodeMetrics]] in, for example, load balancers.
*/
object StandardMetrics {
// Constants for the heap related Metric names
final val HeapMemoryUsed = "heap-memory-used"
final val HeapMemoryCommitted = "heap-memory-committed"
final val HeapMemoryMax = "heap-memory-max"
// Constants for the cpu related Metric names
final val SystemLoadAverage = "system-load-average"
final val Processors = "processors"
final val CpuCombined = "cpu-combined"
object HeapMemory {
/**
* Given a NodeMetrics it returns the HeapMemory data if the nodeMetrics contains
* necessary heap metrics.
* @return if possible a tuple matching the HeapMemory constructor parameters
*/
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Long, Long, Option[Long])] = {
for {
used ← nodeMetrics.metric(HeapMemoryUsed)
committed ← nodeMetrics.metric(HeapMemoryCommitted)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
used.smoothValue.longValue, committed.smoothValue.longValue,
nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue))
}
}
/**
* Java API to extract HeapMemory data from nodeMetrics, if the nodeMetrics
* contains necessary heap metrics, otherwise it returns null.
*/
def extractHeapMemory(nodeMetrics: NodeMetrics): HeapMemory = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max) ⇒
// note that above extractor returns tuple
HeapMemory(address, timestamp, used, committed, max)
case _ ⇒ null
}
/**
* The amount of used and committed memory will always be <= max if max is defined.
* A memory allocation may fail if it attempts to increase the used memory such that used > committed
* even if used <= max is true (e.g. when the system virtual memory is low).
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param used the current sum of heap memory used from all heap memory pools (in bytes)
* @param committed the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes). Committed will always be greater than or equal to used.
* @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
* Can be undefined on some OS.
*/
case class HeapMemory(address: Address, timestamp: Long, used: Long, committed: Long, max: Option[Long]) {
require(committed > 0L, "committed heap expected to be > 0 bytes")
require(max.isEmpty || max.get > 0L, "max heap expected to be > 0 bytes")
}
object Cpu {
/**
* Given a NodeMetrics it returns the Cpu data if the nodeMetrics contains
* necessary cpu metrics.
* @return if possible a tuple matching the Cpu constructor parameters
*/
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Option[Double], Option[Double], Int)] = {
for {
processors ← nodeMetrics.metric(Processors)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue),
nodeMetrics.metric(CpuCombined).map(_.smoothValue), processors.value.intValue)
}
}
/**
* Java API to extract Cpu data from nodeMetrics, if the nodeMetrics
* contains necessary cpu metrics, otherwise it returns null.
*/
def extractCpu(nodeMetrics: NodeMetrics): Cpu = nodeMetrics match {
case Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors) ⇒
// note that above extractor returns tuple
Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
case _ ⇒ null
}
/**
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute,
* The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores.
* @param cpuCombined combined CPU sum of User + Sys + Nice + Wait, in percentage ([0.0 - 1.0]). This
* metric can describe the amount of time the CPU spent executing code during the sampling interval,
* and how much more it could theoretically.
* @param processors the number of available processors
*/
case class Cpu(
address: Address,
timestamp: Long,
systemLoadAverage: Option[Double],
cpuCombined: Option[Double],
processors: Int) {
cpuCombined match {
case Some(x) ⇒ require(0.0 <= x && x <= 1.0, s"cpuCombined must be between [0.0 - 1.0], was [$x]")
case None ⇒
}
}
}
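Typical consumption of the extractors, e.g. in a subscriber of ClusterMetricsChanged (the printed report is illustrative):

import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.{ Cpu, HeapMemory }

def report(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
  case HeapMemory(address, _, used, committed, _) ⇒
    println(s"$address heap: $used of $committed bytes used")
  case Cpu(address, _, Some(loadAverage), _, processors) ⇒
    println(s"$address load average $loadAverage on $processors processors")
  case _ ⇒ // no heap or cpu data available for this node
}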
@@ -405,97 +536,199 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
private[cluster] trait MetricNumericConverter {
/**
* A defined value is neither a -1 or NaN/Infinite:
* A defined value is neither negative nor NaN/Infinite:
* <ul><li>JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned</li>
* <li>SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)</li></ul>
*/
def defined(value: ScalaNumericAnyConversions): Boolean =
convert(value) fold (a ⇒ value.underlying != -1, b ⇒ !(b.isNaN || b.isInfinite))
def defined(value: Number): Boolean = convertNumber(value) match {
case Left(a) ⇒ a >= 0
case Right(b) ⇒ !(b < 0.0 || b.isNaN || b.isInfinite)
}
/**
* May involve rounding or truncation.
*/
def convert(from: ScalaNumericAnyConversions): Either[Long, Double] = from match {
case n: BigInt ⇒ Left(n.longValue())
case n: BigDecimal ⇒ Right(n.doubleValue())
case n: RichInt ⇒ Left(n.abs)
case n: RichLong ⇒ Left(n.self)
case n: RichDouble ⇒ Right(n.self)
def convertNumber(from: Any): Either[Long, Double] = from match {
case n: Int ⇒ Left(n)
case n: Long ⇒ Left(n)
case n: Double ⇒ Right(n)
case n: Float ⇒ Right(n)
case n: BigInt ⇒ Left(n.longValue)
case n: BigDecimal ⇒ Right(n.doubleValue)
case x ⇒ throw new IllegalArgumentException(s"Not a number [$x]")
}
}
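A quick check of the conversion rules; since the trait is private[cluster], this has to live in the akka.cluster package:

package akka.cluster

object ConversionCheck extends MetricNumericConverter {
  assert(convertNumber(42) == Left(42L))
  assert(convertNumber(0.5) == Right(0.5))
  assert(!defined(-1)) // JMX returns -1 for "undefined"
  assert(defined(0.99)) // a valid SIGAR cpu-combined sample
}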
/**
* INTERNAL API
*
* Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this
* loads wider and more accurate range of metrics in combination with SIGAR's native OS library.
*
* FIXME switch to Scala reflection
*
* @param sigar the optional org.hyperic.Sigar instance
*/
private[cluster] trait MetricsCollector extends Closeable {
/**
* Samples and collects new data points.
*/
def sample: NodeMetrics
}
/**
* Loads JVM and system metrics through JMX monitoring beans.
*
* @param address The [[akka.actor.Address]] of the node being sampled
* @param decay how quickly the exponential weighting of past data is decayed
*/
private[cluster] class MetricsCollector private (private val sigar: Option[AnyRef], address: Address) extends MetricNumericConverter {
class JmxMetricsCollector(address: Address, decayFactor: Double) extends MetricsCollector {
import StandardMetrics._
private def this(cluster: Cluster) =
this(cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval))
/**
* This constructor is used when creating an instance from configured FQCN
*/
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean
private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList").map(m ⇒ m)
private val NetInterfaces: Option[Method] = createMethodFrom(sigar, "getNetInterfaceList")
private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc")
private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption
/**
* Samples and collects new data points.
*
* @return [[akka.cluster.NodeMetrics]]
* Creates a new instance each time.
*/
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores,
systemLoadAverage, used, committed, max, processors, networkMaxRx, networkMaxTx))
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, metrics)
def metrics: Set[Metric] = {
val heap = heapMemoryUsage
Set(systemLoadAverage, heapUsed(heap), heapCommitted(heap), heapMax(heap), processors).flatten
}
/**
* (SIGAR / JMX) Returns the OS-specific average system load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a Metric with
* undefined value is returned.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
* JMX Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is
* returned from JMX, and None is returned from this method.
* Creates a new instance each time.
*/
def systemLoadAverage: Metric = Metric("system-load-average",
Try(LoadAverage.get.invoke(sigar.get).asInstanceOf[Array[Double]].toSeq.head).getOrElse(
osMBean.getSystemLoadAverage) match {
case x if x < 0 ⇒ None // load average may be unavailable on some platform
case x ⇒ Some(BigDecimal(x))
})
def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
value = osMBean.getSystemLoadAverage,
decayFactor = None)
/**
* (JMX) Returns the number of available processors
* Creates a new instance each time.
*/
def processors: Metric = Metric("processors", Some(BigInt(osMBean.getAvailableProcessors)))
def processors: Option[Metric] = Metric.create(
name = Processors,
value = osMBean.getAvailableProcessors,
decayFactor = None)
/**
* Current heap to be passed in to heapUsed, heapCommitted and heapMax
*/
def heapMemoryUsage: MemoryUsage = memoryMBean.getHeapMemoryUsage
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
* Creates a new instance each time.
*/
def used: Metric = Metric("heap-memory-used", Some(BigInt(memoryMBean.getHeapMemoryUsage.getUsed)))
def heapUsed(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryUsed,
value = heap.getUsed,
decayFactor = decayFactorOption)
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes). Committed will always be greater
* than or equal to used.
* from all heap memory pools (in bytes).
* Creates a new instance each time.
*/
def committed: Metric = Metric("heap-memory-committed", Some(BigInt(memoryMBean.getHeapMemoryUsage.getCommitted)))
def heapCommitted(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryCommitted,
value = heap.getCommitted,
decayFactor = decayFactorOption)
/**
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If undefined, returns -1.
* for JVM memory management. If not defined the metrics value is None, i.e.
* never negative.
* Creates a new instance each time.
*/
def max: Metric = Metric("heap-memory-max", Some(BigInt(memoryMBean.getHeapMemoryUsage.getMax)))
def heapMax(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryMax,
value = heap.getMax,
decayFactor = None)
override def close(): Unit = ()
}
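A direct sampling sketch, bypassing the cluster extension (the address and alpha are hypothetical):

import akka.actor.Address
import akka.cluster.{ JmxMetricsCollector, NodeMetrics }

val collector = new JmxMetricsCollector(Address("akka", "sys", "host1", 2552), decayFactor = 0.159)
val snapshot: NodeMetrics = collector.sample
// snapshot.metrics contains heap-memory-used/committed/max, processors,
// and system-load-average when the JVM exposes it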
/**
* Loads metrics through Hyperic SIGAR and JMX monitoring beans. This
* loads wider and more accurate range of metrics compared to JmxMetricsCollector
* by using SIGAR's native OS library.
*
* The constructor will by design throw exception if org.hyperic.sigar.Sigar can't be loaded, due
* to missing classes or native libraries.
*
* TODO switch to Scala reflection
*
* @param address The [[akka.actor.Address]] of the node being sampled
* @param decay how quickly the exponential weighting of past data is decayed
* @param sigar the org.hyperic.Sigar instance
*/
class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef)
extends JmxMetricsCollector(address, decayFactor) {
import StandardMetrics._
private def this(cluster: Cluster) =
this(cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval),
cluster.system.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)
/**
* This constructor is used when creating an instance from configured FQCN
*/
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
private val EmptyClassArray: Array[(Class[_])] = Array.empty[(Class[_])]
private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc")
private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption
// Do something initially, in constructor, to make sure that the native library can be loaded.
// This will by design throw exception if sigar isn't usable
val pid: Long = createMethodFrom(sigar, "getPid") match {
case Some(method) ⇒
try method.invoke(sigar).asInstanceOf[Long] catch {
case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError] ⇒
// native libraries not in place
// don't throw fatal LinkageError, but something harmless
throw new IllegalArgumentException(e.getCause.toString)
case e: InvocationTargetException ⇒ throw e.getCause
}
case None ⇒ throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method")
}
override def metrics: Set[Metric] = {
super.metrics.filterNot(_.name == SystemLoadAverage) ++ Set(systemLoadAverage, cpuCombined).flatten
}
/**
* (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned
* from JMX, which means that None is returned from this method.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
* Creates a new instance each time.
*/
override def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]](0).asInstanceOf[Number]),
decayFactor = None) orElse super.systemLoadAverage
/**
* (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
@@ -504,68 +737,51 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
*
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
*
* Creates a new instance each time.
*/
def cpuCombined: Metric = Metric("cpu-combined", Try(BigDecimal(CombinedCpu.get.invoke(Cpu.get.invoke(sigar.get)).asInstanceOf[Double])).toOption)
/**
* (SIGAR) Returns the total number of cores.
*/
def totalCores: Metric = Metric("total-cores", Try(BigInt(CpuList.get.invoke(sigar.get).asInstanceOf[Array[AnyRef]].map(cpu ⇒
createMethodFrom(Some(cpu), "getTotalCores").get.invoke(cpu).asInstanceOf[Int]).head)).toOption)
//Array[Int].head - if this would differ on some servers, expose all. In testing each int was always equal.
/**
* (SIGAR) Returns the max network IO read/write value, in bytes, for network latency evaluation.
*/
def networkMaxRx: Metric = networkMaxFor("getRxBytes", "network-max-rx")
/**
* (SIGAR) Returns the max network IO tx value, in bytes.
*/
def networkMaxTx: Metric = networkMaxFor("getTxBytes", "network-max-tx")
/**
* Returns the network stats per interface.
*/
def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar.get).asInstanceOf[Array[String]].map(arg ⇒
arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar.get, arg))).toMap) getOrElse Map.empty[String, AnyRef]
/**
* Returns true if SIGAR is successfully installed on the classpath, otherwise false.
*/
def isSigar: Boolean = sigar.isDefined
def cpuCombined: Option[Metric] = Metric.create(
name = CpuCombined,
value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]),
decayFactor = decayFactorOption)
/**
* Releases any native resources associated with this instance.
*/
def close(): Unit = if (isSigar) Try(createMethodFrom(sigar, "close").get.invoke(sigar.get)) getOrElse Unit
override def close(): Unit = Try(createMethodFrom(sigar, "close").get.invoke(sigar))
/**
* Returns the max bytes for the given <code>method</code> in metric for <code>metric</code> from the network interface stats.
*/
private def networkMaxFor(method: String, metric: String): Metric = Metric(metric, Try(Some(BigInt(
networkStats.collect { case (_, a) ⇒ createMethodFrom(Some(a), method).get.invoke(a).asInstanceOf[Long] }.max))) getOrElse None)
private def createMethodFrom(ref: Option[AnyRef], method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] =
Try(ref.get.getClass.getMethod(method, types: _*)).toOption
private def createMethodFrom(ref: AnyRef, method: String, types: Array[(Class[_])] = EmptyClassArray): Option[Method] =
Try(ref.getClass.getMethod(method, types: _*)).toOption
}
/**
* INTERNAL API
* Companion object of MetricsCollector class.
* Factory to create configured MetricsCollector.
* If instantiation of SigarMetricsCollector fails (missing class or native library)
* it falls back to use JmxMetricsCollector.
*/
private[cluster] object MetricsCollector {
def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector =
dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil) match {
case Success(identity) ⇒ new MetricsCollector(Some(identity), address)
case Failure(e) ⇒
log.debug(e.toString)
log.info("Hyperic SIGAR was not found on the classpath or not installed properly. " +
"Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
"To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate" +
"platform-specific native libary to 'java.library.path'.")
new MetricsCollector(None, address)
def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = {
import settings.{ MetricsCollectorClass ⇒ fqcn }
def log = Logging(system, "MetricsCollector")
if (fqcn == classOf[SigarMetricsCollector].getName) {
Try(new SigarMetricsCollector(system)) match {
case Success(sigarCollector) ⇒ sigarCollector
case Failure(e) ⇒
log.info("Metrics will be retrieved from MBeans, and may be incorrect on some platforms. " +
"To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
"platform-specific native library to 'java.library.path'. Reason: " +
e.toString)
new JmxMetricsCollector(system)
}
} else {
system.dynamicAccess.createInstanceFor[MetricsCollector](fqcn, List(classOf[ActorSystem] -> system)).
recover {
case e ⇒ throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to: " + e.toString)
}.get
}
}
}
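A custom collector plugged in via collector-class only needs the single-ActorSystem constructor; a no-op sketch (class name hypothetical; it sits in the akka.cluster package because the MetricsCollector trait is private[cluster] in this commit):

// enabled via: akka.cluster.metrics.collector-class = "akka.cluster.NoopMetricsCollector"
package akka.cluster

import akka.actor.ActorSystem

class NoopMetricsCollector(system: ActorSystem) extends MetricsCollector {
  private val address = Cluster(system).selfAddress
  // returns a snapshot with no metrics; real collectors sample JMX/SIGAR here
  def sample: NodeMetrics = NodeMetrics(address, System.currentTimeMillis)
  def close(): Unit = ()
}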

View file

@@ -5,7 +5,7 @@
package akka.cluster
import java.io.Closeable
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props }
import akka.cluster.ClusterEvent._
import akka.actor.PoisonPill
@@ -81,7 +81,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
/**
* Current cluster members, sorted by address.
*/
def members: SortedSet[Member] = state.members
def members: immutable.SortedSet[Member] = state.members
/**
* Members that have been detected as unreachable.

View file

@@ -3,15 +3,16 @@
*/
package akka.cluster
import scala.collection.immutable
import com.typesafe.config.Config
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.ConfigurationException
import scala.collection.JavaConverters._
import akka.actor.Address
import akka.actor.AddressFromURIString
import akka.dispatch.Dispatchers
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util.immutableSeq
class ClusterSettings(val config: Config, val systemName: String) {
import config._
@@ -45,7 +46,8 @@ class ClusterSettings(val config: Config, val systemName: String) {
require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
}
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr }.toIndexedSeq
final val SeedNodes: immutable.IndexedSeq[Address] =
immutableSeq(getStringList("akka.cluster.seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector
final val SeedNodeTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS)
final val PeriodicTasksInitialDelay: FiniteDuration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
@@ -69,9 +71,16 @@ class ClusterSettings(val config: Config, val systemName: String) {
callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS),
resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS))
final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled")
final val MetricsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.metrics-interval"), MILLISECONDS)
final val MetricsCollectorClass: String = getString("akka.cluster.metrics.collector-class")
final val MetricsInterval: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.metrics.collect-interval"), MILLISECONDS)
require(d > Duration.Zero, "metrics.collect-interval must be > 0"); d
}
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
final val MetricsRateOfDecay: Int = getInt("akka.cluster.metrics.rate-of-decay")
final val MetricsMovingAverageHalfLife: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.metrics.moving-average-half-life"), MILLISECONDS)
require(d > Duration.Zero, "metrics.moving-average-half-life must be > 0"); d
}
}
case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)

View file

@ -5,14 +5,14 @@
package akka.cluster
import akka.actor.Address
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import MemberStatus._
/**
* Internal API
*/
private[cluster] object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty
}
/**
@ -50,7 +50,7 @@ private[cluster] object Gossip {
*/
private[cluster] case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address
members: immutable.SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
with Versioned[Gossip] {

View file

@ -6,7 +6,7 @@ package akka.cluster
import language.implicitConversions
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import scala.collection.GenTraversableOnce
import akka.actor.Address
import MemberStatus._

View file

@ -0,0 +1,434 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import java.util.Arrays
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.collection.immutable
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.dispatch.Dispatchers
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.Cpu
import akka.cluster.StandardMetrics.HeapMemory
import akka.event.Logging
import akka.japi.Util.immutableSeq
import akka.routing.Broadcast
import akka.routing.Destination
import akka.routing.FromConfig
import akka.routing.NoRouter
import akka.routing.Resizer
import akka.routing.Route
import akka.routing.RouteeProvider
import akka.routing.RouterConfig
object AdaptiveLoadBalancingRouter {
private val escalateStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ SupervisorStrategy.Escalate
}
}
/**
* A Router that performs load balancing of messages to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees, based on probabilities derived from
* the remaining capacity of the corresponding node.
*
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
* sense as this means that the router should both create new actors and use the 'routees'
* actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parent's strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
case class AdaptiveLoadBalancingRouter(
metricsSelector: MetricsSelector = MixMetricsSelector,
nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = AdaptiveLoadBalancingRouter.escalateStrategy)
extends RouterConfig with AdaptiveLoadBalancingRouterLike {
/**
* Constructor that sets nrOfInstances to be created.
* Java API
* @param selector the selector is responsible for producing weighted mix of routees from the node metrics
* @param nr number of routees to create
*/
def this(selector: MetricsSelector, nr: Int) = this(metricsSelector = selector, nrOfInstances = nr)
/**
* Constructor that sets the routees to be used.
* Java API
* @param selector the selector is responsible for producing weighted mix of routees from the node metrics
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(selector: MetricsSelector, routeePaths: java.lang.Iterable[String]) =
this(metricsSelector = selector, routees = immutableSeq(routeePaths))
/**
* Constructor that sets the resizer to be used.
* Java API
* @param selector the selector is responsible for producing weighted mix of routees from the node metrics
*/
def this(selector: MetricsSelector, resizer: Resizer) =
this(metricsSelector = selector, resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): AdaptiveLoadBalancingRouter =
copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): AdaptiveLoadBalancingRouter =
copy(supervisorStrategy = strategy)
/**
* Uses the resizer of the given RouterConfig if this RouterConfig
* doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = other match {
case _: FromConfig | _: NoRouter this
case otherRouter: AdaptiveLoadBalancingRouter
val useResizer =
if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer
else this.resizer
copy(resizer = useResizer)
case _ throw new IllegalArgumentException("Expected AdaptiveLoadBalancingRouter, got [%s]".format(other))
}
}
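// A minimal usage sketch (`Worker` is a hypothetical routee actor), mirroring
// the multi-jvm spec further down in this commit:
//
//   val router = system.actorOf(Props[Worker].withRouter(ClusterRouterConfig(
//     local = AdaptiveLoadBalancingRouter(HeapMetricsSelector),
//     settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))),
//     name = "workerRouter")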
/**
* INTERNAL API.
*
* This strategy is a metrics-aware router which performs load balancing of messages to
* cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterMetricsChanged]]
* events and the [[akka.cluster.routing.MetricsSelector]] creates a mix of
* weighted routees based on the node metrics. Messages are routed randomly to the
* weighted routees, i.e. nodes with lower load are more likely to be used than nodes with
* higher load.
*/
trait AdaptiveLoadBalancingRouterLike { this: RouterConfig
def metricsSelector: MetricsSelector
def nrOfInstances: Int
def routees: immutable.Iterable[String]
def routerDispatcher: String
override def createRoute(routeeProvider: RouteeProvider): Route = {
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
else routeeProvider.registerRouteesFor(routees)
}
val log = Logging(routeeProvider.context.system, routeeProvider.context.self)
// The current weighted routees, if any. Weights are produced by the metricsSelector
// via the metricsListener Actor. It's only updated by the actor, but accessed from
// the threads of the senders.
@volatile var weightedRoutees: Option[WeightedRoutees] = None
// subscribe to ClusterMetricsChanged and update weightedRoutees
val metricsListener = routeeProvider.context.actorOf(Props(new Actor {
val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case ClusterMetricsChanged(metrics) receiveMetrics(metrics)
case _: CurrentClusterState // ignore
}
def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
// this is the only place from where weightedRoutees is updated
weightedRoutees = Some(new WeightedRoutees(routeeProvider.routees, cluster.selfAddress,
metricsSelector.weights(metrics)))
}
}).withDispatcher(routerDispatcher), name = "metricsListener")
def getNext(): ActorRef = weightedRoutees match {
case Some(weighted)
if (weighted.isEmpty) routeeProvider.context.system.deadLetters
else weighted(ThreadLocalRandom.current.nextInt(weighted.total) + 1)
case None
val currentRoutees = routeeProvider.routees
if (currentRoutees.isEmpty) routeeProvider.context.system.deadLetters
else currentRoutees(ThreadLocalRandom.current.nextInt(currentRoutees.size))
}
{
case (sender, message)
message match {
case Broadcast(msg) toAll(sender, routeeProvider.routees)
case msg List(Destination(sender, getNext()))
}
}
}
}
/**
* MetricsSelector that uses the heap metrics.
* Low heap capacity => small weight.
*/
@SerialVersionUID(1L)
case object HeapMetricsSelector extends CapacityMetricsSelector {
/**
* Java API: get the singleton instance
*/
def getInstance = this
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
nodeMetrics.collect {
case HeapMemory(address, _, used, committed, max)
val capacity = max match {
case None (committed - used).toDouble / committed
case Some(m) (m - used).toDouble / m
}
(address, capacity)
}.toMap
}
}
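// worked example, matching MetricsSelectorSpec in this commit:
// used 128 and max 512 gives capacity (512 - 128) / 512 = 0.75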
/**
* MetricsSelector that uses the combined CPU metrics.
* Combined CPU is the sum of User + Sys + Nice + Wait, in percentage.
* Low cpu capacity => small weight.
*/
@SerialVersionUID(1L)
case object CpuMetricsSelector extends CapacityMetricsSelector {
/**
* Java API: get the singleton instance
*/
def getInstance = this
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
nodeMetrics.collect {
case Cpu(address, _, _, Some(cpuCombined), _)
val capacity = 1.0 - cpuCombined
(address, capacity)
}.toMap
}
}
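// worked example: cpuCombined 0.1 gives capacity 1.0 - 0.1 = 0.9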
/**
* MetricsSelector that uses the system load average metrics.
* System load average is the OS-specific average load on the CPUs in the system,
* for the past 1 minute. The system is possibly nearing a bottleneck if the
* system load average is nearing the number of cpus/cores.
* Low load average capacity => small weight.
*/
@SerialVersionUID(1L)
case object SystemLoadAverageMetricsSelector extends CapacityMetricsSelector {
/**
* Java API: get the singleton instance
*/
def getInstance = this
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
nodeMetrics.collect {
case Cpu(address, _, Some(systemLoadAverage), _, processors)
val capacity = 1.0 - math.min(1.0, systemLoadAverage / processors)
(address, capacity)
}.toMap
}
}
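// worked example: systemLoadAverage 0.5 on 8 processors gives
// capacity 1.0 - min(1.0, 0.5 / 8) = 0.9375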
/**
* Singleton instance of the default MixMetricsSelector, which uses [[akka.cluster.routing.HeapMetricsSelector]],
* [[akka.cluster.routing.CpuMetricsSelector]], and [[akka.cluster.routing.SystemLoadAverageMetricsSelector]].
*/
@SerialVersionUID(1L)
object MixMetricsSelector extends MixMetricsSelectorBase(
Vector(HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector)) {
/**
* Java API: get the default singleton instance
*/
def getInstance = this
}
/**
* MetricsSelector that combines other selectors and aggregates their capacity
* values. By default it uses [[akka.cluster.routing.HeapMetricsSelector]],
* [[akka.cluster.routing.CpuMetricsSelector]], and [[akka.cluster.routing.SystemLoadAverageMetricsSelector]].
*/
@SerialVersionUID(1L)
case class MixMetricsSelector(
selectors: immutable.IndexedSeq[CapacityMetricsSelector])
extends MixMetricsSelectorBase(selectors)
/**
* Base class for MetricsSelector that combines other selectors and aggregates their capacity.
*/
@SerialVersionUID(1L)
abstract class MixMetricsSelectorBase(selectors: immutable.IndexedSeq[CapacityMetricsSelector])
extends CapacityMetricsSelector {
/**
* Java API
*/
def this(selectors: java.lang.Iterable[CapacityMetricsSelector]) = this(immutableSeq(selectors).toVector)
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
val combined: immutable.IndexedSeq[(Address, Double)] = selectors.flatMap(_.capacity(nodeMetrics).toSeq)
// aggregated average of the capacities by address
combined.foldLeft(Map.empty[Address, (Double, Int)].withDefaultValue((0.0, 0))) {
case (acc, (address, capacity))
val (sum, count) = acc(address)
acc + (address -> (sum + capacity, count + 1))
}.map {
case (addr, (sum, count)) (addr -> sum / count)
}
}
}
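// worked example of the aggregation above (illustrative values): two
// selectors reporting Map(a -> 0.8) and Map(a -> 0.4, b -> 0.6) fold into
// a -> (1.2, 2) and b -> (0.6, 1), i.e. aggregated Map(a -> 0.6, b -> 0.6)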
/**
* A MetricsSelector is responsible for producing weights from the node metrics.
*/
@SerialVersionUID(1L)
trait MetricsSelector extends Serializable {
/**
* The weights per address, based on the nodeMetrics.
*/
def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int]
}
/**
* A MetricsSelector producing weights from remaining capacity.
* The weights are typically proportional to the remaining capacity.
*/
abstract class CapacityMetricsSelector extends MetricsSelector {
/**
* Remaining capacity for each node. The value is between
* 0.0 and 1.0, where 0.0 means no remaining capacity (full
* utilization) and 1.0 means full remaining capacity (zero
* utilization).
*/
def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double]
/**
* Converts the capacity values to weights. The node with the lowest
* capacity gets weight 1 (lowest usable capacity is 1%) and other
* nodes get weights proportional to their capacity compared to
* the node with the lowest capacity.
*/
def weights(capacity: Map[Address, Double]): Map[Address, Int] = {
if (capacity.isEmpty) Map.empty[Address, Int]
else {
val (_, min) = capacity.minBy { case (_, c) c }
// lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
val divisor = math.max(0.01, min)
capacity map { case (addr, c) (addr -> math.round(c / divisor).toInt) }
}
}
/**
* The weights per address, based on the capacity produced by
* the nodeMetrics.
*/
override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] =
weights(capacity(nodeMetrics))
}
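// worked example, matching MetricsSelectorSpec in this commit:
// capacity Map(a -> 0.6, b -> 0.3, c -> 0.1) has min 0.1,
// so the weights become Map(c -> 1, b -> 3, a -> 6)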
/**
* INTERNAL API
*
* Pick routee based on its weight. Higher weight, higher probability.
*/
private[cluster] class WeightedRoutees(refs: immutable.IndexedSeq[ActorRef], selfAddress: Address, weights: Map[Address, Int]) {
// fill an array of the same size as the refs with accumulated weights;
// binarySearch is used to pick the right bucket for a requested value
// from 1 to the total sum of the used weights.
private val buckets: Array[Int] = {
def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) selfAddress
case a a
}
val buckets = Array.ofDim[Int](refs.size)
val meanWeight = if (weights.isEmpty) 1 else weights.values.sum / weights.size
val w = weights.withDefaultValue(meanWeight) // we don't necessarily have metrics for all addresses
var i = 0
var sum = 0
refs foreach { ref
sum += w(fullAddress(ref))
buckets(i) = sum
i += 1
}
buckets
}
def isEmpty: Boolean = buckets.length == 0
def total: Int = {
require(!isEmpty, "WeightedRoutees must not be used when empty")
buckets(buckets.length - 1)
}
/**
* Pick the routee matching a value, from 1 to total.
*/
def apply(value: Int): ActorRef = {
require(1 <= value && value <= total, "value must be between [1 - %s]" format total)
refs(idx(Arrays.binarySearch(buckets, value)))
}
/**
* Converts the result of Arrays.binarySearch into an index in the buckets array;
* see the documentation of Arrays.binarySearch for what it returns.
*/
private def idx(i: Int): Int = {
if (i >= 0) i // exact match
else {
val j = math.abs(i + 1)
if (j >= buckets.length) throw new IndexOutOfBoundsException(
"Requested index [%s] is > max index [%s]".format(i, buckets.length))
else j
}
}
}
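
A small worked example of the bucket selection above, with illustrative weights:

// illustrative only: three routees with weights 1, 3 and 6
// buckets = Array(1, 4, 10), total = 10
// requested value 1         -> refs(0)
// requested values 2 to 4   -> refs(1)
// requested values 5 to 10  -> refs(2)
// so the routees are picked with probability 1/10, 3/10 and 6/10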

View file

@ -127,7 +127,7 @@ case class ClusterRouterSettings private[akka] (
if (isRouteesPathDefined && maxInstancesPerNode != 1)
throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be 1 when routeesPath is defined")
val routeesPathElements: Iterable[String] = routeesPath match {
val routeesPathElements: immutable.Iterable[String] = routeesPath match {
case RelativeActorPath(elements) elements
case _
throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath)

View file

@ -1,31 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.language.postfixOps
import scala.concurrent.duration._
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import com.typesafe.config.ConfigFactory
import akka.testkit.LongRunningTest
object ClusterMetricsDataStreamingOffMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("akka.cluster.metrics.rate-of-decay = 0")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class ClusterMetricsDataStreamingOffMultiJvmNode1 extends ClusterMetricsDataStreamingOffSpec
class ClusterMetricsDataStreamingOffMultiJvmNode2 extends ClusterMetricsDataStreamingOffSpec
abstract class ClusterMetricsDataStreamingOffSpec extends MultiNodeSpec(ClusterMetricsDataStreamingOffMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
"Cluster metrics" must {
"not collect stream metric data" taggedAs LongRunningTest in within(30 seconds) {
awaitClusterUp(roles: _*)
awaitCond(clusterView.clusterMetrics.size == roles.size)
awaitCond(clusterView.clusterMetrics.flatMap(_.metrics).filter(_.trendable).forall(_.average.isEmpty))
enterBarrier("after")
}
}
}

View file

@ -28,9 +28,11 @@ class ClusterMetricsMultiJvmNode3 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode4 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec {
import ClusterMetricsMultiJvmSpec._
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
"Cluster metrics" must {
"periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " +
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
@ -38,9 +40,8 @@ abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSp
enterBarrier("cluster-started")
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
awaitCond(clusterView.clusterMetrics.size == roles.size)
assertInitialized(cluster.settings.MetricsRateOfDecay, collectNodeMetrics(clusterView.clusterMetrics).toSet)
val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
clusterView.clusterMetrics.foreach(n assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n))
val collector = MetricsCollector(cluster.system, cluster.settings)
collector.sample.metrics.size must be > (3)
enterBarrier("after")
}
"reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) {

View file

@ -3,6 +3,7 @@
*/
package akka.cluster
import scala.collection.immutable
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
@ -35,7 +36,7 @@ abstract class JoinSeedNodeSpec
import JoinSeedNodeMultiJvmSpec._
def seedNodes: IndexedSeq[Address] = IndexedSeq(seed1, seed2, seed3)
def seedNodes: immutable.IndexedSeq[Address] = Vector(seed1, seed2, seed3)
"A cluster with seed nodes" must {
"be able to start the seed nodes concurrently" taggedAs LongRunningTest in {

View file

@ -64,7 +64,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
def muteLog(sys: ActorSystem = system): Unit = {
if (!sys.log.isDebugEnabled) {
Seq(".*Metrics collection has started successfully.*",
".*Hyperic SIGAR was not found on the classpath.*",
".*Metrics will be retreived from MBeans.*",
".*Cluster Node.* - registered cluster JMX MBean.*",
".*Cluster Node.* - is starting up.*",
".*Shutting down cluster Node.*",

View file

@ -0,0 +1,218 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import language.postfixOps
import java.lang.management.ManagementFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.NodeMetrics
import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import akka.routing.CurrentRoutees
import akka.routing.FromConfig
import akka.routing.RouterRoutees
import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
class Routee extends Actor {
def receive = {
case _ sender ! Reply(Cluster(context.system).selfAddress)
}
}
class Memory extends Actor with ActorLogging {
var usedMemory: Array[Array[Int]] = _
def receive = {
case AllocateMemory
val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
// getMax can be undefined (-1)
val max = math.max(heap.getMax, heap.getCommitted)
val used = heap.getUsed
log.debug("used heap before: [{}] bytes, of max [{}]", used, heap.getMax)
// allocate 70% of free space
val allocateBytes = (0.7 * (max - used)).toInt
val numberOfArrays = allocateBytes / 1024
usedMemory = Array.ofDim(numberOfArrays, 248) // each 248 element Int array will use ~ 1 kB
log.debug("used heap after: [{}] bytes", ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed)
sender ! "done"
}
}
case object AllocateMemory
case class Reply(address: Address)
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster.metrics.collect-interval = 1s
akka.cluster.metrics.gossip-interval = 1s
akka.cluster.metrics.moving-average-half-life = 2s
akka.actor.deployment {
/router3 = {
router = adaptive
metrics-selector = cpu
nr-of-instances = 9
}
/router4 = {
router = adaptive
metrics-selector = "akka.cluster.routing.TestCustomMetricsSelector"
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 2
}
}
}
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class TestCustomMetricsSelector(config: Config) extends MetricsSelector {
override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] = Map.empty
}
class AdaptiveLoadBalancingRouterMultiJvmNode1 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec
abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoadBalancingRouterMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import AdaptiveLoadBalancingRouterMultiJvmSpec._
def currentRoutees(router: ActorRef) =
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees
def receiveReplies(expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
(receiveWhile(5 seconds, messages = expectedReplies) {
case Reply(address) address
}).foldLeft(zero) {
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
}
}
/**
* Fills in self address for local ActorRef
*/
def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) cluster.selfAddress
case a a
}
def startRouter(name: String): ActorRef = {
val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
local = AdaptiveLoadBalancingRouter(HeapMetricsSelector),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), name)
awaitCond {
// it may take some time until the router receives cluster member events
currentRoutees(router).size == roles.size
}
currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet)
router
}
"A cluster with a AdaptiveLoadBalancingRouter" must {
"start cluster nodes" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
enterBarrier("after-1")
}
"use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in {
runOn(first) {
val router1 = startRouter("router1")
// collect some metrics before we start
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 100
1 to iterationCount foreach { _
router1 ! "hit"
// wait a while between each message, since metrics are collected periodically
Thread.sleep(10)
}
val replies = receiveReplies(iterationCount)
replies(first) must be > (0)
replies(second) must be > (0)
replies(third) must be > (0)
replies.values.sum must be(iterationCount)
}
enterBarrier("after-2")
}
"prefer node with more free heap capacity" taggedAs LongRunningTest in {
System.gc()
enterBarrier("gc")
runOn(second) {
within(20.seconds) {
system.actorOf(Props[Memory], "memory") ! AllocateMemory
expectMsg("done")
}
}
enterBarrier("heap-allocated")
runOn(first) {
val router2 = startRouter("router2")
// collect some metrics before we start
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 3000
1 to iterationCount foreach { _
router2 ! "hit"
}
val replies = receiveReplies(iterationCount)
replies(third) must be > (replies(second))
replies.values.sum must be(iterationCount)
}
enterBarrier("after-3")
}
"create routees from configuration" taggedAs LongRunningTest in {
runOn(first) {
val router3 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router3")
awaitCond {
// it may take some time until the router receives cluster member events
currentRoutees(router3).size == 9
}
currentRoutees(router3).map(fullAddress).toSet must be(Set(address(first)))
}
enterBarrier("after-4")
}
"create routees from cluster.enabled configuration" taggedAs LongRunningTest in {
runOn(first) {
val router4 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router4")
awaitCond {
// it may take some time until the router receives cluster member events
currentRoutees(router4).size == 6
}
currentRoutees(router4).map(fullAddress).toSet must be(Set(
address(first), address(second), address(third)))
}
enterBarrier("after-5")
}
}
}

View file

@ -47,9 +47,10 @@ class ClusterConfigSpec extends AkkaSpec {
callTimeout = 2 seconds,
resetTimeout = 30 seconds))
MetricsEnabled must be(true)
MetricsCollectorClass must be(classOf[SigarMetricsCollector].getName)
MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds)
MetricsRateOfDecay must be(10)
MetricsMovingAverageHalfLife must be(12 seconds)
}
}
}

View file

@ -1,62 +0,0 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.concurrent.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec with MetricNumericConverter {
import system.dispatcher
val collector = createMetricsCollector
val DefaultRateOfDecay = 10
"DataStream" must {
"calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
val firstDataSet = collector.sample.metrics.collect { case m if m.trendable && m.isDefined m.initialize(DefaultRateOfDecay) }
var streamingDataSet = firstDataSet
val cancellable = system.scheduler.schedule(0 seconds, 100 millis) {
streamingDataSet = collector.sample.metrics.flatMap(latest streamingDataSet.collect {
case streaming if (latest.trendable && latest.isDefined) && (latest same streaming)
&& (latest.value.get != streaming.value.get) {
val updatedDataStream = streaming.average.get :+ latest.value.get
updatedDataStream.timestamp must be > (streaming.average.get.timestamp)
updatedDataStream.duration.length must be > (streaming.average.get.duration.length)
updatedDataStream.ewma must not be (streaming.average.get.ewma)
updatedDataStream.ewma must not be (latest.value.get)
streaming.copy(value = latest.value, average = Some(updatedDataStream))
}
})
}
awaitCond(firstDataSet.size == streamingDataSet.size, longDuration)
cancellable.cancel()
val finalDataSet = streamingDataSet.map(m m.name -> m).toMap
firstDataSet map {
first
val newMetric = finalDataSet(first.name)
val e1 = first.average.get
val e2 = newMetric.average.get
if (first.value.get != newMetric.value.get) {
e2.ewma must not be (first.value.get)
e2.ewma must not be (newMetric.value.get)
}
if (first.value.get.longValue > newMetric.value.get.longValue) e1.ewma.longValue must be > e2.ewma.longValue
else if (first.value.get.longValue < newMetric.value.get.longValue) e1.ewma.longValue must be < e2.ewma.longValue
}
}
"data streaming is disabled if the decay is set to 0" in {
val data = collector.sample.metrics map (_.initialize(0))
data foreach (_.average.isEmpty must be(true))
}
}
}

View file

@ -0,0 +1,101 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.concurrent.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec }
import scala.concurrent.forkjoin.ThreadLocalRandom
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class EWMASpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
import system.dispatcher
val collector = createMetricsCollector
"DataStream" must {
"calcualate same ewma for constant values" in {
val ds = EWMA(value = 100.0, alpha = 0.18) :+
100.0 :+ 100.0 :+ 100.0
ds.value must be(100.0 plusOrMinus 0.001)
}
"calcualate correct ewma for normal decay" in {
val d0 = EWMA(value = 1000.0, alpha = 2.0 / (1 + 10))
d0.value must be(1000.0 plusOrMinus 0.01)
val d1 = d0 :+ 10.0
d1.value must be(820.0 plusOrMinus 0.01)
val d2 = d1 :+ 10.0
d2.value must be(672.73 plusOrMinus 0.01)
val d3 = d2 :+ 10.0
d3.value must be(552.23 plusOrMinus 0.01)
val d4 = d3 :+ 10.0
d4.value must be(453.64 plusOrMinus 0.01)
val dn = (1 to 100).foldLeft(d0)((d, _) d :+ 10.0)
dn.value must be(10.0 plusOrMinus 0.1)
}
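// worked check of the expectations above, using the standard EWMA update
// x' = alpha * xn + (1 - alpha) * x with alpha = 2.0 / (1 + 10) = 0.1818...:
// d1 = 0.1818 * 10.0 + 0.8182 * 1000.0 = 820.0
// d2 = 0.1818 * 10.0 + 0.8182 * 820.0  = 672.73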
"calculate ewma for alpha 1.0, max bias towards latest value" in {
val d0 = EWMA(value = 100.0, alpha = 1.0)
d0.value must be(100.0 plusOrMinus 0.01)
val d1 = d0 :+ 1.0
d1.value must be(1.0 plusOrMinus 0.01)
val d2 = d1 :+ 57.0
d2.value must be(57.0 plusOrMinus 0.01)
val d3 = d2 :+ 10.0
d3.value must be(10.0 plusOrMinus 0.01)
}
"calculate alpha from half-life and collect interval" in {
// according to http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
val expectedAlpha = 0.1
// alpha = 2.0 / (1 + N)
val n = 19
val halfLife = n.toDouble / 2.8854
val collectInterval = 1.second
val halfLifeDuration = (halfLife * 1000).millis
EWMA.alpha(halfLifeDuration, collectInterval) must be(expectedAlpha plusOrMinus 0.001)
}
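// consistency check: the half-life definition requires
// (1 - alpha) ^ (halfLife / collectInterval) = 0.5, and indeed
// 0.9 ^ (19 / 2.8854) = 0.9 ^ 6.585 is approximately 0.5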
"calculate sane alpha from short half-life" in {
val alpha = EWMA.alpha(1.millis, 3.seconds)
alpha must be <= (1.0)
alpha must be >= (0.0)
alpha must be(1.0 plusOrMinus 0.001)
}
"calculate sane alpha from long half-life" in {
val alpha = EWMA.alpha(1.day, 3.seconds)
alpha must be <= (1.0)
alpha must be >= (0.0)
alpha must be(0.0 plusOrMinus 0.001)
}
"calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
var streamingDataSet = Map.empty[String, Metric]
var usedMemory = Array.empty[Byte]
(1 to 50) foreach { _
// wait a while between each message to give the metrics a chance to change
Thread.sleep(100)
usedMemory = usedMemory ++ Array.fill(1024)(ThreadLocalRandom.current.nextInt(127).toByte)
val changes = collector.sample.metrics.flatMap { latest
streamingDataSet.get(latest.name) match {
case None Some(latest)
case Some(previous)
if (latest.isSmooth && latest.value != previous.value) {
val updated = previous :+ latest
updated.isSmooth must be(true)
updated.smoothValue must not be (previous.smoothValue)
Some(updated)
} else None
}
}
streamingDataSet ++= changes.map(m m.name -> m)
}
}
}
}

View file

@ -4,40 +4,35 @@
package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec }
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.cluster.StandardMetrics._
import scala.util.Failure
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with AbstractClusterMetricsSpec {
class MetricNumericConverterSpec extends WordSpec with MustMatchers with MetricNumericConverter {
"MetricNumericConverter" must {
val collector = createMetricsCollector
"convert " in {
convert(0).isLeft must be(true)
convert(1).left.get must be(1)
convert(1L).isLeft must be(true)
convert(0.0).isRight must be(true)
"convert" in {
convertNumber(0).isLeft must be(true)
convertNumber(1).left.get must be(1)
convertNumber(1L).isLeft must be(true)
convertNumber(0.0).isRight must be(true)
}
"define a new metric" in {
val metric = Metric("heap-memory-used", Some(0L))
metric.initializable must be(true)
metric.name must not be (null)
metric.average.isEmpty must be(true)
metric.trendable must be(true)
if (collector.isSigar) {
val cores = collector.totalCores
cores.isDefined must be(true)
cores.value.get.intValue must be > (0)
cores.initializable must be(false)
}
val Some(metric) = Metric.create(HeapMemoryUsed, 256L, decayFactor = Some(0.18))
metric.name must be(HeapMemoryUsed)
metric.value must be(256L)
metric.isSmooth must be(true)
metric.smoothValue must be(256.0 plusOrMinus 0.0001)
}
"define an undefined value with a None " in {
Metric("x", Some(-1)).value.isDefined must be(false)
Metric("x", Some(java.lang.Double.NaN)).value.isDefined must be(false)
Metric("x", None).isDefined must be(false)
Metric.create("x", -1, None).isDefined must be(false)
Metric.create("x", java.lang.Double.NaN, None).isDefined must be(false)
Metric.create("x", Failure(new RuntimeException), None).isDefined must be(false)
}
"recognize whether a metric value is defined" in {
@ -47,6 +42,7 @@ class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) wit
"recognize whether a metric value is not defined" in {
defined(-1) must be(false)
defined(-1.0) must be(false)
defined(Double.NaN) must be(false)
}
}

View file

@ -0,0 +1,69 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.util.Try
import akka.actor.Address
import akka.testkit.AkkaSpec
import akka.cluster.StandardMetrics._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
val collector = createMetricsCollector
val node1 = NodeMetrics(Address("akka", "sys", "a", 2554), 1, collector.sample.metrics)
val node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)
val nodes: Seq[NodeMetrics] = {
(1 to 100).foldLeft(List(node1, node2)) { (nodes, _)
nodes map { n
n.copy(metrics = collector.sample.metrics.flatMap(latest n.metrics.collect {
case streaming if latest sameAs streaming streaming :+ latest
}))
}
}
}
"NodeMetrics.MetricValues" must {
"extract expected metrics for load balancing" in {
val stream1 = node2.metric(HeapMemoryCommitted).get.value.longValue
val stream2 = node1.metric(HeapMemoryUsed).get.value.longValue
stream1 must be >= (stream2)
}
"extract expected MetricValue types for load balancing" in {
nodes foreach { node
node match {
case HeapMemory(address, _, used, committed, Some(max))
committed must be >= (used)
used must be <= (max)
committed must be <= (max)
// extract is the java api
StandardMetrics.extractHeapMemory(node) must not be (null)
case HeapMemory(address, _, used, committed, None)
used must be > (0L)
committed must be > (0L)
// extract is the java api
StandardMetrics.extractHeapMemory(node) must not be (null)
}
node match {
case Cpu(address, _, systemLoadAverageOption, cpuCombinedOption, processors)
processors must be > (0)
if (systemLoadAverageOption.isDefined)
systemLoadAverageOption.get must be >= (0.0)
if (cpuCombinedOption.isDefined) {
cpuCombinedOption.get must be <= (1.0)
cpuCombinedOption.get must be >= (0.0)
}
// extract is the java api
StandardMetrics.extractCpu(node) must not be (null)
}
}
}
}
}

View file

@ -1,4 +1,5 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
@ -13,57 +14,48 @@ import scala.util.{ Success, Try, Failure }
import akka.actor._
import akka.testkit._
import akka.cluster.StandardMetrics._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
object MetricsEnabledSpec {
val config = """
akka.cluster.metrics.enabled = on
akka.cluster.metrics.metrics-interval = 1 s
akka.cluster.metrics.collect-interval = 1 s
akka.cluster.metrics.gossip-interval = 1 s
akka.cluster.metrics.rate-of-decay = 10
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec {
class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory {
import system.dispatcher
val collector = createMetricsCollector
"Metric must" must {
"create and initialize a new metric or merge an existing one" in {
for (i 0 to samples) {
val metrics = collector.sample.metrics
assertCreatedUninitialized(metrics)
assertInitialized(window, metrics map (_.initialize(window)))
}
}
"merge 2 metrics that are tracking the same metric" in {
for (i 0 to samples) {
for (i 1 to 20) {
val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics
var merged = sample2 flatMap (latest sample1 collect {
case peer if latest same peer {
val merged12 = sample2 flatMap (latest sample1 collect {
case peer if latest sameAs peer
val m = peer :+ latest
assertMerged(latest, peer, m)
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
}
})
val sample3 = collector.sample.metrics map (_.initialize(window))
val sample4 = collector.sample.metrics map (_.initialize(window))
merged = sample4 flatMap (latest sample3 collect {
case peer if latest same peer {
val sample3 = collector.sample.metrics
val sample4 = collector.sample.metrics
val merged34 = sample4 flatMap (latest sample3 collect {
case peer if latest sameAs peer
val m = peer :+ latest
assertMerged(latest, peer, m)
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
}
})
merged.size must be(sample3.size)
merged.size must be(sample4.size)
}
}
}
@ -76,158 +68,65 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
"collect accurate metrics for a node" in {
val sample = collector.sample
assertExpectedSampleSize(collector.isSigar, window, sample)
val metrics = sample.metrics.collect { case m if m.isDefined (m.name, m.value.get) }
val used = metrics collectFirst { case ("heap-memory-used", b) b }
val committed = metrics collectFirst { case ("heap-memory-committed", b) b }
val metrics = sample.metrics.collect { case m (m.name, m.value) }
val used = metrics collectFirst { case (HeapMemoryUsed, b) b }
val committed = metrics collectFirst { case (HeapMemoryCommitted, b) b }
metrics foreach {
case ("total-cores", b) b.intValue must be > (0)
case ("network-max-rx", b) b.longValue must be > (0L)
case ("network-max-tx", b) b.longValue must be > (0L)
case ("system-load-average", b) b.doubleValue must be >= (0.0)
case ("processors", b) b.intValue must be >= (0)
case ("heap-memory-used", b) b.longValue must be >= (0L)
case ("heap-memory-committed", b) b.longValue must be > (0L)
case ("cpu-combined", b)
b.doubleValue must be <= (1.0)
b.doubleValue must be >= (0.0)
case ("heap-memory-max", b)
case (SystemLoadAverage, b) b.doubleValue must be >= (0.0)
case (Processors, b) b.intValue must be >= (0)
case (HeapMemoryUsed, b) b.longValue must be >= (0L)
case (HeapMemoryCommitted, b) b.longValue must be > (0L)
case (HeapMemoryMax, b)
b.longValue must be > (0L)
used.get.longValue must be <= (b.longValue)
committed.get.longValue must be <= (b.longValue)
}
}
case (CpuCombined, b)
b.doubleValue must be <= (1.0)
b.doubleValue must be >= (0.0)
"collect SIGAR metrics if it is on the classpath" in {
if (collector.isSigar) {
// combined cpu may or may not be defined on a given sampling
// systemLoadAverage is available when SIGAR is present
collector.systemLoadAverage.isDefined must be(true)
collector.networkStats.nonEmpty must be(true)
collector.networkMaxRx.isDefined must be(true)
collector.networkMaxTx.isDefined must be(true)
collector.totalCores.isDefined must be(true)
}
}
"collect JMX metrics" in {
// heap max may be undefined depending on the OS
// systemLoadAverage is JMX if SIGAR not present, but not available on all OS
collector.used.isDefined must be(true)
collector.committed.isDefined must be(true)
collector.processors.isDefined must be(true)
// systemLoadAverage comes from JMX when SIGAR is not present, but
// it is not available on all platforms
val c = collector.asInstanceOf[JmxMetricsCollector]
val heap = c.heapMemoryUsage
c.heapUsed(heap).isDefined must be(true)
c.heapCommitted(heap).isDefined must be(true)
c.processors.isDefined must be(true)
}
"collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in {
val latch = TestLatch(samples)
val task = system.scheduler.schedule(0 seconds, interval) {
"collect 50 node metrics samples in an acceptable duration" taggedAs LongRunningTest in within(7 seconds) {
(1 to 50) foreach { _
val sample = collector.sample
assertCreatedUninitialized(sample.metrics)
assertExpectedSampleSize(collector.isSigar, window, sample)
latch.countDown()
sample.metrics.size must be >= (3)
Thread.sleep(100)
}
Await.ready(latch, longDuration)
task.cancel()
}
}
}
trait MetricSpec extends WordSpec with MustMatchers {
/**
* Used when testing metrics without a full cluster
*/
trait MetricsCollectorFactory { this: AkkaSpec
def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: MetricsGossip): Unit = {
val masterMetrics = collectNodeMetrics(master)
val gossipMetrics = collectNodeMetrics(gossip.nodes)
gossipMetrics.size must be(masterMetrics.size plusOrMinus 1) // combined cpu
}
private def extendedActorSystem = system.asInstanceOf[ExtendedActorSystem]
def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit =
gossip.nodes.map(_.address) must be(nodes.map(_.address))
def selfAddress = extendedActorSystem.provider.rootPath.address
def assertExpectedSampleSize(isSigar: Boolean, gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n assertExpectedSampleSize(isSigar, gossip.rateOfDecay, n))
val defaultDecayFactor = 2.0 / (1 + 10)
def assertCreatedUninitialized(gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n assertCreatedUninitialized(n.metrics.filterNot(_.trendable)))
def createMetricsCollector: MetricsCollector =
Try(new SigarMetricsCollector(selfAddress, defaultDecayFactor,
extendedActorSystem.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil))).
recover {
case e
log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " + e)
new JmxMetricsCollector(selfAddress, defaultDecayFactor)
}.get
def assertInitialized(gossip: MetricsGossip): Unit =
gossip.nodes.foreach(n assertInitialized(gossip.rateOfDecay, n.metrics))
def assertCreatedUninitialized(metrics: Set[Metric]): Unit = {
metrics.size must be > (0)
metrics foreach { m
m.average.isEmpty must be(true)
if (m.value.isDefined) m.isDefined must be(true)
if (m.initializable) (m.trendable && m.isDefined && m.average.isEmpty) must be(true)
}
}
def assertInitialized(decay: Int, metrics: Set[Metric]): Unit = if (decay > 0) metrics.filter(_.trendable) foreach { m
m.initializable must be(false)
if (m.isDefined) m.average.isDefined must be(true)
}
def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) {
if (latest.isDefined) {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
if (latest.trendable) {
if (latest.initializable) merged.average.isEmpty must be(true)
else merged.average.isDefined must be(true)
}
} else {
merged.isDefined must be(true)
merged.value.get must be(latest.value.get)
if (latest.average.isDefined) merged.average.get must be(latest.average.get)
else merged.average.isEmpty must be(true)
}
} else {
if (peer.isDefined) {
merged.isDefined must be(true)
merged.value.get must be(peer.value.get)
if (peer.trendable) {
if (peer.initializable) merged.average.isEmpty must be(true)
else merged.average.isDefined must be(true)
}
} else {
merged.isDefined must be(false)
merged.average.isEmpty must be(true)
}
}
}
def assertExpectedSampleSize(isSigar: Boolean, decay: Int, node: NodeMetrics): Unit = {
node.metrics.size must be(9)
val metrics = node.metrics.filter(_.isDefined)
if (isSigar) { // combined cpu + jmx max heap
metrics.size must be >= (7)
metrics.size must be <= (9)
} else { // jmx max heap
metrics.size must be >= (4)
metrics.size must be <= (5)
}
if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) m }.foreach(_.average.isDefined must be(true))
}
def collectNodeMetrics(nodes: Set[NodeMetrics]): immutable.Seq[Metric] =
nodes.foldLeft(Vector[Metric]()) {
case (r, n) r ++ n.metrics.filter(_.isDefined)
}
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
}
trait AbstractClusterMetricsSpec extends DefaultTimeout {
this: AkkaSpec
val selfAddress = new Address("akka", "localhost")
val window = 49
val interval: FiniteDuration = 100 millis
val longDuration = 120 seconds // for long running tests
val samples = 100
def createMetricsCollector: MetricsCollector = MetricsCollector(selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
}

View file

@ -12,95 +12,95 @@ import akka.actor.Address
import java.lang.System.{ currentTimeMillis newTimestamp }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec {
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory {
val collector = createMetricsCollector
"A MetricsGossip" must {
"add and initialize new NodeMetrics" in {
"add new NodeMetrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip.nodes.size must be(1)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip)
assertExpectedSampleSize(collector.isSigar, localGossip)
assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
m1.metrics.size must be > (3)
m2.metrics.size must be > (3)
localGossip :+= m2
localGossip.nodes.size must be(2)
localGossip.nodeKeys.size must be(localGossip.nodes.size)
assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip)
assertExpectedSampleSize(collector.isSigar, localGossip)
assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet)
val g1 = MetricsGossip.empty :+ m1
g1.nodes.size must be(1)
g1.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
val g2 = g1 :+ m2
g2.nodes.size must be(2)
g2.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
"merge peer metrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var remoteGossip = MetricsGossip(window)
remoteGossip :+= m1
remoteGossip :+= m2
remoteGossip.nodes.size must be(2)
val beforeMergeNodes = remoteGossip.nodes
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size must be(2)
val beforeMergeNodes = g1.nodes
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
remoteGossip :+= m2Updated // merge peers
remoteGossip.nodes.size must be(2)
assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip)
assertExpectedSampleSize(collector.isSigar, remoteGossip)
remoteGossip.nodes collect { case peer if peer.address == m2.address peer.timestamp must be(m2Updated.timestamp) }
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
val g2 = g1 :+ m2Updated // merge peers
g2.nodes.size must be(2)
g2.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2Updated.metrics))
g2.nodes collect { case peer if peer.address == m2.address peer.timestamp must be(m2Updated.timestamp) }
}
"merge an existing metric set for a node and update node ring" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp)
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip :+= m2
val g1 = MetricsGossip.empty :+ m1 :+ m2
val g2 = MetricsGossip.empty :+ m3 :+ m2Updated
var remoteGossip = MetricsGossip(window)
remoteGossip :+= m3
remoteGossip :+= m2Updated
localGossip.nodeKeys.contains(m1.address) must be(true)
remoteGossip.nodeKeys.contains(m3.address) must be(true)
g1.nodes.map(_.address) must be(Set(m1.address, m2.address))
// must contain nodes 1,3, and the most recent version of 2
val mergedGossip = localGossip merge remoteGossip
mergedGossip.nodes.size must be(3)
assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3))
assertExpectedSampleSize(collector.isSigar, mergedGossip)
assertCreatedUninitialized(mergedGossip)
assertInitialized(mergedGossip)
mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp)
val mergedGossip = g1 merge g2
mergedGossip.nodes.map(_.address) must be(Set(m1.address, m2.address, m3.address))
mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2Updated.metrics))
mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) must be(Some(m3.metrics))
mergedGossip.nodes.foreach(_.metrics.size must be > (3))
mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) must be(Some(m2Updated.timestamp))
}
"get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip.metricsFor(m1).nonEmpty must be(true)
val g1 = MetricsGossip.empty :+ m1
g1.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
}
"remove a node if it is no longer Up" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
var localGossip = MetricsGossip(window)
localGossip :+= m1
localGossip :+= m2
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size must be(2)
val g2 = g1 remove m1.address
g2.nodes.size must be(1)
g2.nodes.exists(_.address == m1.address) must be(false)
g2.nodeMetricsFor(m1.address) must be(None)
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
localGossip.nodes.size must be(2)
localGossip = localGossip remove m1.address
localGossip.nodes.size must be(1)
localGossip.nodes.exists(_.address == m1.address) must be(false)
"filter nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size must be(2)
val g2 = g1 filter Set(m2.address)
g2.nodes.size must be(1)
g2.nodes.exists(_.address == m1.address) must be(false)
g2.nodeMetricsFor(m1.address) must be(None)
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
}
}

View file

@ -4,51 +4,44 @@
package akka.cluster
import akka.testkit.AkkaSpec
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class NodeMetricsSpec extends AkkaSpec with AbstractClusterMetricsSpec with MetricSpec {
val collector = createMetricsCollector
class NodeMetricsSpec extends WordSpec with MustMatchers {
val node1 = Address("akka", "sys", "a", 2554)
val node2 = Address("akka", "sys", "a", 2555)
"NodeMetrics must" must {
"recognize updatable nodes" in {
(NodeMetrics(node1, 0) updatable NodeMetrics(node1, 1)) must be(true)
}
"recognize non-updatable nodes" in {
(NodeMetrics(node1, 1) updatable NodeMetrics(node2, 0)) must be(false)
}
"return correct result for 2 'same' nodes" in {
(NodeMetrics(node1, 0) same NodeMetrics(node1, 0)) must be(true)
(NodeMetrics(node1, 0) sameAs NodeMetrics(node1, 0)) must be(true)
}
"return correct result for 2 not 'same' nodes" in {
(NodeMetrics(node1, 0) same NodeMetrics(node2, 0)) must be(false)
(NodeMetrics(node1, 0) sameAs NodeMetrics(node2, 0)) must be(false)
}
"merge 2 NodeMetrics by most recent" in {
val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
val sample2 = NodeMetrics(node1, 2, collector.sample.metrics)
val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
val sample2 = NodeMetrics(node1, 2, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
val merged = sample1 merge sample2
merged.timestamp must be(sample2.timestamp)
merged.metrics must be(sample2.metrics)
merged.metric("a").map(_.value) must be(Some(11))
merged.metric("b").map(_.value) must be(Some(20))
merged.metric("c").map(_.value) must be(Some(30))
}
"not merge 2 NodeMetrics if master is more recent" in {
val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
val sample2 = NodeMetrics(node2, 0, sample1.metrics)
val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
val sample2 = NodeMetrics(node1, 0, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
val merged = sample2 merge sample2 // older and not same
merged.timestamp must be(sample2.timestamp)
merged.metrics must be(sample2.metrics)
val merged = sample1 merge sample2 // older and not same
merged.timestamp must be(sample1.timestamp)
merged.metrics must be(sample1.metrics)
}
}
}

View file

@ -0,0 +1,118 @@
/*
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.cluster.Metric
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsSelectorSpec extends WordSpec with MustMatchers {
val abstractSelector = new CapacityMetricsSelector {
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = Map.empty
}
val a1 = Address("akka", "sys", "a1", 2551)
val b1 = Address("akka", "sys", "b1", 2551)
val c1 = Address("akka", "sys", "c1", 2551)
val d1 = Address("akka", "sys", "d1", 2551)
val decayFactor = Some(0.18)
val nodeMetricsA = NodeMetrics(a1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 128, decayFactor),
Metric.create(HeapMemoryCommitted, 256, decayFactor),
Metric.create(HeapMemoryMax, 512, None),
Metric.create(CpuCombined, 0.1, decayFactor),
Metric.create(SystemLoadAverage, 0.5, None),
Metric.create(Processors, 8, None)).flatten)
val nodeMetricsB = NodeMetrics(b1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 256, decayFactor),
Metric.create(HeapMemoryCommitted, 512, decayFactor),
Metric.create(HeapMemoryMax, 1024, None),
Metric.create(CpuCombined, 0.5, decayFactor),
Metric.create(SystemLoadAverage, 1.0, None),
Metric.create(Processors, 16, None)).flatten)
val nodeMetricsC = NodeMetrics(c1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 1024, decayFactor),
Metric.create(HeapMemoryCommitted, 1024, decayFactor),
Metric.create(HeapMemoryMax, 1024, None),
Metric.create(CpuCombined, 1.0, decayFactor),
Metric.create(SystemLoadAverage, 16.0, None),
Metric.create(Processors, 16, None)).flatten)
val nodeMetricsD = NodeMetrics(d1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 511, decayFactor),
Metric.create(HeapMemoryCommitted, 512, decayFactor),
Metric.create(HeapMemoryMax, 512, None),
Metric.create(Processors, 2, decayFactor)).flatten)
val nodeMetrics = Set(nodeMetricsA, nodeMetricsB, nodeMetricsC, nodeMetricsD)
"CapacityMetricsSelector" must {
"calculate weights from capacity" in {
val capacity = Map(a1 -> 0.6, b1 -> 0.3, c1 -> 0.1)
val weights = abstractSelector.weights(capacity)
weights must be(Map(c1 -> 1, b1 -> 3, a1 -> 6))
}
"handle low and zero capacity" in {
val capacity = Map(a1 -> 0.0, b1 -> 1.0, c1 -> 0.005, d1 -> 0.004)
val weights = abstractSelector.weights(capacity)
weights must be(Map(a1 -> 0, b1 -> 100, c1 -> 1, d1 -> 0))
}
}
"HeapMetricsSelector" must {
"calculate capacity of heap metrics" in {
val capacity = HeapMetricsSelector.capacity(nodeMetrics)
capacity(a1) must be(0.75 plusOrMinus 0.0001)
capacity(b1) must be(0.75 plusOrMinus 0.0001)
capacity(c1) must be(0.0 plusOrMinus 0.0001)
capacity(d1) must be(0.001953125 plusOrMinus 0.0001)
}
}
"CpuMetricsSelector" must {
"calculate capacity of cpuCombined metrics" in {
val capacity = CpuMetricsSelector.capacity(nodeMetrics)
capacity(a1) must be(0.9 plusOrMinus 0.0001)
capacity(b1) must be(0.5 plusOrMinus 0.0001)
capacity(c1) must be(0.0 plusOrMinus 0.0001)
capacity.contains(d1) must be(false)
}
}
"SystemLoadAverageMetricsSelector" must {
"calculate capacity of systemLoadAverage metrics" in {
val capacity = SystemLoadAverageMetricsSelector.capacity(nodeMetrics)
capacity(a1) must be(0.9375 plusOrMinus 0.0001)
capacity(b1) must be(0.9375 plusOrMinus 0.0001)
capacity(c1) must be(0.0 plusOrMinus 0.0001)
capacity.contains(d1) must be(false)
}
}
"MixMetricsSelector" must {
"aggregate capacity of all metrics" in {
val capacity = MixMetricsSelector.capacity(nodeMetrics)
capacity(a1) must be((0.75 + 0.9 + 0.9375) / 3 plusOrMinus 0.0001)
capacity(b1) must be((0.75 + 0.5 + 0.9375) / 3 plusOrMinus 0.0001)
capacity(c1) must be((0.0 + 0.0 + 0.0) / 3 plusOrMinus 0.0001)
capacity(d1) must be((0.001953125) / 1 plusOrMinus 0.0001)
}
}
}
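A back-of-envelope check (not part of the spec): the heap assertions above follow directly from the documented formula capacity = (max - used) / max.

// assumed formula from the docs: capacity = (max - used) / max
val heapCapacityA = (512.0 - 128.0) / 512.0 // 0.75, matches the a1 assertion
val heapCapacityD = (512.0 - 511.0) / 512.0 // 0.001953125, matches the d1 assertion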

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.actor.RootActorPath
import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.port = 0
""")) {
val a1 = Address("akka", "sys", "a1", 2551)
val b1 = Address("akka", "sys", "b1", 2551)
val c1 = Address("akka", "sys", "c1", 2551)
val d1 = Address("akka", "sys", "d1", 2551)
val refA = system.actorFor(RootActorPath(a1) / "user" / "a")
val refB = system.actorFor(RootActorPath(b1) / "user" / "b")
val refC = system.actorFor(RootActorPath(c1) / "user" / "c")
"WeightedRoutees" must {
"allocate weighted refs" in {
val weights = Map(a1 -> 1, b1 -> 3, c1 -> 10)
val refs = Vector(refA, refB, refC)
val weighted = new WeightedRoutees(refs, a1, weights)
weighted(1) must be(refA)
2 to 4 foreach { weighted(_) must be(refB) }
5 to 14 foreach { weighted(_) must be(refC) }
weighted.total must be(14)
}
"check boundaries" in {
val empty = new WeightedRoutees(Vector(), a1, Map.empty)
empty.isEmpty must be(true)
intercept[IllegalArgumentException] {
empty.total
}
val weighted = new WeightedRoutees(Vector(refA, refB, refC), a1, Map.empty)
weighted.total must be(3)
intercept[IllegalArgumentException] {
weighted(0)
}
intercept[IllegalArgumentException] {
weighted(4)
}
}
"allocate refs for undefined weight" in {
val weights = Map(a1 -> 1, b1 -> 7)
val refs = Vector(refA, refB, refC)
val weighted = new WeightedRoutees(refs, a1, weights)
weighted(1) must be(refA)
2 to 8 foreach { weighted(_) must be(refB) }
// undefined, uses the mean of the weights, i.e. 4
9 to 12 foreach { weighted(_) must be(refC) }
weighted.total must be(12)
}
"allocate weighted local refs" in {
val weights = Map(a1 -> 2, b1 -> 1, c1 -> 10)
val refs = Vector(testActor, refB, refC)
val weighted = new WeightedRoutees(refs, a1, weights)
1 to 2 foreach { weighted(_) must be(testActor) }
3 to weighted.total foreach { weighted(_) must not be (testActor) }
}
"not allocate ref with weight zero" in {
val weights = Map(a1 -> 0, b1 -> 2, c1 -> 10)
val refs = Vector(refA, refB, refC)
val weighted = new WeightedRoutees(refs, a1, weights)
1 to weighted.total foreach { weighted(_) must not be (refA) }
}
}
}
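A minimal sketch of the bucket arithmetic these tests pin down (an illustration, not the Akka implementation): each routee with positive weight owns a contiguous slice of the positions 1..total, sized by its weight. The mean-of-weights fallback for undefined weights and the address resolution for local refs are omitted for brevity.

// Illustration only: generic over the routee type to stay self-contained.
class WeightedBuckets[T](routees: IndexedSeq[T], ws: IndexedSeq[Int]) {
  require(routees.size == ws.size, "one weight per routee")
  // cumulative upper bounds: weights 1, 3, 10 become buckets 1, 4, 14
  private val buckets: IndexedSeq[(T, Int)] = {
    var acc = 0
    (routees zip ws).collect { case (r, w) if w > 0 => acc += w; (r, acc) }
  }
  def total: Int = {
    require(buckets.nonEmpty, "empty weighted routees")
    buckets.last._2
  }
  def apply(pos: Int): T = {
    require(pos >= 1 && pos <= total, "position must be in [1, total]")
    buckets.find { case (_, bound) => pos <= bound }.get._1
  }
}

val w = new WeightedBuckets(Vector("a", "b", "c"), Vector(1, 3, 10))
assert(w.total == 14 && w(1) == "a" && w(4) == "b" && w(5) == "c")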

View file

@ -38,8 +38,7 @@ Try it out:
1. Add the following ``application.conf`` to your project and place it in ``src/main/resources``:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
:language: none
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#cluster
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-java`
settings, but with ``akka.cluster.ClusterActorRefProvider``.
@ -438,6 +437,107 @@ service nodes and 1 client::
.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning.
Cluster Metrics
^^^^^^^^^^^^^^^
The member nodes of the cluster collect system health metrics and publish that information to other nodes and to
registered subscribers. This information is primarily used by load-balancing routers.
Hyperic Sigar
-------------
The built-in metrics are gathered from JMX MBeans, and optionally you can use `Hyperic Sigar <http://www.hyperic.com/products/sigar>`_
for a wider and more accurate range of metrics than can be retrieved from ordinary MBeans.
Sigar uses a native OS library. To enable Sigar you need to add the directory of the native library to
``-Djava.library.path=<path_of_sigar_libs>`` and add the following dependency::
<dependency>
<groupId>org.hyperic</groupId>
<artifactId>sigar</artifactId>
<version>@sigarVersion@</version>
</dependency>
Adaptive Load Balancing
-----------------------
The ``AdaptiveLoadBalancingRouter`` performs load balancing of messages to cluster nodes based on the cluster metrics data.
It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node.
It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights (a sketch of the weight calculation follows the list):
* ``heap`` / ``HeapMetricsSelector`` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
* ``load`` / ``SystemLoadAverageMetricsSelector`` - System load average for the past 1 minute; the corresponding value can be found in ``top`` on Linux systems. The system is possibly nearing a bottleneck if the system load average approaches the number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
* ``cpu`` / ``CpuMetricsSelector`` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
* ``mix`` / ``MixMetricsSelector`` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
* Any custom implementation of ``akka.cluster.routing.MetricsSelector``
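The selectors turn remaining capacity into integer weights. A minimal sketch of one way such a mapping can work, consistent with the ``CapacityMetricsSelector`` test values in this commit (the least-capable node ends up with weight 1, and capacities below roughly 1% round down to 0); an illustration, not necessarily the exact production code::

   import akka.actor.Address

   // proportional weights: divide each capacity by the smallest one,
   // with a floor on the divisor so near-zero capacities round to 0
   def weights(capacity: Map[Address, Double]): Map[Address, Int] =
     if (capacity.isEmpty) Map.empty
     else {
       val divisor = math.max(0.01, capacity.values.min)
       capacity map { case (addr, c) => addr -> math.round(c / divisor).toInt }
     }

For example, capacities ``0.6, 0.3, 0.1`` become weights ``6, 3, 1``.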
The collected metrics values are smoothed with `exponential weighted moving average <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>`_. In the :ref:`cluster_configuration_java` you can adjust how quickly past data is decayed compared to new data.
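As a rough illustration of the smoothing, here is the EWMA update step in isolation. Note that the real implementation derives the smoothing constant from the configured decay and the sampling interval; the ``0.18`` below is only an example value, borrowed from the test fixtures in this commit::

   // alpha close to 1 biases toward the newest sample,
   // alpha close to 0 toward the accumulated history
   def ewma(oldValue: Double, newValue: Double, alpha: Double): Double =
     alpha * newValue + (1 - alpha) * oldValue

   val smoothed = ewma(oldValue = 100.0, newValue = 120.0, alpha = 0.18) // 103.6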
Let's take a look at this router in action.
In this example the following imports are used:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackend.java#imports
The backend worker that performs the factorial calculation:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackend.java#backend
The frontend that receives user jobs and delegates to the backends via the router:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#frontend
As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router
Only the router type ``adaptive`` and the ``metrics-selector`` are specific to this router; everything else works
in the same way as for other routers.
The same type of router could also have been defined in code:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#router-lookup-in-code
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java#router-deploy-in-code
This example is included in ``akka-samples/akka-sample-cluster`` and you can try it by copying the
`source <@github@/akka-samples/akka-sample-cluster>`_ to your
Maven project, defined as in :ref:`cluster_simple_example_java`.
Run it by starting nodes in different terminal windows. For example, starting 3 backend nodes and
one frontend::
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain" \
-Dexec.args="2551"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain" \
-Dexec.args="2552"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialBackendMain"
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialFrontendMain"
Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.
Subscribe to Metrics Events
---------------------------
It's possible to subscribe to the metrics events directly to implement other functionality.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java#metrics-listener
Custom Metrics Collector
------------------------
You can plug-in your own metrics collector instead of
``akka.cluster.SigarMetricsCollector`` or ``akka.cluster.JmxMetricsCollector``. Look at those two implementations
for inspiration. The implementation class can be defined in the :ref:`cluster_configuration_java`.
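A skeleton might look like the following sketch. It assumes the collector interface exposes a ``sample`` method returning ``NodeMetrics`` (as the specs in this commit use) and a constructor taking a single ``ActorSystem`` argument (as required by the configuration)::

   import akka.actor.ActorSystem
   import akka.cluster.{ Cluster, Metric, MetricsCollector, NodeMetrics }

   class MyMetricsCollector(system: ActorSystem) extends MetricsCollector {
     private val address = Cluster(system).selfAddress

     // sample one hypothetical metric; a real collector would gather
     // heap, cpu and load values here
     def sample: NodeMetrics =
       NodeMetrics(address, System.currentTimeMillis,
         Set(Metric.create("my-metric", 42, None)).flatten)

     // release any native resources here
     def close(): Unit = ()
   }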
.. _cluster_jmx_java:

View file

@ -32,8 +32,7 @@ Try it out:
1. Add the following ``application.conf`` to your project and place it in ``src/main/resources``:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
:language: none
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#cluster
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala`
settings, but with ``akka.cluster.ClusterActorRefProvider``.
@ -265,6 +264,8 @@ This is how the curve looks like for ``acceptable-heartbeat-pause`` configured t
.. image:: images/phi3.png
.. _cluster_aware_routers_scala:
Cluster Aware Routers
^^^^^^^^^^^^^^^^^^^^^
@ -397,6 +398,97 @@ service nodes and 1 client::
.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning.
Cluster Metrics
^^^^^^^^^^^^^^^
The member nodes of the cluster collect system health metrics and publish that information to other nodes and to
registered subscribers. This information is primarily used by load-balancing routers.
Hyperic Sigar
-------------
The built-in metrics are gathered from JMX MBeans, and optionally you can use `Hyperic Sigar <http://www.hyperic.com/products/sigar>`_
for a wider and more accurate range of metrics than can be retrieved from ordinary MBeans.
Sigar uses a native OS library. To enable Sigar you need to add the directory of the native library to
``-Djava.library.path=<path_of_sigar_libs>`` and add the following dependency::
"org.hyperic" % "sigar" % "@sigarVersion@"
Adaptive Load Balancing
-----------------------
The ``AdaptiveLoadBalancingRouter`` performs load balancing of messages to cluster nodes based on the cluster metrics data.
It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node.
It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights:
* ``heap`` / ``HeapMetricsSelector`` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
* ``load`` / ``SystemLoadAverageMetricsSelector`` - System load average for the past 1 minute; the corresponding value can be found in ``top`` on Linux systems. The system is possibly nearing a bottleneck if the system load average approaches the number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
* ``cpu`` / ``CpuMetricsSelector`` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
* ``mix`` / ``MixMetricsSelector`` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
* Any custom implementation of ``akka.cluster.routing.MetricsSelector`` (see the sketch after this list)
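As an example of the last bullet, a custom selector can extend ``CapacityMetricsSelector`` and only define how capacity is computed per node, inheriting the weighting. A minimal sketch, where the used-versus-committed heap formula is made up for illustration::

   import akka.actor.Address
   import akka.cluster.NodeMetrics
   import akka.cluster.StandardMetrics.HeapMemory
   import akka.cluster.routing.CapacityMetricsSelector

   case object UsedHeapMetricsSelector extends CapacityMetricsSelector {
     // capacity based on how much of the committed heap is still unused
     override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] =
       nodeMetrics.collect {
         case HeapMemory(address, _, used, committed, _) =>
           address -> (1.0 - used.toDouble / committed)
       }.toMap
   }

It can then be used like the built-in ones, e.g. ``AdaptiveLoadBalancingRouter(UsedHeapMetricsSelector)``.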
The collected metrics values are smoothed with `exponential weighted moving average <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>`_. In the :ref:`cluster_configuration_scala` you can adjust how quickly past data is decayed compared to new data.
Let's take a look at this router in action.
In this example the following imports are used:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#imports
The backend worker that performs the factorial calculation:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#backend
The frontend that receives user jobs and delegates to the backends via the router:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#frontend
As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#adaptive-router
Only the router type ``adaptive`` and the ``metrics-selector`` are specific to this router; everything else works
in the same way as for other routers.
The same type of router could also have been defined in code:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#router-lookup-in-code
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#router-deploy-in-code
This example is included in ``akka-samples/akka-sample-cluster``
and you can try it by starting nodes in different terminal windows. For example, starting 3 backend nodes and one frontend::
sbt
project akka-sample-cluster-experimental
run-main sample.cluster.factorial.FactorialBackend 2551
run-main sample.cluster.factorial.FactorialBackend 2552
run-main sample.cluster.factorial.FactorialBackend
run-main sample.cluster.factorial.FactorialFrontend
Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.
Subscribe to Metrics Events
---------------------------
It's possible to subscribe to the metrics events directly to implement other functionality.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#metrics-listener
Custom Metrics Collector
------------------------
You can plug-in your own metrics collector instead of
``akka.cluster.SigarMetricsCollector`` or ``akka.cluster.JmxMetricsCollector``. Look at those two implementations
for inspiration. The implementation class can be defined in the :ref:`cluster_configuration_scala`.
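For example, the sample multi-jvm tests in this commit switch to the JMX-only collector, since the Sigar native library is not on the test library path::

   akka.cluster.metrics.collector-class = "akka.cluster.JmxMetricsCollector"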
How to Test
^^^^^^^^^^^

View file

@ -84,9 +84,9 @@ Gossip
The cluster membership used in Akka is based on Amazon's `Dynamo`_ system and
particularly the approach taken in Basho's' `Riak`_ distributed database.
Cluster membership is communicated using a `Gossip Protocol`_, where the current
state of the cluster is gossiped randomly through the cluster. Joining a cluster
is initiated by issuing a ``Join`` command to one of the nodes in the cluster to
join.
state of the cluster is gossiped randomly through the cluster, with preference to
members that have not seen the latest version. Joining a cluster is initiated
by issuing a ``Join`` command to one of the nodes in the cluster to join.
.. _Gossip Protocol: http://en.wikipedia.org/wiki/Gossip_protocol
.. _Dynamo: http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf
@ -209,8 +209,7 @@ node to initiate a round of gossip with. The choice of node is random but can
also include extra gossiping nodes with either newer or older state versions.
The gossip overview contains the current state version for all nodes and also a
list of unreachable nodes. Whenever a node receives a gossip overview it updates
the `Failure Detector`_ with the liveness information.
list of unreachable nodes.
The nodes defined as ``seed`` nodes are just regular member nodes whose only
"special role" is to function as contact points in the cluster.

View file

@ -28,7 +28,13 @@ class RemoteActorRefProvider(
val remoteSettings: RemoteSettings = new RemoteSettings(settings.config, systemName)
val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
override val deployer: Deployer = createDeployer
/**
* Factory method to make it possible to override the deployer in a subclass.
* Creates a new instance every time it is invoked.
*/
protected def createDeployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer)

View file

@ -0,0 +1,49 @@
package sample.cluster.factorial.japi;
//#imports
import java.math.BigInteger;
import java.util.concurrent.Callable;
import scala.concurrent.Future;
import akka.actor.UntypedActor;
import akka.dispatch.Mapper;
import static akka.dispatch.Futures.future;
import static akka.pattern.Patterns.pipe;
//#imports
//#backend
public class FactorialBackend extends UntypedActor {
@Override
public void onReceive(Object message) {
if (message instanceof Integer) {
final Integer n = (Integer) message;
Future<BigInteger> f = future(new Callable<BigInteger>() {
public BigInteger call() {
return factorial(n);
}
}, getContext().dispatcher());
Future<FactorialResult> result = f.map(
new Mapper<BigInteger, FactorialResult>() {
public FactorialResult apply(BigInteger factorial) {
return new FactorialResult(n, factorial);
}
}, getContext().dispatcher());
pipe(result, getContext().dispatcher()).to(getSender());
} else {
unhandled(message);
}
}
BigInteger factorial(int n) {
BigInteger acc = BigInteger.ONE;
for (int i = 1; i <= n; ++i) {
acc = acc.multiply(BigInteger.valueOf(i));
}
return acc;
}
}
//#backend

View file

@ -0,0 +1,22 @@
package sample.cluster.factorial.japi;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class FactorialBackendMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]);
ActorSystem system = ActorSystem.create("ClusterSystem");
system.actorOf(new Props(FactorialBackend.class), "factorialBackend");
system.actorOf(new Props(MetricsListener.class), "metricsListener");
}
}

View file

@ -0,0 +1,90 @@
package sample.cluster.factorial.japi;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.FromConfig;
import akka.cluster.routing.AdaptiveLoadBalancingRouter;
import akka.cluster.routing.ClusterRouterConfig;
import akka.cluster.routing.ClusterRouterSettings;
import akka.cluster.routing.HeapMetricsSelector;
import akka.cluster.routing.SystemLoadAverageMetricsSelector;
//#frontend
public class FactorialFrontend extends UntypedActor {
final int upToN;
final boolean repeat;
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
ActorRef backend = getContext().actorOf(
new Props(FactorialBackend.class).withRouter(FromConfig.getInstance()),
"factorialBackendRouter");
public FactorialFrontend(int upToN, boolean repeat) {
this.upToN = upToN;
this.repeat = repeat;
}
@Override
public void preStart() {
sendJobs();
}
@Override
public void onReceive(Object message) {
if (message instanceof FactorialResult) {
FactorialResult result = (FactorialResult) message;
if (result.n == upToN) {
log.debug("{}! = {}", result.n, result.factorial);
if (repeat) sendJobs();
}
} else {
unhandled(message);
}
}
void sendJobs() {
log.info("Starting batch of factorials up to [{}]", upToN);
for (int n = 1; n <= upToN; n++) {
backend.tell(n, getSelf());
}
}
}
//#frontend
//not used, only for documentation
abstract class FactorialFrontend2 extends UntypedActor {
//#router-lookup-in-code
int totalInstances = 100;
String routeesPath = "/user/factorialBackend";
boolean allowLocalRoutees = true;
ActorRef backend = getContext().actorOf(
new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig(
new AdaptiveLoadBalancingRouter(HeapMetricsSelector.getInstance(), 0),
new ClusterRouterSettings(
totalInstances, routeesPath, allowLocalRoutees))),
"factorialBackendRouter2");
//#router-lookup-in-code
}
//not used, only for documentation
abstract class FactorialFrontend3 extends UntypedActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
ActorRef backend = getContext().actorOf(
new Props(FactorialBackend.class).withRouter(new ClusterRouterConfig(
new AdaptiveLoadBalancingRouter(
SystemLoadAverageMetricsSelector.getInstance(), 0),
new ClusterRouterSettings(
totalInstances, maxInstancesPerNode, allowLocalRoutees))),
"factorialBackendRouter3");
//#router-deploy-in-code
}

View file

@ -0,0 +1,27 @@
package sample.cluster.factorial.japi;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
public class FactorialFrontendMain {
public static void main(String[] args) throws Exception {
final int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0]));
ActorSystem system = ActorSystem.create("ClusterSystem");
// start the calculations when there are at least 2 other members
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new StartupFrontend(upToN);
}
}), "startup");
}
}

View file

@ -0,0 +1,14 @@
package sample.cluster.factorial.japi;
import java.math.BigInteger;
import java.io.Serializable;
public class FactorialResult implements Serializable {
public final int n;
public final BigInteger factorial;
FactorialResult(int n, BigInteger factorial) {
this.n = n;
this.factorial = factorial;
}
}

View file

@ -0,0 +1,68 @@
package sample.cluster.factorial.japi;
//#metrics-listener
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.ClusterMetricsChanged;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.NodeMetrics;
import akka.cluster.StandardMetrics;
import akka.cluster.StandardMetrics.HeapMemory;
import akka.cluster.StandardMetrics.Cpu;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MetricsListener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
//subscribe to ClusterMetricsChanged
@Override
public void preStart() {
cluster.subscribe(getSelf(), ClusterMetricsChanged.class);
}
//re-subscribe when restarted
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof ClusterMetricsChanged) {
ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message;
for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
if (nodeMetrics.address().equals(cluster.selfAddress())) {
logHeap(nodeMetrics);
logCpu(nodeMetrics);
}
}
} else if (message instanceof CurrentClusterState) {
// ignore
} else {
unhandled(message);
}
}
void logHeap(NodeMetrics nodeMetrics) {
HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
if (heap != null) {
log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
}
}
void logCpu(NodeMetrics nodeMetrics) {
Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
if (cpu != null && cpu.systemLoadAverage().isDefined()) {
log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(),
cpu.processors());
}
}
}
//#metrics-listener

View file

@ -0,0 +1,56 @@
package sample.cluster.factorial.japi;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class StartupFrontend extends UntypedActor {
final int upToN;
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
int memberCount = 0;
public StartupFrontend(int upToN) {
this.upToN = upToN;
}
//subscribe to MemberUp
@Override
public void preStart() {
log.info("Factorials will start when 3 members in the cluster.");
Cluster.get(getContext().system()).subscribe(getSelf(), MemberUp.class);
}
@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
memberCount = state.members().size();
runWhenReady();
} else if (message instanceof MemberUp) {
memberCount++;
runWhenReady();
} else {
unhandled(message);
}
}
void runWhenReady() {
if (memberCount >= 3) {
getContext().system().actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new FactorialFrontend(upToN, true);
}
}), "factorialFrontend");
getContext().stop(getSelf());
}
}
}

View file

@ -1,3 +1,4 @@
# //#cluster
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
@ -20,4 +21,23 @@ akka {
auto-down = on
}
}
}
# //#cluster
# //#adaptive-router
akka.actor.deployment {
/factorialFrontend/factorialBackendRouter = {
router = adaptive
# metrics-selector = heap
# metrics-selector = load
# metrics-selector = cpu
metrics-selector = mix
nr-of-instances = 100
cluster {
enabled = on
routees-path = "/user/factorialBackend"
allow-local-routees = off
}
}
}
# //#adaptive-router

View file

@ -0,0 +1,184 @@
package sample.cluster.factorial
//#imports
import scala.annotation.tailrec
import scala.concurrent.Future
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.pipe
import akka.routing.FromConfig
//#imports
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
object FactorialFrontend {
def main(args: Array[String]): Unit = {
val upToN = if (args.isEmpty) 200 else args(0).toInt
val system = ActorSystem("ClusterSystem")
// start the calculations when there are at least 2 other members
system.actorOf(Props(new Actor with ActorLogging {
var memberCount = 0
log.info("Factorials will start when 3 members in the cluster.")
Cluster(context.system).subscribe(self, classOf[MemberUp])
def receive = {
case state: CurrentClusterState ⇒
memberCount = state.members.size
runWhenReady()
case MemberUp(member) ⇒
memberCount += 1
runWhenReady()
}
def runWhenReady(): Unit = if (memberCount >= 3) {
context.system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)),
name = "factorialFrontend")
context stop self
}
}), name = "startup")
}
}
//#frontend
class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {
val backend = context.actorOf(Props[FactorialBackend].withRouter(FromConfig),
name = "factorialBackendRouter")
override def preStart(): Unit = sendJobs()
def receive = {
case (n: Int, factorial: BigInt) ⇒
if (n == upToN) {
log.debug("{}! = {}", n, factorial)
if (repeat) sendJobs()
}
}
def sendJobs(): Unit = {
log.info("Starting batch of factorials up to [{}]", upToN)
1 to upToN foreach { backend ! _ }
}
}
//#frontend
object FactorialBackend {
def main(args: Array[String]): Unit = {
// Override the configuration of the port
// when specified as program argument
if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
val system = ActorSystem("ClusterSystem")
system.actorOf(Props[FactorialBackend], name = "factorialBackend")
system.actorOf(Props[MetricsListener], name = "metricsListener")
}
}
//#backend
class FactorialBackend extends Actor with ActorLogging {
import context.dispatcher
def receive = {
case (n: Int) ⇒
Future(factorial(n)) map { result ⇒ (n, result) } pipeTo sender
}
def factorial(n: Int): BigInt = {
@tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
if (n <= 1) acc
else factorialAcc(acc * n, n - 1)
}
factorialAcc(BigInt(1), n)
}
}
//#backend
//#metrics-listener
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.HeapMemory
import akka.cluster.StandardMetrics.Cpu
class MetricsListener extends Actor with ActorLogging {
val selfAddress = Cluster(context.system).selfAddress
// subscribe to ClusterMetricsChanged
// re-subscribe when restarted
override def preStart(): Unit =
Cluster(context.system).subscribe(self, classOf[ClusterMetricsChanged])
override def postStop(): Unit =
Cluster(context.system).unsubscribe(self)
def receive = {
case ClusterMetricsChanged(clusterMetrics) ⇒
clusterMetrics.filter(_.address == selfAddress) foreach { nodeMetrics ⇒
logHeap(nodeMetrics)
logCpu(nodeMetrics)
}
case state: CurrentClusterState ⇒ // ignore
}
def logHeap(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max) ⇒
log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
case _ ⇒ // no heap info
}
def logCpu(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, processors) ⇒
log.info("Load: {} ({} processors)", systemLoadAverage, processors)
case _ ⇒ // no cpu info
}
}
//#metrics-listener
// not used, only for documentation
abstract class FactorialFrontend2 extends Actor {
//#router-lookup-in-code
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.cluster.routing.AdaptiveLoadBalancingRouter
import akka.cluster.routing.HeapMetricsSelector
val backend = context.actorOf(Props[FactorialBackend].withRouter(
ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
ClusterRouterSettings(
totalInstances = 100, routeesPath = "/user/factorialBackend",
allowLocalRoutees = true))),
name = "factorialBackendRouter2")
//#router-lookup-in-code
}
// not used, only for documentation
abstract class FactorialFrontend3 extends Actor {
//#router-deploy-in-code
import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
import akka.cluster.routing.AdaptiveLoadBalancingRouter
import akka.cluster.routing.SystemLoadAverageMetricsSelector
val backend = context.actorOf(Props[FactorialBackend].withRouter(
ClusterRouterConfig(AdaptiveLoadBalancingRouter(
SystemLoadAverageMetricsSelector), ClusterRouterSettings(
totalInstances = 100, maxInstancesPerNode = 3,
allowLocalRoutees = false))),
name = "factorialBackendRouter3")
//#router-deploy-in-code
}

View file

@ -33,6 +33,8 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
#//#router-deploy-config
akka.actor.deployment {
/statsFacade/statsService/workerRouter {

View file

@ -27,6 +27,8 @@ object StatsSampleSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
#//#router-lookup-config
akka.actor.deployment {
/statsService/workerRouter {

View file

@ -31,6 +31,8 @@ object StatsSampleJapiSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing

View file

@ -34,6 +34,8 @@ object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
akka.actor.deployment {
/statsFacade/statsService/workerRouter {
router = consistent-hashing

View file

@ -29,6 +29,8 @@ object TransformationSampleSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
"""))
}

View file

@ -30,6 +30,8 @@ object TransformationSampleJapiSpecConfig extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
"""))
}

View file

@ -325,7 +325,13 @@ object AkkaBuild extends Build {
base = file("akka-samples/akka-sample-cluster"),
dependencies = Seq(cluster, remoteTests % "test", testkit % "test"),
settings = sampleSettings ++ multiJvmSettings ++ experimentalSettings ++ Seq(
// sigar is in Typesafe repo
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
libraryDependencies ++= Dependencies.clusterSample,
javaOptions in run ++= Seq(
"-Djava.library.path=./sigar",
"-Xms128m", "-Xmx1024m"),
Keys.fork in run := true,
// disable parallel tests
parallelExecution in Test := false,
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
@ -446,7 +452,9 @@ object AkkaBuild extends Build {
case key: String if key.startsWith("multinode.") => "-D" + key + "=" + System.getProperty(key)
case key: String if key.startsWith("akka.") => "-D" + key + "=" + System.getProperty(key)
}
akkaProperties ::: (if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil)
"-Xmx256m" :: akkaProperties :::
(if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil)
}
// for excluding tests by name use system property: -Dakka.test.names.exclude=TimingSpec
@ -549,6 +557,7 @@ object AkkaBuild extends Build {
case BinVer(bv) => bv
case _ => s
}),
"sigarVersion" -> Dependencies.Compile.sigar.revision,
"github" -> "http://github.com/akka/akka/tree/%s".format((if (isSnapshot) "master" else "v" + v))
)
},
@ -675,6 +684,9 @@ object Dependencies {
// Camel Sample
val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2
// Cluster Sample
val sigar = "org.hyperic" % "sigar" % "1.6.4" // ApacheV2
// Test
object Test {
@ -731,7 +743,7 @@ object Dependencies {
val zeroMQ = Seq(protobuf, zeroMQClient, Test.scalatest, Test.junit)
val clusterSample = Seq(Test.scalatest)
val clusterSample = Seq(Test.scalatest, sigar)
val contrib = Seq(Test.junitIntf)