pekko/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsExtensionSpec.scala

164 lines
5.8 KiB
Scala
Raw Normal View History

/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.metrics
import scala.language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.actor.ExtendedActorSystem
import akka.cluster.MultiNodeClusterSpec
import akka.testkit.LongRunningTest
import akka.cluster.MemberStatus
trait ClusterMetricsCommonConfig extends MultiNodeConfig {
import ConfigFactory._
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
val node4 = role("node-4")
val node5 = role("node-5")
def nodeList = Seq(node1, node2, node3, node4, node5)
// Extract individual sigar library for every node.
nodeList foreach { role
nodeConfig(role) {
parseString("akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native/" + role.name)
}
}
// Disable legacy metrics in akka-cluster.
def disableMetricsLegacy = parseString("""akka.cluster.metrics.enabled=off""")
// Enable metrics extension in akka-cluster-metrics.
def enableMetricsExtension = parseString("""
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
akka.cluster.metrics.collector.enabled = on
""")
// Disable metrics extension in akka-cluster-metrics.
def disableMetricsExtension = parseString("""
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
akka.cluster.metrics.collector.enabled = off
""")
// Activate slf4j logging along with test listener.
def customLogging = parseString("""akka.loggers=["akka.testkit.TestEventListener","akka.event.slf4j.Slf4jLogger"]""")
}
object ClusterMetricsDisabledConfig extends ClusterMetricsCommonConfig {
commonConfig {
Seq(
customLogging,
disableMetricsLegacy,
disableMetricsExtension,
debugConfig(on = false),
MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)
.reduceLeft(_ withFallback _)
}
}
object ClusterMetricsEnabledConfig extends ClusterMetricsCommonConfig {
import ConfigFactory._
commonConfig {
Seq(
customLogging,
disableMetricsLegacy,
enableMetricsExtension,
debugConfig(on = false),
MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)
.reduceLeft(_ withFallback _)
}
}
class ClusterMetricsEnabledMultiJvmNode1 extends ClusterMetricsEnabledSpec
class ClusterMetricsEnabledMultiJvmNode2 extends ClusterMetricsEnabledSpec
class ClusterMetricsEnabledMultiJvmNode3 extends ClusterMetricsEnabledSpec
class ClusterMetricsEnabledMultiJvmNode4 extends ClusterMetricsEnabledSpec
class ClusterMetricsEnabledMultiJvmNode5 extends ClusterMetricsEnabledSpec
abstract class ClusterMetricsEnabledSpec extends MultiNodeSpec(ClusterMetricsEnabledConfig)
with MultiNodeClusterSpec with RedirectLogging {
import ClusterMetricsEnabledConfig._
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
def saveApplicationConf(): Unit = {
import java.io.File
import java.io.PrintWriter
val conf = cluster.system.settings.config
val text = conf.root.render
val file = new File(s"target/${myself.name}_application.conf")
Some(new PrintWriter(file)) map { p p.write(text); p.close }
}
saveApplicationConf()
val metricsView = new ClusterMetricsView(cluster.system)
"Cluster metrics" must {
"periodically collect metrics on each node, publish to the event stream, " +
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
awaitClusterUp(roles: _*)
enterBarrier("cluster-started")
2015-01-16 11:09:59 +01:00
awaitAssert(clusterView.members.count(_.status == MemberStatus.Up) should ===(roles.size))
// TODO ensure same contract
2015-01-16 11:09:59 +01:00
//awaitAssert(clusterView.clusterMetrics.size should ===(roles.size))
awaitAssert(metricsView.clusterMetrics.size should ===(roles.size))
val collector = MetricsCollector(cluster.system)
collector.sample.metrics.size should be > (3)
enterBarrier("after")
}
"reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) {
runOn(node2) {
cluster.leave(node1)
}
enterBarrier("first-left")
runOn(node2, node3, node4, node5) {
markNodeAsUnavailable(node1)
// TODO ensure same contract
2015-01-16 11:09:59 +01:00
//awaitAssert(clusterView.clusterMetrics.size should ===(roles.size - 1))
awaitAssert(metricsView.clusterMetrics.size should ===(roles.size - 1))
}
enterBarrier("finished")
}
}
}
class ClusterMetricsDisabledMultiJvmNode1 extends ClusterMetricsDisabledSpec
class ClusterMetricsDisabledMultiJvmNode2 extends ClusterMetricsDisabledSpec
class ClusterMetricsDisabledMultiJvmNodv3 extends ClusterMetricsDisabledSpec
class ClusterMetricsDisabledMultiJvmNode4 extends ClusterMetricsDisabledSpec
class ClusterMetricsDisabledMultiJvmNode5 extends ClusterMetricsDisabledSpec
abstract class ClusterMetricsDisabledSpec extends MultiNodeSpec(ClusterMetricsDisabledConfig)
with MultiNodeClusterSpec with RedirectLogging {
import akka.cluster.ClusterEvent.CurrentClusterState
val metricsView = new ClusterMetricsView(cluster.system)
"Cluster metrics" must {
"not collect metrics, not publish metrics events, and not gossip metrics" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
// TODO ensure same contract
2015-01-16 11:09:59 +01:00
//clusterView.clusterMetrics.size should ===(0)
metricsView.clusterMetrics.size should ===(0)
cluster.subscribe(testActor, classOf[ClusterMetricsChanged])
expectMsgType[CurrentClusterState]
expectNoMsg
// TODO ensure same contract
2015-01-16 11:09:59 +01:00
//clusterView.clusterMetrics.size should ===(0)
metricsView.clusterMetrics.size should ===(0)
enterBarrier("after")
}
}
}