diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
index e848529a94..4b99374bda 100644
--- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
+++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
@@ -18,6 +18,11 @@ import com.eaio.uuid.UUID
import java.net.InetSocketAddress
import java.util.concurrent.{ ConcurrentSkipListSet }
+import akka.cluster.metrics._
+
+import akka.util.Duration
+import akka.util.duration._
+
class ClusterException(message: String) extends AkkaException(message)
object ChangeListener {
@@ -109,6 +114,76 @@ object NodeAddress {
}
}
+/**
+ * Gives access to the metrics of the nodes in the cluster. Changes in metrics can be observed by registering
+ * a [[akka.cluster.metrics.MetricsAlterationMonitor]] with addMonitor.
+ * Metrics of the cluster nodes are distributed through ZooKeeper. For better performance, metrics are
+ * cached internally, and refreshed from ZooKeeper after an interval (see refreshTimeout)
+ */
+trait NodeMetricsManager {
+
+ /**
+ * Gets metrics of the local node directly from JMX monitoring beans/Hyperic Sigar
+ */
+ def getLocalMetrics: NodeMetrics
+
+ /**
+ * Gets metrics of a specified node; the Option result type leaves room for "no metrics for that node"
+ * @param nodeName name of the node whose metrics are requested
+ * @param useCached if true, returns metrics cached in the metrics manager;
+ * otherwise gets metrics directly from ZooKeeper
+ */
+ def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics]
+
+ /**
+ * Gets cached metrics of all nodes in the cluster
+ */
+ def getAllMetrics: Array[NodeMetrics]
+
+ /**
+ * Adds a monitor that reacts when the conditions it defines are satisfied
+ */
+ def addMonitor(monitor: MetricsAlterationMonitor): Unit
+
+ /**
+ * Removes a monitor
+ */
+ def removeMonitor(monitor: MetricsAlterationMonitor): Unit
+
+ /**
+ * Removes metrics of a specified node from ZooKeeper and from the metrics manager cache
+ */
+ def removeNodeMetrics(nodeName: String): Unit
+
+ /**
+ * Sets the timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
+ */
+ def refreshTimeout_=(newValue: Duration): Unit
+
+ /**
+ * Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
+ */
+ def refreshTimeout: Duration
+
+ /**
+ * Starts the metrics manager. When the metrics manager is started, it refreshes the cache from ZooKeeper
+ * after refreshTimeout, and invokes plugged monitors. The result is a NodeMetricsManager, so calls can be chained
+ */
+ def start(): NodeMetricsManager
+
+ /**
+ * Stops the metrics manager. A stopped metrics manager doesn't refresh the cache from ZooKeeper,
+ * and doesn't invoke plugged monitors
+ */
+ def stop(): Unit
+
+ /**
+ * True if the metrics manager is started and running; false if it is stopped
+ */
+ def isRunning: Boolean
+
+}
+
/**
* Interface for cluster node.
*
@@ -139,6 +214,8 @@ trait ClusterNode {
def reconnect(): ClusterNode
+ def metricsManager: NodeMetricsManager
+
/**
* Registers a cluster change listener.
*/
diff --git a/akka-actor/src/main/scala/akka/cluster/metrics/MetricsAlterationMonitor.scala b/akka-actor/src/main/scala/akka/cluster/metrics/MetricsAlterationMonitor.scala
new file mode 100644
index 0000000000..d537c4f70f
--- /dev/null
+++ b/akka-actor/src/main/scala/akka/cluster/metrics/MetricsAlterationMonitor.scala
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster.metrics
+
+/**
+ * [[akka.cluster.NodeMetricsManager]] periodically refreshes its internal cache with node metrics from
+ * MBeans / Sigar. Every time the local cache is refreshed, monitors plugged to the metrics manager are invoked.
+ * If the updated metrics of the node satisfy the conditions specified in reactsOn,
+ * react is called
+ *
+ * @example {{{
+ * class PeakCPULoadMonitor extends LocalMetricsAlterationMonitor {
+ * val id = "peak-cpu-load-monitor"
+ *
+ * def reactsOn(metrics: NodeMetrics) =
+ * metrics.systemLoadAverage > 0.8
+ *
+ * def react(metrics: NodeMetrics) =
+ * println("Peak average system load at node [%s] is reached!" format (metrics.nodeName))
+ * }
+ * }}}
+ *
+ */
+trait LocalMetricsAlterationMonitor extends MetricsAlterationMonitor {
+
+ /**
+ * Defines the conditions that must be satisfied in order to react on the changed metrics
+ */
+ def reactsOn(metrics: NodeMetrics): Boolean
+
+ /**
+ * Reacts on the changed metrics
+ */
+ def react(metrics: NodeMetrics): Unit
+
+}
+
+/**
+ * [[akka.cluster.NodeMetricsManager]] periodically refreshes its internal cache with metrics of all nodes in
+ * the cluster from ZooKeeper. Every time the cache is refreshed, monitors plugged to the metrics manager are
+ * invoked. If the updated metrics satisfy the conditions specified in reactsOn,
+ * react is called
+ *
+ * @example {{{
+ * class PeakCPULoadReached extends ClusterMetricsAlterationMonitor {
+ * val id = "peak-cpu-load-reached"
+ *
+ * def reactsOn(metrics: Array[NodeMetrics]) =
+ * metrics.exists(_.systemLoadAverage > 0.8)
+ *
+ * def react(metrics: Array[NodeMetrics]) =
+ * println("One of the nodes in the cluster has reached the peak system load!")
+ * }
+ * }}}
+ *
+ */
+trait ClusterMetricsAlterationMonitor extends MetricsAlterationMonitor {
+
+ /**
+ * Defines the conditions that must be satisfied in order to react on the changed metrics
+ */
+ def reactsOn(allMetrics: Array[NodeMetrics]): Boolean
+
+ /**
+ * Reacts on the changed metrics
+ */
+ def react(allMetrics: Array[NodeMetrics]): Unit
+
+}
+
+sealed trait MetricsAlterationMonitor extends Comparable[MetricsAlterationMonitor] {
+
+  /**
+   * Unique identifier of the monitor
+   */
+  def id: String
+
+  // Monitors are ordered solely by the string ordering of their ids
+  def compareTo(otherMonitor: MetricsAlterationMonitor): Int = this.id compareTo otherMonitor.id
+}
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/akka/cluster/metrics/NodeMetrics.scala b/akka-actor/src/main/scala/akka/cluster/metrics/NodeMetrics.scala
new file mode 100644
index 0000000000..d6bd2ed01c
--- /dev/null
+++ b/akka-actor/src/main/scala/akka/cluster/metrics/NodeMetrics.scala
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster.metrics
+
+/**
+ * Snapshot of the JVM / system that the node is running on
+ */
+trait NodeMetrics {
+
+ /**
+ * Name of the node the metrics are gathered at
+ */
+ def nodeName: String
+
+ /**
+ * Amount of heap memory currently used
+ */
+ def usedHeapMemory: Long
+
+ /**
+ * Amount of heap memory guaranteed to be available
+ */
+ def committedHeapMemory: Long
+
+ /**
+ * Maximum amount of heap memory that can be used
+ */
+ def maxHeapMemory: Long
+
+ /**
+ * Number of the processors available to the JVM (the member name really is spelled "avaiableProcessors")
+ */
+ def avaiableProcessors: Int
+
+ /**
+ * If the OS-specific Hyperic Sigar library is plugged, it's used to calculate the
+ * average load on the CPUs in the system. Otherwise, the value is retrieved from monitoring MBeans.
+ * Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default.
+ */
+ def systemLoadAverage: Double
+
+}
\ No newline at end of file
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index b4af807a69..5e6cb8f9c0 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -19,6 +19,7 @@ import scala.collection.mutable.ConcurrentMap
import scala.collection.JavaConversions._
import akka.util._
+import duration._
import Helpers._
import akka.actor._
@@ -39,6 +40,7 @@ import akka.serialization.{ Serialization, Serializer, ActorSerialization }
import ActorSerialization._
import akka.serialization.Compression.LZF
+import akka.cluster.metrics._
import akka.cluster.zookeeper._
import ChangeListener._
import ClusterProtocol._
@@ -150,6 +152,7 @@ object Cluster {
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
+ val metricsRefreshInterval = Duration(config.getInt("akka.cluster.metrics-refresh-timeout", 2), TIME_UNIT)
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
@@ -304,6 +307,8 @@ class DefaultClusterNode private[akka] (
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
+ lazy val metricsManager: NodeMetricsManager = new LocalNodeMetricsManager(zkClient, Cluster.metricsRefreshInterval).start()
+
// static nodes
val CLUSTER_PATH = "/" + nodeAddress.clusterName
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
@@ -314,6 +319,7 @@ class DefaultClusterNode private[akka] (
val ACTOR_UUID_REGISTRY_PATH = CLUSTER_PATH + "/actor-uuid-registry"
val ACTOR_ADDRESS_TO_UUIDS_PATH = CLUSTER_PATH + "/actor-address-to-uuids"
val NODE_TO_ACTOR_UUIDS_PATH = CLUSTER_PATH + "/node-to-actors-uuids"
+ val NODE_METRICS = CLUSTER_PATH + "/metrics"
val basePaths = List(
CLUSTER_PATH,
@@ -324,7 +330,8 @@ class DefaultClusterNode private[akka] (
NODE_TO_ACTOR_UUIDS_PATH,
ACTOR_ADDRESS_TO_UUIDS_PATH,
CONFIGURATION_PATH,
- PROVISIONING_PATH)
+ PROVISIONING_PATH,
+ NODE_METRICS)
val LEADER_ELECTION_PATH = CLUSTER_PATH + "/leader" // should NOT be part of 'basePaths' only used by 'leaderLock'
@@ -1641,7 +1648,11 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
// publish NodeConnected and NodeDisconnect events to the listeners
newlyConnectedMembershipNodes foreach (node ⇒ self.publish(NodeConnected(node)))
- newlyDisconnectedMembershipNodes foreach (node ⇒ self.publish(NodeDisconnected(node)))
+ newlyDisconnectedMembershipNodes foreach { node ⇒
+ self.publish(NodeDisconnected(node))
+ // remove metrics of a disconnected node from ZK and local cache
+ self.metricsManager.removeNodeMetrics(node)
+ }
}
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index 6061b514db..1fceb7fb3a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -132,6 +132,11 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
(s.version, s.connections.values)
}
+  def versionedSocketAddressToRefMap = {
+    // read the state reference once, so that version and connections are taken from the same value
+    val current = state.get; (current.version, current.connections)
+  }
+
def size: Int = state.get().connections.size
def stopAll() {
diff --git a/akka-cluster/src/main/scala/akka/cluster/metrics/LocalNodeMetricsManager.scala b/akka-cluster/src/main/scala/akka/cluster/metrics/LocalNodeMetricsManager.scala
new file mode 100644
index 0000000000..a01a43e077
--- /dev/null
+++ b/akka-cluster/src/main/scala/akka/cluster/metrics/LocalNodeMetricsManager.scala
@@ -0,0 +1,226 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster.metrics
+
+import akka.cluster._
+import Cluster._
+import akka.cluster.zookeeper._
+import akka.actor._
+import Actor._
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import java.util.concurrent.{ ConcurrentHashMap, ConcurrentSkipListSet }
+import java.util.concurrent.atomic.AtomicReference
+import akka.util.{ Duration, Switch }
+import akka.util.Helpers._
+import akka.util.duration._
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+import akka.event.EventHandler
+
+/**
+ * Instance of the metrics manager running on the node. To keep performance acceptable, metrics of all the
+ * nodes in the cluster are cached internally, and refreshed from monitoring MBeans / Sigar (when it's the local
+ * node), or from ZooKeeper (if it's metrics of all the nodes in the cluster) after a specified timeout -
+ * metricsRefreshTimeout.
+ * The cluster node creates the manager with Cluster.metricsRefreshInterval, which is read from akka.conf
+ * and defaults to 2:
+ *
+ * @example {{{
+ * akka.cluster.metrics-refresh-timeout = 2
+ * }}}
+ */
+class LocalNodeMetricsManager(zkClient: AkkaZkClient, private val metricsRefreshTimeout: Duration)
+  extends NodeMetricsManager {
+
+  /**
+   * Provides metrics of the system that the node is running on. Hyperic Sigar is tried first; when it
+   * cannot be set up, a warning is logged and the JMX monitoring MBeans are used instead
+   */
+  lazy private val metricsProvider = SigarMetricsProvider(refreshTimeout.toMillis.toInt) fold ((thrw) ⇒ {
+    EventHandler.warning(this, """Hyperic Sigar library failed to load due to %s: %s.
+All the metrics will be retreived from monitoring MBeans, and may be incorrect at some platforms.
+In order to get better metrics, please put "sigar.jar" to the classpath, and add platform-specific native libary to "java.library.path"."""
+      .format(thrw.getClass.getName, thrw.getMessage))
+    new JMXMetricsProvider
+  },
+    sigar ⇒ sigar)
+
+  /**
+   * Metrics of all nodes in the cluster, keyed by node name
+   */
+  private val localNodeMetricsCache = new ConcurrentHashMap[String, NodeMetrics]
+
+  @volatile
+  private var _refreshTimeout = metricsRefreshTimeout
+
+  /**
+   * Plugged monitors (both local and cluster-wide), kept in the order defined by their compareTo
+   */
+  private val alterationMonitors = new ConcurrentSkipListSet[MetricsAlterationMonitor]
+
+  private val _isRunning = new Switch(false)
+
+  /**
+   * If the value is true, the metrics manager is started and running. Stopped, otherwise
+   */
+  def isRunning: Boolean = _isRunning.isOn
+
+  /**
+   * Starts the metrics manager: switches it on and runs the first refresh(), which keeps re-scheduling
+   * itself after refreshTimeout and invoking plugged monitors for as long as the manager is running
+   */
+  def start() = {
+    _isRunning.switchOn { refresh() }
+    this
+  }
+  // Path of the ZooKeeper node that holds the metrics of the given cluster node
+  private[cluster] def metricsForNode(nodeName: String): String = "%s/%s".format(node.NODE_METRICS, nodeName)
+
+  /**
+   * Adds a monitor that reacts when the conditions it defines are satisfied
+   */
+  def addMonitor(monitor: MetricsAlterationMonitor): Unit = alterationMonitors add monitor
+  // Removes a monitor
+  def removeMonitor(monitor: MetricsAlterationMonitor): Unit = alterationMonitors remove monitor
+  // Sets the refresh timeout; refresh() reads it each time it schedules the next refresh
+  def refreshTimeout_=(newValue: Duration): Unit = { _refreshTimeout = newValue }
+
+  /**
+   * Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
+   */
+  def refreshTimeout: Duration = _refreshTimeout
+
+  /**
+   * Stores metrics of the node in ZooKeeper: overwrites the data if the path exists, creates an ephemeral node otherwise
+   */
+  private[akka] def storeMetricsInZK(metrics: NodeMetrics) = {
+    val metricsPath = metricsForNode(metrics.nodeName)
+    if (zkClient.exists(metricsPath)) {
+      zkClient.writeData(metricsPath, metrics)
+    } else {
+      ignore[ZkNoNodeException](zkClient.createEphemeral(metricsPath, metrics))
+    }
+  }
+
+  /**
+   * Gets metrics of the node from ZooKeeper (getMetrics translates a ZkNoNodeException from here into None)
+   */
+  private[akka] def getMetricsFromZK(nodeName: String) = {
+    zkClient.readData[NodeMetrics](metricsForNode(nodeName))
+  }
+
+  /**
+   * Removes metrics of the node from ZooKeeper and from the local cache
+   */
+  def removeNodeMetrics(nodeName: String): Unit = {
+    val metricsPath = metricsForNode(nodeName)
+    if (zkClient.exists(metricsPath)) {
+      ignore[ZkNoNodeException](zkClient.delete(metricsPath))
+    }
+
+    localNodeMetricsCache.remove(nodeName)
+  }
+
+  /**
+   * Gets metrics of the local node directly from JMX monitoring beans/Hyperic Sigar
+   */
+  def getLocalMetrics: NodeMetrics = metricsProvider.getLocalMetrics
+
+  /**
+   * Gets metrics of the node, specified by the name. If useCached is true (default value), the metrics
+   * snapshot is taken from the local cache; otherwise, it's retrieved from ZooKeeper. None means "not found"
+   */
+  def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics] =
+    if (useCached)
+      Option(localNodeMetricsCache.get(nodeName))
+    else
+      try {
+        Some(getMetricsFromZK(nodeName))
+      } catch {
+        case _: ZkNoNodeException ⇒ None
+      }
+
+  /**
+   * Returns metrics of all nodes in the cluster from ZooKeeper; nodes that vanish while reading are left out
+   */
+  private[akka] def getAllMetricsFromZK: Map[String, NodeMetrics] = {
+    val nodeNames = zkClient.getChildren(node.NODE_METRICS).toList
+    nodeNames.flatMap { nodeName ⇒ getMetrics(nodeName, false).map((nodeName, _)) }.toMap
+  }
+
+  /**
+   * Gets cached metrics of all nodes in the cluster
+   */
+  def getAllMetrics: Array[NodeMetrics] = localNodeMetricsCache.values.asScala.toArray
+
+  /**
+   * Stores local metrics in ZooKeeper, refreshes the cache from ZooKeeper and, if running, invokes the monitors
+   */
+  private[akka] def refresh(): Unit = {
+    // publish the local metrics first, so that the cache refresh below picks them up as well
+    storeMetricsInZK(getLocalMetrics)
+    refreshMetricsCacheFromZK()
+    // one-shot timer: every refresh() re-arms itself, a repeating timer here would add a new timer per call
+    if (isRunning) {
+      Scheduler.scheduleOnce({ () ⇒ refresh() }, refreshTimeout.length, refreshTimeout.unit)
+      invokeMonitors()
+    }
+  }
+
+  /**
+   * Refreshes metrics manager cache from ZooKeeper
+   */
+  private def refreshMetricsCacheFromZK(): Unit = {
+    val allMetricsFromZK = getAllMetricsFromZK
+
+    localNodeMetricsCache.keySet.foreach { key ⇒
+      if (!allMetricsFromZK.contains(key))
+        localNodeMetricsCache.remove(key)
+    }
+
+    // RACY: metrics for the node might have been removed both from ZK and local cache by the moment,
+    // but will be re-cached, since they're still present in allMetricsFromZK snapshot. Not important, because
+    // cache will be fixed soon, at the next iteration of refresh
+    allMetricsFromZK foreach {
+      case (nodeName, metrics) ⇒
+        localNodeMetricsCache.put(nodeName, metrics)
+    }
+  }
+
+  /**
+   * Invokes monitors with the cached metrics; an exception thrown by a monitor is not caught here
+   */
+  private def invokeMonitors(): Unit = if (!alterationMonitors.isEmpty) {
+    // RACY: metrics for some nodes might have been removed/added by that moment. Not important,
+    // because monitors will be fed with up-to-date metrics shortly, at the next iteration of refresh
+    val clusterNodesMetrics = getAllMetrics
+    val localNodeMetrics = clusterNodesMetrics.find(_.nodeName == nodeAddress.nodeName)
+    val iterator = alterationMonitors.iterator
+
+    // RACY: there might be new monitors added after the iterator has been obtained. Not important,
+    // because refresh interval is meant to be very short, and all the new monitors will be called at the
+    // next refresh iteration
+    while (iterator.hasNext) {
+
+      val monitor = iterator.next
+
+      monitor match {
+        case localMonitor: LocalMetricsAlterationMonitor ⇒
+          localNodeMetrics.foreach { metrics ⇒
+            if (localMonitor reactsOn metrics)
+              localMonitor react metrics
+          }
+
+        case clusterMonitor: ClusterMetricsAlterationMonitor ⇒
+          if (clusterMonitor reactsOn clusterNodesMetrics)
+            clusterMonitor react clusterNodesMetrics
+      }
+
+    }
+  }
+  // Switches the manager off: later refresh() runs neither schedule a follow-up nor invoke monitors
+  def stop(): Unit = _isRunning.switchOff
+
+}
diff --git a/akka-cluster/src/main/scala/akka/cluster/metrics/MetricsProvider.scala b/akka-cluster/src/main/scala/akka/cluster/metrics/MetricsProvider.scala
new file mode 100644
index 0000000000..61a0b4e969
--- /dev/null
+++ b/akka-cluster/src/main/scala/akka/cluster/metrics/MetricsProvider.scala
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster.metrics
+
+import akka.cluster._
+import akka.event.EventHandler
+import java.lang.management.ManagementFactory
+import akka.util.ReflectiveAccess._
+import akka.util.Switch
+
+/**
+ * Snapshot of the JVM / system that the node is running on
+ *
+ * @param nodeName name of the node, where metrics are gathered at
+ * @param usedHeapMemory amount of heap memory currently used
+ * @param committedHeapMemory amount of heap memory guaranteed to be available
+ * @param maxHeapMemory maximum amount of heap memory that can be used
+ * @param avaiableProcessors number of the processors available to the JVM (parameter name is spelled this way)
+ * @param systemLoadAverage system load average. If OS-specific Sigar's native library is plugged,
+ * it's used to calculate average load on the CPUs in the system. Otherwise, value is retrieved from monitoring
+ * MBeans. Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default.
+ *
+ */
+case class DefaultNodeMetrics(nodeName: String,
+ usedHeapMemory: Long,
+ committedHeapMemory: Long,
+ maxHeapMemory: Long,
+ avaiableProcessors: Int,
+ systemLoadAverage: Double) extends NodeMetrics
+
+object MetricsProvider {
+
+ /*
+ * Maximum accepted value of system load average (upper bound used by calcSystemLoadAverage)
+ */
+ val MAX_SYS_LOAD_AVG = 1
+
+ /*
+ * Minimum accepted value of system load average (lower bound used by calcSystemLoadAverage)
+ */
+ val MIN_SYS_LOAD_AVG = 0
+
+ /*
+ * Default value of system load average, used when a reported value is outside the accepted bounds
+ */
+ val DEF_SYS_LOAD_AVG = 0.5
+
+}
+
+/**
+ * Abstracts a metrics provider that returns metrics of the system the node is running at
+ */
+trait MetricsProvider {
+
+ /**
+ * Gets metrics of the local system
+ */
+ def getLocalMetrics: NodeMetrics
+
+}
+
+/**
+ * Loads JVM metrics through JMX monitoring beans
+ */
+class JMXMetricsProvider extends MetricsProvider {
+
+  import MetricsProvider._
+
+  private val memoryMXBean = ManagementFactory.getMemoryMXBean
+
+  private val osMXBean = ManagementFactory.getOperatingSystemMXBean
+
+  /**
+   * Validates and calculates system load average
+   *
+   * @param avg system load average obtained from a specific monitoring provider (may be incorrect)
+   * @return avg itself, or the default value (DEF_SYS_LOAD_AVG, 0.5) if avg is outside the permitted
+   * bounds MIN_SYS_LOAD_AVG..MAX_SYS_LOAD_AVG (0 to 1); NaN fails both comparisons and also yields the default
+   */
+  @inline
+  protected final def calcSystemLoadAverage(avg: Double) =
+    if (avg >= MIN_SYS_LOAD_AVG && avg <= MAX_SYS_LOAD_AVG) avg else DEF_SYS_LOAD_AVG
+  // Load average reported by the OperatingSystemMXBean, passed through calcSystemLoadAverage
+  protected def systemLoadAverage = calcSystemLoadAverage(osMXBean.getSystemLoadAverage)
+
+  def getLocalMetrics = {
+    // take one heap usage snapshot, so that used/committed/max all describe the same moment
+    val heapUsage = memoryMXBean.getHeapMemoryUsage
+    DefaultNodeMetrics(Cluster.nodeAddress.nodeName,
+      heapUsage.getUsed, heapUsage.getCommitted, heapUsage.getMax,
+      osMXBean.getAvailableProcessors, systemLoadAverage)
+  }
+
+}
+
+/**
+ * Loads wider range of metrics of a better quality with Hyperic Sigar (native library)
+ *
+ * @param sigarInstance object whose getCpuPerc method is looked up and invoked through reflection
+ */
+class SigarMetricsProvider private (private val sigarInstance: AnyRef) extends JMXMetricsProvider {
+
+  private val reportErrors = new Switch(true)
+
+  private val cpuPercAccessor = sigarInstance.getClass.getMethod("getCpuPerc")
+  private val combinedLoadAccessor = cpuPercAccessor.getReturnType.getMethod("getCombined")
+
+  /**
+   * Wraps reflective calls to Hyperic Sigar
+   *
+   * @param callSigar reflective call to Hyperic Sigar
+   * @param fallback evaluated when the call to Sigar throws; NOTE(review): the warning is presumably
+   * logged only on the first failure (depends on Switch.switchOff semantics) - confirm
+   */
+  private def callSigarMethodOrElse[T](callSigar: ⇒ T, fallback: ⇒ T): T =
+    try {
+      callSigar
+    } catch {
+      case failure ⇒
+        reportErrors.switchOff { EventHandler.warning(this,
+          "Failed to get metrics from Hyperic Sigar. %s: %s".format(failure.getClass.getName, failure.getMessage)) }
+        fallback
+    }
+
+  /**
+   * Obtains system load average from Sigar (getCombined of the getCpuPerc result).
+   * If the value cannot be obtained, falls back to system load average taken from JMX
+   */
+  override def systemLoadAverage = callSigarMethodOrElse({
+    val cpuPerc = cpuPercAccessor.invoke(sigarInstance)
+    calcSystemLoadAverage(combinedLoadAccessor.invoke(cpuPerc).asInstanceOf[Double])
+  }, super.systemLoadAverage)
+
+}
+
+object SigarMetricsProvider {
+
+ /*
+ * Instantiates Sigar metrics provider through reflection, to avoid a compile-time dependency on the Hyperic
+ * Sigar library; refreshTimeout goes to SigarProxyCache.newInstance, anything thrown is returned as Left
+ */
+ def apply(refreshTimeout: Int): Either[Throwable, MetricsProvider] = try {
+ for {
+ sigarInstance ← createInstance[AnyRef]("org.hyperic.sigar.Sigar", noParams, noArgs).right
+ sigarProxyCacheClass: Class[_] ← getClassFor("org.hyperic.sigar.SigarProxyCache").right
+ } yield new SigarMetricsProvider(sigarProxyCacheClass
+ .getMethod("newInstance", Array(sigarInstance.getClass, classOf[Int]): _*)
+ .invoke(null, sigarInstance, new java.lang.Integer(refreshTimeout)))
+ } catch {
+ case thrw ⇒ Left(thrw)
+ }
+
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmNode1.conf
new file mode 100644
index 0000000000..8d5284be46
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmNode1.conf
@@ -0,0 +1,4 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handlers = ["akka.testkit.TestEventListener"]
+akka.event-handler-level = "WARNING"
+akka.cluster.metrics-refresh-timeout = 1
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala
new file mode 100644
index 0000000000..456fd4f65a
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster.metrics.local
+
+import akka.cluster._
+import akka.actor._
+import Actor._
+import Cluster._
+import akka.dispatch._
+import akka.util.Duration
+import akka.util.duration._
+import akka.cluster.metrics._
+import java.util.concurrent.atomic.AtomicInteger
+
+object LocalMetricsMultiJvmSpec {
+ val NrOfNodes = 1 // used as testNodes by the single test node class of this spec
+}
+
+class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode {
+  // Single-node spec of the metrics manager exposed by the cluster node
+  import LocalMetricsMultiJvmSpec._
+
+  val testNodes = NrOfNodes
+
+  override def beforeAll = {
+    super.beforeAll()
+    node
+  }
+
+  override def afterAll = {
+    node.shutdown()
+    super.afterAll()
+  }
+
+  "Metrics manager" must {
+
+    def timeout = node.metricsManager.refreshTimeout
+
+    "be initialized with refresh timeout value, specified in akka.conf" in {
+      timeout must be(1.second)
+    }
+
+    "return up-to-date local node metrics straight from MBeans/Sigar" in {
+      node.metricsManager.getLocalMetrics must not be (null)
+
+      node.metricsManager.getLocalMetrics.systemLoadAverage must be(0.5 plusOrMinus 0.5)
+    }
+    // getMetrics and find return Options: compare against None, a null check on them can never fail
+    "return metrics cached in the MetricsManagerLocalMetrics" in {
+      node.metricsManager.getMetrics(nodeAddress.nodeName) must not be (None)
+    }
+
+    "return local node metrics from ZNode" in {
+      node.metricsManager.getMetrics(nodeAddress.nodeName, false) must not be (None)
+    }
+
+    "return cached metrics of all nodes in the cluster" in {
+      node.metricsManager.getAllMetrics.size must be(1)
+      node.metricsManager.getAllMetrics.find(_.nodeName == "node1") must not be (None)
+    }
+
+    "throw no exceptions, when user attempts to get metrics of a non-existing node" in {
+      node.metricsManager.getMetrics("nonexisting") must be(None)
+      node.metricsManager.getMetrics("nonexisting", false) must be(None)
+    }
+
+    "regularly update cached metrics" in {
+      val oldMetrics = node.metricsManager.getLocalMetrics
+      Thread sleep timeout.toMillis
+      node.metricsManager.getLocalMetrics must not be (oldMetrics)
+    }
+
+    "allow to track JVM state and bind handles through MetricsAlterationMonitors" in {
+      val monitorReponse = new DefaultPromise[String]
+
+      node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor {
+
+        val id = "heapMemoryThresholdMonitor"
+
+        def reactsOn(metrics: NodeMetrics) = metrics.usedHeapMemory > 1
+
+        def react(metrics: NodeMetrics) = monitorReponse.completeWithResult("Too much memory is used!")
+
+      })
+
+      monitorReponse.get must be("Too much memory is used!")
+
+    }
+
+    class FooMonitor(monitorWorked: AtomicInteger) extends LocalMetricsAlterationMonitor {
+      val id = "fooMonitor"
+      def reactsOn(metrics: NodeMetrics) = true
+      def react(metrics: NodeMetrics): Unit = monitorWorked.incrementAndGet()
+    }
+
+    "allow to unregister the monitor" in {
+
+      val monitorWorked = new AtomicInteger(0)
+      val fooMonitor = new FooMonitor(monitorWorked)
+
+      node.metricsManager.addMonitor(fooMonitor)
+      node.metricsManager.removeMonitor(fooMonitor)
+
+      val oldValue = monitorWorked.get
+      Thread sleep timeout.toMillis
+      monitorWorked.get must be(oldValue)
+
+    }
+
+    "stop notifying monitors, when stopped" in {
+
+      node.metricsManager.stop()
+
+      val monitorWorked = new AtomicInteger(0)
+
+      node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor {
+        val id = "fooMonitor"
+        def reactsOn(metrics: NodeMetrics) = true
+        def react(metrics: NodeMetrics): Unit = monitorWorked.incrementAndGet()
+      })
+
+      monitorWorked.get must be(0)
+
+      node.metricsManager.start()
+      Thread sleep (timeout.toMillis * 2)
+      monitorWorked.get must be > (1)
+
+    }
+
+  }
+
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode1.conf
new file mode 100644
index 0000000000..172e980612
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode1.conf
@@ -0,0 +1,3 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handlers = ["akka.testkit.TestEventListener"]
+akka.event-handler-level = "WARNING"
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode2.conf
new file mode 100644
index 0000000000..172e980612
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode2.conf
@@ -0,0 +1,3 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handlers = ["akka.testkit.TestEventListener"]
+akka.event-handler-level = "WARNING"
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmSpec.scala
new file mode 100644
index 0000000000..22054bc0d0
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/remote/RemoteMetricsMultiJvmSpec.scala
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster.metrics.remote
+
+import akka.cluster._
+import akka.actor._
+import Actor._
+import Cluster._
+import akka.dispatch._
+import akka.util.Duration
+import akka.util.duration._
+import akka.cluster.metrics._
+import java.util.concurrent._
+import atomic.AtomicInteger
+
+object RemoteMetricsMultiJvmSpec {
+ val NrOfNodes = 2 // used as testNodes, barrier size and expected number of metrics entries
+
+ val MetricsRefreshTimeout = 100.millis // assigned to metricsManager.refreshTimeout by both node classes
+}
+
+class AllMetricsAvailableMonitor(_id: String, completionLatch: CountDownLatch, clusterSize: Int) extends ClusterMetricsAlterationMonitor {
+  // Test monitor: counts completionLatch down whenever it is handed metrics of exactly clusterSize nodes
+  val id = _id
+
+  def reactsOn(allMetrics: Array[NodeMetrics]): Boolean = clusterSize == allMetrics.size
+
+  def react(allMetrics: Array[NodeMetrics]): Unit = completionLatch.countDown()
+
+}
+
+class RemoteMetricsMultiJvmNode1 extends MasterClusterTestNode {
+  // Node 1 of the two-node spec: stays up and checks that the metrics of node2 disappear after it leaves
+  import RemoteMetricsMultiJvmSpec._
+
+  val testNodes = NrOfNodes
+
+  "Metrics manager" must {
+    "provide metrics of all nodes in the cluster" in {
+
+      val allMetricsAvaiable = new CountDownLatch(1)
+
+      node.metricsManager.refreshTimeout = MetricsRefreshTimeout
+      node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("all-metrics-available", allMetricsAvaiable, NrOfNodes))
+
+      LocalCluster.barrier("node-start", NrOfNodes).await()
+      // bounded wait: fail the test instead of blocking forever when the metrics never show up
+      allMetricsAvaiable.await(10, TimeUnit.SECONDS) must be(true)
+
+      LocalCluster.barrier("check-all-remote-metrics", NrOfNodes) {
+        node.metricsManager.getAllMetrics.size must be(2)
+      }
+
+      val cachedMetrics = node.metricsManager.getMetrics("node2")
+      val metricsFromZnode = node.metricsManager.getMetrics("node2", false)
+
+      LocalCluster.barrier("check-single-remote-metrics", NrOfNodes) {
+        cachedMetrics must not be (None)
+        metricsFromZnode must not be (None)
+      }
+
+      Thread sleep MetricsRefreshTimeout.toMillis
+
+      LocalCluster.barrier("remote-metrics-is-updated", NrOfNodes) {
+        node.metricsManager.getMetrics("node2") must not be (cachedMetrics)
+        node.metricsManager.getMetrics("node2", false) must not be (metricsFromZnode)
+      }
+
+      val someMetricsGone = new CountDownLatch(1)
+      node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("some-metrics-gone", someMetricsGone, 1))
+
+      LocalCluster.barrier("some-nodes-leave", NrOfNodes).await()
+
+      someMetricsGone.await(10, TimeUnit.SECONDS) must be(true)
+
+      node.metricsManager.getMetrics("node2") must be(None)
+      node.metricsManager.getMetrics("node2", false) must be(None)
+      node.metricsManager.getAllMetrics.size must be(1)
+
+      node.shutdown()
+
+    }
+  }
+
+}
+
+class RemoteMetricsMultiJvmNode2 extends ClusterTestNode {
+  // Node 2 of the two-node spec: checks the metrics of node1, then shuts down inside the last barrier
+  import RemoteMetricsMultiJvmSpec._
+
+  val testNodes = NrOfNodes
+
+  "Metrics manager" must {
+    "provide metrics of all nodes in the cluster" in {
+
+      val allMetricsAvaiable = new CountDownLatch(1)
+
+      node.metricsManager.refreshTimeout = MetricsRefreshTimeout
+      node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("all-metrics-available", allMetricsAvaiable, NrOfNodes))
+
+      LocalCluster.barrier("node-start", NrOfNodes).await()
+      // bounded wait: fail the test instead of blocking forever when the metrics never show up
+      allMetricsAvaiable.await(10, TimeUnit.SECONDS) must be(true)
+
+      LocalCluster.barrier("check-all-remote-metrics", NrOfNodes) {
+        node.metricsManager.getAllMetrics.size must be(2)
+      }
+
+      val cachedMetrics = node.metricsManager.getMetrics("node1")
+      val metricsFromZnode = node.metricsManager.getMetrics("node1", false)
+
+      LocalCluster.barrier("check-single-remote-metrics", NrOfNodes) {
+        cachedMetrics must not be (None)
+        metricsFromZnode must not be (None)
+      }
+
+      Thread sleep MetricsRefreshTimeout.toMillis
+
+      LocalCluster.barrier("remote-metrics-is-updated", NrOfNodes) {
+        node.metricsManager.getMetrics("node1") must not be (cachedMetrics)
+        node.metricsManager.getMetrics("node1", false) must not be (metricsFromZnode)
+      }
+
+      LocalCluster.barrier("some-nodes-leave", NrOfNodes) {
+        node.shutdown()
+      }
+    }
+  }
+
+}
+