From 18d970fc8ebb60b864a883fa04725d21a811fa63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Fri, 17 May 2019 09:54:18 +0300 Subject: [PATCH] Add setup operator #26192 --- .../paradox/stream/operators/Sink/setup.md | 17 ++ .../stream/operators/Source-or-Flow/setup.md | 19 ++ .../main/paradox/stream/operators/index.md | 4 + .../java/akka/stream/javadsl/SetupTest.java | 75 ++++++ .../stream/DslFactoriesConsistencySpec.scala | 1 + .../akka/stream/scaladsl/SetupSpec.scala | 232 ++++++++++++++++++ .../scala/akka/stream/impl/SetupStage.scala | 165 +++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 11 +- .../main/scala/akka/stream/javadsl/Sink.scala | 10 + .../scala/akka/stream/javadsl/Source.scala | 10 +- .../scala/akka/stream/scaladsl/Flow.scala | 9 + .../scala/akka/stream/scaladsl/Sink.scala | 8 + .../scala/akka/stream/scaladsl/Source.scala | 8 + .../scala/akka/stream/stage/GraphStage.scala | 9 +- 14 files changed, 574 insertions(+), 4 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/setup.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source-or-Flow/setup.md create mode 100644 akka-stream-tests/src/test/java/akka/stream/javadsl/SetupTest.java create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/setup.md b/akka-docs/src/main/paradox/stream/operators/Sink/setup.md new file mode 100644 index 0000000000..855d08b022 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/setup.md @@ -0,0 +1,17 @@ +# Sink.setup + +Defer the creation of a `Sink` until materialization and access `ActorMaterializer` and `Attributes` + +@ref[Sink operators](../index.md#sink-operators) + +@@@ div { .group-scala } + +## Signature + +@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #setup } +@@@ + +## Description + +Typically used when access to materializer is needed to run a different stream during the construction of a sink. +Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`. \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/setup.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/setup.md new file mode 100644 index 0000000000..b4421d07b2 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/setup.md @@ -0,0 +1,19 @@ +# Source/Flow.setup + +Defer the creation of a `Source/Flow` until materialization and access `ActorMaterializer` and `Attributes` + +@ref[Simple operators](../index.md#simple-operators) + +@@@ div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #setup } +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #setup } + +@@@ + +## Description + +Typically used when access to materializer is needed to run a different stream during the construction of a source/flow. +Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`. \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index ea5f587d39..4b390e1d43 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -64,6 +64,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav |Sink|@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.| |Sink|@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.| |Sink|@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.| +|Sink|@ref[setup](Sink/setup.md)|Defer the creation of a `Sink` until materialization and access `ActorMaterializer` and `Attributes`| |Sink|@ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.| ## Additional Sink and Source converters @@ -150,6 +151,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| |Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.| |Source/Flow|@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| +|Source/Flow|@ref[setup](Source-or-Flow/setup.md)|Defer the creation of a `Source/Flow` until materialization and access `ActorMaterializer` and `Attributes`| |Source/Flow|@ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.| |Source/Flow|@ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|@ref[take](Source-or-Flow/take.md)|Pass `n` incoming elements downstream and then complete| @@ -291,6 +293,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [fromPublisher](Source/fromPublisher.md) * [fromIterator](Source/fromIterator.md) * [cycle](Source/cycle.md) +* [setup](Source-or-Flow/setup.md) * [fromFuture](Source/fromFuture.md) * [fromCompletionStage](Source/fromCompletionStage.md) * [fromFutureSource](Source/fromFutureSource.md) @@ -391,6 +394,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md) * [lazyInitAsync](Flow/lazyInitAsync.md) * [preMaterialize](Sink/preMaterialize.md) +* [setup](Sink/setup.md) * [fromSubscriber](Sink/fromSubscriber.md) * [cancelled](Sink/cancelled.md) * [head](Sink/head.md) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SetupTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SetupTest.java new file mode 100644 index 0000000000..f143b9610a --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SetupTest.java @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class SetupTest extends StreamTest { + public SetupTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("SetupTest", AkkaSpec.testConf()); + + @Test + public void shouldExposeMaterializerAndAttributesToSource() throws Exception { + final Source, CompletionStage> source = + Source.setup( + (mat, attr) -> + Source.single(Pair.create(mat.isShutdown(), attr.attributeList().isEmpty()))); + + assertEquals( + Pair.create(false, false), + source.runWith(Sink.head(), materializer).toCompletableFuture().get(5, TimeUnit.SECONDS)); + } + + @Test + public void shouldExposeMaterializerAndAttributesToFlow() throws Exception { + final Flow, CompletionStage> flow = + Flow.setup( + (mat, attr) -> + Flow.fromSinkAndSource( + Sink.ignore(), + Source.single(Pair.create(mat.isShutdown(), attr.attributeList().isEmpty())))); + + assertEquals( + Pair.create(false, false), + Source.empty() + .via(flow) + .runWith(Sink.head(), materializer) + .toCompletableFuture() + .get(5, TimeUnit.SECONDS)); + } + + @Test + public void shouldExposeMaterializerAndAttributesToSink() throws Exception { + Sink>>> sink = + Sink.setup( + (mat, attr) -> + Sink.fold( + Pair.create(mat.isShutdown(), attr.attributeList().isEmpty()), Keep.left())); + + assertEquals( + Pair.create(false, false), + Source.empty() + .runWith(sink, materializer) + .thenCompose(c -> c) + .toCompletableFuture() + .get(5, TimeUnit.SECONDS)); + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index f20c1cc7ff..7f0be3d358 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -47,6 +47,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: (classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) :: (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) :: + (classOf[scala.Function2[_, _, _]], classOf[java.util.function.BiFunction[_, _, _]]) :: // setup (classOf[scala.Function1[scala.Function1[_, _], _]], classOf[akka.japi.function.Function2[_, _, _]]) :: (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: (classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) :: diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala new file mode 100644 index 0000000000..d57e365998 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala @@ -0,0 +1,232 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.ActorMaterializer +import akka.stream.testkit.StreamSpec + +class SetupSpec extends StreamSpec { + + implicit val materializer = ActorMaterializer() + import system.dispatcher + + "Source.setup" should { + + "expose materializer" in { + val source = Source.setup { (mat, _) ⇒ + Source.single(mat.isShutdown) + } + + source.runWith(Sink.head).futureValue shouldBe false + } + + "expose attributes" in { + val source = Source.setup { (_, attr) ⇒ + Source.single(attr.attributeList) + } + + source.runWith(Sink.head).futureValue should not be empty + } + + "propagate materialized value" in { + val source = Source.setup { (_, _) ⇒ + Source.maybe[NotUsed] + } + + val (completion, element) = source.toMat(Sink.head)(Keep.both).run() + completion.futureValue.trySuccess(Some(NotUsed)) + element.futureValue shouldBe NotUsed + } + + "propagate attributes" in { + val source = Source + .setup { (_, attr) ⇒ + Source.single(attr.nameLifted) + } + .named("my-name") + + source.runWith(Sink.head).futureValue shouldBe Some("my-name") + } + + "propagate attributes when nested" in { + val source = Source + .setup { (_, _) ⇒ + Source.setup { (_, attr) ⇒ + Source.single(attr.nameLifted) + } + } + .named("my-name") + + source.runWith(Sink.head).futureValue shouldBe Some("my-name") + } + + "handle factory failure" in { + val error = new Error("boom") + val source = Source.setup { (_, _) ⇒ + throw error + } + + val (materialized, completion) = source.toMat(Sink.head)(Keep.both).run() + materialized.failed.futureValue.getCause shouldBe error + completion.failed.futureValue.getCause shouldBe error + } + + "handle materialization failure" in { + val error = new Error("boom") + val source = Source.setup { (_, _) ⇒ + Source.empty.mapMaterializedValue(_ ⇒ throw error) + } + + val (materialized, completion) = source.toMat(Sink.head)(Keep.both).run() + materialized.failed.futureValue.getCause shouldBe error + completion.failed.futureValue.getCause shouldBe error + } + + } + + "Flow.setup" should { + + "expose materializer" in { + val flow = Flow.setup { (mat, _) ⇒ + Flow.fromSinkAndSource(Sink.ignore, Source.single(mat.isShutdown)) + } + + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe false + } + + "expose attributes" in { + val flow = Flow.setup { (_, attr) ⇒ + Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.attributeList)) + } + + Source.empty.via(flow).runWith(Sink.head).futureValue should not be empty + } + + "propagate materialized value" in { + val flow = Flow.setup { (_, _) ⇒ + Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[NotUsed])(Keep.right) + } + + val (completion, element) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run() + completion.futureValue.trySuccess(Some(NotUsed)) + element.futureValue shouldBe NotUsed + } + + "propagate attributes" in { + val flow = Flow + .setup { (_, attr) ⇒ + Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted)) + } + .named("my-name") + + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name") + } + + "propagate attributes when nested" in { + val flow = Flow + .setup { (_, _) ⇒ + Flow.setup { (_, attr) ⇒ + Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted)) + } + } + .named("my-name") + + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name") + } + + "handle factory failure" in { + val error = new Error("boom") + val flow = Flow.setup { (_, _) ⇒ + throw error + } + + val (materialized, completion) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run() + materialized.failed.futureValue.getCause shouldBe error + completion.failed.futureValue.getCause shouldBe error + } + + "handle materialization failure" in { + val error = new Error("boom") + val flow = Flow.setup { (_, _) ⇒ + Flow[NotUsed].mapMaterializedValue(_ ⇒ throw error) + } + + val (materialized, completion) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run() + materialized.failed.futureValue.getCause shouldBe error + completion.failed.futureValue.getCause shouldBe error + } + + } + + "Sink.setup" should { + + "expose materializer" in { + val sink = Sink.setup { (mat, _) ⇒ + Sink.fold(mat.isShutdown)(Keep.left) + } + + Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe false + } + + "expose attributes" in { + val sink = Sink.setup { (_, attr) ⇒ + Sink.fold(attr.attributeList)(Keep.left) + } + + Source.empty.runWith(sink).flatMap(identity).futureValue should not be empty + } + + "propagate materialized value" in { + val sink = Sink.setup { (_, _) ⇒ + Sink.fold(NotUsed)(Keep.left) + } + + Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe NotUsed + } + + "propagate attributes" in { + val sink = Sink + .setup { (_, attr) ⇒ + Sink.fold(attr.nameLifted)(Keep.left) + } + .named("my-name") + + Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("my-name") + } + + "propagate attributes when nested" in { + val sink = Sink + .setup { (_, _) ⇒ + Sink.setup { (_, attr) ⇒ + Sink.fold(attr.nameLifted)(Keep.left) + } + } + .named("my-name") + + Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("my-name") + } + + "handle factory failure" in { + val error = new Error("boom") + val sink = Sink.setup { (_, _) ⇒ + throw error + } + + Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error + } + + "handle materialization failure" in { + val error = new Error("boom") + val sink = Sink.setup { (_, _) ⇒ + Sink.ignore.mapMaterializedValue(_ ⇒ throw error) + } + + Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala new file mode 100644 index 0000000000..333564136e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala @@ -0,0 +1,165 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.stream.scaladsl.{ Flow, Keep, Sink, Source } +import akka.stream._ +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler } + +import scala.concurrent.{ Future, Promise } +import scala.util.control.NonFatal + +/** Internal Api */ +@InternalApi private[stream] final class SetupSinkStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M]) + extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { + + private val in = Inlet[T]("SetupSinkStage.in") + override val shape = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val matPromise = Promise[M] + (createStageLogic(matPromise), matPromise.future) + } + + private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { + import SetupStage._ + + val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") + subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in))) + setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet)) + + override def preStart(): Unit = { + try { + val sink = factory(ActorMaterializerHelper.downcast(materializer), attributes) + + val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) + matPromise.success(mat) + } catch { + case NonFatal(ex) ⇒ + matPromise.failure(ex) + throw ex + } + } + } + +} + +/** Internal Api */ +@InternalApi private[stream] final class SetupFlowStage[T, U, M]( + factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M]) + extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { + + private val in = Inlet[T]("SetupFlowStage.in") + private val out = Outlet[U]("SetupFlowStage.out") + override val shape = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val matPromise = Promise[M] + (createStageLogic(matPromise), matPromise.future) + } + + private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { + import SetupStage._ + + val subInlet = new SubSinkInlet[U]("SetupFlowStage") + val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") + + subInlet.setHandler(delegateToOutlet(push(out, _: U), () ⇒ complete(out), fail(out, _), subInlet)) + subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in))) + + setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet)) + setHandler(out, delegateToSubInlet(subInlet)) + + override def preStart(): Unit = { + try { + val flow = factory(ActorMaterializerHelper.downcast(materializer), attributes) + + val mat = Source + .fromGraph(subOutlet.source) + .viaMat(flow.withAttributes(attributes))(Keep.right) + .to(Sink.fromGraph(subInlet.sink)) + .run()(subFusingMaterializer) + matPromise.success(mat) + } catch { + case NonFatal(ex) ⇒ + matPromise.failure(ex) + throw ex + } + } + } +} + +/** Internal Api */ +@InternalApi private[stream] final class SetupSourceStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M]) + extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { + + private val out = Outlet[T]("SetupSourceStage.out") + override val shape = SourceShape(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val matPromise = Promise[M] + (createStageLogic(matPromise), matPromise.future) + } + + private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { + import SetupStage._ + + val subInlet = new SubSinkInlet[T]("SetupSourceStage") + subInlet.setHandler(delegateToOutlet(push(out, _: T), () ⇒ complete(out), fail(out, _), subInlet)) + setHandler(out, delegateToSubInlet(subInlet)) + + override def preStart(): Unit = { + try { + val source = factory(ActorMaterializerHelper.downcast(materializer), attributes) + + val mat = source.withAttributes(attributes).to(Sink.fromGraph(subInlet.sink)).run()(subFusingMaterializer) + matPromise.success(mat) + } catch { + case NonFatal(ex) ⇒ + matPromise.failure(ex) + throw ex + } + } + } +} + +private object SetupStage { + def delegateToSubOutlet[T](grab: () ⇒ T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { + override def onPush(): Unit = + subOutlet.push(grab()) + override def onUpstreamFinish(): Unit = + subOutlet.complete() + override def onUpstreamFailure(ex: Throwable): Unit = + subOutlet.fail(ex) + } + + def delegateToOutlet[T]( + push: T ⇒ Unit, + complete: () ⇒ Unit, + fail: Throwable ⇒ Unit, + subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { + override def onPush(): Unit = + push(subInlet.grab()) + override def onUpstreamFinish(): Unit = + complete() + override def onUpstreamFailure(ex: Throwable): Unit = + fail(ex) + } + + def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler { + override def onPull(): Unit = + subInlet.pull() + override def onDownstreamFinish(): Unit = + subInlet.cancel() + } + + def delegateToInlet(pull: () ⇒ Unit, cancel: () ⇒ Unit) = new OutHandler { + override def onPull(): Unit = + pull() + override def onDownstreamFinish(): Unit = + cancel() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 766daec987..5bd9b1ad6b 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -14,7 +14,7 @@ import org.reactivestreams.Processor import scala.concurrent.duration.FiniteDuration import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage -import java.util.function.Supplier +import java.util.function.{ BiFunction, Supplier } import akka.util.JavaDurationConverters._ import akka.actor.ActorRef @@ -62,6 +62,15 @@ object Flow { case other => new Flow(scaladsl.Flow.fromGraph(other)) } + /** + * Defers the creation of a [[Flow]] until materialization. The `factory` function + * exposes [[ActorMaterializer]] which is going to be used during materialization and + * [[Attributes]] of the [[Flow]] returned by this method. + */ + def setup[I, O, M]( + factory: BiFunction[ActorMaterializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = + scaladsl.Flow.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava + /** * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input * will be sent to the Sink and the Flow's output will come from the Source. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 37d3420952..4bd6c2fe2e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -18,6 +18,8 @@ import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionContext import scala.util.Try import java.util.concurrent.CompletionStage +import java.util.function.BiFunction + import scala.collection.immutable import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ @@ -285,6 +287,14 @@ object Sink { case other => new Sink(scaladsl.Sink.fromGraph(other)) } + /** + * Defers the creation of a [[Sink]] until materialization. The `factory` function + * exposes [[ActorMaterializer]] which is going to be used during materialization and + * [[Attributes]] of the [[Sink]] returned by this method. + */ + def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] = + scaladsl.Sink.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava + /** * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 8e80e87ac9..4c1346fdfd 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -26,7 +26,7 @@ import scala.concurrent.{ Future, Promise } import scala.compat.java8.OptionConverters._ import java.util.concurrent.CompletionStage import java.util.concurrent.CompletableFuture -import java.util.function.Supplier +import java.util.function.{ BiFunction, Supplier } import akka.util.unused import com.github.ghik.silencer.silent @@ -359,6 +359,14 @@ object Source { case other => new Source(scaladsl.Source.fromGraph(other)) } + /** + * Defers the creation of a [[Source]] until materialization. The `factory` function + * exposes [[ActorMaterializer]] which is going to be used during materialization and + * [[Attributes]] of the [[Source]] returned by this method. + */ + def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] = + scaladsl.Source.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava + /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c1753315cf..8679454a3d 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -11,6 +11,7 @@ import akka.stream.impl.{ fusing, LinearTraversalBuilder, ProcessorModule, + SetupFlowStage, SubFlowImpl, Throttle, Timers, @@ -387,6 +388,14 @@ object Flow { case _ => new Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape) } + /** + * Defers the creation of a [[Flow]] until materialization. The `factory` function + * exposes [[ActorMaterializer]] which is going to be used during materialization and + * [[Attributes]] of the [[Flow]] returned by this method. + */ + def setup[T, U, M](factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M]): Flow[T, U, Future[M]] = + Flow.fromGraph(new SetupFlowStage(factory)) + /** * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input * will be sent to the Sink and the Flow's output will come from the Source. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index fe7420687a..378b5d9e31 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -145,6 +145,14 @@ object Sink { new Sink(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape) } + /** + * Defers the creation of a [[Sink]] until materialization. The `factory` function + * exposes [[ActorMaterializer]] which is going to be used during materialization and + * [[Attributes]] of the [[Sink]] returned by this method. + */ + def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M]): Sink[T, Future[M]] = + Sink.fromGraph(new SetupSinkStage(factory)) + /** * Helper to create [[Sink]] from `Subscriber`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 28a2ed9c27..03f17c0769 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -295,6 +295,14 @@ object Source { new Source(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape) } + /** + * Defers the creation of a [[Source]] until materialization. The `factory` function + * exposes [[ActorMaterializer]] which is going to be used during materialization and + * [[Attributes]] of the [[Source]] returned by this method. + */ + def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M]): Source[T, Future[M]] = + Source.fromGraph(new SetupSourceStage(factory)) + /** * Helper to create [[Source]] from `Iterable`. * Example usage: `Source(Seq(1,2,3))` diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 5a78405612..080d3fe466 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -77,7 +77,7 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, * * Extend this `AbstractGraphStageWithMaterializedValue` if you want to provide a materialized value, * represented by the type parameter `M`. If your GraphStage does not need to provide a materialized - * value you can instead extende [[GraphStage]] which materializes a [[NotUsed]] value. + * value you can instead extend [[GraphStage]] which materializes a [[NotUsed]] value. * * A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that * creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together. @@ -389,11 +389,16 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ private[akka] def interpreter: GraphInterpreter = if (_interpreter == null) - throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor") + throw new IllegalStateException( + "not yet initialized: only setHandler is allowed in GraphStageLogic constructor. To access materializer use Source/Flow/Sink.setup factory") else _interpreter /** * The [[akka.stream.Materializer]] that has set this GraphStage in motion. + * + * Can not be used from a `GraphStage` constructor. Access to materializer is provided by the + * [[akka.stream.scaladsl.Source.setup]], [[akka.stream.scaladsl.Flow.setup]] and [[akka.stream.scaladsl.Sink.setup]] + * and their corresponding Java API factories. */ protected def materializer: Materializer = interpreter.materializer