From d22e4becd120e8961ebefc2e8276a8623fea1db8 Mon Sep 17 00:00:00 2001 From: Marcos Pereira Date: Mon, 23 Aug 2021 11:55:49 -0400 Subject: [PATCH 1/4] Also dilate the initial delay when testing timeouts in akka-stream-testkit --- .../stream/testkit/StreamTestKitSpec.scala | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala index d31d922100..277bcb76e4 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala @@ -8,8 +8,9 @@ import scala.concurrent.duration._ import akka.stream.scaladsl.Source import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.AkkaSpec -import akka.testkit.EventFilter + +import akka.testkit._ + import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.UnMute @@ -121,13 +122,17 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextWithTimeoutPF should fail after timeout when element delayed" in { intercept[AssertionError] { val timeout = 100.millis - val overTimeout = timeout + 50.millis + // Initial delay is longer than the timeout so an exception will be thrown. + // It also needs to be dilated since the testkit will dilate the timeout + // accordingly to `-Dakka.test.timefactor` value. + val initialDelay = (timeout * 2).dilated Source - .tick(overTimeout, 1.millis, 1) + .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) .expectNextWithTimeoutPF(timeout, { case 1 => + system.log.info("Message received :(") }) }.getMessage should include("timeout") @@ -160,9 +165,12 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextChainingPF should fail after timeout when element delayed" in { intercept[AssertionError] { val timeout = 100.millis - val overTimeout = timeout + 50.millis + // Initial delay is longer than the timeout so an exception will be thrown. + // It also needs to be dilated since the testkit will dilate the timeout + // accordingly to `-Dakka.test.timefactor` value. + val initialDelay = (timeout * 2).dilated Source - .tick(overTimeout, 1.millis, 1) + .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) .expectNextChainingPF(timeout, { From 6d916df2f9e7f8f814e947aea0391672cda93a6e Mon Sep 17 00:00:00 2001 From: Marcos Pereira Date: Mon, 23 Aug 2021 14:08:51 -0400 Subject: [PATCH 2/4] Increase paradox parsing timeout from 2 to 5 seconds --- project/Paradox.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/project/Paradox.scala b/project/Paradox.scala index 91e2b76999..150c9ce7ba 100644 --- a/project/Paradox.scala +++ b/project/Paradox.scala @@ -11,6 +11,8 @@ import com.lightbend.sbt.publishrsync.PublishRsyncPlugin.autoImport._ import sbt.Keys._ import sbt._ +import scala.concurrent.duration._ + object Paradox { val propertiesSettings = Seq( @@ -69,11 +71,14 @@ object Paradox { val groupsSettings = Seq(Compile / paradoxGroups := Map("Language" -> Seq("Scala", "Java"))) + val parsingSettings = Seq(Compile / paradoxParsingTimeout := 5.seconds) + val settings = propertiesSettings ++ rootsSettings ++ includesSettings ++ groupsSettings ++ + parsingSettings ++ Seq( Compile / paradox / name := "Akka", resolvers += Resolver.jcenterRepo, From 0e31566b1a10470409efc106dd1854987dfd9d3a Mon Sep 17 00:00:00 2001 From: Marcos Pereira Date: Mon, 23 Aug 2021 14:09:10 -0400 Subject: [PATCH 3/4] Some minor syntax enhancements --- akka-docs/src/main/paradox/cluster-metrics.md | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/akka-docs/src/main/paradox/cluster-metrics.md b/akka-docs/src/main/paradox/cluster-metrics.md index 5c82307e2a..dab31ff9ff 100644 --- a/akka-docs/src/main/paradox/cluster-metrics.md +++ b/akka-docs/src/main/paradox/cluster-metrics.md @@ -24,7 +24,6 @@ akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ] ## Introduction - The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes and to the registered subscribers on the system event bus with the help of Cluster Metrics Extension. @@ -44,16 +43,16 @@ Certain message routing and let-it-crash functions may not work when Sigar is no Cluster metrics extension comes with two built-in collector implementations: - 1. `akka.cluster.metrics.SigarMetricsCollector`, which requires Sigar provisioning, and is more rich/precise - 2. `akka.cluster.metrics.JmxMetricsCollector`, which is used as fall back, and is less rich/precise +1. `akka.cluster.metrics.SigarMetricsCollector`, which requires Sigar provisioning, and is more rich/precise +2. `akka.cluster.metrics.JmxMetricsCollector`, which is used as fall back, and is less rich/precise You can also plug-in your own metrics collector implementation. By default, metrics extension will use collector provider fall back and will try to load them in this order: - 1. configured user-provided collector - 2. built-in `akka.cluster.metrics.SigarMetricsCollector` - 3. and finally `akka.cluster.metrics.JmxMetricsCollector` +1. configured user-provided collector +2. built-in `akka.cluster.metrics.SigarMetricsCollector` +3. and finally `akka.cluster.metrics.JmxMetricsCollector` ## Metrics Events @@ -69,14 +68,14 @@ You can subscribe your metrics listener actors to these events in order to imple Scala : @@@ vars -``` +```scala ClusterMetricsExtension(system).subscribe(metricsListenerActor) ``` @@@ Java : @@@ vars -``` +```java ClusterMetricsExtension.get(system).subscribe(metricsListenerActor); ``` @@@ @@ -123,11 +122,11 @@ The `AdaptiveLoadBalancingPool` / `AdaptiveLoadBalancingGroup` performs load bal It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node. It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights: - * `heap` / `HeapMetricsSelector` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max - * `load` / `SystemLoadAverageMetricsSelector` - System load average for the past 1 minute, corresponding value can be found in `top` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors) - * `cpu` / `CpuMetricsSelector` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization - * `mix` / `MixMetricsSelector` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors. - * Any custom implementation of `akka.cluster.metrics.MetricsSelector` +* `heap` / `HeapMetricsSelector` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max +* `load` / `SystemLoadAverageMetricsSelector` - System load average for the past 1 minute, corresponding value can be found in `top` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors) +* `cpu` / `CpuMetricsSelector` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization +* `mix` / `MixMetricsSelector` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors. +* Any custom implementation of `akka.cluster.metrics.MetricsSelector` The collected metrics values are smoothed with [exponential weighted moving average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average). In the @ref:[Cluster configuration](cluster-usage.md#cluster-configuration) you can adjust how quickly past data is decayed compared to new data. From ec55b4290697aea1ca41e2ef02cd89d49784cf38 Mon Sep 17 00:00:00 2001 From: Marcos Pereira Date: Mon, 23 Aug 2021 20:00:28 -0400 Subject: [PATCH 4/4] Use a timefactor=2 when validating pull requests --- .github/workflows/build-test-prValidation.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/workflows/build-test-prValidation.yml b/.github/workflows/build-test-prValidation.yml index 5bd394353d..bab315facf 100644 --- a/.github/workflows/build-test-prValidation.yml +++ b/.github/workflows/build-test-prValidation.yml @@ -64,14 +64,8 @@ jobs: - name: Akka publishLocal run: |- sbt -jvm-opts .jvmopts-ci \ - -Dakka.mima.enabled=false \ -Dakka.ci-server=true \ - -Dakka.test.tags.exclude=performance,timing,long-running,gh-exclude \ - -Dakka.test.multi-in-test=false \ - -Dakka.test.timefactor=1 \ - -Dakka.cluster.assert=on \ -Dsbt.override.build.repos=false \ - -Dakka.test.multi-node=false \ -Dsbt.log.noformat=false \ -Dakka.log.timestamps=true \ publishLocal publishM2 @@ -102,7 +96,7 @@ jobs: -Dakka.ci-server=true \ -Dakka.test.tags.exclude=performance,timing,long-running,gh-exclude \ -Dakka.test.multi-in-test=false \ - -Dakka.test.timefactor=1 \ + -Dakka.test.timefactor=2 \ -Dakka.cluster.assert=on \ -Dsbt.override.build.repos=false \ -Dakka.test.multi-node=false \