diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/setup.md b/akka-docs/src/main/paradox/stream/operators/Sink/setup.md
new file mode 100644
index 0000000000..855d08b022
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Sink/setup.md
@@ -0,0 +1,17 @@
+# Sink.setup
+
+Defer the creation of a `Sink` until materialization and access `ActorMaterializer` and `Attributes`
+
+@ref[Sink operators](../index.md#sink-operators)
+
+@@@ div { .group-scala }
+
+## Signature
+
+@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #setup }
+@@@
+
+## Description
+
+Typically used when access to materializer is needed to run a different stream during the construction of a sink.
+Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/setup.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/setup.md
new file mode 100644
index 0000000000..b4421d07b2
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/setup.md
@@ -0,0 +1,19 @@
+# Source/Flow.setup
+
+Defer the creation of a `Source/Flow` until materialization and access `ActorMaterializer` and `Attributes`
+
+@ref[Simple operators](../index.md#simple-operators)
+
+@@@ div { .group-scala }
+
+## Signature
+
+@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #setup }
+@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #setup }
+
+@@@
+
+## Description
+
+Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
+Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index ea5f587d39..4b390e1d43 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -64,6 +64,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|Sink| @ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
|Sink| @ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.|
|Sink| @ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.|
+|Sink| @ref[setup](Sink/setup.md)|Defer the creation of a `Sink` until materialization and access `ActorMaterializer` and `Attributes`|
|Sink| @ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.|
## Additional Sink and Source converters
@@ -150,6 +151,7 @@ depending on being backpressured by downstream or not.
|Source/Flow| @ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow| @ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
|Source/Flow| @ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
+|Source/Flow| @ref[setup](Source-or-Flow/setup.md)|Defer the creation of a `Source/Flow` until materialization and access `ActorMaterializer` and `Attributes`|
|Source/Flow| @ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.|
|Source/Flow| @ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow| @ref[take](Source-or-Flow/take.md)|Pass `n` incoming elements downstream and then complete|
@@ -291,6 +293,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [fromPublisher](Source/fromPublisher.md)
* [fromIterator](Source/fromIterator.md)
* [cycle](Source/cycle.md)
+* [setup](Source-or-Flow/setup.md)
* [fromFuture](Source/fromFuture.md)
* [fromCompletionStage](Source/fromCompletionStage.md)
* [fromFutureSource](Source/fromFutureSource.md)
@@ -391,6 +394,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md)
* [lazyInitAsync](Flow/lazyInitAsync.md)
* [preMaterialize](Sink/preMaterialize.md)
+* [setup](Sink/setup.md)
* [fromSubscriber](Sink/fromSubscriber.md)
* [cancelled](Sink/cancelled.md)
* [head](Sink/head.md)
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SetupTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SetupTest.java
new file mode 100644
index 0000000000..f143b9610a
--- /dev/null
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SetupTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.stream.javadsl;
+
+import akka.NotUsed;
+import akka.japi.Pair;
+import akka.stream.StreamTest;
+import akka.testkit.AkkaJUnitActorSystemResource;
+import akka.testkit.AkkaSpec;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class SetupTest extends StreamTest {
+ public SetupTest() {
+ super(actorSystemResource);
+ }
+
+ @ClassRule
+ public static AkkaJUnitActorSystemResource actorSystemResource =
+ new AkkaJUnitActorSystemResource("SetupTest", AkkaSpec.testConf());
+
+ @Test
+ public void shouldExposeMaterializerAndAttributesToSource() throws Exception {
+ final Source, CompletionStage> source =
+ Source.setup(
+ (mat, attr) ->
+ Source.single(Pair.create(mat.isShutdown(), attr.attributeList().isEmpty())));
+
+ assertEquals(
+ Pair.create(false, false),
+ source.runWith(Sink.head(), materializer).toCompletableFuture().get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void shouldExposeMaterializerAndAttributesToFlow() throws Exception {
+ final Flow, CompletionStage> flow =
+ Flow.setup(
+ (mat, attr) ->
+ Flow.fromSinkAndSource(
+ Sink.ignore(),
+ Source.single(Pair.create(mat.isShutdown(), attr.attributeList().isEmpty()))));
+
+ assertEquals(
+ Pair.create(false, false),
+ Source.empty()
+ .via(flow)
+ .runWith(Sink.head(), materializer)
+ .toCompletableFuture()
+ .get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void shouldExposeMaterializerAndAttributesToSink() throws Exception {
+ Sink>>> sink =
+ Sink.setup(
+ (mat, attr) ->
+ Sink.fold(
+ Pair.create(mat.isShutdown(), attr.attributeList().isEmpty()), Keep.left()));
+
+ assertEquals(
+ Pair.create(false, false),
+ Source.empty()
+ .runWith(sink, materializer)
+ .thenCompose(c -> c)
+ .toCompletableFuture()
+ .get(5, TimeUnit.SECONDS));
+ }
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala
index f20c1cc7ff..7f0be3d358 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala
@@ -47,6 +47,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
(classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) ::
(classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) ::
+ (classOf[scala.Function2[_, _, _]], classOf[java.util.function.BiFunction[_, _, _]]) :: // setup
(classOf[scala.Function1[scala.Function1[_, _], _]], classOf[akka.japi.function.Function2[_, _, _]]) ::
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::
(classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) ::
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala
new file mode 100644
index 0000000000..d57e365998
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala
@@ -0,0 +1,232 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.NotUsed
+import akka.stream.ActorMaterializer
+import akka.stream.testkit.StreamSpec
+
+class SetupSpec extends StreamSpec {
+
+ implicit val materializer = ActorMaterializer()
+ import system.dispatcher
+
+ "Source.setup" should {
+
+ "expose materializer" in {
+ val source = Source.setup { (mat, _) ⇒
+ Source.single(mat.isShutdown)
+ }
+
+ source.runWith(Sink.head).futureValue shouldBe false
+ }
+
+ "expose attributes" in {
+ val source = Source.setup { (_, attr) ⇒
+ Source.single(attr.attributeList)
+ }
+
+ source.runWith(Sink.head).futureValue should not be empty
+ }
+
+ "propagate materialized value" in {
+ val source = Source.setup { (_, _) ⇒
+ Source.maybe[NotUsed]
+ }
+
+ val (completion, element) = source.toMat(Sink.head)(Keep.both).run()
+ completion.futureValue.trySuccess(Some(NotUsed))
+ element.futureValue shouldBe NotUsed
+ }
+
+ "propagate attributes" in {
+ val source = Source
+ .setup { (_, attr) ⇒
+ Source.single(attr.nameLifted)
+ }
+ .named("my-name")
+
+ source.runWith(Sink.head).futureValue shouldBe Some("my-name")
+ }
+
+ "propagate attributes when nested" in {
+ val source = Source
+ .setup { (_, _) ⇒
+ Source.setup { (_, attr) ⇒
+ Source.single(attr.nameLifted)
+ }
+ }
+ .named("my-name")
+
+ source.runWith(Sink.head).futureValue shouldBe Some("my-name")
+ }
+
+ "handle factory failure" in {
+ val error = new Error("boom")
+ val source = Source.setup { (_, _) ⇒
+ throw error
+ }
+
+ val (materialized, completion) = source.toMat(Sink.head)(Keep.both).run()
+ materialized.failed.futureValue.getCause shouldBe error
+ completion.failed.futureValue.getCause shouldBe error
+ }
+
+ "handle materialization failure" in {
+ val error = new Error("boom")
+ val source = Source.setup { (_, _) ⇒
+ Source.empty.mapMaterializedValue(_ ⇒ throw error)
+ }
+
+ val (materialized, completion) = source.toMat(Sink.head)(Keep.both).run()
+ materialized.failed.futureValue.getCause shouldBe error
+ completion.failed.futureValue.getCause shouldBe error
+ }
+
+ }
+
+ "Flow.setup" should {
+
+ "expose materializer" in {
+ val flow = Flow.setup { (mat, _) ⇒
+ Flow.fromSinkAndSource(Sink.ignore, Source.single(mat.isShutdown))
+ }
+
+ Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe false
+ }
+
+ "expose attributes" in {
+ val flow = Flow.setup { (_, attr) ⇒
+ Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.attributeList))
+ }
+
+ Source.empty.via(flow).runWith(Sink.head).futureValue should not be empty
+ }
+
+ "propagate materialized value" in {
+ val flow = Flow.setup { (_, _) ⇒
+ Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[NotUsed])(Keep.right)
+ }
+
+ val (completion, element) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run()
+ completion.futureValue.trySuccess(Some(NotUsed))
+ element.futureValue shouldBe NotUsed
+ }
+
+ "propagate attributes" in {
+ val flow = Flow
+ .setup { (_, attr) ⇒
+ Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted))
+ }
+ .named("my-name")
+
+ Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
+ }
+
+ "propagate attributes when nested" in {
+ val flow = Flow
+ .setup { (_, _) ⇒
+ Flow.setup { (_, attr) ⇒
+ Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted))
+ }
+ }
+ .named("my-name")
+
+ Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
+ }
+
+ "handle factory failure" in {
+ val error = new Error("boom")
+ val flow = Flow.setup { (_, _) ⇒
+ throw error
+ }
+
+ val (materialized, completion) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run()
+ materialized.failed.futureValue.getCause shouldBe error
+ completion.failed.futureValue.getCause shouldBe error
+ }
+
+ "handle materialization failure" in {
+ val error = new Error("boom")
+ val flow = Flow.setup { (_, _) ⇒
+ Flow[NotUsed].mapMaterializedValue(_ ⇒ throw error)
+ }
+
+ val (materialized, completion) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run()
+ materialized.failed.futureValue.getCause shouldBe error
+ completion.failed.futureValue.getCause shouldBe error
+ }
+
+ }
+
+ "Sink.setup" should {
+
+ "expose materializer" in {
+ val sink = Sink.setup { (mat, _) ⇒
+ Sink.fold(mat.isShutdown)(Keep.left)
+ }
+
+ Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe false
+ }
+
+ "expose attributes" in {
+ val sink = Sink.setup { (_, attr) ⇒
+ Sink.fold(attr.attributeList)(Keep.left)
+ }
+
+ Source.empty.runWith(sink).flatMap(identity).futureValue should not be empty
+ }
+
+ "propagate materialized value" in {
+ val sink = Sink.setup { (_, _) ⇒
+ Sink.fold(NotUsed)(Keep.left)
+ }
+
+ Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe NotUsed
+ }
+
+ "propagate attributes" in {
+ val sink = Sink
+ .setup { (_, attr) ⇒
+ Sink.fold(attr.nameLifted)(Keep.left)
+ }
+ .named("my-name")
+
+ Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("my-name")
+ }
+
+ "propagate attributes when nested" in {
+ val sink = Sink
+ .setup { (_, _) ⇒
+ Sink.setup { (_, attr) ⇒
+ Sink.fold(attr.nameLifted)(Keep.left)
+ }
+ }
+ .named("my-name")
+
+ Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("my-name")
+ }
+
+ "handle factory failure" in {
+ val error = new Error("boom")
+ val sink = Sink.setup { (_, _) ⇒
+ throw error
+ }
+
+ Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error
+ }
+
+ "handle materialization failure" in {
+ val error = new Error("boom")
+ val sink = Sink.setup { (_, _) ⇒
+ Sink.ignore.mapMaterializedValue(_ ⇒ throw error)
+ }
+
+ Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error
+ }
+
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala
new file mode 100644
index 0000000000..333564136e
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala
@@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.stream.impl
+
+import akka.annotation.InternalApi
+import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
+import akka.stream._
+import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
+
+import scala.concurrent.{ Future, Promise }
+import scala.util.control.NonFatal
+
+/** Internal Api */
+@InternalApi private[stream] final class SetupSinkStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M])
+ extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+
+ private val in = Inlet[T]("SetupSinkStage.in")
+ override val shape = SinkShape(in)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
+ val matPromise = Promise[M]
+ (createStageLogic(matPromise), matPromise.future)
+ }
+
+ private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
+ import SetupStage._
+
+ val subOutlet = new SubSourceOutlet[T]("SetupSinkStage")
+ subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in)))
+ setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet))
+
+ override def preStart(): Unit = {
+ try {
+ val sink = factory(ActorMaterializerHelper.downcast(materializer), attributes)
+
+ val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer)
+ matPromise.success(mat)
+ } catch {
+ case NonFatal(ex) ⇒
+ matPromise.failure(ex)
+ throw ex
+ }
+ }
+ }
+
+}
+
+/** Internal Api */
+@InternalApi private[stream] final class SetupFlowStage[T, U, M](
+ factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M])
+ extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] {
+
+ private val in = Inlet[T]("SetupFlowStage.in")
+ private val out = Outlet[U]("SetupFlowStage.out")
+ override val shape = FlowShape(in, out)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
+ val matPromise = Promise[M]
+ (createStageLogic(matPromise), matPromise.future)
+ }
+
+ private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
+ import SetupStage._
+
+ val subInlet = new SubSinkInlet[U]("SetupFlowStage")
+ val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
+
+ subInlet.setHandler(delegateToOutlet(push(out, _: U), () ⇒ complete(out), fail(out, _), subInlet))
+ subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in)))
+
+ setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet))
+ setHandler(out, delegateToSubInlet(subInlet))
+
+ override def preStart(): Unit = {
+ try {
+ val flow = factory(ActorMaterializerHelper.downcast(materializer), attributes)
+
+ val mat = Source
+ .fromGraph(subOutlet.source)
+ .viaMat(flow.withAttributes(attributes))(Keep.right)
+ .to(Sink.fromGraph(subInlet.sink))
+ .run()(subFusingMaterializer)
+ matPromise.success(mat)
+ } catch {
+ case NonFatal(ex) ⇒
+ matPromise.failure(ex)
+ throw ex
+ }
+ }
+ }
+}
+
+/** Internal Api */
+@InternalApi private[stream] final class SetupSourceStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M])
+ extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
+
+ private val out = Outlet[T]("SetupSourceStage.out")
+ override val shape = SourceShape(out)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
+ val matPromise = Promise[M]
+ (createStageLogic(matPromise), matPromise.future)
+ }
+
+ private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
+ import SetupStage._
+
+ val subInlet = new SubSinkInlet[T]("SetupSourceStage")
+ subInlet.setHandler(delegateToOutlet(push(out, _: T), () ⇒ complete(out), fail(out, _), subInlet))
+ setHandler(out, delegateToSubInlet(subInlet))
+
+ override def preStart(): Unit = {
+ try {
+ val source = factory(ActorMaterializerHelper.downcast(materializer), attributes)
+
+ val mat = source.withAttributes(attributes).to(Sink.fromGraph(subInlet.sink)).run()(subFusingMaterializer)
+ matPromise.success(mat)
+ } catch {
+ case NonFatal(ex) ⇒
+ matPromise.failure(ex)
+ throw ex
+ }
+ }
+ }
+}
+
+private object SetupStage {
+ def delegateToSubOutlet[T](grab: () ⇒ T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler {
+ override def onPush(): Unit =
+ subOutlet.push(grab())
+ override def onUpstreamFinish(): Unit =
+ subOutlet.complete()
+ override def onUpstreamFailure(ex: Throwable): Unit =
+ subOutlet.fail(ex)
+ }
+
+ def delegateToOutlet[T](
+ push: T ⇒ Unit,
+ complete: () ⇒ Unit,
+ fail: Throwable ⇒ Unit,
+ subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler {
+ override def onPush(): Unit =
+ push(subInlet.grab())
+ override def onUpstreamFinish(): Unit =
+ complete()
+ override def onUpstreamFailure(ex: Throwable): Unit =
+ fail(ex)
+ }
+
+ def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler {
+ override def onPull(): Unit =
+ subInlet.pull()
+ override def onDownstreamFinish(): Unit =
+ subInlet.cancel()
+ }
+
+ def delegateToInlet(pull: () ⇒ Unit, cancel: () ⇒ Unit) = new OutHandler {
+ override def onPull(): Unit =
+ pull()
+ override def onDownstreamFinish(): Unit =
+ cancel()
+ }
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 766daec987..5bd9b1ad6b 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -14,7 +14,7 @@ import org.reactivestreams.Processor
import scala.concurrent.duration.FiniteDuration
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
-import java.util.function.Supplier
+import java.util.function.{ BiFunction, Supplier }
import akka.util.JavaDurationConverters._
import akka.actor.ActorRef
@@ -62,6 +62,15 @@ object Flow {
case other => new Flow(scaladsl.Flow.fromGraph(other))
}
+ /**
+ * Defers the creation of a [[Flow]] until materialization. The `factory` function
+ * exposes [[ActorMaterializer]] which is going to be used during materialization and
+ * [[Attributes]] of the [[Flow]] returned by this method.
+ */
+ def setup[I, O, M](
+ factory: BiFunction[ActorMaterializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] =
+ scaladsl.Flow.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava
+
/**
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
* will be sent to the Sink and the Flow's output will come from the Source.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
index 37d3420952..4bd6c2fe2e 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
@@ -18,6 +18,8 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
import scala.util.Try
import java.util.concurrent.CompletionStage
+import java.util.function.BiFunction
+
import scala.collection.immutable
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
@@ -285,6 +287,14 @@ object Sink {
case other => new Sink(scaladsl.Sink.fromGraph(other))
}
+ /**
+ * Defers the creation of a [[Sink]] until materialization. The `factory` function
+ * exposes [[ActorMaterializer]] which is going to be used during materialization and
+ * [[Attributes]] of the [[Sink]] returned by this method.
+ */
+ def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] =
+ scaladsl.Sink.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava
+
/**
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
*/
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 8e80e87ac9..4c1346fdfd 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -26,7 +26,7 @@ import scala.concurrent.{ Future, Promise }
import scala.compat.java8.OptionConverters._
import java.util.concurrent.CompletionStage
import java.util.concurrent.CompletableFuture
-import java.util.function.Supplier
+import java.util.function.{ BiFunction, Supplier }
import akka.util.unused
import com.github.ghik.silencer.silent
@@ -359,6 +359,14 @@ object Source {
case other => new Source(scaladsl.Source.fromGraph(other))
}
+ /**
+ * Defers the creation of a [[Source]] until materialization. The `factory` function
+ * exposes [[ActorMaterializer]] which is going to be used during materialization and
+ * [[Attributes]] of the [[Source]] returned by this method.
+ */
+ def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] =
+ scaladsl.Source.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava
+
/**
* Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.
*/
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index c1753315cf..8679454a3d 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -11,6 +11,7 @@ import akka.stream.impl.{
fusing,
LinearTraversalBuilder,
ProcessorModule,
+ SetupFlowStage,
SubFlowImpl,
Throttle,
Timers,
@@ -387,6 +388,14 @@ object Flow {
case _ => new Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape)
}
+ /**
+ * Defers the creation of a [[Flow]] until materialization. The `factory` function
+ * exposes [[ActorMaterializer]] which is going to be used during materialization and
+ * [[Attributes]] of the [[Flow]] returned by this method.
+ */
+ def setup[T, U, M](factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M]): Flow[T, U, Future[M]] =
+ Flow.fromGraph(new SetupFlowStage(factory))
+
/**
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
* will be sent to the Sink and the Flow's output will come from the Source.
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
index fe7420687a..378b5d9e31 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
@@ -145,6 +145,14 @@ object Sink {
new Sink(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape)
}
+ /**
+ * Defers the creation of a [[Sink]] until materialization. The `factory` function
+ * exposes [[ActorMaterializer]] which is going to be used during materialization and
+ * [[Attributes]] of the [[Sink]] returned by this method.
+ */
+ def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M]): Sink[T, Future[M]] =
+ Sink.fromGraph(new SetupSinkStage(factory))
+
/**
* Helper to create [[Sink]] from `Subscriber`.
*/
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
index 28a2ed9c27..03f17c0769 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
@@ -295,6 +295,14 @@ object Source {
new Source(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape)
}
+ /**
+ * Defers the creation of a [[Source]] until materialization. The `factory` function
+ * exposes [[ActorMaterializer]] which is going to be used during materialization and
+ * [[Attributes]] of the [[Source]] returned by this method.
+ */
+ def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M]): Source[T, Future[M]] =
+ Source.fromGraph(new SetupSourceStage(factory))
+
/**
* Helper to create [[Source]] from `Iterable`.
* Example usage: `Source(Seq(1,2,3))`
diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
index 5a78405612..080d3fe466 100644
--- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
+++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
@@ -77,7 +77,7 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S,
*
* Extend this `AbstractGraphStageWithMaterializedValue` if you want to provide a materialized value,
* represented by the type parameter `M`. If your GraphStage does not need to provide a materialized
- * value you can instead extende [[GraphStage]] which materializes a [[NotUsed]] value.
+ * value you can instead extend [[GraphStage]] which materializes a [[NotUsed]] value.
*
* A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that
* creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together.
@@ -389,11 +389,16 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/
private[akka] def interpreter: GraphInterpreter =
if (_interpreter == null)
- throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor")
+ throw new IllegalStateException(
+ "not yet initialized: only setHandler is allowed in GraphStageLogic constructor. To access materializer use Source/Flow/Sink.setup factory")
else _interpreter
/**
* The [[akka.stream.Materializer]] that has set this GraphStage in motion.
+ *
+ * Can not be used from a `GraphStage` constructor. Access to materializer is provided by the
+ * [[akka.stream.scaladsl.Source.setup]], [[akka.stream.scaladsl.Flow.setup]] and [[akka.stream.scaladsl.Sink.setup]]
+ * and their corresponding Java API factories.
*/
protected def materializer: Materializer = interpreter.materializer