diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala index 0f1360692e..fa25b1644f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala @@ -4,7 +4,8 @@ package akka.stream.scaladsl import akka.stream.testkit._ -import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.Utils.assertAllStagesStopped +import akka.stream.testkit.scaladsl.{ TestSource, TestSink } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import org.scalatest.concurrent.ScalaFutures @@ -18,7 +19,6 @@ class FlowIntersperseSpec extends AkkaSpec with ScalaFutures { implicit val materializer = ActorMaterializer(settings) "A Intersperse" must { - "inject element between existing elements" in { val probe = Source(List(1, 2, 3)) .map(_.toString) @@ -74,6 +74,29 @@ class FlowIntersperseSpec extends AkkaSpec with ScalaFutures { probe.expectSubscription() probe.toStrict(1.second).mkString("") should ===(List(1).mkString("[", ",", "]")) } + + "complete the stage when the Source has been completed" in { + val (p1, p2) = TestSource.probe[String].intersperse(",").toMat(TestSink.probe[String])(Keep.both).run + p2.request(10) + p1.sendNext("a") + .sendNext("b") + .sendComplete() + p2.expectNext("a") + .expectNext(",") + .expectNext("b") + .expectComplete() + } + + "complete the stage when the Sink has been cancelled" in { + val (p1, p2) = TestSource.probe[String].intersperse(",").toMat(TestSink.probe[String])(Keep.both).run + p2.request(10) + p1.sendNext("a") + .sendNext("b") + p2.expectNext("a") + .expectNext(",") + .cancel() + p1.expectCancellation() + } } }