diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index 99ccc439c9..bb7cb5073e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -37,16 +37,14 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } "never construct the source when there was no demand" in assertAllStagesStopped { - val probe = TestSubscriber.probe[Int]() val constructed = new AtomicBoolean(false) Source .lazySingle { () => constructed.set(true) 1 } - .toMat(Sink.fromSubscriber(probe))(Keep.left) + .toMat(Sink.cancelled)(Keep.left) .run() - probe.cancel() constructed.get() should ===(false) } @@ -76,15 +74,13 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } "never construct the source when there was no demand" in assertAllStagesStopped { - val probe = TestSubscriber.probe[Int]() val constructed = new AtomicBoolean(false) Source .lazySingle { () => constructed.set(true) 1 } - .runWith(Sink.fromSubscriber(probe)) - probe.cancel() + .runWith(Sink.cancelled) constructed.get() should ===(false) } @@ -127,15 +123,13 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } "never construct the source when there was no demand" in assertAllStagesStopped { - val probe = TestSubscriber.probe[Int]() val constructed = new AtomicBoolean(false) val result = Source .lazySource { () => constructed.set(true); Source(List(1, 2, 3)) } - .toMat(Sink.fromSubscriber(probe))(Keep.left) + .toMat(Sink.cancelled)(Keep.left) .run() - probe.cancel() constructed.get() should ===(false) result.isCompleted should ===(false) @@ -275,7 +269,6 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } "never construct the source when there was no demand" in assertAllStagesStopped { - val probe = TestSubscriber.probe[Int]() val constructed = new AtomicBoolean(false) val result = Source .lazyFutureSource { () => @@ -284,9 +277,8 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { Source(List(1, 2, 3)) }; } - .toMat(Sink.fromSubscriber(probe))(Keep.left) + .toMat(Sink.cancelled)(Keep.left) .run() - probe.cancel() constructed.get() should ===(false) result.isCompleted should ===(false)