=str failed: FlowSplitWhenSpec (#23278)

This commit is contained in:
Alexander Golubev 2017-09-01 06:13:22 -04:00 committed by Johan Andrén
parent 8807833f2b
commit 3afc17ffa2

View file

@ -3,14 +3,16 @@
*/
package akka.stream.scaladsl
import akka.NotUsed
import akka.{ Done, NotUsed }
import akka.stream._
import akka.stream.Supervision.resumingDecider
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import org.reactivestreams.Publisher
import scala.concurrent.Await
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
class FlowSplitWhenSpec extends StreamSpec {
@ -257,14 +259,19 @@ class FlowSplitWhenSpec extends StreamSpec {
}
"fail substream if materialized twice" in assertAllStagesStopped {
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 1, maxSize = 1))
import system.dispatcher
val probe = Source(1 to 5).splitWhen(_ true).lift
.map { src src.runWith(Sink.ignore)(mat).flatMap(_ src.runWith(Sink.ignore)(mat)) }
.runWith(TestSink.probe[Future[Done]])(mat)
probe.request(1)
val future = probe.requestNext()
an[IllegalStateException] mustBe thrownBy {
Await.result(
Source.single(1).splitWhen(_ true).lift
.mapAsync(1) { src src.runWith(Sink.ignore).flatMap(_ src.runWith(Sink.ignore)) } // Sink.ignore+mapAsync pipes error back
.runWith(Sink.ignore),
3.seconds)
Await.result(future, 3.seconds)
}
probe.cancel()
}
"fail stream if substream not materialized in time" in assertAllStagesStopped {