diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 5e7900cbb0..a5108e38ef 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -1,143 +1,142 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package docs.stream - -import akka.actor.Cancellable -import akka.stream.scaladsl._ -import akka.stream.testkit.AkkaSpec - -import concurrent.Future - -// TODO replace ⇒ with => and disable this intellij setting -class FlowDocSpec extends AkkaSpec { - - implicit val ec = system.dispatcher - - //#imports - import akka.stream.FlowMaterializer - //#imports - - implicit val mat = FlowMaterializer() - - "source is immutable" in { - //#source-immutable - val source = Source(1 to 10) - source.map(_ ⇒ 0) // has no effect on source, since it's immutable - source.runWith(Sink.fold(0)(_ + _)) // 55 - - val zeroes = source.map(_ ⇒ 0) // returns new Source[Int], with `map()` appended - zeroes.runWith(Sink.fold(0)(_ + _)) // 0 - //#source-immutable - } - - "materialization in steps" in { - //#materialization-in-steps - val source = Source(1 to 10) - val sink = Sink.fold[Int, Int](0)(_ + _) - - // connect the Source to the Sink, obtaining a RunnableFlow - val runnable: RunnableFlow = source.to(sink) - - // materialize the flow - val materialized: MaterializedMap = runnable.run() - - // get the materialized value of the FoldSink - val sum: Future[Int] = materialized.get(sink) - - //#materialization-in-steps - } - - "materialization runWith" in { - //#materialization-runWith - val source = Source(1 to 10) - val sink = Sink.fold[Int, Int](0)(_ + _) - - // materialize the flow, getting the Sinks materialized value - val sum: Future[Int] = source.runWith(sink) - //#materialization-runWith - } - - "materializedMap is unique" in { - //#stream-reuse - // connect the Source to the Sink, obtaining a RunnableFlow - val sink = Sink.fold[Int, Int](0)(_ + _) - val runnable: RunnableFlow = Source(1 to 10).to(sink) - - // get the materialized value of the FoldSink - val sum1: Future[Int] = runnable.run().get(sink) - val sum2: Future[Int] = runnable.run().get(sink) - - // sum1 and sum2 are different Futures! - //#stream-reuse - } - - "compound source cannot be used as key" in { - //#compound-source-is-not-keyed-runWith - import scala.concurrent.duration._ - case object Tick - - val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () ⇒ Tick) - - val timerCancel: Cancellable = Sink.ignore.runWith(timer) - timerCancel.cancel() - - val timerMap = timer.map(tick ⇒ "tick") - val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable! - //#compound-source-is-not-keyed-runWith - - //#compound-source-is-not-keyed-run - // retain the materialized map, in order to retrieve the timers Cancellable - val materialized = timerMap.to(Sink.ignore).run() - val timerCancellable = materialized.get(timer) - timerCancellable.cancel() - //#compound-source-is-not-keyed-run - } - - "creating sources, sinks" in { - //#source-sink - // Create a source from an Iterable - Source(List(1, 2, 3)) - - // Create a source form a Future - Source(Future.successful("Hello Streams!")) - - // Create a source from a single element - Source.single("only one element") - - // an empty source - Source.empty - - // Sink that folds over the stream and returns a Future - // of the final result in the MaterializedMap - Sink.fold(0)(_ + _) - - // Sink that returns a Future in the MaterializedMap, - // containing the first element of the stream - Sink.head - - // A Sink that consumes a stream without doing anything with the elements - Sink.ignore - - // A Sink that executes a side-effecting call for every element of the stream - Sink.foreach((elem) => println(elem)) - //#source-sink - } - - "various ways of connecting source, sink, flow" in { - //#flow-connecting - // Explicitly creating and wiring up a Source, Sink and Flow - Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_))) - - // Starting from a Source - val source = Source(1 to 6).map(_ * 2) - source.to(Sink.foreach(println(_))) - - // Starting from a Sink - val sink: Sink[Int] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_))) - Source(1 to 6).to(sink) - - - //#flow-connecting - } -} +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import akka.actor.Cancellable +import akka.stream.scaladsl._ +import akka.stream.testkit.AkkaSpec + +import concurrent.Future + +// TODO replace ⇒ with => and disable this intellij setting +class FlowDocSpec extends AkkaSpec { + + implicit val ec = system.dispatcher + + //#imports + import akka.stream.FlowMaterializer + //#imports + + implicit val mat = FlowMaterializer() + + "source is immutable" in { + //#source-immutable + val source = Source(1 to 10) + source.map(_ ⇒ 0) // has no effect on source, since it's immutable + source.runWith(Sink.fold(0)(_ + _)) // 55 + + val zeroes = source.map(_ ⇒ 0) // returns new Source[Int], with `map()` appended + zeroes.runWith(Sink.fold(0)(_ + _)) // 0 + //#source-immutable + } + + "materialization in steps" in { + //#materialization-in-steps + val source = Source(1 to 10) + val sink = Sink.fold[Int, Int](0)(_ + _) + + // connect the Source to the Sink, obtaining a RunnableFlow + val runnable: RunnableFlow = source.to(sink) + + // materialize the flow + val materialized: MaterializedMap = runnable.run() + + // get the materialized value of the FoldSink + val sum: Future[Int] = materialized.get(sink) + + //#materialization-in-steps + } + + "materialization runWith" in { + //#materialization-runWith + val source = Source(1 to 10) + val sink = Sink.fold[Int, Int](0)(_ + _) + + // materialize the flow, getting the Sinks materialized value + val sum: Future[Int] = source.runWith(sink) + //#materialization-runWith + } + + "materializedMap is unique" in { + //#stream-reuse + // connect the Source to the Sink, obtaining a RunnableFlow + val sink = Sink.fold[Int, Int](0)(_ + _) + val runnable: RunnableFlow = Source(1 to 10).to(sink) + + // get the materialized value of the FoldSink + val sum1: Future[Int] = runnable.run().get(sink) + val sum2: Future[Int] = runnable.run().get(sink) + + // sum1 and sum2 are different Futures! + //#stream-reuse + } + + "compound source cannot be used as key" in { + //#compound-source-is-not-keyed-runWith + import scala.concurrent.duration._ + case object Tick + + val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () ⇒ Tick) + + val timerCancel: Cancellable = Sink.ignore.runWith(timer) + timerCancel.cancel() + + val timerMap = timer.map(tick ⇒ "tick") + val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable! + //#compound-source-is-not-keyed-runWith + + //#compound-source-is-not-keyed-run + // retain the materialized map, in order to retrieve the timers Cancellable + val materialized = timerMap.to(Sink.ignore).run() + val timerCancellable = materialized.get(timer) + timerCancellable.cancel() + //#compound-source-is-not-keyed-run + } + + "creating sources, sinks" in { + //#source-sink + // Create a source from an Iterable + Source(List(1, 2, 3)) + + // Create a source form a Future + Source(Future.successful("Hello Streams!")) + + // Create a source from a single element + Source.single("only one element") + + // an empty source + Source.empty + + // Sink that folds over the stream and returns a Future + // of the final result in the MaterializedMap + Sink.fold[Int, Int](0)(_ + _) + + // Sink that returns a Future in the MaterializedMap, + // containing the first element of the stream + Sink.head + + // A Sink that consumes a stream without doing anything with the elements + Sink.ignore + + // A Sink that executes a side-effecting call for every element of the stream + Sink.foreach[String](println(_)) + //#source-sink + } + + "various ways of connecting source, sink, flow" in { + //#flow-connecting + // Explicitly creating and wiring up a Source, Sink and Flow + Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_))) + + // Starting from a Source + val source = Source(1 to 6).map(_ * 2) + source.to(Sink.foreach(println(_))) + + // Starting from a Sink + val sink: Sink[Int] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_))) + Source(1 to 6).to(sink) + + //#flow-connecting + } +} diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index e86eb426b9..f767bc93d7 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -28,24 +28,24 @@ Back-pressure In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous* Processing Stage The common name for all building blocks that build up a Flow or FlowGraph. - Examples of a processing stage would be ``Stage`` (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`, - :class:`DetachedStage`), operations like ``map()``, ``filter()`` and graph junctions like ``Merge`` or ``Broadcast``. + Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` + (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`) and graph junctions like ``Merge`` or ``Broadcast``. Defining and running streams ---------------------------- Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions: Source - A processing stage with *exactly one output*, emitting data elements whenever downstream processing elements are + A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are ready to receive them. Sink A processing stage with *exactly one input*, requesting and accepting data elements possibly slowing down the upstream producer of elements Flow - A processing stage which has *exactly one input and output*, which connects its up- and downstreams by (usually) + A processing stage which has *exactly one input and output*, which connects its up- and downstreams by transforming the data elements flowing through it. RunnableFlow - A Flow with has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. + A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink, @@ -55,13 +55,13 @@ It is important to remember that even after constructing the ``RunnableFlow`` by different processing stages, no data will flow through it until it is materialized. Materialization is the process of allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable, -thread-safe, and freely shareable*, which means that it is for example safe to share send between actors, to have +thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have one actor prepare the work, and then have it be materialized at some completely different place in the code. .. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both -sources and sinks are able to put specific object into this map. Whether they put something in or not is implementation +sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result of the folding process over the stream. In general, a stream can expose multiple materialized values, but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason @@ -109,10 +109,12 @@ There are various ways to wire up different parts of a stream, the following exa Back-pressure explained ----------------------- -Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the Reactive Streams +Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_ specification, which Akka is a founding member of. -The user of the library does not have to write any explicit back-pressure handling code - it is built in +.. _Reactive Streams: http://reactive-streams.org/ + +The user of the library does not have to write any explicit back-pressure handling code — it is built in and dealt with automatically by all of the provided Akka Streams processing stages. It is possible however to add explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important in complex processing graphs which may even sometimes even contain loops (which *must* be treated with very special