diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala index 5b1458eb40..600b621a4b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachParallelSpec.scala @@ -3,6 +3,8 @@ */ package akka.stream.scaladsl +import java.util.concurrent.{ CountDownLatch, TimeUnit } + import akka.stream.ActorMaterializer import akka.stream.ActorAttributes._ import akka.stream.Supervision._ @@ -20,7 +22,7 @@ class SinkForeachParallelSpec extends AkkaSpec { "A ForeachParallel" must { "produce elements in the order they are ready" in assertAllStagesStopped { - implicit val ec = system.dispatcher + import system.dispatcher val probe = TestProbe() val latch = (1 to 4).map(_ → TestLatch(1)).toMap @@ -45,7 +47,7 @@ class SinkForeachParallelSpec extends AkkaSpec { } "not run more functions in parallel then specified" in { - implicit val ec = system.dispatcher + import system.dispatcher val probe = TestProbe() val latch = (1 to 5).map(_ → TestLatch()).toMap @@ -70,7 +72,7 @@ class SinkForeachParallelSpec extends AkkaSpec { } "resume after function failure" in assertAllStagesStopped { - implicit val ec = system.dispatcher + import system.dispatcher val probe = TestProbe() val latch = TestLatch(1) @@ -90,30 +92,34 @@ class SinkForeachParallelSpec extends AkkaSpec { } "finish after function thrown exception" in assertAllStagesStopped { - val probe = TestProbe() - val latch = TestLatch(1) + import system.dispatcher - implicit val ec = system.dispatcher - val p = Source(1 to 5).runWith(Sink.foreachParallel(3)((n: Int) ⇒ { - if (n == 3) throw new RuntimeException("err2") with NoStackTrace - else { + val probe = TestProbe() + val element4Latch = new CountDownLatch(1) + val errorLatch = new CountDownLatch(2) + + val p = Source.fromIterator(() ⇒ Iterator.from(1)).runWith(Sink.foreachParallel(3)((n: Int) ⇒ { + if (n == 3) { + // Error will happen only after elements 1, 2 has been processed + errorLatch.await(5, TimeUnit.SECONDS) + throw new RuntimeException("err2") with NoStackTrace + } else { probe.ref ! n - Await.ready(latch, 10.seconds) + errorLatch.countDown() + element4Latch.await(5, TimeUnit.SECONDS) // Block element 4, 5, 6, ... from entering } }).withAttributes(supervisionStrategy(stoppingDecider))) - p.onFailure { case e ⇒ assert(e.getMessage.equals("err2")); Unit } - p.onSuccess { case _ ⇒ fail() } - latch.countDown() + // Only the first two messages are guaranteed to arrive due to their enforced ordering related to the time + // of failure. probe.expectMsgAllOf(1, 2) + element4Latch.countDown() // Release elements 4, 5, 6, ... - Await.ready(p, 1.seconds) - - assert(p.isCompleted) + a[RuntimeException] must be thrownBy Await.result(p, 3.seconds) } "handle empty source" in assertAllStagesStopped { - implicit val ec = system.dispatcher + import system.dispatcher val p = Source(List.empty[Int]).runWith(Sink.foreachParallel(3)(a ⇒ ()))