diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 88852c9da3..171f7f3bb5 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -7,7 +7,7 @@ import akka.actor.Cancellable import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec -import concurrent.Future +import scala.concurrent.{ Promise, Future } class FlowDocSpec extends AkkaSpec { @@ -54,7 +54,7 @@ class FlowDocSpec extends AkkaSpec { //#materialization-runWith } - "materializedMap is unique" in { + "materialization is unique" in { //#stream-reuse // connect the Source to the Sink, obtaining a RunnableFlow val sink = Sink.fold[Int, Int](0)(_ + _) @@ -108,10 +108,10 @@ class FlowDocSpec extends AkkaSpec { Source.empty // Sink that folds over the stream and returns a Future - // of the final result in the MaterializedMap + // of the final result as its materialized value Sink.fold[Int, Int](0)(_ + _) - // Sink that returns a Future in the MaterializedMap, + // Sink that returns a Future as its materialized value, // containing the first element of the stream Sink.head @@ -138,4 +138,79 @@ class FlowDocSpec extends AkkaSpec { //#flow-connecting } + + "various ways of transforming materialized values" in { + import scala.concurrent.duration._ + + val throttler = Flow(Source(1.second, 1.second, "test")) { implicit builder => + tickSource => + import FlowGraph.Implicits._ + val zip = builder.add(ZipWith[String, Int, Int](Keep.right)) + tickSource ~> zip.in0 + (zip.in1, zip.out) + } + + //#flow-mat-combine + // An empty source that can be shut down explicitly from the outside + val source: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]() + + // A flow that internally throttles elements to 1/second, and returns a Cancellable + // which can be used to shut down the stream + val flow: Flow[Int, Int, Cancellable] = throttler + + // A sink that returns the first element of a stream in the returned Future + val sink: Sink[Int, Future[Int]] = Sink.head[Int]() + + // By default, the materialized value of the leftmost stage is preserved + val r1: RunnableFlow[Promise[Unit]] = source.via(flow).to(sink) + + // Simple selection of materialized values by using Keep.right + val r2: RunnableFlow[Cancellable] = source.viaMat(flow)(Keep.right).to(sink) + val r3: RunnableFlow[Future[Int]] = source.via(flow).toMat(sink)(Keep.right) + + // Using runWith will always give the materialized values of the stages added + // by runWith() itself + val r4: Future[Int] = source.via(flow).runWith(sink) + val r5: Promise[Unit] = flow.to(sink).runWith(source) + val r6: (Promise[Unit], Future[Int]) = flow.runWith(source, sink) + + // Using more complext combinations + val r7: RunnableFlow[(Promise[Unit], Cancellable)] = + source.viaMat(flow)(Keep.both).to(sink) + + val r8: RunnableFlow[(Promise[Unit], Future[Int])] = + source.via(flow).toMat(sink)(Keep.both) + + val r9: RunnableFlow[((Promise[Unit], Cancellable), Future[Int])] = + source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both) + + val r10: RunnableFlow[(Cancellable, Future[Int])] = + source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both) + + // It is also possible to map over the materialized values. In r9 we had a + // doubly nested pair, but we want to flatten it out + val r11: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] = + r9.mapMaterialized { + case ((promise, cancellable), future) => + (promise, cancellable, future) + } + + // Now we can use pattern matching to get the resulting materialized values + val (promise, cancellable, future) = r11.run() + + // Type inference works as expected + promise.success(0) + cancellable.cancel() + future.map(_ + 3) + + // The result of r11 can be also achieved by using the Graph API + val r12: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] = + FlowGraph.closed(source, flow, sink)((_, _, _)) { implicit builder => + (src, f, dst) => + import FlowGraph.Implicits._ + src ~> f ~> dst + } + + //#flow-mat-combine + } } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 342a8274e0..0fd6fcedd3 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -3,16 +3,11 @@ */ package docs.stream -import akka.stream.ActorFlowMaterializer -import akka.stream.scaladsl.Broadcast -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.FlowGraph -import akka.stream.scaladsl.Merge -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.stream.scaladsl.Zip +import akka.stream._ +import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec +import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -117,4 +112,112 @@ class FlowGraphDocSpec extends AkkaSpec { Await.result(bottomFuture, 300.millis) shouldEqual 2 } + "building a reusable component" in { + + //#flow-graph-components-shape + // A shape represents the input and output ports of a reusable + // processing module + case class PriorityWorkerPoolShape[In, Out]( + jobsIn: Inlet[In], + priorityJobsIn: Inlet[In], + resultsOut: Outlet[Out]) extends Shape { + + // It is important to provide the list of all input and output + // ports with a stable order. Duplicates are not allowed. + override val inlets: immutable.Seq[Inlet[_]] = + jobsIn :: priorityJobsIn :: Nil + override val outlets: immutable.Seq[Outlet[_]] = + resultsOut :: Nil + + // A Shape must be able to create a copy of itself. Basically + // it means a new instance with copies of the ports + override def deepCopy() = PriorityWorkerPoolShape( + new Inlet[In](jobsIn.toString), + new Inlet[In](priorityJobsIn.toString), + new Outlet[Out](resultsOut.toString)) + + // A Shape must also be able to create itself from existing ports + override def copyFromPorts( + inlets: immutable.Seq[Inlet[_]], + outlets: immutable.Seq[Outlet[_]]) = { + assert(inlets.size == this.inlets.size) + assert(outlets.size == this.outlets.size) + // This is why order matters when overriding inlets and outlets + PriorityWorkerPoolShape(inlets(0), inlets(1), outlets(0)) + } + } + //#flow-graph-components-shape + + //#flow-graph-components-create + object PriorityWorkerPool { + def apply[In, Out]( + worker: Flow[In, Out, _], + workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], Unit] = { + + FlowGraph.partial() { implicit b ⇒ + import FlowGraph.Implicits._ + + val priorityMerge = b.add(MergePreferred[In](1)) + val balance = b.add(Balance[In](workerCount)) + val resultsMerge = b.add(Merge[Out](workerCount)) + + // After merging priority and ordinary jobs, we feed them to the balancer + priorityMerge ~> balance + + // Wire up each of the outputs of the balancer to a worker flow + // then merge them back + for (i <- 0 until workerCount) + balance.out(i) ~> worker ~> resultsMerge.in(i) + + // We now expose the input ports of the priorityMerge and the output + // of the resultsMerge as our PriorityWorkerPool ports + // -- all neatly wrapped in our domain specific Shape + PriorityWorkerPoolShape( + jobsIn = priorityMerge.in(0), + priorityJobsIn = priorityMerge.preferred, + resultsOut = resultsMerge.out) + } + + } + + } + //#flow-graph-components-create + + def println(s: Any): Unit = () + + //#flow-graph-components-use + val worker1 = Flow[String].map("step 1 " + _) + val worker2 = Flow[String].map("step 2 " + _) + + FlowGraph.closed() { implicit b => + import FlowGraph.Implicits._ + + val priorityPool1 = b.add(PriorityWorkerPool(worker1, 4)) + val priorityPool2 = b.add(PriorityWorkerPool(worker2, 2)) + + Source(1 to 100).map("job: " + _) ~> priorityPool1.jobsIn + Source(1 to 100).map("priority job: " + _) ~> priorityPool1.priorityJobsIn + + priorityPool1.resultsOut ~> priorityPool2.jobsIn + Source(1 to 100).map("one-step, priority " + _) ~> priorityPool2.priorityJobsIn + + priorityPool2.resultsOut ~> Sink.foreach(println) + }.run() + //#flow-graph-components-use + + //#flow-graph-components-shape2 + import FanInShape.Name + import FanInShape.Init + + case class PriorityWorkerPoolShape2[In, Out]( + _init: Init[Out] = Name("PriorityWorkerPool")) extends FanInShape2[In, In, Out](_init) { + + def jobsIn: Inlet[In] = in0 + def priorityJobsIn: Inlet[In] = in1 + def resultsOut: Outlet[Out] = out + } + //#flow-graph-components-shape2 + + } + } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 852da1bd50..0b25c21eec 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -62,7 +62,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { trait Example1 { //#materializer-setup implicit val system = ActorSystem("reactive-tweets") - implicit val mat = ActorFlowMaterializer() + implicit val materializer = ActorFlowMaterializer() //#materializer-setup } @@ -155,7 +155,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { "count elements on finite stream" in { //#tweets-fold-count - val sumSink = Sink.fold[Int, Int](0)(_ + _) + val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) val counter: RunnableFlow[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right) diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index fe1fb281a4..176a16dace 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -60,9 +60,12 @@ one actor prepare the work, and then have it be materialized at some completely .. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps -After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both -sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation -dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result +After running (materializing) the ``RunnableFlow[T]`` we get back the materialized value of type T. Every stream processing +stage can produce a materialized value, and it is the responsibility of the user to combine them to a new type. +In the above example we used ``toMat`` to indicate that we want to transform the materialized value of the source and +sink, and we used the convenience function ``Keep.right`` to say that we are only interested in the materialized value +of the sink. +In our example the ``FoldSink`` materializes a value of type ``Future`` which will represent the result of the folding process over the stream. In general, a stream can expose multiple materialized values, but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason there is a convenience method called ``runWith()`` available for ``Sink``, ``Source`` or ``Flow`` requiring, respectively, @@ -85,7 +88,8 @@ instead of modifying the existing instance, so while constructing long flows, re In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value of the given sink or source. -Since a stream can be materialized multiple times, the ``MaterializedMap`` returned is different for each materialization. +Since a stream can be materialized multiple times, the materialized value will also be calculated anew for each such +materialization, usually leading to different values being returned each time. In the example below we create two running materialized instance of the stream that we described in the ``runnable`` variable, and both materializations give us a different ``Future`` from the map even though we used the same ``sink`` to refer to the future: @@ -196,6 +200,15 @@ which will be running on the thread pools they have been configured to run on - Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal, yet will materialize that stage multiple times. +Combining materialized values +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary +to somehow express how these values should be composed to a final value when we plug these stages together. For this, +many combinator methods have variants that take an additional argument, a function, that will be used to combine the +resulting values. Some examples of using these combiners are illustrated in the example below. + +.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-mat-combine Stream ordering =============== diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index 13021feff6..9b756fcf9b 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -132,6 +132,51 @@ For defining a ``Flow[T]`` we need to expose both an undefined source and sink: .. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph +Building reusable Graph components +---------------------------------- + +It is possible to build reusable, encapsulated components of arbitrary input and output ports using the graph DSL. + +As an example, we will build a graph junction that represents a pool of workers, where a worker is expressed +as a ``Flow[I,O,_]``, i.e. a simple transformation of jobs of type ``I`` to results of type ``O`` (as you have seen +already, this flow can actually contain a complex graph inside). Our reusable worker pool junction will +not preserve the order of the incoming jobs (they are assumed to have a proper ID field) and it will use a ``Balance`` +junction to schedule jobs to available workers. On top of this, our junction will feature a "fastlane", a dedicated port +where jobs of higher priority can be sent. + +Altogether, our junction will have two input ports of type ``I`` (for the normal and priority jobs) and an output port +of type ``O``. To represent this interface, we need to define a custom :class:`Shape`. The following lines show how to do that. + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape + +In general a custom :class:`Shape` needs to be able to provide all its input and output ports, be able to copy itself, and also be +able to create a new instance from given ports. There are some predefined shapes provided to avoid unnecessary +boilerplate + + * :class:`SourceShape`, :class:`SinkShape`, :class:`FlowShape` for simpler shapes, + * :class:`UniformFanInShape` and :class:`UniformFanOutShape` for junctions with multiple input (or output) ports + of the same type, + * :class:`FanInShape1`, :class:`FanInShape2`, ..., :class:`FanOutShape1`, :class:`FanOutShape2`, ... for junctions + with multiple input (or output) ports of different types. + +Since our shape has two input ports and one output port, we can just reuse the :class:`FanInShape2` class to define +our custom shape: + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape2 + +Now that we have a :class:`Shape` we can wire up a Graph that represents +our worker pool. First, we will merge incoming normal and priority jobs using ``MergePreferred``, then we will send the jobs +to a ``Balance`` junction which will fan-out to a configurable number of workers (flows), finally we merge all these +results together and send them out through our only output port. This is expressed by the following code: + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-create + +All we need to do now is to use our custom junction in a graph. The following code simulates some simple workers +and jobs using plain strings and prints out the results. Actually we used *two* instances of our worker pool junction +using ``add()`` twice. + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-use + .. _graph-cycles-scala: Graph cycles, liveness and deadlocks diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst index e7dacd34b8..84228dd1c3 100644 --- a/akka-docs-dev/rst/scala/stream-quickstart.rst +++ b/akka-docs-dev/rst/scala/stream-quickstart.rst @@ -29,14 +29,15 @@ materialization properties, such as default buffer sizes (see also :ref:`stream- be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this will be discussed in depth in :ref:`stream-section-configuration`. -Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`: +Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out, M]`: .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source -Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or -more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations -that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its -behavior depends on the type of :class:`Sink` used. +Streams always start flowing from a :class:`Source[Out,M1]` then can continue through :class:`Flow[In,Out,M2]` elements or +more advanced graph elements to finally be consumed by a :class:`Sink[In,M3]` (ignore the type parameters ``M1``, ``M2`` +and ``M3`` for now, they are not relevant to the types of the elements produced/consumed by these classes). Both Sources and +Flows provide stream operations that can be used to transform the flowing data, a :class:`Sink` however does not since +its the "end of stream" and its behavior depends on the type of :class:`Sink` used. In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: @@ -44,8 +45,8 @@ familiar to anyone who has used the Scala Collections library, however they oper .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map Finally in order to :ref:`materialize ` and run the stream computation we need to attach -the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call -``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on +the Flow to a :class:`Sink` that will get the flow running. The simplest way to do this is to call +``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as methods on the :class:``Sink`` `companion object `_. For now let's simply print each author: @@ -56,7 +57,7 @@ or by using the shorthand version (which are defined only for the most popular s .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly, -like this: ``.run(mat)``). +like this: ``.run(materializer)``). Flattening sequences in streams ------------------------------- @@ -69,8 +70,8 @@ combinator: .. note:: The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition. - It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing - due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for + It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing + due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for our implementation of flatMap (due to the liveness issues). Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out=>immutable.Seq[T]``), @@ -96,13 +97,13 @@ FlowGraphs are constructed like this: .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast .. note:: - The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported. + The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraph.Implicits._`` are imported. Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method. As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g`` *it is immutable, thread-safe, and freely shareable*. A graph can can be ``run()`` directly - assuming all -ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s +ports (sinks/sources) within a flow have been connected properly. It is possible to construct partial graphs where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`. As all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element. @@ -136,26 +137,27 @@ While this question is not as obvious to give an answer to in case of an infinit this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"), but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. -First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized -values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple -lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType`` +First, let's write such an element counter using :class:`FoldSink` and see how the types look like: .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream. Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, -finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the -processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can -be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`ActorFlowMaterializer` -to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``, -which can be used to retrieve materialized values from the running stream. +finally we connect the flow using ``toMat`` the previously prepared Sink. Remember those mysterious type parameters on +:class:`Source` :class:`Flow` and :class:`Sink`? They represent the type of values these processing parts return when +materialized. When you chain these together, you can explicitly combine their materialized values: in our example we +used the ``Keep.right`` predefined function, which tells the implementation to only care about the materialized +type of the stage currently appended to the right. As you can notice, the materialized type of sumSink is ``Future[Int]`` +and because of using ``Keep.right``, the resulting :class:`RunnableFlow` has also a type parameter of ``Future[Int]``. -In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map -obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType`` -as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. +This step does *not* yet materialize the +processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can +be ``run()``, as indicated by its type: :class:`RunnableFlow[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorFlowMaterializer` +to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow[T]`` is of type ``T``. +In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream. In case of the stream failing, this future would complete with a Failure. -The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused +A :class:`RunnableFlow` may be reused and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations will be different, as illustrated by this example: @@ -167,3 +169,8 @@ steering these elements which will be discussed in detail in :ref:`stream-materi what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above: .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline + +.. note:: + ``runWith()`` is a convenience method that automatically ignores the materialized value of any other stages except + those appended by the ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner + for materialized values.