diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index 75beae0a24..fa1a2bff04 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -201,6 +201,16 @@ Defers creation and materialization of a `Source` until there is demand. --------------------------------------------------------------- +### lazilyAsync + +Defers creation and materialization of a `CompletionStage` until there is demand. + +**emits** the future completes + +**completes** after the future has completed + +--------------------------------------------------------------- + ### actorRef Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala new file mode 100644 index 0000000000..66529c8439 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.atomic.AtomicBoolean + +import akka.Done +import akka.stream.ActorMaterializer +import akka.stream.testkit.{ StreamSpec, TestSubscriber } +import akka.stream.testkit.Utils.assertAllStagesStopped +import akka.testkit.DefaultTimeout +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.Future + +class LazilyAsyncSpec extends StreamSpec with DefaultTimeout with ScalaFutures { + + private implicit val mat: ActorMaterializer = ActorMaterializer() + + import mat.executionContext + + "A lazy async source" should { + + "work in happy path scenario" in assertAllStagesStopped { + val stream = Source.lazilyAsync { () ⇒ Future(42) }.runWith(Sink.head) + + stream.futureValue should ===(42) + } + + "call factory method on demand only" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val constructed = new AtomicBoolean(false) + + val result = Source.lazilyAsync { () ⇒ constructed.set(true); Future(42) } + .runWith(Sink.fromSubscriber(probe)) + probe.cancel() + + constructed.get() should ===(false) + } + + "fail materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped { + val materialization = Source.lazilyAsync { () ⇒ Future(42) } + .toMat(Sink.cancelled)(Keep.left) + .run() + + intercept[RuntimeException] { + materialization.futureValue + } + } + + "materialize when the source has been created" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + + val materialization: Future[Done] = + Source.lazilyAsync { () ⇒ Future(42) } + .mapMaterializedValue(_.map(_ ⇒ Done)) + .to(Sink.fromSubscriber(probe)) + .run() + + materialization.value shouldEqual None + probe.request(1) + probe.expectNext(42) + materialization.futureValue should ===(Done) + + probe.cancel() + } + + "propagate failed future from factory" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val failure = new RuntimeException("too bad") + val materialization = Source.lazilyAsync { () ⇒ Future.failed(failure) } + .to(Sink.fromSubscriber(probe)) + .run() + + probe.request(1) + probe.expectError(failure) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 482866aa5f..cef2191d26 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -251,6 +251,16 @@ object Source { def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] = scaladsl.Source.lazily[T, M](() ⇒ create.create().asScala).mapMaterializedValue(_.toJava).asJava + /** + * Creates a `Source` from supplied future factory that is not called until downstream demand. When source gets + * materialized the materialized future is completed with the value from the factory. If downstream cancels or fails + * without any demand the create factory is never called and the materialized `Future` is failed. + * + * @see [[Source.lazily]] + */ + def lazilyAsync[T](create: function.Creator[CompletionStage[T]]): Source[T, Future[NotUsed]] = + scaladsl.Source.lazilyAsync[T](() ⇒ create.create().toScala).asJava + /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index e1ef830acc..4cdacace92 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -421,6 +421,16 @@ object Source { def lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]] = Source.fromGraph(new LazySource[T, M](create)) + /** + * Creates a `Source` from supplied future factory that is not called until downstream demand. When source gets + * materialized the materialized future is completed with the value from the factory. If downstream cancels or fails + * without any demand the create factory is never called and the materialized `Future` is failed. + * + * @see [[Source.lazily]] + */ + def lazilyAsync[T](create: () ⇒ Future[T]): Source[T, Future[NotUsed]] = + lazily(() ⇒ fromFuture(create())) + /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */