diff --git a/akka-docs-dev/rst/java/stream-introduction.rst b/akka-docs-dev/rst/java/stream-introduction.rst index 6528ea8416..327b83eaa8 100644 --- a/akka-docs-dev/rst/java/stream-introduction.rst +++ b/akka-docs-dev/rst/java/stream-introduction.rst @@ -53,7 +53,7 @@ composition, therefore it may take some careful study of this subject until you feel familiar with the tools and techniques. The documentation is here to help and for best results we recommend the following approach: -* Read the :ref:`quickstart-java` to get a feel for how streams +* Read the :ref:`stream-quickstart-java` to get a feel for how streams look and what they can do. * The top-down learners may want to peruse the :ref:`stream-design` at this point. diff --git a/akka-docs-dev/rst/java/stream-quickstart.rst b/akka-docs-dev/rst/java/stream-quickstart.rst index 900e8324e2..0b16788b12 100644 --- a/akka-docs-dev/rst/java/stream-quickstart.rst +++ b/akka-docs-dev/rst/java/stream-quickstart.rst @@ -1,4 +1,4 @@ -.. _quickstart-java: +.. _stream-quickstart-java: Quick Start Guide: Reactive Tweets ================================== @@ -15,45 +15,45 @@ allow to control what should happen in such scenarios. Here's the data model we'll be working with throughout the quickstart examples: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#model Transforming and consuming simple streams ----------------------------------------- First we need to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`, which will be responsible for materializing and running the streams we are about to create: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#materializer-setup The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this will be discussed in depth in :ref:`stream-section-configuration`. -Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`: +Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source`: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweet-source -Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or -more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations +Streams always start flowing from a :class:`Source` then can continue through :class:`Flow` elements or +more advanced graph elements to finally be consumed by a :class:`Sink`. Both Sources and Flows provide stream operations that can be used to transform the flowing data; a :class:`Sink`, however, does not, since it is the "end of stream" and its behavior depends on the type of :class:`Sink` used.
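The following is a rough, non-authoritative sketch of the setup just described, written against the Scala DSL from the accompanying ``TwitterStreamQuickstartDocSpec.scala`` (the actual Java snippets live in the referenced ``TwitterStreamQuickstartDocTest.java``; the no-argument ``FlowMaterializer()`` factory and the ``Sink.foreach`` helper are assumptions based on the prose above)::

    import akka.actor.ActorSystem
    import akka.stream.FlowMaterializer
    import akka.stream.scaladsl.{ Sink, Source }

    implicit val system = ActorSystem("reactive-tweets")
    // the materializer turns stream blueprints into running processing networks
    implicit val materializer = FlowMaterializer()

    // a Source of tweets, here backed by a fixed list;
    // Tweet and Author come from the data model shown above
    val tweets: Source[Tweet] = Source(
      Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: Nil)

    // nothing runs until the Source is attached to a Sink and materialized
    tweets.runWith(Sink.foreach(tweet => println(tweet.body)))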
In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-filter-map -Finally in order to :ref:`materialize ` and run the stream computation we need to attach -the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call -``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on -the :class:``Sink`` `companion object `_. +Finally in order to :ref:`materialize ` and run the stream computation we need to attach +the Flow to a :class:`Sink` that will get the flow running. The simplest way to do this is to call +``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as static methods on +the `Sink class `_. For now let's simply print each author: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreachsink-println or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`): -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreach-println Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly, like this: ``.run(mat)``). @@ -65,7 +65,7 @@ we might want to map from one element to a number of elements and receive a "fla works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat`` combinator: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#hashtags-mapConcat .. note:: The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition. @@ -73,7 +73,7 @@ combinator: due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for our implementation of flatMap (due to the liveness issues). - Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out=>immutable.Seq[T]``), + Please note that the mapConcat requires the supplied function to return a strict collection (``Out f -> java.util.List``), whereas ``flatMap`` would have to operate on streams all the way through. @@ -90,20 +90,15 @@ input port to all of its output ports. Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups at the expense of not reading as familiarly as collection transformations. 
It is also possible to wrap complex computation -graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`. +graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-java`. FlowGraphs are constructed like this: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#flow-graph-broadcast -.. note:: - The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported. - Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method. - -As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph -using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g`` -*it is immutable, thread-safe, and freely shareable*. A graph can can be ``run()`` directly - assuming all -ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s -where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`. +As you can see, we use a graph builder to mutably construct the graph using the ``addEdge`` method. Once we have the +FlowGraph in the value ``g``, *it is immutable, thread-safe, and freely shareable*. A graph can be ``run()`` directly - +assuming all ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s +where this is not required but this will be covered in detail in :ref:`partial-flow-graph-java`. Like all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element. @@ -113,7 +108,7 @@ Back-pressure in action One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks (Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read -:ref:`back-pressure-explained-scala`. +:ref:`back-pressure-explained-java`. A typical problem that applications like this (ones not using Akka Streams) often face is that they are unable to process the incoming data fast enough, either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting @@ -121,7 +116,7 @@ in either ``OutOfMemoryError`` s or other severe degradations of service respons and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10 elements*" this can be expressed using the ``buffer`` element: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-slow-consumption-dropHead The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
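A hedged sketch of this buffering step in the Scala DSL used by the spec in this patch (the two-argument ``buffer(size, overflowStrategy)`` call is assumed from the prose above, and ``slowComputation`` is a purely hypothetical stand-in for a stage that cannot keep up)::

    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.Sink

    // keep only the 10 most recent tweets: when another element arrives while
    // the buffer is full, the oldest buffered element is dropped (dropHead)
    tweets
      .buffer(10, OverflowStrategy.dropHead)
      .map(tweet => slowComputation(tweet)) // hypothetical slow downstream step
      .runWith(Sink.foreach(println))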
@@ -140,9 +135,9 @@ First, let's write such an element counter using :class:`FoldSink` and then we'l values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType`` -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count -First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream. +First, we prepare the :class:`FoldSink` which will be used to sum all ``Integer`` elements of the stream. Next we connect the ``tweets`` stream through a ``map`` step which converts each tweet into the number ``1``; finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the processing pipeline; it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can @@ -152,7 +147,7 @@ which can be used to retrieve materialized values from the running stream. In order to extract a materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType`` -as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. +as ``Future`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. In case of the stream failing, this future would complete with a Failure. The reason we have to ``get`` the value out from the materialized map is because a :class:`RunnableFlow` may be reused @@ -160,10 +155,10 @@ and materialized multiple times, because it is just the "blueprint" of the strea for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations will be different, as illustrated by this example: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-runnable-flow-materialized-twice Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or -steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know +steering these elements which will be discussed in detail in :ref:`stream-materialization-java`. Summing up this section, now we know what happens behind the scenes when we run this one-liner, which is equivalent to the multi-line version above: -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count-oneline
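A rough Scala DSL sketch of the counting pipeline described above, modelled on the ``map.get(sumSink)`` line visible in the spec further down (treat ``Sink.fold`` as an assumed factory for :class:`FoldSink`, and the ``run()``/``get`` calls as the :class:`MaterializedMap` API implied by the prose)::

    import scala.concurrent.Future
    import akka.stream.scaladsl.{ RunnableFlow, Sink }

    // a Sink that folds over the stream, summing up all Int elements
    val sumSink = Sink.fold[Int, Int](0)(_ + _)

    // a blueprint only: connecting the flow to the Sink does not run anything yet
    val counter: RunnableFlow = tweets.map(tweet => 1).to(sumSink)

    // materializing the blueprint returns the map holding the per-run values
    val materialized = counter.run()
    val sum: Future[Int] = materialized.get(sumSink)

    import system.dispatcher // execution context for transforming the Future
    sum.map(c => println(s"Total tweets processed: $c"))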
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 0b7159e9fc..1e8f346c61 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -5,8 +5,6 @@ package docs.stream //#imports -import java.util.Date - import akka.actor.ActorSystem import akka.stream.FlowMaterializer import akka.stream.OverflowStrategy @@ -19,8 +17,8 @@ import akka.stream.scaladsl.RunnableFlow import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import concurrent.Await -import concurrent.Future +import scala.concurrent.Await +import scala.concurrent.Future //#imports @@ -29,7 +27,6 @@ import akka.stream.testkit.AkkaSpec object TwitterStreamQuickstartDocSpec { //#model final case class Author(handle: String) - val akka = Hashtag("#akka") final case class Hashtag(name: String) @@ -37,19 +34,21 @@ object TwitterStreamQuickstartDocSpec { def hashtags: Set[Hashtag] = body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet } + + val akka = Hashtag("#akka") //#model val tweets = Source( - Tweet(Author("rolandkuhn"), (new Date).getTime, "#akka rocks!") :: - Tweet(Author("patriknw"), (new Date).getTime, "#akka !") :: - Tweet(Author("bantonsson"), (new Date).getTime, "#akka !") :: - Tweet(Author("drewhk"), (new Date).getTime, "#akka !") :: - Tweet(Author("ktosopl"), (new Date).getTime, "#akka on the rocks!") :: - Tweet(Author("mmartynas"), (new Date).getTime, "wow #akka !") :: - Tweet(Author("akkateam"), (new Date).getTime, "#akka rocks!") :: - Tweet(Author("bananaman"), (new Date).getTime, "#bananas rock!") :: - Tweet(Author("appleman"), (new Date).getTime, "#apples rock!") :: - Tweet(Author("drama"), (new Date).getTime, "we compared #apples to #oranges!") :: + Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: + Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") :: + Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") :: + Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") :: + Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") :: + Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") :: + Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") :: + Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") :: + Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") :: + Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") :: Nil) } @@ -203,7 +202,6 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val sum: Future[Int] = map.get(sumSink) sum.map { c => println(s"Total tweets processed: $c") } - //#tweets-fold-count } } diff --git a/akka-docs-dev/rst/scala/stream-introduction.rst b/akka-docs-dev/rst/scala/stream-introduction.rst index c27e636809..c0e1730ad3 100644 --- a/akka-docs-dev/rst/scala/stream-introduction.rst +++ b/akka-docs-dev/rst/scala/stream-introduction.rst @@ -53,7 +53,7 @@ composition, therefore it may take some careful study of this subject until you feel familiar with the tools and techniques.
The documentation is here to help and for best results we recommend the following approach: -* Read the :ref:`quickstart-scala` to get a feel for how streams +* Read the :ref:`stream-quickstart-scala` to get a feel for how streams look and what they can do. * The top-down learners may want to peruse the :ref:`stream-design` at this point. diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst index ffecf6d5a8..5a2bb97b3c 100644 --- a/akka-docs-dev/rst/scala/stream-quickstart.rst +++ b/akka-docs-dev/rst/scala/stream-quickstart.rst @@ -1,4 +1,4 @@ -.. _quickstart-scala: +.. _stream-quickstart-scala: Quick Start Guide: Reactive Tweets ==================================
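For completeness, the ``mapConcat`` flattening discussed earlier in the quickstart can also be sketched with the Scala DSL (the ``hashtags`` value name is illustrative; ``toList`` is only there to satisfy the strict ``immutable.Seq`` requirement quoted from the removed Scala line)::

    import akka.stream.scaladsl.Source

    // flatten the stream of tweets into a stream of all hashtags they contain
    val hashtags: Source[Hashtag] = tweets.mapConcat(tweet => tweet.hashtags.toList)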