harden StreamTestKitSpec, #24308

This commit is contained in:
Patrik Nordwall 2018-01-16 18:30:33 +01:00 committed by Johan Andrén
parent e4dd3c24fc
commit 5fdb247488

View file

@ -7,7 +7,11 @@ import akka.stream._
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import akka.testkit.TestEvent.UnMute
class StreamTestKitSpec extends AkkaSpec {
@ -18,33 +22,41 @@ class StreamTestKitSpec extends AkkaSpec {
"A TestSink Probe" must {
"#toStrict" in {
Source(1 to 4).runWith(TestSink.probe)
.toStrict(300.millis) should ===(List(1, 2, 3, 4))
.toStrict(remainingOrDefault) should ===(List(1, 2, 3, 4))
}
"#toStrict with failing source" in {
val error = intercept[AssertionError] {
Source.fromIterator(() new Iterator[Int] {
var i = 0
override def hasNext: Boolean = true
override def next(): Int = {
i += 1
i match {
case 3 throw ex
case n n
}
}
}).runWith(TestSink.probe)
.toStrict(300.millis)
}
system.eventStream.publish(Mute(EventFilter[Exception]()))
try {
val error = intercept[AssertionError] {
Source.fromIterator(() new Iterator[Int] {
var i = 0
error.getCause.getMessage should include("Boom!")
error.getMessage should include("List(1, 2)")
override def hasNext: Boolean = true
override def next(): Int = {
i += 1
i match {
case 3 throw ex
case n n
}
}
}).runWith(TestSink.probe)
.toStrict(remainingOrDefault)
}
error.getMessage should startWith("toStrict received OnError")
error.getMessage should include("List(1, 2)")
error.getCause should ===(ex)
} finally {
system.eventStream.publish(UnMute(EventFilter[Exception]()))
}
}
"#toStrict when subscription was already obtained" in {
val p = Source(1 to 4).runWith(TestSink.probe)
p.expectSubscription()
p.toStrict(300.millis) should ===(List(1, 2, 3, 4))
p.toStrict(remainingOrDefault) should ===(List(1, 2, 3, 4))
}
"#expectNextOrError with right element" in {