diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 41906b4468..14eed3d8cc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -3,14 +3,16 @@ */ package akka.stream.scaladsl -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.stream._ import akka.stream.Supervision.resumingDecider import akka.stream.impl.SubscriptionTimeoutException import akka.stream.testkit.Utils._ import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink import org.reactivestreams.Publisher -import scala.concurrent.Await + +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ class FlowSplitWhenSpec extends StreamSpec { @@ -257,14 +259,19 @@ class FlowSplitWhenSpec extends StreamSpec { } "fail substream if materialized twice" in assertAllStagesStopped { + implicit val mat = ActorMaterializer(ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1)) + import system.dispatcher + val probe = Source(1 to 5).splitWhen(_ ⇒ true).lift + .map { src ⇒ src.runWith(Sink.ignore)(mat).flatMap(_ ⇒ src.runWith(Sink.ignore)(mat)) } + .runWith(TestSink.probe[Future[Done]])(mat) + probe.request(1) + val future = probe.requestNext() an[IllegalStateException] mustBe thrownBy { - Await.result( - Source.single(1).splitWhen(_ ⇒ true).lift - .mapAsync(1) { src ⇒ src.runWith(Sink.ignore).flatMap(_ ⇒ src.runWith(Sink.ignore)) } // Sink.ignore+mapAsync pipes error back - .runWith(Sink.ignore), - 3.seconds) + Await.result(future, 3.seconds) } + probe.cancel() } "fail stream if substream not materialized in time" in assertAllStagesStopped {