diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
index a9f60b3675..271ad1d29a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala
@@ -84,7 +84,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
case GossipTick ⇒ gossip()
case MetricsTick ⇒ collect()
case state: CurrentClusterState ⇒ receiveState(state)
- case MemberUp(m) ⇒ receiveMember(m)
+ case MemberUp(m) ⇒ addMember(m)
case e: MemberEvent ⇒ removeMember(e)
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
}
@@ -99,7 +99,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Adds a member to the node ring.
*/
- def receiveMember(member: Member): Unit = nodes += member.address
+ def addMember(member: Member): Unit = nodes += member.address
/**
* Removes a member from the member node ring.
@@ -113,7 +113,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]].
*/
- def receiveState(state: CurrentClusterState): Unit = nodes = state.members collect { case m if m.status == Up ⇒ m.address }
+ def receiveState(state: CurrentClusterState): Unit =
+ nodes = state.members collect { case m if m.status == Up ⇒ m.address }
/**
* Samples the latest metrics for the node, updates metrics statistics in
@@ -128,16 +129,16 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Receives changes from peer nodes, merges remote with local gossip nodes, then publishes
- * changes to the event stream for load balancing router consumption, and gossips to peers.
+ * changes to the event stream for load balancing router consumption, and gossip back.
*/
def receiveGossip(envelope: MetricsGossipEnvelope): Unit = {
- val remoteGossip = envelope.gossip
-
- if (remoteGossip != latestGossip) {
- latestGossip = latestGossip merge remoteGossip
- publish()
- gossipTo(envelope.from)
- }
+ // remote node might not have same view of member nodes, this side should only care
+ // about nodes that are known here, otherwise removed nodes can come back
+ val otherGossip = envelope.gossip.filter(nodes)
+ latestGossip = latestGossip merge otherGossip
+ publish()
+ if (!envelope.reply)
+ replyGossipTo(envelope.from)
}
/**
@@ -146,7 +147,13 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo
def gossipTo(address: Address): Unit =
- context.actorFor(self.path.toStringWithAddress(address)) ! MetricsGossipEnvelope(selfAddress, latestGossip)
+ sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = false))
+
+ def replyGossipTo(address: Address): Unit =
+ sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = true))
+
+ def sendGossip(address: Address, envelope: MetricsGossipEnvelope): Unit =
+ context.actorFor(self.path.toStringWithAddress(address)) ! envelope
def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
@@ -177,51 +184,31 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
*/
def remove(node: Address): MetricsGossip = copy(nodes = nodes filterNot (_.address == node))
+ /**
+ * Only the nodes that are in the `includeNodes` Set.
+ */
+ def filter(includeNodes: Set[Address]): MetricsGossip =
+ copy(nodes = nodes filter { includeNodes contains _.address })
+
/**
* Adds new remote [[akka.cluster.NodeMetrics]] and merges existing from a remote gossip.
*/
- def merge(remoteGossip: MetricsGossip): MetricsGossip = {
- val remoteNodes = remoteGossip.nodes.map(n ⇒ n.address -> n).toMap
- val remoteNodesKeySet = remoteNodes.keySet
- val toMerge = nodeKeys intersect remoteNodesKeySet
- val onlyInRemote = remoteNodesKeySet -- nodeKeys
- val onlyInLocal = nodeKeys -- remoteNodesKeySet
-
- val seen = nodes.collect {
- case n if toMerge contains n.address ⇒ n merge remoteNodes(n.address)
- case n if onlyInLocal contains n.address ⇒ n
- }
-
- val unseen = remoteGossip.nodes.collect { case n if onlyInRemote contains n.address ⇒ n }
-
- copy(nodes = seen ++ unseen)
- }
+ def merge(otherGossip: MetricsGossip): MetricsGossip =
+ otherGossip.nodes.foldLeft(this) { (gossip, nodeMetrics) ⇒ gossip :+ nodeMetrics }
/**
* Adds new local [[akka.cluster.NodeMetrics]], or merges an existing.
*/
- def :+(data: NodeMetrics): MetricsGossip = {
- val previous = metricsFor(data.address)
- val names = previous map (_.name)
-
- val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a ⇒ names contains a.name)
- val merged = toMerge flatMap (latest ⇒ previous.collect { case peer if latest sameAs peer ⇒ peer :+ latest })
-
- val refreshed = nodes filterNot (_.address == data.address)
- copy(nodes = refreshed + data.copy(metrics = unseen ++ merged))
+ def :+(newNodeMetrics: NodeMetrics): MetricsGossip = nodeMetricsFor(newNodeMetrics.address) match {
+ case Some(existingNodeMetrics) ⇒
+ copy(nodes = nodes - existingNodeMetrics + (existingNodeMetrics merge newNodeMetrics))
+ case None ⇒ copy(nodes = nodes + newNodeMetrics)
}
/**
- * Returns a set of [[akka.actor.Address]] for a given node set.
+ * Returns [[akka.cluster.NodeMetrics]] for a node if exists.
*/
- def nodeKeys: Set[Address] = nodes map (_.address)
-
- /**
- * Returns metrics for a node if exists.
- */
- def metricsFor(address: Address): Set[Metric] = nodes collectFirst {
- case n if (n.address == address) ⇒ n.metrics
- } getOrElse Set.empty[Metric]
+ def nodeMetricsFor(address: Address): Option[NodeMetrics] = nodes find { n ⇒ n.address == address }
}
@@ -229,7 +216,8 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
* INTERNAL API
* Envelope adding a sender address to the gossip.
*/
-private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
+private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip, reply: Boolean)
+ extends ClusterMessage
object EWMA {
/**
@@ -294,6 +282,10 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe
}
/**
+ * Metrics key/value.
+ *
+ * Equality of Metric is based on its name.
+ *
* @param name the metric name
* @param value the metric value, which may or may not be defined, it must be a valid numerical value,
* see [[akka.cluster.MetricNumericConverter.defined()]]
@@ -334,6 +326,12 @@ case class Metric private (name: String, value: Number, private val average: Opt
*/
def sameAs(that: Metric): Boolean = name == that.name
+ override def hashCode = name.##
+ override def equals(obj: Any) = obj match {
+ case other: Metric ⇒ sameAs(other)
+ case _ ⇒ false
+ }
+
}
/**
@@ -369,6 +367,8 @@ object Metric extends MetricNumericConverter {
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
+ * Equality of NodeMetrics is based on its address.
+ *
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param metrics the set of sampled [[akka.actor.Metric]]
@@ -378,17 +378,14 @@ case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] =
/**
* Returns the most recent data.
*/
- def merge(that: NodeMetrics): NodeMetrics = if (this updatable that) copy(metrics = that.metrics, timestamp = that.timestamp) else this
-
- /**
- * Returns true if that address is the same as this and its metric set is more recent.
- */
- def updatable(that: NodeMetrics): Boolean = (this sameAs that) && (that.timestamp > timestamp)
-
- /**
- * Returns true if that address is the same as this
- */
- def sameAs(that: NodeMetrics): Boolean = address == that.address
+ def merge(that: NodeMetrics): NodeMetrics = {
+ require(address == that.address, s"merge only allowed for same address, [$address] != [$that.address]")
+ if (timestamp >= that.timestamp) this // that is older
+ else {
+ // equality is based on the name of the Metric and Set doesn't replace existing element
+ copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp)
+ }
+ }
def metric(key: String): Option[Metric] = metrics.collectFirst { case m if m.name == key ⇒ m }
@@ -398,6 +395,17 @@ case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] =
def getMetrics: java.lang.Iterable[Metric] =
scala.collection.JavaConverters.asJavaIterableConverter(metrics).asJava
+ /**
+ * Returns true if that address is the same as this
+ */
+ def sameAs(that: NodeMetrics): Boolean = address == that.address
+
+ override def hashCode = address.##
+ override def equals(obj: Any) = obj match {
+ case other: NodeMetrics ⇒ sameAs(other)
+ case _ ⇒ false
+ }
+
}
/**
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
index 58e157d12e..f572b13233 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricNumericConverterSpec.scala
@@ -4,16 +4,15 @@
package akka.cluster
-import akka.testkit.{ ImplicitSender, AkkaSpec }
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
import akka.cluster.StandardMetrics._
import scala.util.Failure
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender
- with MetricsCollectorFactory {
+class MetricNumericConverterSpec extends WordSpec with MustMatchers with MetricNumericConverter {
"MetricNumericConverter" must {
- val collector = createMetricsCollector
"convert" in {
convertNumber(0).isLeft must be(true)
diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
index 36470a4725..6d54a69bc2 100644
--- a/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/MetricsGossipSpec.scala
@@ -17,7 +17,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val collector = createMetricsCollector
"A MetricsGossip" must {
- "add and initialize new NodeMetrics" in {
+ "add new NodeMetrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
@@ -26,14 +26,12 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val g1 = MetricsGossip.empty :+ m1
g1.nodes.size must be(1)
- g1.nodeKeys.size must be(g1.nodes.size)
- g1.metricsFor(m1.address).size must be(m1.metrics.size)
+ g1.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
val g2 = g1 :+ m2
g2.nodes.size must be(2)
- g2.nodeKeys.size must be(g2.nodes.size)
- g2.metricsFor(m1.address).size must be(m1.metrics.size)
- g2.metricsFor(m2.address).size must be(m2.metrics.size)
+ g2.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
+ g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
"merge peer metrics" in {
@@ -47,8 +45,8 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
val g2 = g1 :+ m2Updated // merge peers
g2.nodes.size must be(2)
- g2.metricsFor(m1.address).size must be(m1.metrics.size)
- g2.metricsFor(m2.address).size must be(m2Updated.metrics.size)
+ g2.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
+ g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2Updated.metrics))
g2.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp must be(m2Updated.timestamp) }
}
@@ -61,23 +59,22 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val g1 = MetricsGossip.empty :+ m1 :+ m2
val g2 = MetricsGossip.empty :+ m3 :+ m2Updated
- g1.nodeKeys.contains(m1.address) must be(true)
- g2.nodeKeys.contains(m3.address) must be(true)
+ g1.nodes.map(_.address) must be(Set(m1.address, m2.address))
// must contain nodes 1,3, and the most recent version of 2
val mergedGossip = g1 merge g2
- mergedGossip.nodes.size must be(3)
- mergedGossip.metricsFor(m1.address).size must be(m1.metrics.size)
- mergedGossip.metricsFor(m2.address).size must be(m2Updated.metrics.size)
- mergedGossip.metricsFor(m3.address).size must be(m3.metrics.size)
+ mergedGossip.nodes.map(_.address) must be(Set(m1.address, m2.address, m3.address))
+ mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
+ mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2Updated.metrics))
+ mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) must be(Some(m3.metrics))
mergedGossip.nodes.foreach(_.metrics.size must be > (3))
- mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp)
+ mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) must be(Some(m2Updated.timestamp))
}
"get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1
- g1.metricsFor(m1.address).nonEmpty must be(true)
+ g1.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
}
"remove a node if it is no longer Up" in {
@@ -89,8 +86,21 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val g2 = g1 remove m1.address
g2.nodes.size must be(1)
g2.nodes.exists(_.address == m1.address) must be(false)
- g2.metricsFor(m1.address).size must be(0)
- g2.metricsFor(m2.address).size must be(m2.metrics.size)
+ g2.nodeMetricsFor(m1.address) must be(None)
+ g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
+ }
+
+ "filter nodes" in {
+ val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
+ val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
+
+ val g1 = MetricsGossip.empty :+ m1 :+ m2
+ g1.nodes.size must be(2)
+ val g2 = g1 filter Set(m2.address)
+ g2.nodes.size must be(1)
+ g2.nodes.exists(_.address == m1.address) must be(false)
+ g2.nodeMetricsFor(m1.address) must be(None)
+ g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala
index 2c62bc5a41..7e80a04d64 100644
--- a/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/NodeMetricsSpec.scala
@@ -4,26 +4,17 @@
package akka.cluster
-import akka.testkit.AkkaSpec
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class NodeMetricsSpec extends AkkaSpec with MetricsCollectorFactory {
-
- val collector = createMetricsCollector
+class NodeMetricsSpec extends WordSpec with MustMatchers {
val node1 = Address("akka", "sys", "a", 2554)
-
val node2 = Address("akka", "sys", "a", 2555)
"NodeMetrics must" must {
- "recognize updatable nodes" in {
- (NodeMetrics(node1, 0) updatable NodeMetrics(node1, 1)) must be(true)
- }
-
- "recognize non-updatable nodes" in {
- (NodeMetrics(node1, 1) updatable NodeMetrics(node2, 0)) must be(false)
- }
"return correct result for 2 'same' nodes" in {
(NodeMetrics(node1, 0) sameAs NodeMetrics(node1, 0)) must be(true)
@@ -34,21 +25,23 @@ class NodeMetricsSpec extends AkkaSpec with MetricsCollectorFactory {
}
"merge 2 NodeMetrics by most recent" in {
- val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
- val sample2 = NodeMetrics(node1, 2, collector.sample.metrics)
+ val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
+ val sample2 = NodeMetrics(node1, 2, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
val merged = sample1 merge sample2
merged.timestamp must be(sample2.timestamp)
- merged.metrics must be(sample2.metrics)
+ merged.metric("a").map(_.value) must be(Some(11))
+ merged.metric("b").map(_.value) must be(Some(20))
+ merged.metric("c").map(_.value) must be(Some(30))
}
"not merge 2 NodeMetrics if master is more recent" in {
- val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
- val sample2 = NodeMetrics(node2, 0, sample1.metrics)
+ val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
+ val sample2 = NodeMetrics(node1, 0, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
- val merged = sample2 merge sample2 // older and not same
- merged.timestamp must be(sample2.timestamp)
- merged.metrics must be(sample2.metrics)
+ val merged = sample1 merge sample2 // older and not same
+ merged.timestamp must be(sample1.timestamp)
+ merged.metrics must be(sample1.metrics)
}
}
}