Merge pull request #30561 from marcospereira/issues/30559/fix-stream-test-kit-specs

Also dilate the initial delay when testing timeouts in akka-stream-testkit
This commit is contained in:
Marcos Pereira 2021-08-24 10:34:14 -04:00 committed by GitHub
commit 17f8ccb9be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 26 deletions

View file

@ -64,14 +64,8 @@ jobs:
- name: Akka publishLocal
run: |-
sbt -jvm-opts .jvmopts-ci \
-Dakka.mima.enabled=false \
-Dakka.ci-server=true \
-Dakka.test.tags.exclude=performance,timing,long-running,gh-exclude \
-Dakka.test.multi-in-test=false \
-Dakka.test.timefactor=1 \
-Dakka.cluster.assert=on \
-Dsbt.override.build.repos=false \
-Dakka.test.multi-node=false \
-Dsbt.log.noformat=false \
-Dakka.log.timestamps=true \
publishLocal publishM2
@ -102,7 +96,7 @@ jobs:
-Dakka.ci-server=true \
-Dakka.test.tags.exclude=performance,timing,long-running,gh-exclude \
-Dakka.test.multi-in-test=false \
-Dakka.test.timefactor=1 \
-Dakka.test.timefactor=2 \
-Dakka.cluster.assert=on \
-Dsbt.override.build.repos=false \
-Dakka.test.multi-node=false \

View file

@ -24,7 +24,6 @@ akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
## Introduction
The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes
and to the registered subscribers on the system event bus with the help of Cluster Metrics Extension.
@ -44,16 +43,16 @@ Certain message routing and let-it-crash functions may not work when Sigar is no
Cluster metrics extension comes with two built-in collector implementations:
1. `akka.cluster.metrics.SigarMetricsCollector`, which requires Sigar provisioning, and is more rich/precise
2. `akka.cluster.metrics.JmxMetricsCollector`, which is used as fall back, and is less rich/precise
1. `akka.cluster.metrics.SigarMetricsCollector`, which requires Sigar provisioning, and is more rich/precise
2. `akka.cluster.metrics.JmxMetricsCollector`, which is used as fall back, and is less rich/precise
You can also plug-in your own metrics collector implementation.
By default, metrics extension will use collector provider fall back and will try to load them in this order:
1. configured user-provided collector
2. built-in `akka.cluster.metrics.SigarMetricsCollector`
3. and finally `akka.cluster.metrics.JmxMetricsCollector`
1. configured user-provided collector
2. built-in `akka.cluster.metrics.SigarMetricsCollector`
3. and finally `akka.cluster.metrics.JmxMetricsCollector`
## Metrics Events
@ -69,14 +68,14 @@ You can subscribe your metrics listener actors to these events in order to imple
Scala
: @@@ vars
```
```scala
ClusterMetricsExtension(system).subscribe(metricsListenerActor)
```
@@@
Java
: @@@ vars
```
```java
ClusterMetricsExtension.get(system).subscribe(metricsListenerActor);
```
@@@
@ -123,11 +122,11 @@ The `AdaptiveLoadBalancingPool` / `AdaptiveLoadBalancingGroup` performs load bal
It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node.
It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights:
* `heap` / `HeapMetricsSelector` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
* `load` / `SystemLoadAverageMetricsSelector` - System load average for the past 1 minute, corresponding value can be found in `top` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
* `cpu` / `CpuMetricsSelector` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
* `mix` / `MixMetricsSelector` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
* Any custom implementation of `akka.cluster.metrics.MetricsSelector`
* `heap` / `HeapMetricsSelector` - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
* `load` / `SystemLoadAverageMetricsSelector` - System load average for the past 1 minute, corresponding value can be found in `top` of Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
* `cpu` / `CpuMetricsSelector` - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
* `mix` / `MixMetricsSelector` - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
* Any custom implementation of `akka.cluster.metrics.MetricsSelector`
The collected metrics values are smoothed with [exponential weighted moving average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average). In the @ref:[Cluster configuration](cluster-usage.md#cluster-configuration) you can adjust how quickly past data is decayed compared to new data.

View file

@ -8,8 +8,9 @@ import scala.concurrent.duration._
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter
import akka.testkit._
import akka.testkit.TestEvent.Mute
import akka.testkit.TestEvent.UnMute
@ -121,13 +122,17 @@ class StreamTestKitSpec extends AkkaSpec {
"#expectNextWithTimeoutPF should fail after timeout when element delayed" in {
intercept[AssertionError] {
val timeout = 100.millis
val overTimeout = timeout + 50.millis
// Initial delay is longer than the timeout so an exception will be thrown.
// It also needs to be dilated since the testkit will dilate the timeout
// accordingly to `-Dakka.test.timefactor` value.
val initialDelay = (timeout * 2).dilated
Source
.tick(overTimeout, 1.millis, 1)
.tick(initialDelay, 1.millis, 1)
.runWith(TestSink.probe)
.request(1)
.expectNextWithTimeoutPF(timeout, {
case 1 =>
system.log.info("Message received :(")
})
}.getMessage should include("timeout")
@ -160,9 +165,12 @@ class StreamTestKitSpec extends AkkaSpec {
"#expectNextChainingPF should fail after timeout when element delayed" in {
intercept[AssertionError] {
val timeout = 100.millis
val overTimeout = timeout + 50.millis
// Initial delay is longer than the timeout so an exception will be thrown.
// It also needs to be dilated since the testkit will dilate the timeout
// accordingly to `-Dakka.test.timefactor` value.
val initialDelay = (timeout * 2).dilated
Source
.tick(overTimeout, 1.millis, 1)
.tick(initialDelay, 1.millis, 1)
.runWith(TestSink.probe)
.request(1)
.expectNextChainingPF(timeout, {

View file

@ -11,6 +11,8 @@ import com.lightbend.sbt.publishrsync.PublishRsyncPlugin.autoImport._
import sbt.Keys._
import sbt._
import scala.concurrent.duration._
object Paradox {
val propertiesSettings = Seq(
@ -69,11 +71,14 @@ object Paradox {
val groupsSettings = Seq(Compile / paradoxGroups := Map("Language" -> Seq("Scala", "Java")))
val parsingSettings = Seq(Compile / paradoxParsingTimeout := 5.seconds)
val settings =
propertiesSettings ++
rootsSettings ++
includesSettings ++
groupsSettings ++
parsingSettings ++
Seq(
Compile / paradox / name := "Akka",
resolvers += Resolver.jcenterRepo,