adding stage completion test to intersperse

This commit is contained in:
lolski 2015-12-11 23:57:56 +08:00
parent 3152a6d15f
commit 967c17e7d1

View file

@ -4,7 +4,8 @@
package akka.stream.scaladsl
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.Utils.assertAllStagesStopped
import akka.stream.testkit.scaladsl.{ TestSource, TestSink }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import org.scalatest.concurrent.ScalaFutures
@ -18,7 +19,6 @@ class FlowIntersperseSpec extends AkkaSpec with ScalaFutures {
implicit val materializer = ActorMaterializer(settings)
"A Intersperse" must {
"inject element between existing elements" in {
val probe = Source(List(1, 2, 3))
.map(_.toString)
@ -74,6 +74,29 @@ class FlowIntersperseSpec extends AkkaSpec with ScalaFutures {
probe.expectSubscription()
probe.toStrict(1.second).mkString("") should ===(List(1).mkString("[", ",", "]"))
}
"complete the stage when the Source has been completed" in {
val (p1, p2) = TestSource.probe[String].intersperse(",").toMat(TestSink.probe[String])(Keep.both).run
p2.request(10)
p1.sendNext("a")
.sendNext("b")
.sendComplete()
p2.expectNext("a")
.expectNext(",")
.expectNext("b")
.expectComplete()
}
"complete the stage when the Sink has been cancelled" in {
val (p1, p2) = TestSource.probe[String].intersperse(",").toMat(TestSink.probe[String])(Keep.both).run
p2.request(10)
p1.sendNext("a")
.sendNext("b")
p2.expectNext("a")
.expectNext(",")
.cancel()
p1.expectCancellation()
}
}
}