diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala index d31d922100..277bcb76e4 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala @@ -8,8 +8,9 @@ import scala.concurrent.duration._ import akka.stream.scaladsl.Source import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.AkkaSpec -import akka.testkit.EventFilter + +import akka.testkit._ + import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.UnMute @@ -121,13 +122,17 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextWithTimeoutPF should fail after timeout when element delayed" in { intercept[AssertionError] { val timeout = 100.millis - val overTimeout = timeout + 50.millis + // Initial delay is longer than the timeout so an exception will be thrown. + // It also needs to be dilated since the testkit will dilate the timeout + // accordingly to `-Dakka.test.timefactor` value. + val initialDelay = (timeout * 2).dilated Source - .tick(overTimeout, 1.millis, 1) + .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) .expectNextWithTimeoutPF(timeout, { case 1 => + system.log.info("Message received :(") }) }.getMessage should include("timeout") @@ -160,9 +165,12 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextChainingPF should fail after timeout when element delayed" in { intercept[AssertionError] { val timeout = 100.millis - val overTimeout = timeout + 50.millis + // Initial delay is longer than the timeout so an exception will be thrown. + // It also needs to be dilated since the testkit will dilate the timeout + // accordingly to `-Dakka.test.timefactor` value. + val initialDelay = (timeout * 2).dilated Source - .tick(overTimeout, 1.millis, 1) + .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) .expectNextChainingPF(timeout, {