Simplify MetricsGossip and NodeMetrics merge, see #2547

* Also fixed a problem where removed nodes could potentially be
  added back to MetricsGossip
Patrik Nordwall 2012-11-24 22:01:27 +01:00
parent 1466ad84af
commit e9e1580d47
4 changed files with 110 additions and 100 deletions
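
A minimal, self-contained sketch (simplified stand-in types, not the actual akka.cluster internals) of the problem behind the second bullet: a peer that has not yet removed a node still gossips metrics for it, so merging its gossip unfiltered re-introduces the removed node, while filtering the incoming gossip against the locally known member addresses keeps it out.

object RemovedNodesSketch extends App {
  final case class NodeMetrics(address: String)

  final case class MetricsGossip(nodes: Set[NodeMetrics]) {
    // keep only the nodes this side still knows about (mirrors the new MetricsGossip.filter)
    def filter(includeNodes: Set[String]): MetricsGossip =
      copy(nodes = nodes filter { includeNodes contains _.address })
    // simplified merge: just a union of the node entries
    def merge(other: MetricsGossip): MetricsGossip = copy(nodes = nodes ++ other.nodes)
  }

  val knownHere = Set("a", "b") // node "c" has already been removed on this side
  val local  = MetricsGossip(Set(NodeMetrics("a"), NodeMetrics("b")))
  val remote = MetricsGossip(Set(NodeMetrics("b"), NodeMetrics("c"))) // peer still carries "c"

  println((local merge remote).nodes)                   // NodeMetrics(c) is back
  println((local merge remote.filter(knownHere)).nodes) // only "a" and "b": "c" stays removed
}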


@@ -84,7 +84,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
case GossipTick ⇒ gossip()
case MetricsTick ⇒ collect()
case state: CurrentClusterState ⇒ receiveState(state)
case MemberUp(m) ⇒ receiveMember(m)
case MemberUp(m) ⇒ addMember(m)
case e: MemberEvent ⇒ removeMember(e)
case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
}
@@ -99,7 +99,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Adds a member to the node ring.
*/
def receiveMember(member: Member): Unit = nodes += member.address
def addMember(member: Member): Unit = nodes += member.address
/**
* Removes a member from the member node ring.
@@ -113,7 +113,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]].
*/
def receiveState(state: CurrentClusterState): Unit = nodes = state.members collect { case m if m.status == Up ⇒ m.address }
def receiveState(state: CurrentClusterState): Unit =
nodes = state.members collect { case m if m.status == Up ⇒ m.address }
/**
* Samples the latest metrics for the node, updates metrics statistics in
@@ -128,16 +129,16 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/**
* Receives changes from peer nodes, merges remote with local gossip nodes, then publishes
* changes to the event stream for load balancing router consumption, and gossips to peers.
* changes to the event stream for load balancing router consumption, and gossip back.
*/
def receiveGossip(envelope: MetricsGossipEnvelope): Unit = {
val remoteGossip = envelope.gossip
if (remoteGossip != latestGossip) {
latestGossip = latestGossip merge remoteGossip
publish()
gossipTo(envelope.from)
}
// remote node might not have same view of member nodes, this side should only care
// about nodes that are known here, otherwise removed nodes can come back
val otherGossip = envelope.gossip.filter(nodes)
latestGossip = latestGossip merge otherGossip
publish()
if (!envelope.reply)
replyGossipTo(envelope.from)
}
/**
@@ -146,7 +147,13 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo
def gossipTo(address: Address): Unit =
context.actorFor(self.path.toStringWithAddress(address)) ! MetricsGossipEnvelope(selfAddress, latestGossip)
sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = false))
def replyGossipTo(address: Address): Unit =
sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = true))
def sendGossip(address: Address, envelope: MetricsGossipEnvelope): Unit =
context.actorFor(self.path.toStringWithAddress(address)) ! envelope
def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
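
A rough sketch, with plain functions instead of actors and illustrative names, of what the new reply flag appears to buy: the receiver of gossip answers with its merged state so both sides converge, while an envelope that is itself a reply is not answered again, so one gossip round is a request plus at most one reply rather than an endless exchange.

object GossipReplySketch extends App {
  final case class Envelope(from: String, payload: Set[String], reply: Boolean)

  // merge the incoming payload into the local state and decide whether to gossip back
  def receive(self: String, state: Set[String], in: Envelope): (Set[String], Option[Envelope]) = {
    val merged = state ++ in.payload
    val answer = if (in.reply) None else Some(Envelope(self, merged, reply = true))
    (merged, answer)
  }

  val (stateB, fromB) = receive("b", Set("b1"), Envelope("a", Set("a1"), reply = false))
  println(fromB.isDefined)  // true: b answers the first gossip with its merged view
  val (stateA, fromA) = receive("a", Set("a1"), fromB.get)
  println(fromA.isDefined)  // false: a does not answer a reply, so the exchange ends here
  println(stateA == stateB) // true: both sides have converged on the same view
}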
@@ -177,51 +184,31 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
*/
def remove(node: Address): MetricsGossip = copy(nodes = nodes filterNot (_.address == node))
/**
* Only the nodes that are in the `includeNodes` Set.
*/
def filter(includeNodes: Set[Address]): MetricsGossip =
copy(nodes = nodes filter { includeNodes contains _.address })
/**
* Adds new remote [[akka.cluster.NodeMetrics]] and merges existing from a remote gossip.
*/
def merge(remoteGossip: MetricsGossip): MetricsGossip = {
val remoteNodes = remoteGossip.nodes.map(n ⇒ n.address -> n).toMap
val remoteNodesKeySet = remoteNodes.keySet
val toMerge = nodeKeys intersect remoteNodesKeySet
val onlyInRemote = remoteNodesKeySet -- nodeKeys
val onlyInLocal = nodeKeys -- remoteNodesKeySet
val seen = nodes.collect {
case n if toMerge contains n.address ⇒ n merge remoteNodes(n.address)
case n if onlyInLocal contains n.address ⇒ n
}
val unseen = remoteGossip.nodes.collect { case n if onlyInRemote contains n.address ⇒ n }
copy(nodes = seen ++ unseen)
}
def merge(otherGossip: MetricsGossip): MetricsGossip =
otherGossip.nodes.foldLeft(this) { (gossip, nodeMetrics) ⇒ gossip :+ nodeMetrics }
/**
* Adds new local [[akka.cluster.NodeMetrics]], or merges an existing.
*/
def :+(data: NodeMetrics): MetricsGossip = {
val previous = metricsFor(data.address)
val names = previous map (_.name)
val (toMerge: Set[Metric], unseen: Set[Metric]) = data.metrics partition (a ⇒ names contains a.name)
val merged = toMerge flatMap (latest ⇒ previous.collect { case peer if latest sameAs peer ⇒ peer :+ latest })
val refreshed = nodes filterNot (_.address == data.address)
copy(nodes = refreshed + data.copy(metrics = unseen ++ merged))
def :+(newNodeMetrics: NodeMetrics): MetricsGossip = nodeMetricsFor(newNodeMetrics.address) match {
case Some(existingNodeMetrics) ⇒
copy(nodes = nodes - existingNodeMetrics + (existingNodeMetrics merge newNodeMetrics))
case None ⇒ copy(nodes = nodes + newNodeMetrics)
}
/**
* Returns a set of [[akka.actor.Address]] for a given node set.
* Returns [[akka.cluster.NodeMetrics]] for a node if exists.
*/
def nodeKeys: Set[Address] = nodes map (_.address)
/**
* Returns metrics for a node if exists.
*/
def metricsFor(address: Address): Set[Metric] = nodes collectFirst {
case n if (n.address == address) ⇒ n.metrics
} getOrElse Set.empty[Metric]
def nodeMetricsFor(address: Address): Option[NodeMetrics] = nodes find { n ⇒ n.address == address }
}
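
A condensed sketch of the new merge semantics with simplified stand-in types (addresses as plain strings, a bare timestamp instead of real metric values): every node entry of the other gossip is folded into this one with :+, which adds unknown nodes and otherwise keeps the most recent entry per address.

object SimplifiedMergeSketch extends App {
  final case class NodeMetrics(address: String, timestamp: Long) {
    // keep the most recent of two entries for the same address
    def merge(that: NodeMetrics): NodeMetrics = if (timestamp >= that.timestamp) this else that
  }

  final case class MetricsGossip(nodes: Set[NodeMetrics]) {
    def nodeMetricsFor(address: String): Option[NodeMetrics] = nodes find { _.address == address }

    def :+(nm: NodeMetrics): MetricsGossip = nodeMetricsFor(nm.address) match {
      case Some(existing) ⇒ copy(nodes = nodes - existing + (existing merge nm))
      case None           ⇒ copy(nodes = nodes + nm)
    }

    def merge(other: MetricsGossip): MetricsGossip =
      other.nodes.foldLeft(this) { (gossip, nm) ⇒ gossip :+ nm }
  }

  val g1 = MetricsGossip(Set(NodeMetrics("a", 1), NodeMetrics("b", 1)))
  val g2 = MetricsGossip(Set(NodeMetrics("b", 2), NodeMetrics("c", 1)))
  println((g1 merge g2).nodes) // "a" from the local side, "b" upgraded to timestamp 2, "c" added
}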
@@ -229,7 +216,8 @@ private[cluster] case class MetricsGossip(nodes: Set[NodeMetrics]) {
* INTERNAL API
* Envelope adding a sender address to the gossip.
*/
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip, reply: Boolean)
extends ClusterMessage
object EWMA {
/**
@@ -294,6 +282,10 @@ private[cluster] case class EWMA(value: Double, alpha: Double) extends ClusterMe
}
/**
* Metrics key/value.
*
* Equality of Metric is based on its name.
*
* @param name the metric name
* @param value the metric value, which may or may not be defined, it must be a valid numerical value,
* see [[akka.cluster.MetricNumericConverter.defined()]]
@@ -334,6 +326,12 @@ case class Metric private (name: String, value: Number, private val average: Opt
*/
def sameAs(that: Metric): Boolean = name == that.name
override def hashCode = name.##
override def equals(obj: Any) = obj match {
case other: Metric ⇒ sameAs(other)
case _ ⇒ false
}
}
/**
@@ -369,6 +367,8 @@ object Metric extends MetricNumericConverter {
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
* Equality of NodeMetrics is based on its address.
*
* @param address [[akka.actor.Address]] of the node the metrics are gathered at
* @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
* @param metrics the set of sampled [[akka.actor.Metric]]
@@ -378,17 +378,14 @@ case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] =
/**
* Returns the most recent data.
*/
def merge(that: NodeMetrics): NodeMetrics = if (this updatable that) copy(metrics = that.metrics, timestamp = that.timestamp) else this
/**
* Returns true if <code>that</code> address is the same as this and its metric set is more recent.
*/
def updatable(that: NodeMetrics): Boolean = (this sameAs that) && (that.timestamp > timestamp)
/**
* Returns true if <code>that</code> address is the same as this
*/
def sameAs(that: NodeMetrics): Boolean = address == that.address
def merge(that: NodeMetrics): NodeMetrics = {
require(address == that.address, s"merge only allowed for same address, [$address] != [${that.address}]")
if (timestamp >= that.timestamp) this // that is older
else {
// equality is based on the name of the Metric and Set doesn't replace existing element
copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp)
}
}
def metric(key: String): Option[Metric] = metrics.collectFirst { case m if m.name == key ⇒ m }
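
The comment inside merge above is the whole trick, so a small demonstration may help; SimpleMetric is a hypothetical stand-in that mirrors the name-based equality Metric gets in this commit. Because equality and hashCode use only the name, and an immutable Set never replaces an element it already contains, that.metrics ++ metrics keeps the newer value for every name present in the newer sample and falls back to the older entry only for names the newer sample does not carry.

object MetricSetSketch extends App {
  final case class SimpleMetric(name: String, value: Number) {
    // equality by name only, as for akka.cluster.Metric in this commit
    override def hashCode = name.##
    override def equals(obj: Any) = obj match {
      case other: SimpleMetric ⇒ name == other.name
      case _                   ⇒ false
    }
  }

  val older = Set(SimpleMetric("heap", 10), SimpleMetric("load", 0.5))
  val newer = Set(SimpleMetric("heap", 12), SimpleMetric("cores", 8))

  val merged = newer ++ older // as in copy(metrics = that.metrics ++ metrics)
  println(merged.find(_.name == "heap").map(_.value))  // Some(12): the newer value wins
  println(merged.find(_.name == "load").map(_.value))  // Some(0.5): kept, only the older sample had it
  println(merged.find(_.name == "cores").map(_.value)) // Some(8)
}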
@@ -398,6 +395,17 @@ case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] =
def getMetrics: java.lang.Iterable[Metric] =
scala.collection.JavaConverters.asJavaIterableConverter(metrics).asJava
/**
* Returns true if <code>that</code> address is the same as this
*/
def sameAs(that: NodeMetrics): Boolean = address == that.address
override def hashCode = address.##
override def equals(obj: Any) = obj match {
case other: NodeMetrics ⇒ sameAs(other)
case _ ⇒ false
}
}
/**


@@ -4,16 +4,15 @@
package akka.cluster
import akka.testkit.{ ImplicitSender, AkkaSpec }
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.cluster.StandardMetrics._
import scala.util.Failure
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricNumericConverterSpec extends AkkaSpec(MetricsEnabledSpec.config) with MetricNumericConverter with ImplicitSender
with MetricsCollectorFactory {
class MetricNumericConverterSpec extends WordSpec with MustMatchers with MetricNumericConverter {
"MetricNumericConverter" must {
val collector = createMetricsCollector
"convert" in {
convertNumber(0).isLeft must be(true)


@@ -17,7 +17,7 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val collector = createMetricsCollector
"A MetricsGossip" must {
"add and initialize new NodeMetrics" in {
"add new NodeMetrics" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
@@ -26,14 +26,12 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val g1 = MetricsGossip.empty :+ m1
g1.nodes.size must be(1)
g1.nodeKeys.size must be(g1.nodes.size)
g1.metricsFor(m1.address).size must be(m1.metrics.size)
g1.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
val g2 = g1 :+ m2
g2.nodes.size must be(2)
g2.nodeKeys.size must be(g2.nodes.size)
g2.metricsFor(m1.address).size must be(m1.metrics.size)
g2.metricsFor(m2.address).size must be(m2.metrics.size)
g2.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
"merge peer metrics" in {
@@ -47,8 +45,8 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
val g2 = g1 :+ m2Updated // merge peers
g2.nodes.size must be(2)
g2.metricsFor(m1.address).size must be(m1.metrics.size)
g2.metricsFor(m2.address).size must be(m2Updated.metrics.size)
g2.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2Updated.metrics))
g2.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp must be(m2Updated.timestamp) }
}
@@ -61,23 +59,22 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val g1 = MetricsGossip.empty :+ m1 :+ m2
val g2 = MetricsGossip.empty :+ m3 :+ m2Updated
g1.nodeKeys.contains(m1.address) must be(true)
g2.nodeKeys.contains(m3.address) must be(true)
g1.nodes.map(_.address) must be(Set(m1.address, m2.address))
// must contain nodes 1,3, and the most recent version of 2
val mergedGossip = g1 merge g2
mergedGossip.nodes.size must be(3)
mergedGossip.metricsFor(m1.address).size must be(m1.metrics.size)
mergedGossip.metricsFor(m2.address).size must be(m2Updated.metrics.size)
mergedGossip.metricsFor(m3.address).size must be(m3.metrics.size)
mergedGossip.nodes.map(_.address) must be(Set(m1.address, m2.address, m3.address))
mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2Updated.metrics))
mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) must be(Some(m3.metrics))
mergedGossip.nodes.foreach(_.metrics.size must be > (3))
mergedGossip.nodes.find(_.address == m2.address).get.timestamp must be(m2Updated.timestamp)
mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) must be(Some(m2Updated.timestamp))
}
"get the current NodeMetrics if it exists in the local nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1
g1.metricsFor(m1.address).nonEmpty must be(true)
g1.nodeMetricsFor(m1.address).map(_.metrics) must be(Some(m1.metrics))
}
"remove a node if it is no longer Up" in {
@@ -89,8 +86,21 @@ class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with Implici
val g2 = g1 remove m1.address
g2.nodes.size must be(1)
g2.nodes.exists(_.address == m1.address) must be(false)
g2.metricsFor(m1.address).size must be(0)
g2.metricsFor(m2.address).size must be(m2.metrics.size)
g2.nodeMetricsFor(m1.address) must be(None)
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
"filter nodes" in {
val m1 = NodeMetrics(Address("akka", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size must be(2)
val g2 = g1 filter Set(m2.address)
g2.nodes.size must be(1)
g2.nodes.exists(_.address == m1.address) must be(false)
g2.nodeMetricsFor(m1.address) must be(None)
g2.nodeMetricsFor(m2.address).map(_.metrics) must be(Some(m2.metrics))
}
}
}


@@ -4,26 +4,17 @@
package akka.cluster
import akka.testkit.AkkaSpec
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class NodeMetricsSpec extends AkkaSpec with MetricsCollectorFactory {
val collector = createMetricsCollector
class NodeMetricsSpec extends WordSpec with MustMatchers {
val node1 = Address("akka", "sys", "a", 2554)
val node2 = Address("akka", "sys", "a", 2555)
"NodeMetrics must" must {
"recognize updatable nodes" in {
(NodeMetrics(node1, 0) updatable NodeMetrics(node1, 1)) must be(true)
}
"recognize non-updatable nodes" in {
(NodeMetrics(node1, 1) updatable NodeMetrics(node2, 0)) must be(false)
}
"return correct result for 2 'same' nodes" in {
(NodeMetrics(node1, 0) sameAs NodeMetrics(node1, 0)) must be(true)
@@ -34,21 +25,23 @@ class NodeMetricsSpec extends AkkaSpec with MetricsCollectorFactory {
}
"merge 2 NodeMetrics by most recent" in {
val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
val sample2 = NodeMetrics(node1, 2, collector.sample.metrics)
val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
val sample2 = NodeMetrics(node1, 2, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
val merged = sample1 merge sample2
merged.timestamp must be(sample2.timestamp)
merged.metrics must be(sample2.metrics)
merged.metric("a").map(_.value) must be(Some(11))
merged.metric("b").map(_.value) must be(Some(20))
merged.metric("c").map(_.value) must be(Some(30))
}
"not merge 2 NodeMetrics if master is more recent" in {
val sample1 = NodeMetrics(node1, 1, collector.sample.metrics)
val sample2 = NodeMetrics(node2, 0, sample1.metrics)
val sample1 = NodeMetrics(node1, 1, Set(Metric.create("a", 10, None), Metric.create("b", 20, None)).flatten)
val sample2 = NodeMetrics(node1, 0, Set(Metric.create("a", 11, None), Metric.create("c", 30, None)).flatten)
val merged = sample2 merge sample2 // older and not same
merged.timestamp must be(sample2.timestamp)
merged.metrics must be(sample2.metrics)
val merged = sample1 merge sample2 // older and not same
merged.timestamp must be(sample1.timestamp)
merged.metrics must be(sample1.metrics)
}
}
}