Internal Metrics API. Fixes #939

* Retreives metrics snapshots of the system the node is running on through JMX monitoring MBeans or Hyperic Sigar (is Sigar library is plugged)
        * Allows to set metrics alteration monitors that are triggered, when specific conditions are satisfied (e.g., not enough memory left on the node)
        * Nodes publish their local metrics to ZNodes
        * In order to maintain good performance, metrics manager internally caches snapshots, and refreshes them from time to time from ZooKeeper
This commit is contained in:
Vasil Remeniuk 2011-08-28 16:07:19 +03:00
parent cb9196c690
commit 3cee2fc8ec
15 changed files with 881 additions and 2 deletions

View file

@ -18,6 +18,11 @@ import com.eaio.uuid.UUID
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.{ ConcurrentSkipListSet } import java.util.concurrent.{ ConcurrentSkipListSet }
import akka.cluster.metrics._
import akka.util.Duration
import akka.util.duration._
class ClusterException(message: String) extends AkkaException(message) class ClusterException(message: String) extends AkkaException(message)
object ChangeListener { object ChangeListener {
@ -109,6 +114,76 @@ object NodeAddress {
} }
} }
/*
* Allows user to access metrics of a different nodes in the cluster. Changing metrics can be monitored
* using {@link MetricsAlterationMonitor}
* Metrics of the cluster nodes are distributed through ZooKeeper. For better performance, metrics are
* cached internally, and refreshed from ZooKeeper after an interval
*/
trait NodeMetricsManager {
/*
* Gets metrics of a local node directly from JMX monitoring beans/Hyperic Sigar
*/
def getLocalMetrics: NodeMetrics
/*
* Gets metrics of a specified node
* @param nodeName metrics of the node specified by the name will be returned
* @param useCached if <code>true</code>, returns metrics cached in the metrics manager,
* gets metrics directly from ZooKeeper otherwise
*/
def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics]
/*
* Gets cached metrics of all nodes in the cluster
*/
def getAllMetrics: Array[NodeMetrics]
/*
* Adds monitor that reacts, when specific conditions are satisfied
*/
def addMonitor(monitor: MetricsAlterationMonitor): Unit
/*
* Removes monitor
*/
def removeMonitor(monitor: MetricsAlterationMonitor): Unit
/*
* Removes metrics of s specified node from ZooKeeper and metrics manager cache
*/
def removeNodeMetrics(nodeName: String): Unit
/*
* Sets timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
*/
def refreshTimeout_=(newValue: Duration): Unit
/*
* Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
*/
def refreshTimeout: Duration
/*
* Starts metrics manager. When metrics manager is started, it refreshes cache from ZooKeeper
* after <code>refreshTimeout</code>, and invokes plugged monitors
*/
def start(): NodeMetricsManager
/*
* Stops metrics manager. Stopped metrics manager doesn't refresh cache from ZooKeeper,
* and doesn't invoke plugged monitors
*/
def stop(): Unit
/*
* If the value is <code>true</code>, metrics manages is started and running. Stopped, otherwise
*/
def isRunning: Boolean
}
/** /**
* Interface for cluster node. * Interface for cluster node.
* *
@ -139,6 +214,8 @@ trait ClusterNode {
def reconnect(): ClusterNode def reconnect(): ClusterNode
def metricsManager: NodeMetricsManager
/** /**
* Registers a cluster change listener. * Registers a cluster change listener.
*/ */

View file

@ -0,0 +1,82 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics
/*
* {@link NodeMetricsManager} periodically refershes internal cache with node metrics from MBeans / Sigar.
* Every time local cache is refreshed, monitors plugged to the metrics manager are invoked.
* If updated metrics satisfy conditions, specified in <code>reactsOn</code>,
* <code>react</code> is called
*
* @exampl {{{
* class PeakCPULoadMonitor extends LocalMetricsAlterationMonitor {
* val id = "peak-cpu-load-monitor"
*
* def reactsOn(metrics: NodeMetrics) =
* metrics.systemLoadAverage > 0.8
*
* def react(metrics: NodeMetrics) =
* println("Peak average system load at node [%s] is reached!" format (metrics.nodeName))
* }
* }}}
*
*/
trait LocalMetricsAlterationMonitor extends MetricsAlterationMonitor {
/*
* Definies conditions that must be satisfied in order to <code>react<code> on the changed metrics
*/
def reactsOn(metrics: NodeMetrics): Boolean
/*
* Reacts on the changed metrics
*/
def react(metrics: NodeMetrics): Unit
}
/*
* {@link NodeMetricsManager} periodically refershes internal cache with metrics of all nodes in the cluster
* from ZooKeeper. Every time local cache is refreshed, monitors plugged to the metrics manager are invoked.
* If updated metrics satisfy conditions, specified in <code>reactsOn</code>,
* <code>react</code> is called
*
* @exampl {{{
* class PeakCPULoadReached extends ClusterMetricsAlterationMonitor {
* val id = "peak-cpu-load-reached"
*
* def reactsOn(metrics: Array[NodeMetrics]) =
* metrics.forall(_.systemLoadAverage > 0.8)
*
* def react(metrics: Array[NodeMetrics]) =
* println("One of the nodes in the scluster has reached the peak system load!")
* }
* }}}
*
*/
trait ClusterMetricsAlterationMonitor extends MetricsAlterationMonitor {
/*
* Definies conditions that must be satisfied in order to <code>react<code> on the changed metrics
*/
def reactsOn(allMetrics: Array[NodeMetrics]): Boolean
/*
* Reacts on the changed metrics
*/
def react(allMetrics: Array[NodeMetrics]): Unit
}
sealed trait MetricsAlterationMonitor extends Comparable[MetricsAlterationMonitor] {
/*
* Unique identiifier of the monitor
*/
def id: String
def compareTo(otherMonitor: MetricsAlterationMonitor) = id.compareTo(otherMonitor.id)
}

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics
/*
* Snapshot of the JVM / system that's the node is running on
*/
trait NodeMetrics {
/*
* Name of the node the metrics are gathered at
*/
def nodeName: String
/*
* Amount of heap memory currently used
*/
def usedHeapMemory: Long
/*
* Amount of heap memory guaranteed to be available
*/
def committedHeapMemory: Long
/*
* Maximum amount of heap memory that can be used
*/
def maxHeapMemory: Long
/*
* Number of the processors avalable to the JVM
*/
def avaiableProcessors: Int
/*
* If OS-specific Hyperic Sigar library is plugged, it's used to calculate
* average load on the CPUs in the system. Otherwise, value is retreived from monitoring MBeans.
* Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default.
*/
def systemLoadAverage: Double
}

View file

@ -19,6 +19,7 @@ import scala.collection.mutable.ConcurrentMap
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import akka.util._ import akka.util._
import duration._
import Helpers._ import Helpers._
import akka.actor._ import akka.actor._
@ -39,6 +40,7 @@ import akka.serialization.{ Serialization, Serializer, ActorSerialization }
import ActorSerialization._ import ActorSerialization._
import akka.serialization.Compression.LZF import akka.serialization.Compression.LZF
import akka.cluster.metrics._
import akka.cluster.zookeeper._ import akka.cluster.zookeeper._
import ChangeListener._ import ChangeListener._
import ClusterProtocol._ import ClusterProtocol._
@ -150,6 +152,7 @@ object Cluster {
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181") val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552) val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
val metricsRefreshInterval = Duration(config.getInt("akka.cluster.metrics-refresh-timeout", 2), TIME_UNIT)
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
val shouldCompressData = config.getBool("akka.cluster.use-compression", false) val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
@ -304,6 +307,8 @@ class DefaultClusterNode private[akka] (
lazy val remoteServerAddress: InetSocketAddress = remoteService.address lazy val remoteServerAddress: InetSocketAddress = remoteService.address
lazy val metricsManager: NodeMetricsManager = new LocalNodeMetricsManager(zkClient, Cluster.metricsRefreshInterval).start()
// static nodes // static nodes
val CLUSTER_PATH = "/" + nodeAddress.clusterName val CLUSTER_PATH = "/" + nodeAddress.clusterName
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members" val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
@ -314,6 +319,7 @@ class DefaultClusterNode private[akka] (
val ACTOR_UUID_REGISTRY_PATH = CLUSTER_PATH + "/actor-uuid-registry" val ACTOR_UUID_REGISTRY_PATH = CLUSTER_PATH + "/actor-uuid-registry"
val ACTOR_ADDRESS_TO_UUIDS_PATH = CLUSTER_PATH + "/actor-address-to-uuids" val ACTOR_ADDRESS_TO_UUIDS_PATH = CLUSTER_PATH + "/actor-address-to-uuids"
val NODE_TO_ACTOR_UUIDS_PATH = CLUSTER_PATH + "/node-to-actors-uuids" val NODE_TO_ACTOR_UUIDS_PATH = CLUSTER_PATH + "/node-to-actors-uuids"
val NODE_METRICS = CLUSTER_PATH + "/metrics"
val basePaths = List( val basePaths = List(
CLUSTER_PATH, CLUSTER_PATH,
@ -324,7 +330,8 @@ class DefaultClusterNode private[akka] (
NODE_TO_ACTOR_UUIDS_PATH, NODE_TO_ACTOR_UUIDS_PATH,
ACTOR_ADDRESS_TO_UUIDS_PATH, ACTOR_ADDRESS_TO_UUIDS_PATH,
CONFIGURATION_PATH, CONFIGURATION_PATH,
PROVISIONING_PATH) PROVISIONING_PATH,
NODE_METRICS)
val LEADER_ELECTION_PATH = CLUSTER_PATH + "/leader" // should NOT be part of 'basePaths' only used by 'leaderLock' val LEADER_ELECTION_PATH = CLUSTER_PATH + "/leader" // should NOT be part of 'basePaths' only used by 'leaderLock'
@ -1641,7 +1648,11 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
// publish NodeConnected and NodeDisconnect events to the listeners // publish NodeConnected and NodeDisconnect events to the listeners
newlyConnectedMembershipNodes foreach (node self.publish(NodeConnected(node))) newlyConnectedMembershipNodes foreach (node self.publish(NodeConnected(node)))
newlyDisconnectedMembershipNodes foreach (node self.publish(NodeDisconnected(node))) newlyDisconnectedMembershipNodes foreach { node
self.publish(NodeDisconnected(node))
// remove metrics of a disconnected node from ZK and local cache
self.metricsManager.removeNodeMetrics(node)
}
} }
} }
} }

View file

@ -132,6 +132,11 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
(s.version, s.connections.values) (s.version, s.connections.values)
} }
def versionedSocketAddressToRefMap = {
val s = state.get
(s.version, s.connections)
}
def size: Int = state.get().connections.size def size: Int = state.get().connections.size
def stopAll() { def stopAll() {

View file

@ -0,0 +1,226 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics
import akka.cluster._
import Cluster._
import akka.cluster.zookeeper._
import akka.actor._
import Actor._
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import java.util.concurrent.{ ConcurrentHashMap, ConcurrentSkipListSet }
import java.util.concurrent.atomic.AtomicReference
import akka.util.{ Duration, Switch }
import akka.util.Helpers._
import akka.util.duration._
import org.I0Itec.zkclient.exception.ZkNoNodeException
import akka.event.EventHandler
/*
* Instance of the metrics manager running on the node. To keep the fine performance, metrics of all the
* nodes in the cluster are cached internally, and refreshed from monitoring MBeans / Sigar (when if's local node),
* of ZooKeeper (if it's metrics of all the nodes in the cluster) after a specified timeout -
* <code>metricsRefreshTimeout</code>
* <code>metricsRefreshTimeout</code> defaults to 2 seconds, and can be declaratively defined through
* akka.conf:
*
* @exampl {{{
* akka.cluster.metrics-refresh-timeout = 2
* }}}
*/
class LocalNodeMetricsManager(zkClient: AkkaZkClient, private val metricsRefreshTimeout: Duration)
extends NodeMetricsManager {
/*
* Provides metrics of the system that the node is running on, through monitoring MBeans, Hyperic Sigar
* and other systems
*/
lazy private val metricsProvider = SigarMetricsProvider(refreshTimeout.toMillis.toInt) fold ((thrw) {
EventHandler.warning(this, """Hyperic Sigar library failed to load due to %s: %s.
All the metrics will be retreived from monitoring MBeans, and may be incorrect at some platforms.
In order to get better metrics, please put "sigar.jar" to the classpath, and add platform-specific native libary to "java.library.path"."""
.format(thrw.getClass.getName, thrw.getMessage))
new JMXMetricsProvider
},
sigar sigar)
/*
* Metrics of all nodes in the cluster
*/
private val localNodeMetricsCache = new ConcurrentHashMap[String, NodeMetrics]
@volatile
private var _refreshTimeout = metricsRefreshTimeout
/*
* Plugged monitors (both local and cluster-wide)
*/
private val alterationMonitors = new ConcurrentSkipListSet[MetricsAlterationMonitor]
private val _isRunning = new Switch(false)
/*
* If the value is <code>true</code>, metrics manages is started and running. Stopped, otherwise
*/
def isRunning = _isRunning.isOn
/*
* Starts metrics manager. When metrics manager is started, it refreshes cache from ZooKeeper
* after <code>refreshTimeout</code>, and invokes plugged monitors
*/
def start() = {
_isRunning.switchOn { refresh() }
this
}
private[cluster] def metricsForNode(nodeName: String): String = "%s/%s".format(node.NODE_METRICS, nodeName)
/*
* Adds monitor that reacts, when specific conditions are satisfied
*/
def addMonitor(monitor: MetricsAlterationMonitor) = alterationMonitors add monitor
def removeMonitor(monitor: MetricsAlterationMonitor) = alterationMonitors remove monitor
def refreshTimeout_=(newValue: Duration) = _refreshTimeout = newValue
/*
* Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
*/
def refreshTimeout = _refreshTimeout
/*
* Stores metrics of the node in ZooKeeper
*/
private[akka] def storeMetricsInZK(metrics: NodeMetrics) = {
val metricsPath = metricsForNode(metrics.nodeName)
if (zkClient.exists(metricsPath)) {
zkClient.writeData(metricsPath, metrics)
} else {
ignore[ZkNoNodeException](zkClient.createEphemeral(metricsPath, metrics))
}
}
/*
* Gets metrics of the node from ZooKeeper
*/
private[akka] def getMetricsFromZK(nodeName: String) = {
zkClient.readData[NodeMetrics](metricsForNode(nodeName))
}
/*
* Removed metrics of the node from local cache and ZooKeeper
*/
def removeNodeMetrics(nodeName: String) = {
val metricsPath = metricsForNode(nodeName)
if (zkClient.exists(metricsPath)) {
ignore[ZkNoNodeException](zkClient.delete(metricsPath))
}
localNodeMetricsCache.remove(nodeName)
}
/*
* Gets metrics of a local node directly from JMX monitoring beans/Hyperic Sigar
*/
def getLocalMetrics = metricsProvider.getLocalMetrics
/*
* Gets metrics of the node, specified by the name. If <code>useCached</code> is true (default value),
* metrics snapshot is taken from the local cache; otherwise, it's retreived from ZooKeeper'
*/
def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics] =
if (useCached)
Option(localNodeMetricsCache.get(nodeName))
else
try {
Some(getMetricsFromZK(nodeName))
} catch {
case ex: ZkNoNodeException None
}
/*
* Return metrics of all nodes in the cluster from ZooKeeper
*/
private[akka] def getAllMetricsFromZK: Map[String, NodeMetrics] = {
val metricsPaths = zkClient.getChildren(node.NODE_METRICS).toList.toArray.asInstanceOf[Array[String]]
metricsPaths.flatMap { nodeName getMetrics(nodeName, false).map((nodeName, _)) } toMap
}
/*
* Gets cached metrics of all nodes in the cluster
*/
def getAllMetrics: Array[NodeMetrics] = localNodeMetricsCache.values.asScala.toArray
/*
* Refreshes locally cached metrics from ZooKeeper, and invokes plugged monitors
*/
private[akka] def refresh(): Unit = {
storeMetricsInZK(getLocalMetrics)
refreshMetricsCacheFromZK()
if (isRunning) {
Scheduler.schedule({ () refresh() }, refreshTimeout.length, refreshTimeout.length, refreshTimeout.unit)
invokeMonitors()
}
}
/*
* Refreshes metrics manager cache from ZooKeeper
*/
private def refreshMetricsCacheFromZK(): Unit = {
val allMetricsFromZK = getAllMetricsFromZK
localNodeMetricsCache.keySet.foreach { key
if (!allMetricsFromZK.contains(key))
localNodeMetricsCache.remove(key)
}
// RACY: metrics for the node might have been removed both from ZK and local cache by the moment,
// but will be re-cached, since they're still present in allMetricsFromZK snapshot. Not important, because
// cache will be fixed soon, at the next iteration of refresh
allMetricsFromZK map {
case (node, metrics)
localNodeMetricsCache.put(node, metrics)
}
}
/*
* Invokes monitors with the cached metrics
*/
private def invokeMonitors(): Unit = if (!alterationMonitors.isEmpty) {
// RACY: metrics for some nodes might have been removed/added by that moment. Not important,
// because monitors will be fed with up-to-date metrics shortly, at the next iteration of refresh
val clusterNodesMetrics = getAllMetrics
val localNodeMetrics = clusterNodesMetrics.find(_.nodeName == nodeAddress.nodeName)
val iterator = alterationMonitors.iterator
// RACY: there might be new monitors added after the iterator has been obtained. Not important,
// becuse refresh interval is meant to be very short, and all the new monitors will be called ad the
// next refresh iteration
while (iterator.hasNext) {
val monitor = iterator.next
monitor match {
case localMonitor: LocalMetricsAlterationMonitor
localNodeMetrics.map { metrics
if (localMonitor reactsOn metrics)
localMonitor react metrics
}
case clusterMonitor: ClusterMetricsAlterationMonitor
if (clusterMonitor reactsOn clusterNodesMetrics)
clusterMonitor react clusterNodesMetrics
}
}
}
def stop() = _isRunning.switchOff
}

View file

@ -0,0 +1,154 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics
import akka.cluster._
import akka.event.EventHandler
import java.lang.management.ManagementFactory
import akka.util.ReflectiveAccess._
import akka.util.Switch
/*
* Snapshot of the JVM / system that's the node is running on
*
* @param nodeName name of the node, where metrics are gathered at
* @param usedHeapMemory amount of heap memory currently used
* @param committedHeapMemory amount of heap memory guaranteed to be available
* @param maxHeapMemory maximum amount of heap memory that can be used
* @param avaiableProcessors number of the processors avalable to the JVM
* @param systemLoadAverage system load average. If OS-specific Sigar's native library is plugged,
* it's used to calculate average load on the CPUs in the system. Otherwise, value is retreived from monitoring
* MBeans. Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default.
*
*/
case class DefaultNodeMetrics(nodeName: String,
usedHeapMemory: Long,
committedHeapMemory: Long,
maxHeapMemory: Long,
avaiableProcessors: Int,
systemLoadAverage: Double) extends NodeMetrics
object MetricsProvider {
/*
* Maximum value of system load average
*/
val MAX_SYS_LOAD_AVG = 1
/*
* Minimum value of system load average
*/
val MIN_SYS_LOAD_AVG = 0
/*
* Default value of system load average
*/
val DEF_SYS_LOAD_AVG = 0.5
}
/*
* Abstracts metrics provider that returns metrics of the system the node is running at
*/
trait MetricsProvider {
/*
* Gets metrics of the local system
*/
def getLocalMetrics: NodeMetrics
}
/*
* Loads JVM metrics through JMX monitoring beans
*/
class JMXMetricsProvider extends MetricsProvider {
import MetricsProvider._
private val memoryMXBean = ManagementFactory.getMemoryMXBean
private val osMXBean = ManagementFactory.getOperatingSystemMXBean
/*
* Validates and calculates system load average
*
* @param avg system load average obtained from a specific monitoring provider (may be incorrect)
* @return system load average, or default value(<code>0.5</code>), if passed value was out of permitted
* bounds (0.0 to 1.0)
*/
@inline
protected final def calcSystemLoadAverage(avg: Double) =
if (avg >= MIN_SYS_LOAD_AVG && avg <= MAX_SYS_LOAD_AVG) avg else DEF_SYS_LOAD_AVG
protected def systemLoadAverage = calcSystemLoadAverage(osMXBean.getSystemLoadAverage)
def getLocalMetrics =
DefaultNodeMetrics(Cluster.nodeAddress.nodeName,
memoryMXBean.getHeapMemoryUsage.getUsed,
memoryMXBean.getHeapMemoryUsage.getCommitted,
memoryMXBean.getHeapMemoryUsage.getMax,
osMXBean.getAvailableProcessors,
systemLoadAverage)
}
/*
* Loads wider range of metrics of a better quality with Hyperic Sigar (native library)
*
* @param refreshTimeout Sigar gathers metrics during this interval
*/
class SigarMetricsProvider private (private val sigarInstance: AnyRef) extends JMXMetricsProvider {
private val reportErrors = new Switch(true)
private val getCpuPercMethod = sigarInstance.getClass.getMethod("getCpuPerc")
private val sigarCpuCombinedMethod = getCpuPercMethod.getReturnType.getMethod("getCombined")
/*
* Wraps reflective calls to Hyperic Sigar
*
* @param f reflective call to Hyperic Sigar
* @param fallback function, which is invoked, if call to Sigar has been finished with exception
*/
private def callSigarMethodOrElse[T](callSigar: T, fallback: T): T =
try callSigar catch {
case thrw
reportErrors.switchOff {
EventHandler.warning(this, "Failed to get metrics from Hyperic Sigar. %s: %s"
.format(thrw.getClass.getName, thrw.getMessage))
}
fallback
}
/*
* Obtains system load average from Sigar
* If the value cannot be obtained, falls back to system load average taken from JMX
*/
override def systemLoadAverage = callSigarMethodOrElse(
calcSystemLoadAverage(sigarCpuCombinedMethod
.invoke(getCpuPercMethod.invoke(sigarInstance)).asInstanceOf[Double]),
super.systemLoadAverage)
}
object SigarMetricsProvider {
/*
* Instantiates Sigar metrics provider through reflections, in order to avoid creating dependencies to
* Hiperic Sigar library
*/
def apply(refreshTimeout: Int): Either[Throwable, MetricsProvider] = try {
for {
sigarInstance createInstance[AnyRef]("org.hyperic.sigar.Sigar", noParams, noArgs).right
sigarProxyCacheClass: Class[_] getClassFor("org.hyperic.sigar.SigarProxyCache").right
} yield new SigarMetricsProvider(sigarProxyCacheClass
.getMethod("newInstance", Array(sigarInstance.getClass, classOf[Int]): _*)
.invoke(null, sigarInstance, new java.lang.Integer(refreshTimeout)))
} catch {
case thrw Left(thrw)
}
}

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "WARNING"
akka.cluster.metrics-refresh-timeout = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1,134 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics.local
import akka.cluster._
import akka.actor._
import Actor._
import Cluster._
import akka.dispatch._
import akka.util.Duration
import akka.util.duration._
import akka.cluster.metrics._
import java.util.concurrent.atomic.AtomicInteger
object LocalMetricsMultiJvmSpec {
val NrOfNodes = 1
}
class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode {
import LocalMetricsMultiJvmSpec._
val testNodes = NrOfNodes
override def beforeAll = {
super.beforeAll()
node
}
override def afterAll = {
node.shutdown()
super.afterAll()
}
"Metrics manager" must {
def timeout = node.metricsManager.refreshTimeout
"be initialized with refresh timeout value, specified in akka.conf" in {
timeout must be(1.second)
}
"return up-to-date local node metrics straight from MBeans/Sigar" in {
node.metricsManager.getLocalMetrics must not be (null)
node.metricsManager.getLocalMetrics.systemLoadAverage must be(0.5 plusOrMinus 0.5)
}
"return metrics cached in the MetricsManagerLocalMetrics" in {
node.metricsManager.getMetrics(nodeAddress.nodeName) must not be (null)
}
"return local node metrics from ZNode" in {
node.metricsManager.getMetrics(nodeAddress.nodeName, false) must not be (null)
}
"return cached metrics of all nodes in the cluster" in {
node.metricsManager.getAllMetrics.size must be(1)
node.metricsManager.getAllMetrics.find(_.nodeName == "node1") must not be (null)
}
"throw no exceptions, when user attempts to get metrics of a non-existing node" in {
node.metricsManager.getMetrics("nonexisting") must be(None)
node.metricsManager.getMetrics("nonexisting", false) must be(None)
}
"regularly update cached metrics" in {
val oldMetrics = node.metricsManager.getLocalMetrics
Thread sleep timeout.toMillis
node.metricsManager.getLocalMetrics must not be (oldMetrics)
}
"allow to track JVM state and bind handles through MetricsAlterationMonitors" in {
val monitorReponse = new DefaultPromise[String]
node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor {
val id = "heapMemoryThresholdMonitor"
def reactsOn(metrics: NodeMetrics) = metrics.usedHeapMemory > 1
def react(metrics: NodeMetrics) = monitorReponse.completeWithResult("Too much memory is used!")
})
monitorReponse.get must be("Too much memory is used!")
}
class FooMonitor(monitorWorked: AtomicInteger) extends LocalMetricsAlterationMonitor {
val id = "fooMonitor"
def reactsOn(metrics: NodeMetrics) = true
def react(metrics: NodeMetrics) = monitorWorked.set(monitorWorked.get + 1)
}
"allow to unregister the monitor" in {
val monitorWorked = new AtomicInteger(0)
val fooMonitor = new FooMonitor(monitorWorked)
node.metricsManager.addMonitor(fooMonitor)
node.metricsManager.removeMonitor(fooMonitor)
val oldValue = monitorWorked.get
Thread sleep timeout.toMillis
monitorWorked.get must be(oldValue)
}
"stop notifying monitors, when stopped" in {
node.metricsManager.stop()
val monitorWorked = new AtomicInteger(0)
node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor {
val id = "fooMonitor"
def reactsOn(metrics: NodeMetrics) = true
def react(metrics: NodeMetrics) = monitorWorked.set(monitorWorked.get + 1)
})
monitorWorked.get must be(0)
node.metricsManager.start()
Thread sleep (timeout.toMillis * 2)
monitorWorked.get must be > (1)
}
}
}

View file

@ -0,0 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "WARNING"

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "WARNING"

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -0,0 +1,133 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics.remote
import akka.cluster._
import akka.actor._
import Actor._
import Cluster._
import akka.dispatch._
import akka.util.Duration
import akka.util.duration._
import akka.cluster.metrics._
import java.util.concurrent._
import atomic.AtomicInteger
object RemoteMetricsMultiJvmSpec {
val NrOfNodes = 2
val MetricsRefreshTimeout = 100.millis
}
class AllMetricsAvailableMonitor(_id: String, completionLatch: CountDownLatch, clusterSize: Int) extends ClusterMetricsAlterationMonitor {
val id = _id
def reactsOn(allMetrics: Array[NodeMetrics]) = allMetrics.size == clusterSize
def react(allMetrics: Array[NodeMetrics]) = completionLatch.countDown
}
class RemoteMetricsMultiJvmNode1 extends MasterClusterTestNode {
import RemoteMetricsMultiJvmSpec._
val testNodes = NrOfNodes
"Metrics manager" must {
"provide metrics of all nodes in the cluster" in {
val allMetricsAvaiable = new CountDownLatch(1)
node.metricsManager.refreshTimeout = MetricsRefreshTimeout
node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("all-metrics-available", allMetricsAvaiable, NrOfNodes))
LocalCluster.barrier("node-start", NrOfNodes).await()
allMetricsAvaiable.await()
LocalCluster.barrier("check-all-remote-metrics", NrOfNodes) {
node.metricsManager.getAllMetrics.size must be(2)
}
val cachedMetrics = node.metricsManager.getMetrics("node2")
val metricsFromZnode = node.metricsManager.getMetrics("node2", false)
LocalCluster.barrier("check-single-remote-metrics", NrOfNodes) {
cachedMetrics must not be (null)
metricsFromZnode must not be (null)
}
Thread sleep MetricsRefreshTimeout.toMillis
LocalCluster.barrier("remote-metrics-is-updated", NrOfNodes) {
node.metricsManager.getMetrics("node2") must not be (cachedMetrics)
node.metricsManager.getMetrics("node2", false) must not be (metricsFromZnode)
}
val someMetricsGone = new CountDownLatch(1)
node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("some-metrics-gone", someMetricsGone, 1))
LocalCluster.barrier("some-nodes-leave", NrOfNodes).await()
someMetricsGone.await(10, TimeUnit.SECONDS) must be(true)
node.metricsManager.getMetrics("node2") must be(None)
node.metricsManager.getMetrics("node2", false) must be(None)
node.metricsManager.getAllMetrics.size must be(1)
node.shutdown()
}
}
}
class RemoteMetricsMultiJvmNode2 extends ClusterTestNode {
import RemoteMetricsMultiJvmSpec._
val testNodes = NrOfNodes
"Metrics manager" must {
"provide metrics of all nodes in the cluster" in {
val allMetricsAvaiable = new CountDownLatch(1)
node.metricsManager.refreshTimeout = MetricsRefreshTimeout
node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("all-metrics-available", allMetricsAvaiable, NrOfNodes))
LocalCluster.barrier("node-start", NrOfNodes).await()
allMetricsAvaiable.await()
LocalCluster.barrier("check-all-remote-metrics", NrOfNodes) {
node.metricsManager.getAllMetrics.size must be(2)
}
val cachedMetrics = node.metricsManager.getMetrics("node1")
val metricsFromZnode = node.metricsManager.getMetrics("node1", false)
LocalCluster.barrier("check-single-remote-metrics", NrOfNodes) {
cachedMetrics must not be (null)
metricsFromZnode must not be (null)
}
Thread sleep MetricsRefreshTimeout.toMillis
LocalCluster.barrier("remote-metrics-is-updated", NrOfNodes) {
node.metricsManager.getMetrics("node1") must not be (cachedMetrics)
node.metricsManager.getMetrics("node1", false) must not be (metricsFromZnode)
}
LocalCluster.barrier("some-nodes-leave", NrOfNodes) {
node.shutdown()
}
}
}
}