remove old deprecated cluster metrics, #21423

* the corresponding functionality was moved to akka-cluster-metrics, see
  http://doc.akka.io/docs/akka/2.4/project/migration-guide-2.3.x-2.4.x.html#New_Cluster_Metrics_Extension
This commit is contained in:
Patrik Nordwall 2017-01-20 13:48:36 +01:00 committed by Johan Andrén
parent 6aa67703a8
commit 452b3f1406
43 changed files with 107 additions and 7797 deletions
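For code that still relied on the removed akka.cluster.metrics.* settings, the replacement extension is enabled purely through configuration, exactly as the updated multi-node tests below do. A minimal, hypothetical sketch of that setup (the system name and the withFallback call are illustrative and not part of this commit):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object MetricsMigrationSketch extends App {
  // Same setting the updated tests in this diff use to load the new module.
  val config = ConfigFactory.parseString(
    """
    akka.extensions = ["akka.cluster.metrics.ClusterMetricsExtension"]
    akka.actor.provider = "cluster"
    """).withFallback(ConfigFactory.load())

  // The old akka.cluster.metrics.* block removed from reference.conf below no
  // longer exists; metrics settings now live in the akka-cluster-metrics module.
  val system = ActorSystem("MetricsMigrationSketch", config)
}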


@ -110,23 +110,22 @@ private[metrics] trait ClusterMetricsMessage extends Serializable
private[metrics] final case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip, reply: Boolean) extends ClusterMetricsMessage
with DeadLetterSuppression
+/**
+* INTERNAL API.
+*/
+private[metrics] object ClusterMetricsCollector {
+case object MetricsTick
+case object GossipTick
+}
/**
* INTERNAL API.
*
* Actor responsible for periodic data sampling in the node and publication to the cluster.
*/
private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging {
-import InternalClusterAction._
-// TODO collapse to ClusterEvent._ after akka-cluster metrics is gone
-import ClusterEvent.MemberEvent
-import ClusterEvent.MemberUp
-import ClusterEvent.MemberWeaklyUp
-import ClusterEvent.MemberRemoved
-import ClusterEvent.MemberExited
-import ClusterEvent.ReachabilityEvent
-import ClusterEvent.ReachableMember
-import ClusterEvent.UnreachableMember
-import ClusterEvent.CurrentClusterState
+import ClusterMetricsCollector._
+import ClusterEvent._
import Member.addressOrdering
import context.dispatcher
val cluster = Cluster(context.system)


@ -28,13 +28,10 @@ trait ClusterMetricsCommonConfig extends MultiNodeConfig {
// Extract individual sigar library for every node.
nodeList foreach { role
nodeConfig(role) {
-parseString("akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native/" + role.name)
+parseString(s"akka.cluster.metrics.native-library-extract-folder=$${user.dir}/target/native/" + role.name)
}
}
-// Disable legacy metrics in akka-cluster.
-def disableMetricsLegacy = parseString("""akka.cluster.metrics.enabled=off""")
// Enable metrics extension in akka-cluster-metrics.
def enableMetricsExtension = parseString("""
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
@ -56,7 +53,6 @@ object ClusterMetricsDisabledConfig extends ClusterMetricsCommonConfig {
commonConfig {
Seq(
customLogging,
-disableMetricsLegacy,
disableMetricsExtension,
debugConfig(on = false),
MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)
@ -70,7 +66,6 @@ object ClusterMetricsEnabledConfig extends ClusterMetricsCommonConfig {
commonConfig {
Seq(
customLogging,
-disableMetricsLegacy,
enableMetricsExtension,
debugConfig(on = false),
MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)


@ -66,10 +66,6 @@ object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig {
}
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
-# Disable legacy metrics.
-akka.cluster.metrics.enabled=off
# Enable metrics extension.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]


@ -2,7 +2,7 @@
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
-package akka.cluster.routing
+package akka.cluster.metrics
import com.typesafe.config.ConfigFactory
import akka.actor.Address
@ -11,6 +11,7 @@ import akka.remote.RARP
import akka.testkit.AkkaSpec
import akka.routing.ActorSelectionRoutee
import akka.routing.ActorRefRoutee
+import scala.Vector
class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.provider = "cluster"


@ -48,7 +48,6 @@ object ClusterShardingGetStateSpecConfig extends MultiNodeConfig {
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
-akka.cluster.metrics.enabled = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.sharding {
coordinator-failure-backoff = 3s


@ -50,7 +50,6 @@ object ClusterShardingGetStatsSpecConfig extends MultiNodeConfig {
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.log-dead-letters-during-shutdown = off
-akka.cluster.metrics.enabled = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.sharding {
state-store-mode = "ddata"


@ -173,59 +173,6 @@ message VectorClock {
repeated Version versions = 2;
}
/****************************************
* Metrics Gossip Messages
****************************************/
/**
* Metrics Gossip Envelope
*/
message MetricsGossipEnvelope {
required Address from = 1;
required MetricsGossip gossip = 2;
required bool reply = 3;
}
/**
* Metrics Gossip
*/
message MetricsGossip {
repeated Address allAddresses = 1;
repeated string allMetricNames = 2;
repeated NodeMetrics nodeMetrics = 3;
}
/**
* Node Metrics
*/
message NodeMetrics {
enum NumberType {
Serialized = 0;
Double = 1;
Float = 2;
Integer = 3;
Long = 4;
}
message Number {
required NumberType type = 1;
optional uint32 value32 = 2;
optional uint64 value64 = 3;
optional bytes serialized = 4;
}
message EWMA {
required double value = 1;
required double alpha = 2;
}
message Metric {
required int32 nameIndex = 1;
required Number number = 2;
optional EWMA ewma = 3;
}
required int32 addressIndex = 1;
required int64 timestamp = 2;
repeated Metric metrics = 3;
}
/****************************************
* Common Datatypes and Messages
****************************************/


@ -189,34 +189,6 @@ akka {
}
metrics {
# Enable or disable metrics collector for load-balancing nodes.
enabled = on
# FQCN of the metrics collector implementation.
# It must implement akka.cluster.MetricsCollector and
# have public constructor with akka.actor.ActorSystem parameter.
# The default SigarMetricsCollector uses JMX and Hyperic SIGAR, if SIGAR
# is on the classpath, otherwise only JMX.
collector-class = "akka.cluster.SigarMetricsCollector"
# How often metrics are sampled on a node.
# Shorter interval will collect the metrics more often.
collect-interval = 3s
# How often a node publishes metrics information.
gossip-interval = 3s
# How quickly the exponential weighting of past data is decayed compared to
# new data. Set lower to increase the bias toward newer values.
# The relevance of each data sample is halved for every passing half-life
# duration, i.e. after 4 times the half-life, a data sample's relevance is
# reduced to 6% of its original relevance. The initial relevance of a data
# sample is given by 1 - 0.5 ^ (collect-interval / half-life).
# See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
moving-average-half-life = 12s
}
# If the tick-duration of the default scheduler is longer than the
# tick-duration configured here a dedicated scheduler will be used for
# periodic tasks of the cluster, otherwise the default scheduler is used.
@ -233,17 +205,6 @@ akka {
}
# Default configuration for routers
actor.deployment.default {
# MetricsSelector to use
# - available: "mix", "heap", "cpu", "load"
# - or: Fully qualified class name of the MetricsSelector class.
# The class must extend akka.cluster.routing.MetricsSelector
# and have a public constructor with com.typesafe.config.Config
# parameter.
# - default is "mix"
metrics-selector = mix
}
actor.deployment.default.cluster {
# enable cluster aware router that deploys to nodes in the cluster
enabled = off
@ -289,10 +250,6 @@ akka {
"akka.cluster.protobuf.ClusterMessageSerializer" = 5 "akka.cluster.protobuf.ClusterMessageSerializer" = 5
} }
router.type-mapping {
adaptive-pool = "akka.cluster.routing.AdaptiveLoadBalancingPool"
adaptive-group = "akka.cluster.routing.AdaptiveLoadBalancingGroup"
}
}
}


@ -125,8 +125,6 @@ private[cluster] object InternalClusterAction {
case object ReapUnreachableTick extends Tick
case object MetricsTick extends Tick
case object LeaderActionsTick extends Tick
case object PublishStatsTick extends Tick
@ -135,8 +133,6 @@ private[cluster] object InternalClusterAction {
case object GetClusterCoreRef
final case class PublisherCreated(publisher: ActorRef)
/**
* Command to [[akka.cluster.ClusterDaemon]] to create a
* [[akka.cluster.OnMemberStatusChangedListener]].
@ -217,13 +213,6 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Up).withDeploy(Deploy.local))
case AddOnMemberRemovedListener(code)
context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Removed).withDeploy(Deploy.local))
case PublisherCreated(publisher)
if (settings.MetricsEnabled) {
// metrics must be started after core/publisher to be able
// to inject the publisher ref to the ClusterMetricsCollector
context.actorOf(Props(classOf[ClusterMetricsCollector], publisher).
withDispatcher(context.props.dispatcher), name = "metrics")
}
case CoordinatedShutdownLeave.LeaveReq
val ref = context.actorOf(CoordinatedShutdownLeave.props().withDispatcher(context.props.dispatcher))
// forward the ask request
@ -254,7 +243,6 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
withDispatcher(context.props.dispatcher), name = "publisher") withDispatcher(context.props.dispatcher), name = "publisher")
coreDaemon = Some(context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher). coreDaemon = Some(context.watch(context.actorOf(Props(classOf[ClusterCoreDaemon], publisher).
withDispatcher(context.props.dispatcher), name = "daemon"))) withDispatcher(context.props.dispatcher), name = "daemon")))
context.parent ! PublisherCreated(publisher)
}
override val supervisorStrategy =


@ -3,8 +3,6 @@
*/
package akka.cluster
// TODO remove metrics
import language.postfixOps
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
@ -227,18 +225,6 @@ object ClusterEvent {
*/
final case class ReachableMember(member: Member) extends ReachabilityEvent
/**
* Current snapshot of cluster node metrics. Published to subscribers.
*/
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
final case class ClusterMetricsChanged(nodeMetrics: Set[NodeMetrics]) extends ClusterDomainEvent {
/**
* Java API
*/
def getNodeMetrics: java.lang.Iterable[NodeMetrics] =
scala.collection.JavaConverters.asJavaIterableConverter(nodeMetrics).asJava
}
/**
* INTERNAL API
* The nodes that have seen current version of the Gossip.


@ -1,818 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import java.io.Closeable
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import scala.collection.immutable
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
import scala.util.{ Try, Success, Failure }
import akka.ConfigurationException
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.cluster.MemberStatus.Up
import akka.cluster.MemberStatus.WeaklyUp
import akka.event.Logging
import java.lang.management.MemoryUsage
/**
* INTERNAL API.
*
* Cluster metrics is primarily for load-balancing of nodes. It controls metrics sampling
* at a regular frequency, prepares highly variable data for further analysis by other entities,
* and publishes the latest cluster metrics data around the node ring and local eventStream
* to assist in determining the need to redirect traffic to the least-loaded nodes.
*
* Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]].
*
* Smoothing of the data for each monitored process is delegated to the
* [[akka.cluster.EWMA]] for exponential weighted moving average.
*/
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
import InternalClusterAction._
import ClusterEvent._
import Member.addressOrdering
import context.dispatcher
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, settings }
import cluster.settings._
import cluster.InfoLogger._
/**
* The node ring gossipped that contains only members that are Up.
*/
var nodes: immutable.SortedSet[Address] = immutable.SortedSet.empty
/**
* The latest metric values with their statistical data.
*/
var latestGossip: MetricsGossip = MetricsGossip.empty
/**
* The metrics collector that samples data on the node.
*/
val collector: MetricsCollector = MetricsCollector(context.system.asInstanceOf[ExtendedActorSystem], settings)
/**
* Start periodic gossip to random nodes in cluster
*/
val gossipTask = scheduler.schedule(
PeriodicTasksInitialDelay max MetricsGossipInterval,
MetricsGossipInterval, self, GossipTick)
/**
* Start periodic metrics collection
*/
val metricsTask = scheduler.schedule(
PeriodicTasksInitialDelay max MetricsInterval,
MetricsInterval, self, MetricsTick)
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
logInfo("Metrics collection has started successfully")
}
def receive = {
case GossipTick gossip()
case MetricsTick collect()
case msg: MetricsGossipEnvelope receiveGossip(msg)
case state: CurrentClusterState receiveState(state)
case MemberUp(m) addMember(m)
case MemberWeaklyUp(m) addMember(m)
case MemberRemoved(m, _) removeMember(m)
case MemberExited(m) removeMember(m)
case UnreachableMember(m) removeMember(m)
case ReachableMember(m)
if (m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp)
addMember(m)
case _: MemberEvent // not interested in other types of MemberEvent
}
override def postStop: Unit = {
cluster unsubscribe self
gossipTask.cancel()
metricsTask.cancel()
collector.close()
}
/**
* Adds a member to the node ring.
*/
def addMember(member: Member): Unit = nodes += member.address
/**
* Removes a member from the member node ring.
*/
def removeMember(member: Member): Unit = {
nodes -= member.address
latestGossip = latestGossip remove member.address
publish()
}
/**
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus]] `Up`.
*/
def receiveState(state: CurrentClusterState): Unit =
nodes = state.members collect { case m if m.status == Up || m.status == WeaklyUp m.address }
/**
* Samples the latest metrics for the node, updates metrics statistics in
* [[akka.cluster.MetricsGossip]], and publishes the change to the event bus.
*
* @see [[akka.cluster.ClusterMetricsCollector#collect]]
*/
def collect(): Unit = {
latestGossip :+= collector.sample()
publish()
}
/**
* Receives changes from peer nodes, merges remote with local gossip nodes, then publishes
* changes to the event stream for load balancing router consumption, and gossip back.
*/
def receiveGossip(envelope: MetricsGossipEnvelope): Unit = {
// remote node might not have same view of member nodes, this side should only care
// about nodes that are known here, otherwise removed nodes can come back
val otherGossip = envelope.gossip.filter(nodes)
latestGossip = latestGossip merge otherGossip
// changes will be published in the period collect task
if (!envelope.reply)
replyGossipTo(envelope.from)
}
/**
* Gossip to peer nodes.
*/
def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo
def gossipTo(address: Address): Unit =
sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = false))
def replyGossipTo(address: Address): Unit =
sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = true))
def sendGossip(address: Address, envelope: MetricsGossipEnvelope): Unit =
context.actorSelection(self.path.toStringWithAddress(address)) ! envelope
def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
/**
* Publishes to the event stream.
*/
def publish(): Unit = publisher ! PublishEvent(ClusterMetricsChanged(latestGossip.nodes))
}
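Before this removal, application code and the adaptive routers consumed these samples by subscribing to the ClusterMetricsChanged event deleted from ClusterEvent.scala earlier in this diff. A hedged sketch of that now-obsolete usage against the pre-removal API; the listener class itself is invented:

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.StandardMetrics.HeapMemory

class OldMetricsListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // ClusterMetricsChanged was a ClusterDomainEvent, so a plain cluster subscription delivered it.
  override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case ClusterMetricsChanged(nodeMetrics) =>
      nodeMetrics.collect {
        case HeapMemory(address, _, used, _, _) =>
          log.info("Heap used on {}: {} MB", address, used / 1024 / 1024)
      }
    case _ => // the initial CurrentClusterState and other events are ignored here
  }
}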
/**
* INTERNAL API
*/
private[cluster] object MetricsGossip {
val empty = MetricsGossip(Set.empty[NodeMetrics])
}
/**
* INTERNAL API
*
* @param nodes metrics per node
*/
@SerialVersionUID(1L)
private[cluster] final case class MetricsGossip(nodes: Set[NodeMetrics]) {
/**
* Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus]] `Up`.
*/
def remove(node: Address): MetricsGossip = copy(nodes = nodes filterNot (_.address == node))
/**
* Only the nodes that are in the `includeNodes` Set.
*/
def filter(includeNodes: Set[Address]): MetricsGossip =
copy(nodes = nodes filter { includeNodes contains _.address })
/**
* Adds new remote [[akka.cluster.NodeMetrics]] and merges existing from a remote gossip.
*/
def merge(otherGossip: MetricsGossip): MetricsGossip =
otherGossip.nodes.foldLeft(this) { (gossip, nodeMetrics) gossip :+ nodeMetrics }
/**
* Adds new local [[akka.cluster.NodeMetrics]], or merges an existing.
*/
def :+(newNodeMetrics: NodeMetrics): MetricsGossip = nodeMetricsFor(newNodeMetrics.address) match {
case Some(existingNodeMetrics)
copy(nodes = nodes - existingNodeMetrics + (existingNodeMetrics merge newNodeMetrics))
case None copy(nodes = nodes + newNodeMetrics)
}
/**
* Returns [[akka.cluster.NodeMetrics]] for a node if exists.
*/
def nodeMetricsFor(address: Address): Option[NodeMetrics] = nodes find { n n.address == address }
}
/**
* INTERNAL API
* Envelope adding a sender address to the gossip.
*/
@SerialVersionUID(1L)
private[cluster] final case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip, reply: Boolean)
extends ClusterMessage
private[cluster] object EWMA {
/**
* math.log(2)
*/
private val LogOf2 = 0.69315
/**
* Calculate the alpha (decay factor) used in [[akka.cluster.EWMA]]
* from specified half-life and interval between observations.
* Half-life is the interval over which the weights decrease by a factor of two.
* The relevance of each data sample is halved for every passing half-life duration,
* i.e. after 4 times the half-life, a data sample's relevance is reduced to 6% of
* its original relevance. The initial relevance of a data sample is given by
* 1 - 0.5 ^ (collect-interval / half-life).
*/
def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double = {
val halfLifeMillis = halfLife.toMillis
require(halfLife.toMillis > 0, "halfLife must be > 0 s")
val decayRate = LogOf2 / halfLifeMillis
1 - math.exp(-decayRate * collectInterval.toMillis)
}
}
/**
* The exponentially weighted moving average (EWMA) approach captures short-term
* movements in volatility for a conditional volatility forecasting model. By virtue
* of its alpha, or decay factor, this provides a statistical streaming data model
* that is exponentially biased towards newer entries.
*
* http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
*
* An EWMA only needs the most recent forecast value to be kept, as opposed to a standard
* moving average model.
*
* INTERNAL API
*
* @param alpha decay factor, sets how quickly the exponential weighting decays for past data compared to new data,
* see http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
*
* @param value the current exponentially weighted moving average, e.g. Y(n - 1), or,
* the sampled value resulting from the previous smoothing iteration.
* This value is always used as the previous EWMA to calculate the new EWMA.
*
*/
@SerialVersionUID(1L)
private[cluster] final case class EWMA(value: Double, alpha: Double) {
require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0")
/**
* Calculates the exponentially weighted moving average for a given monitored data set.
*
* @param xn the new data point
* @return a new [[akka.cluster.EWMA]] with the updated value
*/
def :+(xn: Double): EWMA = {
val newValue = (alpha * xn) + (1 - alpha) * value
if (newValue == value) this // no change
else copy(value = newValue)
}
}
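The alpha derivation and the smoothing step above are easy to sanity-check in isolation. A self-contained sketch using the defaults from the removed reference.conf block (half-life 12s, collect-interval 3s); the sample values are made up:

object EwmaSketch extends App {
  val halfLifeMillis = 12000.0
  val collectIntervalMillis = 3000.0

  // Same formula as EWMA.alpha: decay chosen so a sample's weight halves every half-life.
  val alpha = 1.0 - math.exp(-(math.log(2) / halfLifeMillis) * collectIntervalMillis)
  println(f"alpha = $alpha%.3f") // prints roughly 0.159

  // One smoothing step, as in EWMA.:+ : new = alpha * sample + (1 - alpha) * old.
  def step(old: Double, sample: Double): Double = alpha * sample + (1 - alpha) * old

  val smoothed = Seq(80.0, 20.0, 20.0).foldLeft(100.0)(step)
  println(f"value 100.0 smoothed over three new samples: $smoothed%.1f")
}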
/**
* Metrics key/value.
*
* Equality of Metric is based on its name.
*
* @param name the metric name
* @param value the metric value, which must be a valid numerical value,
* a valid value is neither negative nor NaN/Infinite.
* @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. as number of processors), are not trended.
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
final case class Metric private[cluster] (name: String, value: Number, private[cluster] val average: Option[EWMA])
extends MetricNumericConverter {
require(defined(value), s"Invalid Metric [$name] value [$value]")
/**
* Updates the data point, and if defined, updates the data stream (average).
* Returns the updated metric.
*/
def :+(latest: Metric): Metric =
if (this sameAs latest) average match {
case Some(avg) copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue))
case None if latest.average.isDefined copy(value = latest.value, average = latest.average)
case _ copy(value = latest.value)
}
else this
/**
* The numerical value of the average, if defined, otherwise the latest value
*/
def smoothValue: Double = average match {
case Some(avg) avg.value
case None value.doubleValue
}
/**
* @return true if this value is smoothed
*/
def isSmooth: Boolean = average.isDefined
/**
* Returns true if <code>that</code> is tracking the same metric as this.
*/
def sameAs(that: Metric): Boolean = name == that.name
override def hashCode = name.##
override def equals(obj: Any) = obj match {
case other: Metric sameAs(other)
case _ false
}
}
/**
* Factory for creating valid Metric instances.
*/
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
object Metric extends MetricNumericConverter {
/**
* Creates a new Metric instance if the value is valid, otherwise None
* is returned. Invalid numeric values are negative and NaN/Infinite.
*/
def create(name: String, value: Number, decayFactor: Option[Double]): Option[Metric] =
if (defined(value)) Some(new Metric(name, value, ceateEWMA(value.doubleValue, decayFactor)))
else None
/**
* Creates a new Metric instance if the Try is successful and the value is valid,
* otherwise None is returned. Invalid numeric values are negative and NaN/Infinite.
*/
def create(name: String, value: Try[Number], decayFactor: Option[Double]): Option[Metric] = value match {
case Success(v) create(name, v, decayFactor)
case Failure(_) None
}
private def ceateEWMA(value: Double, decayFactor: Option[Double]): Option[EWMA] = decayFactor match {
case Some(alpha) Some(EWMA(value, alpha))
case None None
}
}
/**
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
* Equality of NodeMetrics is based on its address.
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param metrics the set of sampled [[akka.cluster.Metric]]
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) {
/**
* Returns the most recent data.
*/
def merge(that: NodeMetrics): NodeMetrics = {
require(address == that.address, s"merge only allowed for same address, [$address] != [$that.address]")
if (timestamp >= that.timestamp) this // that is older
else {
// equality is based on the name of the Metric and Set doesn't replace existing element
copy(metrics = that.metrics union metrics, timestamp = that.timestamp)
}
}
def metric(key: String): Option[Metric] = metrics.collectFirst { case m if m.name == key m }
/**
* Java API
*/
def getMetrics: java.lang.Iterable[Metric] =
scala.collection.JavaConverters.asJavaIterableConverter(metrics).asJava
/**
* Returns true if <code>that</code> address is the same as this
*/
def sameAs(that: NodeMetrics): Boolean = address == that.address
override def hashCode = address.##
override def equals(obj: Any) = obj match {
case other: NodeMetrics sameAs(other)
case _ false
}
}
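One non-obvious detail in the merge above: Metric equality is by name only, and immutable Set union keeps the element already present in the left operand, so putting the newer node's metrics on the left makes them win for duplicated names. A throwaway sketch of that behavior (names and values invented):

object SetUnionMergeSketch extends App {
  final case class NamedValue(name: String, value: Double) {
    override def hashCode = name.##
    override def equals(other: Any) = other match {
      case o: NamedValue => o.name == name
      case _             => false
    }
  }

  val older = Set(NamedValue("heap-memory-used", 100), NamedValue("processors", 4))
  val newer = Set(NamedValue("heap-memory-used", 250))

  // Mirrors copy(metrics = that.metrics union metrics, ...) in NodeMetrics.merge.
  val merged = newer union older
  println(merged) // heap-memory-used keeps the newer 250, processors 4 is carried over
}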
/**
* Definitions of the built-in standard metrics.
*
* The following extractors and data structures makes it easy to consume the
* [[akka.cluster.NodeMetrics]] in for example load balancers.
*/
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
object StandardMetrics {
// Constants for the heap related Metric names
final val HeapMemoryUsed = "heap-memory-used"
final val HeapMemoryCommitted = "heap-memory-committed"
final val HeapMemoryMax = "heap-memory-max"
// Constants for the cpu related Metric names
final val SystemLoadAverage = "system-load-average"
final val Processors = "processors"
final val CpuCombined = "cpu-combined"
object HeapMemory {
/**
* Given a NodeMetrics it returns the HeapMemory data if the nodeMetrics contains
* necessary heap metrics.
* @return if possible a tuple matching the HeapMemory constructor parameters
*/
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Long, Long, Option[Long])] = {
for {
used nodeMetrics.metric(HeapMemoryUsed)
committed nodeMetrics.metric(HeapMemoryCommitted)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
used.smoothValue.longValue, committed.smoothValue.longValue,
nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue))
}
}
/**
* Java API to extract HeapMemory data from nodeMetrics, if the nodeMetrics
* contains necessary heap metrics, otherwise it returns null.
*/
def extractHeapMemory(nodeMetrics: NodeMetrics): HeapMemory = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max)
// note that above extractor returns tuple
HeapMemory(address, timestamp, used, committed, max)
case _ null
}
/**
* The amount of used and committed memory will always be <= max if max is defined.
* A memory allocation may fail if it attempts to increase the used memory such that used > committed
* even if used <= max is true (e.g. when the system virtual memory is low).
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param used the current sum of heap memory used from all heap memory pools (in bytes)
* @param committed the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes). Committed will always be greater than or equal to used.
* @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
* Can be undefined on some OS.
*/
@SerialVersionUID(1L)
final case class HeapMemory(address: Address, timestamp: Long, used: Long, committed: Long, max: Option[Long]) {
require(committed > 0L, "committed heap expected to be > 0 bytes")
require(max.isEmpty || max.get > 0L, "max heap expected to be > 0 bytes")
}
object Cpu {
/**
* Given a NodeMetrics it returns the Cpu data if the nodeMetrics contains
* necessary cpu metrics.
* @return if possible a tuple matching the Cpu constructor parameters
*/
def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Option[Double], Option[Double], Int)] = {
for {
processors nodeMetrics.metric(Processors)
} yield (nodeMetrics.address, nodeMetrics.timestamp,
nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue),
nodeMetrics.metric(CpuCombined).map(_.smoothValue), processors.value.intValue)
}
}
/**
* Java API to extract Cpu data from nodeMetrics, if the nodeMetrics
* contains necessary cpu metrics, otherwise it returns null.
*/
def extractCpu(nodeMetrics: NodeMetrics): Cpu = nodeMetrics match {
case Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
// note that above extractor returns tuple
Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
case _ null
}
/**
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute,
* The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores.
* @param cpuCombined combined CPU sum of User + Sys + Nice + Wait, in percentage ([0.0 - 1.0]. This
* metric can describe the amount of time the CPU spent executing code during n-interval and how
* much more it could theoretically.
* @param processors the number of available processors
*/
@SerialVersionUID(1L)
final case class Cpu(
address: Address,
timestamp: Long,
systemLoadAverage: Option[Double],
cpuCombined: Option[Double],
processors: Int) {
cpuCombined match {
case Some(x) require(0.0 <= x && x <= 1.0, s"cpuCombined must be between [0.0 - 1.0], was [$x]")
case None
}
}
}
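The extractors above were meant for pattern matching; a hedged sketch of the Scala-side equivalent of extractHeapMemory/extractCpu, compiling only against the pre-removal akka.cluster API (the formatting helper is invented):

import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.{ Cpu, HeapMemory }

object NodeMetricsFormat {
  def describe(nodeMetrics: NodeMetrics): String = nodeMetrics match {
    case HeapMemory(address, _, used, committed, _) =>
      s"$address: heap ${used / 1024 / 1024} MB used of ${committed / 1024 / 1024} MB committed"
    case Cpu(address, _, Some(loadAverage), _, processors) =>
      s"$address: load average $loadAverage across $processors processors"
    case other =>
      s"${other.address}: no heap or cpu metrics sampled yet"
  }
}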
/**
* INTERNAL API
*
* Encapsulates evaluation of validity of metric values, conversion of an actual metric value to
* a [[akka.cluster.Metric]] for consumption by subscribed cluster entities.
*/
private[cluster] trait MetricNumericConverter {
/**
* An defined value is neither negative nor NaN/Infinite:
* <ul><li>JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned</li>
* <li>SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)</li></ul>
*/
def defined(value: Number): Boolean = convertNumber(value) match {
case Left(a) a >= 0
case Right(b) !(b < 0.0 || b.isNaN || b.isInfinite)
}
/**
* May involve rounding or truncation.
*/
def convertNumber(from: Any): Either[Long, Double] = from match {
case n: Int Left(n)
case n: Long Left(n)
case n: Double Right(n)
case n: Float Right(n)
case n: BigInt Left(n.longValue)
case n: BigDecimal Right(n.doubleValue)
case x throw new IllegalArgumentException(s"Not a number [$x]")
}
}
/**
* Implementations of cluster system metrics extends this trait.
*/
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
trait MetricsCollector extends Closeable {
/**
* Samples and collects new data points.
* This method is invoked periodically and should return
* current metrics for this node.
*/
def sample(): NodeMetrics
}
/**
* Loads JVM and system metrics through JMX monitoring beans.
*
* @param address The [[akka.actor.Address]] of the node being sampled
* @param decay how quickly the exponential weighting of past data is decayed
*/
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
class JmxMetricsCollector(address: Address, decayFactor: Double) extends MetricsCollector {
import StandardMetrics._
private def this(cluster: Cluster) =
this(
cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval))
/**
* This constructor is used when creating an instance from configured FQCN
*/
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean
/**
* Samples and collects new data points.
* Creates a new instance each time.
*/
def sample(): NodeMetrics = NodeMetrics(address, newTimestamp, metrics)
def metrics: Set[Metric] = {
val heap = heapMemoryUsage
Set(systemLoadAverage, heapUsed(heap), heapCommitted(heap), heapMax(heap), processors).flatten
}
/**
* JMX Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is
* returned from JMX, and None is returned from this method.
* Creates a new instance each time.
*/
def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
value = osMBean.getSystemLoadAverage,
decayFactor = None)
/**
* (JMX) Returns the number of available processors
* Creates a new instance each time.
*/
def processors: Option[Metric] = Metric.create(
name = Processors,
value = osMBean.getAvailableProcessors,
decayFactor = None)
/**
* Current heap to be passed in to heapUsed, heapCommitted and heapMax
*/
def heapMemoryUsage: MemoryUsage = memoryMBean.getHeapMemoryUsage
/**
* (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
* Creates a new instance each time.
*/
def heapUsed(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryUsed,
value = heap.getUsed,
decayFactor = decayFactorOption)
/**
* (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
* from all heap memory pools (in bytes).
* Creates a new instance each time.
*/
def heapCommitted(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryCommitted,
value = heap.getCommitted,
decayFactor = decayFactorOption)
/**
* (JMX) Returns the maximum amount of memory (in bytes) that can be used
* for JVM memory management. If not defined the metrics value is None, i.e.
* never negative.
* Creates a new instance each time.
*/
def heapMax(heap: MemoryUsage): Option[Metric] = Metric.create(
name = HeapMemoryMax,
value = heap.getMax,
decayFactor = None)
override def close(): Unit = ()
}
/**
* Loads metrics through Hyperic SIGAR and JMX monitoring beans. This
* loads wider and more accurate range of metrics compared to JmxMetricsCollector
* by using SIGAR's native OS library.
*
* The constructor will by design throw exception if org.hyperic.sigar.Sigar can't be loaded, due
* to missing classes or native libraries.
*
* @param address The [[akka.actor.Address]] of the node being sampled
* @param decay how quickly the exponential weighting of past data is decayed
* @param sigar the org.hyperic.Sigar instance
*/
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef)
extends JmxMetricsCollector(address, decayFactor) {
import StandardMetrics._
private def this(cluster: Cluster) =
this(
cluster.selfAddress,
EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval),
cluster.system.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)
/**
* This constructor is used when creating an instance from configured FQCN
*/
def this(system: ActorSystem) = this(Cluster(system))
private val decayFactorOption = Some(decayFactor)
private val EmptyClassArray: Array[(Class[_])] = Array.empty[(Class[_])]
private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc")
private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption
// Do something initially, in constructor, to make sure that the native library can be loaded.
// This will by design throw exception if sigar isn't usable
val pid: Long = createMethodFrom(sigar, "getPid") match {
case Some(method)
try method.invoke(sigar).asInstanceOf[Long] catch {
case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError]
// native libraries not in place
// don't throw fatal LinkageError, but something harmless
throw new IllegalArgumentException(e.getCause.toString)
case e: InvocationTargetException throw e.getCause
}
case None throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method")
}
override def metrics: Set[Metric] = {
super.metrics.filterNot(_.name == SystemLoadAverage) union Set(systemLoadAverage, cpuCombined).flatten
}
/**
* (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
* On some systems the JMX OS system load average may not be available, in which case a -1 is returned
* from JMX, which means that None is returned from this method.
* Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
* Creates a new instance each time.
*/
override def systemLoadAverage: Option[Metric] = Metric.create(
name = SystemLoadAverage,
value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]](0).asInstanceOf[Number]),
decayFactor = None) orElse super.systemLoadAverage
/**
* (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
* the amount of time the CPU spent executing code during n-interval and how much more it could
* theoretically. Note that 99% CPU utilization can be optimal or indicative of failure.
*
* In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
* Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
*
* Creates a new instance each time.
*/
def cpuCombined: Option[Metric] = Metric.create(
name = CpuCombined,
value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]),
decayFactor = decayFactorOption)
/**
* Releases any native resources associated with this instance.
*/
override def close(): Unit = Try(createMethodFrom(sigar, "close").get.invoke(sigar))
private def createMethodFrom(ref: AnyRef, method: String, types: Array[(Class[_])] = EmptyClassArray): Option[Method] =
Try(ref.getClass.getMethod(method, types: _*)).toOption
}
/**
* INTERNAL API
* Factory to create configured MetricsCollector.
* If instantiation of SigarMetricsCollector fails (missing class or native library)
* it falls back to use JmxMetricsCollector.
*/
private[cluster] object MetricsCollector {
def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = {
import settings.{ MetricsCollectorClass ⇒ fqcn }
def log = Logging(system, getClass.getName)
if (fqcn == classOf[SigarMetricsCollector].getName) {
Try(new SigarMetricsCollector(system)) match {
case Success(sigarCollector) sigarCollector
case Failure(e)
Cluster(system).InfoLogger.logInfo(
"Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
"To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
"platform-specific native libary to 'java.library.path'. Reason: " +
e.toString)
new JmxMetricsCollector(system)
}
} else {
system.dynamicAccess.createInstanceFor[MetricsCollector](fqcn, List(classOf[ActorSystem] system)).
recover {
case e throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to:" + e.toString)
}.get
}
}
}
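The factory above also documents the plug-in contract of the removed collector-class setting: any FQCN implementing akka.cluster.MetricsCollector with a public ActorSystem constructor could be configured. A hypothetical example against the old API (class name and the constant value are invented):

import akka.actor.ActorSystem
import akka.cluster.{ Cluster, Metric, MetricsCollector, NodeMetrics }

class ConstantMetricsCollector(system: ActorSystem) extends MetricsCollector {
  private val address = Cluster(system).selfAddress

  // A real collector would sample JMX or SIGAR here; this one reports a fixed processor count.
  override def sample(): NodeMetrics = {
    val metrics = Metric.create(name = "processors", value = 2, decayFactor = None) match {
      case Some(m) => Set(m)
      case None    => Set.empty[Metric]
    }
    NodeMetrics(address, System.currentTimeMillis, metrics)
  }

  override def close(): Unit = ()
}

It would have been wired in through the akka.cluster.metrics.collector-class setting this commit deletes from reference.conf, using the collector's fully qualified class name.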


@ -4,8 +4,6 @@
package akka.cluster
// TODO remove metrics
import java.io.Closeable
import scala.collection.immutable
import akka.actor.{ Actor, ActorRef, Address, Props }
@ -37,12 +35,6 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
@volatile
private var _latestStats = CurrentInternalStats(GossipStats(), VectorClockStats())
/**
* Current cluster metrics, updated periodically via event bus.
*/
@volatile
private var _clusterMetrics: Set[NodeMetrics] = Set.empty
val selfAddress = cluster.selfAddress
// create actor that subscribes to the cluster eventBus to update current read view state
@ -76,9 +68,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
_state = _state.copy(leader = leader)
case RoleLeaderChanged(role, leader)
_state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role leader))
case stats: CurrentInternalStats _latestStats = stats
-case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
case ClusterShuttingDown
}
case s: CurrentClusterState _state = s
}
@ -145,11 +136,6 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def reachability: Reachability = _reachability
/**
* Current cluster metrics.
*/
def clusterMetrics: Set[NodeMetrics] = _clusterMetrics
/**
* INTERNAL API
*/


@ -113,15 +113,6 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val ReduceGossipDifferentViewProbability: Int = cc.getInt("reduce-gossip-different-view-probability")
val SchedulerTickDuration: FiniteDuration = cc.getMillisDuration("scheduler.tick-duration")
val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled")
val MetricsCollectorClass: String = cc.getString("metrics.collector-class")
val MetricsInterval: FiniteDuration = {
cc.getMillisDuration("metrics.collect-interval")
} requiring (_ > Duration.Zero, "metrics.collect-interval must be > 0")
val MetricsGossipInterval: FiniteDuration = cc.getMillisDuration("metrics.gossip-interval")
val MetricsMovingAverageHalfLife: FiniteDuration = {
cc.getMillisDuration("metrics.moving-average-half-life")
} requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0")
object Debug {
val VerboseHeartbeatLogging = cc.getBoolean("debug.verbose-heartbeat-logging")


@ -60,8 +60,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
classOf[ClusterHeartbeatSender.HeartbeatRsp] (bytes ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))),
classOf[ExitingConfirmed] (bytes InternalClusterAction.ExitingConfirmed(uniqueAddressFromBinary(bytes))),
classOf[GossipStatus] gossipStatusFromBinary,
-classOf[GossipEnvelope] gossipEnvelopeFromBinary,
-classOf[MetricsGossipEnvelope] metricsGossipEnvelopeFromBinary)
+classOf[GossipEnvelope] gossipEnvelopeFromBinary)
def includeManifest: Boolean = true
@ -70,7 +69,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
case ClusterHeartbeatSender.HeartbeatRsp(from) uniqueAddressToProtoByteArray(from)
case m: GossipEnvelope gossipEnvelopeToProto(m).toByteArray
case m: GossipStatus gossipStatusToProto(m).toByteArray
-case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) joinToProto(node, roles).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(welcomeToProto(from, gossip))
case ClusterUserAction.Leave(address) addressToProtoByteArray(address)
@ -333,94 +331,4 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
status.getVersion,
status.getAllHashesList.asScala.toVector))
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): cm.MetricsGossipEnvelope = {
import scala.collection.breakOut
val allNodeMetrics = envelope.gossip.nodes
val allAddresses: Vector[Address] = allNodeMetrics.map(_.address)(breakOut)
val addressMapping = allAddresses.zipWithIndex.toMap
val allMetricNames: Vector[String] = allNodeMetrics.foldLeft(Set.empty[String])((s, n) s ++ n.metrics.iterator.map(_.name)).toVector
val metricNamesMapping = allMetricNames.zipWithIndex.toMap
def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address")
def mapName(name: String) = mapWithErrorMessage(metricNamesMapping, name, "address")
def ewmaToProto(ewma: Option[EWMA]): Option[cm.NodeMetrics.EWMA.Builder] = ewma.map {
x cm.NodeMetrics.EWMA.newBuilder().setValue(x.value).setAlpha(x.alpha)
}
def numberToProto(number: Number): cm.NodeMetrics.Number.Builder = {
import cm.NodeMetrics.Number
import cm.NodeMetrics.NumberType
number match {
case n: jl.Double Number.newBuilder().setType(NumberType.Double).setValue64(jl.Double.doubleToLongBits(n))
case n: jl.Long Number.newBuilder().setType(NumberType.Long).setValue64(n)
case n: jl.Float Number.newBuilder().setType(NumberType.Float).setValue32(jl.Float.floatToIntBits(n))
case n: jl.Integer Number.newBuilder().setType(NumberType.Integer).setValue32(n)
case _
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(number)
out.close()
Number.newBuilder().setType(NumberType.Serialized).setSerialized(ByteString.copyFrom(bos.toByteArray))
}
}
def metricToProto(metric: Metric): cm.NodeMetrics.Metric.Builder = {
val builder = cm.NodeMetrics.Metric.newBuilder().setNameIndex(mapName(metric.name)).setNumber(numberToProto(metric.value))
ewmaToProto(metric.average).map(builder.setEwma).getOrElse(builder)
}
def nodeMetricsToProto(nodeMetrics: NodeMetrics): cm.NodeMetrics.Builder =
cm.NodeMetrics.newBuilder().setAddressIndex(mapAddress(nodeMetrics.address)).setTimestamp(nodeMetrics.timestamp).
addAllMetrics(nodeMetrics.metrics.map(metricToProto(_).build).asJava)
val nodeMetrics: Iterable[cm.NodeMetrics] = allNodeMetrics.map(nodeMetricsToProto(_).build)
cm.MetricsGossipEnvelope.newBuilder().setFrom(addressToProto(envelope.from)).setGossip(
cm.MetricsGossip.newBuilder().addAllAllAddresses(allAddresses.map(addressToProto(_).build()).asJava).
addAllAllMetricNames(allMetricNames.asJava).addAllNodeMetrics(nodeMetrics.asJava)).
setReply(envelope.reply).build
}
private def metricsGossipEnvelopeFromBinary(bytes: Array[Byte]): MetricsGossipEnvelope =
metricsGossipEnvelopeFromProto(cm.MetricsGossipEnvelope.parseFrom(decompress(bytes)))
private def metricsGossipEnvelopeFromProto(envelope: cm.MetricsGossipEnvelope): MetricsGossipEnvelope = {
import scala.collection.breakOut
val mgossip = envelope.getGossip
val addressMapping: Vector[Address] = mgossip.getAllAddressesList.asScala.map(addressFromProto)(breakOut)
val metricNameMapping: Vector[String] = mgossip.getAllMetricNamesList.asScala.toVector
def ewmaFromProto(ewma: cm.NodeMetrics.EWMA): Option[EWMA] =
Some(EWMA(ewma.getValue, ewma.getAlpha))
def numberFromProto(number: cm.NodeMetrics.Number): Number = {
import cm.NodeMetrics.NumberType
number.getType.getNumber match {
case NumberType.Double_VALUE jl.Double.longBitsToDouble(number.getValue64)
case NumberType.Long_VALUE number.getValue64
case NumberType.Float_VALUE jl.Float.intBitsToFloat(number.getValue32)
case NumberType.Integer_VALUE number.getValue32
case NumberType.Serialized_VALUE
val in = new ClassLoaderObjectInputStream(
system.dynamicAccess.classLoader,
new ByteArrayInputStream(number.getSerialized.toByteArray))
val obj = in.readObject
in.close()
obj.asInstanceOf[jl.Number]
}
}
def metricFromProto(metric: cm.NodeMetrics.Metric): Metric =
Metric(metricNameMapping(metric.getNameIndex), numberFromProto(metric.getNumber),
if (metric.hasEwma) ewmaFromProto(metric.getEwma) else None)
def nodeMetricsFromProto(nodeMetrics: cm.NodeMetrics): NodeMetrics =
NodeMetrics(addressMapping(nodeMetrics.getAddressIndex), nodeMetrics.getTimestamp,
nodeMetrics.getMetricsList.asScala.map(metricFromProto)(breakOut))
val nodeMetrics: Set[NodeMetrics] = mgossip.getNodeMetricsList.asScala.map(nodeMetricsFromProto)(breakOut)
MetricsGossipEnvelope(addressFromProto(envelope.getFrom), MetricsGossip(nodeMetrics), envelope.getReply)
}
}


@ -1,534 +0,0 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.routing
// TODO remove metrics
import java.util.Arrays
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import java.util.concurrent.ThreadLocalRandom
import com.typesafe.config.Config
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.DynamicAccess
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics.Cpu
import akka.cluster.StandardMetrics.HeapMemory
import akka.dispatch.Dispatchers
import akka.japi.Util.immutableSeq
import akka.routing._
/**
* Load balancing of messages to cluster nodes based on cluster metric data.
*
* It uses random selection of routees based on probabilities derived from
* the remaining capacity of corresponding node.
*
* @param system the actor system hosting this router
*
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
*/
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
final case class AdaptiveLoadBalancingRoutingLogic(system: ActorSystem, metricsSelector: MetricsSelector = MixMetricsSelector)
extends RoutingLogic with NoSerializationVerificationNeeded {
private val cluster = Cluster(system)
// The current weighted routees, if any. Weights are produced by the metricsSelector
// via the metricsListener Actor. It's only updated by the actor, but accessed from
// the threads of the sender()s.
private val weightedRouteesRef =
new AtomicReference[(immutable.IndexedSeq[Routee], Set[NodeMetrics], Option[WeightedRoutees])](
(Vector.empty, Set.empty, None))
@tailrec final def metricsChanged(event: ClusterMetricsChanged): Unit = {
val oldValue = weightedRouteesRef.get
val (routees, _, _) = oldValue
val weightedRoutees = Some(new WeightedRoutees(routees, cluster.selfAddress,
metricsSelector.weights(event.nodeMetrics)))
// retry when CAS failure
if (!weightedRouteesRef.compareAndSet(oldValue, (routees, event.nodeMetrics, weightedRoutees)))
metricsChanged(event)
}
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
if (routees.isEmpty) NoRoutee
else {
def updateWeightedRoutees(): Option[WeightedRoutees] = {
val oldValue = weightedRouteesRef.get
val (oldRoutees, oldMetrics, oldWeightedRoutees) = oldValue
if (routees ne oldRoutees) {
val weightedRoutees = Some(new WeightedRoutees(routees, cluster.selfAddress,
metricsSelector.weights(oldMetrics)))
// ignore, don't update, in case of CAS failure
weightedRouteesRef.compareAndSet(oldValue, (routees, oldMetrics, weightedRoutees))
weightedRoutees
} else oldWeightedRoutees
}
updateWeightedRoutees() match {
case Some(weighted)
if (weighted.isEmpty) NoRoutee
else weighted(ThreadLocalRandom.current.nextInt(weighted.total) + 1)
case None
routees(ThreadLocalRandom.current.nextInt(routees.size))
}
}
}
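A hedged sketch of how this removed router was typically instantiated in code (the worker Props and router name are placeholders); per the deprecation notes, the replacement with the same class names lives in the akka.cluster.metrics package of the akka-cluster-metrics module:

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.routing.{ AdaptiveLoadBalancingPool, MixMetricsSelector }

object AdaptiveRouterUsage {
  // Routees are weighted by the mixed heap/cpu/load capacity computed by the
  // default MixMetricsSelector referenced above.
  def start(system: ActorSystem, workerProps: Props): ActorRef =
    system.actorOf(
      AdaptiveLoadBalancingPool(MixMetricsSelector, nrOfInstances = 10).props(workerProps),
      name = "adaptiveRouter")
}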
/**
* A router pool that performs load balancing of messages to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees based on probabilities derived from
* the remaining capacity of corresponding node.
*
* The configuration parameter trumps the constructor arguments. This means that
* if you provide `nrOfInstances` during instantiation they will be ignored if
* the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
*
* @param nrOfInstances initial number of routees in the pool
*
* @param supervisorStrategy strategy for supervising the routees, see 'Supervision Setup'
*
* @param routerDispatcher dispatcher to use for the router head actor, which handles
* supervision, death watch and router management messages
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
final case class AdaptiveLoadBalancingPool(
metricsSelector: MetricsSelector = MixMetricsSelector,
override val nrOfInstances: Int = 0,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool {
def this(config: Config, dynamicAccess: DynamicAccess) =
this(
nrOfInstances = ClusterRouterSettingsBase.getMaxTotalNrOfInstances(config),
metricsSelector = MetricsSelector.fromConfig(config, dynamicAccess),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
* @param nr initial number of routees in the pool
*/
def this(metricsSelector: MetricsSelector, nr: Int) = this(nrOfInstances = nr)
override def resizer: Option[Resizer] = None
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
override def createRouter(system: ActorSystem): Router =
new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector))
override def routingLogicController(routingLogic: RoutingLogic): Option[Props] =
Some(Props(
classOf[AdaptiveLoadBalancingMetricsListener],
routingLogic.asInstanceOf[AdaptiveLoadBalancingRoutingLogic]))
/**
* Setting the supervisor strategy to be used for the head Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): AdaptiveLoadBalancingPool = copy(supervisorStrategy = strategy)
/**
* Setting the dispatcher to be used for the router head actor, which handles
* supervision, death watch and router management messages.
*/
def withDispatcher(dispatcherId: String): AdaptiveLoadBalancingPool = copy(routerDispatcher = dispatcherId)
/**
* Uses the supervisor strategy of the given RouterConfig
* if this RouterConfig doesn't have one
*/
override def withFallback(other: RouterConfig): RouterConfig =
if (this.supervisorStrategy ne Pool.defaultSupervisorStrategy) this
else other match {
case _: FromConfig | _: NoRouter ⇒ this // NoRouter is the default, hence neutral
case otherRouter: AdaptiveLoadBalancingPool ⇒
if (otherRouter.supervisorStrategy eq Pool.defaultSupervisorStrategy) this
else this.withSupervisorStrategy(otherRouter.supervisorStrategy)
case _ ⇒ throw new IllegalArgumentException("Expected AdaptiveLoadBalancingPool, got [%s]".format(other))
}
}
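For reference, a minimal sketch of wiring up this deprecated pool before migrating to akka.cluster.metrics; the actor system, the Worker routee class and the router name are illustrative and not part of this commit.

// Illustrative only: a pool router that weights routees by free heap, using the
// deprecated akka.cluster.routing API that this commit removes.
import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.routing.{ AdaptiveLoadBalancingPool, HeapMetricsSelector }

class Worker extends Actor {
  def receive = { case msg ⇒ sender() ! msg } // trivial routee, for illustration
}

object AdaptivePoolExample extends App {
  val system = ActorSystem("example")
  val router = system.actorOf(
    AdaptiveLoadBalancingPool(HeapMetricsSelector, 10).props(Props[Worker]),
    "heapAwareRouter")
}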
/**
* A router group that performs load balancing of messages to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees based on probabilities derived from
 * the remaining capacity of the corresponding node.
*
* The configuration parameter trumps the constructor arguments. This means that
* if you provide `paths` during instantiation they will be ignored if
* the router is defined in the configuration file for the actor being used.
*
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
*
* @param paths string representation of the actor paths of the routees, messages are
* sent with [[akka.actor.ActorSelection]] to these paths
*
* @param routerDispatcher dispatcher to use for the router head actor, which handles
* router management messages
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
final case class AdaptiveLoadBalancingGroup(
metricsSelector: MetricsSelector = MixMetricsSelector,
override val paths: immutable.Iterable[String] = Nil,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends Group {
def this(config: Config, dynamicAccess: DynamicAccess) =
this(
metricsSelector = MetricsSelector.fromConfig(config, dynamicAccess),
paths = immutableSeq(config.getStringList("routees.paths")))
/**
* Java API
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
* @param routeesPaths string representation of the actor paths of the routees, messages are
* sent with [[akka.actor.ActorSelection]] to these paths
*/
def this(
metricsSelector: MetricsSelector,
routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths))
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
override def createRouter(system: ActorSystem): Router =
new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector))
override def routingLogicController(routingLogic: RoutingLogic): Option[Props] =
Some(Props(
classOf[AdaptiveLoadBalancingMetricsListener],
routingLogic.asInstanceOf[AdaptiveLoadBalancingRoutingLogic]))
/**
* Setting the dispatcher to be used for the router head actor, which handles
* router management messages
*/
def withDispatcher(dispatcherId: String): AdaptiveLoadBalancingGroup = copy(routerDispatcher = dispatcherId)
}
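A similar hedged sketch for the group variant; the routee paths below are made up and must point at actors started elsewhere.

// Illustrative only: routes to existing actors at the given paths, weighted by
// system load average, via the deprecated akka.cluster.routing API.
import akka.actor.ActorSystem
import akka.cluster.routing.{ AdaptiveLoadBalancingGroup, SystemLoadAverageMetricsSelector }

object AdaptiveGroupExample extends App {
  val system = ActorSystem("example")
  val router = system.actorOf(
    AdaptiveLoadBalancingGroup(
      SystemLoadAverageMetricsSelector,
      List("/user/workerA", "/user/workerB")).props(),
    "loadAwareRouter")
}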
/**
* MetricsSelector that uses the heap metrics.
* Low heap capacity => small weight.
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
case object HeapMetricsSelector extends CapacityMetricsSelector {
/**
* Java API: get the singleton instance
*/
def getInstance = this
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
nodeMetrics.collect {
case HeapMemory(address, _, used, committed, max) ⇒
val capacity = max match {
case None ⇒ (committed - used).toDouble / committed
case Some(m) ⇒ (m - used).toDouble / m
}
(address, capacity)
}.toMap
}
}
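As a worked example (numbers chosen to match the selector spec further down in this commit): with 128 MB used out of a 512 MB max the capacity is (512 - 128) / 512 = 0.75; when max is undefined the committed size is used instead, e.g. (256 - 128) / 256 = 0.5.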
/**
* MetricsSelector that uses the combined CPU metrics.
* Combined CPU is sum of User + Sys + Nice + Wait, in percentage.
* Low cpu capacity => small weight.
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
case object CpuMetricsSelector extends CapacityMetricsSelector {
/**
* Java API: get the singleton instance
*/
def getInstance = this
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
nodeMetrics.collect {
case Cpu(address, _, _, Some(cpuCombined), _) ⇒
val capacity = 1.0 - cpuCombined
(address, capacity)
}.toMap
}
}
/**
* MetricsSelector that uses the system load average metrics.
* System load average is OS-specific average load on the CPUs in the system,
* for the past 1 minute. The system is possibly nearing a bottleneck if the
 * system load average is nearing the number of cpus/cores.
* Low load average capacity => small weight.
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
case object SystemLoadAverageMetricsSelector extends CapacityMetricsSelector {
/**
* Java API: get the singleton instance
*/
def getInstance = this
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
nodeMetrics.collect {
case Cpu(address, _, Some(systemLoadAverage), _, processors) ⇒
val capacity = 1.0 - math.min(1.0, systemLoadAverage / processors)
(address, capacity)
}.toMap
}
}
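As a worked example with made-up numbers: a 1-minute load average of 4.0 on 8 processors gives capacity 1.0 - min(1.0, 4.0 / 8) = 0.5, while a load of 16.0 on the same 8 cores bottoms out at 0.0.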
/**
* Singleton instance of the default MixMetricsSelector, which uses [akka.cluster.routing.HeapMetricsSelector],
* [akka.cluster.routing.CpuMetricsSelector], and [akka.cluster.routing.SystemLoadAverageMetricsSelector]
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
object MixMetricsSelector extends MixMetricsSelectorBase(
Vector(HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector)) {
/**
* Java API: get the default singleton instance
*/
def getInstance = this
}
/**
* MetricsSelector that combines other selectors and aggregates their capacity
* values. By default it uses [akka.cluster.routing.HeapMetricsSelector],
* [akka.cluster.routing.CpuMetricsSelector], and [akka.cluster.routing.SystemLoadAverageMetricsSelector]
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
final case class MixMetricsSelector(
selectors: immutable.IndexedSeq[CapacityMetricsSelector])
extends MixMetricsSelectorBase(selectors)
/**
* Base class for MetricsSelector that combines other selectors and aggregates their capacity.
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
abstract class MixMetricsSelectorBase(selectors: immutable.IndexedSeq[CapacityMetricsSelector])
extends CapacityMetricsSelector {
/**
* Java API: construct a mix-selector from a sequence of selectors
*/
def this(selectors: java.lang.Iterable[CapacityMetricsSelector]) = this(immutableSeq(selectors).toVector)
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = {
val combined: immutable.IndexedSeq[(Address, Double)] = selectors.flatMap(_.capacity(nodeMetrics).toSeq)
// aggregated average of the capacities by address
combined.foldLeft(Map.empty[Address, (Double, Int)].withDefaultValue((0.0, 0))) {
case (acc, (address, capacity)) ⇒
val (sum, count) = acc(address)
acc + (address → ((sum + capacity, count + 1)))
}.map {
case (addr, (sum, count)) ⇒ addr → (sum / count)
}
}
}
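A standalone sketch of the aggregation above, with made-up capacity values: each selector contributes one capacity per address, and the mix is their plain average.

object MixAggregationSketch extends App {
  // e.g. heap, cpu and load-average capacities reported for two addresses
  val combined = Vector("a" → 0.75, "a" → 0.9, "a" → 0.9375, "b" → 0.75, "b" → 0.5)
  val averaged = combined
    .groupBy { case (addr, _) ⇒ addr }
    .map { case (addr, pairs) ⇒ addr → pairs.map(_._2).sum / pairs.size }
  println(averaged) // Map(a -> 0.8625, b -> 0.625)
}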
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
object MetricsSelector {
def fromConfig(config: Config, dynamicAccess: DynamicAccess) =
config.getString("metrics-selector") match {
case "mix" MixMetricsSelector
case "heap" HeapMetricsSelector
case "cpu" CpuMetricsSelector
case "load" SystemLoadAverageMetricsSelector
case fqn
val args = List(classOf[Config] config)
dynamicAccess.createInstanceFor[MetricsSelector](fqn, args).recover({
case exception throw new IllegalArgumentException(
(s"Cannot instantiate metrics-selector [$fqn], " +
"make sure it extends [akka.cluster.routing.MetricsSelector] and " +
"has constructor with [com.typesafe.config.Config] parameter"), exception)
}).get
}
}
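The short names matched above come from the router's deployment configuration. A hedged sketch of a deployment block that fromConfig would resolve to HeapMetricsSelector (the actor path and instance count are illustrative):

import com.typesafe.config.ConfigFactory

object AdaptiveDeploymentConfigSketch extends App {
  val config = ConfigFactory.parseString("""
    akka.actor.deployment {
      /heapRouter {
        router = adaptive-pool
        metrics-selector = heap
        nr-of-instances = 9
      }
    }
    """)
  println(config.getString("""akka.actor.deployment."/heapRouter".metrics-selector""")) // prints: heap
}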
/**
* A MetricsSelector is responsible for producing weights from the node metrics.
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
trait MetricsSelector extends Serializable {
/**
* The weights per address, based on the nodeMetrics.
*/
def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int]
}
/**
* A MetricsSelector producing weights from remaining capacity.
* The weights are typically proportional to the remaining capacity.
*/
@SerialVersionUID(1L)
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
abstract class CapacityMetricsSelector extends MetricsSelector {
/**
* Remaining capacity for each node. The value is between
* 0.0 and 1.0, where 0.0 means no remaining capacity (full
* utilization) and 1.0 means full remaining capacity (zero
* utilization).
*/
def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double]
/**
* Converts the capacity values to weights. The node with lowest
* capacity gets weight 1 (lowest usable capacity is 1%) and other
 * nodes get weights proportional to their capacity compared to
* the node with lowest capacity.
*/
def weights(capacity: Map[Address, Double]): Map[Address, Int] = {
if (capacity.isEmpty) Map.empty[Address, Int]
else {
val (_, min) = capacity.minBy { case (_, c) ⇒ c }
// lowest usable capacity is 1% (>= 0.5% will be rounded to weight 1), also avoids div by zero
val divisor = math.max(0.01, min)
capacity map { case (addr, c) ⇒ (addr → math.round((c) / divisor).toInt) }
}
}
/**
* The weights per address, based on the capacity produced by
* the nodeMetrics.
*/
override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] =
weights(capacity(nodeMetrics))
}
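A standalone sketch of the weight calculation described above, using made-up capacities; the node with the lowest capacity ends up with weight 1 and the others scale proportionally.

object CapacityWeightsSketch extends App {
  val capacity = Map("a" → 0.6, "b" → 0.3, "c" → 0.1)
  val divisor = math.max(0.01, capacity.values.min) // lowest usable capacity is 1%
  val weights = capacity.map { case (node, c) ⇒ node → math.round(c / divisor).toInt }
  println(weights) // Map(a -> 6, b -> 3, c -> 1)
}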
/**
* INTERNAL API
*
* Pick routee based on its weight. Higher weight, higher probability.
*/
private[cluster] class WeightedRoutees(routees: immutable.IndexedSeq[Routee], selfAddress: Address, weights: Map[Address, Int]) {
// fill an array of same size as the refs with accumulated weights,
// binarySearch is used to pick the right bucket from a requested value
// from 1 to the total sum of the used weights.
private val buckets: Array[Int] = {
def fullAddress(routee: Routee): Address = {
val a = routee match {
case ActorRefRoutee(ref) ⇒ ref.path.address
case ActorSelectionRoutee(sel) ⇒ sel.anchor.path.address
}
a match {
case Address(_, _, None, None) ⇒ selfAddress
case a ⇒ a
}
}
val buckets = Array.ofDim[Int](routees.size)
val meanWeight = if (weights.isEmpty) 1 else weights.values.sum / weights.size
val w = weights.withDefaultValue(meanWeight) // we don't necessarily have metrics for all addresses
var i = 0
var sum = 0
routees foreach { r ⇒
sum += w(fullAddress(r))
buckets(i) = sum
i += 1
}
buckets
}
def isEmpty: Boolean = buckets.length == 0 || buckets(buckets.length - 1) == 0
def total: Int = {
require(!isEmpty, "WeightedRoutees must not be used when empty")
buckets(buckets.length - 1)
}
/**
* Pick the routee matching a value, from 1 to total.
*/
def apply(value: Int): Routee = {
require(1 <= value && value <= total, "value must be between [1 - %s]" format total)
routees(idx(Arrays.binarySearch(buckets, value)))
}
/**
 * Converts the result of Arrays.binarySearch into an index in the buckets array
* see documentation of Arrays.binarySearch for what it returns
*/
private def idx(i: Int): Int = {
if (i >= 0) i // exact match
else {
val j = math.abs(i + 1)
if (j >= buckets.length) throw new IndexOutOfBoundsException(
"Requested index [%s] is > max index [%s]".format(i, buckets.length))
else j
}
}
}
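A standalone sketch of the bucket idea used by this internal class, with made-up weights; it does not touch the private class itself, it only replays the accumulated-weights plus binary-search selection.

import java.util.Arrays

object WeightedSelectionSketch extends App {
  val weights = Array(3, 1, 6)                  // per-routee weights (illustrative)
  val buckets = weights.scanLeft(0)(_ + _).tail // accumulated sums: 3, 4, 10
  // value is in 1..total and maps to a routee index, like idx() above
  def pick(value: Int): Int = {
    val i = Arrays.binarySearch(buckets, value)
    if (i >= 0) i else -(i + 1)
  }
  // values 1..3 -> routee 0, 4 -> routee 1, 5..10 -> routee 2
  println((1 to buckets.last).map(pick))
}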
/**
* INTERNAL API
* subscribe to ClusterMetricsChanged and update routing logic
*/
private[akka] class AdaptiveLoadBalancingMetricsListener(routingLogic: AdaptiveLoadBalancingRoutingLogic)
extends Actor {
val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case event: ClusterMetricsChanged ⇒ routingLogic.metricsChanged(event)
case _: CurrentClusterState ⇒ // ignore
}
}


@ -1,37 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import com.typesafe.config.ConfigFactory
import akka.testkit.LongRunningTest
import akka.cluster.ClusterEvent._
object ClusterMetricsDisabledMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("akka.cluster.metrics.enabled = off")
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class ClusterMetricsDisabledMultiJvmNode1 extends ClusterMetricsDisabledSpec
class ClusterMetricsDisabledMultiJvmNode2 extends ClusterMetricsDisabledSpec
abstract class ClusterMetricsDisabledSpec extends MultiNodeSpec(ClusterMetricsDisabledMultiJvmSpec) with MultiNodeClusterSpec {
"Cluster metrics" must {
"not collect metrics, not publish ClusterMetricsChanged, and not gossip metrics" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
clusterView.clusterMetrics.size should ===(0)
cluster.subscribe(testActor, classOf[ClusterMetricsChanged])
expectMsgType[CurrentClusterState]
expectNoMsg
clusterView.clusterMetrics.size should ===(0)
enterBarrier("after")
}
}
}


@ -1,61 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import scala.language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.ExtendedActorSystem
object ClusterMetricsMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class ClusterMetricsMultiJvmNode1 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode2 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode3 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode4 extends ClusterMetricsSpec
class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec {
import ClusterMetricsMultiJvmSpec._
private[cluster] def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
"Cluster metrics" must {
"periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " +
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
awaitClusterUp(roles: _*)
enterBarrier("cluster-started")
awaitAssert(clusterView.members.count(_.status == MemberStatus.Up) should ===(roles.size))
awaitAssert(clusterView.clusterMetrics.size should ===(roles.size))
val collector = MetricsCollector(cluster.system, cluster.settings)
collector.sample.metrics.size should be > (3)
enterBarrier("after")
}
"reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) {
runOn(second) {
cluster.leave(first)
}
enterBarrier("first-left")
runOn(second, third, fourth, fifth) {
markNodeAsUnavailable(first)
awaitAssert(clusterView.clusterMetrics.size should ===(roles.size - 1))
}
enterBarrier("finished")
}
}
}


@ -3,8 +3,6 @@
*/ */
package akka.cluster package akka.cluster
// TODO remove metrics
import java.util.UUID import java.util.UUID
import language.implicitConversions import language.implicitConversions
@ -115,8 +113,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
def muteLog(sys: ActorSystem = system): Unit = { def muteLog(sys: ActorSystem = system): Unit = {
if (!sys.log.isDebugEnabled) { if (!sys.log.isDebugEnabled) {
Seq( Seq(
".*Metrics collection has started successfully.*",
".*Metrics will be retreived from MBeans.*",
".*Cluster Node.* - registered cluster JMX MBean.*", ".*Cluster Node.* - registered cluster JMX MBean.*",
".*Cluster Node.* - is starting up.*", ".*Cluster Node.* - is starting up.*",
".*Shutting down cluster Node.*", ".*Shutting down cluster Node.*",
@ -130,8 +126,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
classOf[ClusterHeartbeatSender.HeartbeatRsp], classOf[ClusterHeartbeatSender.HeartbeatRsp],
classOf[GossipEnvelope], classOf[GossipEnvelope],
classOf[GossipStatus], classOf[GossipStatus],
classOf[MetricsGossipEnvelope],
classOf[ClusterEvent.ClusterMetricsChanged],
classOf[InternalClusterAction.Tick], classOf[InternalClusterAction.Tick],
classOf[akka.actor.PoisonPill], classOf[akka.actor.PoisonPill],
classOf[akka.dispatch.sysmsg.DeathWatchNotification], classOf[akka.dispatch.sysmsg.DeathWatchNotification],


@ -3,9 +3,6 @@
*/ */
package akka.cluster package akka.cluster
// TODO remove metrics
// FIXME this test is not migrated to metrics extension
import language.postfixOps import language.postfixOps
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
@ -26,12 +23,9 @@ import akka.actor.Props
import akka.actor.RootActorPath import akka.actor.RootActorPath
import akka.actor.SupervisorStrategy._ import akka.actor.SupervisorStrategy._
import akka.actor.Terminated import akka.actor.Terminated
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.CurrentInternalStats import akka.cluster.ClusterEvent.CurrentInternalStats
import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.StandardMetrics.Cpu
import akka.cluster.StandardMetrics.HeapMemory
import akka.remote.DefaultFailureDetectorRegistry import akka.remote.DefaultFailureDetectorRegistry
import akka.remote.PhiAccrualFailureDetector import akka.remote.PhiAccrualFailureDetector
import akka.remote.RemoteScope import akka.remote.RemoteScope
@ -116,7 +110,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
# (width * math.pow(width, levels) - 1) / (width - 1) # (width * math.pow(width, levels) - 1) / (width - 1)
tree-width = 4 tree-width = 4
tree-levels = 4 tree-levels = 4
report-metrics-interval = 10s
# scale convergence within timeouts with this factor # scale convergence within timeouts with this factor
convergence-within-factor = 1.0 convergence-within-factor = 1.0
# set to off to only test cluster membership # set to off to only test cluster membership
@ -211,7 +204,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
val expectedTestDuration = testConfig.getMillisDuration("expected-test-duration") * dFactor val expectedTestDuration = testConfig.getMillisDuration("expected-test-duration") * dFactor
val treeWidth = getInt("tree-width") val treeWidth = getInt("tree-width")
val treeLevels = getInt("tree-levels") val treeLevels = getInt("tree-levels")
val reportMetricsInterval = testConfig.getMillisDuration("report-metrics-interval")
val convergenceWithinFactor = getDouble("convergence-within-factor") val convergenceWithinFactor = getDouble("convergence-within-factor")
val exerciseActors = getBoolean("exercise-actors") val exerciseActors = getBoolean("exercise-actors")
@ -251,12 +243,10 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
* itself when expected results has been collected. * itself when expected results has been collected.
*/ */
class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging { class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging {
import settings.reportMetricsInterval
import settings.infolog import settings.infolog
private val cluster = Cluster(context.system) private val cluster = Cluster(context.system)
private var reportTo: Option[ActorRef] = None private var reportTo: Option[ActorRef] = None
private var results = Vector.empty[ClusterResult] private var results = Vector.empty[ClusterResult]
private var nodeMetrics = Set.empty[NodeMetrics]
private var phiValuesObservedByNode = { private var phiValuesObservedByNode = {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]] immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]]
@ -267,30 +257,19 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
} }
import context.dispatcher import context.dispatcher
private val reportMetricsTask = context.system.scheduler.schedule(
reportMetricsInterval, reportMetricsInterval, self, ReportTick)
// subscribe to ClusterMetricsChanged, re-subscribe when restart
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])
override def postStop(): Unit = {
cluster.unsubscribe(self)
reportMetricsTask.cancel()
super.postStop()
}
def receive = { def receive = {
case ClusterMetricsChanged(clusterMetrics) nodeMetrics = clusterMetrics case PhiResult(from, phiValues) phiValuesObservedByNode += from phiValues
case PhiResult(from, phiValues) phiValuesObservedByNode += from phiValues case StatsResult(from, stats) clusterStatsObservedByNode += from stats
case StatsResult(from, stats) clusterStatsObservedByNode += from stats
case ReportTick case ReportTick
if (infolog) if (infolog)
log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") log.info(s"[${title}] in progress\n\n${formatPhi}\n\n${formatStats}")
case r: ClusterResult case r: ClusterResult
results :+= r results :+= r
if (results.size == expectedResults) { if (results.size == expectedResults) {
val aggregated = AggregatedClusterResult(title, maxDuration, totalGossipStats) val aggregated = AggregatedClusterResult(title, maxDuration, totalGossipStats)
if (infolog) if (infolog)
log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n\n${formatPhi}\n\n${formatStats}")
reportTo foreach { _ ! aggregated } reportTo foreach { _ ! aggregated }
context stop self context stop self
} }
@ -302,27 +281,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
def totalGossipStats = results.foldLeft(GossipStats()) { _ :+ _.clusterStats } def totalGossipStats = results.foldLeft(GossipStats()) { _ :+ _.clusterStats }
def formatMetrics: String = {
import akka.cluster.Member.addressOrdering
(formatMetricsHeader +: (nodeMetrics.toSeq.sortBy(_.address) map formatMetricsLine)).mkString("\n")
}
def formatMetricsHeader: String = "[Node]\t[Heap (MB)]\t[CPU (%)]\t[Load]"
def formatMetricsLine(nodeMetrics: NodeMetrics): String = {
val heap = nodeMetrics match {
case HeapMemory(address, timestamp, used, committed, max) ⇒
(used.doubleValue / 1024 / 1024).form
case _ ⇒ ""
}
val cpuAndLoad = nodeMetrics match {
case Cpu(address, timestamp, loadOption, cpuOption, processors) ⇒
format(cpuOption) + "\t" + format(loadOption)
case _ ⇒ "N/A\tN/A"
}
s"${nodeMetrics.address}\t${heap}\t${cpuAndLoad}"
}
def format(opt: Option[Double]) = opt match { def format(opt: Option[Double]) = opt match {
case None "N/A" case None "N/A"
case Some(x) x.form case Some(x) x.form


@ -1,220 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.routing
// TODO remove metrics
import language.postfixOps
import java.lang.management.ManagementFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.NodeMetrics
import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import akka.routing.GetRoutees
import akka.routing.FromConfig
import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
import akka.routing.ActorRefRoutee
import akka.routing.Routees
object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
class Echo extends Actor {
def receive = {
case _ ⇒ sender() ! Reply(Cluster(context.system).selfAddress)
}
}
class Memory extends Actor with ActorLogging {
var usedMemory: Array[Array[Int]] = _
def receive = {
case AllocateMemory ⇒
val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
// getMax can be undefined (-1)
val max = math.max(heap.getMax, heap.getCommitted)
val used = heap.getUsed
log.info("used heap before: [{}] bytes, of max [{}]", used, heap.getMax)
// allocate 70% of free space
val allocateBytes = (0.7 * (max - used)).toInt
val numberOfArrays = allocateBytes / 1024
usedMemory = Array.ofDim(numberOfArrays, 248) // each 248 element Int array will use ~ 1 kB
log.info("used heap after: [{}] bytes", ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed)
sender() ! "done"
}
}
case object AllocateMemory
final case class Reply(address: Address)
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster.failure-detector.acceptable-heartbeat-pause = 10s
akka.cluster.metrics.collect-interval = 1s
akka.cluster.metrics.gossip-interval = 1s
akka.cluster.metrics.moving-average-half-life = 2s
akka.actor.deployment {
/router3 = {
router = adaptive-pool
metrics-selector = cpu
nr-of-instances = 9
}
/router4 = {
router = adaptive-pool
metrics-selector = "akka.cluster.routing.TestCustomMetricsSelector"
cluster {
enabled = on
max-nr-of-instances-per-node = 2
max-total-nr-of-instances = 10
}
}
}
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class TestCustomMetricsSelector(config: Config) extends MetricsSelector {
override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] = Map.empty
}
class AdaptiveLoadBalancingRouterMultiJvmNode1 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec
abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoadBalancingRouterMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import AdaptiveLoadBalancingRouterMultiJvmSpec._
def currentRoutees(router: ActorRef) =
Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees
def receiveReplies(expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) → 0)
(receiveWhile(5 seconds, messages = expectedReplies) {
case Reply(address) ⇒ address
}).foldLeft(zero) {
case (replyMap, address) ⇒ replyMap + (address → (replyMap(address) + 1))
}
}
/**
* Fills in self address for local ActorRef
*/
def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) ⇒ cluster.selfAddress
case a ⇒ a
}
def startRouter(name: String): ActorRef = {
val router = system.actorOf(
ClusterRouterPool(
local = AdaptiveLoadBalancingPool(HeapMetricsSelector),
settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)).
props(Props[Echo]),
name)
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router).size should ===(roles.size) }
val routees = currentRoutees(router)
routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should ===(roles.map(address).toSet)
router
}
"A cluster with a AdaptiveLoadBalancingRouter" must {
"start cluster nodes" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
enterBarrier("after-1")
}
"use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in {
runOn(first) {
val router1 = startRouter("router1")
// collect some metrics before we start
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 100
1 to iterationCount foreach { _ ⇒
router1 ! "hit"
// wait a while between each message, since metrics is collected periodically
Thread.sleep(10)
}
val replies = receiveReplies(iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be > (0)
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-2")
}
"prefer node with more free heap capacity" taggedAs LongRunningTest in {
System.gc()
enterBarrier("gc")
runOn(second) {
within(20.seconds) {
system.actorOf(Props[Memory], "memory") ! AllocateMemory
expectMsg("done")
}
}
enterBarrier("heap-allocated")
runOn(first) {
val router2 = startRouter("router2")
// collect some metrics before we start
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 3000
1 to iterationCount foreach { _ ⇒
router2 ! "hit"
}
val replies = receiveReplies(iterationCount)
replies(third) should be > (replies(second))
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-3")
}
"create routees from configuration" taggedAs LongRunningTest in {
runOn(first) {
val router3 = system.actorOf(FromConfig.props(Props[Memory]), "router3")
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router3).size should ===(9) }
val routees = currentRoutees(router3)
routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should ===(Set(address(first)))
}
enterBarrier("after-4")
}
"create routees from cluster.enabled configuration" taggedAs LongRunningTest in {
runOn(first) {
val router4 = system.actorOf(FromConfig.props(Props[Memory]), "router4")
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router4).size should ===(6) }
val routees = currentRoutees(router4)
routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should ===(Set(
address(first), address(second), address(third)))
}
enterBarrier("after-5")
}
}
}


@ -48,12 +48,6 @@ class ClusterConfigSpec extends AkkaSpec {
ReduceGossipDifferentViewProbability should ===(400) ReduceGossipDifferentViewProbability should ===(400)
SchedulerTickDuration should ===(33 millis) SchedulerTickDuration should ===(33 millis)
SchedulerTicksPerWheel should ===(512) SchedulerTicksPerWheel should ===(512)
// TODO remove metrics
MetricsEnabled should ===(true)
MetricsCollectorClass should ===(classOf[SigarMetricsCollector].getName)
MetricsInterval should ===(3 seconds)
MetricsGossipInterval should ===(3 seconds)
MetricsMovingAverageHalfLife should ===(12 seconds)
} }
} }
} }


@ -79,12 +79,6 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
"mymailbox"))) "mymailbox")))
} }
"have correct router mappings" in {
val mapping = system.asInstanceOf[ActorSystemImpl].provider.deployer.routerTypeMapping
mapping("adaptive-pool") should ===(classOf[akka.cluster.routing.AdaptiveLoadBalancingPool].getName)
mapping("adaptive-group") should ===(classOf[akka.cluster.routing.AdaptiveLoadBalancingGroup].getName)
}
} }
} }


@ -1,100 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import scala.concurrent.duration._
import akka.testkit.{ LongRunningTest, AkkaSpec }
import java.util.concurrent.ThreadLocalRandom
class EWMASpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
val collector = createMetricsCollector
"DataStream" must {
"calcualate same ewma for constant values" in {
val ds = EWMA(value = 100.0, alpha = 0.18) :+
100.0 :+ 100.0 :+ 100.0
ds.value should ===(100.0 +- 0.001)
}
"calcualate correct ewma for normal decay" in {
val d0 = EWMA(value = 1000.0, alpha = 2.0 / (1 + 10))
d0.value should ===(1000.0 +- 0.01)
val d1 = d0 :+ 10.0
d1.value should ===(820.0 +- 0.01)
val d2 = d1 :+ 10.0
d2.value should ===(672.73 +- 0.01)
val d3 = d2 :+ 10.0
d3.value should ===(552.23 +- 0.01)
val d4 = d3 :+ 10.0
d4.value should ===(453.64 +- 0.01)
val dn = (1 to 100).foldLeft(d0)((d, _) ⇒ d :+ 10.0)
dn.value should ===(10.0 +- 0.1)
}
"calculate ewma for alpha 1.0, max bias towards latest value" in {
val d0 = EWMA(value = 100.0, alpha = 1.0)
d0.value should ===(100.0 +- 0.01)
val d1 = d0 :+ 1.0
d1.value should ===(1.0 +- 0.01)
val d2 = d1 :+ 57.0
d2.value should ===(57.0 +- 0.01)
val d3 = d2 :+ 10.0
d3.value should ===(10.0 +- 0.01)
}
"calculate alpha from half-life and collect interval" in {
// according to http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
val expectedAlpha = 0.1
// alpha = 2.0 / (1 + N)
val n = 19
val halfLife = n.toDouble / 2.8854
val collectInterval = 1.second
val halfLifeDuration = (halfLife * 1000).millis
EWMA.alpha(halfLifeDuration, collectInterval) should ===(expectedAlpha +- 0.001)
}
"calculate sane alpha from short half-life" in {
val alpha = EWMA.alpha(1.millis, 3.seconds)
alpha should be <= (1.0)
alpha should be >= (0.0)
alpha should ===(1.0 +- 0.001)
}
"calculate sane alpha from long half-life" in {
val alpha = EWMA.alpha(1.day, 3.seconds)
alpha should be <= (1.0)
alpha should be >= (0.0)
alpha should ===(0.0 +- 0.001)
}
"calculate the ewma for multiple, variable, data streams" taggedAs LongRunningTest in {
var streamingDataSet = Map.empty[String, Metric]
var usedMemory = Array.empty[Byte]
(1 to 50) foreach { _ ⇒
// wait a while between each message to give the metrics a chance to change
Thread.sleep(100)
usedMemory = usedMemory ++ Array.fill(1024)(ThreadLocalRandom.current.nextInt(127).toByte)
val changes = collector.sample.metrics.flatMap { latest ⇒
streamingDataSet.get(latest.name) match {
case None ⇒ Some(latest)
case Some(previous) ⇒
if (latest.isSmooth && latest.value != previous.value) {
val updated = previous :+ latest
updated.isSmooth should ===(true)
updated.smoothValue should not be (previous.smoothValue)
Some(updated)
} else None
}
}
streamingDataSet ++= changes.map(m ⇒ m.name → m)
}
}
}
}


@ -1,50 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import org.scalatest.WordSpec
import org.scalatest.Matchers
import akka.cluster.StandardMetrics._
import scala.util.Failure
class MetricNumericConverterSpec extends WordSpec with Matchers with MetricNumericConverter {
"MetricNumericConverter" must {
"convert" in {
convertNumber(0).isLeft should ===(true)
convertNumber(1).left.get should ===(1)
convertNumber(1L).isLeft should ===(true)
convertNumber(0.0).isRight should ===(true)
}
"define a new metric" in {
val Some(metric) = Metric.create(HeapMemoryUsed, 256L, decayFactor = Some(0.18))
metric.name should ===(HeapMemoryUsed)
metric.value should ===(256L)
metric.isSmooth should ===(true)
metric.smoothValue should ===(256.0 +- 0.0001)
}
"define an undefined value with a None " in {
Metric.create("x", -1, None).isDefined should ===(false)
Metric.create("x", java.lang.Double.NaN, None).isDefined should ===(false)
Metric.create("x", Failure(new RuntimeException), None).isDefined should ===(false)
}
"recognize whether a metric value is defined" in {
defined(0) should ===(true)
defined(0.0) should ===(true)
}
"recognize whether a metric value is not defined" in {
defined(-1) should ===(false)
defined(-1.0) should ===(false)
defined(Double.NaN) should ===(false)
}
}
}


@ -1,66 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import akka.actor.Address
import akka.testkit.AkkaSpec
import akka.cluster.StandardMetrics._
class MetricValuesSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricsCollectorFactory {
val collector = createMetricsCollector
val node1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), 1, collector.sample.metrics)
val node2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), 1, collector.sample.metrics)
val nodes: Seq[NodeMetrics] = {
(1 to 100).foldLeft(List(node1, node2)) { (nodes, _) ⇒
nodes map { n ⇒
n.copy(metrics = collector.sample.metrics.flatMap(latest ⇒ n.metrics.collect {
case streaming if latest sameAs streaming ⇒ streaming :+ latest
}))
}
}
}
"NodeMetrics.MetricValues" must {
"extract expected metrics for load balancing" in {
val stream1 = node2.metric(HeapMemoryCommitted).get.value.longValue
val stream2 = node1.metric(HeapMemoryUsed).get.value.longValue
stream1 should be >= (stream2)
}
"extract expected MetricValue types for load balancing" in {
nodes foreach { node ⇒
node match {
case HeapMemory(address, _, used, committed, _) ⇒
used should be > (0L)
committed should be >= (used)
// Documentation of java.lang.management.MemoryUsage says that committed <= max,
// but in practice that is not always true (we have seen it happen). Therefore
// we don't check the heap max value in this test.
// extract is the java api
StandardMetrics.extractHeapMemory(node) should not be (null)
}
node match {
case Cpu(address, _, systemLoadAverageOption, cpuCombinedOption, processors) ⇒
processors should be > (0)
if (systemLoadAverageOption.isDefined)
systemLoadAverageOption.get should be >= (0.0)
if (cpuCombinedOption.isDefined) {
cpuCombinedOption.get should be <= (1.0)
cpuCombinedOption.get should be >= (0.0)
}
// extract is the java api
StandardMetrics.extractCpu(node) should not be (null)
}
}
}
}
}


@ -1,128 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import scala.language.postfixOps
import scala.concurrent.duration._
import scala.util.{ Try }
import akka.actor._
import akka.testkit._
import akka.cluster.StandardMetrics._
object MetricsEnabledSpec {
val config = """
akka.cluster.metrics.enabled = on
akka.cluster.metrics.collect-interval = 1 s
akka.cluster.metrics.gossip-interval = 1 s
akka.actor.provider = remote
"""
}
class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory {
val collector = createMetricsCollector
"Metric must" must {
"merge 2 metrics that are tracking the same metric" in {
for (i ← 1 to 20) {
val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics
val merged12 = sample2 flatMap (latest ⇒ sample1 collect {
case peer if latest sameAs peer ⇒
val m = peer :+ latest
m.value should ===(latest.value)
m.isSmooth should ===(peer.isSmooth || latest.isSmooth)
m
})
val sample3 = collector.sample.metrics
val sample4 = collector.sample.metrics
val merged34 = sample4 flatMap (latest ⇒ sample3 collect {
case peer if latest sameAs peer ⇒
val m = peer :+ latest
m.value should ===(latest.value)
m.isSmooth should ===(peer.isSmooth || latest.isSmooth)
m
})
}
}
}
"MetricsCollector" must {
"not raise errors when attempting reflective code in apply" in {
Try(createMetricsCollector).get should not be null
}
"collect accurate metrics for a node" in {
val sample = collector.sample
val metrics = sample.metrics.collect { case m ⇒ (m.name, m.value) }
val used = metrics collectFirst { case (HeapMemoryUsed, b) ⇒ b }
val committed = metrics collectFirst { case (HeapMemoryCommitted, b) ⇒ b }
metrics foreach {
case (SystemLoadAverage, b) ⇒ b.doubleValue should be >= (0.0)
case (Processors, b) ⇒ b.intValue should be >= (0)
case (HeapMemoryUsed, b) ⇒ b.longValue should be >= (0L)
case (HeapMemoryCommitted, b) ⇒ b.longValue should be > (0L)
case (HeapMemoryMax, b) ⇒
b.longValue should be > (0L)
used.get.longValue should be <= (b.longValue)
committed.get.longValue should be <= (b.longValue)
case (CpuCombined, b) ⇒
b.doubleValue should be <= (1.0)
b.doubleValue should be >= (0.0)
}
}
"collect JMX metrics" in {
// heap max may be undefined depending on the OS
// systemLoadAverage is JMX when SIGAR not present, but
// it's not present on all platforms
val c = collector.asInstanceOf[JmxMetricsCollector]
val heap = c.heapMemoryUsage
c.heapUsed(heap).isDefined should ===(true)
c.heapCommitted(heap).isDefined should ===(true)
c.processors.isDefined should ===(true)
}
"collect 50 node metrics samples in an acceptable duration" taggedAs LongRunningTest in within(10 seconds) {
(1 to 50) foreach { _ ⇒
val sample = collector.sample
sample.metrics.size should be >= (3)
Thread.sleep(100)
}
}
}
}
/**
* Used when testing metrics without full cluster
*/
trait MetricsCollectorFactory { this: AkkaSpec ⇒
private def extendedActorSystem = system.asInstanceOf[ExtendedActorSystem]
def selfAddress = extendedActorSystem.provider.rootPath.address
val defaultDecayFactor = 2.0 / (1 + 10)
def createMetricsCollector: MetricsCollector =
Try(new SigarMetricsCollector(selfAddress, defaultDecayFactor,
extendedActorSystem.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil))).
recover {
case e ⇒
log.debug("Metrics will be retreived from MBeans, Sigar failed to load. Reason: " + e)
new JmxMetricsCollector(selfAddress, defaultDecayFactor)
}.get
private[cluster] def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
}


@ -1,115 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.actor.Address
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory {
val collector = createMetricsCollector
/**
 * sometimes Sigar will not be able to return a valid value (NaN and such), so we must ensure they
 * have the same Metric types
*/
def newSample(previousSample: Set[Metric]): Set[Metric] = {
// Metric.equals is based on name equality
collector.sample.metrics.filter(previousSample.contains) ++ previousSample
}
"A MetricsGossip" must {
"add new NodeMetrics" in {
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
m1.metrics.size should be > (3)
m2.metrics.size should be > (3)
val g1 = MetricsGossip.empty :+ m1
g1.nodes.size should ===(1)
g1.nodeMetricsFor(m1.address).map(_.metrics) should ===(Some(m1.metrics))
val g2 = g1 :+ m2
g2.nodes.size should ===(2)
g2.nodeMetricsFor(m1.address).map(_.metrics) should ===(Some(m1.metrics))
g2.nodeMetricsFor(m2.address).map(_.metrics) should ===(Some(m2.metrics))
}
"merge peer metrics" in {
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size should ===(2)
val beforeMergeNodes = g1.nodes
val m2Updated = m2 copy (metrics = newSample(m2.metrics), timestamp = m2.timestamp + 1000)
val g2 = g1 :+ m2Updated // merge peers
g2.nodes.size should ===(2)
g2.nodeMetricsFor(m1.address).map(_.metrics) should ===(Some(m1.metrics))
g2.nodeMetricsFor(m2.address).map(_.metrics) should ===(Some(m2Updated.metrics))
g2.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp should ===(m2Updated.timestamp) }
}
"merge an existing metric set for a node and update node ring" in {
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val m3 = NodeMetrics(Address("akka.tcp", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
val m2Updated = m2 copy (metrics = newSample(m2.metrics), timestamp = m2.timestamp + 1000)
val g1 = MetricsGossip.empty :+ m1 :+ m2
val g2 = MetricsGossip.empty :+ m3 :+ m2Updated
g1.nodes.map(_.address) should ===(Set(m1.address, m2.address))
// should contain nodes 1,3, and the most recent version of 2
val mergedGossip = g1 merge g2
mergedGossip.nodes.map(_.address) should ===(Set(m1.address, m2.address, m3.address))
mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) should ===(Some(m1.metrics))
mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) should ===(Some(m2Updated.metrics))
mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) should ===(Some(m3.metrics))
mergedGossip.nodes.foreach(_.metrics.size should be > (3))
mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) should ===(Some(m2Updated.timestamp))
}
"get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1
g1.nodeMetricsFor(m1.address).map(_.metrics) should ===(Some(m1.metrics))
}
"remove a node if it is no longer Up" in {
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size should ===(2)
val g2 = g1 remove m1.address
g2.nodes.size should ===(1)
g2.nodes.exists(_.address == m1.address) should ===(false)
g2.nodeMetricsFor(m1.address) should ===(None)
g2.nodeMetricsFor(m2.address).map(_.metrics) should ===(Some(m2.metrics))
}
"filter nodes" in {
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size should ===(2)
val g2 = g1 filter Set(m2.address)
g2.nodes.size should ===(1)
g2.nodes.exists(_.address == m1.address) should ===(false)
g2.nodeMetricsFor(m1.address) should ===(None)
g2.nodeMetricsFor(m2.address).map(_.metrics) should ===(Some(m2.metrics))
}
}
}


@ -1,49 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
// TODO remove metrics
import org.scalatest.WordSpec
import org.scalatest.Matchers
import akka.actor.Address
class NodeMetricsSpec extends WordSpec with Matchers {
val node1 = Address("akka.tcp", "sys", "a", 2554)
val node2 = Address("akka.tcp", "sys", "a", 2555)
"NodeMetrics must" must {
"return correct result for 2 'same' nodes" in {
(NodeMetrics(node1, 0) sameAs NodeMetrics(node1, 0)) should ===(true)
}
"return correct result for 2 not 'same' nodes" in {
(NodeMetrics(node1, 0) sameAs NodeMetrics(node2, 0)) should ===(false)
}
"merge 2 NodeMetrics by most recent" in {
val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
val sample2 = NodeMetrics(node1, 2, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
val merged = sample1 merge sample2
merged.timestamp should ===(sample2.timestamp)
merged.metric("a").map(_.value) should ===(Some(11))
merged.metric("b").map(_.value) should ===(Some(20))
merged.metric("c").map(_.value) should ===(Some(30))
}
"not merge 2 NodeMetrics if master is more recent" in {
val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
val sample2 = NodeMetrics(node1, 0, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
val merged = sample1 merge sample2 // older and not same
merged.timestamp should ===(sample1.timestamp)
merged.metrics should ===(sample1.metrics)
}
}
}


@ -3,8 +3,6 @@
*/ */
package akka.cluster.protobuf package akka.cluster.protobuf
// TODO remove metrics
import akka.cluster._ import akka.cluster._
import akka.actor.{ ExtendedActorSystem, Address } import akka.actor.{ ExtendedActorSystem, Address }
import collection.immutable.SortedSet import collection.immutable.SortedSet
@ -73,17 +71,6 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
checkSerialization(GossipStatus(a1.uniqueAddress, g3.version)) checkSerialization(GossipStatus(a1.uniqueAddress, g3.version))
checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
val mg = MetricsGossip(Set(
NodeMetrics(a1.address, 4711, Set(Metric("foo", 1.2, None))),
NodeMetrics(b1.address, 4712, Set(
Metric("foo", 2.1, Some(EWMA(value = 100.0, alpha = 0.18))),
Metric("bar1", Double.MinPositiveValue, None),
Metric("bar2", Float.MaxValue, None),
Metric("bar3", Int.MaxValue, None),
Metric("bar4", Long.MaxValue, None),
Metric("bar5", BigInt(Long.MaxValue), None)))))
checkSerialization(MetricsGossipEnvelope(a1.address, mg, true))
} }
} }
} }


@ -1,119 +0,0 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.routing
// TODO remove metrics
import org.scalatest.WordSpec
import org.scalatest.Matchers
import akka.actor.Address
import akka.cluster.Metric
import akka.cluster.NodeMetrics
import akka.cluster.StandardMetrics._
class MetricsSelectorSpec extends WordSpec with Matchers {
val abstractSelector = new CapacityMetricsSelector {
override def capacity(nodeMetrics: Set[NodeMetrics]): Map[Address, Double] = Map.empty
}
val a1 = Address("akka.tcp", "sys", "a1", 2551)
val b1 = Address("akka.tcp", "sys", "b1", 2551)
val c1 = Address("akka.tcp", "sys", "c1", 2551)
val d1 = Address("akka.tcp", "sys", "d1", 2551)
val decayFactor = Some(0.18)
val nodeMetricsA = NodeMetrics(a1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 128, decayFactor),
Metric.create(HeapMemoryCommitted, 256, decayFactor),
Metric.create(HeapMemoryMax, 512, None),
Metric.create(CpuCombined, 0.1, decayFactor),
Metric.create(SystemLoadAverage, 0.5, None),
Metric.create(Processors, 8, None)).flatten)
val nodeMetricsB = NodeMetrics(b1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 256, decayFactor),
Metric.create(HeapMemoryCommitted, 512, decayFactor),
Metric.create(HeapMemoryMax, 1024, None),
Metric.create(CpuCombined, 0.5, decayFactor),
Metric.create(SystemLoadAverage, 1.0, None),
Metric.create(Processors, 16, None)).flatten)
val nodeMetricsC = NodeMetrics(c1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 1024, decayFactor),
Metric.create(HeapMemoryCommitted, 1024, decayFactor),
Metric.create(HeapMemoryMax, 1024, None),
Metric.create(CpuCombined, 1.0, decayFactor),
Metric.create(SystemLoadAverage, 16.0, None),
Metric.create(Processors, 16, None)).flatten)
val nodeMetricsD = NodeMetrics(d1, System.currentTimeMillis, Set(
Metric.create(HeapMemoryUsed, 511, decayFactor),
Metric.create(HeapMemoryCommitted, 512, decayFactor),
Metric.create(HeapMemoryMax, 512, None),
Metric.create(Processors, 2, decayFactor)).flatten)
val nodeMetrics = Set(nodeMetricsA, nodeMetricsB, nodeMetricsC, nodeMetricsD)
"CapacityMetricsSelector" must {
"calculate weights from capacity" in {
val capacity = Map(a1 → 0.6, b1 → 0.3, c1 → 0.1)
val weights = abstractSelector.weights(capacity)
weights should ===(Map(c1 → 1, b1 → 3, a1 → 6))
}
"handle low and zero capacity" in {
val capacity = Map(a1 → 0.0, b1 → 1.0, c1 → 0.005, d1 → 0.004)
val weights = abstractSelector.weights(capacity)
weights should ===(Map(a1 → 0, b1 → 100, c1 → 1, d1 → 0))
}
}
"HeapMetricsSelector" must {
"calculate capacity of heap metrics" in {
val capacity = HeapMetricsSelector.capacity(nodeMetrics)
capacity(a1) should ===(0.75 +- 0.0001)
capacity(b1) should ===(0.75 +- 0.0001)
capacity(c1) should ===(0.0 +- 0.0001)
capacity(d1) should ===(0.001953125 +- 0.0001)
}
}
"CpuMetricsSelector" must {
"calculate capacity of cpuCombined metrics" in {
val capacity = CpuMetricsSelector.capacity(nodeMetrics)
capacity(a1) should ===(0.9 +- 0.0001)
capacity(b1) should ===(0.5 +- 0.0001)
capacity(c1) should ===(0.0 +- 0.0001)
capacity.contains(d1) should ===(false)
}
}
"SystemLoadAverageMetricsSelector" must {
"calculate capacity of systemLoadAverage metrics" in {
val capacity = SystemLoadAverageMetricsSelector.capacity(nodeMetrics)
capacity(a1) should ===(0.9375 +- 0.0001)
capacity(b1) should ===(0.9375 +- 0.0001)
capacity(c1) should ===(0.0 +- 0.0001)
capacity.contains(d1) should ===(false)
}
}
"MixMetricsSelector" must {
"aggregate capacity of all metrics" in {
val capacity = MixMetricsSelector.capacity(nodeMetrics)
capacity(a1) should ===((0.75 + 0.9 + 0.9375) / 3 +- 0.0001)
capacity(b1) should ===((0.75 + 0.5 + 0.9375) / 3 +- 0.0001)
capacity(c1) should ===((0.0 + 0.0 + 0.0) / 3 +- 0.0001)
capacity(d1) should ===((0.001953125) / 1 +- 0.0001)
}
}
}


@ -30,9 +30,6 @@ and add the following configuration stanza to your ``application.conf``
akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ] akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``,
since it is still enabled in akka-cluster by default (for compatibility with past releases).
Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled, Cluster members with status :ref:`WeaklyUp <weakly_up_java>`, if that feature is enabled,
will participate in Cluster Metrics collection and dissemination. will participate in Cluster Metrics collection and dissemination.


@ -26,9 +26,6 @@ and add the following configuration stanza to your ``application.conf``
akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ] akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
Make sure to disable legacy metrics in akka-cluster: ``akka.cluster.metrics.enabled=off``,
since it is still enabled in akka-cluster by default (for compatibility with past releases).
Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled, Cluster members with status :ref:`WeaklyUp <weakly_up_scala>`, if that feature is enabled,
will participate in Cluster Metrics collection and dissemination. will participate in Cluster Metrics collection and dissemination.


@ -27,9 +27,6 @@ akka {
} }
} }
# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off
# Enable metrics extension in akka-cluster-metrics. # Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"] akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]


@ -35,8 +35,6 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
nodeList foreach { role => nodeList foreach { role =>
nodeConfig(role) { nodeConfig(role) {
ConfigFactory.parseString(s""" ConfigFactory.parseString(s"""
# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off
# Enable metrics extension in akka-cluster-metrics. # Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"] akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
# Sigar native library extract location during tests. # Sigar native library extract location during tests.


@@ -31,8 +31,6 @@ object StatsSampleSpecConfig extends MultiNodeConfig {
nodeList foreach { role =>
  nodeConfig(role) {
    ConfigFactory.parseString(s"""
      # Disable legacy metrics in akka-cluster.
      akka.cluster.metrics.enabled=off
      # Enable metrics extension in akka-cluster-metrics.
      akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
      # Sigar native library extract location during tests.

View file

@@ -30,8 +30,6 @@ object TransformationSampleSpecConfig extends MultiNodeConfig {
nodeList foreach { role =>
  nodeConfig(role) {
    ConfigFactory.parseString(s"""
      # Disable legacy metrics in akka-cluster.
      akka.cluster.metrics.enabled=off
      # Enable metrics extension in akka-cluster-metrics.
      akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
      # Sigar native library extract location during tests.

View file

@@ -27,9 +27,6 @@ akka {
  }
}
# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off
# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

View file

@@ -34,8 +34,6 @@ object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
nodeList foreach { role =>
  nodeConfig(role) {
    ConfigFactory.parseString(s"""
      # Disable legacy metrics in akka-cluster.
      akka.cluster.metrics.enabled=off
      # Enable metrics extension in akka-cluster-metrics.
      akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
      # Sigar native library extract location during tests.

View file

@@ -27,8 +27,6 @@ object StatsSampleSpecConfig extends MultiNodeConfig {
nodeList foreach { role =>
  nodeConfig(role) {
    ConfigFactory.parseString(s"""
      # Disable legacy metrics in akka-cluster.
      akka.cluster.metrics.enabled=off
      # Enable metrics extension in akka-cluster-metrics.
      akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
      # Sigar native library extract location during tests.

View file

@@ -29,8 +29,6 @@ object TransformationSampleSpecConfig extends MultiNodeConfig {
nodeList foreach { role =>
  nodeConfig(role) {
    ConfigFactory.parseString(s"""
      # Disable legacy metrics in akka-cluster.
      akka.cluster.metrics.enabled=off
      # Enable metrics extension in akka-cluster-metrics.
      akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
      # Sigar native library extract location during tests.

View file

@@ -52,7 +52,6 @@ object Dependencies {
val osgiCore = "org.osgi" % "org.osgi.core" % "4.3.1" // ApacheV2
val osgiCompendium= "org.osgi" % "org.osgi.compendium" % "4.3.1" // ApacheV2
// TODO remove with metrics from akka-cluster
val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2
// reactive streams

View file

@@ -82,6 +82,79 @@ object MiMa extends AutoPlugin {
import com.typesafe.tools.mima.core._
val bcIssuesBetween24and25 = Seq(
// #21423 Remove deprecated metrics
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.clusterMetrics"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$MetricsTick$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsCollector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.Metric"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsCollector$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.Metric$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterSettings.MetricsMovingAverageHalfLife"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterSettings.MetricsGossipInterval"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterSettings.MetricsCollectorClass"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterSettings.MetricsInterval"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterSettings.MetricsEnabled"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.JmxMetricsCollector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.SigarMetricsCollector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.StandardMetrics$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricNumericConverter"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.ClusterEvent$ClusterMetricsChanged"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsGossipEnvelope"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.StandardMetrics"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.NodeMetrics"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.StandardMetrics$Cpu$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.StandardMetrics$Cpu"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$PublisherCreated"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.EWMA"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsGossip$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$PublisherCreated$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.NodeMetrics$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsGossipEnvelope$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.ClusterMetricsCollector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.EWMA$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.StandardMetrics$HeapMemory"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.MetricsGossip"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.ClusterEvent$ClusterMetricsChanged$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.StandardMetrics$HeapMemory$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.SystemLoadAverageMetricsSelector$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.AdaptiveLoadBalancingMetricsListener"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.WeightedRoutees"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.AdaptiveLoadBalancingPool"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.CpuMetricsSelector$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.MixMetricsSelector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.CapacityMetricsSelector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.SystemLoadAverageMetricsSelector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.AdaptiveLoadBalancingRoutingLogic"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.HeapMetricsSelector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.AdaptiveLoadBalancingPool$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.CpuMetricsSelector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.AdaptiveLoadBalancingRoutingLogic$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.HeapMetricsSelector$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.MetricsSelector$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.AdaptiveLoadBalancingGroup$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.MixMetricsSelectorBase"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.AdaptiveLoadBalancingGroup"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.MixMetricsSelector$"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.routing.MetricsSelector"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$EWMA$Builder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$MetricOrBuilder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$Number"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$NumberType"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$MetricsGossipEnvelopeOrBuilder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$Builder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetricsOrBuilder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$NumberOrBuilder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$EWMA"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$MetricsGossip$Builder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$MetricsGossipOrBuilder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$MetricsGossipEnvelope"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$MetricsGossip"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$MetricsGossipEnvelope$Builder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$EWMAOrBuilder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$Metric"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$Metric$Builder"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.protobuf.msg.ClusterMessages$NodeMetrics$Number$Builder"),
// #21537 coordinated shutdown
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.removed"),