Issue 24519: Created method lazilyAsync for both DSLs and adds section in docs (#24568)

* Issue 24519: Created method lazilyAsync for both DSLs and adds section in docs.

* Issue 24519: Changes according to code review

* Issue 24519: Added unit tests

* Update LazilyAsyncSpec.scala

* Issue 24519: Added copyright
This commit is contained in:
Oleksii Tkachuk 2018-02-21 19:15:25 -06:00 committed by Konrad `ktoso` Malawski
parent 11b6065d97
commit 80da4cadee
4 changed files with 110 additions and 0 deletions

View file

@ -201,6 +201,16 @@ Defers creation and materialization of a `Source` until there is demand.
---------------------------------------------------------------
### lazilyAsync
Defers creation and materialization of a `CompletionStage` until there is demand.
**emits** the future completes
**completes** after the future has completed
---------------------------------------------------------------
### actorRef
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain

View file

@ -0,0 +1,80 @@
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicBoolean
import akka.Done
import akka.stream.ActorMaterializer
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import akka.stream.testkit.Utils.assertAllStagesStopped
import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
class LazilyAsyncSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
private implicit val mat: ActorMaterializer = ActorMaterializer()
import mat.executionContext
"A lazy async source" should {
"work in happy path scenario" in assertAllStagesStopped {
val stream = Source.lazilyAsync { () Future(42) }.runWith(Sink.head)
stream.futureValue should ===(42)
}
"call factory method on demand only" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val constructed = new AtomicBoolean(false)
val result = Source.lazilyAsync { () constructed.set(true); Future(42) }
.runWith(Sink.fromSubscriber(probe))
probe.cancel()
constructed.get() should ===(false)
}
"fail materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped {
val materialization = Source.lazilyAsync { () Future(42) }
.toMat(Sink.cancelled)(Keep.left)
.run()
intercept[RuntimeException] {
materialization.futureValue
}
}
"materialize when the source has been created" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val materialization: Future[Done] =
Source.lazilyAsync { () Future(42) }
.mapMaterializedValue(_.map(_ Done))
.to(Sink.fromSubscriber(probe))
.run()
materialization.value shouldEqual None
probe.request(1)
probe.expectNext(42)
materialization.futureValue should ===(Done)
probe.cancel()
}
"propagate failed future from factory" in assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
val failure = new RuntimeException("too bad")
val materialization = Source.lazilyAsync { () Future.failed(failure) }
.to(Sink.fromSubscriber(probe))
.run()
probe.request(1)
probe.expectError(failure)
}
}
}

View file

@ -251,6 +251,16 @@ object Source {
def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] =
scaladsl.Source.lazily[T, M](() create.create().asScala).mapMaterializedValue(_.toJava).asJava
/**
* Creates a `Source` from supplied future factory that is not called until downstream demand. When source gets
* materialized the materialized future is completed with the value from the factory. If downstream cancels or fails
* without any demand the create factory is never called and the materialized `Future` is failed.
*
* @see [[Source.lazily]]
*/
def lazilyAsync[T](create: function.Creator[CompletionStage[T]]): Source[T, Future[NotUsed]] =
scaladsl.Source.lazilyAsync[T](() create.create().toScala).asJava
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/

View file

@ -421,6 +421,16 @@ object Source {
def lazily[T, M](create: () Source[T, M]): Source[T, Future[M]] =
Source.fromGraph(new LazySource[T, M](create))
/**
* Creates a `Source` from supplied future factory that is not called until downstream demand. When source gets
* materialized the materialized future is completed with the value from the factory. If downstream cancels or fails
* without any demand the create factory is never called and the materialized `Future` is failed.
*
* @see [[Source.lazily]]
*/
def lazilyAsync[T](create: () Future[T]): Source[T, Future[NotUsed]] =
lazily(() fromFuture(create()))
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/