Mute expected log messages, see #2010
This commit is contained in:
parent
756f31c7b9
commit
8476b2195c
20 changed files with 119 additions and 49 deletions
|
|
@ -29,6 +29,8 @@ abstract class ClusterAccrualFailureDetectorSpec
|
|||
|
||||
import ClusterAccrualFailureDetectorMultiJvmSpec._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
"A heartbeat driven Failure Detector" must {
|
||||
|
||||
"receive heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import org.scalatest.BeforeAndAfter
|
|||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.actor.Props
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Address
|
||||
|
|
@ -35,6 +36,14 @@ abstract class ClusterDeathWatchSpec
|
|||
|
||||
import ClusterDeathWatchMultiJvmSpec._
|
||||
|
||||
override def atStartup(): Unit = {
|
||||
super.atStartup()
|
||||
if (!log.isDebugEnabled) {
|
||||
muteMarkingAsUnreachable()
|
||||
system.eventStream.publish(Mute(EventFilter[java.net.UnknownHostException]("unknownhost")))
|
||||
}
|
||||
}
|
||||
|
||||
"An actor watching a remote actor in the cluster" must {
|
||||
"receive Terminated when watched node becomes unreachable" taggedAs LongRunningTest in {
|
||||
awaitClusterUp(roles: _*)
|
||||
|
|
|
|||
|
|
@ -6,30 +6,25 @@ package akka.cluster
|
|||
|
||||
import scala.language.postfixOps
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig}
|
||||
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.testkit.LongRunningTest
|
||||
|
||||
|
||||
object ClusterMetricsDataStreamingOffMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
commonConfig(ConfigFactory.parseString("akka.cluster.metrics.rate-of-decay = 0")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
class ClusterMetricsDataStreamingMultiJvmNode1 extends ClusterMetricsDataStreamingOffSpec
|
||||
class ClusterMetricsDataStreamingMultiJvmNode2 extends ClusterMetricsDataStreamingOffSpec
|
||||
class ClusterMetricsDataStreamingOffMultiJvmNode1 extends ClusterMetricsDataStreamingOffSpec
|
||||
class ClusterMetricsDataStreamingOffMultiJvmNode2 extends ClusterMetricsDataStreamingOffSpec
|
||||
|
||||
abstract class ClusterMetricsDataStreamingOffSpec extends MultiNodeSpec(ClusterMetricsDataStreamingOffMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
|
||||
"Cluster metrics" must {
|
||||
"not collect stream metric data" taggedAs LongRunningTest in within(30 seconds) {
|
||||
awaitClusterUp(roles: _*)
|
||||
enterBarrier("cluster-started")
|
||||
runOn(roles: _*) {
|
||||
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
|
||||
awaitCond(clusterView.clusterMetrics.size == roles.size)
|
||||
awaitCond(clusterView.clusterMetrics.flatMap(_.metrics).filter(_.trendable).forall(_.average.isEmpty))
|
||||
}
|
||||
awaitCond(clusterView.clusterMetrics.size == roles.size)
|
||||
awaitCond(clusterView.clusterMetrics.flatMap(_.metrics).filter(_.trendable).forall(_.average.isEmpty))
|
||||
enterBarrier("after")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,9 +4,10 @@
|
|||
|
||||
package akka.cluster
|
||||
|
||||
import akka.remote.testkit.{MultiNodeSpec, MultiNodeConfig}
|
||||
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.testkit.LongRunningTest
|
||||
import akka.cluster.ClusterEvent._
|
||||
|
||||
object ClusterMetricsDisabledMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -22,11 +23,11 @@ abstract class ClusterMetricsDisabledSpec extends MultiNodeSpec(ClusterMetricsDi
|
|||
"Cluster metrics" must {
|
||||
"not collect metrics, not publish ClusterMetricsChanged, and not gossip metrics" taggedAs LongRunningTest in {
|
||||
awaitClusterUp(roles: _*)
|
||||
enterBarrier("cluster-started")
|
||||
runOn(roles: _*) {
|
||||
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
|
||||
awaitCond(clusterView.clusterMetrics.isEmpty)
|
||||
}
|
||||
clusterView.clusterMetrics.size must be(0)
|
||||
cluster.subscribe(testActor, classOf[ClusterMetricsChanged])
|
||||
expectMsgType[CurrentClusterState]
|
||||
expectNoMsg
|
||||
clusterView.clusterMetrics.size must be(0)
|
||||
enterBarrier("after")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,21 +19,7 @@ object ClusterMetricsMultiJvmSpec extends MultiNodeConfig {
|
|||
val fourth = role("fourth")
|
||||
val fifth = role("fifth")
|
||||
|
||||
commonConfig(ConfigFactory.parseString("""
|
||||
akka.cluster.auto-join = on
|
||||
akka.cluster.metrics.enabled = on
|
||||
akka.cluster.metrics.metrics-interval = 3 s
|
||||
akka.cluster.metrics.gossip-interval = 3 s
|
||||
akka.cluster.metrics.rate-of-decay = 10
|
||||
akka.loglevel = INFO
|
||||
akka.remote.log-sent-messages = off
|
||||
akka.remote.log-received-messages = off
|
||||
akka.actor.debug.receive = off
|
||||
akka.actor.debug.unhandled = off
|
||||
akka.actor.debug.lifecycle = off
|
||||
akka.actor.debug.autoreceive = off
|
||||
akka.actor.debug.fsm = off""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class ClusterMetricsMultiJvmNode1 extends ClusterMetricsSpec
|
||||
|
|
@ -45,21 +31,18 @@ class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
|
|||
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec with MetricSpec {
|
||||
import ClusterMetricsMultiJvmSpec._
|
||||
|
||||
val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
|
||||
|
||||
"Cluster metrics" must {
|
||||
"periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " +
|
||||
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
|
||||
awaitClusterUp(roles: _*)
|
||||
enterBarrier("cluster-started")
|
||||
runOn(roles: _*) {
|
||||
awaitClusterUp(roles: _*)
|
||||
enterBarrier("cluster-started")
|
||||
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size)
|
||||
awaitCond(clusterView.clusterMetrics.size == roles.size)
|
||||
assertInitialized(cluster.settings.MetricsRateOfDecay, collectNodeMetrics(clusterView.clusterMetrics).toSet)
|
||||
clusterView.clusterMetrics.foreach(n => assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n))
|
||||
val collector = MetricsCollector(cluster.selfAddress, log, system.asInstanceOf[ExtendedActorSystem].dynamicAccess)
|
||||
clusterView.clusterMetrics.foreach(n ⇒ assertExpectedSampleSize(collector.isSigar, cluster.settings.MetricsRateOfDecay, n))
|
||||
enterBarrier("after")
|
||||
}
|
||||
enterBarrier("after")
|
||||
}
|
||||
"reflect the correct number of node metrics in cluster view" taggedAs LongRunningTest in within(30 seconds) {
|
||||
runOn(second) {
|
||||
cluster.leave(first)
|
||||
|
|
|
|||
|
|
@ -41,6 +41,8 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
|
|||
|
||||
import multiNodeConfig._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
"A cluster of 3 members" must {
|
||||
|
||||
"reach initial convergence" taggedAs LongRunningTest in {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import com.typesafe.config.ConfigFactory
|
|||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent._
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.ActorSystem
|
||||
import scala.concurrent.util.Deadline
|
||||
|
|
@ -44,6 +45,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
|
|||
failure-detector.acceptable-heartbeat-pause = 10s
|
||||
publish-stats-interval = 0 s # always, when it happens
|
||||
}
|
||||
akka.event-handlers = ["akka.testkit.TestEventListener"]
|
||||
akka.loglevel = INFO
|
||||
akka.actor.provider = akka.cluster.ClusterActorRefProvider
|
||||
akka.actor.default-dispatcher.fork-join-executor {
|
||||
|
|
@ -86,6 +88,12 @@ abstract class LargeClusterSpec
|
|||
import LargeClusterMultiJvmSpec._
|
||||
import ClusterEvent._
|
||||
|
||||
override def muteLog(sys: ActorSystem = system): Unit = {
|
||||
super.muteLog(sys)
|
||||
muteMarkingAsUnreachable(sys)
|
||||
muteDeadLetters(sys)
|
||||
}
|
||||
|
||||
var systems: IndexedSeq[ActorSystem] = IndexedSeq(system)
|
||||
val nodesPerDatacenter = system.settings.config.getInt(
|
||||
"akka.test.large-cluster-spec.nodes-per-datacenter")
|
||||
|
|
@ -117,8 +125,11 @@ abstract class LargeClusterSpec
|
|||
|
||||
def startupSystems(): Unit = {
|
||||
// one system is already started by the multi-node test
|
||||
for (n ← 2 to nodesPerDatacenter)
|
||||
systems :+= ActorSystem(myself.name + "-" + n, system.settings.config)
|
||||
for (n ← 2 to nodesPerDatacenter) {
|
||||
val sys = ActorSystem(myself.name + "-" + n, system.settings.config)
|
||||
muteLog(sys)
|
||||
systems :+= sys
|
||||
}
|
||||
|
||||
// Initialize the Cluster extensions, i.e. startup the clusters
|
||||
systems foreach { Cluster(_) }
|
||||
|
|
|
|||
|
|
@ -41,6 +41,8 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
|
|||
|
||||
import multiNodeConfig._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
"The Leader in a 4 node cluster" must {
|
||||
|
||||
"be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in {
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ abstract class MembershipChangeListenerJoinSpec
|
|||
"be notified when new node is JOINING" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.join(first)
|
||||
val joinLatch = TestLatch()
|
||||
val expectedAddresses = Set(first, second) map address
|
||||
cluster.subscribe(system.actorOf(Props(new Actor {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.{ Address, ExtendedActorSystem }
|
|||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec }
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent._
|
||||
import scala.concurrent.util.duration._
|
||||
import scala.concurrent.util.Duration
|
||||
import org.scalatest.Suite
|
||||
|
|
@ -18,6 +19,8 @@ import java.util.concurrent.ConcurrentHashMap
|
|||
import akka.actor.ActorPath
|
||||
import akka.actor.RootActorPath
|
||||
import scala.concurrent.util.FiniteDuration
|
||||
import akka.event.Logging.ErrorLevel
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
object MultiNodeClusterSpec {
|
||||
|
||||
|
|
@ -31,7 +34,7 @@ object MultiNodeClusterSpec {
|
|||
def clusterConfig: Config = ConfigFactory.parseString("""
|
||||
akka.actor.provider = akka.cluster.ClusterActorRefProvider
|
||||
akka.cluster {
|
||||
auto-join = on
|
||||
auto-join = off
|
||||
auto-down = off
|
||||
jmx.enabled = off
|
||||
gossip-interval = 200 ms
|
||||
|
|
@ -41,7 +44,9 @@ object MultiNodeClusterSpec {
|
|||
publish-stats-interval = 0 s # always, when it happens
|
||||
failure-detector.heartbeat-interval = 400 ms
|
||||
}
|
||||
akka.loglevel = INFO
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.event-handlers = ["akka.testkit.TestEventListener"]
|
||||
akka.test {
|
||||
single-expect-default = 5 s
|
||||
}
|
||||
|
|
@ -54,6 +59,47 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
|
|||
|
||||
private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
|
||||
|
||||
override def atStartup(): Unit = {
|
||||
muteLog()
|
||||
}
|
||||
|
||||
def muteLog(sys: ActorSystem = system): Unit = {
|
||||
if (!sys.log.isDebugEnabled) {
|
||||
Seq(".*Metrics collection has started successfully.*",
|
||||
".*Hyperic SIGAR was not found on the classpath.*",
|
||||
".*Cluster Node.* - registered cluster JMX MBean.*",
|
||||
".*Cluster Node.* - is starting up.*",
|
||||
".*Shutting down cluster Node.*",
|
||||
".*Cluster node successfully shut down.*",
|
||||
".*Using a dedicated scheduler for cluster.*",
|
||||
".*Phi value.* for connection.*") foreach { s ⇒
|
||||
sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
|
||||
}
|
||||
|
||||
Seq(".*received dead letter from.*ClientDisconnected",
|
||||
".*received dead letter from.*deadLetters.*PoisonPill",
|
||||
".*installing context org.jboss.netty.channel.DefaultChannelPipeline.*") foreach { s ⇒
|
||||
sys.eventStream.publish(Mute(EventFilter.warning(pattern = s)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) {
|
||||
sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))
|
||||
}
|
||||
|
||||
def muteDeadLetters(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) {
|
||||
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*")))
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
if (!log.isDebugEnabled) {
|
||||
muteDeadLetters()
|
||||
system.eventStream.setLogLevel(ErrorLevel)
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
/**
|
||||
* Lookup the Address for the role.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -56,6 +56,7 @@ abstract class NodeLeavingAndExitingSpec
|
|||
exitingLatch.countDown()
|
||||
case MemberLeft(m) if m.address == secondAddess ⇒ leavingLatch.countDown()
|
||||
case MemberExited(m) if m.address == secondAddess ⇒ exitingLatch.countDown()
|
||||
case MemberRemoved(m) ⇒ // not tested here
|
||||
|
||||
}
|
||||
})), classOf[MemberEvent])
|
||||
|
|
|
|||
|
|
@ -39,10 +39,11 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo
|
|||
|
||||
import multiNodeConfig._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
"A cluster of 2 nodes" must {
|
||||
|
||||
"become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in {
|
||||
startClusterNode()
|
||||
awaitUpConvergence(1)
|
||||
clusterView.isSingletonCluster must be(true)
|
||||
|
||||
|
|
|
|||
|
|
@ -49,6 +49,8 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig)
|
|||
|
||||
import multiNodeConfig._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
val side1 = IndexedSeq(first, second)
|
||||
val side2 = IndexedSeq(third, fourth, fifth)
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
|
|||
akka.cluster {
|
||||
auto-join = off
|
||||
}
|
||||
akka.event-handlers = ["akka.testkit.TestEventListener"]
|
||||
akka.loglevel = INFO
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
"""))
|
||||
|
|
@ -52,12 +53,12 @@ abstract class SunnyWeatherSpec
|
|||
// start some
|
||||
awaitClusterUp(first, second, third)
|
||||
runOn(first, second, third) {
|
||||
log.info("3 joined")
|
||||
log.debug("3 joined")
|
||||
}
|
||||
|
||||
// add a few more
|
||||
awaitClusterUp(roles: _*)
|
||||
log.info("5 joined")
|
||||
log.debug("5 joined")
|
||||
|
||||
val unexpected = new AtomicReference[SortedSet[Member]](SortedSet.empty)
|
||||
cluster.subscribe(system.actorOf(Props(new Actor {
|
||||
|
|
@ -74,7 +75,7 @@ abstract class SunnyWeatherSpec
|
|||
unexpected.get must be(SortedSet.empty)
|
||||
awaitUpConvergence(roles.size)
|
||||
assertLeaderIn(roles)
|
||||
if (n % 5 == 0) log.info("Passed period [{}]", n)
|
||||
if (n % 5 == 0) log.debug("Passed period [{}]", n)
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,6 +37,8 @@ abstract class TransitionSpec
|
|||
|
||||
import TransitionMultiJvmSpec._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
// sorted in the order used by the cluster
|
||||
def leader(roles: RoleName*) = roles.sorted.head
|
||||
def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail
|
||||
|
|
|
|||
|
|
@ -41,6 +41,8 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
|
|||
|
||||
import multiNodeConfig._
|
||||
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = {
|
||||
roles.filterNot(_ == role)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -246,6 +246,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
|
|||
}
|
||||
|
||||
"deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in {
|
||||
muteMarkingAsUnreachable()
|
||||
|
||||
runOn(first) {
|
||||
def currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees
|
||||
|
|
|
|||
|
|
@ -5,7 +5,8 @@
|
|||
package akka.cluster
|
||||
|
||||
import akka.actor.Address
|
||||
import akka.testkit.{ LongRunningTest, AkkaSpec }
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent._
|
||||
import scala.collection.immutable.TreeMap
|
||||
import scala.concurrent.util.duration._
|
||||
import scala.concurrent.util.Duration
|
||||
|
|
@ -16,6 +17,13 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
|
|||
akka.loglevel = "INFO"
|
||||
""") {
|
||||
|
||||
override def atStartup(): Unit = {
|
||||
super.atStartup()
|
||||
if (!log.isDebugEnabled) {
|
||||
system.eventStream.publish(Mute(EventFilter.info(pattern = ".*Phi value.*")))
|
||||
}
|
||||
}
|
||||
|
||||
"An AccrualFailureDetector" must {
|
||||
val conn = Address("akka", "", "localhost", 2552)
|
||||
val conn2 = Address("akka", "", "localhost", 2553)
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ object MetricsEnabledSpec {
|
|||
akka.cluster.metrics.gossip-interval = 1 s
|
||||
akka.cluster.metrics.rate-of-decay = 10
|
||||
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
akka.loglevel = INFO"""
|
||||
"""
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ abstract class MultiNodeConfig {
|
|||
akka.remote.log-remote-lifecycle-events = on
|
||||
""")
|
||||
else
|
||||
ConfigFactory.parseString("akka.loglevel = INFO")
|
||||
ConfigFactory.empty
|
||||
|
||||
/**
|
||||
* Construct a RoleName and return it, to be used as an identifier in the
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue