diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index e2fe6cc8d1..f9bed30607 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -100,6 +100,23 @@ akka { max-sample-size = 1000 } + # Uses JMX, and Hyperic SIGAR if it is on the classpath. + metrics { + # Enable or disable the metrics collector for load-balancing nodes. + enabled = on + + # How often metrics are sampled on a node. + metrics-interval = 3s + + # How often a node publishes metrics information. + gossip-interval = 3s + + # How quickly the exponential weighting of past data is decayed compared to new data. + # If set to 0, data streaming over time is turned off. + # Set higher to increase the bias toward newer values. + rate-of-decay = 10 + } + # If the tick-duration of the default scheduler is longer than the tick-duration # configured here a dedicated scheduler will be used for periodic tasks of the cluster, # otherwise the default scheduler is used.
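The new block can be tuned, or the collector switched off entirely, from an application.conf. A minimal sketch mirroring the defaults above (the inline comments are explanatory, not part of the shipped reference.conf):

akka {
  cluster {
    metrics {
      enabled = on           # off skips sampling, gossip and ClusterMetricsChanged publishing
      metrics-interval = 3s  # how often a node samples its own metrics
      gossip-interval = 3s   # how often sampled metrics are gossiped to a random peer
      rate-of-decay = 10     # EWMA decay factor; 0 turns data streaming off
    }
  }
}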
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 3d198572c9..a2d88b55ad 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -7,7 +7,7 @@ import scala.collection.immutable.SortedSet import scala.concurrent.util.{ Deadline, Duration } import scala.concurrent.util.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler } +import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor.Status.Failure import akka.event.EventStream import akka.pattern.ask @@ -94,6 +94,8 @@ private[cluster] object InternalClusterAction { case object ReapUnreachableTick extends Tick + case object MetricsTick extends Tick + case object LeaderActionsTick extends Tick case object PublishStatsTick extends Tick @@ -152,6 +154,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac withDispatcher(context.props.dispatcher), name = "core") val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon]. withDispatcher(context.props.dispatcher), name = "heartbeat") + if (settings.MetricsEnabled) context.actorOf(Props[ClusterMetricsCollector]. + withDispatcher(context.props.dispatcher), name = "metrics") def receive = { case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core @@ -214,8 +218,8 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { self ! LeaderActionsTick } - // start periodic publish of current state - private val publishStateTask: Option[Cancellable] = + // start periodic publish of current stats + private val publishStatsTask: Option[Cancellable] = if (PublishStatsInterval == Duration.Zero) None else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) { self ! PublishStatsTick @@ -230,7 +234,7 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() - publishStateTask foreach { _.cancel() } + publishStatsTask foreach { _.cancel() } } def uninitialized: Actor.Receive = { @@ -875,7 +879,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) case JoinSeedNode ⇒ // send InitJoin to all seed nodes (except myself) seedNodes.collect { - case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a)) + case a if a != selfAddress ⇒ context.actorFor(context.parent.path.toStringWithAddress(a)) } foreach { _ ! InitJoin } case InitJoinAck(address) ⇒ // first InitJoinAck reply @@ -904,7 +908,7 @@ private[cluster] final class ClusterCoreSender extends Actor with ActorLogging { * Looks up and returns the remote cluster command connection for the specific address. */ private def clusterCoreConnectionFor(address: Address): ActorRef = - context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "core") + context.actorFor(RootActorPath(address) / "system" / "cluster" / "core") def receive = { case SendClusterMessage(to, msg) ⇒ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 1747df4dbb..3ccf32307b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -88,6 +88,11 @@ object ClusterEvent { if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) } + /** + * Current snapshot of cluster member metrics. Published to subscribers. + */ + case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent + /** * Cluster convergence state changed. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index a7b4d52daa..b48c9f066b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -62,7 +62,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg * Looks up and returns the remote cluster heartbeat connection for the specific address. */ def clusterHeartbeatConnectionFor(address: Address): ActorRef = - context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") + context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") val digester = MessageDigest.getInstance("MD5")
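All three lookups above resolve a peer's actor by address instead of going through context.system. For illustration, how such a remote path is assembled (the host and port are made up):

import akka.actor.{ Address, RootActorPath }

object PathSketch extends App {
  // The same construction used by clusterCoreConnectionFor and clusterHeartbeatConnectionFor.
  val path = RootActorPath(Address("akka", "sys", "host2", 2552)) / "system" / "cluster" / "core"
  println(path) // akka://sys@host2:2552/system/cluster/core
}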
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala new file mode 100644 index 0000000000..1e0c314f21 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -0,0 +1,581 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import scala.language.postfixOps +import scala.concurrent.util.duration._ +import scala.concurrent.util.FiniteDuration +import scala.collection.immutable.{ SortedSet, Map } +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.util.{ Try, Success, Failure } +import scala.math.ScalaNumber +import scala.runtime.{ RichLong, RichDouble, RichInt } + +import akka.actor._ +import akka.event.LoggingAdapter +import akka.cluster.MemberStatus.Up + +import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory } +import java.lang.reflect.Method +import java.lang.System.{ currentTimeMillis ⇒ newTimestamp } + +/** + * This strategy is primarily for load-balancing of nodes. It controls metrics sampling + * at a regular frequency, prepares highly variable data for further analysis by other entities, + * and publishes the latest cluster metrics data around the node ring to assist in determining + * the need to redirect traffic to the least-loaded nodes. + * + * Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]]. + * + * Calculation of statistical data for each monitored process is delegated to the + * [[akka.cluster.DataStream]] for exponential smoothing, with an additional decay factor. + * + * INTERNAL API. + * + * @author Helena Edelson + */ +private[cluster] class ClusterMetricsCollector extends Actor with ActorLogging { + + import InternalClusterAction._ + import ClusterEvent._ + import Member.addressOrdering + import context.dispatcher + val cluster = Cluster(context.system) + import cluster.{ selfAddress, scheduler, settings } + import settings._ + + /** + * The node ring to gossip to, containing only members that are Up. + */ + var nodes: SortedSet[Address] = SortedSet.empty + + /** + * The latest metric values with their statistical data. + */ + var latestGossip: MetricsGossip = MetricsGossip(MetricsRateOfDecay) + + /** + * The metrics collector that samples data on the node. + */ + val collector: MetricsCollector = MetricsCollector(selfAddress, log, context.system.asInstanceOf[ExtendedActorSystem].dynamicAccess) + + /** + * Start periodic gossip to random nodes in the cluster. + */ + val gossipTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], MetricsGossipInterval) { + self ! GossipTick + } + + /** + * Start periodic metrics collection. + */ + val metricsTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], MetricsInterval) { + self ! MetricsTick + } + + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + log.info("Metrics collection has started successfully on node [{}]", selfAddress) + } + + def receive = { + case GossipTick ⇒ gossip() + case MetricsTick ⇒ collect() + case state: CurrentClusterState ⇒ receiveState(state) + case MemberUp(m) ⇒ receiveMember(m) + case e: MemberEvent ⇒ removeMember(e) + case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg) + } + + override def postStop(): Unit = { + cluster unsubscribe self + gossipTask.cancel() + metricsTask.cancel() + collector.close() + } + + /** + * Adds a member to the node ring. + */ + def receiveMember(member: Member): Unit = nodes += member.address + + /** + * Removes a member from the node ring. 
+ */ + def removeMember(event: MemberEvent): Unit = { + nodes -= event.member.address + latestGossip = latestGossip remove event.member.address + } + + /** + * Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]]. + */ + def receiveState(state: CurrentClusterState): Unit = nodes = state.members collect { case m if m.status == Up ⇒ m.address } + + /** + * Samples the latest metrics for the node, updates metrics statistics in + * [[akka.cluster.MetricsGossip]], and publishes the change to the event stream. + * + * INTERNAL API + * + * @see [[akka.cluster.ClusterMetricsCollector.collect()]] + */ + def collect(): Unit = { + latestGossip :+= collector.sample + publish() + } + + /** + * Receives changes from peer nodes, merges remote with local gossip nodes, publishes + * changes to the event stream for consumption by load-balancing routers, and gossips back to the sender. + */ + def receiveGossip(envelope: MetricsGossipEnvelope): Unit = { + val remoteGossip = envelope.gossip + + if (remoteGossip != latestGossip) { + latestGossip = latestGossip merge remoteGossip + publish() + gossipTo(envelope.from) + } + } + + /** + * Gossips to a randomly selected peer node. + */ + def gossip(): Unit = selectRandomNode((nodes - selfAddress).toIndexedSeq) foreach gossipTo + + def gossipTo(address: Address): Unit = + context.actorFor(self.path.toStringWithAddress(address)) ! MetricsGossipEnvelope(selfAddress, latestGossip) + + def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = + if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) + + /** + * Publishes the latest metrics to the event stream. + */ + def publish(): Unit = context.system.eventStream publish ClusterMetricsChanged(latestGossip.nodes) + +} + +/** + * INTERNAL API + * + * @param nodes metrics per node + * @author Helena Edelson + */ +private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetrics] = Set.empty) { + + /** + * Removes nodes whose corresponding node ring members are no longer [[akka.cluster.MemberStatus.Up]]. + */ + def remove(node: Address): MetricsGossip = copy(nodes = nodes filterNot (_.address == node)) + + /** + * Adds new remote [[akka.cluster.NodeMetrics]] and merges existing ones from a remote gossip. + */ + def merge(remoteGossip: MetricsGossip): MetricsGossip = { + val remoteNodes = remoteGossip.nodes.map(n ⇒ n.address -> n).toMap + val toMerge = nodeKeys intersect remoteNodes.keySet + val onlyInRemote = remoteNodes.keySet -- nodeKeys + val onlyInLocal = nodeKeys -- remoteNodes.keySet + + val seen = nodes.collect { + case n if toMerge contains n.address ⇒ n merge remoteNodes(n.address) + case n if onlyInLocal contains n.address ⇒ n + } + + val unseen = remoteGossip.nodes.collect { case n if onlyInRemote contains n.address ⇒ n } + + copy(nodes = seen ++ unseen) + } + + /** + * Adds new local [[akka.cluster.NodeMetrics]] and initializes the data, or merges with existing ones. + */ + def :+(data: NodeMetrics): MetricsGossip = { + val previous = metricsFor(data) + val names = previous map (_.name) + + val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a ⇒ names contains a.name) + val initialized = unseen.map(_.initialize(rateOfDecay)) + val merged = toMerge flatMap (latest ⇒ previous.collect { case peer if latest same peer ⇒ peer :+ latest }) + + val refreshed = nodes filterNot (_.address == data.address) + copy(nodes = refreshed + data.copy(metrics = initialized ++ merged)) + } + + /** + * Returns a set of [[akka.actor.Address]] for a given node set. 
+ */ + def nodeKeys: Set[Address] = nodes map (_.address) + + /** + * Returns metrics for a node, if it exists. + */ + def metricsFor(node: NodeMetrics): Set[Metric] = nodes flatMap (n ⇒ if (n same node) n.metrics else Set.empty[Metric]) + +} + +/** + * Envelope adding a sender address to the gossip. + * INTERNAL API + */ +private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage + +/** + * The exponentially weighted moving average (EWMA) approach captures short-term + * movements in volatility for a conditional volatility forecasting model. By virtue + * of its alpha, or decay factor, this provides a statistical streaming data model + * that is exponentially biased towards newer entries. + * + * An EWMA only needs the most recent forecast value to be kept, as opposed to a standard + * moving average model. + * + * INTERNAL API + * + * @param decay sets how quickly the exponential weighting decays for past data compared to new data + * + * @param ewma the current exponentially weighted moving average, e.g. Y(n - 1), or, + * the sampled value resulting from the previous smoothing iteration. + * This value is always used as the previous EWMA to calculate the new EWMA. + * + * @param timestamp the most recent time of sampling + * + * @param startTime the time of initial sampling for this data stream + * + * @author Helena Edelson + */ +private[cluster] case class DataStream(decay: Int, ewma: ScalaNumber, startTime: Long, timestamp: Long) + extends ClusterMessage with MetricNumericConverter { + + /** + * The rate at which the weights of past observations + * decay as they become more distant. + */ + private val α = 2.0 / (decay + 1) + + /** + * Calculates the exponentially weighted moving average for a given monitored data set. + * The data can be too large to fit into an int or long, thus we use ScalaNumber, + * and defer to BigInt or BigDecimal. + * + * @param xn the new data point + * @return a new [[akka.cluster.DataStream]] with the updated yn and timestamp + */ + def :+(xn: ScalaNumber): DataStream = convert(xn) fold ( + nl ⇒ copy(ewma = BigInt((α * nl + (1 - α) * ewma.longValue()).round), timestamp = newTimestamp), + nd ⇒ copy(ewma = BigDecimal(α * nd + (1 - α) * ewma.doubleValue()), timestamp = newTimestamp)) + + /** + * The duration of observation for this data stream. + */ + def duration: FiniteDuration = (timestamp - startTime) millis + +} + +/** + * Companion object of DataStream class. + * + * @author Helena Edelson + */ +private[cluster] object DataStream { + + def apply(decay: Int, data: ScalaNumber): Option[DataStream] = if (decay > 0) + Some(DataStream(decay, data, newTimestamp, newTimestamp)) else None + +}
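To make the smoothing concrete: with the common EWMA convention α = 2 / (decay + 1), each new sample moves the average a fraction α of the way toward the new value. A self-contained sketch over plain Doubles, assuming decay = 10 (the numbers are illustrative, not cluster output):

object EwmaSketch extends App {
  val decay = 10
  val alpha = 2.0 / (decay + 1) // ≈ 0.18; larger alpha biases harder toward new samples

  // y(n) = alpha * x(n) + (1 - alpha) * y(n - 1)
  def ewma(previous: Double, xn: Double): Double = alpha * xn + (1 - alpha) * previous

  // A jump from 100 to 200 is absorbed gradually instead of instantly.
  val samples = Seq(100.0, 100.0, 200.0, 200.0, 200.0)
  val smoothed = samples.tail.scanLeft(samples.head)(ewma)
  println(smoothed.map(v ⇒ f"$v%.1f").mkString(" -> ")) // 100.0 -> 100.0 -> 118.2 -> 133.1 -> 145.2
}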
+/** + * INTERNAL API + * + * @param name the metric name + * + * @param value the metric value, which may or may not be defined + * + * @param average the data stream of the metric value, for trending over time. Metrics that are already + * averages (e.g. system load average) or finite (e.g. total cores) are not trended. + * + * @author Helena Edelson + */ +private[cluster] case class Metric(name: String, value: Option[ScalaNumber], average: Option[DataStream]) + extends ClusterMessage with MetricNumericConverter { + + /** + * Returns the metric with a new data stream for data trending if eligible, + * otherwise returns the unchanged metric. + */ + def initialize(decay: Int): Metric = if (initializable) copy(average = DataStream(decay, value.get)) else this + + /** + * If the latest value is defined ([[akka.cluster.MetricNumericConverter.defined()]]), updates + * the data point, and if a data stream exists, updates it as well. Returns the updated metric. + */ + def :+(latest: Metric): Metric = latest.value match { + case Some(v) if this same latest ⇒ average match { + case Some(previous) ⇒ copy(value = Some(v), average = Some(previous :+ v)) + case None if latest.average.isDefined ⇒ copy(value = Some(v), average = latest.average) + case None if latest.average.isEmpty ⇒ copy(value = Some(v)) + } + case None ⇒ this + } + + /** + * @see [[akka.cluster.MetricNumericConverter.defined()]] + */ + def isDefined: Boolean = value match { + case Some(a) ⇒ defined(a) + case None ⇒ false + } + + /** + * Returns true if that is tracking the same metric as this. + */ + def same(that: Metric): Boolean = name == that.name + + /** + * Returns true if the metric requires initialization. + */ + def initializable: Boolean = trendable && isDefined && average.isEmpty + + /** + * Returns true if the metric is a value applicable for trending. + */ + def trendable: Boolean = !(Metric.noStream contains name) + +} + +/** + * Companion object of Metric class. + * + * @author Helena Edelson + */ +private[cluster] object Metric extends MetricNumericConverter { + + /** + * The metrics that are already averages or finite are not trended over time. + */ + private val noStream = Set("system-load-average", "total-cores", "processors") + + /** + * Evaluates the validity of a value based on whether it is available (SIGAR on the classpath) + * or defined for the OS (JMX). If undefined, the value option is set to None and the latest + * sampled metric is not modified, to avoid skewing the statistical trend. + */ + def apply(name: String, value: Option[ScalaNumber]): Metric = value match { + case Some(v) if defined(v) ⇒ Metric(name, value, None) + case _ ⇒ Metric(name, None, None) + } + +} + +/** + * The snapshot of current sampled health metrics for any monitored process. + * Collected and gossiped at regular intervals for dynamic cluster management strategies. + * + * For JVM memory: the amount of used and committed memory will always be <= max, if max is defined. + * A memory allocation may fail if it attempts to increase the used memory such that used > committed, + * even if used <= max is true (e.g. when the system virtual memory is low). + * + * The system is possibly nearing a bottleneck if the system load average is approaching the number of cpus/cores. + * + * @param address [[akka.actor.Address]] of the node the metrics are gathered at + * + * @param timestamp the time of sampling + * + * @param metrics the set of sampled [[akka.cluster.Metric]] + * + * @author Helena Edelson + */ +private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage { + + /** + * Returns the most recent data. + */ + def merge(that: NodeMetrics): NodeMetrics = if (this updatable that) copy(metrics = that.metrics, timestamp = that.timestamp) else this + + /** + * Returns true if that address is the same as this and its metric set is more recent. 
+ */ + def updatable(that: NodeMetrics): Boolean = (this same that) && (that.timestamp > timestamp) + + /** + * Returns true if that address is the same as this. + */ + def same(that: NodeMetrics): Boolean = address == that.address + +} + +/** + * Encapsulates evaluation of the validity of metric values, and conversion of an actual metric + * value to a [[akka.cluster.Metric]] for consumption by subscribed cluster entities. + * + * INTERNAL API + * + * @author Helena Edelson + */ +private[cluster] trait MetricNumericConverter { + + /** + * A defined value is neither -1 nor NaN/Infinite. + * + */ + def defined(value: ScalaNumber): Boolean = convert(value) fold (a ⇒ value != -1, b ⇒ !(b.isNaN || b.isInfinite)) + + /** + * May involve rounding or truncation. + */ + def convert(from: ScalaNumber): Either[Long, Double] = from match { + case n: BigInt ⇒ Left(n.longValue()) + case n: BigDecimal ⇒ Right(n.doubleValue()) + case n: RichInt ⇒ Left(n.abs) + case n: RichLong ⇒ Left(n.self) + case n: RichDouble ⇒ Right(n.self) + } + +} + +/** + * Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this + * loads a wider and more accurate range of metrics in combination with SIGAR's native OS library. + * + * FIXME switch to Scala reflection + * + * INTERNAL API + * + * @param sigar the optional org.hyperic.sigar.Sigar instance + * + * @param address The [[akka.actor.Address]] of the node being sampled + * + * @author Helena Edelson + */ +private[cluster] class MetricsCollector private (private val sigar: Option[AnyRef], address: Address) extends MetricNumericConverter { + + private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean + + private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean + + private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage") + + private val CpuList: Option[Method] = createMethodFrom(sigar, "getCpuInfoList") + + private val NetInterfaces: Option[Method] = createMethodFrom(sigar, "getNetInterfaceList") + + private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc") + + private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption + + /** + * Samples and collects new data points. + * + * @return [[akka.cluster.NodeMetrics]] + */ + def sample: NodeMetrics = NodeMetrics(address, newTimestamp, Set(cpuCombined, totalCores, + systemLoadAverage, used, committed, max, processors, networkMaxRx, networkMaxTx)) + + /** + * (SIGAR / JMX) Returns the OS-specific average system load on the CPUs in the system, for the past 1 minute. + * On some systems the JMX OS system load average may not be available, in which case a -1 is returned. + * Hyperic SIGAR provides more precise values and is therefore preferred when the library is on the classpath. + */ + def systemLoadAverage: Metric = Metric("system-load-average", Some(BigDecimal(Try( + LoadAverage.get.invoke(sigar.get).asInstanceOf[Array[Double]].toSeq.head) getOrElse osMBean.getSystemLoadAverage))) + + /** + * (JMX) Returns the number of available processors. + */ + def processors: Metric = Metric("processors", Some(BigInt(osMBean.getAvailableProcessors))) + + /** + * (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes). 
+ */ + def used: Metric = Metric("heap-memory-used", Some(BigInt(memoryMBean.getHeapMemoryUsage.getUsed))) + + /** + * (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM + * from all heap memory pools (in bytes). Committed will always be greater + * than or equal to used. + */ + def committed: Metric = Metric("heap-memory-committed", Some(BigInt(memoryMBean.getHeapMemoryUsage.getCommitted))) + + /** + * (JMX) Returns the maximum amount of memory (in bytes) that can be used + * for JVM memory management. If undefined, returns -1. + */ + def max: Metric = Metric("heap-memory-max", Some(BigInt(memoryMBean.getHeapMemoryUsage.getMax))) + + /** + * (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric indicates + * how much time the CPU spent executing code during the sampling interval, and how much headroom + * theoretically remains. Note that 99% CPU utilization can be optimal or indicative of failure. + * + * In the data stream, this sometimes returns a valid metric value and sometimes a NaN or Infinite. + * Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others. + */ + def cpuCombined: Metric = Metric("cpu-combined", Try(BigDecimal(CombinedCpu.get.invoke(Cpu.get.invoke(sigar.get)).asInstanceOf[Double])).toOption) + + /** + * (SIGAR) Returns the total number of cores. + */ + def totalCores: Metric = Metric("total-cores", Try(BigInt(CpuList.get.invoke(sigar.get).asInstanceOf[Array[AnyRef]].map(cpu ⇒ + createMethodFrom(Some(cpu), "getTotalCores").get.invoke(cpu).asInstanceOf[Int]).head)).toOption) + // Array[Int].head - if this would differ on some servers, expose all. In testing each int was always equal. + + /** + * (SIGAR) Returns the max network IO rx (read) value across interfaces, in bytes, for network load evaluation. + */ + def networkMaxRx: Metric = networkMaxFor("getRxBytes", "network-max-rx") + + /** + * (SIGAR) Returns the max network IO tx (write) value across interfaces, in bytes. + */ + def networkMaxTx: Metric = networkMaxFor("getTxBytes", "network-max-tx") + + /** + * Returns the network stats per interface. + */ + def networkStats: Map[String, AnyRef] = Try(NetInterfaces.get.invoke(sigar.get).asInstanceOf[Array[String]].map(arg ⇒ + arg -> (createMethodFrom(sigar, "getNetInterfaceStat", Array(classOf[String])).get.invoke(sigar.get, arg))).toMap) getOrElse Map.empty[String, AnyRef] + + /** + * Returns true if SIGAR is successfully installed on the classpath, otherwise false. + */ + def isSigar: Boolean = sigar.isDefined + + /** + * Releases any native resources associated with this instance. + */ + def close(): Unit = if (isSigar) Try(createMethodFrom(sigar, "close").get.invoke(sigar.get)) + + /** + * Returns the max bytes, for the given method name, from the network interface stats. + */ + private def networkMaxFor(method: String, metric: String): Metric = Metric(metric, Try(Some(BigInt( + networkStats.collect { case (_, a) ⇒ createMethodFrom(Some(a), method).get.invoke(a).asInstanceOf[Long] }.max))) getOrElse None) + + private def createMethodFrom(ref: Option[AnyRef], method: String, types: Array[(Class[_])] = Array.empty[(Class[_])]): Option[Method] = + Try(ref.get.getClass.getMethod(method, types: _*)).toOption + +}
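The reflective pattern above (createMethodFrom plus Try) is what keeps SIGAR an optional dependency: every native call degrades to None when the library is missing. A standalone sketch of the same idea — Sigar.getPid is a real SIGAR method, the surrounding object is illustrative:

import java.lang.reflect.Method
import scala.util.Try

object SigarFallbackSketch extends App {
  // Resolve a no-arg method by name, or None if the instance or method is missing.
  def createMethodFrom(ref: Option[AnyRef], method: String): Option[Method] =
    Try(ref.get.getClass.getMethod(method)).toOption

  // Instantiate org.hyperic.sigar.Sigar reflectively, so there is no compile-time dependency.
  val sigar: Option[AnyRef] =
    Try(Class.forName("org.hyperic.sigar.Sigar").newInstance.asInstanceOf[AnyRef]).toOption

  val pid: Option[Any] = for {
    s ← sigar
    m ← createMethodFrom(sigar, "getPid")
  } yield m.invoke(s)

  println(pid getOrElse "SIGAR not on the classpath; a collector falls back to JMX here")
}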
+/** + * Companion object of MetricsCollector class. + * + * @author Helena Edelson + */ +private[cluster] object MetricsCollector { + def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector = + dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Seq.empty) match { + case Success(sigar) ⇒ new MetricsCollector(Some(sigar), address) + case Failure(e) ⇒ + log.debug(e.toString) + log.info("Hyperic SIGAR was not found on the classpath or not installed properly. " + + "Metrics will be retrieved from MBeans, and may be incorrect on some platforms. " + + "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " + + "platform-specific native library to 'java.library.path'.") + new MetricsCollector(None, address) + } +} + diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 0aa9e6997e..5920ac3dca 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -30,6 +30,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { @volatile private var _latestStats = ClusterStats() + /** + * Current cluster metrics, updated periodically via the event bus. + */ + @volatile + private var _clusterMetrics: Set[NodeMetrics] = Set.empty + val selfAddress = cluster.selfAddress // create actor that subscribes to the cluster eventBus to update current read view state @@ -56,6 +62,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) case s: CurrentClusterState ⇒ state = s case CurrentInternalStats(stats) ⇒ _latestStats = stats + case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes + case _ ⇒ // ignore, not interesting } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") @@ -118,6 +125,11 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { !unreachableMembers.contains(myself) && !myself.status.isUnavailable } + /** + * Current cluster metrics. + */ + def clusterMetrics: Set[NodeMetrics] = _clusterMetrics + /** * INTERNAL API * The nodes that has seen current version of the Gossip. 
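Beyond the read view, any actor can consume the new event directly from the event stream. A hedged sketch — since NodeMetrics is private[cluster], such a subscriber currently has to live in the akka.cluster package, and the listener below is illustrative rather than part of this change:

package akka.cluster

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ClusterEvent.ClusterMetricsChanged

class MetricsListener extends Actor with ActorLogging {
  // Receive every snapshot the ClusterMetricsCollector publishes after a sample or merge.
  override def preStart(): Unit = context.system.eventStream.subscribe(self, classOf[ClusterMetricsChanged])
  override def postStop(): Unit = context.system.eventStream.unsubscribe(self)

  def receive = {
    case ClusterMetricsChanged(nodes) ⇒ log.info("Metrics updated for [{}] nodes", nodes.size)
  }
}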
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 3c7baa4f76..6110df034a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -50,6 +50,10 @@ class ClusterSettings(val config: Config, val systemName: String) { maxFailures = getInt("akka.cluster.send-circuit-breaker.max-failures"), callTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.call-timeout"), MILLISECONDS), resetTimeout = Duration(getMilliseconds("akka.cluster.send-circuit-breaker.reset-timeout"), MILLISECONDS)) + final val MetricsEnabled: Boolean = getBoolean("akka.cluster.metrics.enabled") + final val MetricsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.metrics-interval"), MILLISECONDS) + final val MetricsGossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.metrics.gossip-interval"), MILLISECONDS) + final val MetricsRateOfDecay: Int = getInt("akka.cluster.metrics.rate-of-decay") } case class CircuitBreakerSettings(maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsDataStreamingOffSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsDataStreamingOffSpec.scala new file mode 100644 index 0000000000..efc2ce51fb --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsDataStreamingOffSpec.scala @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import scala.language.postfixOps +import scala.concurrent.util.duration._ +import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig} +import com.typesafe.config.ConfigFactory +import akka.testkit.LongRunningTest + + +object ClusterMetricsDataStreamingOffMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + commonConfig(ConfigFactory.parseString("akka.cluster.metrics.rate-of-decay = 0") + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) +} +class ClusterMetricsDataStreamingOffMultiJvmNode1 extends ClusterMetricsDataStreamingOffSpec +class ClusterMetricsDataStreamingOffMultiJvmNode2 extends ClusterMetricsDataStreamingOffSpec + +abstract class ClusterMetricsDataStreamingOffSpec extends MultiNodeSpec(ClusterMetricsDataStreamingOffMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec { + "Cluster metrics" must { + "not stream metric data" taggedAs LongRunningTest in within(30 seconds) { + awaitClusterUp(roles: _*) + enterBarrier("cluster-started") + runOn(roles: _*) { + awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size) + awaitCond(clusterView.clusterMetrics.size == roles.size) + awaitCond(clusterView.clusterMetrics.flatMap(_.metrics).filter(_.trendable).forall(_.average.isEmpty)) + } + enterBarrier("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsDisabledSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsDisabledSpec.scala new file mode 100644 index 0000000000..1216759a5a --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsDisabledSpec.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.cluster + +import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig} +import com.typesafe.config.ConfigFactory +import akka.testkit.LongRunningTest + +object ClusterMetricsDisabledMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + commonConfig(ConfigFactory.parseString("akka.cluster.metrics.enabled = off") + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) +} + +class ClusterMetricsDisabledMultiJvmNode1 extends ClusterMetricsDisabledSpec +class ClusterMetricsDisabledMultiJvmNode2 extends ClusterMetricsDisabledSpec + +abstract class ClusterMetricsDisabledSpec extends MultiNodeSpec(ClusterMetricsDisabledMultiJvmSpec) with MultiNodeClusterSpec { + "Cluster metrics" must { + "not collect metrics, not publish ClusterMetricsChanged, and not gossip metrics" taggedAs LongRunningTest in { + awaitClusterUp(roles: _*) + enterBarrier("cluster-started") + runOn(roles: _*) { + awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size) + awaitCond(clusterView.clusterMetrics.isEmpty) + } + enterBarrier("after") + } + } +} + diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala new file mode 100644 index 0000000000..d494f71336 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import scala.language.postfixOps +import scala.concurrent.util.duration._ +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.actor.ExtendedActorSystem + +object ClusterMetricsMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(ConfigFactory.parseString(""" + akka.cluster.auto-join = on + akka.cluster.metrics.enabled = on + akka.cluster.metrics.metrics-interval = 3 s + akka.cluster.metrics.gossip-interval = 3 s + akka.cluster.metrics.rate-of-decay = 10 + akka.loglevel = INFO + akka.remote.log-sent-messages = off + akka.remote.log-received-messages = off + akka.actor.debug.receive = off + akka.actor.debug.unhandled = off + akka.actor.debug.lifecycle = off + akka.actor.debug.autoreceive = off + akka.actor.debug.fsm = off""") + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) +} + +class ClusterMetricsMultiJvmNode1 extends ClusterMetricsSpec +class ClusterMetricsMultiJvmNode2 extends ClusterMetricsSpec +class ClusterMetricsMultiJvmNode3 extends ClusterMetricsSpec +class ClusterMetricsMultiJvmNode4 extends ClusterMetricsSpec +class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec + +abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec { + import ClusterMetricsMultiJvmSpec._ + + val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess) + + "Cluster metrics" must { + "periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " + + "and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) { + awaitClusterUp(roles: _*) + enterBarrier("cluster-started") + runOn(roles: _*) { + 
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size) + awaitCond(clusterView.clusterMetrics.size == roles.size) + assertInitialized(cluster.settings.MetricsRateOfDecay, collectNodeMetrics(clusterView.clusterMetrics).toSet) + clusterView.clusterMetrics.foreach(n => assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n)) + } + enterBarrier("after") + } + "reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) { + runOn(second) { + cluster.leave(first) + } + enterBarrier("first-left") + runOn(second, third, fourth, fifth) { + awaitCond(clusterView.clusterMetrics.size == (roles.size - 1)) + } + enterBarrier("finished") + } + } +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index f8c5571a57..be5ae74e4d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -45,6 +45,10 @@ class ClusterConfigSpec extends AkkaSpec { maxFailures = 3, callTimeout = 2 seconds, resetTimeout = 30 seconds)) + MetricsEnabled must be(true) + MetricsInterval must be(3 seconds) + MetricsGossipInterval must be(3 seconds) + MetricsRateOfDecay must be(10) } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala b/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala new file mode 100644 index 0000000000..2f2ccaa2ae --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/DataStreamSpec.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import language.postfixOps +import scala.concurrent.util.duration._ + +import akka.testkit.{ LongRunningTest, AkkaSpec } + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class DataStreamSpec extends AkkaSpec(MetricsEnabledSpec.config) with AbstractClusterMetricsSpec with MetricNumericConverter { + import system.dispatcher + + val collector = createMetricsCollector + val DefaultRateOfDecay = 10 + + "DataStream" must { + + "calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in { + val firstDataSet = collector.sample.metrics.collect { case m if m.trendable && m.isDefined ⇒ m.initialize(DefaultRateOfDecay) } + var streamingDataSet = firstDataSet + + val cancellable = system.scheduler.schedule(0 seconds, 100 millis) { + streamingDataSet = collector.sample.metrics.flatMap(latest ⇒ streamingDataSet.collect { + case streaming if (latest.trendable && latest.isDefined) && (latest same streaming) + && (latest.value.get != streaming.value.get) ⇒ { + val updatedDataStream = streaming.average.get :+ latest.value.get + updatedDataStream.timestamp must be > (streaming.average.get.timestamp) + updatedDataStream.duration.length must be > (streaming.average.get.duration.length) + updatedDataStream.ewma must not be (streaming.average.get.ewma) + updatedDataStream.ewma must not be (latest.value.get) + streaming.copy(value = latest.value, average = Some(updatedDataStream)) + } + }) + } + awaitCond(firstDataSet.size == streamingDataSet.size, longDuration) + cancellable.cancel() + + val finalDataSet = streamingDataSet.map(m ⇒ m.name -> m).toMap + firstDataSet map { + first ⇒ + val newMetric = finalDataSet(first.name) + val e1 = first.average.get + val e2 = newMetric.average.get + + if (first.value.get != newMetric.value.get) { + e2.ewma must not 
be (first.value.get) + e2.ewma must not be (newMetric.value.get) + } + if (first.value.get.longValue > newMetric.value.get.longValue) e1.ewma.longValue must be > e2.ewma.longValue + else if (first.value.get.longValue < newMetric.value.get.longValue) e1.ewma.longValue must be < e2.ewma.longValue + } + } + + "not stream data if the decay is set to 0" in { + val data = collector.sample.metrics map (_.initialize(0)) + data foreach (_.average.isEmpty must be(true)) + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala new file mode 100644 index 0000000000..1f23da769c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.testkit.{ ImplicitSender, AkkaSpec } + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender with AbstractClusterMetricsSpec { + + "MetricNumericConverter" must { + val collector = createMetricsCollector + + "convert" in { + convert(0).isLeft must be(true) + convert(1).left.get must be(1) + convert(1L).isLeft must be(true) + convert(0.0).isRight must be(true) + } + + "define a new metric" in { + val metric = Metric("heap-memory-used", Some(0L)) + metric.initializable must be(true) + metric.name must not be (null) + metric.average.isEmpty must be(true) + metric.trendable must be(true) + + if (collector.isSigar) { + val cores = collector.totalCores + cores.isDefined must be(true) + cores.value.get.intValue must be > (0) + cores.initializable must be(false) + } + } + + "define an undefined value with a None" in { + Metric("x", Some(-1)).value.isDefined must be(false) + Metric("x", Some(java.lang.Double.NaN)).value.isDefined must be(false) + Metric("x", None).isDefined must be(false) + } + + "recognize whether a metric value is defined" in { + defined(0) must be(true) + defined(0.0) must be(true) + } + + "recognize whether a metric value is not defined" in { + defined(-1) must be(false) + defined(Double.NaN) must be(false) + } + } +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala new file mode 100644 index 0000000000..867b085f8a --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.cluster + +import scala.language.postfixOps +import scala.concurrent.util.duration._ +import scala.concurrent.util.FiniteDuration +import scala.concurrent.Await +import scala.util.{ Try, Failure } + +import akka.actor._ +import akka.testkit._ +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +object MetricsEnabledSpec { + val config = """ + akka.cluster.metrics.enabled = on + akka.cluster.metrics.metrics-interval = 1 s + akka.cluster.metrics.gossip-interval = 1 s + akka.cluster.metrics.rate-of-decay = 10 + akka.actor.provider = "akka.remote.RemoteActorRefProvider" + akka.loglevel = INFO""" +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec { + import system.dispatcher + + val collector = createMetricsCollector + + "Metric" must { + "create and initialize a new metric or merge an existing one" in { + for (i ← 0 to samples) { + val metrics = collector.sample.metrics + assertCreatedUninitialized(metrics) + assertInitialized(window, metrics map (_.initialize(window))) + } + } + + "merge 2 metrics that are tracking the same metric" in { + for (i ← 0 to samples) { + val sample1 = collector.sample.metrics + val sample2 = collector.sample.metrics + var merged = sample2 flatMap (latest ⇒ sample1 collect { + case peer if latest same peer ⇒ { + val m = peer :+ latest + assertMerged(latest, peer, m) + m + } + }) + + val sample3 = collector.sample.metrics map (_.initialize(window)) + val sample4 = collector.sample.metrics map (_.initialize(window)) + merged = sample4 flatMap (latest ⇒ sample3 collect { + case peer if latest same peer ⇒ { + val m = peer :+ latest + assertMerged(latest, peer, m) + m + } + }) + merged.size must be(sample3.size) + merged.size must be(sample4.size) + } + } + } + + "MetricsCollector" must { + + "not raise errors when attempting reflective code in apply" in { + Try(createMetricsCollector must not be null) match { + case Failure(e) ⇒ fail("No error should have been raised creating 'createMetricsCollector'.") + case _ ⇒ // + } + } + + "collect accurate metrics for a node" in { + val sample = collector.sample + assertExpectedSampleSize(collector.isSigar, window, sample) + val metrics = sample.metrics.collect { case m if m.isDefined ⇒ (m.name, m.value.get) } + val used = metrics collectFirst { case (a, b) if a == "heap-memory-used" ⇒ b } + val committed = metrics collectFirst { case (a, b) if a == "heap-memory-committed" ⇒ b } + metrics collect { + case (a, b) if a == "cpu-combined" ⇒ + b.doubleValue must be <= (1.0) + b.doubleValue must be >= (0.0) + b + case (a, b) if a == "total-cores" ⇒ b.intValue must be > (0); b + case (a, b) if a == "network-max-rx" ⇒ b.longValue must be > (0L); b + case (a, b) if a == "network-max-tx" ⇒ b.longValue must be > (0L); b + case (a, b) if a == "system-load-average" ⇒ b.doubleValue must be >= (0.0); b + case (a, b) if a == "processors" ⇒ b.intValue must be >= (0); b + case (a, b) if a == "heap-memory-used" ⇒ b.longValue must be >= (0L); b + case (a, b) if a == "heap-memory-committed" ⇒ b.longValue must be > (0L); b + case (a, b) if a == "heap-memory-max" ⇒ + used.get.longValue must be <= (b.longValue) + committed.get.longValue must be <= (b.longValue) + used.get.longValue must be <= (committed.get.longValue) + b + } + } + + "collect SIGAR metrics if it is on the classpath" in { + if (collector.isSigar) { + // combined cpu 
may or may not be defined on a given sampling + // systemLoadAverage comes from SIGAR when present + collector.systemLoadAverage.isDefined must be(true) + collector.networkStats.nonEmpty must be(true) + collector.networkMaxRx.isDefined must be(true) + collector.networkMaxTx.isDefined must be(true) + collector.totalCores.isDefined must be(true) + } + } + + "collect JMX metrics" in { + // heap max may be undefined depending on the OS + // systemLoadAverage falls back to JMX when SIGAR is not present + collector.systemLoadAverage.isDefined must be(true) + collector.used.isDefined must be(true) + collector.committed.isDefined must be(true) + collector.processors.isDefined must be(true) + } + + "collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in { + val latch = TestLatch(samples) + val task = FixedRateTask(system.scheduler, 0 seconds, interval) { + val sample = collector.sample + assertCreatedUninitialized(sample.metrics) + assertExpectedSampleSize(collector.isSigar, window, sample) + latch.countDown() + } + Await.ready(latch, longDuration) + task.cancel() + } + } +} + +trait MetricSpec extends WordSpec with MustMatchers { + + def assertMasterMetricsAgainstGossipMetrics(master: Set[NodeMetrics], gossip: MetricsGossip): Unit = { + val masterMetrics = collectNodeMetrics(master) + val gossipMetrics = collectNodeMetrics(gossip.nodes) + gossipMetrics.size must be(masterMetrics.size plusOrMinus 1) // combined cpu + } + + def assertExpectedNodeAddresses(gossip: MetricsGossip, nodes: Set[NodeMetrics]): Unit = + gossip.nodes.map(_.address) must be(nodes.map(_.address)) + + def assertExpectedSampleSize(isSigar: Boolean, gossip: MetricsGossip): Unit = + gossip.nodes.foreach(n ⇒ assertExpectedSampleSize(isSigar, gossip.rateOfDecay, n)) + + def assertCreatedUninitialized(gossip: MetricsGossip): Unit = + gossip.nodes.foreach(n ⇒ assertCreatedUninitialized(n.metrics.filterNot(_.trendable))) + + def assertInitialized(gossip: MetricsGossip): Unit = + gossip.nodes.foreach(n ⇒ assertInitialized(gossip.rateOfDecay, n.metrics)) + + def assertCreatedUninitialized(metrics: Set[Metric]): Unit = { + metrics.size must be > (0) + metrics foreach { m ⇒ + m.average.isEmpty must be(true) + if (m.value.isDefined) m.isDefined must be(true) + if (m.initializable) (m.trendable && m.isDefined && m.average.isEmpty) must be(true) + } + } + + def assertInitialized(decay: Int, metrics: Set[Metric]): Unit = if (decay > 0) metrics.filter(_.trendable) foreach { m ⇒ + m.initializable must be(false) + if (m.isDefined) m.average.isDefined must be(true) + } + + def assertMerged(latest: Metric, peer: Metric, merged: Metric): Unit = if (latest same peer) { + if (latest.isDefined) { + if (peer.isDefined) { + merged.isDefined must be(true) + merged.value.get must be(latest.value.get) + if (latest.trendable) { + if (latest.initializable) merged.average.isEmpty must be(true) + else merged.average.isDefined must be(true) + } + } else { + merged.isDefined must be(true) + merged.value.get must be(latest.value.get) + if (latest.average.isDefined) merged.average.get must be(latest.average.get) + else merged.average.isEmpty must be(true) + } + } else { + if (peer.isDefined) { + merged.isDefined must be(true) + merged.value.get must be(peer.value.get) + if (peer.trendable) { + if (peer.initializable) merged.average.isEmpty must be(true) + else merged.average.isDefined must be(true) + } + } else { + merged.isDefined must be(false) + merged.average.isEmpty must be(true) + } + } + } + + def assertExpectedSampleSize(isSigar: Boolean, 
decay: Int, node: NodeMetrics): Unit = { + node.metrics.size must be(9) + val metrics = node.metrics.filter(_.isDefined) + if (isSigar) { // combined cpu + jmx max heap + metrics.size must be >= (7) + metrics.size must be <= (9) + } else { // jmx max heap + metrics.size must be >= (4) + metrics.size must be <= (5) + } + + if (decay > 0) metrics.collect { case m if m.trendable && (!m.initializable) ⇒ m }.foreach(_.average.isDefined must be(true)) + } + + def collectNodeMetrics(nodes: Set[NodeMetrics]): Seq[Metric] = { + nodes.toSeq flatMap { n ⇒ + n.metrics filter (_.isDefined) + } + } +} + +trait AbstractClusterMetricsSpec extends DefaultTimeout { + this: AkkaSpec ⇒ + + val selfAddress = Address("akka", "localhost") + + val window = 49 + + val interval: FiniteDuration = 100 millis + + val longDuration = 120 seconds // for long running tests + + val samples = 100 + + def createMetricsCollector: MetricsCollector = MetricsCollector(selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess) + +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala new file mode 100644 index 0000000000..3ff6db6de2 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import scala.concurrent.util.duration._ + +import akka.testkit.{ ImplicitSender, AkkaSpec } +import akka.actor.Address + +import java.lang.System.{ currentTimeMillis ⇒ newTimestamp } + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with AbstractClusterMetricsSpec with MetricSpec { + + val collector = createMetricsCollector + + "A MetricsGossip" must { + "add and initialize new NodeMetrics" in { + val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) + val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) + + var localGossip = MetricsGossip(window) + localGossip :+= m1 + localGossip.nodes.size must be(1) + localGossip.nodeKeys.size must be(localGossip.nodes.size) + assertMasterMetricsAgainstGossipMetrics(Set(m1), localGossip) + assertExpectedSampleSize(collector.isSigar, localGossip) + assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet) + + localGossip :+= m2 + localGossip.nodes.size must be(2) + localGossip.nodeKeys.size must be(localGossip.nodes.size) + assertMasterMetricsAgainstGossipMetrics(Set(m1, m2), localGossip) + assertExpectedSampleSize(collector.isSigar, localGossip) + assertInitialized(localGossip.rateOfDecay, collectNodeMetrics(localGossip.nodes).toSet) + } + + "merge peer metrics" in { + val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) + val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) + + var remoteGossip = MetricsGossip(window) + remoteGossip :+= m1 + remoteGossip :+= m2 + remoteGossip.nodes.size must be(2) + val beforeMergeNodes = remoteGossip.nodes + + val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp) + remoteGossip :+= m2Updated // merge peers + remoteGossip.nodes.size must be(2) + assertMasterMetricsAgainstGossipMetrics(beforeMergeNodes, remoteGossip) + 
assertExpectedSampleSize(collector.isSigar, remoteGossip) + remoteGossip.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp must be(m2Updated.timestamp) } + } + + "merge an existing metric set for a node and update node ring" in { + val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) + val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) + val m3 = NodeMetrics(Address("akka", "sys", "a", 2556), newTimestamp, collector.sample.metrics) + val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = newTimestamp) + + var localGossip = MetricsGossip(window) + localGossip :+= m1 + localGossip :+= m2 + + var remoteGossip = MetricsGossip(window) + remoteGossip :+= m3 + remoteGossip :+= m2Updated + + localGossip.nodeKeys.contains(m1.address) must be(true) + remoteGossip.nodeKeys.contains(m3.address) must be(true) + + // must contain nodes 1,3, and the most recent version of 2 + val mergedGossip = localGossip merge remoteGossip + mergedGossip.nodes.size must be(3) + assertExpectedNodeAddresses(mergedGossip, Set(m1, m2, m3)) + assertExpectedSampleSize(collector.isSigar, mergedGossip) + assertCreatedUninitialized(mergedGossip) + assertInitialized(mergedGossip) + mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp) + } + + "get the current NodeMetrics if it exists in the local nodes" in { + val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) + var localGossip = MetricsGossip(window) + localGossip :+= m1 + localGossip.metricsFor(m1).nonEmpty must be(true) + } + + "remove a node if it is no longer Up" in { + val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics) + val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics) + + var localGossip = MetricsGossip(window) + localGossip :+= m1 + localGossip :+= m2 + + localGossip.nodes.size must be(2) + localGossip = localGossip remove m1.address + localGossip.nodes.size must be(1) + localGossip.nodes.exists(_.address == m1.address) must be(false) + } + } +} + diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala new file mode 100644 index 0000000000..5d58bc84e5 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.cluster + +import akka.testkit.AkkaSpec +import akka.actor.Address + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class NodeMetricsSpec extends AkkaSpec with AbstractClusterMetricsSpec with MetricSpec { + + val collector = createMetricsCollector + + val node1 = Address("akka", "sys", "a", 2554) + + val node2 = Address("akka", "sys", "a", 2555) + + "NodeMetrics" must { + "recognize updatable nodes" in { + (NodeMetrics(node1, 0) updatable NodeMetrics(node1, 1)) must be(true) + } + + "recognize non-updatable nodes" in { + (NodeMetrics(node1, 1) updatable NodeMetrics(node2, 0)) must be(false) + } + + "return correct result for 2 'same' nodes" in { + (NodeMetrics(node1, 0) same NodeMetrics(node1, 0)) must be(true) + } + + "return correct result for 2 not 'same' nodes" in { + (NodeMetrics(node1, 0) same NodeMetrics(node2, 0)) must be(false) + } + + "merge 2 NodeMetrics by most recent" in { + val sample1 = NodeMetrics(node1, 1, collector.sample.metrics) + val sample2 = NodeMetrics(node1, 2, collector.sample.metrics) + + val merged = sample1 merge sample2 + merged.timestamp must be(sample2.timestamp) + merged.metrics must be(sample2.metrics) + } + + "not merge 2 NodeMetrics if master is more recent" in { + val sample1 = NodeMetrics(node1, 1, collector.sample.metrics) + val sample2 = NodeMetrics(node2, 0, sample1.metrics) + + val merged = sample1 merge sample2 // older and not same + merged.timestamp must be(sample1.timestamp) + merged.metrics must be(sample1.metrics) + } + } +}
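As a closing illustration of where this gossip is headed (adaptive load balancing of routees), a sketch of ranking nodes by smoothed heap use from a ClusterMetricsChanged snapshot; the helper is hypothetical and not part of this change:

package akka.cluster

import akka.actor.Address

object LeastLoadedSketch {
  // Rank nodes by the EWMA of heap-memory-used, falling back to the raw value when no stream exists.
  def leastLoaded(nodes: Set[NodeMetrics]): Option[Address] =
    nodes.toSeq.sortBy { n ⇒
      n.metrics collectFirst {
        case m if m.name == "heap-memory-used" ⇒
          m.average.map(_.ewma.doubleValue).orElse(m.value.map(_.doubleValue)).getOrElse(Double.MaxValue)
      } getOrElse Double.MaxValue
    }.headOption.map(_.address)
}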