diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index 98258c846a..f9cf6382f0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -75,13 +75,16 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "never construct the source when there was no demand" in assertAllStagesStopped { val constructed = new AtomicBoolean(false) - Source - .lazySingle { () => + val termination = Source + .lazyFuture { () => constructed.set(true) - 1 + Future.successful(1) } - .runWith(Sink.cancelled) + .watchTermination()(Keep.right) + .toMat(Sink.cancelled)(Keep.left) + .run() + termination.futureValue // stream should terminate constructed.get() should ===(false) } @@ -124,14 +127,17 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "never construct the source when there was no demand" in assertAllStagesStopped { val constructed = new AtomicBoolean(false) - Source + val (lazySourceMatVal, termination) = Source .lazySource { () => constructed.set(true); Source(List(1, 2, 3)) } + .watchTermination()(Keep.both) .toMat(Sink.cancelled)(Keep.left) .run() + termination.futureValue // stream should terminate constructed.get() should ===(false) + lazySourceMatVal.failed.futureValue shouldBe a[NeverMaterializedException] } "fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped { @@ -270,17 +276,20 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "never construct the source when there was no demand" in assertAllStagesStopped { val constructed = new AtomicBoolean(false) - Source + val (lazyFutureSourceMatval, termination) = Source .lazyFutureSource { () => Future { constructed.set(true) Source(List(1, 2, 3)) }; } + .watchTermination()(Keep.both) .toMat(Sink.cancelled)(Keep.left) .run() + termination.futureValue // stream should terminate constructed.get() should ===(false) + lazyFutureSourceMatval.failed.futureValue shouldBe a[NeverMaterializedException] } "fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped {