#20487: Fix race in FlowForeachParallelSpec

This commit is contained in:
Endre Sándor Varga 2016-06-27 16:11:46 +02:00
parent 1ca40730ef
commit 9e2da7be32

View file

@ -3,6 +3,8 @@
*/
package akka.stream.scaladsl
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.stream.ActorMaterializer
import akka.stream.ActorAttributes._
import akka.stream.Supervision._
@ -20,7 +22,7 @@ class SinkForeachParallelSpec extends AkkaSpec {
"A ForeachParallel" must {
"produce elements in the order they are ready" in assertAllStagesStopped {
implicit val ec = system.dispatcher
import system.dispatcher
val probe = TestProbe()
val latch = (1 to 4).map(_ TestLatch(1)).toMap
@ -45,7 +47,7 @@ class SinkForeachParallelSpec extends AkkaSpec {
}
"not run more functions in parallel then specified" in {
implicit val ec = system.dispatcher
import system.dispatcher
val probe = TestProbe()
val latch = (1 to 5).map(_ TestLatch()).toMap
@ -70,7 +72,7 @@ class SinkForeachParallelSpec extends AkkaSpec {
}
"resume after function failure" in assertAllStagesStopped {
implicit val ec = system.dispatcher
import system.dispatcher
val probe = TestProbe()
val latch = TestLatch(1)
@ -90,30 +92,34 @@ class SinkForeachParallelSpec extends AkkaSpec {
}
"finish after function thrown exception" in assertAllStagesStopped {
val probe = TestProbe()
val latch = TestLatch(1)
import system.dispatcher
implicit val ec = system.dispatcher
val p = Source(1 to 5).runWith(Sink.foreachParallel(3)((n: Int) {
if (n == 3) throw new RuntimeException("err2") with NoStackTrace
else {
val probe = TestProbe()
val element4Latch = new CountDownLatch(1)
val errorLatch = new CountDownLatch(2)
val p = Source.fromIterator(() Iterator.from(1)).runWith(Sink.foreachParallel(3)((n: Int) {
if (n == 3) {
// Error will happen only after elements 1, 2 has been processed
errorLatch.await(5, TimeUnit.SECONDS)
throw new RuntimeException("err2") with NoStackTrace
} else {
probe.ref ! n
Await.ready(latch, 10.seconds)
errorLatch.countDown()
element4Latch.await(5, TimeUnit.SECONDS) // Block element 4, 5, 6, ... from entering
}
}).withAttributes(supervisionStrategy(stoppingDecider)))
p.onFailure { case e assert(e.getMessage.equals("err2")); Unit }
p.onSuccess { case _ fail() }
latch.countDown()
// Only the first two messages are guaranteed to arrive due to their enforced ordering related to the time
// of failure.
probe.expectMsgAllOf(1, 2)
element4Latch.countDown() // Release elements 4, 5, 6, ...
Await.ready(p, 1.seconds)
assert(p.isCompleted)
a[RuntimeException] must be thrownBy Await.result(p, 3.seconds)
}
"handle empty source" in assertAllStagesStopped {
implicit val ec = system.dispatcher
import system.dispatcher
val p = Source(List.empty[Int]).runWith(Sink.foreachParallel(3)(a ()))