diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
index 071195c11e..2fb6a37469 100644
--- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala
+++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
@@ -32,7 +32,7 @@ object Serialization {
private final def configToMap(path: String): Map[String, String] = {
import scala.collection.JavaConverters._
- config.getConfig(path).root.unwrapped.asScala.mapValues(_.toString).toMap
+ config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ (k -> v.toString) }
}
}
}
diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index f24aa2dee8..53b14e2842 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -117,7 +117,8 @@ akka {
# is on the classpath, otherwise only JMX.
collector-class = "akka.cluster.SigarMetricsCollector"
- # How often metrics is sampled on a node.
+ # How often metrics are sampled on a node.
+ # Shorter interval will collect the metrics more often.
collect-interval = 3s
# How often a node publishes metrics information.
@@ -125,9 +126,12 @@ akka {
# How quickly the exponential weighting of past data is decayed compared to
# new data. Set lower to increase the bias toward newer values.
- # It takes about 4 half-life to drop below 10% contribution, and 7 to drop
- # below 1%.
- decay-half-life-duration = 12s
+ # The relevance of each data sample is halved for every passing half-life duration,
+ # i.e. after 4 times the half-life, a data sample’s relevance is reduced to 6% of
+ # its original relevance. The initial relevance of a data sample is given by
+ # 1 – 0.5 ^ (collect-interval / half-life).
+ # See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ moving-average-half-life = 12s
}
# If the tick-duration of the default scheduler is longer than the
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala
index 3b0b8a1d1a..a4a1b37e86 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala
@@ -54,7 +54,11 @@ class ClusterActorRefProvider(
remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher")
}
- override lazy val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
+ /**
+ * Factory method to make it possible to override deployer in subclass
+ * Creates a new instance every time
+ */
+ override protected def createDeployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess)
/**
* This method is overridden here to keep track of remote deployed actors to
@@ -126,7 +130,7 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami
routerType match {
case "adaptive" ⇒
val metricsSelector = deployment.getString("metrics-selector") match {
- case "mix" ⇒ MixMetricsSelector()
+ case "mix" ⇒ MixMetricsSelector
case "heap" ⇒ HeapMetricsSelector
case "cpu" ⇒ CpuMetricsSelector
case "load" ⇒ SystemLoadAverageMetricsSelector
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
index 3823434f5b..26fd48b494 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
@@ -12,7 +12,6 @@ import java.lang.reflect.Method
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
-import scala.runtime.{ RichLong, RichDouble, RichInt }
import scala.util.{ Try, Success, Failure }
import akka.ConfigurationException
import akka.actor.Actor
@@ -183,9 +182,10 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
*/
def merge(remoteGossip: MetricsGossip): MetricsGossip = {
val remoteNodes = remoteGossip.nodes.map(n ⇒ n.address -> n).toMap
- val toMerge = nodeKeys intersect remoteNodes.keySet
- val onlyInRemote = remoteNodes.keySet -- nodeKeys
- val onlyInLocal = nodeKeys -- remoteNodes.keySet
+ val remoteNodesKeySet = remoteNodes.keySet
+ val toMerge = nodeKeys intersect remoteNodesKeySet
+ val onlyInRemote = remoteNodesKeySet -- nodeKeys
+ val onlyInLocal = nodeKeys -- remoteNodesKeySet
val seen = nodes.collect {
case n if toMerge contains n.address ⇒ n merge remoteNodes(n.address)
@@ -198,7 +198,7 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
}
/**
- * Adds new local [[akka.cluster.NodeMetrics]] and initializes the data, or merges an existing.
+ * Adds new local [[akka.cluster.NodeMetrics]], or merges an existing.
*/
def :+(data: NodeMetrics): MetricsGossip = {
val previous = metricsFor(data.address)
@@ -232,16 +232,24 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
object EWMA {
+ /**
+ * math.log(2)
+ */
+ private val LogOf2 = 0.69315
+
/**
* Calculate the alpha (decay factor) used in [[akka.cluster.EWMA]]
* from specified half-life and interval between observations.
- * It takes about 4 half-life to drop below 10% contribution, and 7 to drop
- * below 1%.
+ * Half-life is the interval over which the weights decrease by a factor of two.
+ * The relevance of each data sample is halved for every passing half-life duration,
+ * i.e. after 4 times the half-life, a data sample’s relevance is reduced to 6% of
+ * its original relevance. The initial relevance of a data sample is given by
+ * 1 – 0.5 ^ (collect-interval / half-life).
*/
def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double = {
val halfLifeMillis = halfLife.toMillis
require(halfLife.toMillis > 0, "halfLife must be > 0 s")
- val decayRate = 0.69315 / halfLifeMillis
+ val decayRate = LogOf2 / halfLifeMillis
1 - math.exp(-decayRate * collectInterval.toMillis)
}
}
@@ -277,7 +285,11 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe
* @param xn the new data point
* @return a new [[akka.cluster.EWMA]] with the updated value
*/
- def :+(xn: Double): EWMA = copy(value = (alpha * xn) + (1 - alpha) * value)
+ def :+(xn: Double): EWMA = {
+ val newValue = (alpha * xn) + (1 - alpha) * value
+ if (newValue == value) this // no change
+ else copy(value = newValue)
+ }
}
@@ -291,7 +303,7 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe
case class Metric private (name: String, value: Number, private val average: Option[EWMA])
extends ClusterMessage with MetricNumericConverter {
- require(defined(value), "Invalid Metric [%s] value [%]".format(name, value))
+ require(defined(value), s"Invalid Metric [$name] value [$value]")
/**
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
@@ -398,16 +410,17 @@ case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] =
*/
object StandardMetrics {
+ // Constants for the heap related Metric names
+ final val HeapMemoryUsed = "heap-memory-used"
+ final val HeapMemoryCommitted = "heap-memory-committed"
+ final val HeapMemoryMax = "heap-memory-max"
+
+ // Constants for the cpu related Metric names
+ final val SystemLoadAverage = "system-load-average"
+ final val Processors = "processors"
+ final val CpuCombined = "cpu-combined"
+
object HeapMemory {
- /**
- * Constants for the heap related Metric names
- */
- object Fields {
- final val HeapMemoryUsed = "heap-memory-used"
- final val HeapMemoryCommitted = "heap-memory-committed"
- final val HeapMemoryMax = "heap-memory-max"
- }
- import Fields._
/**
* Given a NodeMetrics it returns the HeapMemory data if the nodeMetrics contains
@@ -418,9 +431,9 @@ object StandardMetrics {
for {
used ← nodeMetrics.metric(HeapMemoryUsed)
committed ← nodeMetrics.metric(HeapMemoryCommitted)
- maxOption = nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
- used.smoothValue.longValue, committed.smoothValue.longValue, maxOption)
+ used.smoothValue.longValue, committed.smoothValue.longValue,
+ nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue))
}
}
@@ -431,6 +444,7 @@ object StandardMetrics {
*/
def extractHeapMemory(nodeMetrics: NodeMetrics): HeapMemory = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max) ⇒
+ // note that above extractor returns tuple
HeapMemory(address, timestamp, used, committed, max)
case _ ⇒ null
}
@@ -454,15 +468,6 @@ object StandardMetrics {
}
object Cpu {
- /**
- * Constants for the cpu related Metric names
- */
- object Fields {
- final val SystemLoadAverage = "system-load-average"
- final val Processors = "processors"
- final val CpuCombined = "cpu-combined"
- }
- import Fields._
/**
* Given a NodeMetrics it returns the Cpu data if the nodeMetrics contains
@@ -472,10 +477,9 @@ object StandardMetrics {
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Option[Double], Option[Double], Int)] = {
for {
processors ← nodeMetrics.metric(Processors)
- systemLoadAverageOption = nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue)
- cpuCombinedOption = nodeMetrics.metric(CpuCombined).map(_.smoothValue)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
- systemLoadAverageOption, cpuCombinedOption, processors.value.intValue)
+ nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue),
+ nodeMetrics.metric(CpuCombined).map(_.smoothValue), processors.value.intValue)
}
}
@@ -486,6 +490,7 @@ object StandardMetrics {
*/
def extractCpu(nodeMetrics: NodeMetrics): Cpu = nodeMetrics match {
case Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors) ⇒
+ // note that above extractor returns tuple
Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
case _ ⇒ null
}
@@ -508,7 +513,7 @@ object StandardMetrics {
processors: Int) {
cpuCombined match {
- case Some(x) ⇒ require(0.0 <= x && x <= 1.0, "cpuCombined must be between [0.0 - 1.0], was [%s]" format x)
+ case Some(x) ⇒ require(0.0 <= x && x <= 1.0, s"cpuCombined must be between [0.0 - 1.0], was [$x]")
case None ⇒
}
@@ -542,12 +547,9 @@ private[cluster] trait MetricNumericConverter {
case n: Long ⇒ Left(n)
case n: Double ⇒ Right(n)
case n: Float ⇒ Right(n)
- case n: RichInt ⇒ Left(n.abs)
- case n: RichLong ⇒ Left(n.self)
- case n: RichDouble ⇒ Right(n.self)
case n: BigInt ⇒ Left(n.longValue)
case n: BigDecimal ⇒ Right(n.doubleValue)
- case x ⇒ throw new IllegalArgumentException("Not a number [%s]" format x)
+ case x ⇒ throw new IllegalArgumentException(s"Not a number [$x]")
}
}
@@ -569,13 +571,15 @@ private[cluster] trait MetricsCollector extends Closeable {
* @param decay how quickly the exponential weighting of past data is decayed
*/
class JmxMetricsCollector(address: Address, decayFactor: Double) extends MetricsCollector {
- import StandardMetrics.HeapMemory.Fields._
- import StandardMetrics.Cpu.Fields._
+ import StandardMetrics._
private def this(cluster: Cluster) =
this(cluster.selfAddress,
- EWMA.alpha(cluster.settings.MetricsDecayHalfLifeDuration, cluster.settings.MetricsInterval))
+ EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval))
+ /**
+ * This constructor is used when creating an instance from configured FQCN
+ */
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
@@ -586,6 +590,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* Samples and collects new data points.
+ * Creates a new instance each time.
*/
def sample: NodeMetrics = NodeMetrics(address, newTimestamp, metrics)
@@ -598,6 +603,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
* JMX Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is
* returned from JMX, and None is returned from this method.
+ * Creates a new instance each time.
*/
def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
@@ -606,6 +612,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* (JMX) Returns the number of available processors
+ * Creates a new instance each time.
*/
def processors: Option[Metric] = Metric.create(
name = Processors,
@@ -619,6 +626,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
+ * Creates a new instance each time.
*/
def heapUsed(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryUsed,
@@ -628,6 +636,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes).
+ * Creates a new instance each time.
*/
def heapCommitted(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryCommitted,
@@ -638,6 +647,7 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If not defined the metrics value is None, i.e.
* never negative.
+ * Creates a new instance each time.
*/
def heapMax(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryMax,
@@ -665,14 +675,16 @@ class JmxMetricsCollector(address: Address, decayFactor: Double) extends Metrics
class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef)
extends JmxMetricsCollector(address, decayFactor) {
- import StandardMetrics.HeapMemory.Fields._
- import StandardMetrics.Cpu.Fields._
+ import StandardMetrics._
private def this(cluster: Cluster) =
this(cluster.selfAddress,
- EWMA.alpha(cluster.settings.MetricsDecayHalfLifeDuration, cluster.settings.MetricsInterval),
+ EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval),
cluster.system.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)
+ /**
+ * This constructor is used when creating an instance from configured FQCN
+ */
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
@@ -689,7 +701,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef
try method.invoke(sigar).asInstanceOf[Long] catch {
case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError] ⇒
// native libraries not in place
- // don't throw fatal LinkageError, but something less harmless
+ // don't throw fatal LinkageError, but something harmless
throw new IllegalArgumentException(e.getCause.toString)
case e: InvocationTargetException ⇒ throw e.getCause
}
@@ -705,6 +717,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned
* from JMX, which means that None is returned from this method.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
+ * Creates a new instance each time.
*/
override def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
@@ -718,6 +731,8 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef
*
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
+ *
+ * Creates a new instance each time.
*/
def cpuCombined: Option[Metric] = Metric.create(
name = CpuCombined,
@@ -756,10 +771,10 @@ private[cluster] object MetricsCollector {
}
} else {
- system.dynamicAccess.createInstanceFor[MetricsCollector](
- fqcn, List(classOf[ActorSystem] -> system)).recover({
+ system.dynamicAccess.createInstanceFor[MetricsCollector](fqcn, List(classOf[ActorSystem] -> system)).
+ recover {
case e ⇒ throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to:" + e.toString)
- }).get
+ }.get
}
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index da03e2009e..6861459168 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -77,9 +77,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
require(d > Duration.Zero, "metrics.collect-interval must be > 0"); d
}
final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS)
- final val MetricsDecayHalfLifeDuration: FiniteDuration = {
- val d = Duration(getMilliseconds("akka.cluster.metrics.decay-half-life-duration"), MILLISECONDS)
- require(d > Duration.Zero, "metrics.decay-half-life-duration must be > 0"); d
+ final val MetricsMovingAverageHalfLife: FiniteDuration = {
+ val d = Duration(getMilliseconds("akka.cluster.metrics.moving-average-half-life"), MILLISECONDS)
+ require(d > Duration.Zero, "metrics.moving-average-half-life must be > 0"); d
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala
index fc850e2d69..430aeaeeaf 100644
--- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala
@@ -33,13 +33,13 @@ import akka.routing.RouteeProvider
import akka.routing.RouterConfig
object AdaptiveLoadBalancingRouter {
- private val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
+ private val escalateStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ ⇒ SupervisorStrategy.Escalate
}
}
/**
- * A Router that performs load balancing to cluster nodes based on
+ * A Router that performs load balancing of messages to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees based probabilities derived from
@@ -71,11 +71,11 @@ object AdaptiveLoadBalancingRouter {
*/
@SerialVersionUID(1L)
case class AdaptiveLoadBalancingRouter(
- metricsSelector: MetricsSelector = MixMetricsSelector(),
+ metricsSelector: MetricsSelector = MixMetricsSelector,
nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
- val supervisorStrategy: SupervisorStrategy = AdaptiveLoadBalancingRouter.defaultSupervisorStrategy)
+ val supervisorStrategy: SupervisorStrategy = AdaptiveLoadBalancingRouter.escalateStrategy)
extends RouterConfig with AdaptiveLoadBalancingRouterLike {
/**
@@ -137,7 +137,7 @@ case class AdaptiveLoadBalancingRouter(
/**
* INTERNAL API.
*
- * This strategy is a metrics-aware router which performs load balancing of
+ * This strategy is a metrics-aware router which performs load balancing of messages to
* cluster nodes based on cluster metric data. It consumes [[akka.cluster.ClusterMetricsChanged]]
* events and the [[akka.cluster.routing.MetricsSelector]] creates an mix of
* weighted routees based on the node metrics. Messages are routed randomly to the
@@ -181,7 +181,7 @@ trait AdaptiveLoadBalancingRouterLike { this: RouterConfig ⇒
}
def receiveMetrics(metrics: Set[NodeMetrics]): Unit = {
- // update the state outside of the actor, not a recommended practice, but works fine here
+ // this is the only place from where weightedRoutees is updated
weightedRoutees = Some(new WeightedRoutees(routeeProvider.routees, cluster.selfAddress,
metricsSelector.weights(metrics)))
}
@@ -275,6 +275,20 @@ case object SystemLoadAverageMetricsSelector extends CapacityMetricsSelector {
}
}
+/**
+ * Singleton instance of the default MixMetricsSelector, which uses [akka.cluster.routing.HeapMetricsSelector],
+ * [akka.cluster.routing.CpuMetricsSelector], and [akka.cluster.routing.SystemLoadAverageMetricsSelector]
+ */
+@SerialVersionUID(1L)
+object MixMetricsSelector extends MixMetricsSelectorBase(
+ Vector(HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector)) {
+
+ /**
+ * Java API: get the default singleton instance
+ */
+ def getInstance = this
+}
+
/**
* MetricsSelector that combines other selectors and aggregates their capacity
* values. By default it uses [akka.cluster.routing.HeapMetricsSelector],
@@ -282,8 +296,14 @@ case object SystemLoadAverageMetricsSelector extends CapacityMetricsSelector {
*/
@SerialVersionUID(1L)
case class MixMetricsSelector(
- selectors: immutable.IndexedSeq[CapacityMetricsSelector] = Vector(
- HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector))
+ selectors: immutable.IndexedSeq[CapacityMetricsSelector])
+ extends MixMetricsSelectorBase(selectors)
+
+/**
+ * Base class for MetricsSelector that combines other selectors and aggregates their capacity.
+ */
+@SerialVersionUID(1L)
+abstract class MixMetricsSelectorBase(selectors: immutable.IndexedSeq[CapacityMetricsSelector])
extends CapacityMetricsSelector {
/**
@@ -298,20 +318,13 @@ case class MixMetricsSelector(
case (acc, (address, capacity)) ⇒
val (sum, count) = acc(address)
acc + (address -> (sum + capacity, count + 1))
- }.mapValues {
- case (sum, count) ⇒ sum / count
- }.toMap
+ }.map {
+ case (addr, (sum, count)) ⇒ (addr -> sum / count)
+ }
}
}
-case object MixMetricsSelector {
- /**
- * Java API: get the default singleton instance
- */
- def getInstance = MixMetricsSelector()
-}
-
/**
* A MetricsSelector is responsible for producing weights from the node metrics.
*/
@@ -349,7 +362,7 @@ abstract class CapacityMetricsSelector extends MetricsSelector {
val (_, min) = capacity.minBy { case (_, c) ⇒ c }
// lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
val divisor = math.max(0.01, min)
- capacity mapValues { c ⇒ math.round((c) / divisor).toInt }
+ capacity map { case (addr, c) ⇒ (addr -> math.round((c) / divisor).toInt) }
}
}
@@ -379,12 +392,13 @@ private[cluster] class WeightedRoutees(refs: immutable.IndexedSeq[ActorRef], sel
}
val buckets = Array.ofDim[Int](refs.size)
val meanWeight = if (weights.isEmpty) 1 else weights.values.sum / weights.size
- val w = weights.withDefaultValue(meanWeight)
+ val w = weights.withDefaultValue(meanWeight) // we don’t necessarily have metrics for all addresses
+ var i = 0
var sum = 0
- refs.zipWithIndex foreach {
- case (ref, i) ⇒
- sum += w(fullAddress(ref))
- buckets(i) = sum
+ refs foreach { ref ⇒
+ sum += w(fullAddress(ref))
+ buckets(i) = sum
+ i += 1
}
buckets
}
@@ -397,20 +411,21 @@ private[cluster] class WeightedRoutees(refs: immutable.IndexedSeq[ActorRef], sel
* Pick the routee matching a value, from 1 to total.
*/
def apply(value: Int): ActorRef = {
- // converts the result of Arrays.binarySearch into a index in the buckets array
- // see documentation of Arrays.binarySearch for what it returns
- def idx(i: Int): Int = {
- if (i >= 0) i // exact match
- else {
- val j = math.abs(i + 1)
- if (j >= buckets.length) throw new IndexOutOfBoundsException(
- "Requested index [%s] is > max index [%s]".format(i, buckets.length))
- else j
- }
- }
-
require(1 <= value && value <= total, "value must be between [1 - %s]" format total)
refs(idx(Arrays.binarySearch(buckets, value)))
+ }
+ /**
+ * Converts the result of Arrays.binarySearch into a index in the buckets array
+ * see documentation of Arrays.binarySearch for what it returns
+ */
+ private def idx(i: Int): Int = {
+ if (i >= 0) i // exact match
+ else {
+ val j = math.abs(i + 1)
+ if (j >= buckets.length) throw new IndexOutOfBoundsException(
+ "Requested index [%s] is > max index [%s]".format(i, buckets.length))
+ else j
+ }
}
}
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala
index 79143a6b4c..f936c5d0a7 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala
@@ -131,10 +131,10 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
val router1 = startRouter("router1")
// collect some metrics before we start
- Thread.sleep(10000)
+ Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 100
- for (i ← 0 until iterationCount) {
+ 1 to iterationCount foreach { _ ⇒
router1 ! "hit"
// wait a while between each message, since metrics is collected periodically
Thread.sleep(10)
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index 547cfb8211..a857a3363c 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -50,7 +50,7 @@ class ClusterConfigSpec extends AkkaSpec {
MetricsCollectorClass must be(classOf[SigarMetricsCollector].getName)
MetricsInterval must be(3 seconds)
MetricsGossipInterval must be(3 seconds)
- MetricsDecayHalfLifeDuration must be(12 seconds)
+ MetricsMovingAverageHalfLife must be(12 seconds)
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/EWMASpec.scala b/akka-cluster/src/test/scala/akka/cluster/EWMASpec.scala
index 760f64206f..ed954b7bb6 100644
--- a/akka-cluster/src/test/scala/akka/cluster/EWMASpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/EWMASpec.scala
@@ -94,7 +94,7 @@ class EWMASpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollector
} else None
}
}
- streamingDataSet ++= changes.map(m ⇒ m.name -> m).toMap
+ streamingDataSet ++= changes.map(m ⇒ m.name -> m)
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
index e20e670879..1b7e30eae1 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
@@ -5,8 +5,8 @@
package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec }
-import akka.cluster.StandardMetrics.HeapMemory.Fields._
-import scala.util.Try
+import akka.cluster.StandardMetrics._
+import scala.util.Failure
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender
@@ -33,7 +33,7 @@ class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) wit
"define an undefined value with a None " in {
Metric.create("x", -1, None).isDefined must be(false)
Metric.create("x", java.lang.Double.NaN, None).isDefined must be(false)
- Metric.create("x", Try(throw new RuntimeException), None).isDefined must be(false)
+ Metric.create("x", Failure(new RuntimeException), None).isDefined must be(false)
}
"recognize whether a metric value is defined" in {
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala
index b174375ece..8a38b59da6 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricValuesSpec.scala
@@ -7,8 +7,7 @@ package akka.cluster
import scala.util.Try
import akka.actor.Address
import akka.testkit.AkkaSpec
-import akka.cluster.StandardMetrics.HeapMemory
-import akka.cluster.StandardMetrics.Cpu
+import akka.cluster.StandardMetrics._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
@@ -19,21 +18,17 @@ class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsC
val node2 = NodeMetrics(Address("akka", "sys", "a", 2555), 1, collector.sample.metrics)
val nodes: Seq[NodeMetrics] = {
- var nodes = Seq(node1, node2)
- // work up the data streams where applicable
- for (i ← 1 to 100) {
- nodes = nodes map { n ⇒
+ (1 to 100).foldLeft(List(node1, node2)) { (nodes, _) ⇒
+ nodes map { n ⇒
n.copy(metrics = collector.sample.metrics.flatMap(latest ⇒ n.metrics.collect {
case streaming if latest sameAs streaming ⇒ streaming :+ latest
}))
}
}
- nodes
}
"NodeMetrics.MetricValues" must {
"extract expected metrics for load balancing" in {
- import HeapMemory.Fields._
val stream1 = node2.metric(HeapMemoryCommitted).get.value.longValue
val stream2 = node1.metric(HeapMemoryUsed).get.value.longValue
stream1 must be >= (stream2)
@@ -53,7 +48,6 @@ class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsC
committed must be > (0L)
// extract is the java api
StandardMetrics.extractCpu(node) must not be (null)
- case _ ⇒ fail("no heap")
}
node match {
@@ -67,7 +61,6 @@ class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsC
}
// extract is the java api
StandardMetrics.extractCpu(node) must not be (null)
- case _ ⇒ fail("no cpu")
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala
index a7cb445649..2ce3892645 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala
@@ -14,8 +14,7 @@ import scala.util.{ Success, Try, Failure }
import akka.actor._
import akka.testkit._
-import akka.cluster.StandardMetrics.HeapMemory.Fields._
-import akka.cluster.StandardMetrics.Cpu.Fields._
+import akka.cluster.StandardMetrics._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@@ -41,23 +40,21 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics
val merged12 = sample2 flatMap (latest ⇒ sample1 collect {
- case peer if latest sameAs peer ⇒ {
+ case peer if latest sameAs peer ⇒
val m = peer :+ latest
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
- }
})
val sample3 = collector.sample.metrics
val sample4 = collector.sample.metrics
val merged34 = sample4 flatMap (latest ⇒ sample3 collect {
- case peer if latest sameAs peer ⇒ {
+ case peer if latest sameAs peer ⇒
val m = peer :+ latest
m.value must be(latest.value)
m.isSmooth must be(peer.isSmooth || latest.isSmooth)
m
- }
})
}
}
@@ -124,12 +121,12 @@ trait MetricsCollectorFactory { this: AkkaSpec ⇒
def createMetricsCollector: MetricsCollector =
Try(new SigarMetricsCollector(selfAddress, defaultDecayFactor,
- extendedActorSystem.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)) match {
- case Success(sigarCollector) ⇒ sigarCollector
- case Failure(e) ⇒
- log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " + e)
- new JmxMetricsCollector(selfAddress, defaultDecayFactor)
- }
+ extendedActorSystem.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil))).
+ recover {
+ case e ⇒
+ log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " + e)
+ new JmxMetricsCollector(selfAddress, defaultDecayFactor)
+ }.get
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/MetricsSelectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/MetricsSelectorSpec.scala
index 851152413e..5b5b92d950 100644
--- a/akka-cluster/src/test/scala/akka/cluster/routing/MetricsSelectorSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/MetricsSelectorSpec.scala
@@ -10,8 +10,7 @@ import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.cluster.Metric
import akka.cluster.NodeMetrics
-import akka.cluster.StandardMetrics.HeapMemory.Fields._
-import akka.cluster.StandardMetrics.Cpu.Fields._
+import akka.cluster.StandardMetrics._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsSelectorSpec extends WordSpec with MustMatchers {
@@ -107,7 +106,7 @@ class MetricsSelectorSpec extends WordSpec with MustMatchers {
"MixMetricsSelector" must {
"aggregate capacity of all metrics" in {
- val capacity = MixMetricsSelector().capacity(nodeMetrics)
+ val capacity = MixMetricsSelector.capacity(nodeMetrics)
capacity(a1) must be((0.75 + 0.9 + 0.9375) / 3 plusOrMinus 0.0001)
capacity(b1) must be((0.75 + 0.5 + 0.9375) / 3 plusOrMinus 0.0001)
capacity(c1) must be((0.0 + 0.0 + 0.0) / 3 plusOrMinus 0.0001)
diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst
index b91b3d943d..9059bfd13d 100644
--- a/akka-docs/rst/cluster/cluster-usage-java.rst
+++ b/akka-docs/rst/cluster/cluster-usage-java.rst
@@ -440,8 +440,8 @@ service nodes and 1 client::
Cluster Metrics
^^^^^^^^^^^^^^^
-The member nodes of the cluster collects system health metrics and publish that to other nodes and to
-registered subscribers. This information is primarily used for load-balancing of nodes.
+The member nodes of the cluster collects system health metrics and publishes that to other nodes and to
+registered subscribers. This information is primarily used for load-balancing routers.
Hyperic Sigar
-------------
@@ -454,7 +454,7 @@ Sigar is using a native OS library. To enable usage of Sigar you need to add the
org.hyperic
sigar
- 1.6.4
+ @sigarVersion@
@@ -522,6 +522,8 @@ one frontend::
mvn exec:java \
-Dexec.mainClass="sample.cluster.factorial.FactorialFrontendMain"
+Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.
+
Subscribe to Metrics Events
---------------------------
diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst
index 00369e2303..eacbefc9d6 100644
--- a/akka-docs/rst/cluster/cluster-usage-scala.rst
+++ b/akka-docs/rst/cluster/cluster-usage-scala.rst
@@ -401,8 +401,8 @@ service nodes and 1 client::
Cluster Metrics
^^^^^^^^^^^^^^^
-The member nodes of the cluster collects system health metrics and publish that to other nodes and to
-registered subscribers. This information is primarily used for load-balancing of nodes.
+The member nodes of the cluster collects system health metrics and publishes that to other nodes and to
+registered subscribers. This information is primarily used for load-balancing routers.
Hyperic Sigar
-------------
@@ -412,7 +412,7 @@ for a wider and more accurate range of metrics compared to what can be retrieved
Sigar is using a native OS library. To enable usage of Sigar you need to add the directory of the native library to
``-Djava.libarary.path=`` add the following dependency::
- "org.hyperic" % "sigar" % "1.6.4"
+ "org.hyperic" % "sigar" % "@sigarVersion@"
Adaptive Load Balancing
@@ -473,6 +473,7 @@ and you can try by starting nodes in different terminal windows. For example, st
run-main sample.cluster.factorial.FactorialFrontend
+Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.
Subscribe to Metrics Events
---------------------------
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
index 847617a10c..f8f38a8a17 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
@@ -24,8 +24,13 @@ class RemoteActorRefProvider(
val remoteSettings: RemoteSettings = new RemoteSettings(settings.config, systemName)
- // this is lazy to be able to override it in subclass
- lazy val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
+ override val deployer: Deployer = createDeployer
+
+ /**
+ * Factory method to make it possible to override deployer in subclass
+ * Creates a new instance every time
+ */
+ protected def createDeployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer)
diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java
index 63656a43ad..13af688739 100644
--- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java
+++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontend.java
@@ -14,6 +14,8 @@ import akka.cluster.routing.SystemLoadAverageMetricsSelector;
//#frontend
public class FactorialFrontend extends UntypedActor {
+ final int upToN;
+ final boolean repeat;
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@@ -21,21 +23,37 @@ public class FactorialFrontend extends UntypedActor {
new Props(FactorialBackend.class).withRouter(FromConfig.getInstance()),
"factorialBackendRouter");
+ public FactorialFrontend(int upToN, boolean repeat) {
+ this.upToN = upToN;
+ this.repeat = repeat;
+ }
+
+ @Override
+ public void preStart() {
+ sendJobs();
+ }
+
@Override
public void onReceive(Object message) {
- if (message instanceof Integer) {
- Integer n = (Integer) message;
- backend.tell(n, getSelf());
-
- } else if (message instanceof FactorialResult) {
+ if (message instanceof FactorialResult) {
FactorialResult result = (FactorialResult) message;
- log.info("{}! = {}", result.n, result.factorial);
+ if (result.n == upToN) {
+ log.debug("{}! = {}", result.n, result.factorial);
+ if (repeat) sendJobs();
+ }
} else {
unhandled(message);
}
}
+ void sendJobs() {
+ log.info("Starting batch of factorials up to [{}]", upToN);
+ for (int n = 1; n <= upToN; n++) {
+ backend.tell(n, getSelf());
+ }
+ }
+
}
//#frontend
diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java
index 31d0ff5f12..8d52bdf54a 100644
--- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java
+++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java
@@ -1,38 +1,27 @@
package sample.cluster.factorial.japi;
-import java.util.concurrent.TimeUnit;
-
-import sample.cluster.transformation.japi.TransformationMessages.TransformationJob;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.dispatch.OnSuccess;
-import akka.util.Timeout;
-import static akka.pattern.Patterns.ask;
+import akka.actor.UntypedActor;
+import akka.actor.UntypedActorFactory;
+
public class FactorialFrontendMain {
public static void main(String[] args) throws Exception {
- int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0]));
+ final int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0]));
ActorSystem system = ActorSystem.create("ClusterSystem");
- ActorRef frontend = system.actorOf(new Props(
- FactorialFrontend.class), "factorialFrontend");
+ // start the calculations when there is at least 2 other members
+ system.actorOf(new Props(new UntypedActorFactory() {
+ @Override
+ public UntypedActor create() {
+ return new StartupFrontend(upToN);
+ }
+ }), "startup");
- system.log().info("Starting up");
- // wait to let cluster converge and gather metrics
- Thread.sleep(10000);
-
- system.log().info("Starting many factorials up to [{}]", upToN);
- for (int i = 0; i < 1000; i++) {
- for (int n = 1; n <= upToN; n++) {
- frontend.tell(n, null);
- }
- }
-
}
}
diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java
index 8931c3bed2..3acbf3e4c0 100644
--- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java
+++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/MetricsListener.java
@@ -36,15 +36,8 @@ public class MetricsListener extends UntypedActor {
ClusterMetricsChanged clusterMetrics = (ClusterMetricsChanged) message;
for (NodeMetrics nodeMetrics : clusterMetrics.getNodeMetrics()) {
if (nodeMetrics.address().equals(cluster.selfAddress())) {
- HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
- if (heap != null) {
- log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
- }
- Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
- if (cpu != null && cpu.systemLoadAverage().isDefined()) {
- log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(),
- cpu.processors());
- }
+ logHeap(nodeMetrics);
+ logCpu(nodeMetrics);
}
}
@@ -56,5 +49,20 @@ public class MetricsListener extends UntypedActor {
}
}
+ void logHeap(NodeMetrics nodeMetrics) {
+ HeapMemory heap = StandardMetrics.extractHeapMemory(nodeMetrics);
+ if (heap != null) {
+ log.info("Used heap: {} MB", ((double) heap.used()) / 1024 / 1024);
+ }
+ }
+
+ void logCpu(NodeMetrics nodeMetrics) {
+ Cpu cpu = StandardMetrics.extractCpu(nodeMetrics);
+ if (cpu != null && cpu.systemLoadAverage().isDefined()) {
+ log.info("Load: {} ({} processors)", cpu.systemLoadAverage().get(),
+ cpu.processors());
+ }
+ }
+
}
//#metrics-listener
\ No newline at end of file
diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/StartupFrontend.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/StartupFrontend.java
new file mode 100644
index 0000000000..54ca680988
--- /dev/null
+++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/StartupFrontend.java
@@ -0,0 +1,56 @@
+package sample.cluster.factorial.japi;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.actor.UntypedActorFactory;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+public class StartupFrontend extends UntypedActor {
+ final int upToN;
+ LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ int memberCount = 0;
+
+ public StartupFrontend(int upToN) {
+ this.upToN = upToN;
+ }
+
+ //subscribe to ClusterMetricsChanged
+ @Override
+ public void preStart() {
+ log.info("Factorials will start when 3 members in the cluster.");
+ Cluster.get(getContext().system()).subscribe(getSelf(), MemberUp.class);
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ if (message instanceof CurrentClusterState) {
+ CurrentClusterState state = (CurrentClusterState) message;
+ memberCount = state.members().size();
+ runWhenReady();
+
+ } else if (message instanceof MemberUp) {
+ memberCount++;
+ runWhenReady();
+
+ } else {
+ unhandled(message);
+ }
+
+ }
+
+ void runWhenReady() {
+ if (memberCount >= 3) {
+ getContext().system().actorOf(new Props(new UntypedActorFactory() {
+ @Override
+ public UntypedActor create() {
+ return new FactorialFrontend(upToN, true);
+ }
+ }), "factorialFrontend");
+ getContext().stop(getSelf());
+ }
+ }
+}
diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala
index 6e55e03656..9e219a933a 100644
--- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala
+++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala
@@ -13,34 +13,62 @@ import akka.routing.FromConfig
//#imports
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ClusterEvent.MemberUp
+
object FactorialFrontend {
def main(args: Array[String]): Unit = {
val upToN = if (args.isEmpty) 200 else args(0).toInt
val system = ActorSystem("ClusterSystem")
- val frontend = system.actorOf(Props[FactorialFrontend], name = "factorialFrontend")
- system.log.info("Starting up")
- // wait to let cluster converge and gather metrics
- Thread.sleep(10000)
+ // start the calculations when there is at least 2 other members
+ system.actorOf(Props(new Actor with ActorLogging {
+ var memberCount = 0
+
+ log.info("Factorials will start when 3 members in the cluster.")
+ Cluster(context.system).subscribe(self, classOf[MemberUp])
+
+ def receive = {
+ case state: CurrentClusterState ⇒
+ memberCount = state.members.size
+ runWhenReady()
+ case MemberUp(member) ⇒
+ memberCount += 1
+ runWhenReady()
+ }
+
+ def runWhenReady(): Unit = if (memberCount >= 3) {
+ context.system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)),
+ name = "factorialFrontend")
+ context stop self
+ }
+
+ }), name = "startup")
- system.log.info("Starting many factorials up to [{}]", upToN)
- for (_ ← 1 to 1000; n ← 1 to upToN) {
- frontend ! n
- }
}
}
//#frontend
-class FactorialFrontend extends Actor with ActorLogging {
+class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {
val backend = context.actorOf(Props[FactorialBackend].withRouter(FromConfig),
name = "factorialBackendRouter")
+ override def preStart(): Unit = sendJobs()
+
def receive = {
- case n: Int ⇒ backend ! n
case (n: Int, factorial: BigInt) ⇒
- log.info("{}! = {}", n, factorial)
+ if (n == upToN) {
+ log.debug("{}! = {}", n, factorial)
+ if (repeat) sendJobs()
+ }
+ }
+
+ def sendJobs(): Unit = {
+ log.info("Starting batch of factorials up to [{}]", upToN)
+ 1 to upToN foreach { backend ! _ }
}
}
//#frontend
@@ -83,6 +111,7 @@ class FactorialBackend extends Actor with ActorLogging {
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.HeapMemory
import akka.cluster.StandardMetrics.Cpu
@@ -97,21 +126,25 @@ class MetricsListener extends Actor with ActorLogging {
Cluster(context.system).unsubscribe(self)
def receive = {
- case ClusterMetricsChanged(nodeMetrics) ⇒
- nodeMetrics.filter(_.address == selfAddress) foreach { n ⇒
- n match {
- case HeapMemory(address, timestamp, used, committed, max) ⇒
- log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
- case _ ⇒ // no heap info
- }
- n match {
- case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, processors) ⇒
- log.info("Load: {} ({} processors)", systemLoadAverage, processors)
- case _ ⇒ // no cpu info
- }
+ case ClusterMetricsChanged(clusterMetrics) ⇒
+ clusterMetrics.filter(_.address == selfAddress) foreach { nodeMetrics ⇒
+ logHeap(nodeMetrics)
+ logCpu(nodeMetrics)
}
case state: CurrentClusterState ⇒ // ignore
}
+
+ def logHeap(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
+ case HeapMemory(address, timestamp, used, committed, max) ⇒
+ log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
+ case _ ⇒ // no heap info
+ }
+
+ def logCpu(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
+ case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, processors) ⇒
+ log.info("Load: {} ({} processors)", systemLoadAverage, processors)
+ case _ ⇒ // no cpu info
+ }
}
//#metrics-listener
diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala
index 570fe83a53..f68a224902 100644
--- a/project/AkkaBuild.scala
+++ b/project/AkkaBuild.scala
@@ -553,6 +553,7 @@ object AkkaBuild extends Build {
case BinVer(bv) => bv
case _ => s
}),
+ "sigarVersion" -> Dependencies.Compile.sigar.revision,
"github" -> "http://github.com/akka/akka/tree/%s".format((if (isSnapshot) "master" else "v" + v))
)
},