Enable fatal warnings for cluster metrics and slf4j (#26678)

This commit is contained in:
Christopher Batey 2019-04-05 16:06:02 +01:00 committed by Arnout Engelen
parent 4bb60bbcc8
commit f798481de5
12 changed files with 67 additions and 48 deletions

View file

@ -39,7 +39,7 @@ trait SigarProvider {
SigarProvider.close(sigar) SigarProvider.close(sigar)
true true
} catch { } catch {
case e: Throwable => false case _: Throwable => false
} }
/** Create sigar and verify it works. */ /** Create sigar and verify it works. */

View file

@ -26,6 +26,7 @@ import akka.dispatch.Dispatchers
/** /**
* Protobuf serializer for [[akka.cluster.metrics.ClusterMetricsMessage]] types. * Protobuf serializer for [[akka.cluster.metrics.ClusterMetricsMessage]] types.
*/ */
@ccompatUsedUntil213
class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
private final val BufferSize = 4 * 1024 private final val BufferSize = 4 * 1024

View file

@ -147,7 +147,7 @@ abstract class ClusterMetricsDisabledSpec
//clusterView.clusterMetrics.size should ===(0) //clusterView.clusterMetrics.size should ===(0)
metricsView.clusterMetrics.size should ===(0) metricsView.clusterMetrics.size should ===(0)
ClusterMetricsExtension(system).subscribe(testActor) ClusterMetricsExtension(system).subscribe(testActor)
expectNoMsg expectNoMessage
// TODO ensure same contract // TODO ensure same contract
//clusterView.clusterMetrics.size should ===(0) //clusterView.clusterMetrics.size should ===(0)
metricsView.clusterMetrics.size should ===(0) metricsView.clusterMetrics.size should ===(0)

View file

@ -6,6 +6,7 @@ package akka.cluster.metrics
import language.postfixOps import language.postfixOps
import java.lang.management.ManagementFactory import java.lang.management.ManagementFactory
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import com.typesafe.config.Config import com.typesafe.config.Config
@ -22,6 +23,7 @@ import akka.routing.ActorRefRoutee
import akka.routing.Routees import akka.routing.Routees
import akka.cluster.routing.ClusterRouterPool import akka.cluster.routing.ClusterRouterPool
import akka.cluster.routing.ClusterRouterPoolSettings import akka.cluster.routing.ClusterRouterPoolSettings
import akka.util.unused
object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig { object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig {
@ -105,7 +107,7 @@ object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig {
} }
class TestCustomMetricsSelector(config: Config) extends MetricsSelector { class TestCustomMetricsSelector(@unused config: Config) extends MetricsSelector {
override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] = Map.empty override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] = Map.empty
} }

View file

@ -11,7 +11,10 @@ import akka.testkit._
import akka.cluster.metrics.StandardMetrics._ import akka.cluster.metrics.StandardMetrics._
import akka.cluster.Cluster import akka.cluster.Cluster
class MetricsExtensionSpec extends AkkaSpec(MetricsConfig.clusterSigarMock) with ImplicitSender with RedirectLogging { class ClusterMetricsExtensionSpec
extends AkkaSpec(MetricsConfig.clusterSigarMock)
with ImplicitSender
with RedirectLogging {
val cluster = Cluster(system) val cluster = Cluster(system)
@ -49,7 +52,7 @@ class MetricsExtensionSpec extends AkkaSpec(MetricsConfig.clusterSigarMock) with
extension.supervisor ! CollectionStopMessage extension.supervisor ! CollectionStopMessage
awaitSample() awaitSample()
metricsNodeCount should ===(nodeCount) metricsNodeCount should ===(nodeCount)
metricsHistorySize should be >= (sampleCount) metricsHistorySize should be >= sampleCount
} }
"verify sigar mock data matches expected ewma data" in { "verify sigar mock data matches expected ewma data" in {
@ -95,12 +98,12 @@ class MetricsExtensionSpec extends AkkaSpec(MetricsConfig.clusterSigarMock) with
extension.supervisor ! CollectionStartMessage extension.supervisor ! CollectionStartMessage
awaitSample() awaitSample()
val size3 = metricsHistorySize val size3 = metricsHistorySize
size3 should be > (size2) size3 should be > size2
extension.supervisor ! CollectionStopMessage extension.supervisor ! CollectionStopMessage
awaitSample() awaitSample()
val size4 = metricsHistorySize val size4 = metricsHistorySize
size4 should be >= (size3) size4 should be >= size3
awaitSample() awaitSample()
val size5 = metricsHistorySize val size5 = metricsHistorySize
@ -108,7 +111,7 @@ class MetricsExtensionSpec extends AkkaSpec(MetricsConfig.clusterSigarMock) with
} }
(1 to 3).foreach { step => (1 to 3).foreach { _ =>
cycle() cycle()
} }

View file

@ -8,6 +8,9 @@ import scala.concurrent.duration._
import akka.testkit.{ AkkaSpec, LongRunningTest } import akka.testkit.{ AkkaSpec, LongRunningTest }
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import com.github.ghik.silencer.silent
@silent
class EWMASpec extends AkkaSpec(MetricsConfig.defaultEnabled) with MetricsCollectorFactory { class EWMASpec extends AkkaSpec(MetricsConfig.defaultEnabled) with MetricsCollectorFactory {
val collector = createMetricsCollector val collector = createMetricsCollector

View file

@ -7,12 +7,16 @@ package akka.cluster.metrics
import org.scalatest.WordSpec import org.scalatest.WordSpec
import org.scalatest.Matchers import org.scalatest.Matchers
import akka.cluster.metrics.StandardMetrics._ import akka.cluster.metrics.StandardMetrics._
import scala.util.Failure import scala.util.Failure
import akka.actor.Address import akka.actor.Address
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import java.lang.System.{ currentTimeMillis => newTimestamp } import java.lang.System.{ currentTimeMillis => newTimestamp }
import com.github.ghik.silencer.silent
@silent
class MetricNumericConverterSpec extends WordSpec with Matchers with MetricNumericConverter { class MetricNumericConverterSpec extends WordSpec with Matchers with MetricNumericConverter {
"MetricNumericConverter" must { "MetricNumericConverter" must {
@ -51,6 +55,7 @@ class MetricNumericConverterSpec extends WordSpec with Matchers with MetricNumer
} }
} }
@silent
class NodeMetricsSpec extends WordSpec with Matchers { class NodeMetricsSpec extends WordSpec with Matchers {
val node1 = Address("akka.tcp", "sys", "a", 2554) val node1 = Address("akka.tcp", "sys", "a", 2554)
@ -149,8 +154,8 @@ class MetricsGossipSpec
val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
m1.metrics.size should be > (3) m1.metrics.size should be > 3
m2.metrics.size should be > (3) m2.metrics.size should be > 3
val g1 = MetricsGossip.empty :+ m1 val g1 = MetricsGossip.empty :+ m1
g1.nodes.size should ===(1) g1.nodes.size should ===(1)
@ -168,7 +173,6 @@ class MetricsGossipSpec
val g1 = MetricsGossip.empty :+ m1 :+ m2 val g1 = MetricsGossip.empty :+ m1 :+ m2
g1.nodes.size should ===(2) g1.nodes.size should ===(2)
val beforeMergeNodes = g1.nodes
val m2Updated = m2.copy(metrics = newSample(m2.metrics), timestamp = m2.timestamp + 1000) val m2Updated = m2.copy(metrics = newSample(m2.metrics), timestamp = m2.timestamp + 1000)
val g2 = g1 :+ m2Updated // merge peers val g2 = g1 :+ m2Updated // merge peers
@ -195,7 +199,7 @@ class MetricsGossipSpec
mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) should ===(Some(m1.metrics)) mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) should ===(Some(m1.metrics))
mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) should ===(Some(m2Updated.metrics)) mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) should ===(Some(m2Updated.metrics))
mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) should ===(Some(m3.metrics)) mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) should ===(Some(m3.metrics))
mergedGossip.nodes.foreach(_.metrics.size should be > (3)) mergedGossip.nodes.foreach(_.metrics.size should be > 3)
mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) should ===(Some(m2Updated.timestamp)) mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) should ===(Some(m2Updated.timestamp))
} }
@ -233,6 +237,7 @@ class MetricsGossipSpec
} }
} }
@silent
class MetricValuesSpec extends AkkaSpec(MetricsConfig.defaultEnabled) with MetricsCollectorFactory { class MetricValuesSpec extends AkkaSpec(MetricsConfig.defaultEnabled) with MetricsCollectorFactory {
import akka.cluster.metrics.StandardMetrics._ import akka.cluster.metrics.StandardMetrics._
@ -256,37 +261,37 @@ class MetricValuesSpec extends AkkaSpec(MetricsConfig.defaultEnabled) with Metri
"extract expected metrics for load balancing" in { "extract expected metrics for load balancing" in {
val stream1 = node2.metric(HeapMemoryCommitted).get.value.longValue val stream1 = node2.metric(HeapMemoryCommitted).get.value.longValue
val stream2 = node1.metric(HeapMemoryUsed).get.value.longValue val stream2 = node1.metric(HeapMemoryUsed).get.value.longValue
stream1 should be >= (stream2) stream1 should be >= stream2
} }
"extract expected MetricValue types for load balancing" in { "extract expected MetricValue types for load balancing" in {
nodes.foreach { node => nodes.foreach { node =>
node match { node match {
case HeapMemory(address, _, used, committed, _) => case HeapMemory(_, _, used, committed, _) =>
used should be > (0L) used should be > 0L
committed should be >= (used) committed should be >= used
// Documentation java.lang.management.MemoryUsage says that committed <= max, // Documentation java.lang.management.MemoryUsage says that committed <= max,
// but in practice that is not always true (we have seen it happen). Therefore // but in practice that is not always true (we have seen it happen). Therefore
// we don't check the heap max value in this test. // we don't check the heap max value in this test.
// extract is the java api // extract is the java api
StandardMetrics.extractHeapMemory(node) should not be (null) StandardMetrics.extractHeapMemory(node) should not be null
} }
node match { node match {
case Cpu(address, _, systemLoadAverageOption, cpuCombinedOption, cpuStolenOption, processors) => case Cpu(_, _, systemLoadAverageOption, cpuCombinedOption, cpuStolenOption, processors) =>
processors should be > (0) processors should be > 0
if (systemLoadAverageOption.isDefined) if (systemLoadAverageOption.isDefined)
systemLoadAverageOption.get should be >= (0.0) systemLoadAverageOption.get should be >= 0.0
if (cpuCombinedOption.isDefined) { if (cpuCombinedOption.isDefined) {
cpuCombinedOption.get should be <= (1.0) cpuCombinedOption.get should be <= 1.0
cpuCombinedOption.get should be >= (0.0) cpuCombinedOption.get should be >= 0.0
} }
if (cpuStolenOption.isDefined) { if (cpuStolenOption.isDefined) {
cpuStolenOption.get should be <= (1.0) cpuStolenOption.get should be <= 1.0
cpuStolenOption.get should be >= (0.0) cpuStolenOption.get should be >= 0.0
} }
// extract is the java api // extract is the java api
StandardMetrics.extractCpu(node) should not be (null) StandardMetrics.extractCpu(node) should not be null
} }
} }
} }

View file

@ -5,13 +5,13 @@
package akka.cluster.metrics package akka.cluster.metrics
import scala.language.postfixOps import scala.language.postfixOps
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{ Try } import scala.util.Try
import akka.testkit._ import akka.testkit._
import akka.cluster.metrics.StandardMetrics._ import akka.cluster.metrics.StandardMetrics._
import com.github.ghik.silencer.silent
@silent
class MetricsCollectorSpec class MetricsCollectorSpec
extends AkkaSpec(MetricsConfig.defaultEnabled) extends AkkaSpec(MetricsConfig.defaultEnabled)
with ImplicitSender with ImplicitSender
@ -22,10 +22,10 @@ class MetricsCollectorSpec
"Metric must" must { "Metric must" must {
"merge 2 metrics that are tracking the same metric" in { "merge 2 metrics that are tracking the same metric" in {
for (i <- 1 to 20) { for (_ <- 1 to 20) {
val sample1 = collector.sample.metrics val sample1 = collector.sample.metrics
val sample2 = collector.sample.metrics val sample2 = collector.sample.metrics
val merged12 = sample2.flatMap(latest => sample2.flatMap(latest =>
sample1.collect { sample1.collect {
case peer if latest.sameAs(peer) => case peer if latest.sameAs(peer) =>
val m = peer :+ latest val m = peer :+ latest
@ -36,7 +36,7 @@ class MetricsCollectorSpec
val sample3 = collector.sample.metrics val sample3 = collector.sample.metrics
val sample4 = collector.sample.metrics val sample4 = collector.sample.metrics
val merged34 = sample4.flatMap(latest => sample4.flatMap(latest =>
sample3.collect { sample3.collect {
case peer if latest.sameAs(peer) => case peer if latest.sameAs(peer) =>
val m = peer :+ latest val m = peer :+ latest
@ -60,20 +60,22 @@ class MetricsCollectorSpec
val used = metrics.collectFirst { case (HeapMemoryUsed, b) => b } val used = metrics.collectFirst { case (HeapMemoryUsed, b) => b }
val committed = metrics.collectFirst { case (HeapMemoryCommitted, b) => b } val committed = metrics.collectFirst { case (HeapMemoryCommitted, b) => b }
metrics.foreach { metrics.foreach {
case (SystemLoadAverage, b) => b.doubleValue should be >= (0.0) case (SystemLoadAverage, b) => b.doubleValue should be >= 0.0
case (Processors, b) => b.intValue should be >= (0) case (Processors, b) => b.intValue should be >= 0
case (HeapMemoryUsed, b) => b.longValue should be >= (0L) case (HeapMemoryUsed, b) => b.longValue should be >= 0L
case (HeapMemoryCommitted, b) => b.longValue should be > (0L) case (HeapMemoryCommitted, b) => b.longValue should be > 0L
case (HeapMemoryMax, b) => case (HeapMemoryMax, b) =>
b.longValue should be > (0L) b.longValue should be > 0L
used.get.longValue should be <= (b.longValue) used.get.longValue should be <= b.longValue
committed.get.longValue should be <= (b.longValue) committed.get.longValue should be <= b.longValue
case (CpuCombined, b) => case (CpuCombined, b) =>
b.doubleValue should be <= (1.0) b.doubleValue should be <= 1.0
b.doubleValue should be >= (0.0) b.doubleValue should be >= 0.0
case (CpuStolen, b) => case (CpuStolen, b) =>
b.doubleValue should be <= (1.0) b.doubleValue should be <= 1.0
b.doubleValue should be >= (0.0) b.doubleValue should be >= 0.0
case unexpected =>
fail(s"Unexpected metric type $unexpected")
} }
} }
@ -92,7 +94,7 @@ class MetricsCollectorSpec
"collect 50 node metrics samples in an acceptable duration" taggedAs LongRunningTest in within(10 seconds) { "collect 50 node metrics samples in an acceptable duration" taggedAs LongRunningTest in within(10 seconds) {
(1 to 50).foreach { _ => (1 to 50).foreach { _ =>
val sample = collector.sample val sample = collector.sample
sample.metrics.size should be >= (3) sample.metrics.size should be >= 3
Thread.sleep(100) Thread.sleep(100)
} }
} }

View file

@ -21,7 +21,7 @@ import akka.actor.Deploy
import akka.dispatch.UnboundedMessageQueueSemantics import akka.dispatch.UnboundedMessageQueueSemantics
import akka.actor.PoisonPill import akka.actor.PoisonPill
import akka.actor.ActorLogging import akka.actor.ActorLogging
import org.scalatest.mock.MockitoSugar import org.scalatestplus.mockito.MockitoSugar
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.remote.RARP import akka.remote.RARP

View file

@ -8,7 +8,7 @@ import org.slf4j.{ MDC, Marker, MarkerFactory, Logger => SLFLogger, LoggerFactor
import akka.event.Logging._ import akka.event.Logging._
import akka.actor._ import akka.actor._
import akka.event.{ LogMarker, _ } import akka.event.{ LogMarker, _ }
import akka.util.Helpers import akka.util.{ unused, Helpers }
import akka.dispatch.RequiresMessageQueue import akka.dispatch.RequiresMessageQueue
/** /**
@ -146,7 +146,8 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg
* backend configuration (e.g. logback.xml) to filter log events before publishing * backend configuration (e.g. logback.xml) to filter log events before publishing
* the log events to the `eventStream`. * the log events to the `eventStream`.
*/ */
class Slf4jLoggingFilter(settings: ActorSystem.Settings, eventStream: EventStream) extends LoggingFilterWithMarker { class Slf4jLoggingFilter(@unused settings: ActorSystem.Settings, eventStream: EventStream)
extends LoggingFilterWithMarker {
def isErrorEnabled(logClass: Class[_], logSource: String) = def isErrorEnabled(logClass: Class[_], logSource: String) =
(eventStream.logLevel >= ErrorLevel) && Logger(logClass, logSource).isErrorEnabled (eventStream.logLevel >= ErrorLevel) && Logger(logClass, logSource).isErrorEnabled
def isWarningEnabled(logClass: Class[_], logSource: String) = def isWarningEnabled(logClass: Class[_], logSource: String) =

View file

@ -107,7 +107,7 @@ class Slf4jLoggingFilterSpec extends AkkaSpec(Slf4jLoggingFilterSpec.config) wit
val debugLevelProducer = system.actorOf(Props[WarningLevelProducer], name = "warningLevelProducer") val debugLevelProducer = system.actorOf(Props[WarningLevelProducer], name = "warningLevelProducer")
debugLevelProducer ! "test2" debugLevelProducer ! "test2"
probe.expectMsgType[Warning].message should be("test2") probe.expectMsgType[Warning].message should be("test2")
probe.expectNoMsg(500.millis) probe.expectNoMessage(500.millis)
} }
} }

View file

@ -28,7 +28,9 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
"akka-persistence", "akka-persistence",
"akka-cluster-tools", "akka-cluster-tools",
"akka-cluster-sharding", "akka-cluster-sharding",
"akka-stream") "akka-stream",
"akka-cluster-metrics",
"akka-slf4j")
val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination") val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination")