diff --git a/akka-docs-dev/rst/scala.rst b/akka-docs-dev/rst/scala.rst index 1371dec927..14662b661c 100644 --- a/akka-docs-dev/rst/scala.rst +++ b/akka-docs-dev/rst/scala.rst @@ -7,4 +7,5 @@ Scala Documentation :maxdepth: 2 experimental/index + scala/stream scala/index-http diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala new file mode 100644 index 0000000000..1fa867875f --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +//#imports + +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.PartialFlowGraph + +//#imports + +import akka.stream.testkit.AkkaSpec + +// TODO replace ⇒ with => and disable this intellij setting +class StreamPartialFlowGraphDocSpec extends AkkaSpec { + + implicit val mat = FlowMaterializer() + + "build with open ports" in { + //#simple-partial-flow-graph + PartialFlowGraph { implicit b ⇒ + + } + //#simple-partial-flow-graph + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala new file mode 100644 index 0000000000..87faa3b1bf --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -0,0 +1,213 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +//#imports + +import java.util.Date + +import akka.actor.ActorSystem +import akka.stream.FlowMaterializer +import akka.stream.OverflowStrategy +import akka.stream.scaladsl.Broadcast +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.FlowGraph +import akka.stream.scaladsl.FlowGraphImplicits +import akka.stream.scaladsl.MaterializedMap +import akka.stream.scaladsl.RunnableFlow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +import concurrent.Await +import concurrent.Future + +//#imports + +import akka.stream.testkit.AkkaSpec + +// TODO replace ⇒ with => and disable this intellij setting +class TwitterStreamQuickstartDocSpec extends AkkaSpec { + + implicit val executionContext = system.dispatcher + + //#model + final case class Author(handle: String) + val AkkaTeam = Author("akkateam") + + final case class Hashtag(name: String) + + final case class Tweet(author: Author, timestamp: Long, body: String) { + def hashtags: List[Hashtag] = + body.split(" ").toList.collect { case t if t.startsWith("#") ⇒ Hashtag(t) } + } + //#model + + trait Example0 { + //#tweet-source + val tweets: Source[Tweet] + //#tweet-source + } + + trait Example1 { + //#materializer-setup + implicit val system = ActorSystem("reactive-tweets") + implicit val mat = FlowMaterializer() + //#materializer-setup + } + + val tweets = Source( + Tweet(Author("rolandkuhn"), (new Date).getTime, "#akka rocks!") :: + Tweet(Author("patriknw"), (new Date).getTime, "#akka!") :: + Tweet(Author("bantonsson"), (new Date).getTime, "#akka!") :: + Tweet(Author("drewhk"), (new Date).getTime, "#akka!") :: + Tweet(Author("ktosopl"), (new Date).getTime, "#akka on the rocks!") :: + Tweet(Author("mmartynas"), (new Date).getTime, "wow #akka!") :: + Tweet(Author("akkateam"), (new Date).getTime, "#akka rocks!") :: + Tweet(Author("bananaman"), (new Date).getTime, "#bananas rock!") :: + Tweet(Author("appleman"), (new Date).getTime, "#apples rock!") :: + Tweet(Author("drama"), (new Date).getTime, "we compared #apples to #oranges!") :: + Nil) + + implicit val mat = FlowMaterializer() + + "filter and map" in { + //#authors-filter-map + val authors: Source[Author] = + tweets + .filter(_.hashtags.contains("#akka")) + .map(_.author) + //#authors-filter-map + + trait Example3 { + //#authors-collect + val authors: Source[Author] = + tweets.collect { case t if t.hashtags.contains("#akka") ⇒ t.author } + //#authors-collect + } + + //#authors-foreachsink-println + authors.runWith(Sink.foreach(println)) + //#authors-foreachsink-println + + //#authors-foreach-println + authors.foreach(println) + //#authors-foreach-println + } + + "mapConcat hashtags" in { + //#hashtags-mapConcat + val hashtags: Source[Hashtag] = tweets.mapConcat(_.hashtags) + //#hashtags-mapConcat + } + + "simple broadcast" in { + trait X { + //#flow-graph-broadcast + val writeAuthors: Sink[Author] = ??? + val writeHashtags: Sink[Hashtag] = ??? + //#flow-graph-broadcast + } + + val writeAuthors: Sink[Author] = Sink.ignore + val writeHashtags: Sink[Hashtag] = Sink.ignore + + // format: OFF + //#flow-graph-broadcast + val g = FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + + val b = Broadcast[Tweet] + tweets ~> b ~> Flow[Tweet].map(_.author) ~> writeAuthors + b ~> Flow[Tweet].mapConcat(_.hashtags) ~> writeHashtags + } + g.run() + //#flow-graph-broadcast + // format: ON + } + + "slowProcessing" in { + def slowComputation(t: Tweet): Long = { + Thread.sleep(500) // act as if performing some heavy computation + 42 + } + + //#tweets-slow-consumption-dropHead + tweets + .buffer(10, OverflowStrategy.dropHead) + .map(slowComputation) + .runWith(Sink.ignore) + //#tweets-slow-consumption-dropHead + } + + "backpressure by readline" in { + trait X { + import scala.concurrent.duration._ + + //#backpressure-by-readline + val completion: Future[Unit] = + Source(1 to 10) + .map(i ⇒ { println(s"map => $i"); i }) + .foreach { i ⇒ readLine(s"Element = $i; continue reading? [press enter]\n") } + + Await.ready(completion, 1.minute) + //#backpressure-by-readline + } + } + + "count elements on finite stream" in { + //#tweets-fold-count + val sumSink = Sink.fold[Int, Int](0)(_ + _) + + val counter: RunnableFlow = tweets.map(t ⇒ 1).to(sumSink) + val map: MaterializedMap = counter.run() + + val sum: Future[Int] = map.get(sumSink) + + sum.map { c ⇒ println(s"Total tweets processed: $c") } + //#tweets-fold-count + + new AnyRef { + //#tweets-fold-count-oneline + val sum: Future[Int] = tweets.map(t ⇒ 1).runWith(sumSink) + //#tweets-fold-count-oneline + } + } + + "materialize multiple times" in { + val tweetsInMinuteFromNow = tweets // not really in second, just acting as if + + //#tweets-runnable-flow-materialized-twice + val sumSink = Sink.fold[Int, Int](0)(_ + _) + val counterRunnableFlow: RunnableFlow = + tweetsInMinuteFromNow + .filter(_.hashtags contains "#akka") + .map(t ⇒ 1) + .to(sumSink) + + // materialize the stream once in the morning + val morningMaterialized = counterRunnableFlow.run() + // and once in the evening, reusing the + val eveningMaterialized = counterRunnableFlow.run() + + // the sumSink materialized two different futures + // we use it as key to get the materialized value out of the materialized map + val morningTweetsCount: Future[Int] = morningMaterialized.get(sumSink) + val eveningTweetsCount: Future[Int] = morningMaterialized.get(sumSink) + //#tweets-runnable-flow-materialized-twice + + val map: MaterializedMap = counterRunnableFlow.run() + + val sum: Future[Int] = map.get(sumSink) + + sum.map { c ⇒ println(s"Total tweets processed: $c") } + //#tweets-fold-count + + new AnyRef { + //#tweets-fold-count-oneline + val sum: Future[Int] = tweets.map(t ⇒ 1).runWith(sumSink) + //#tweets-fold-count-oneline + } + } + +} diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst new file mode 100644 index 0000000000..7511cc0e71 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream.rst @@ -0,0 +1,351 @@ +.. _stream-scala: + +####### +Streams +####### + +How to read these docs +====================== + +**TODO** + +Add section: "How to read these docs" (or something similar) +It should be roughly: + +* read the quickstart to get a feel +* (optional) read the design statement +* (optional) look at the cookbook probably in parallel while reading the main docs as supplementary material +* the other sections can be read sequentially, each digging deeper into advanced topics + +**TODO - write me** + +Motivation +========== + +**TODO - write me** + +Quick Start: Reactive Tweets +============================ + +A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some +other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. + +We will also consider the problem inherent to all non-blocking streaming solutions – "*What if the subscriber is slower +to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. Traditionally the solution +is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems. +Instead Akka Streams depend on internal backpressure signals that allow to control what should happen in such scenarios. + +Here's the data model we'll be working with throughout the quickstart examples: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model + +Transforming and consuming simple streams +----------------------------------------- +In order to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`, +which will be responsible for materializing and running the streams we're about to create: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup + +The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define +materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to +be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this +will be discussed in depth in :ref:`stream-section-configuration`. + +Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source + +Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or +more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations +that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its +behavior depends on the type of :class:`Sink` used. + +In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look +familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map + +Finally in order to :ref:`materialize ` and run the stream computation we need to attach +the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call +``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on +the :class:``Sink`` `companion object `_. +For now let's simply print each author: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println + +or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`): + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println + +Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly, +like this: ``.run(mat)``). + +Flattening sequences in streams +------------------------------- +In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes +we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like ``flatMap`` +works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat`` +combinator: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat + +.. note:: + The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition. + It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing + due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for + our implementation of flatMap (due to the liveness issues). + + Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``), + whereas ``flatMap`` would have to operate on streams all the way through. + + +Broadcasting a stream +--------------------- +Now let's say we want to persist all hashtags, as well as all author names from this one live stream. +For example we'd like to write all author handles into one file, and all hashtags into another file on disk. +This means we have to split the source stream into 2 streams which will handle the writing to these different files. + +Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams. +One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its +input port to all of its output ports. + +Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) +in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups +at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation +graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`. +FlowGraphs are constructed like this: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast + +.. note:: + The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported. + Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method. + +As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph +using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g`` +*it is immutable, thread-safe, and freely shareable*. A graph can can be ``run()`` directly - assuming all +ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s +where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`. + +As all Akka streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element. + +Back-pressure in action +----------------------- + +One of the main advantages of Akka streams is that they *always* propagate back-pressure information from stream Sinks +(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more +about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read +:ref:`back-pressure-explained-scala`. + +A typical problem applications (not using Akka streams) like this often face is that they are unable to process the incoming data fast enough, +either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting +in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka streams buffering can +and must be handled explicitly. For example, if we're only interested in the "*most recent tweets, with a buffer of 10 +elements*" this can be expressed using the ``buffer`` element: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead + +The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react +when it receives another element element while it is full. Strategies provided include dropping the oldest element (``dropHead``), +dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best. + +Materialized values +------------------- +So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing +values or storing them in some external system. However sometimes we may be interested in some value that can be +obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed. +While this question is not as obvious to give an answer to in case of an infinite stream of tweets (one way to answer +this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"), +but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. + +First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized +values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple +lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType`` + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count + +First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream. +Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, +finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the +processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can +be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer` +to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``, +which can be used to retrieve materialized values from the running stream. + +In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map +obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType`` +as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. +In case of the stream failing, this future would complete with a Failure. + +The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused +and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, +for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations +will be different, as illustrated by this example: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice + +Many elements in Akka streams provide materialized values which can be used for obtaining either results of computation or +steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know +what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline + + +Core concepts +============= + +// TODO REWORD? This section explains the core types and concepts used in Akka Streams, from a more day-to-day use angle. +If we would like to get the big picture overview you may be interested in reading :ref:`stream-design`. + +Sources, Flows and Sinks +------------------------ + +// TODO: runnable flow, types - runWith + +// TODO: talk about how creating and sharing a ``Flow.of[String]`` is useful etc. + +.. _back-pressure-explained-scala: + +Back-pressure explained +----------------------- + +// TODO: explain the protocol and how it performs in slow-pub/fast-sub and fast-pub/slow-sub scenarios + +Backpressure when Fast Publisher and Slow Subscriber +---------------------------------------------------- + +// TODO: Write me + +Backpressure when Slow Publisher and Fast Subscriber +---------------------------------------------------- + +// TODO: Write me + +In depth +======== +// TODO: working with flows +// TODO: creating an empty flow +// TODO: materialization Flow -> RunnableFlow + +// TODO: flattening, prefer static fanin/out, deadlocks + +.. _stream-buffering-explained-scala: +Stream buffering explained +-------------------------- +**TODO - write me (feel free to move around as well)** + +Streams of Streams +------------------ +**TODO - write me (feel free to move around as well)** + +groupBy +^^^^^^^ +**TODO - write me (feel free to move around as well)** +// TODO: deserves its own section? and explain the dangers? (dangling sub-stream problem, subscription timeouts) + +// TODO: Talk about ``flatten`` and ``FlattenStrategy`` + + +.. _stream-materialization-scala: +Stream Materialization +---------------------- +**TODO - write me (feel free to move around as well)** + +MaterializedMap +^^^^^^^^^^^^^^^ +**TODO - write me (feel free to move around as well)** + +Working with rates +------------------ + +**TODO - write me (feel free to move around as well)** + +Optimizations +^^^^^^^^^^^^^ +// TODO: not really to be covered right now, right? + +Subscription timeouts +--------------------- +// TODO: esp in groupby etc, if you dont subscribe to a stream son enougu it may be dead once you get to it + + +Working with Graphs +=================== +// TODO: Don't forget adding the type parameter to the graph elements! + +.. _partial-flow-graph-scala: + +Constructing and combining Partial Flow Graphs +---------------------------------------------- +**TODO - write me (feel free to move around as well)** + +Constructing a Source or Sink from a Graph +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +**TODO - write me (feel free to move around as well)** + +Dealing with cycles, deadlocks +------------------------------ +// TODO: why to avoid cycles, how to enable if you really need to + +// TODO: problem cases, expand-conflate, expand-filter + +// TODO: working with rate + +// TODO: custom processing + +// TODO: stages and flexistuff + +Streaming IO +============ + +// TODO: TCP here I guess + +// TODO: Files if we get any, but not this week + +Custom elements +=============== +**TODO - write me (feel free to move around as well)** +// TODO: So far we've been mostly using predefined elements, but sometimes that's not enough + +Stage +----- +**TODO - write me (feel free to move around as well)** + +Flexi Merge +----------- +**TODO - write me (feel free to move around as well)** + +Flexi Route +----------- +**TODO - write me (feel free to move around as well)** + +Actor based custom elements +--------------------------- + +ActorPublisher +^^^^^^^^^^^^^^ + +ActorSubscriber +^^^^^^^^^^^^^^^ + + +// TODO: Implementing Reactive Streams interfaces directly vs. extending ActorPublisher / ActoSubscriber??? + +Integrating with Actors +======================= + +// TODO: Source.subscriber + +// TODO: Sink.publisher + +// TODO: Use the ImplicitFlowMaterializer if you have streams starting from inside actors. + +// TODO: how do I create my own sources / sinks? + +Integration with Reactive Streams enabled libraries +=================================================== + +// TODO: some info about reactive streams in general + +// TODO: Simplly runWith(Sink.publisher) and runWith(Source.subscriber) to get the corresponding reactive streams types. +