Merge ClusterActoRef & RoutedActorRef: After merge with master, part 2
This commit is contained in:
commit
650e78b2f7
16 changed files with 891 additions and 6 deletions
|
|
@ -18,6 +18,11 @@ import com.eaio.uuid.UUID
|
|||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.{ ConcurrentSkipListSet }
|
||||
|
||||
import akka.cluster.metrics._
|
||||
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
|
||||
class ClusterException(message: String) extends AkkaException(message)
|
||||
|
||||
object ChangeListener {
|
||||
|
|
@ -109,6 +114,76 @@ object NodeAddress {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Allows user to access metrics of a different nodes in the cluster. Changing metrics can be monitored
|
||||
* using {@link MetricsAlterationMonitor}
|
||||
* Metrics of the cluster nodes are distributed through ZooKeeper. For better performance, metrics are
|
||||
* cached internally, and refreshed from ZooKeeper after an interval
|
||||
*/
|
||||
trait NodeMetricsManager {
|
||||
|
||||
/*
|
||||
* Gets metrics of a local node directly from JMX monitoring beans/Hyperic Sigar
|
||||
*/
|
||||
def getLocalMetrics: NodeMetrics
|
||||
|
||||
/*
|
||||
* Gets metrics of a specified node
|
||||
* @param nodeName metrics of the node specified by the name will be returned
|
||||
* @param useCached if <code>true</code>, returns metrics cached in the metrics manager,
|
||||
* gets metrics directly from ZooKeeper otherwise
|
||||
*/
|
||||
def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics]
|
||||
|
||||
/*
|
||||
* Gets cached metrics of all nodes in the cluster
|
||||
*/
|
||||
def getAllMetrics: Array[NodeMetrics]
|
||||
|
||||
/*
|
||||
* Adds monitor that reacts, when specific conditions are satisfied
|
||||
*/
|
||||
def addMonitor(monitor: MetricsAlterationMonitor): Unit
|
||||
|
||||
/*
|
||||
* Removes monitor
|
||||
*/
|
||||
def removeMonitor(monitor: MetricsAlterationMonitor): Unit
|
||||
|
||||
/*
|
||||
* Removes metrics of s specified node from ZooKeeper and metrics manager cache
|
||||
*/
|
||||
def removeNodeMetrics(nodeName: String): Unit
|
||||
|
||||
/*
|
||||
* Sets timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
|
||||
*/
|
||||
def refreshTimeout_=(newValue: Duration): Unit
|
||||
|
||||
/*
|
||||
* Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
|
||||
*/
|
||||
def refreshTimeout: Duration
|
||||
|
||||
/*
|
||||
* Starts metrics manager. When metrics manager is started, it refreshes cache from ZooKeeper
|
||||
* after <code>refreshTimeout</code>, and invokes plugged monitors
|
||||
*/
|
||||
def start(): NodeMetricsManager
|
||||
|
||||
/*
|
||||
* Stops metrics manager. Stopped metrics manager doesn't refresh cache from ZooKeeper,
|
||||
* and doesn't invoke plugged monitors
|
||||
*/
|
||||
def stop(): Unit
|
||||
|
||||
/*
|
||||
* If the value is <code>true</code>, metrics manages is started and running. Stopped, otherwise
|
||||
*/
|
||||
def isRunning: Boolean
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface for cluster node.
|
||||
*
|
||||
|
|
@ -139,6 +214,8 @@ trait ClusterNode {
|
|||
|
||||
def reconnect(): ClusterNode
|
||||
|
||||
def metricsManager: NodeMetricsManager
|
||||
|
||||
/**
|
||||
* Registers a cluster change listener.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,82 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.metrics
|
||||
|
||||
/*
|
||||
* {@link NodeMetricsManager} periodically refershes internal cache with node metrics from MBeans / Sigar.
|
||||
* Every time local cache is refreshed, monitors plugged to the metrics manager are invoked.
|
||||
* If updated metrics satisfy conditions, specified in <code>reactsOn</code>,
|
||||
* <code>react</code> is called
|
||||
*
|
||||
* @exampl {{{
|
||||
* class PeakCPULoadMonitor extends LocalMetricsAlterationMonitor {
|
||||
* val id = "peak-cpu-load-monitor"
|
||||
*
|
||||
* def reactsOn(metrics: NodeMetrics) =
|
||||
* metrics.systemLoadAverage > 0.8
|
||||
*
|
||||
* def react(metrics: NodeMetrics) =
|
||||
* println("Peak average system load at node [%s] is reached!" format (metrics.nodeName))
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
*/
|
||||
trait LocalMetricsAlterationMonitor extends MetricsAlterationMonitor {
|
||||
|
||||
/*
|
||||
* Definies conditions that must be satisfied in order to <code>react<code> on the changed metrics
|
||||
*/
|
||||
def reactsOn(metrics: NodeMetrics): Boolean
|
||||
|
||||
/*
|
||||
* Reacts on the changed metrics
|
||||
*/
|
||||
def react(metrics: NodeMetrics): Unit
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* {@link NodeMetricsManager} periodically refershes internal cache with metrics of all nodes in the cluster
|
||||
* from ZooKeeper. Every time local cache is refreshed, monitors plugged to the metrics manager are invoked.
|
||||
* If updated metrics satisfy conditions, specified in <code>reactsOn</code>,
|
||||
* <code>react</code> is called
|
||||
*
|
||||
* @exampl {{{
|
||||
* class PeakCPULoadReached extends ClusterMetricsAlterationMonitor {
|
||||
* val id = "peak-cpu-load-reached"
|
||||
*
|
||||
* def reactsOn(metrics: Array[NodeMetrics]) =
|
||||
* metrics.forall(_.systemLoadAverage > 0.8)
|
||||
*
|
||||
* def react(metrics: Array[NodeMetrics]) =
|
||||
* println("One of the nodes in the scluster has reached the peak system load!")
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
*/
|
||||
trait ClusterMetricsAlterationMonitor extends MetricsAlterationMonitor {
|
||||
|
||||
/*
|
||||
* Definies conditions that must be satisfied in order to <code>react<code> on the changed metrics
|
||||
*/
|
||||
def reactsOn(allMetrics: Array[NodeMetrics]): Boolean
|
||||
|
||||
/*
|
||||
* Reacts on the changed metrics
|
||||
*/
|
||||
def react(allMetrics: Array[NodeMetrics]): Unit
|
||||
|
||||
}
|
||||
|
||||
sealed trait MetricsAlterationMonitor extends Comparable[MetricsAlterationMonitor] {
|
||||
|
||||
/*
|
||||
* Unique identiifier of the monitor
|
||||
*/
|
||||
def id: String
|
||||
|
||||
def compareTo(otherMonitor: MetricsAlterationMonitor) = id.compareTo(otherMonitor.id)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.metrics
|
||||
|
||||
/*
|
||||
* Snapshot of the JVM / system that's the node is running on
|
||||
*/
|
||||
trait NodeMetrics {
|
||||
|
||||
/*
|
||||
* Name of the node the metrics are gathered at
|
||||
*/
|
||||
def nodeName: String
|
||||
|
||||
/*
|
||||
* Amount of heap memory currently used
|
||||
*/
|
||||
def usedHeapMemory: Long
|
||||
|
||||
/*
|
||||
* Amount of heap memory guaranteed to be available
|
||||
*/
|
||||
def committedHeapMemory: Long
|
||||
|
||||
/*
|
||||
* Maximum amount of heap memory that can be used
|
||||
*/
|
||||
def maxHeapMemory: Long
|
||||
|
||||
/*
|
||||
* Number of the processors avalable to the JVM
|
||||
*/
|
||||
def avaiableProcessors: Int
|
||||
|
||||
/*
|
||||
* If OS-specific Hyperic Sigar library is plugged, it's used to calculate
|
||||
* average load on the CPUs in the system. Otherwise, value is retreived from monitoring MBeans.
|
||||
* Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default.
|
||||
*/
|
||||
def systemLoadAverage: Double
|
||||
|
||||
}
|
||||
|
|
@ -19,6 +19,7 @@ import scala.collection.mutable.ConcurrentMap
|
|||
import scala.collection.JavaConversions._
|
||||
|
||||
import akka.util._
|
||||
import duration._
|
||||
import Helpers._
|
||||
|
||||
import akka.actor._
|
||||
|
|
@ -39,6 +40,7 @@ import akka.serialization.{ Serialization, Serializer, ActorSerialization }
|
|||
import ActorSerialization._
|
||||
import akka.serialization.Compression.LZF
|
||||
|
||||
import akka.cluster.metrics._
|
||||
import akka.cluster.zookeeper._
|
||||
import ChangeListener._
|
||||
import ClusterProtocol._
|
||||
|
|
@ -150,6 +152,7 @@ object Cluster {
|
|||
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
|
||||
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
|
||||
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
|
||||
val metricsRefreshInterval = Duration(config.getInt("akka.cluster.metrics-refresh-timeout", 2), TIME_UNIT)
|
||||
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
|
||||
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
|
||||
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
|
||||
|
|
@ -304,6 +307,8 @@ class DefaultClusterNode private[akka] (
|
|||
|
||||
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
|
||||
|
||||
lazy val metricsManager: NodeMetricsManager = new LocalNodeMetricsManager(zkClient, Cluster.metricsRefreshInterval).start()
|
||||
|
||||
// static nodes
|
||||
val CLUSTER_PATH = "/" + nodeAddress.clusterName
|
||||
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
|
||||
|
|
@ -314,6 +319,7 @@ class DefaultClusterNode private[akka] (
|
|||
val ACTOR_UUID_REGISTRY_PATH = CLUSTER_PATH + "/actor-uuid-registry"
|
||||
val ACTOR_ADDRESS_TO_UUIDS_PATH = CLUSTER_PATH + "/actor-address-to-uuids"
|
||||
val NODE_TO_ACTOR_UUIDS_PATH = CLUSTER_PATH + "/node-to-actors-uuids"
|
||||
val NODE_METRICS = CLUSTER_PATH + "/metrics"
|
||||
|
||||
val basePaths = List(
|
||||
CLUSTER_PATH,
|
||||
|
|
@ -324,7 +330,8 @@ class DefaultClusterNode private[akka] (
|
|||
NODE_TO_ACTOR_UUIDS_PATH,
|
||||
ACTOR_ADDRESS_TO_UUIDS_PATH,
|
||||
CONFIGURATION_PATH,
|
||||
PROVISIONING_PATH)
|
||||
PROVISIONING_PATH,
|
||||
NODE_METRICS)
|
||||
|
||||
val LEADER_ELECTION_PATH = CLUSTER_PATH + "/leader" // should NOT be part of 'basePaths' only used by 'leaderLock'
|
||||
|
||||
|
|
@ -1630,7 +1637,11 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
|
|||
|
||||
// publish NodeConnected and NodeDisconnect events to the listeners
|
||||
newlyConnectedMembershipNodes foreach (node ⇒ self.publish(NodeConnected(node)))
|
||||
newlyDisconnectedMembershipNodes foreach (node ⇒ self.publish(NodeDisconnected(node)))
|
||||
newlyDisconnectedMembershipNodes foreach { node ⇒
|
||||
self.publish(NodeDisconnected(node))
|
||||
// remove metrics of a disconnected node from ZK and local cache
|
||||
self.metricsManager.removeNodeMetrics(node)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,15 +7,12 @@ import akka.actor._
|
|||
import akka.util._
|
||||
import akka.event.EventHandler
|
||||
import ReflectiveAccess._
|
||||
import akka.dispatch.Future
|
||||
import akka.routing._
|
||||
import RouterType._
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import com.eaio.uuid.UUID
|
||||
|
||||
import collection.immutable.Map
|
||||
import annotation.tailrec
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,226 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.metrics
|
||||
|
||||
import akka.cluster._
|
||||
import Cluster._
|
||||
import akka.cluster.zookeeper._
|
||||
import akka.actor._
|
||||
import Actor._
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.JavaConverters._
|
||||
import java.util.concurrent.{ ConcurrentHashMap, ConcurrentSkipListSet }
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import akka.util.{ Duration, Switch }
|
||||
import akka.util.Helpers._
|
||||
import akka.util.duration._
|
||||
import org.I0Itec.zkclient.exception.ZkNoNodeException
|
||||
import akka.event.EventHandler
|
||||
|
||||
/*
|
||||
* Instance of the metrics manager running on the node. To keep the fine performance, metrics of all the
|
||||
* nodes in the cluster are cached internally, and refreshed from monitoring MBeans / Sigar (when if's local node),
|
||||
* of ZooKeeper (if it's metrics of all the nodes in the cluster) after a specified timeout -
|
||||
* <code>metricsRefreshTimeout</code>
|
||||
* <code>metricsRefreshTimeout</code> defaults to 2 seconds, and can be declaratively defined through
|
||||
* akka.conf:
|
||||
*
|
||||
* @exampl {{{
|
||||
* akka.cluster.metrics-refresh-timeout = 2
|
||||
* }}}
|
||||
*/
|
||||
class LocalNodeMetricsManager(zkClient: AkkaZkClient, private val metricsRefreshTimeout: Duration)
|
||||
extends NodeMetricsManager {
|
||||
|
||||
/*
|
||||
* Provides metrics of the system that the node is running on, through monitoring MBeans, Hyperic Sigar
|
||||
* and other systems
|
||||
*/
|
||||
lazy private val metricsProvider = SigarMetricsProvider(refreshTimeout.toMillis.toInt) fold ((thrw) ⇒ {
|
||||
EventHandler.warning(this, """Hyperic Sigar library failed to load due to %s: %s.
|
||||
All the metrics will be retreived from monitoring MBeans, and may be incorrect at some platforms.
|
||||
In order to get better metrics, please put "sigar.jar" to the classpath, and add platform-specific native libary to "java.library.path"."""
|
||||
.format(thrw.getClass.getName, thrw.getMessage))
|
||||
new JMXMetricsProvider
|
||||
},
|
||||
sigar ⇒ sigar)
|
||||
|
||||
/*
|
||||
* Metrics of all nodes in the cluster
|
||||
*/
|
||||
private val localNodeMetricsCache = new ConcurrentHashMap[String, NodeMetrics]
|
||||
|
||||
@volatile
|
||||
private var _refreshTimeout = metricsRefreshTimeout
|
||||
|
||||
/*
|
||||
* Plugged monitors (both local and cluster-wide)
|
||||
*/
|
||||
private val alterationMonitors = new ConcurrentSkipListSet[MetricsAlterationMonitor]
|
||||
|
||||
private val _isRunning = new Switch(false)
|
||||
|
||||
/*
|
||||
* If the value is <code>true</code>, metrics manages is started and running. Stopped, otherwise
|
||||
*/
|
||||
def isRunning = _isRunning.isOn
|
||||
|
||||
/*
|
||||
* Starts metrics manager. When metrics manager is started, it refreshes cache from ZooKeeper
|
||||
* after <code>refreshTimeout</code>, and invokes plugged monitors
|
||||
*/
|
||||
def start() = {
|
||||
_isRunning.switchOn { refresh() }
|
||||
this
|
||||
}
|
||||
|
||||
private[cluster] def metricsForNode(nodeName: String): String = "%s/%s".format(node.NODE_METRICS, nodeName)
|
||||
|
||||
/*
|
||||
* Adds monitor that reacts, when specific conditions are satisfied
|
||||
*/
|
||||
def addMonitor(monitor: MetricsAlterationMonitor) = alterationMonitors add monitor
|
||||
|
||||
def removeMonitor(monitor: MetricsAlterationMonitor) = alterationMonitors remove monitor
|
||||
|
||||
def refreshTimeout_=(newValue: Duration) = _refreshTimeout = newValue
|
||||
|
||||
/*
|
||||
* Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper
|
||||
*/
|
||||
def refreshTimeout = _refreshTimeout
|
||||
|
||||
/*
|
||||
* Stores metrics of the node in ZooKeeper
|
||||
*/
|
||||
private[akka] def storeMetricsInZK(metrics: NodeMetrics) = {
|
||||
val metricsPath = metricsForNode(metrics.nodeName)
|
||||
if (zkClient.exists(metricsPath)) {
|
||||
zkClient.writeData(metricsPath, metrics)
|
||||
} else {
|
||||
ignore[ZkNoNodeException](zkClient.createEphemeral(metricsPath, metrics))
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets metrics of the node from ZooKeeper
|
||||
*/
|
||||
private[akka] def getMetricsFromZK(nodeName: String) = {
|
||||
zkClient.readData[NodeMetrics](metricsForNode(nodeName))
|
||||
}
|
||||
|
||||
/*
|
||||
* Removed metrics of the node from local cache and ZooKeeper
|
||||
*/
|
||||
def removeNodeMetrics(nodeName: String) = {
|
||||
val metricsPath = metricsForNode(nodeName)
|
||||
if (zkClient.exists(metricsPath)) {
|
||||
ignore[ZkNoNodeException](zkClient.delete(metricsPath))
|
||||
}
|
||||
|
||||
localNodeMetricsCache.remove(nodeName)
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets metrics of a local node directly from JMX monitoring beans/Hyperic Sigar
|
||||
*/
|
||||
def getLocalMetrics = metricsProvider.getLocalMetrics
|
||||
|
||||
/*
|
||||
* Gets metrics of the node, specified by the name. If <code>useCached</code> is true (default value),
|
||||
* metrics snapshot is taken from the local cache; otherwise, it's retreived from ZooKeeper'
|
||||
*/
|
||||
def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics] =
|
||||
if (useCached)
|
||||
Option(localNodeMetricsCache.get(nodeName))
|
||||
else
|
||||
try {
|
||||
Some(getMetricsFromZK(nodeName))
|
||||
} catch {
|
||||
case ex: ZkNoNodeException ⇒ None
|
||||
}
|
||||
|
||||
/*
|
||||
* Return metrics of all nodes in the cluster from ZooKeeper
|
||||
*/
|
||||
private[akka] def getAllMetricsFromZK: Map[String, NodeMetrics] = {
|
||||
val metricsPaths = zkClient.getChildren(node.NODE_METRICS).toList.toArray.asInstanceOf[Array[String]]
|
||||
metricsPaths.flatMap { nodeName ⇒ getMetrics(nodeName, false).map((nodeName, _)) } toMap
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets cached metrics of all nodes in the cluster
|
||||
*/
|
||||
def getAllMetrics: Array[NodeMetrics] = localNodeMetricsCache.values.asScala.toArray
|
||||
|
||||
/*
|
||||
* Refreshes locally cached metrics from ZooKeeper, and invokes plugged monitors
|
||||
*/
|
||||
private[akka] def refresh(): Unit = {
|
||||
|
||||
storeMetricsInZK(getLocalMetrics)
|
||||
refreshMetricsCacheFromZK()
|
||||
|
||||
if (isRunning) {
|
||||
Scheduler.schedule({ () ⇒ refresh() }, refreshTimeout.length, refreshTimeout.length, refreshTimeout.unit)
|
||||
invokeMonitors()
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Refreshes metrics manager cache from ZooKeeper
|
||||
*/
|
||||
private def refreshMetricsCacheFromZK(): Unit = {
|
||||
val allMetricsFromZK = getAllMetricsFromZK
|
||||
|
||||
localNodeMetricsCache.keySet.foreach { key ⇒
|
||||
if (!allMetricsFromZK.contains(key))
|
||||
localNodeMetricsCache.remove(key)
|
||||
}
|
||||
|
||||
// RACY: metrics for the node might have been removed both from ZK and local cache by the moment,
|
||||
// but will be re-cached, since they're still present in allMetricsFromZK snapshot. Not important, because
|
||||
// cache will be fixed soon, at the next iteration of refresh
|
||||
allMetricsFromZK map {
|
||||
case (node, metrics) ⇒
|
||||
localNodeMetricsCache.put(node, metrics)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Invokes monitors with the cached metrics
|
||||
*/
|
||||
private def invokeMonitors(): Unit = if (!alterationMonitors.isEmpty) {
|
||||
// RACY: metrics for some nodes might have been removed/added by that moment. Not important,
|
||||
// because monitors will be fed with up-to-date metrics shortly, at the next iteration of refresh
|
||||
val clusterNodesMetrics = getAllMetrics
|
||||
val localNodeMetrics = clusterNodesMetrics.find(_.nodeName == nodeAddress.nodeName)
|
||||
val iterator = alterationMonitors.iterator
|
||||
|
||||
// RACY: there might be new monitors added after the iterator has been obtained. Not important,
|
||||
// becuse refresh interval is meant to be very short, and all the new monitors will be called ad the
|
||||
// next refresh iteration
|
||||
while (iterator.hasNext) {
|
||||
|
||||
val monitor = iterator.next
|
||||
|
||||
monitor match {
|
||||
case localMonitor: LocalMetricsAlterationMonitor ⇒
|
||||
localNodeMetrics.map { metrics ⇒
|
||||
if (localMonitor reactsOn metrics)
|
||||
localMonitor react metrics
|
||||
}
|
||||
|
||||
case clusterMonitor: ClusterMetricsAlterationMonitor ⇒
|
||||
if (clusterMonitor reactsOn clusterNodesMetrics)
|
||||
clusterMonitor react clusterNodesMetrics
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
def stop() = _isRunning.switchOff
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,154 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.metrics
|
||||
|
||||
import akka.cluster._
|
||||
import akka.event.EventHandler
|
||||
import java.lang.management.ManagementFactory
|
||||
import akka.util.ReflectiveAccess._
|
||||
import akka.util.Switch
|
||||
|
||||
/*
|
||||
* Snapshot of the JVM / system that's the node is running on
|
||||
*
|
||||
* @param nodeName name of the node, where metrics are gathered at
|
||||
* @param usedHeapMemory amount of heap memory currently used
|
||||
* @param committedHeapMemory amount of heap memory guaranteed to be available
|
||||
* @param maxHeapMemory maximum amount of heap memory that can be used
|
||||
* @param avaiableProcessors number of the processors avalable to the JVM
|
||||
* @param systemLoadAverage system load average. If OS-specific Sigar's native library is plugged,
|
||||
* it's used to calculate average load on the CPUs in the system. Otherwise, value is retreived from monitoring
|
||||
* MBeans. Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default.
|
||||
*
|
||||
*/
|
||||
case class DefaultNodeMetrics(nodeName: String,
|
||||
usedHeapMemory: Long,
|
||||
committedHeapMemory: Long,
|
||||
maxHeapMemory: Long,
|
||||
avaiableProcessors: Int,
|
||||
systemLoadAverage: Double) extends NodeMetrics
|
||||
|
||||
object MetricsProvider {
|
||||
|
||||
/*
|
||||
* Maximum value of system load average
|
||||
*/
|
||||
val MAX_SYS_LOAD_AVG = 1
|
||||
|
||||
/*
|
||||
* Minimum value of system load average
|
||||
*/
|
||||
val MIN_SYS_LOAD_AVG = 0
|
||||
|
||||
/*
|
||||
* Default value of system load average
|
||||
*/
|
||||
val DEF_SYS_LOAD_AVG = 0.5
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Abstracts metrics provider that returns metrics of the system the node is running at
|
||||
*/
|
||||
trait MetricsProvider {
|
||||
|
||||
/*
|
||||
* Gets metrics of the local system
|
||||
*/
|
||||
def getLocalMetrics: NodeMetrics
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Loads JVM metrics through JMX monitoring beans
|
||||
*/
|
||||
class JMXMetricsProvider extends MetricsProvider {
|
||||
|
||||
import MetricsProvider._
|
||||
|
||||
private val memoryMXBean = ManagementFactory.getMemoryMXBean
|
||||
|
||||
private val osMXBean = ManagementFactory.getOperatingSystemMXBean
|
||||
|
||||
/*
|
||||
* Validates and calculates system load average
|
||||
*
|
||||
* @param avg system load average obtained from a specific monitoring provider (may be incorrect)
|
||||
* @return system load average, or default value(<code>0.5</code>), if passed value was out of permitted
|
||||
* bounds (0.0 to 1.0)
|
||||
*/
|
||||
@inline
|
||||
protected final def calcSystemLoadAverage(avg: Double) =
|
||||
if (avg >= MIN_SYS_LOAD_AVG && avg <= MAX_SYS_LOAD_AVG) avg else DEF_SYS_LOAD_AVG
|
||||
|
||||
protected def systemLoadAverage = calcSystemLoadAverage(osMXBean.getSystemLoadAverage)
|
||||
|
||||
def getLocalMetrics =
|
||||
DefaultNodeMetrics(Cluster.nodeAddress.nodeName,
|
||||
memoryMXBean.getHeapMemoryUsage.getUsed,
|
||||
memoryMXBean.getHeapMemoryUsage.getCommitted,
|
||||
memoryMXBean.getHeapMemoryUsage.getMax,
|
||||
osMXBean.getAvailableProcessors,
|
||||
systemLoadAverage)
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Loads wider range of metrics of a better quality with Hyperic Sigar (native library)
|
||||
*
|
||||
* @param refreshTimeout Sigar gathers metrics during this interval
|
||||
*/
|
||||
class SigarMetricsProvider private (private val sigarInstance: AnyRef) extends JMXMetricsProvider {
|
||||
|
||||
private val reportErrors = new Switch(true)
|
||||
|
||||
private val getCpuPercMethod = sigarInstance.getClass.getMethod("getCpuPerc")
|
||||
private val sigarCpuCombinedMethod = getCpuPercMethod.getReturnType.getMethod("getCombined")
|
||||
|
||||
/*
|
||||
* Wraps reflective calls to Hyperic Sigar
|
||||
*
|
||||
* @param f reflective call to Hyperic Sigar
|
||||
* @param fallback function, which is invoked, if call to Sigar has been finished with exception
|
||||
*/
|
||||
private def callSigarMethodOrElse[T](callSigar: ⇒ T, fallback: ⇒ T): T =
|
||||
try callSigar catch {
|
||||
case thrw ⇒
|
||||
reportErrors.switchOff {
|
||||
EventHandler.warning(this, "Failed to get metrics from Hyperic Sigar. %s: %s"
|
||||
.format(thrw.getClass.getName, thrw.getMessage))
|
||||
}
|
||||
fallback
|
||||
}
|
||||
|
||||
/*
|
||||
* Obtains system load average from Sigar
|
||||
* If the value cannot be obtained, falls back to system load average taken from JMX
|
||||
*/
|
||||
override def systemLoadAverage = callSigarMethodOrElse(
|
||||
calcSystemLoadAverage(sigarCpuCombinedMethod
|
||||
.invoke(getCpuPercMethod.invoke(sigarInstance)).asInstanceOf[Double]),
|
||||
super.systemLoadAverage)
|
||||
|
||||
}
|
||||
|
||||
object SigarMetricsProvider {
|
||||
|
||||
/*
|
||||
* Instantiates Sigar metrics provider through reflections, in order to avoid creating dependencies to
|
||||
* Hiperic Sigar library
|
||||
*/
|
||||
def apply(refreshTimeout: Int): Either[Throwable, MetricsProvider] = try {
|
||||
for {
|
||||
sigarInstance ← createInstance[AnyRef]("org.hyperic.sigar.Sigar", noParams, noArgs).right
|
||||
sigarProxyCacheClass: Class[_] ← getClassFor("org.hyperic.sigar.SigarProxyCache").right
|
||||
} yield new SigarMetricsProvider(sigarProxyCacheClass
|
||||
.getMethod("newInstance", Array(sigarInstance.getClass, classOf[Int]): _*)
|
||||
.invoke(null, sigarInstance, new java.lang.Integer(refreshTimeout)))
|
||||
} catch {
|
||||
case thrw ⇒ Left(thrw)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
akka.enabled-modules = ["cluster"]
|
||||
akka.event-handlers = ["akka.testkit.TestEventListener"]
|
||||
akka.event-handler-level = "WARNING"
|
||||
akka.cluster.metrics-refresh-timeout = 1
|
||||
|
|
@ -0,0 +1 @@
|
|||
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
|
||||
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.metrics.local
|
||||
|
||||
import akka.cluster._
|
||||
import akka.actor._
|
||||
import Actor._
|
||||
import Cluster._
|
||||
import akka.dispatch._
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
import akka.cluster.metrics._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
object LocalMetricsMultiJvmSpec {
|
||||
val NrOfNodes = 1
|
||||
}
|
||||
|
||||
class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode {
|
||||
|
||||
import LocalMetricsMultiJvmSpec._
|
||||
|
||||
val testNodes = NrOfNodes
|
||||
|
||||
override def beforeAll = {
|
||||
super.beforeAll()
|
||||
node
|
||||
}
|
||||
|
||||
override def afterAll = {
|
||||
node.shutdown()
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
"Metrics manager" must {
|
||||
|
||||
def timeout = node.metricsManager.refreshTimeout
|
||||
|
||||
"be initialized with refresh timeout value, specified in akka.conf" in {
|
||||
timeout must be(1.second)
|
||||
}
|
||||
|
||||
"return up-to-date local node metrics straight from MBeans/Sigar" in {
|
||||
node.metricsManager.getLocalMetrics must not be (null)
|
||||
|
||||
node.metricsManager.getLocalMetrics.systemLoadAverage must be(0.5 plusOrMinus 0.5)
|
||||
}
|
||||
|
||||
"return metrics cached in the MetricsManagerLocalMetrics" in {
|
||||
node.metricsManager.getMetrics(nodeAddress.nodeName) must not be (null)
|
||||
}
|
||||
|
||||
"return local node metrics from ZNode" in {
|
||||
node.metricsManager.getMetrics(nodeAddress.nodeName, false) must not be (null)
|
||||
}
|
||||
|
||||
"return cached metrics of all nodes in the cluster" in {
|
||||
node.metricsManager.getAllMetrics.size must be(1)
|
||||
node.metricsManager.getAllMetrics.find(_.nodeName == "node1") must not be (null)
|
||||
}
|
||||
|
||||
"throw no exceptions, when user attempts to get metrics of a non-existing node" in {
|
||||
node.metricsManager.getMetrics("nonexisting") must be(None)
|
||||
node.metricsManager.getMetrics("nonexisting", false) must be(None)
|
||||
}
|
||||
|
||||
"regularly update cached metrics" in {
|
||||
val oldMetrics = node.metricsManager.getLocalMetrics
|
||||
Thread sleep timeout.toMillis
|
||||
node.metricsManager.getLocalMetrics must not be (oldMetrics)
|
||||
}
|
||||
|
||||
"allow to track JVM state and bind handles through MetricsAlterationMonitors" in {
|
||||
val monitorReponse = new DefaultPromise[String]
|
||||
|
||||
node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor {
|
||||
|
||||
val id = "heapMemoryThresholdMonitor"
|
||||
|
||||
def reactsOn(metrics: NodeMetrics) = metrics.usedHeapMemory > 1
|
||||
|
||||
def react(metrics: NodeMetrics) = monitorReponse.completeWithResult("Too much memory is used!")
|
||||
|
||||
})
|
||||
|
||||
monitorReponse.get must be("Too much memory is used!")
|
||||
|
||||
}
|
||||
|
||||
class FooMonitor(monitorWorked: AtomicInteger) extends LocalMetricsAlterationMonitor {
|
||||
val id = "fooMonitor"
|
||||
def reactsOn(metrics: NodeMetrics) = true
|
||||
def react(metrics: NodeMetrics) = monitorWorked.set(monitorWorked.get + 1)
|
||||
}
|
||||
|
||||
"allow to unregister the monitor" in {
|
||||
|
||||
val monitorWorked = new AtomicInteger(0)
|
||||
val fooMonitor = new FooMonitor(monitorWorked)
|
||||
|
||||
node.metricsManager.addMonitor(fooMonitor)
|
||||
node.metricsManager.removeMonitor(fooMonitor)
|
||||
|
||||
val oldValue = monitorWorked.get
|
||||
Thread sleep timeout.toMillis
|
||||
monitorWorked.get must be(oldValue)
|
||||
|
||||
}
|
||||
|
||||
"stop notifying monitors, when stopped" in {
|
||||
|
||||
node.metricsManager.stop()
|
||||
|
||||
val monitorWorked = new AtomicInteger(0)
|
||||
|
||||
node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor {
|
||||
val id = "fooMonitor"
|
||||
def reactsOn(metrics: NodeMetrics) = true
|
||||
def react(metrics: NodeMetrics) = monitorWorked.set(monitorWorked.get + 1)
|
||||
})
|
||||
|
||||
monitorWorked.get must be(0)
|
||||
|
||||
node.metricsManager.start()
|
||||
Thread sleep (timeout.toMillis * 2)
|
||||
monitorWorked.get must be > (1)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
akka.enabled-modules = ["cluster"]
|
||||
akka.event-handlers = ["akka.testkit.TestEventListener"]
|
||||
akka.event-handler-level = "WARNING"
|
||||
|
|
@ -0,0 +1 @@
|
|||
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
akka.enabled-modules = ["cluster"]
|
||||
akka.event-handlers = ["akka.testkit.TestEventListener"]
|
||||
akka.event-handler-level = "WARNING"
|
||||
|
|
@ -0,0 +1 @@
|
|||
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
|
||||
|
|
@ -0,0 +1,133 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.metrics.remote
|
||||
|
||||
import akka.cluster._
|
||||
import akka.actor._
|
||||
import Actor._
|
||||
import Cluster._
|
||||
import akka.dispatch._
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
import akka.cluster.metrics._
|
||||
import java.util.concurrent._
|
||||
import atomic.AtomicInteger
|
||||
|
||||
object RemoteMetricsMultiJvmSpec {
|
||||
val NrOfNodes = 2
|
||||
|
||||
val MetricsRefreshTimeout = 100.millis
|
||||
}
|
||||
|
||||
class AllMetricsAvailableMonitor(_id: String, completionLatch: CountDownLatch, clusterSize: Int) extends ClusterMetricsAlterationMonitor {
|
||||
|
||||
val id = _id
|
||||
|
||||
def reactsOn(allMetrics: Array[NodeMetrics]) = allMetrics.size == clusterSize
|
||||
|
||||
def react(allMetrics: Array[NodeMetrics]) = completionLatch.countDown
|
||||
|
||||
}
|
||||
|
||||
class RemoteMetricsMultiJvmNode1 extends MasterClusterTestNode {
|
||||
|
||||
import RemoteMetricsMultiJvmSpec._
|
||||
|
||||
val testNodes = NrOfNodes
|
||||
|
||||
"Metrics manager" must {
|
||||
"provide metrics of all nodes in the cluster" in {
|
||||
|
||||
val allMetricsAvaiable = new CountDownLatch(1)
|
||||
|
||||
node.metricsManager.refreshTimeout = MetricsRefreshTimeout
|
||||
node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("all-metrics-available", allMetricsAvaiable, NrOfNodes))
|
||||
|
||||
LocalCluster.barrier("node-start", NrOfNodes).await()
|
||||
|
||||
allMetricsAvaiable.await()
|
||||
|
||||
LocalCluster.barrier("check-all-remote-metrics", NrOfNodes) {
|
||||
node.metricsManager.getAllMetrics.size must be(2)
|
||||
}
|
||||
|
||||
val cachedMetrics = node.metricsManager.getMetrics("node2")
|
||||
val metricsFromZnode = node.metricsManager.getMetrics("node2", false)
|
||||
|
||||
LocalCluster.barrier("check-single-remote-metrics", NrOfNodes) {
|
||||
cachedMetrics must not be (null)
|
||||
metricsFromZnode must not be (null)
|
||||
}
|
||||
|
||||
Thread sleep MetricsRefreshTimeout.toMillis
|
||||
|
||||
LocalCluster.barrier("remote-metrics-is-updated", NrOfNodes) {
|
||||
node.metricsManager.getMetrics("node2") must not be (cachedMetrics)
|
||||
node.metricsManager.getMetrics("node2", false) must not be (metricsFromZnode)
|
||||
}
|
||||
|
||||
val someMetricsGone = new CountDownLatch(1)
|
||||
node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("some-metrics-gone", someMetricsGone, 1))
|
||||
|
||||
LocalCluster.barrier("some-nodes-leave", NrOfNodes).await()
|
||||
|
||||
someMetricsGone.await(10, TimeUnit.SECONDS) must be(true)
|
||||
|
||||
node.metricsManager.getMetrics("node2") must be(None)
|
||||
node.metricsManager.getMetrics("node2", false) must be(None)
|
||||
node.metricsManager.getAllMetrics.size must be(1)
|
||||
|
||||
node.shutdown()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class RemoteMetricsMultiJvmNode2 extends ClusterTestNode {
|
||||
|
||||
import RemoteMetricsMultiJvmSpec._
|
||||
|
||||
val testNodes = NrOfNodes
|
||||
|
||||
"Metrics manager" must {
|
||||
"provide metrics of all nodes in the cluster" in {
|
||||
|
||||
val allMetricsAvaiable = new CountDownLatch(1)
|
||||
|
||||
node.metricsManager.refreshTimeout = MetricsRefreshTimeout
|
||||
node.metricsManager.addMonitor(new AllMetricsAvailableMonitor("all-metrics-available", allMetricsAvaiable, NrOfNodes))
|
||||
|
||||
LocalCluster.barrier("node-start", NrOfNodes).await()
|
||||
|
||||
allMetricsAvaiable.await()
|
||||
|
||||
LocalCluster.barrier("check-all-remote-metrics", NrOfNodes) {
|
||||
node.metricsManager.getAllMetrics.size must be(2)
|
||||
}
|
||||
|
||||
val cachedMetrics = node.metricsManager.getMetrics("node1")
|
||||
val metricsFromZnode = node.metricsManager.getMetrics("node1", false)
|
||||
|
||||
LocalCluster.barrier("check-single-remote-metrics", NrOfNodes) {
|
||||
cachedMetrics must not be (null)
|
||||
metricsFromZnode must not be (null)
|
||||
}
|
||||
|
||||
Thread sleep MetricsRefreshTimeout.toMillis
|
||||
|
||||
LocalCluster.barrier("remote-metrics-is-updated", NrOfNodes) {
|
||||
node.metricsManager.getMetrics("node1") must not be (cachedMetrics)
|
||||
node.metricsManager.getMetrics("node1", false) must not be (metricsFromZnode)
|
||||
}
|
||||
|
||||
LocalCluster.barrier("some-nodes-leave", NrOfNodes) {
|
||||
node.shutdown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -31,6 +31,20 @@ Akka uses Git and is hosted at `Github <http://github.com>`_.
|
|||
`Maven Repository <http://akka.io/repository/>`_
|
||||
================================================
|
||||
|
||||
`<http://akka.io/repository>`_
|
||||
The Akka Maven repository can be found at `<http://akka.io/repository>`_.
|
||||
|
||||
Typesafe provides `<http://repo.typesafe.com/typesafe/releases/>`_ that proxies several other repositories, including akka.io.
|
||||
It is convenient to use the Typesafe repository, since it includes all external dependencies of Akka.
|
||||
It is a "best-effort" service, and if it is unavailable you may need to use the underlying repositories
|
||||
directly.
|
||||
|
||||
* http://akka.io/repository
|
||||
* http://repository.codehaus.org
|
||||
* http://guiceyfruit.googlecode.com/svn/repo/releases/
|
||||
* http://repository.jboss.org/nexus/content/groups/public/
|
||||
* http://download.java.net/maven/2
|
||||
* http://oss.sonatype.org/content/repositories/releases
|
||||
* http://download.java.net/maven/glassfish
|
||||
* http://databinder.net/repo
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue