diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index b0ae2c87af..cd9212e0d8 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -136,6 +136,14 @@ Fail directly with a user specified exception. **completes** fails the stream directly with the given exception +lazily +~~~~~~ +Defers creation and materialization of a ``Source`` until there is demand. + +**emits** depends on the wrapped ``Source`` + +**completes** depends on the wrapped ``Source`` + actorPublisher ^^^^^^^^^^^^^^ Wrap an actor extending ``ActorPublisher`` as a source. diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 5db202bb86..2bd6cfb341 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -134,6 +134,14 @@ Fail directly with a user specified exception. **completes** fails the stream directly with the given exception +lazily +~~~~~~ +Defers creation and materialization of a ``Source`` until there is demand. + +**emits** depends on the wrapped ``Source`` + +**completes** depends on the wrapped ``Source`` + actorPublisher ^^^^^^^^^^^^^^ Wrap an actor extending ``ActorPublisher`` as a source. diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala new file mode 100644 index 0000000000..5e5ba16915 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.atomic.AtomicBoolean + +import akka.Done +import akka.stream.impl.LazySource +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } +import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import akka.stream.{ ActorMaterializer, Attributes, Outlet, SourceShape } +import akka.testkit.DefaultTimeout +import org.scalatest.concurrent.ScalaFutures + +import scala.collection.immutable.Seq +import scala.concurrent.Future + +class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { + + implicit val materializer = ActorMaterializer() + + "A lazy source" should { + "work like a normal source, happy path" in assertAllStagesStopped { + val result = Source.fromGraph(LazySource(() ⇒ Source(List(1, 2, 3)))).runWith(Sink.seq) + + result.futureValue should ===(Seq(1, 2, 3)) + } + + "never construct the source when there was no demand" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val constructed = new AtomicBoolean(false) + val result = Source.fromGraph(LazySource { () ⇒ constructed.set(true); Source(List(1, 2, 3)) }).runWith(Sink.fromSubscriber(probe)) + probe.cancel() + + constructed.get() should ===(false) + } + + "fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped { + val matF = Source.fromGraph(LazySource(() ⇒ Source(List(1, 2, 3)))) + .toMat(Sink.cancelled)(Keep.left) + .run() + + intercept[RuntimeException] { + matF.futureValue + } + } + + "stop consuming when downstream has cancelled" in assertAllStagesStopped { + val outProbe = TestSubscriber.probe[Int]() + val inProbe = TestPublisher.probe[Int]() + + Source.fromGraph(LazySource(() ⇒ Source.fromPublisher(inProbe))).runWith(Sink.fromSubscriber(outProbe)) + + outProbe.request(1) + inProbe.expectRequest() + inProbe.sendNext(27) + outProbe.expectNext(27) + outProbe.cancel() + inProbe.expectCancellation() + } + + "materialize when the source has been created" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + + val matF: Future[Done] = Source.fromGraph(LazySource { () ⇒ + Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ Done) + }).to(Sink.fromSubscriber(probe)) + .run() + + matF.value shouldEqual None + probe.request(1) + probe.expectNext(1) + matF.futureValue should ===(Done) + + probe.cancel() + } + + "fail stage when upstream fails" in assertAllStagesStopped { + val outProbe = TestSubscriber.probe[Int]() + val inProbe = TestPublisher.probe[Int]() + + Source.fromGraph(LazySource(() ⇒ Source.fromPublisher(inProbe))).runWith(Sink.fromSubscriber(outProbe)) + + outProbe.request(1) + inProbe.expectRequest() + inProbe.sendNext(27) + outProbe.expectNext(27) + inProbe.sendError(TE("OMG Who set that on fire!?!")) + outProbe.expectError() shouldEqual TE("OMG Who set that on fire!?!") + } + + val attributesSource = Source.fromGraph( + new GraphStage[SourceShape[Attributes]] { + val out = Outlet[Attributes]("AttributesSource.out") + override val shape: SourceShape[Attributes] = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + override def onPull(): Unit = { + push(out, inheritedAttributes) + completeStage() + } + setHandler(out, this) + } + } + ) + + "propagate attributes to inner streams" in assertAllStagesStopped { + val f = Source.single(attributesSource.addAttributes(Attributes.name("inner"))) + .flatMapMerge(1, identity) + .addAttributes(Attributes.name("outer")) + .runWith(Sink.head) + + val attributes = f.futureValue.attributeList + attributes should contain(Attributes.Name("inner")) + attributes should contain(Attributes.Name("outer")) + attributes.indexOf(Attributes.Name("outer")) < attributes.indexOf(Attributes.Name("inner")) should be(true) + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index c8a36c2323..cd1469143d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -9,11 +9,13 @@ import akka.stream.OverflowStrategies._ import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage._ -import akka.stream.scaladsl.SourceQueueWithComplete +import akka.stream.scaladsl.{ Keep, Source, SourceQueueWithComplete } + import scala.annotation.tailrec import scala.concurrent.{ Future, Promise } import akka.Done import java.util.concurrent.CompletionStage + import scala.compat.java8.FutureConverters._ import scala.util.Try import scala.util.control.NonFatal @@ -333,3 +335,64 @@ final class UnfoldResourceSourceAsync[T, S]( override def toString = "UnfoldResourceSourceAsync" } + +object LazySource { + def apply[T, M](sourceFactory: () ⇒ Source[T, M]) = new LazySource[T, M](sourceFactory) +} + +final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { + val out = Outlet[T]("LazySource.out") + override val shape = SourceShape(out) + + override protected def initialAttributes = DefaultAttributes.lazySource + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val matPromise = Promise[M]() + val logic = new GraphStageLogic(shape) with OutHandler { + + override def onDownstreamFinish(): Unit = { + matPromise.failure(new RuntimeException("Downstream canceled without triggering lazy source materialization")) + completeStage() + } + + override def onPull(): Unit = { + val source = sourceFactory() + val subSink = new SubSinkInlet[T]("LazySource") + subSink.pull() + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + subSink.pull() + } + + override def onDownstreamFinish(): Unit = { + subSink.cancel() + completeStage() + } + }) + + subSink.setHandler(new InHandler { + override def onPush(): Unit = { + push(out, subSink.grab()) + } + }) + + matPromise.tryComplete( + Try { + subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes) + }) + } + + setHandler(out, this) + + override def postStop() = { + matPromise.tryFailure(new RuntimeException("LazySource stopped without completing the materialized future")) + } + } + + (logic, matPromise.future) + } + + override def toString = "LazySource" +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 9eb341a33a..bbd575e8b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -127,6 +127,7 @@ object Stages { val actorSubscriberSink = name("actorSubscriberSink") val queueSink = name("queueSink") val lazySink = name("lazySink") + val lazySource = name("lazySource") val outputStreamSink = name("outputStreamSink") and IODispatcher val inputStreamSink = name("inputStreamSink") and IODispatcher val fileSink = name("fileSink") and IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index bfb7642469..501e1dec44 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -221,6 +221,14 @@ object Source { def failed[T](cause: Throwable): Source[T, NotUsed] = new Source(scaladsl.Source.failed(cause)) + /** + * Creates a `Source` that is not materialized until there is downstream demand, when the source gets materialized + * the materialized future is completed with its value, if downstream cancels or fails without any demand the + * `create` factory is never called and the materialized `CompletionStage` is failed. + */ + def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] = + scaladsl.Source.lazily[T, M](() => create.create().asScala).mapMaterializedValue(_.toJava).asJava + /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index dd28350956..a7adcfb691 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -355,6 +355,14 @@ object Source { DefaultAttributes.failedSource, shape("FailedSource"))) + /** + * Creates a `Source` that is not materialized until there is downstream demand, when the source gets materialized + * the materialized future is completed with its value, if downstream cancels or fails without any demand the + * create factory is never called and the materialized `Future` is failed. + */ + def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] = + Source.fromGraph(new LazySource[T, M](create)) + /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */