diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md
index fa1a2bff04..c60b2eb569 100644
--- a/akka-docs/src/main/paradox/stream/stages-overview.md
+++ b/akka-docs/src/main/paradox/stream/stages-overview.md
@@ -1112,6 +1112,33 @@ also be sent to the wire-tap `Sink` if there is demand.
---------------------------------------------------------------
+### lazyInit
+
+Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument.
+Internal `Flow` will not be created if there are no elements, because of completion or error.
+The materialized value of the `Flow` will be the materialized value of the created internal flow.
+
+If `flowFactory` throws an exception and the supervision decision is
+`akka.stream.Supervision.Stop` the materialized value of the flow will be completed with
+the result of the `fallback` argument. For all other supervision options it will
+try to create flow with the next element.
+
+The `fallback` will be executed when there was no elements and completed is received from upstream
+or when there was an exception either thrown by the `flowFactory` or during the internal flow
+materialization process.
+
+Adheres to the `ActorAttributes.SupervisionStrategy` attribute.
+
+**emits** when the internal flow is successfully created and it emits
+
+**backpressures** when the internal flow is successfully created and it backpressures
+
+**completes** when upstream completes and all elements have been emitted from the internal flow
+
+**cancels** when downstream cancels
+
+---------------------------------------------------------------
+
## Flow stages composed of Sinks and Sources
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java
index 8bfda94599..d5394f3eb5 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java
@@ -961,4 +961,24 @@ public class FlowTest extends StreamTest {
final Flow f = Flow.of(Integer.class).divertTo(Sink.ignore(), e -> true);
final Flow f2 = Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo");
}
+
+ @Test
+ public void mustBeAbleToUseLazyInit() throws Exception {
+ final CompletionStage> future = new CompletableFuture>();
+ future.toCompletableFuture().complete(Flow.fromFunction((id) -> id));
+ Creator ignoreFunction = new Creator() {
+ @Override
+ public NotUsed create() throws Exception {
+ return NotUsed.getInstance();
+ }
+ };
+
+ Integer result =
+ Source.range(1, 10)
+ .via(Flow.lazyInit((i) -> future, ignoreFunction))
+ .runWith(Sink.head(), materializer)
+ .toCompletableFuture().get(3, TimeUnit.SECONDS);
+
+ assertEquals((Object) 1, result);
+ }
}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala
new file mode 100644
index 0000000000..f21cd63236
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2018-2018 Lightbend Inc.
+ */
+package akka.stream.scaladsl
+
+import java.util.concurrent.TimeoutException
+
+import akka.NotUsed
+import akka.stream.ActorAttributes.supervisionStrategy
+import akka.stream.Supervision._
+import akka.stream._
+import akka.stream.impl.fusing.LazyFlow
+import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue }
+import akka.stream.testkit.{ StreamSpec, TestPublisher }
+import akka.stream.testkit.TestSubscriber.Probe
+import akka.stream.testkit.Utils._
+import akka.stream.testkit.scaladsl.TestSink
+
+import scala.concurrent.{ Await, Future, Promise }
+import scala.concurrent.duration._
+
+class LazyFlowSpec extends StreamSpec {
+
+ val settings = ActorMaterializerSettings(system)
+ .withInputBuffer(initialSize = 1, maxSize = 1)
+ implicit val materializer = ActorMaterializer(settings)
+
+ val fallback = () ⇒ NotUsed
+ val ex = TE("")
+
+ "A LazyFlow" must {
+ def mapF(e: Int): Future[Flow[Int, String, NotUsed]] =
+ Future.successful(Flow.fromFunction[Int, String](i ⇒ (i * e).toString))
+ val flowF = Future.successful(Flow.fromFunction[Int, Int](id ⇒ id))
+ "work in happy case" in assertAllStagesStopped {
+ val probe = Source(2 to 10)
+ .via(Flow.lazyInit[Int, String, NotUsed](mapF, fallback))
+ .runWith(TestSink.probe[String])
+ probe.request(100)
+ (2 to 10).map(i ⇒ (i * 2).toString).foreach(probe.expectNext)
+ }
+
+ "work with slow flow init" in assertAllStagesStopped {
+ val p = Promise[Flow[Int, Int, NotUsed]]()
+ val sourceProbe = TestPublisher.manualProbe[Int]()
+ val flowProbe = Source.fromPublisher(sourceProbe)
+ .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ p.future, fallback))
+ .runWith(TestSink.probe[Int])
+
+ val sourceSub = sourceProbe.expectSubscription()
+ flowProbe.request(1)
+ sourceSub.expectRequest(1)
+ sourceSub.sendNext(0)
+ sourceSub.expectRequest(1)
+ sourceProbe.expectNoMsg(200.millis)
+
+ p.success(Flow.fromFunction[Int, Int](id ⇒ id))
+ flowProbe.request(99)
+ flowProbe.expectNext(0)
+ (1 to 10).foreach(i ⇒ {
+ sourceSub.sendNext(i)
+ flowProbe.expectNext(i)
+ })
+ sourceSub.sendComplete()
+ }
+
+ "complete when there was no elements in the stream" in assertAllStagesStopped {
+ def flowMaker(i: Int) = flowF
+ val probe = Source.empty
+ .via(Flow.lazyInit(flowMaker, () ⇒ 0))
+ .runWith(TestSink.probe[Int])
+ probe.request(1).expectComplete()
+ }
+
+ "complete normally when upstream is completed" in assertAllStagesStopped {
+ val probe = Source.single(1)
+ .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback))
+ .runWith(TestSink.probe[Int])
+ probe.request(1)
+ .expectNext(1)
+ .expectComplete()
+ }
+
+ "fail gracefully when flow factory method failed" in assertAllStagesStopped {
+ val sourceProbe = TestPublisher.manualProbe[Int]()
+ val probe = Source.fromPublisher(sourceProbe)
+ .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ throw ex, fallback))
+ .runWith(TestSink.probe[Int])
+
+ val sourceSub = sourceProbe.expectSubscription()
+ probe.request(1)
+ sourceSub.expectRequest(1)
+ sourceSub.sendNext(0)
+ sourceSub.expectCancellation()
+ probe.expectError(ex)
+ }
+
+ "fail gracefully when upstream failed" in assertAllStagesStopped {
+ val sourceProbe = TestPublisher.manualProbe[Int]()
+ val probe = Source.fromPublisher(sourceProbe)
+ .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback))
+ .runWith(TestSink.probe[Int])
+
+ val sourceSub = sourceProbe.expectSubscription()
+ sourceSub.expectRequest(1)
+ sourceSub.sendNext(0)
+ probe.request(1)
+ .expectNext(0)
+ sourceSub.sendError(ex)
+ probe.expectError(ex)
+ }
+
+ "fail gracefully when factory future failed" in assertAllStagesStopped {
+ val sourceProbe = TestPublisher.manualProbe[Int]()
+ val flowProbe = Source.fromPublisher(sourceProbe)
+ .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ Future.failed(ex), fallback))
+ .withAttributes(supervisionStrategy(stoppingDecider))
+ .runWith(TestSink.probe[Int])
+
+ val sourceSub = sourceProbe.expectSubscription()
+ sourceSub.expectRequest(1)
+ sourceSub.sendNext(0)
+ flowProbe.request(1).expectError(ex)
+ }
+
+ "cancel upstream when the downstream is cancelled" in assertAllStagesStopped {
+ val sourceProbe = TestPublisher.manualProbe[Int]()
+ val probe = Source.fromPublisher(sourceProbe)
+ .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback))
+ .withAttributes(supervisionStrategy(stoppingDecider))
+ .runWith(TestSink.probe[Int])
+
+ val sourceSub = sourceProbe.expectSubscription()
+ probe.request(1)
+ sourceSub.expectRequest(1)
+ sourceSub.sendNext(0)
+ sourceSub.expectRequest(1)
+ probe.expectNext(0)
+ probe.cancel()
+ sourceSub.expectCancellation()
+ }
+
+ "continue if supervision is resume" in assertAllStagesStopped {
+ val sourceProbe = TestPublisher.manualProbe[Int]()
+ def flowBuilder(a: Int) = if (a == 0) throw ex else Future.successful(Flow.fromFunction[Int, Int](id ⇒ id))
+ val probe = Source.fromPublisher(sourceProbe)
+ .via(Flow.lazyInit[Int, Int, NotUsed](flowBuilder, fallback))
+ .withAttributes(supervisionStrategy(resumingDecider))
+ .runWith(TestSink.probe[Int])
+
+ val sourceSub = sourceProbe.expectSubscription()
+ probe.request(1)
+ sourceSub.expectRequest(1)
+ sourceSub.sendNext(0)
+ sourceSub.expectRequest(1)
+ sourceSub.sendNext(1)
+ probe.expectNext(1)
+ probe.cancel()
+ }
+
+ "fail correctly when materialization of inner sink fails" in assertAllStagesStopped {
+ val matFail = TE("fail!")
+ object FailingInnerMat extends GraphStageWithMaterializedValue[FlowShape[String, String], Option[String]] {
+ val in = Inlet[String]("in")
+ val out = Outlet[String]("out")
+ val shape = FlowShape(in, out)
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) =
+ (new GraphStageLogic(shape) {
+ throw matFail
+ }, Some("fine"))
+ }
+
+ val result = Source.single("whatever")
+ .viaMat(Flow.lazyInit(
+ _ ⇒ Future.successful(Flow.fromGraph(FailingInnerMat)),
+ () ⇒ Some("boom")))(Keep.right)
+ .toMat(Sink.ignore)(Keep.left)
+ .run()
+
+ result should ===(Some("boom"))
+ }
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index fada0576ed..ea06d31cf0 100755
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -130,6 +130,7 @@ import akka.stream._
val actorSubscriberSink = name("actorSubscriberSink")
val queueSink = name("queueSink")
val lazySink = name("lazySink")
+ val lazyFlow = name("lazyFlow")
val lazySource = name("lazySource")
val outputStreamSink = name("outputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink") and IODispatcher
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
index d11a2c4812..122916129b 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
@@ -6,20 +6,21 @@ package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.annotation.{ DoNotInherit, InternalApi }
+import akka.dispatch.ExecutionContexts
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ ConstantFun, ReactiveStreamsCompliance, Stages, Buffer ⇒ BufferImpl }
-import akka.stream.scaladsl.{ Source, SourceQueue }
+import akka.stream.scaladsl.{ Flow, Keep, Source, SourceQueue }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
-import scala.concurrent.Future
+import scala.concurrent.{ Future, Promise }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy
@@ -1927,3 +1928,152 @@ private[stream] object Collect {
override def toString = "StatefulMapConcat"
}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]], zeroMat: () ⇒ M)
+ extends GraphStageWithMaterializedValue[FlowShape[I, O], M] {
+ val in = Inlet[I]("lazyFlow.in")
+ val out = Outlet[O]("lazyFlow.out")
+ override def initialAttributes = DefaultAttributes.lazyFlow
+ override val shape: FlowShape[I, O] = FlowShape.of(in, out)
+
+ override def toString: String = "LazyFlow"
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
+ lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+
+ var completed = false
+ var matVal: Option[M] = None
+ val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
+ val subSink = new SubSinkInlet[O]("LazyFlowSubSink")
+
+ override def onPush(): Unit = {
+ try {
+ val element = grab(in)
+ val cb: AsyncCallback[Try[Flow[I, O, M]]] =
+ getAsyncCallback {
+ case Success(flow) ⇒ initInternalSource(flow, element)
+ case Failure(e) ⇒ failure(e)
+ }
+ flowFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext)
+ setHandler(in, new InHandler {
+ override def onPush(): Unit = throw new IllegalStateException("LazyFlow received push while waiting for flowFactory to complete.")
+ override def onUpstreamFinish(): Unit = gotCompletionEvent()
+ override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
+ })
+ } catch {
+ case NonFatal(e) ⇒ decider(e) match {
+ case Supervision.Stop ⇒ failure(e)
+ case _ ⇒ pull(in)
+ }
+ }
+ }
+
+ override def onPull(): Unit = {
+ pull(in)
+ subSink.pull()
+
+ setHandler(out, new OutHandler {
+ override def onPull(): Unit = {
+ subSink.pull()
+ }
+
+ override def onDownstreamFinish(): Unit = {
+ subSink.cancel()
+ completeStage()
+ }
+ })
+
+ subSink.setHandler(new InHandler {
+ override def onPush(): Unit = {
+ val elem = subSink.grab()
+ push(out, elem)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ completeStage()
+ }
+ })
+ }
+
+ setHandler(out, this)
+
+ private def failure(ex: Throwable): Unit = {
+ matVal = Some(zeroMat())
+ failStage(ex)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ matVal = Some(zeroMat())
+ completeStage()
+ }
+ override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
+ setHandler(in, this)
+
+ private def gotCompletionEvent(): Unit = {
+ setKeepGoing(true)
+ completed = true
+ }
+
+ private def initInternalSource(flow: Flow[I, O, M], firstElement: I): Unit = {
+ val sourceOut = new SubSourceOutlet[I]("LazyFlowSubSource")
+
+ def switchToFirstElementHandlers(): Unit = {
+ sourceOut.setHandler(new OutHandler {
+ override def onPull(): Unit = {
+ sourceOut.push(firstElement)
+ if (completed) internalSourceComplete() else switchToFinalHandlers()
+ }
+ override def onDownstreamFinish(): Unit = internalSourceComplete()
+ })
+
+ setHandler(in, new InHandler {
+ override def onPush(): Unit = sourceOut.push(grab(in))
+ override def onUpstreamFinish(): Unit = gotCompletionEvent()
+ override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
+ })
+ }
+
+ def switchToFinalHandlers(): Unit = {
+ sourceOut.setHandler(new OutHandler {
+ override def onPull(): Unit = pull(in)
+ override def onDownstreamFinish(): Unit = internalSourceComplete()
+ })
+ setHandler(in, new InHandler {
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ sourceOut.push(elem)
+ }
+ override def onUpstreamFinish(): Unit = internalSourceComplete()
+ override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
+ })
+ }
+
+ def internalSourceComplete(): Unit = {
+ sourceOut.complete()
+ // normal completion, subSink.onUpstreamFinish will complete the stage
+ }
+
+ def internalSourceFailure(ex: Throwable): Unit = {
+ sourceOut.fail(ex)
+ failStage(ex)
+ }
+
+ switchToFirstElementHandlers()
+ try {
+ matVal = Some(Source.fromGraph(sourceOut.source)
+ .viaMat(flow)(Keep.right).toMat(subSink.sink)(Keep.left).run()(interpreter.subFusingMaterializer))
+ } catch {
+ case NonFatal(ex) ⇒
+ subSink.cancel()
+ matVal = Some(zeroMat())
+ failStage(ex)
+ }
+ }
+
+ }
+ (stageLogic, matVal.getOrElse(zeroMat()))
+ }
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index b6d91c5ad7..314615bd81 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -16,6 +16,9 @@ import akka.japi.Util
import java.util.Comparator
import java.util.concurrent.CompletionStage
+import akka.dispatch.ExecutionContexts
+import akka.stream.impl.fusing.LazyFlow
+
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag
@@ -200,6 +203,36 @@ object Flow {
sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2],
combine: function.Function2[M1, M2, M]): Flow[I, O, M] =
new Flow(scaladsl.Flow.fromSinkAndSourceCoupledMat(sink, source)(combinerToScala(combine)))
+
+ /**
+ * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created
+ * if there are no elements, because of completion or error.
+ * The materialized value of the `Flow` will be the materialized
+ * value of the created internal flow.
+ *
+ * If `flowFactory` throws an exception and the supervision decision is
+ * [[akka.stream.Supervision.Stop]] the materialized value of the flow will be completed with
+ * the result of the `fallback`. For all other supervision options it will
+ * try to create flow with the next element.
+ *
+ * `fallback` will be executed when there was no elements and completed is received from upstream
+ * or when there was an exception either thrown by the `flowFactory` or during the internal flow
+ * materialization process.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the internal flow is successfully created and it emits
+ *
+ * '''Backpressures when''' the internal flow is successfully created and it backpressures
+ *
+ * '''Completes when''' upstream completes and all elements have been emitted from the internal flow
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def lazyInit[I, O, M](flowFactory: function.Function[I, CompletionStage[Flow[I, O, M]]], fallback: function.Creator[M]): Flow[I, O, M] =
+ Flow.fromGraph(new LazyFlow[I, O, M](
+ t ⇒ flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext),
+ () ⇒ fallback.create()))
}
/** Create a `Flow` which can process elements of type `T`. */
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index be72a098e0..1bf4959b52 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -495,6 +495,34 @@ object Flow {
FlowShape(bidi.in1, bidi.out2)
})
// format: ON
+
+ /**
+ * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created
+ * if there are no elements, because of completion or error.
+ * The materialized value of the `Flow` will be the materialized
+ * value of the created internal flow.
+ *
+ * If `flowFactory` throws an exception and the supervision decision is
+ * [[akka.stream.Supervision.Stop]] the materialized value of the flow will be completed with
+ * the result of the `fallback`. For all other supervision options it will
+ * try to create flow with the next element.
+ *
+ * `fallback` will be executed when there was no elements and completed is received from upstream
+ * or when there was an exception either thrown by the `flowFactory` or during the internal flow
+ * materialization process.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the internal flow is successfully created and it emits
+ *
+ * '''Backpressures when''' the internal flow is successfully created and it backpressures
+ *
+ * '''Completes when''' upstream completes and all elements have been emitted from the internal flow
+ *
+ * '''Cancels when''' downstream cancels
+ */
+ def lazyInit[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M] =
+ Flow.fromGraph(new LazyFlow[I, O, M](flowFactory, fallback))
}
object RunnableGraph {