diff --git a/akka-docs-dev/rst/images/simple-graph-example.png b/akka-docs-dev/rst/images/simple-graph-example.png new file mode 100644 index 0000000000..a464fda413 Binary files /dev/null and b/akka-docs-dev/rst/images/simple-graph-example.png differ diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala new file mode 100644 index 0000000000..847eb50666 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Broadcast +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.FlowGraph +import akka.stream.scaladsl.FlowGraphImplicits +import akka.stream.scaladsl.MaterializedMap +import akka.stream.scaladsl.Merge +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Zip +import akka.stream.testkit.AkkaSpec + +// TODO replace ⇒ with => and disable this intellij setting +class FlowGraphDocSpec extends AkkaSpec { + + implicit val ec = system.dispatcher + + implicit val mat = FlowMaterializer() + + "build simple graph" in { + //format: OFF + //#simple-flow-graph + val g = FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val in = Source(1 to 10) + val out = Sink.ignore + + val broadcast = Broadcast[Int] + val merge = Merge[Int] + + val f1 = Flow[Int].map(_ + 10) + val f3 = Flow[Int].map(_.toString) + val f2 = Flow[Int].map(_ + 20) + + in ~> broadcast ~> f1 ~> merge + broadcast ~> f2 ~> merge ~> f3 ~> out + } + //#simple-flow-graph + //format: ON + + //#simple-graph-run + val map: MaterializedMap = g.run() + //#simple-graph-run + } + + "build simple graph without implicits" in { + //#simple-flow-graph-no-implicits + val g = FlowGraph { b ⇒ + val in = Source(1 to 10) + val out = Sink.ignore + + val broadcast = Broadcast[Int] + val merge = Merge[Int] + + val f1 = Flow[Int].map(_ + 10) + val f3 = Flow[Int].map(_.toString) + val f2 = Flow[Int].map(_ + 20) + + b.addEdge(in, broadcast) + .addEdge(broadcast, f1, merge) + .addEdge(broadcast, f2, merge) + .addEdge(merge, f3, out) + } + //#simple-flow-graph-no-implicits + + g.run() + } + + "flow connection errors" in { + intercept[IllegalArgumentException] { + //#simple-graph + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val source1 = Source(1 to 10) + val source2 = Source(1 to 10) + + val zip = Zip[Int, Int] + + source1 ~> zip.left + source2 ~> zip.right + // unconnected zip.out (!) => "must have at least 1 outgoing edge" + } + //#simple-graph + }.getMessage should include("must have at least 1 outgoing edge") + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala new file mode 100644 index 0000000000..91b86ffd42 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.FlowGraph +import akka.stream.scaladsl.FlowGraphImplicits +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Zip +import akka.stream.testkit.AkkaSpec + +// TODO replace ⇒ with => and disable this intellij setting +class StreamDocSpec extends AkkaSpec { + + implicit val ec = system.dispatcher + + //#imports + import akka.stream.FlowMaterializer + import akka.stream.scaladsl.Broadcast + //#imports + + implicit val mat = FlowMaterializer() + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index 1fa867875f..3c4de39310 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -3,26 +3,142 @@ */ package docs.stream -//#imports - import akka.stream.FlowMaterializer +import akka.stream.scaladsl.Broadcast +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.FlowGraph +import akka.stream.scaladsl.FlowGraphImplicits import akka.stream.scaladsl.PartialFlowGraph - -//#imports - +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.UndefinedSink +import akka.stream.scaladsl.UndefinedSource +import akka.stream.scaladsl.Zip +import akka.stream.scaladsl.ZipWith import akka.stream.testkit.AkkaSpec +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ + // TODO replace ⇒ with => and disable this intellij setting class StreamPartialFlowGraphDocSpec extends AkkaSpec { + implicit val ec = system.dispatcher + implicit val mat = FlowMaterializer() "build with open ports" in { + // format: OFF //#simple-partial-flow-graph - PartialFlowGraph { implicit b ⇒ + // defined outside as they will be used by different FlowGraphs + // 1) first by the PartialFlowGraph to mark its open input and output ports + // 2) then by the assembling FlowGraph which will attach real sinks and sources to them + val in1 = UndefinedSource[Int] + val in2 = UndefinedSource[Int] + val in3 = UndefinedSource[Int] + val out = UndefinedSink[Int] + val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + + val zip1 = ZipWith[Int, Int, Int](math.max _) + val zip2 = ZipWith[Int, Int, Int](math.max _) + + in1 ~> zip1.left + in2 ~> zip1.right + zip1.out ~> zip2.left + in3 ~> zip2.right + zip2.out ~> out } + // format: ON + + val resultSink = Sink.head[Int] + + val g = FlowGraph { b ⇒ + // import the partial flow graph explicitly + b.importPartialFlowGraph(pickMaxOfThree) + + b.attachSource(in1, Source.single(1)) + b.attachSource(in2, Source.single(2)) + b.attachSource(in3, Source.single(3)) + b.attachSink(out, resultSink) + } + + val materialized = g.run() + val max: Future[Int] = materialized.get(resultSink) + Await.result(max, 300.millis) should equal(3) //#simple-partial-flow-graph + + val g2 = + //#simple-partial-flow-graph-import-shorthand + FlowGraph(pickMaxOfThree) { b ⇒ + b.attachSource(in1, Source.single(1)) + b.attachSource(in2, Source.single(2)) + b.attachSource(in3, Source.single(3)) + b.attachSink(out, resultSink) + } + //#simple-partial-flow-graph-import-shorthand + val materialized2 = g.run() + val max2: Future[Int] = materialized2.get(resultSink) + Await.result(max2, 300.millis) should equal(3) } + "build source from partial flow graph" in { + //#source-from-partial-flow-graph + val pairs: Source[(Int, Int)] = Source() { implicit b ⇒ + import FlowGraphImplicits._ + + // prepare graph elements + val undefinedSink = UndefinedSink[(Int, Int)] + val zip = Zip[Int, Int] + def ints = Source(() ⇒ Iterator.from(1)) + + // connect the graph + ints ~> Flow[Int].filter(_ % 2 != 0) ~> zip.left + ints ~> Flow[Int].filter(_ % 2 == 0) ~> zip.right + zip.out ~> undefinedSink + + // expose undefined sink + undefinedSink + } + + val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head) + Await.result(firstPair, 300.millis) should equal(1 → 2) + //#source-from-partial-flow-graph + } + + "build flow from partial flow graph" in { + //#flow-from-partial-flow-graph + val pairUpWithToString = Flow() { implicit b ⇒ + import FlowGraphImplicits._ + + // prepare graph elements + val undefinedSource = UndefinedSource[Int] + val undefinedSink = UndefinedSink[(Int, String)] + + val broadcast = Broadcast[Int] + val zip = Zip[Int, String] + + // connect the graph + undefinedSource ~> broadcast + broadcast ~> Flow[Int].map(identity) ~> zip.left + broadcast ~> Flow[Int].map(_.toString) ~> zip.right + zip.out ~> undefinedSink + + // expose undefined ports + (undefinedSource, undefinedSink) + } + + //#flow-from-partial-flow-graph + + // format: OFF + val (_, matSink: Future[(Int, String)]) = + //#flow-from-partial-flow-graph + pairUpWithToString.runWith(Source(List(1)), Sink.head) + //#flow-from-partial-flow-graph + // format: ON + + Await.result(matSink, 300.millis) should equal(1 → "1") + } } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 87faa3b1bf..330de1ac3a 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -101,14 +101,14 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#hashtags-mapConcat } - "simple broadcast" in { - trait X { - //#flow-graph-broadcast - val writeAuthors: Sink[Author] = ??? - val writeHashtags: Sink[Hashtag] = ??? - //#flow-graph-broadcast - } + trait HiddenDefinitions { + //#flow-graph-broadcast + val writeAuthors: Sink[Author] = ??? + val writeHashtags: Sink[Hashtag] = ??? + //#flow-graph-broadcast + } + "simple broadcast" in { val writeAuthors: Sink[Author] = Sink.ignore val writeHashtags: Sink[Hashtag] = Sink.ignore @@ -193,7 +193,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { // the sumSink materialized two different futures // we use it as key to get the materialized value out of the materialized map val morningTweetsCount: Future[Int] = morningMaterialized.get(sumSink) - val eveningTweetsCount: Future[Int] = morningMaterialized.get(sumSink) + val eveningTweetsCount: Future[Int] = eveningMaterialized.get(sumSink) //#tweets-runnable-flow-materialized-twice val map: MaterializedMap = counterRunnableFlow.run() @@ -202,12 +202,6 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { sum.map { c ⇒ println(s"Total tweets processed: $c") } //#tweets-fold-count - - new AnyRef { - //#tweets-fold-count-oneline - val sum: Future[Int] = tweets.map(t ⇒ 1).runWith(sumSink) - //#tweets-fold-count-oneline - } } } diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst index 7511cc0e71..168fa884cb 100644 --- a/akka-docs-dev/rst/scala/stream.rst +++ b/akka-docs-dev/rst/scala/stream.rst @@ -42,7 +42,7 @@ Here's the data model we'll be working with throughout the quickstart examples: Transforming and consuming simple streams ----------------------------------------- In order to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`, -which will be responsible for materializing and running the streams we're about to create: +which will be responsible for materializing and running the streams we are about to create: .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup @@ -140,7 +140,7 @@ about the back-pressure protocol used by Akka Streams and all other Reactive Str A typical problem applications (not using Akka streams) like this often face is that they are unable to process the incoming data fast enough, either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka streams buffering can -and must be handled explicitly. For example, if we're only interested in the "*most recent tweets, with a buffer of 10 +and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10 elements*" this can be expressed using the ``buffer`` element: .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead @@ -204,6 +204,12 @@ Sources, Flows and Sinks // TODO: talk about how creating and sharing a ``Flow.of[String]`` is useful etc. +.. note:: + By default Akka streams elements support **exactly one** down-stream element. + Making fan-out (supporting multiple downstream elements) an explicit opt-in feature allows default stream elements to + be less complex and more efficient. Also it allows for greater flexibility on *how exactly* to handle the multicast scenarios, + by providing named fan-out elements such as broadcast (signalls all down-stream elements) or balance (signals one of available down-stream elements). + .. _back-pressure-explained-scala: Back-pressure explained @@ -216,11 +222,6 @@ Backpressure when Fast Publisher and Slow Subscriber // TODO: Write me -Backpressure when Slow Publisher and Fast Subscriber ----------------------------------------------------- - -// TODO: Write me - In depth ======== // TODO: working with flows @@ -251,6 +252,15 @@ Stream Materialization ---------------------- **TODO - write me (feel free to move around as well)** +When constructing flows and graphs in Akka streams think of them as preparing a blueprint, an execution plan. +Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources +it needs in order to run. In the case of Akka streams this often means starting up Actors which power the processing, +but is not restricted to that - it could also mean opening files or socket connections etc. – depending on what the stream needs. + +Materialization is triggered at so called "terminal operations". Most notably this includes the various forms of the ``run()`` +and ``runWith()`` methods defined on flow elements as well as a small number of special syntactic sugars for running with +well-known sinks, such as ``foreach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``. + MaterializedMap ^^^^^^^^^^^^^^^ **TODO - write me (feel free to move around as well)** @@ -266,22 +276,125 @@ Optimizations Subscription timeouts --------------------- -// TODO: esp in groupby etc, if you dont subscribe to a stream son enougu it may be dead once you get to it +// TODO: esp in groupBy etc, if you dont subscribe to a stream son enough it may be dead once you get to it + + +.. _stream-section-configuration: + +Section configuration +--------------------- +// TODO: it is possible to configure sections of a graph Working with Graphs =================== -// TODO: Don't forget adding the type parameter to the graph elements! +Akka streams are unique in the way they handle and expose computation graphs - instead of hiding the fact that the +processing pipeline is in fact a graph in a purely "fluent" DSL, graph operations are written in a DSL that graphically +resembles and embraces the fact that the built pipeline is in fact a Graph. In this section we'll dive into the multiple +ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them. + +Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations. +Considering linear Flows to be like roads, we can picture graph operations as junctions: multiple flows being connected at a single point. +Some graph operations which are common enough and fit the linear style of Flows, such as ``concat`` (which concatenates two +streams, such that the second one is consumed after the first one has completed), may have shorthand methods defined on +:class:`Flow` or :class:`Source` themselves, however you should keep in mind that those are also implemented as graph junctions. + +.. _flow-graph-scala: + +Constructing Flow Graphs +------------------------ +Flow graphs are built from simple Flows which serve as the linear connections within the graphs as well as Junctions +which serve as fan-in and fan-out points for flows. Thanks to the junctions having meaningful types based on their behaviour +and making them explicit elements these elements should be rather straight forward to use. + +Akka streams currently provides these junctions: + +* **Fan-out** + - :class:`Broadcast` – (1 input, n outputs) signals each output given an input signal, + - :class:`Balance` – (1 input => n outputs), signals one of its output ports given an input signal, + - :class:`UnZip` – (1 input => 2 outputs), which is a specialized element which is able to split a stream of ``(A,B)`` into two streams one type ``A`` and one of type ``B``, + - :class:`FlexiRoute` – (1 input, n outputs), which enables writing custom fan out elements using a simple DSL, +* **Fan-in** + - :class:`Merge` – (n inputs , 1 output), picks signals randomly from inputs pushing them one by one to its output, + - :class:`MergePreferred` – like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``, + - :class:`ZipWith` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output, + + :class:`Zip` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` stream, + - :class:`Concat` – (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters, + - :class:`FlexiMerge` – (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL. + +One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is +simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating +the below hand drawn graph into Akka streams: + +.. image:: ../images/simple-graph-example.png + +Such graph is simple to translate to the Graph DSL since each linear element corresponds to a :class:`Flow`, +and each circle corresponds to either a :class:`Junction` or a :class:`Source` or :class:`Sink` if it is beginning +or ending a :class:`Flow`. + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph + +Notice the ``import FlowGraphImplicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to"). +It is also possible to construct graphs without the ``~>`` operator in case you prefer to use the graph builder explicitly: + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph-no-implicits .. _partial-flow-graph-scala: Constructing and combining Partial Flow Graphs ---------------------------------------------- -**TODO - write me (feel free to move around as well)** +Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct +all of it is different phases in different places and in the end connect them all into a complete graph and run it. -Constructing a Source or Sink from a Graph -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -**TODO - write me (feel free to move around as well)** +This can be achieved using :class:`PartialFlowGraph`. The reason of representing it as a different type is that a +:class:`FlowGraph` requires all ports to be connected, and if they are not it will throw an exception at construction +time, which helps to avoid simple wiring errors while working with graphs. A partial flow graph however does not perform +this validation, and allows graphs that are not yet fully connected. + +A :class:`PartialFlowGraph` is defined as a :class:`FlowGraph` which contains so called "undefined elements", +such as ``UndefinedSink[T]`` or ``UndefinedSource[T]``, which can be reused and plugged into by consumers of that +partial flow graph. Let's imagine we want to provide users with a specialized element that given 3 inputs will pick +the greatest int value of each zipped triple. We'll want to expose 3 input ports (undefined sources) and one output port +(undefined sink). + +.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#simple-partial-flow-graph + +As you can see, first we construct the partial graph that contains all the zipping and comparing of stream +elements, then we import it (all of its nodes and connections) explicitly to the :class:`FlowGraph` instance in which all +the undefined elements are rewired to real sources and sinks. The graph can then be run and yields the expected result. + +.. warning:: + Please note that a :class:`FlowGraph` is not able to provide compile time type-safety about whether or not all + elements have been properly connected - this validation is performed as a runtime check during the graph's instantiation. + +.. _constructing-sources-sinks-flows-from-partial-graphs-scala: + +Constructing Sources, Sinks and Flows from a Partial Graphs +----------------------------------------------------------- +Instead of treating a :class:`PartialFlowGraph` as simply a collection of flows and junctions which may not yet all be +connected it is sometimes useful to expose such complex graph as a simpler structure, +such as a :class:`Source`, :class:`Sink` or :class:`Flow`. + +In fact, these concepts can be easily expressed as special cases of a partially connected graph: + +* :class:`Source` is a partial flow graph with *exactly one* :class:`UndefinedSink`, +* :class:`Sink` is a partial flow graph with *exactly one* :class:`UndefinedSource`, +* :class:`Flow` is a partial flow graph with *exactly one* :class:`UndefinedSource` and *exactly one* :class:`UndefinedSource`. + +Being able hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to easily create one +complex element and from there on treat it as simple compound stage for linear computations. + +In order to create a Source from a partial flow graph ``Source[T]`` provides a special apply method that takes a function +that must return an ``UndefinedSink[T]``. This undefined sink will become "the sink that must be attached before this Source +can run". Refer to the example below, in which we create a Source that zips together two numbers, to see this graph +construction in action: + +.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-from-partial-flow-graph + +Similarly the same can be done for a ``Sink[T]``, in which case the returned value must be an ``UndefinedSource[T]``. +For defining a ``Flow[T]`` we need to expose both an undefined source and sink: + +.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph Dealing with cycles, deadlocks ------------------------------ @@ -293,7 +406,7 @@ Dealing with cycles, deadlocks // TODO: custom processing -// TODO: stages and flexistuff +// TODO: stages and flexi stuff Streaming IO ============ @@ -307,14 +420,14 @@ Custom elements **TODO - write me (feel free to move around as well)** // TODO: So far we've been mostly using predefined elements, but sometimes that's not enough -Stage ------ -**TODO - write me (feel free to move around as well)** +.. _flexi-merge: Flexi Merge ----------- -**TODO - write me (feel free to move around as well)** +// TODO: "May sometimes be exactly what you need..." + +.. _flexi-route: Flexi Route ----------- **TODO - write me (feel free to move around as well)** @@ -329,7 +442,7 @@ ActorSubscriber ^^^^^^^^^^^^^^^ -// TODO: Implementing Reactive Streams interfaces directly vs. extending ActorPublisher / ActoSubscriber??? +// TODO: Implementing Reactive Streams interfaces directly vs. extending ActorPublisher / ActorSubscriber??? Integrating with Actors ======================= @@ -347,5 +460,5 @@ Integration with Reactive Streams enabled libraries // TODO: some info about reactive streams in general -// TODO: Simplly runWith(Sink.publisher) and runWith(Source.subscriber) to get the corresponding reactive streams types. +// TODO: Simply runWith(Sink.publisher) and runWith(Source.subscriber) to get the corresponding reactive streams types.