diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/preMaterialize.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/preMaterialize.md new file mode 100644 index 0000000000..dfa3a8496a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/preMaterialize.md @@ -0,0 +1,16 @@ +# preMaterialize + +Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.preMaterialize](Source) { scala="#preMaterialize()(implicitmaterializer:akka.stream.Materializer):(Mat,akka.stream.scaladsl.Source[Out,akka.NotUsed])" java="#preMaterialize(akka.actor.ClassicActorSystemProvider)" java="#preMaterialize(akka.stream.Materializer)" } +@apidoc[Flow.preMaterialize](Flow) { scala="#preMaterialize()(implicitmaterializer:akka.stream.Materializer):(Mat,akka.stream.scaladsl.Flow[Int,Out,akka.NotUsed])" java="#preMaterialize(akka.actor.ClassicActorSystemProvider)" java="#preMaterialize(akka.stream.Materializer)" } + + +## Description + +Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph. + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 5db234aca9..4c555bca75 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -100,8 +100,8 @@ For example, following snippet will fall with timeout exception: ```scala ... .toMat(StreamConverters.asInputStream().mapMaterializedValue { inputStream => - inputStream.read() // this could block forever - ... + inputStream.read() // this could block forever + ... }).run() ``` @@ -166,6 +166,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.| |Source/Flow|@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.| |Source/Flow|@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| +|Source/Flow|@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.| |Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| |Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.| |Source/Flow|@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like @ref[`scan`](Source-or-Flow/./scan.md) but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 82383d707c..b93cebbe60 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -5,23 +5,24 @@ package akka.stream.scaladsl import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import scala.annotation.nowarn import com.typesafe.config.ConfigFactory import org.reactivestreams.{ Publisher, Subscriber } - import akka.NotUsed import akka.stream._ import akka.stream.impl._ +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler } import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.testkit.TestDuration +import java.util.concurrent.atomic.AtomicLong + object FlowSpec { class Fruit extends Serializable class Apple extends Fruit @@ -521,6 +522,81 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re } } + /** + * Count elements that passing by this flow + * */ + private class CounterFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], AtomicLong] { + private val in = Inlet[T]("ElementCounterFlow.in") + private val out = Outlet[T]("ElementCounterFlow.out") + val shape = FlowShape(in, out) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, AtomicLong) = { + val counter = new AtomicLong() + val logic = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = { + counter.incrementAndGet() + push(out, grab(in)) + } + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } + (logic, counter) + } + } + + "Flow pre-materialization" must { + "passing elements to downstream" in { + val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize() + val probe = Source(List(1, 2, 3)).via(flow).reduce((a, b) => a + b).runWith(TestSink.probe[Int]) + probe.request(1) + probe.expectNext(6) + probe.request(1) + probe.expectComplete() + counter.get() should (be(3)) + } + + "propagate failures to downstream" in { + val (queue, source) = Source.queue[Int](1).preMaterialize() + val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize() + val probe = source.via(flow).runWith(TestSink.probe[Int]) + queue.offer(1) + probe.request(1) + probe.expectNext(1) + queue.fail(new RuntimeException("boom")) + probe.expectError().getMessage should ===("boom") + counter.get() should (be(1)) + } + + "disallow materialize multiple times" in { + val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize() + val probe1 = Source(List(1, 2, 3)).via(flow).reduce((a, b) => a + b).runWith(TestSink.probe[Int]) + probe1.request(1) + probe1.expectNext(6) + probe1.request(1) + probe1.expectComplete() + counter.get() should (be(3)) + val probe2 = Source(List(1, 2, 3)).via(flow).reduce((a, b) => a + b).runWith(TestSink.probe[Int]) + probe2.request(1) + probe2.expectError() + } + + "propagate failure to downstream when materializing" in { + a[RuntimeException] shouldBe thrownBy( + Flow + .fromGraph(new CounterFlow[Int]) + .mapMaterializedValue(_ => throw new RuntimeException("boom")) + .preMaterialize()) + } + + "propagate cancel to upstream" in { + val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize() + val probSource = TestSource.probe[Int].via(flow).toMat(Sink.cancelled[Int])(Keep.left).run() + probSource.ensureSubscription() + probSource.expectCancellation() + counter.get() should (be(0)) + } + + } + object TestException extends RuntimeException with NoStackTrace } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 2ae85e0c78..b12813cbcb 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -393,6 +393,23 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] = new Flow(delegate.mapMaterializedValue(f.apply _)) + /** + * Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]]. + */ + def preMaterialize( + systemProvider: ClassicActorSystemProvider): akka.japi.Pair[Mat @uncheckedVariance, Flow[In, Out, NotUsed]] = { + preMaterialize(SystemMaterializer(systemProvider.classicSystem).materializer) + } + + /** + * Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]]. + * The returned flow is partial materialized and do not support multiple times materialization. + */ + def preMaterialize(materializer: Materializer): akka.japi.Pair[Mat @uncheckedVariance, Flow[In, Out, NotUsed]] = { + val (mat, flow) = delegate.preMaterialize()(materializer) + akka.japi.Pair(mat, flow.asJava) + } + /** * Transform this [[Flow]] by appending the given processing steps. * {{{ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 16ea4cb0a2..8a8bad3fe5 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -145,6 +145,16 @@ final class Flow[-In, +Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = new Flow(traversalBuilder.transformMat(f), shape) + /** + * Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]]. + * The returned flow is partial materialized and do not support multiple times materialization. + */ + def preMaterialize()(implicit materializer: Materializer): (Mat, ReprMat[Out, NotUsed]) = { + val ((sub, mat), pub) = + Source.asSubscriber[In].viaMat(this)(Keep.both).toMat(Sink.asPublisher(false))(Keep.both).run() + (mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), Source.fromPublisher(pub))) + } + /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]. * {{{ diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index df50381be3..dbe894c2b0 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -74,7 +74,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { // FIXME document these methods as well val pendingTestCases = Map( - "Source" -> (pendingSourceOrFlow ++ Seq("preMaterialize")), + "Source" -> (pendingSourceOrFlow), "Flow" -> (pendingSourceOrFlow ++ Seq( "lazyInit", "fromProcessorMat",