Add setup operator #26192
This commit is contained in:
parent
7b20b89ce0
commit
18d970fc8e
14 changed files with 574 additions and 4 deletions
17
akka-docs/src/main/paradox/stream/operators/Sink/setup.md
Normal file
17
akka-docs/src/main/paradox/stream/operators/Sink/setup.md
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
# Sink.setup
|
||||
|
||||
Defer the creation of a `Sink` until materialization and access `ActorMaterializer` and `Attributes`
|
||||
|
||||
@ref[Sink operators](../index.md#sink-operators)
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
## Signature
|
||||
|
||||
@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #setup }
|
||||
@@@
|
||||
|
||||
## Description
|
||||
|
||||
Typically used when access to materializer is needed to run a different stream during the construction of a sink.
|
||||
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
# Source/Flow.setup
|
||||
|
||||
Defer the creation of a `Source/Flow` until materialization and access `ActorMaterializer` and `Attributes`
|
||||
|
||||
@ref[Simple operators](../index.md#simple-operators)
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
## Signature
|
||||
|
||||
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #setup }
|
||||
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #setup }
|
||||
|
||||
@@@
|
||||
|
||||
## Description
|
||||
|
||||
Typically used when access to materializer is needed to run a different stream during the construction of a source/flow.
|
||||
Can also be used to access the underlying `ActorSystem` from `ActorMaterializer`.
|
||||
|
|
@ -64,6 +64,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|
|||
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
|
||||
|Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.|
|
||||
|Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.|
|
||||
|Sink|<a name="setup"></a>@ref[setup](Sink/setup.md)|Defer the creation of a `Sink` until materialization and access `ActorMaterializer` and `Attributes`|
|
||||
|Sink|<a name="takelast"></a>@ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.|
|
||||
|
||||
## Additional Sink and Source converters
|
||||
|
|
@ -150,6 +151,7 @@ depending on being backpressured by downstream or not.
|
|||
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|
||||
|Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
|
||||
|Source/Flow|<a name="scanasync"></a>@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
|
||||
|Source/Flow|<a name="setup"></a>@ref[setup](Source-or-Flow/setup.md)|Defer the creation of a `Source/Flow` until materialization and access `ActorMaterializer` and `Attributes`|
|
||||
|Source/Flow|<a name="sliding"></a>@ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.|
|
||||
|Source/Flow|<a name="statefulmapconcat"></a>@ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|
||||
|Source/Flow|<a name="take"></a>@ref[take](Source-or-Flow/take.md)|Pass `n` incoming elements downstream and then complete|
|
||||
|
|
@ -291,6 +293,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
|||
* [fromPublisher](Source/fromPublisher.md)
|
||||
* [fromIterator](Source/fromIterator.md)
|
||||
* [cycle](Source/cycle.md)
|
||||
* [setup](Source-or-Flow/setup.md)
|
||||
* [fromFuture](Source/fromFuture.md)
|
||||
* [fromCompletionStage](Source/fromCompletionStage.md)
|
||||
* [fromFutureSource](Source/fromFutureSource.md)
|
||||
|
|
@ -391,6 +394,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
|||
* [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md)
|
||||
* [lazyInitAsync](Flow/lazyInitAsync.md)
|
||||
* [preMaterialize](Sink/preMaterialize.md)
|
||||
* [setup](Sink/setup.md)
|
||||
* [fromSubscriber](Sink/fromSubscriber.md)
|
||||
* [cancelled](Sink/cancelled.md)
|
||||
* [head](Sink/head.md)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.javadsl;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.StreamTest;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class SetupTest extends StreamTest {
|
||||
public SetupTest() {
|
||||
super(actorSystemResource);
|
||||
}
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource =
|
||||
new AkkaJUnitActorSystemResource("SetupTest", AkkaSpec.testConf());
|
||||
|
||||
@Test
|
||||
public void shouldExposeMaterializerAndAttributesToSource() throws Exception {
|
||||
final Source<Pair<Boolean, Boolean>, CompletionStage<NotUsed>> source =
|
||||
Source.setup(
|
||||
(mat, attr) ->
|
||||
Source.single(Pair.create(mat.isShutdown(), attr.attributeList().isEmpty())));
|
||||
|
||||
assertEquals(
|
||||
Pair.create(false, false),
|
||||
source.runWith(Sink.head(), materializer).toCompletableFuture().get(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldExposeMaterializerAndAttributesToFlow() throws Exception {
|
||||
final Flow<Object, Pair<Boolean, Boolean>, CompletionStage<NotUsed>> flow =
|
||||
Flow.setup(
|
||||
(mat, attr) ->
|
||||
Flow.fromSinkAndSource(
|
||||
Sink.ignore(),
|
||||
Source.single(Pair.create(mat.isShutdown(), attr.attributeList().isEmpty()))));
|
||||
|
||||
assertEquals(
|
||||
Pair.create(false, false),
|
||||
Source.empty()
|
||||
.via(flow)
|
||||
.runWith(Sink.head(), materializer)
|
||||
.toCompletableFuture()
|
||||
.get(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldExposeMaterializerAndAttributesToSink() throws Exception {
|
||||
Sink<Object, CompletionStage<CompletionStage<Pair<Boolean, Boolean>>>> sink =
|
||||
Sink.setup(
|
||||
(mat, attr) ->
|
||||
Sink.fold(
|
||||
Pair.create(mat.isShutdown(), attr.attributeList().isEmpty()), Keep.left()));
|
||||
|
||||
assertEquals(
|
||||
Pair.create(false, false),
|
||||
Source.empty()
|
||||
.runWith(sink, materializer)
|
||||
.thenCompose(c -> c)
|
||||
.toCompletableFuture()
|
||||
.get(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
@ -47,6 +47,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
|
|||
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
|
||||
(classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) ::
|
||||
(classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) ::
|
||||
(classOf[scala.Function2[_, _, _]], classOf[java.util.function.BiFunction[_, _, _]]) :: // setup
|
||||
(classOf[scala.Function1[scala.Function1[_, _], _]], classOf[akka.japi.function.Function2[_, _, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) ::
|
||||
|
|
|
|||
|
|
@ -0,0 +1,232 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.testkit.StreamSpec
|
||||
|
||||
class SetupSpec extends StreamSpec {
|
||||
|
||||
implicit val materializer = ActorMaterializer()
|
||||
import system.dispatcher
|
||||
|
||||
"Source.setup" should {
|
||||
|
||||
"expose materializer" in {
|
||||
val source = Source.setup { (mat, _) ⇒
|
||||
Source.single(mat.isShutdown)
|
||||
}
|
||||
|
||||
source.runWith(Sink.head).futureValue shouldBe false
|
||||
}
|
||||
|
||||
"expose attributes" in {
|
||||
val source = Source.setup { (_, attr) ⇒
|
||||
Source.single(attr.attributeList)
|
||||
}
|
||||
|
||||
source.runWith(Sink.head).futureValue should not be empty
|
||||
}
|
||||
|
||||
"propagate materialized value" in {
|
||||
val source = Source.setup { (_, _) ⇒
|
||||
Source.maybe[NotUsed]
|
||||
}
|
||||
|
||||
val (completion, element) = source.toMat(Sink.head)(Keep.both).run()
|
||||
completion.futureValue.trySuccess(Some(NotUsed))
|
||||
element.futureValue shouldBe NotUsed
|
||||
}
|
||||
|
||||
"propagate attributes" in {
|
||||
val source = Source
|
||||
.setup { (_, attr) ⇒
|
||||
Source.single(attr.nameLifted)
|
||||
}
|
||||
.named("my-name")
|
||||
|
||||
source.runWith(Sink.head).futureValue shouldBe Some("my-name")
|
||||
}
|
||||
|
||||
"propagate attributes when nested" in {
|
||||
val source = Source
|
||||
.setup { (_, _) ⇒
|
||||
Source.setup { (_, attr) ⇒
|
||||
Source.single(attr.nameLifted)
|
||||
}
|
||||
}
|
||||
.named("my-name")
|
||||
|
||||
source.runWith(Sink.head).futureValue shouldBe Some("my-name")
|
||||
}
|
||||
|
||||
"handle factory failure" in {
|
||||
val error = new Error("boom")
|
||||
val source = Source.setup { (_, _) ⇒
|
||||
throw error
|
||||
}
|
||||
|
||||
val (materialized, completion) = source.toMat(Sink.head)(Keep.both).run()
|
||||
materialized.failed.futureValue.getCause shouldBe error
|
||||
completion.failed.futureValue.getCause shouldBe error
|
||||
}
|
||||
|
||||
"handle materialization failure" in {
|
||||
val error = new Error("boom")
|
||||
val source = Source.setup { (_, _) ⇒
|
||||
Source.empty.mapMaterializedValue(_ ⇒ throw error)
|
||||
}
|
||||
|
||||
val (materialized, completion) = source.toMat(Sink.head)(Keep.both).run()
|
||||
materialized.failed.futureValue.getCause shouldBe error
|
||||
completion.failed.futureValue.getCause shouldBe error
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"Flow.setup" should {
|
||||
|
||||
"expose materializer" in {
|
||||
val flow = Flow.setup { (mat, _) ⇒
|
||||
Flow.fromSinkAndSource(Sink.ignore, Source.single(mat.isShutdown))
|
||||
}
|
||||
|
||||
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe false
|
||||
}
|
||||
|
||||
"expose attributes" in {
|
||||
val flow = Flow.setup { (_, attr) ⇒
|
||||
Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.attributeList))
|
||||
}
|
||||
|
||||
Source.empty.via(flow).runWith(Sink.head).futureValue should not be empty
|
||||
}
|
||||
|
||||
"propagate materialized value" in {
|
||||
val flow = Flow.setup { (_, _) ⇒
|
||||
Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[NotUsed])(Keep.right)
|
||||
}
|
||||
|
||||
val (completion, element) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run()
|
||||
completion.futureValue.trySuccess(Some(NotUsed))
|
||||
element.futureValue shouldBe NotUsed
|
||||
}
|
||||
|
||||
"propagate attributes" in {
|
||||
val flow = Flow
|
||||
.setup { (_, attr) ⇒
|
||||
Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted))
|
||||
}
|
||||
.named("my-name")
|
||||
|
||||
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
|
||||
}
|
||||
|
||||
"propagate attributes when nested" in {
|
||||
val flow = Flow
|
||||
.setup { (_, _) ⇒
|
||||
Flow.setup { (_, attr) ⇒
|
||||
Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted))
|
||||
}
|
||||
}
|
||||
.named("my-name")
|
||||
|
||||
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("my-name")
|
||||
}
|
||||
|
||||
"handle factory failure" in {
|
||||
val error = new Error("boom")
|
||||
val flow = Flow.setup { (_, _) ⇒
|
||||
throw error
|
||||
}
|
||||
|
||||
val (materialized, completion) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run()
|
||||
materialized.failed.futureValue.getCause shouldBe error
|
||||
completion.failed.futureValue.getCause shouldBe error
|
||||
}
|
||||
|
||||
"handle materialization failure" in {
|
||||
val error = new Error("boom")
|
||||
val flow = Flow.setup { (_, _) ⇒
|
||||
Flow[NotUsed].mapMaterializedValue(_ ⇒ throw error)
|
||||
}
|
||||
|
||||
val (materialized, completion) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run()
|
||||
materialized.failed.futureValue.getCause shouldBe error
|
||||
completion.failed.futureValue.getCause shouldBe error
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"Sink.setup" should {
|
||||
|
||||
"expose materializer" in {
|
||||
val sink = Sink.setup { (mat, _) ⇒
|
||||
Sink.fold(mat.isShutdown)(Keep.left)
|
||||
}
|
||||
|
||||
Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe false
|
||||
}
|
||||
|
||||
"expose attributes" in {
|
||||
val sink = Sink.setup { (_, attr) ⇒
|
||||
Sink.fold(attr.attributeList)(Keep.left)
|
||||
}
|
||||
|
||||
Source.empty.runWith(sink).flatMap(identity).futureValue should not be empty
|
||||
}
|
||||
|
||||
"propagate materialized value" in {
|
||||
val sink = Sink.setup { (_, _) ⇒
|
||||
Sink.fold(NotUsed)(Keep.left)
|
||||
}
|
||||
|
||||
Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe NotUsed
|
||||
}
|
||||
|
||||
"propagate attributes" in {
|
||||
val sink = Sink
|
||||
.setup { (_, attr) ⇒
|
||||
Sink.fold(attr.nameLifted)(Keep.left)
|
||||
}
|
||||
.named("my-name")
|
||||
|
||||
Source.empty.runWith(sink).flatMap(identity).futureValue shouldBe Some("my-name")
|
||||
}
|
||||
|
||||
"propagate attributes when nested" in {
|
||||
val sink = Sink
|
||||
.setup { (_, _) ⇒
|
||||
Sink.setup { (_, attr) ⇒
|
||||
Sink.fold(attr.nameLifted)(Keep.left)
|
||||
}
|
||||
}
|
||||
.named("my-name")
|
||||
|
||||
Source.empty.runWith(sink).flatMap(identity).flatMap(identity).futureValue shouldBe Some("my-name")
|
||||
}
|
||||
|
||||
"handle factory failure" in {
|
||||
val error = new Error("boom")
|
||||
val sink = Sink.setup { (_, _) ⇒
|
||||
throw error
|
||||
}
|
||||
|
||||
Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error
|
||||
}
|
||||
|
||||
"handle materialization failure" in {
|
||||
val error = new Error("boom")
|
||||
val sink = Sink.setup { (_, _) ⇒
|
||||
Sink.ignore.mapMaterializedValue(_ ⇒ throw error)
|
||||
}
|
||||
|
||||
Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
165
akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala
Normal file
165
akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala
Normal file
|
|
@ -0,0 +1,165 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.impl
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
|
||||
import akka.stream._
|
||||
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
|
||||
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/** Internal Api */
|
||||
@InternalApi private[stream] final class SetupSinkStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M])
|
||||
extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
|
||||
|
||||
private val in = Inlet[T]("SetupSinkStage.in")
|
||||
override val shape = SinkShape(in)
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
|
||||
val matPromise = Promise[M]
|
||||
(createStageLogic(matPromise), matPromise.future)
|
||||
}
|
||||
|
||||
private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
|
||||
import SetupStage._
|
||||
|
||||
val subOutlet = new SubSourceOutlet[T]("SetupSinkStage")
|
||||
subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in)))
|
||||
setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet))
|
||||
|
||||
override def preStart(): Unit = {
|
||||
try {
|
||||
val sink = factory(ActorMaterializerHelper.downcast(materializer), attributes)
|
||||
|
||||
val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer)
|
||||
matPromise.success(mat)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
matPromise.failure(ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Internal Api */
|
||||
@InternalApi private[stream] final class SetupFlowStage[T, U, M](
|
||||
factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M])
|
||||
extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] {
|
||||
|
||||
private val in = Inlet[T]("SetupFlowStage.in")
|
||||
private val out = Outlet[U]("SetupFlowStage.out")
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
|
||||
val matPromise = Promise[M]
|
||||
(createStageLogic(matPromise), matPromise.future)
|
||||
}
|
||||
|
||||
private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
|
||||
import SetupStage._
|
||||
|
||||
val subInlet = new SubSinkInlet[U]("SetupFlowStage")
|
||||
val subOutlet = new SubSourceOutlet[T]("SetupFlowStage")
|
||||
|
||||
subInlet.setHandler(delegateToOutlet(push(out, _: U), () ⇒ complete(out), fail(out, _), subInlet))
|
||||
subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in)))
|
||||
|
||||
setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet))
|
||||
setHandler(out, delegateToSubInlet(subInlet))
|
||||
|
||||
override def preStart(): Unit = {
|
||||
try {
|
||||
val flow = factory(ActorMaterializerHelper.downcast(materializer), attributes)
|
||||
|
||||
val mat = Source
|
||||
.fromGraph(subOutlet.source)
|
||||
.viaMat(flow.withAttributes(attributes))(Keep.right)
|
||||
.to(Sink.fromGraph(subInlet.sink))
|
||||
.run()(subFusingMaterializer)
|
||||
matPromise.success(mat)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
matPromise.failure(ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Internal Api */
|
||||
@InternalApi private[stream] final class SetupSourceStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M])
|
||||
extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
|
||||
|
||||
private val out = Outlet[T]("SetupSourceStage.out")
|
||||
override val shape = SourceShape(out)
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
|
||||
val matPromise = Promise[M]
|
||||
(createStageLogic(matPromise), matPromise.future)
|
||||
}
|
||||
|
||||
private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
|
||||
import SetupStage._
|
||||
|
||||
val subInlet = new SubSinkInlet[T]("SetupSourceStage")
|
||||
subInlet.setHandler(delegateToOutlet(push(out, _: T), () ⇒ complete(out), fail(out, _), subInlet))
|
||||
setHandler(out, delegateToSubInlet(subInlet))
|
||||
|
||||
override def preStart(): Unit = {
|
||||
try {
|
||||
val source = factory(ActorMaterializerHelper.downcast(materializer), attributes)
|
||||
|
||||
val mat = source.withAttributes(attributes).to(Sink.fromGraph(subInlet.sink)).run()(subFusingMaterializer)
|
||||
matPromise.success(mat)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
matPromise.failure(ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private object SetupStage {
|
||||
def delegateToSubOutlet[T](grab: () ⇒ T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler {
|
||||
override def onPush(): Unit =
|
||||
subOutlet.push(grab())
|
||||
override def onUpstreamFinish(): Unit =
|
||||
subOutlet.complete()
|
||||
override def onUpstreamFailure(ex: Throwable): Unit =
|
||||
subOutlet.fail(ex)
|
||||
}
|
||||
|
||||
def delegateToOutlet[T](
|
||||
push: T ⇒ Unit,
|
||||
complete: () ⇒ Unit,
|
||||
fail: Throwable ⇒ Unit,
|
||||
subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler {
|
||||
override def onPush(): Unit =
|
||||
push(subInlet.grab())
|
||||
override def onUpstreamFinish(): Unit =
|
||||
complete()
|
||||
override def onUpstreamFailure(ex: Throwable): Unit =
|
||||
fail(ex)
|
||||
}
|
||||
|
||||
def delegateToSubInlet[T](subInlet: GraphStageLogic#SubSinkInlet[T]) = new OutHandler {
|
||||
override def onPull(): Unit =
|
||||
subInlet.pull()
|
||||
override def onDownstreamFinish(): Unit =
|
||||
subInlet.cancel()
|
||||
}
|
||||
|
||||
def delegateToInlet(pull: () ⇒ Unit, cancel: () ⇒ Unit) = new OutHandler {
|
||||
override def onPull(): Unit =
|
||||
pull()
|
||||
override def onDownstreamFinish(): Unit =
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
|
@ -14,7 +14,7 @@ import org.reactivestreams.Processor
|
|||
import scala.concurrent.duration.FiniteDuration
|
||||
import java.util.{ Comparator, Optional }
|
||||
import java.util.concurrent.CompletionStage
|
||||
import java.util.function.Supplier
|
||||
import java.util.function.{ BiFunction, Supplier }
|
||||
|
||||
import akka.util.JavaDurationConverters._
|
||||
import akka.actor.ActorRef
|
||||
|
|
@ -62,6 +62,15 @@ object Flow {
|
|||
case other => new Flow(scaladsl.Flow.fromGraph(other))
|
||||
}
|
||||
|
||||
/**
|
||||
* Defers the creation of a [[Flow]] until materialization. The `factory` function
|
||||
* exposes [[ActorMaterializer]] which is going to be used during materialization and
|
||||
* [[Attributes]] of the [[Flow]] returned by this method.
|
||||
*/
|
||||
def setup[I, O, M](
|
||||
factory: BiFunction[ActorMaterializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] =
|
||||
scaladsl.Flow.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava
|
||||
|
||||
/**
|
||||
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
|
||||
* will be sent to the Sink and the Flow's output will come from the Source.
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ import scala.compat.java8.OptionConverters._
|
|||
import scala.concurrent.ExecutionContext
|
||||
import scala.util.Try
|
||||
import java.util.concurrent.CompletionStage
|
||||
import java.util.function.BiFunction
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.compat.java8.FutureConverters._
|
||||
|
|
@ -285,6 +287,14 @@ object Sink {
|
|||
case other => new Sink(scaladsl.Sink.fromGraph(other))
|
||||
}
|
||||
|
||||
/**
|
||||
* Defers the creation of a [[Sink]] until materialization. The `factory` function
|
||||
* exposes [[ActorMaterializer]] which is going to be used during materialization and
|
||||
* [[Attributes]] of the [[Sink]] returned by this method.
|
||||
*/
|
||||
def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] =
|
||||
scaladsl.Sink.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava
|
||||
|
||||
/**
|
||||
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
import scala.compat.java8.OptionConverters._
|
||||
import java.util.concurrent.CompletionStage
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.function.Supplier
|
||||
import java.util.function.{ BiFunction, Supplier }
|
||||
|
||||
import akka.util.unused
|
||||
import com.github.ghik.silencer.silent
|
||||
|
|
@ -359,6 +359,14 @@ object Source {
|
|||
case other => new Source(scaladsl.Source.fromGraph(other))
|
||||
}
|
||||
|
||||
/**
|
||||
* Defers the creation of a [[Source]] until materialization. The `factory` function
|
||||
* exposes [[ActorMaterializer]] which is going to be used during materialization and
|
||||
* [[Attributes]] of the [[Source]] returned by this method.
|
||||
*/
|
||||
def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] =
|
||||
scaladsl.Source.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava
|
||||
|
||||
/**
|
||||
* Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import akka.stream.impl.{
|
|||
fusing,
|
||||
LinearTraversalBuilder,
|
||||
ProcessorModule,
|
||||
SetupFlowStage,
|
||||
SubFlowImpl,
|
||||
Throttle,
|
||||
Timers,
|
||||
|
|
@ -387,6 +388,14 @@ object Flow {
|
|||
case _ => new Flow(LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape)
|
||||
}
|
||||
|
||||
/**
|
||||
* Defers the creation of a [[Flow]] until materialization. The `factory` function
|
||||
* exposes [[ActorMaterializer]] which is going to be used during materialization and
|
||||
* [[Attributes]] of the [[Flow]] returned by this method.
|
||||
*/
|
||||
def setup[T, U, M](factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M]): Flow[T, U, Future[M]] =
|
||||
Flow.fromGraph(new SetupFlowStage(factory))
|
||||
|
||||
/**
|
||||
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
|
||||
* will be sent to the Sink and the Flow's output will come from the Source.
|
||||
|
|
|
|||
|
|
@ -145,6 +145,14 @@ object Sink {
|
|||
new Sink(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape)
|
||||
}
|
||||
|
||||
/**
|
||||
* Defers the creation of a [[Sink]] until materialization. The `factory` function
|
||||
* exposes [[ActorMaterializer]] which is going to be used during materialization and
|
||||
* [[Attributes]] of the [[Sink]] returned by this method.
|
||||
*/
|
||||
def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M]): Sink[T, Future[M]] =
|
||||
Sink.fromGraph(new SetupSinkStage(factory))
|
||||
|
||||
/**
|
||||
* Helper to create [[Sink]] from `Subscriber`.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -295,6 +295,14 @@ object Source {
|
|||
new Source(LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape)
|
||||
}
|
||||
|
||||
/**
|
||||
* Defers the creation of a [[Source]] until materialization. The `factory` function
|
||||
* exposes [[ActorMaterializer]] which is going to be used during materialization and
|
||||
* [[Attributes]] of the [[Source]] returned by this method.
|
||||
*/
|
||||
def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M]): Source[T, Future[M]] =
|
||||
Source.fromGraph(new SetupSourceStage(factory))
|
||||
|
||||
/**
|
||||
* Helper to create [[Source]] from `Iterable`.
|
||||
* Example usage: `Source(Seq(1,2,3))`
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S,
|
|||
*
|
||||
* Extend this `AbstractGraphStageWithMaterializedValue` if you want to provide a materialized value,
|
||||
* represented by the type parameter `M`. If your GraphStage does not need to provide a materialized
|
||||
* value you can instead extende [[GraphStage]] which materializes a [[NotUsed]] value.
|
||||
* value you can instead extend [[GraphStage]] which materializes a [[NotUsed]] value.
|
||||
*
|
||||
* A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that
|
||||
* creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together.
|
||||
|
|
@ -389,11 +389,16 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
*/
|
||||
private[akka] def interpreter: GraphInterpreter =
|
||||
if (_interpreter == null)
|
||||
throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor")
|
||||
throw new IllegalStateException(
|
||||
"not yet initialized: only setHandler is allowed in GraphStageLogic constructor. To access materializer use Source/Flow/Sink.setup factory")
|
||||
else _interpreter
|
||||
|
||||
/**
|
||||
* The [[akka.stream.Materializer]] that has set this GraphStage in motion.
|
||||
*
|
||||
* Can not be used from a `GraphStage` constructor. Access to materializer is provided by the
|
||||
* [[akka.stream.scaladsl.Source.setup]], [[akka.stream.scaladsl.Flow.setup]] and [[akka.stream.scaladsl.Sink.setup]]
|
||||
* and their corresponding Java API factories.
|
||||
*/
|
||||
protected def materializer: Materializer = interpreter.materializer
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue