+doc: Updating basics, quickstart and some of graph docs

Endre Sándor Varga 2015-02-26 11:33:29 +01:00
parent ac9c61a3a5
commit 41f6a0bf26
6 changed files with 285 additions and 42 deletions


@@ -7,7 +7,7 @@ import akka.actor.Cancellable
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import concurrent.Future
import scala.concurrent.{ Promise, Future }
class FlowDocSpec extends AkkaSpec {
@@ -54,7 +54,7 @@ class FlowDocSpec extends AkkaSpec {
//#materialization-runWith
}
"materializedMap is unique" in {
"materialization is unique" in {
//#stream-reuse
// connect the Source to the Sink, obtaining a RunnableFlow
val sink = Sink.fold[Int, Int](0)(_ + _)
@@ -108,10 +108,10 @@ class FlowDocSpec extends AkkaSpec {
Source.empty
// Sink that folds over the stream and returns a Future
// of the final result in the MaterializedMap
// of the final result as its materialized value
Sink.fold[Int, Int](0)(_ + _)
// Sink that returns a Future in the MaterializedMap,
// Sink that returns a Future as its materialized value,
// containing the first element of the stream
Sink.head
@@ -138,4 +138,79 @@ class FlowDocSpec extends AkkaSpec {
//#flow-connecting
}
"various ways of transforming materialized values" in {
import scala.concurrent.duration._
// A flow that zips incoming elements with ticks from a one-second tick
// source, thereby letting at most one element per second through
val throttler = Flow(Source(1.second, 1.second, "test")) { implicit builder =>
tickSource =>
import FlowGraph.Implicits._
val zip = builder.add(ZipWith[String, Int, Int](Keep.right))
tickSource ~> zip.in0
(zip.in1, zip.out)
}
//#flow-mat-combine
// An empty source that can be shut down explicitly from the outside
val source: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler
// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]
// By default, the materialized value of the leftmost stage is preserved
val r1: RunnableFlow[Promise[Unit]] = source.via(flow).to(sink)
// Simple selection of materialized values by using Keep.right
val r2: RunnableFlow[Cancellable] = source.viaMat(flow)(Keep.right).to(sink)
val r3: RunnableFlow[Future[Int]] = source.via(flow).toMat(sink)(Keep.right)
// Using runWith will always give the materialized values of the stages added
// by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink)
val r5: Promise[Unit] = flow.to(sink).runWith(source)
val r6: (Promise[Unit], Future[Int]) = flow.runWith(source, sink)
// Using more complex combinations
val r7: RunnableFlow[(Promise[Unit], Cancellable)] =
source.viaMat(flow)(Keep.both).to(sink)
val r8: RunnableFlow[(Promise[Unit], Future[Int])] =
source.via(flow).toMat(sink)(Keep.both)
val r9: RunnableFlow[((Promise[Unit], Cancellable), Future[Int])] =
source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)
val r10: RunnableFlow[(Cancellable, Future[Int])] =
source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)
// It is also possible to map over the materialized values. In r9 we had a
// doubly nested pair, but we want to flatten it out
val r11: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] =
r9.mapMaterialized {
case ((promise, cancellable), future) =>
(promise, cancellable, future)
}
// Now we can use pattern matching to get the resulting materialized values
val (promise, cancellable, future) = r11.run()
// Type inference works as expected
promise.success(0)
cancellable.cancel()
future.map(_ + 3)
// The result of r11 can also be achieved by using the Graph API
val r12: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] =
FlowGraph.closed(source, flow, sink)((_, _, _)) { implicit builder =>
(src, f, dst) =>
import FlowGraph.Implicits._
src ~> f ~> dst
}
//#flow-mat-combine
}
}


@@ -3,16 +3,11 @@
*/
package docs.stream
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Broadcast
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.FlowGraph
import akka.stream.scaladsl.Merge
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Zip
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -117,4 +112,112 @@ class FlowGraphDocSpec extends AkkaSpec {
Await.result(bottomFuture, 300.millis) shouldEqual 2
}
"building a reusable component" in {
//#flow-graph-components-shape
// A shape represents the input and output ports of a reusable
// processing module
case class PriorityWorkerPoolShape[In, Out](
jobsIn: Inlet[In],
priorityJobsIn: Inlet[In],
resultsOut: Outlet[Out]) extends Shape {
// It is important to provide the list of all input and output
// ports with a stable order. Duplicates are not allowed.
override val inlets: immutable.Seq[Inlet[_]] =
jobsIn :: priorityJobsIn :: Nil
override val outlets: immutable.Seq[Outlet[_]] =
resultsOut :: Nil
// A Shape must be able to create a copy of itself. Basically
// it means a new instance with copies of the ports
override def deepCopy() = PriorityWorkerPoolShape(
new Inlet[In](jobsIn.toString),
new Inlet[In](priorityJobsIn.toString),
new Outlet[Out](resultsOut.toString))
// A Shape must also be able to create itself from existing ports
override def copyFromPorts(
inlets: immutable.Seq[Inlet[_]],
outlets: immutable.Seq[Outlet[_]]) = {
assert(inlets.size == this.inlets.size)
assert(outlets.size == this.outlets.size)
// This is why order matters when overriding inlets and outlets
PriorityWorkerPoolShape(inlets(0), inlets(1), outlets(0))
}
}
//#flow-graph-components-shape
//#flow-graph-components-create
object PriorityWorkerPool {
def apply[In, Out](
worker: Flow[In, Out, _],
workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], Unit] = {
FlowGraph.partial() { implicit b =>
import FlowGraph.Implicits._
val priorityMerge = b.add(MergePreferred[In](1))
val balance = b.add(Balance[In](workerCount))
val resultsMerge = b.add(Merge[Out](workerCount))
// After merging priority and ordinary jobs, we feed them to the balancer
priorityMerge ~> balance
// Wire up each of the outputs of the balancer to a worker flow
// then merge them back
for (i <- 0 until workerCount)
balance.out(i) ~> worker ~> resultsMerge.in(i)
// We now expose the input ports of the priorityMerge and the output
// of the resultsMerge as our PriorityWorkerPool ports
// -- all neatly wrapped in our domain specific Shape
PriorityWorkerPoolShape(
jobsIn = priorityMerge.in(0),
priorityJobsIn = priorityMerge.preferred,
resultsOut = resultsMerge.out)
}
}
}
//#flow-graph-components-create
// shadow println so the test run stays quiet
def println(s: Any): Unit = ()
//#flow-graph-components-use
val worker1 = Flow[String].map("step 1 " + _)
val worker2 = Flow[String].map("step 2 " + _)
FlowGraph.closed() { implicit b =>
import FlowGraph.Implicits._
val priorityPool1 = b.add(PriorityWorkerPool(worker1, 4))
val priorityPool2 = b.add(PriorityWorkerPool(worker2, 2))
Source(1 to 100).map("job: " + _) ~> priorityPool1.jobsIn
Source(1 to 100).map("priority job: " + _) ~> priorityPool1.priorityJobsIn
priorityPool1.resultsOut ~> priorityPool2.jobsIn
Source(1 to 100).map("one-step, priority " + _) ~> priorityPool2.priorityJobsIn
priorityPool2.resultsOut ~> Sink.foreach(println)
}.run()
//#flow-graph-components-use
//#flow-graph-components-shape2
import FanInShape.Name
import FanInShape.Init
case class PriorityWorkerPoolShape2[In, Out](
_init: Init[Out] = Name("PriorityWorkerPool")) extends FanInShape2[In, In, Out](_init) {
def jobsIn: Inlet[In] = in0
def priorityJobsIn: Inlet[In] = in1
def resultsOut: Outlet[Out] = out
}
//#flow-graph-components-shape2
}
}


@@ -62,7 +62,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
trait Example1 {
//#materializer-setup
implicit val system = ActorSystem("reactive-tweets")
implicit val mat = ActorFlowMaterializer()
implicit val materializer = ActorFlowMaterializer()
//#materializer-setup
}
@@ -155,7 +155,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
"count elements on finite stream" in {
//#tweets-fold-count
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counter: RunnableFlow[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right)


@@ -60,9 +60,12 @@ one actor prepare the work, and then have it be materialized at some completely
.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps
After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both
sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation
dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result
After running (materializing) the ``RunnableFlow[T]`` we get back the materialized value of type ``T``. Every stream processing
stage can produce a materialized value, and it is the responsibility of the user to combine them into a final value.
In the above example we used ``toMat`` to indicate that we want to transform the materialized value of the source and
sink, and we used the convenience function ``Keep.right`` to say that we are only interested in the materialized value
of the sink.
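As a minimal sketch of this combination (names are illustrative; an implicit ``ActorFlowMaterializer`` is assumed to be in scope)::

  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // keep only the Sink's materialized value: the Future of the fold result
  val runnable: RunnableFlow[Future[Int]] = source.toMat(sink)(Keep.right)
  // materializing the blueprint runs the stream and yields that Future
  val sum: Future[Int] = runnable.run()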
In our example the ``FoldSink`` materializes a value of type ``Future`` which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values,
but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
there is a convenience method called ``runWith()`` available for ``Sink``, ``Source`` or ``Flow`` requiring, respectively,
@@ -85,7 +88,8 @@ instead of modifying the existing instance, so while constructing long flows, re
In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value
of the given sink or source.
Since a stream can be materialized multiple times, the ``MaterializedMap`` returned is different for each materialization.
Since a stream can be materialized multiple times, the materialized value will also be calculated anew for each such
materialization, usually leading to different values being returned each time.
In the example below we create two running materialized instances of the stream that we described in the ``runnable``
variable, and both materializations give us a different ``Future`` even though we used the same ``sink``
to refer to the future:
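A hedged sketch of the idea, reusing the ``runnable`` blueprint from the sketch above::

  // each run() materializes the blueprint anew and yields a fresh Future
  val sum1: Future[Int] = runnable.run()
  val sum2: Future[Int] = runnable.run()
  // the two futures belong to two independent materializations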
@@ -196,6 +200,15 @@ which will be running on the thread pools they have been configured to run on -
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
yet will materialize that stage multiple times.
Combining materialized values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary
to somehow express how these values should be composed into a final value when we plug these stages together. For this,
many combinator methods have variants that take an additional argument: a function that will be used to combine the
resulting values. Some examples of using these combiners are illustrated in the example below.
.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-mat-combine
Stream ordering
===============


@@ -132,6 +132,51 @@ For defining a ``Flow[T]`` we need to expose both an undefined source and sink:
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph
Building reusable Graph components
----------------------------------
It is possible to build reusable, encapsulated components with arbitrary numbers of input and output ports using the graph DSL.
As an example, we will build a graph junction that represents a pool of workers, where a worker is expressed
as a ``Flow[I,O,_]``, i.e. a simple transformation of jobs of type ``I`` to results of type ``O`` (as you have seen
already, this flow can actually contain a complex graph inside). Our reusable worker pool junction will
not preserve the order of the incoming jobs (they are assumed to have a proper ID field) and it will use a ``Balance``
junction to schedule jobs to available workers. On top of this, our junction will feature a "fastlane", a dedicated port
where jobs of higher priority can be sent.
Altogether, our junction will have two input ports of type ``I`` (for the normal and priority jobs) and an output port
of type ``O``. To represent this interface, we need to define a custom :class:`Shape`. The following lines show how to do that.
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape
In general, a custom :class:`Shape` needs to be able to provide all its input and output ports, be able to copy itself, and also be
able to create a new instance from given ports. There are some predefined shapes provided to avoid unnecessary
boilerplate:
* :class:`SourceShape`, :class:`SinkShape`, :class:`FlowShape` for simpler shapes,
* :class:`UniformFanInShape` and :class:`UniformFanOutShape` for junctions with multiple input (or output) ports
of the same type,
* :class:`FanInShape1`, :class:`FanInShape2`, ..., :class:`FanOutShape1`, :class:`FanOutShape2`, ... for junctions
with multiple input (or output) ports of different types.
Since our shape has two input ports and one output port, we can just reuse the :class:`FanInShape2` class to define
our custom shape:
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape2
Now that we have a :class:`Shape` we can wire up a Graph that represents
our worker pool. First, we will merge incoming normal and priority jobs using ``MergePreferred``, then send the jobs
to a ``Balance`` junction which will fan out to a configurable number of workers (flows); finally, we merge all these
results together and send them out through our only output port. This is expressed by the following code:
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-create
All we need to do now is to use our custom junction in a graph. The following code simulates some simple workers
and jobs using plain strings and prints out the results. Note that we actually use *two* instances of our worker pool
junction, by calling ``add()`` twice.
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-use
.. _graph-cycles-scala:
Graph cycles, liveness and deadlocks


@@ -29,14 +29,15 @@ materialization properties, such as default buffer sizes (see also :ref:`stream-
be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this
will be discussed in depth in :ref:`stream-section-configuration`.
Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`:
Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out, M]`:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source
Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or
more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations
that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its
behavior depends on the type of :class:`Sink` used.
Streams always start flowing from a :class:`Source[Out,M1]` then can continue through :class:`Flow[In,Out,M2]` elements or
more advanced graph elements to finally be consumed by a :class:`Sink[In,M3]` (ignore the type parameters ``M1``, ``M2``
and ``M3`` for now, they are not relevant to the types of the elements produced/consumed by these classes). Both Sources and
Flows provide stream operations that can be used to transform the flowing data; a :class:`Sink`, however, does not, since
it is the "end of stream" and its behavior depends on the type of :class:`Sink` used.
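As a rough sketch of how those type parameters show up in practice (hedged on this version of the API, where plain sources and flows materialize to ``Unit``)::

  val src: Source[Int, Unit] = Source(1 to 10)                  // emits Ints
  val flw: Flow[Int, String, Unit] = Flow[Int].map(_.toString)  // Ints in, Strings out
  val snk: Sink[String, Future[String]] = Sink.head[String]     // consumes Strings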
In our case let's say we want to find all twitter handles of users who tweet about ``#akka``. The operations should look
familiar to anyone who has used the Scala Collections library; however, they operate on streams and not collections of data:
@@ -44,8 +45,8 @@ familiar to anyone who has used the Scala Collections library, however they oper
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
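The included snippet plausibly reads like the following sketch (the ``Tweet`` model with its ``hashtags`` and ``author`` fields, and the ``akka`` hashtag value, are assumed from the quickstart sources)::

  val authors: Source[Author, Unit] =
    tweets
      .filter(_.hashtags.contains(akka))
      .map(_.author)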
Finally in order to :ref:`materialize <stream-materialization-scala>` and run the stream computation we need to attach
the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on
the Flow to a :class:`Sink` that will get the flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as methods on
the :class:`Sink` `companion object <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2-SNAPSHOT/#akka.stream.scaladsl.Sink$>`_.
For now let's simply print each author:
@@ -56,7 +57,7 @@ or by using the shorthand version (which are defined only for the most popular s
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println
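The non-shorthand variant is plausibly a one-liner along these lines (a sketch; ``authors`` as defined above)::

  authors.runWith(Sink.foreach(println))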
Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly,
like this: ``.run(mat)``).
like this: ``.run(materializer)``).
Flattening sequences in streams
-------------------------------
@@ -69,8 +70,8 @@ combinator:
.. note::
The name ``flatMap`` was consciously avoided due to its proximity to for-comprehensions and monadic composition.
It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing
due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing
due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for
our implementation of flatMap (due to the liveness issues).
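For example, flattening each tweet's set of hashtags into a stream of hashtags might look like this sketch (assuming the quickstart's ``Tweet`` and ``Hashtag`` model)::

  val hashtags: Source[Hashtag, Unit] = tweets.mapConcat(_.hashtags.toList)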
Please note that ``mapConcat`` requires the supplied function to return a strict collection (``f:Out=>immutable.Seq[T]``),
@@ -96,13 +97,13 @@ FlowGraphs are constructed like this:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast
.. note::
The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported.
The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraph.Implicits._`` are imported.
Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method.
As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g``
*it is immutable, thread-safe, and freely shareable*. A graph can be ``run()`` directly - assuming all
ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s
ports (sinks/sources) within a flow have been connected properly. It is possible to construct partial graphs
where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`.
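Concretely, the broadcast graph included above plausibly reads like the following sketch, where the two sinks are hypothetical stand-ins::

  val writeAuthors: Sink[Author, Unit] = ???
  val writeHashtags: Sink[Hashtag, Unit] = ???

  val g = FlowGraph.closed() { implicit b =>
    import FlowGraph.Implicits._
    val bcast = b.add(Broadcast[Tweet](2))
    tweets ~> bcast.in
    bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
    bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
  }
  g.run()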
As with all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element.
@@ -136,26 +137,27 @@ While this question is not as obvious to give an answer to in case of an infinit
this question in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized
values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple
lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType``
First, let's write such an element counter using :class:`FoldSink` and see what the types look like:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream.
Next we connect the ``tweets`` stream through a ``map`` step which converts each tweet into the number ``1``,
finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`ActorFlowMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``,
which can be used to retrieve materialized values from the running stream.
finally we connect the flow to the previously prepared Sink using ``toMat``. Remember those mysterious type parameters on
:class:`Source`, :class:`Flow` and :class:`Sink`? They represent the type of values these processing parts return when
materialized. When you chain these together, you can explicitly combine their materialized values: in our example we
used the ``Keep.right`` predefined function, which tells the implementation to only care about the materialized
type of the stage currently appended to the right. As you can see, the materialized type of ``sumSink`` is ``Future[Int]``
and, because we used ``Keep.right``, the resulting :class:`RunnableFlow` also has a type parameter of ``Future[Int]``.
In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType``
as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream.
This step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorFlowMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow[T]`` is of type ``T``.
In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure.
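Putting the pieces together, a sketch with the final ``run()`` step (the printing callback is illustrative and needs an implicit ``ExecutionContext``)::

  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
  val counter: RunnableFlow[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right)
  val sum: Future[Int] = counter.run()
  sum.foreach(c => println(s"Total tweets processed: $c"))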
The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused
A :class:`RunnableFlow` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream
twice, for example one that consumes a live stream of tweets within a minute, the materialized values of those two
materializations will be different, as illustrated by this example:
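A sketch of such an example (``tweetsInMinuteFromNow`` is a hypothetical time-windowed variant of the ``tweets`` source)::

  val sumSink = Sink.fold[Int, Int](0)(_ + _)
  val counterRunnableFlow: RunnableFlow[Future[Int]] =
    tweetsInMinuteFromNow.map(t => 1).toMat(sumSink)(Keep.right)

  // materialize the stream once in the morning...
  val morningTweetsCount: Future[Int] = counterRunnableFlow.run()
  // ...and once in the evening, reusing the exact same blueprint
  val eveningTweetsCount: Future[Int] = counterRunnableFlow.run()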
@@ -167,3 +169,8 @@ steering these elements which will be discussed in detail in :ref:`stream-materi
what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline
.. note::
``runWith()`` is a convenience method that automatically ignores the materialized values of all stages except
those appended by ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner
for materialized values.
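That one-liner plausibly reads like this sketch, with ``runWith`` supplying the ``Keep.right`` combining described above::

  val sum: Future[Int] = tweets.map(t => 1).runWith(Sink.fold[Int, Int](0)(_ + _))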