diff --git a/.github/workflows/build-test-prValidation.yml b/.github/workflows/build-test-prValidation.yml index 5bd394353d..bab315facf 100644 --- a/.github/workflows/build-test-prValidation.yml +++ b/.github/workflows/build-test-prValidation.yml @@ -64,14 +64,8 @@ jobs: - name: Akka publishLocal run: |- sbt -jvm-opts .jvmopts-ci \ - -Dakka.mima.enabled=false \ -Dakka.ci-server=true \ - -Dakka.test.tags.exclude=performance,timing,long-running,gh-exclude \ - -Dakka.test.multi-in-test=false \ - -Dakka.test.timefactor=1 \ - -Dakka.cluster.assert=on \ -Dsbt.override.build.repos=false \ - -Dakka.test.multi-node=false \ -Dsbt.log.noformat=false \ -Dakka.log.timestamps=true \ publishLocal publishM2 @@ -102,7 +96,7 @@ jobs: -Dakka.ci-server=true \ -Dakka.test.tags.exclude=performance,timing,long-running,gh-exclude \ -Dakka.test.multi-in-test=false \ - -Dakka.test.timefactor=1 \ + -Dakka.test.timefactor=2 \ -Dakka.cluster.assert=on \ -Dsbt.override.build.repos=false \ -Dakka.test.multi-node=false \ diff --git a/akka-docs/src/main/paradox/cluster-metrics.md b/akka-docs/src/main/paradox/cluster-metrics.md index 5c82307e2a..dab31ff9ff 100644 --- a/akka-docs/src/main/paradox/cluster-metrics.md +++ b/akka-docs/src/main/paradox/cluster-metrics.md @@ -24,7 +24,6 @@ akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ] ## Introduction - The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes and to the registered subscribers on the system event bus with the help of Cluster Metrics Extension. @@ -44,16 +43,16 @@ Certain message routing and let-it-crash functions may not work when Sigar is no Cluster metrics extension comes with two built-in collector implementations: - 1. `akka.cluster.metrics.SigarMetricsCollector`, which requires Sigar provisioning, and is more rich/precise - 2. `akka.cluster.metrics.JmxMetricsCollector`, which is used as fall back, and is less rich/precise +1. `akka.cluster.metrics.SigarMetricsCollector`, which requires Sigar provisioning, and is more rich/precise +2. `akka.cluster.metrics.JmxMetricsCollector`, which is used as fall back, and is less rich/precise You can also plug-in your own metrics collector implementation. By default, metrics extension will use collector provider fall back and will try to load them in this order: - 1. configured user-provided collector - 2. built-in `akka.cluster.metrics.SigarMetricsCollector` - 3. and finally `akka.cluster.metrics.JmxMetricsCollector` +1. configured user-provided collector +2. built-in `akka.cluster.metrics.SigarMetricsCollector` +3. and finally `akka.cluster.metrics.JmxMetricsCollector` ## Metrics Events @@ -69,14 +68,14 @@ You can subscribe your metrics listener actors to these events in order to imple Scala : @@@ vars -``` +```scala ClusterMetricsExtension(system).subscribe(metricsListenerActor) ``` @@@ Java : @@@ vars -``` +```java ClusterMetricsExtension.get(system).subscribe(metricsListenerActor); ``` @@@ @@ -123,11 +122,11 @@ The `AdaptiveLoadBalancingPool` / `AdaptiveLoadBalancingGroup` performs load bal It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node. It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights: - * `heap` / `HeapMetricsSelector` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max - * `load` / `SystemLoadAverageMetricsSelector` - System load average for the past 1 minute, corresponding value can be found in `top` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors) - * `cpu` / `CpuMetricsSelector` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization - * `mix` / `MixMetricsSelector` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors. - * Any custom implementation of `akka.cluster.metrics.MetricsSelector` +* `heap` / `HeapMetricsSelector` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max +* `load` / `SystemLoadAverageMetricsSelector` - System load average for the past 1 minute, corresponding value can be found in `top` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors) +* `cpu` / `CpuMetricsSelector` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization +* `mix` / `MixMetricsSelector` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors. +* Any custom implementation of `akka.cluster.metrics.MetricsSelector` The collected metrics values are smoothed with [exponential weighted moving average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average). In the @ref:[Cluster configuration](cluster-usage.md#cluster-configuration) you can adjust how quickly past data is decayed compared to new data. diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala index d31d922100..277bcb76e4 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala @@ -8,8 +8,9 @@ import scala.concurrent.duration._ import akka.stream.scaladsl.Source import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.AkkaSpec -import akka.testkit.EventFilter + +import akka.testkit._ + import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.UnMute @@ -121,13 +122,17 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextWithTimeoutPF should fail after timeout when element delayed" in { intercept[AssertionError] { val timeout = 100.millis - val overTimeout = timeout + 50.millis + // Initial delay is longer than the timeout so an exception will be thrown. + // It also needs to be dilated since the testkit will dilate the timeout + // accordingly to `-Dakka.test.timefactor` value. + val initialDelay = (timeout * 2).dilated Source - .tick(overTimeout, 1.millis, 1) + .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) .expectNextWithTimeoutPF(timeout, { case 1 => + system.log.info("Message received :(") }) }.getMessage should include("timeout") @@ -160,9 +165,12 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextChainingPF should fail after timeout when element delayed" in { intercept[AssertionError] { val timeout = 100.millis - val overTimeout = timeout + 50.millis + // Initial delay is longer than the timeout so an exception will be thrown. + // It also needs to be dilated since the testkit will dilate the timeout + // accordingly to `-Dakka.test.timefactor` value. + val initialDelay = (timeout * 2).dilated Source - .tick(overTimeout, 1.millis, 1) + .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) .expectNextChainingPF(timeout, { diff --git a/project/Paradox.scala b/project/Paradox.scala index 91e2b76999..150c9ce7ba 100644 --- a/project/Paradox.scala +++ b/project/Paradox.scala @@ -11,6 +11,8 @@ import com.lightbend.sbt.publishrsync.PublishRsyncPlugin.autoImport._ import sbt.Keys._ import sbt._ +import scala.concurrent.duration._ + object Paradox { val propertiesSettings = Seq( @@ -69,11 +71,14 @@ object Paradox { val groupsSettings = Seq(Compile / paradoxGroups := Map("Language" -> Seq("Scala", "Java"))) + val parsingSettings = Seq(Compile / paradoxParsingTimeout := 5.seconds) + val settings = propertiesSettings ++ rootsSettings ++ includesSettings ++ groupsSettings ++ + parsingSettings ++ Seq( Compile / paradox / name := "Akka", resolvers += Resolver.jcenterRepo,