+doc: Changes to basics+flows part

This commit is contained in:
Endre Sándor Varga 2014-12-20 17:16:08 +01:00
parent 754d50abdb
commit d7dc5bbc3b
2 changed files with 153 additions and 152 deletions

View file

@ -1,143 +1,142 @@
/** /**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package docs.stream package docs.stream
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import concurrent.Future import concurrent.Future
// TODO replace with => and disable this intellij setting // TODO replace with => and disable this intellij setting
class FlowDocSpec extends AkkaSpec { class FlowDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
//#imports //#imports
import akka.stream.FlowMaterializer import akka.stream.FlowMaterializer
//#imports //#imports
implicit val mat = FlowMaterializer() implicit val mat = FlowMaterializer()
"source is immutable" in { "source is immutable" in {
//#source-immutable //#source-immutable
val source = Source(1 to 10) val source = Source(1 to 10)
source.map(_ 0) // has no effect on source, since it's immutable source.map(_ 0) // has no effect on source, since it's immutable
source.runWith(Sink.fold(0)(_ + _)) // 55 source.runWith(Sink.fold(0)(_ + _)) // 55
val zeroes = source.map(_ 0) // returns new Source[Int], with `map()` appended val zeroes = source.map(_ 0) // returns new Source[Int], with `map()` appended
zeroes.runWith(Sink.fold(0)(_ + _)) // 0 zeroes.runWith(Sink.fold(0)(_ + _)) // 0
//#source-immutable //#source-immutable
} }
"materialization in steps" in { "materialization in steps" in {
//#materialization-in-steps //#materialization-in-steps
val source = Source(1 to 10) val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _) val sink = Sink.fold[Int, Int](0)(_ + _)
// connect the Source to the Sink, obtaining a RunnableFlow // connect the Source to the Sink, obtaining a RunnableFlow
val runnable: RunnableFlow = source.to(sink) val runnable: RunnableFlow = source.to(sink)
// materialize the flow // materialize the flow
val materialized: MaterializedMap = runnable.run() val materialized: MaterializedMap = runnable.run()
// get the materialized value of the FoldSink // get the materialized value of the FoldSink
val sum: Future[Int] = materialized.get(sink) val sum: Future[Int] = materialized.get(sink)
//#materialization-in-steps //#materialization-in-steps
} }
"materialization runWith" in { "materialization runWith" in {
//#materialization-runWith //#materialization-runWith
val source = Source(1 to 10) val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _) val sink = Sink.fold[Int, Int](0)(_ + _)
// materialize the flow, getting the Sinks materialized value // materialize the flow, getting the Sinks materialized value
val sum: Future[Int] = source.runWith(sink) val sum: Future[Int] = source.runWith(sink)
//#materialization-runWith //#materialization-runWith
} }
"materializedMap is unique" in { "materializedMap is unique" in {
//#stream-reuse //#stream-reuse
// connect the Source to the Sink, obtaining a RunnableFlow // connect the Source to the Sink, obtaining a RunnableFlow
val sink = Sink.fold[Int, Int](0)(_ + _) val sink = Sink.fold[Int, Int](0)(_ + _)
val runnable: RunnableFlow = Source(1 to 10).to(sink) val runnable: RunnableFlow = Source(1 to 10).to(sink)
// get the materialized value of the FoldSink // get the materialized value of the FoldSink
val sum1: Future[Int] = runnable.run().get(sink) val sum1: Future[Int] = runnable.run().get(sink)
val sum2: Future[Int] = runnable.run().get(sink) val sum2: Future[Int] = runnable.run().get(sink)
// sum1 and sum2 are different Futures! // sum1 and sum2 are different Futures!
//#stream-reuse //#stream-reuse
} }
"compound source cannot be used as key" in { "compound source cannot be used as key" in {
//#compound-source-is-not-keyed-runWith //#compound-source-is-not-keyed-runWith
import scala.concurrent.duration._ import scala.concurrent.duration._
case object Tick case object Tick
val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () Tick) val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () Tick)
val timerCancel: Cancellable = Sink.ignore.runWith(timer) val timerCancel: Cancellable = Sink.ignore.runWith(timer)
timerCancel.cancel() timerCancel.cancel()
val timerMap = timer.map(tick "tick") val timerMap = timer.map(tick "tick")
val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable! val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable!
//#compound-source-is-not-keyed-runWith //#compound-source-is-not-keyed-runWith
//#compound-source-is-not-keyed-run //#compound-source-is-not-keyed-run
// retain the materialized map, in order to retrieve the timers Cancellable // retain the materialized map, in order to retrieve the timers Cancellable
val materialized = timerMap.to(Sink.ignore).run() val materialized = timerMap.to(Sink.ignore).run()
val timerCancellable = materialized.get(timer) val timerCancellable = materialized.get(timer)
timerCancellable.cancel() timerCancellable.cancel()
//#compound-source-is-not-keyed-run //#compound-source-is-not-keyed-run
} }
"creating sources, sinks" in { "creating sources, sinks" in {
//#source-sink //#source-sink
// Create a source from an Iterable // Create a source from an Iterable
Source(List(1, 2, 3)) Source(List(1, 2, 3))
// Create a source form a Future // Create a source form a Future
Source(Future.successful("Hello Streams!")) Source(Future.successful("Hello Streams!"))
// Create a source from a single element // Create a source from a single element
Source.single("only one element") Source.single("only one element")
// an empty source // an empty source
Source.empty Source.empty
// Sink that folds over the stream and returns a Future // Sink that folds over the stream and returns a Future
// of the final result in the MaterializedMap // of the final result in the MaterializedMap
Sink.fold(0)(_ + _) Sink.fold[Int, Int](0)(_ + _)
// Sink that returns a Future in the MaterializedMap, // Sink that returns a Future in the MaterializedMap,
// containing the first element of the stream // containing the first element of the stream
Sink.head Sink.head
// A Sink that consumes a stream without doing anything with the elements // A Sink that consumes a stream without doing anything with the elements
Sink.ignore Sink.ignore
// A Sink that executes a side-effecting call for every element of the stream // A Sink that executes a side-effecting call for every element of the stream
Sink.foreach((elem) => println(elem)) Sink.foreach[String](println(_))
//#source-sink //#source-sink
} }
"various ways of connecting source, sink, flow" in { "various ways of connecting source, sink, flow" in {
//#flow-connecting //#flow-connecting
// Explicitly creating and wiring up a Source, Sink and Flow // Explicitly creating and wiring up a Source, Sink and Flow
Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_))) Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_)))
// Starting from a Source // Starting from a Source
val source = Source(1 to 6).map(_ * 2) val source = Source(1 to 6).map(_ * 2)
source.to(Sink.foreach(println(_))) source.to(Sink.foreach(println(_)))
// Starting from a Sink // Starting from a Sink
val sink: Sink[Int] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_))) val sink: Sink[Int] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_)))
Source(1 to 6).to(sink) Source(1 to 6).to(sink)
//#flow-connecting
//#flow-connecting }
} }
}

View file

@ -28,24 +28,24 @@ Back-pressure
In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous* In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*
Processing Stage Processing Stage
The common name for all building blocks that build up a Flow or FlowGraph. The common name for all building blocks that build up a Flow or FlowGraph.
Examples of a processing stage would be ``Stage`` (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`, Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()``
:class:`DetachedStage`), operations like ``map()``, ``filter()`` and graph junctions like ``Merge`` or ``Broadcast``. (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`) and graph junctions like ``Merge`` or ``Broadcast``.
Defining and running streams Defining and running streams
---------------------------- ----------------------------
Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions: Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:
Source Source
A processing stage with *exactly one output*, emitting data elements whenever downstream processing elements are A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are
ready to receive them. ready to receive them.
Sink Sink
A processing stage with *exactly one input*, requesting and accepting data elements possibly slowing down the upstream A processing stage with *exactly one input*, requesting and accepting data elements possibly slowing down the upstream
producer of elements producer of elements
Flow Flow
A processing stage which has *exactly one input and output*, which connects its up- and downstreams by (usually) A processing stage which has *exactly one input and output*, which connects its up- and downstreams by
transforming the data elements flowing through it. transforming the data elements flowing through it.
RunnableFlow RunnableFlow
A Flow with has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend
a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink, a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink,
@ -55,13 +55,13 @@ It is important to remember that even after constructing the ``RunnableFlow`` by
different processing stages, no data will flow through it until it is materialized. Materialization is the process of different processing stages, no data will flow through it until it is materialized. Materialization is the process of
allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve
starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable, starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
thread-safe, and freely shareable*, which means that it is for example safe to share send between actors, to have thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
one actor prepare the work, and then have it be materialized at some completely different place in the code. one actor prepare the work, and then have it be materialized at some completely different place in the code.
.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps .. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps
After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both
sources and sinks are able to put specific object into this map. Whether they put something in or not is implementation sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation
dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values, of the folding process over the stream. In general, a stream can expose multiple materialized values,
but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
@ -109,10 +109,12 @@ There are various ways to wire up different parts of a stream, the following exa
Back-pressure explained Back-pressure explained
----------------------- -----------------------
Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the Reactive Streams Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_
specification, which Akka is a founding member of. specification, which Akka is a founding member of.
The user of the library does not have to write any explicit back-pressure handling code - it is built in .. _Reactive Streams: http://reactive-streams.org/
The user of the library does not have to write any explicit back-pressure handling code — it is built in
and dealt with automatically by all of the provided Akka Streams processing stages. It is possible however to add and dealt with automatically by all of the provided Akka Streams processing stages. It is possible however to add
explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important
in complex processing graphs which may even sometimes even contain loops (which *must* be treated with very special in complex processing graphs which may even sometimes even contain loops (which *must* be treated with very special