Merge pull request #16596 from drewhk/wip-stream-doc-buffers-drewhk

doc: Added section about buffering
drewhk 2014-12-22 17:36:13 +01:00
commit 36e1cdd065
13 changed files with 1593 additions and 14 deletions

(6 binary image files added, sizes 28, 22, 18, 16, 12 and 53 KiB; image diffs not shown)


@@ -0,0 +1,124 @@
package docs.stream

import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ RunnableFlow, Sink, Source, Flow }
import akka.stream.stage.PushPullStage
import akka.stream.testkit.AkkaSpec

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

class FlowStagesSpec extends AkkaSpec {

  //#import-stage
  import akka.stream.stage._
  //#import-stage

  implicit val mat = FlowMaterializer()

  "stages demo" must {

    "demonstrate various PushPullStages" in {

      //#one-to-one
      class Map[A, B](f: A => B) extends PushPullStage[A, B] {
        override def onPush(elem: A, ctx: Context[B]): Directive =
          ctx.push(f(elem))

        override def onPull(ctx: Context[B]): Directive =
          ctx.pull()
      }
      //#one-to-one

      //#many-to-one
      class Filter[A](p: A => Boolean) extends PushPullStage[A, A] {
        override def onPush(elem: A, ctx: Context[A]): Directive =
          if (p(elem)) ctx.push(elem)
          else ctx.pull()

        override def onPull(ctx: Context[A]): Directive =
          ctx.pull()
      }
      //#many-to-one

      //#one-to-many
      class Duplicator[A]() extends PushPullStage[A, A] {
        private var lastElem: A = _
        private var oneLeft = false

        override def onPush(elem: A, ctx: Context[A]): Directive = {
          lastElem = elem
          oneLeft = true
          ctx.push(elem)
        }

        override def onPull(ctx: Context[A]): Directive =
          if (!ctx.isFinishing) {
            // the main pulling logic is below, as demonstrated in the illustration
            if (oneLeft) {
              oneLeft = false
              ctx.push(lastElem)
            } else
              ctx.pull()
          } else {
            // if we need to emit a final element after the upstream finished
            if (oneLeft) ctx.pushAndFinish(lastElem)
            else ctx.finish()
          }

        override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
          ctx.absorbTermination()
      }
      //#one-to-many

      val keyedSink = Sink.head[immutable.Seq[Int]]
      val sink = Flow[Int].grouped(10).to(keyedSink)

      //#stage-chain
      val runnable: RunnableFlow = Source(1 to 10)
        .transform(() => new Filter(_ % 2 == 0))
        .transform(() => new Duplicator())
        .transform(() => new Map(_ / 2))
        .to(sink)
      //#stage-chain

      Await.result(runnable.run().get(keyedSink), 3.seconds) should be(Seq(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))
    }

    "demonstrate various PushStages" in {
      import akka.stream.stage._

      //#pushstage
      class Map[A, B](f: A => B) extends PushStage[A, B] {
        override def onPush(elem: A, ctx: Context[B]): Directive =
          ctx.push(f(elem))
      }

      class Filter[A](p: A => Boolean) extends PushStage[A, A] {
        override def onPush(elem: A, ctx: Context[A]): Directive =
          if (p(elem)) ctx.push(elem)
          else ctx.pull()
      }
      //#pushstage
    }

    "demonstrate StatefulStage" in {
      //#doubler-stateful
      class Duplicator[A]() extends StatefulStage[A, A] {
        override val initial: StageState[A, A] = new StageState[A, A] {
          override def onPush(elem: A, ctx: Context[A]): Directive =
            emit(List(elem, elem).iterator, ctx)
        }
      }
      //#doubler-stateful
    }
  }
}


@@ -0,0 +1,88 @@
package docs.stream

import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer }
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec

class StreamBuffersRateSpec extends AkkaSpec {
  implicit val mat = FlowMaterializer()

  "Demonstrate pipelining" in {
    def println(s: Any) = ()
    //#pipelining
    Source(1 to 3)
      .map { i => println(s"A: $i"); i }
      .map { i => println(s"B: $i"); i }
      .map { i => println(s"C: $i"); i }
      .runWith(Sink.ignore)
    //#pipelining
  }

  "Demonstrate buffer sizes" in {
    //#materializer-buffer
    val materializer = FlowMaterializer(
      MaterializerSettings(system)
        .withInputBuffer(
          initialSize = 64,
          maxSize = 64))
    //#materializer-buffer

    //#section-buffer
    val flow =
      Flow[Int]
        .section(OperationAttributes.inputBuffer(initial = 1, max = 1)) { sectionFlow =>
          // the buffer size of this map is 1
          sectionFlow.map(_ * 2)
        }
        .map(_ / 2) // the buffer size of this map is the default
    //#section-buffer
  }

  "buffering abstraction leak" in {
    //#buffering-abstraction-leak
    import scala.concurrent.duration._
    case class Tick()

    FlowGraph { implicit b =>
      import FlowGraphImplicits._

      val zipper = ZipWith[Tick, Int, Int]((tick, count) => count)

      Source(initialDelay = 1.second, interval = 1.second, () => "message!")
        .conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.right

      Source(initialDelay = 3.seconds, interval = 3.seconds, () => Tick()) ~> zipper.left

      zipper.out ~> Sink.foreach(println)
    }
    //#buffering-abstraction-leak
  }

  "explicit buffers" in {
    trait Job
    def inboundJobsConnector(): Source[Job] = Source.empty()

    //#explicit-buffers-backpressure
    // Getting a stream of jobs from an imaginary external system as a Source
    val jobs: Source[Job] = inboundJobsConnector()
    jobs.buffer(1000, OverflowStrategy.backpressure)
    //#explicit-buffers-backpressure

    //#explicit-buffers-droptail
    jobs.buffer(1000, OverflowStrategy.dropTail)
    //#explicit-buffers-droptail

    //#explicit-buffers-drophead
    jobs.buffer(1000, OverflowStrategy.dropHead)
    //#explicit-buffers-drophead

    //#explicit-buffers-dropbuffer
    jobs.buffer(1000, OverflowStrategy.dropBuffer)
    //#explicit-buffers-dropbuffer

    //#explicit-buffers-error
    jobs.buffer(1000, OverflowStrategy.error)
    //#explicit-buffers-error
  }
}


@@ -4,24 +4,142 @@
Custom stream processing
########################
While the processing vocabulary of Akka Streams is quite rich (see the :ref:`stream-cookbook-scala` for examples) it
is sometimes necessary to define new transformation stages either because some functionality is missing from the
stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph
junctions of various kinds.
Custom linear processing stages
===============================
To extend the available transformations on a :class:`Flow` or :class:`Source` one can use the ``transform()`` method
which takes a factory function returning a :class:`Stage`. Stages come in different flavors which we will introduce on
this page.
Using PushPullStage
-------------------
The most elementary transformation stage is the :class:`PushPullStage`, which can express a large class of algorithms
working on streams. A :class:`PushPullStage` can be illustrated as a box with two "input" and two "output" ports, as
seen in the illustration below.
.. image:: ../images/stage_conceptual.png
   :align: center
   :width: 600
The "input ports" are implemented as event handlers ``onPush(elem,ctx)`` and ``onPull(ctx)`` while "output ports"
correspond to methods on the :class:`Context` object that is handed as a parameter to the event handlers. By calling
exactly one "output port" method we wire up these four ports in various ways which we demonstrate shortly.
.. warning::
   There is one very important rule to remember when working with a ``Stage``: **exactly one** method must be called
   on the **currently passed** :class:`Context`, **exactly once**, as the **last statement of the handler**, and the
   return type of the called method must **match the expected return type of the handler**. Any violation of this rule
   will almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). The only
   exceptions to this rule are the query methods ``isHolding()`` and ``isFinishing()``.
To illustrate these concepts we create a small :class:`PushPullStage` that implements the ``map`` transformation.
.. image:: ../images/stage_map.png
   :align: center
   :width: 300
Map calls ``ctx.push()`` from the ``onPush()`` handler and ``ctx.pull()`` from the ``onPull()`` handler, resulting in
the conceptual wiring above, fully expressed in the code below:
.. includecode:: code/docs/stream/FlowStagesSpec.scala#one-to-one
Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement
filter. The conceptual wiring of ``Filter`` looks like this:
.. image:: ../images/stage_filter.png
   :align: center
   :width: 300
As we see above, if the given predicate matches the current element we propagate it downstream, otherwise we return
the "ball" to our upstream so that we get a new element. This is achieved by modifying the map example: we add a
conditional in the ``onPush`` handler that decides between a ``ctx.push()`` and a ``ctx.pull()`` call (and of course
we drop the mapping function ``f``).
.. includecode:: code/docs/stream/FlowStagesSpec.scala#many-to-one
To complete the picture we define a one-to-many transformation as the next step. We chose a straightforward example:
a stage that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:
.. image:: ../images/stage_doubler.png
   :align: center
   :width: 300
This is a stage that has state: the last element it has seen, and a flag ``oneLeft`` that indicates whether we have
already duplicated this last element. Looking at the code below, the reader might notice that our ``onPull`` method is
more complex than the figure above suggests. The reason for this is completion handling, which we will explain a
little bit later. For now it is enough to look at the ``if (!ctx.isFinishing)`` block, which corresponds to the logic
we expect from the conceptual picture.
.. includecode:: code/docs/stream/FlowStagesSpec.scala#one-to-many
Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually
would correspond to the following structure:
.. image:: ../images/stage_chain.png
   :align: center
   :width: 650
In code this is only a few lines, using the ``transform`` method to inject our custom processing into a stream:
.. includecode:: code/docs/stream/FlowStagesSpec.scala#stage-chain
Completion handling
^^^^^^^^^^^^^^^^^^^
Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit a few
more elements after their upstream source has completed. We have seen an example of this in our ``Duplicator`` class,
where the last element needs to be doubled even after the upstream neighbor stage has completed. Since the
``onUpstreamFinish()`` handler expects a :class:`TerminationDirective` as its return type, we are only allowed to call
``ctx.finish()``, ``ctx.fail()`` or ``ctx.absorbTermination()``. Since the first two of these methods will immediately
terminate the stage, our only option is ``absorbTermination()``. It is also clear from the return type of
``onUpstreamFinish`` that we cannot call ``ctx.push()``, yet we need to emit elements somehow! The trick is that after
calling ``absorbTermination()`` the ``onPull()`` handler will eventually be called, and at the same time
``ctx.isFinishing`` will return true, indicating that ``ctx.pull()`` cannot be called anymore. Now we are free to emit
additional elements and eventually call ``ctx.finish()`` or ``ctx.pushAndFinish()`` to finish processing.
.. note::
   The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of
   Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that
   our push/pull structure, as illustrated in the figure of our custom processing chain, does not apply to
   termination. Our neat model, which is analogous to a ball bouncing back and forth in a pipe (it bounces back on
   ``Filter`` and ``Duplicator`` for example), cannot describe the termination signals. By calling
   ``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at
   that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was
   *below* (which means that it will come back eventually, so the environment does not need to call anything yet).
Using PushStage
---------------
Many one-to-one and many-to-one transformations do not need to override the ``onPull()`` handler at all, since all
they do is propagate the pull upwards. For such transformations it is better to extend ``PushStage`` directly. For
example, our ``Map`` and ``Filter`` would look like this:
.. includecode:: code/docs/stream/FlowStagesSpec.scala#pushstage
The reason to use ``PushStage`` is not just cosmetic: internal optimizations rely on the fact that the ``onPull``
method only calls ``ctx.pull()``, and this knowledge allows the environment to process elements faster. By extending
``PushStage`` the environment can be sure that ``onPull()`` was not overridden, since it is ``final`` on
``PushStage``.
Using StatefulStage
-------------------
On top of ``PushPullStage``, which is the most elementary and low-level abstraction, and ``PushStage``, a convenience
class that also informs the environment about possible optimizations, ``StatefulStage`` is a tool that builds directly
on ``PushPullStage``, adding various convenience methods. It makes it possible to maintain state-machine like states
explicitly using its ``become()`` method. There is also a handy ``emit()`` method that simplifies emitting multiple
values given as an iterator. To demonstrate these features we reimplemented ``Duplicator`` in terms of a
``StatefulStage``:
.. includecode:: code/docs/stream/FlowStagesSpec.scala#doubler-stateful
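
The reimplemented ``Duplicator`` above only exercises ``emit()``, so here is a quick sketch of ``become()`` as well.
This example is not part of the original documentation; it assumes the ``StageState`` API used above and shows a
hypothetical ``SkipUntil`` stage that drops elements until a predicate first matches, then switches state and passes
everything through:

::

  // a sketch, not from the original docs: explicit state switching with become()
  class SkipUntil[A](p: A => Boolean) extends StatefulStage[A, A] {
    // state 2: once the predicate has matched, simply pass elements along
    private val passing: StageState[A, A] = new StageState[A, A] {
      override def onPush(elem: A, ctx: Context[A]): Directive =
        ctx.push(elem)
    }

    // state 1: drop elements until the predicate matches for the first time
    private val skipping: StageState[A, A] = new StageState[A, A] {
      override def onPush(elem: A, ctx: Context[A]): Directive =
        if (p(elem)) {
          become(passing) // explicit state switch
          ctx.push(elem)
        } else ctx.pull()
    }

    override val initial: StageState[A, A] = skipping
  }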
Using DetachedStage
-------------------


@@ -16,4 +16,5 @@ Streams
   stream-integrations
   stream-io
   stream-cookbook
   ../stream-configuration


@@ -4,18 +4,130 @@
Buffers and working with rate
#############################
Akka Streams processing stages are asynchronous and pipelined by default, which means that a stage, after handing out
an element to its downstream consumer, is able to immediately process the next element. To demonstrate what we mean
by this, let's take a look at the following example:
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#pipelining
Running the above example, one of the possible outputs looks like this:
::
   A: 1
   A: 2
   B: 1
   A: 3
   B: 2
   C: 1
   B: 3
   C: 2
   C: 3
Note that the order is *not* ``A:1, B:1, C:1, A:2, B:2, C:2``, which would correspond to a synchronous execution model
where an element completely flows through the processing pipeline before the next element enters the flow. The next
element is processed by a stage as soon as it has emitted the previous one.
While pipelining in general increases throughput, in practice there is a significant cost to passing an element
through the asynchronous (and therefore thread-crossing) boundary. To amortize this cost Akka Streams uses a
*windowed*, *batching* backpressure strategy internally. It is windowed because, as opposed to a `Stop-And-Wait`_
protocol, multiple elements might be "in-flight" concurrently with requests for elements. It is also batching because
a new element is not immediately requested once an element has been drained from the window-buffer; instead, multiple
elements are requested only after multiple elements have been drained. This batching strategy reduces the
communication cost of propagating the backpressure signal through the asynchronous boundary.
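
To make the batching idea concrete, here is a toy model of such a demand strategy. It is only a sketch and not Akka's
actual implementation: it tracks how many elements of a fixed window are in flight and emits a single batched
``request(n)`` only once half of the window has been drained:

::

  // toy bookkeeping of windowed, batching demand (not Akka's real code)
  final class BatchingDemand(window: Int) {
    private var inFlight = 0

    // on subscription: request a full window of elements up front
    def onSubscribe(): Int = {
      inFlight = window
      window // send request(window) upstream
    }

    // called whenever the stage consumes one buffered element
    def onElementConsumed(): Int = {
      inFlight -= 1
      if (inFlight <= window / 2) {
        // batching: top the window up with one request instead of
        // requesting after every single element
        val batch = window - inFlight
        inFlight += batch
        batch // send request(batch) upstream
      } else 0 // no demand signal needed yet
    }
  }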
While this internal protocol is mostly invisible to the user (apart from its throughput-increasing effects) there are
situations when these details get exposed. In all of our previous examples we assumed that the rate of the processing
chain is strictly coordinated through the backpressure signal, causing all stages to process no faster than the
throughput of the connected chain. There are tools in Akka Streams, however, that enable the rates of different
segments of a processing chain to be "detached", or that define the maximum throughput of the stream through external
timing sources. These situations are exactly those where the internal batching buffering strategy suddenly becomes
non-transparent.
.. _Stop-And-Wait: https://en.wikipedia.org/wiki/Stop-and-wait_ARQ
.. _stream-buffers-scala:
Buffers in Akka Streams
=======================
Internal buffers and their effect
---------------------------------
As we have explained, Akka Streams introduces a buffer for every processing stage for performance reasons. The purpose
of these buffers is solely optimization; in fact a size of 1 would be the most natural choice if there were no need
for throughput improvements. It is therefore recommended to keep these buffer sizes small, and increase them only to a
level suitable for the throughput requirements of the application. Default buffer sizes can be set through
configuration:
::
   akka.stream.materializer.max-input-buffer-size = 16
Alternatively they can be set by passing a :class:`MaterializerSettings` to the materializer:
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#materializer-buffer
If the buffer size needs to be set for segments of a :class:`Flow` only, this is possible by defining a ``section()``:
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#section-buffer
Here is an example of code that demonstrates some of the issues caused by internal buffers:
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#buffering-abstraction-leak
Running the above example, one would expect the number *3* to be printed every 3 seconds (the ``conflate`` step here
is configured so that it counts the number of elements received before the downstream ``ZipWith`` consumes them). What
is printed is different though: we will see the number *1*. The reason for this is the internal buffer, which is by
default 16 elements large and prefetches elements before the ``ZipWith`` starts consuming them. It is possible to fix
this issue by changing the buffer size of the ``ZipWith`` (or of the whole graph) to 1. We will still see a leading
*1* though, which is caused by the initial prefetch of the ``ZipWith`` stage.
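
For example, with the ``MaterializerSettings`` API shown earlier, the whole graph can be materialized with one-element
buffers. This is only a sketch reusing that API; it shrinks every internal buffer, not just the one in front of the
``ZipWith``:

::

  // a sketch: materialize with every internal input buffer shrunk to one element
  implicit val mat = FlowMaterializer(
    MaterializerSettings(system)
      .withInputBuffer(initialSize = 1, maxSize = 1))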
.. note::
   In general, when time- or rate-driven processing stages exhibit strange behavior, one of the first solutions to try
   should be to decrease the input buffer of the affected elements to 1.
Explicit user-defined buffers
-----------------------------
The previous section explained the internal buffers of Akka Streams, used to reduce the cost of passing elements
across the asynchronous boundary. These are internal buffers which will very likely be tuned automatically in future
versions. In this section we will discuss *explicit*, user-defined buffers that are part of the domain logic of the
stream processing pipeline of an application.
The example below will ensure that 1000 jobs (but not more) are dequeued from an external (imaginary) system and
stored locally in memory, relieving the external system:
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-backpressure
The next example will also queue up 1000 jobs locally, but if there are more jobs waiting in the imaginary external
system, it makes space for the new element by dropping one element from the *tail* of the buffer. Dropping from the
tail is a very common strategy, but it must be noted that this will drop the *youngest* waiting job. If some
"fairness" is desired, in the sense that we want to be nice to jobs that have been waiting long, then this option can
be useful.
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-droptail
Here is another example with a queue of 1000 jobs, but one that makes space for the new element by dropping one
element from the *head* of the buffer, which is the *oldest* waiting job. This is the preferred strategy if jobs are
expected to be resent if not processed within a certain period. The oldest element will be retransmitted soon (in fact
a retransmitted duplicate might already be in the queue!), so it makes sense to drop it first.
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-drophead
Compared to the dropping strategies above, ``dropBuffer`` drops all 1000 jobs it has enqueued once the buffer gets
full. This aggressive strategy is useful when dropped jobs are preferred to delayed jobs.
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-dropbuffer
If our imaginary external job provider is a client using our API, we might want to enforce that the client cannot have
more than 1000 queued jobs, otherwise we consider it flooding and terminate the connection. This is easily achievable
with the ``error`` strategy, which simply fails the stream once the buffer gets full.
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-error
Rate transformation
===================


@@ -0,0 +1,7 @@
.. _stream-config:
#############
Configuration
#############
.. literalinclude:: ../../akka-stream/src/main/resources/reference.conf


@@ -18,15 +18,20 @@ akka {
# When this value is left empty, the default-dispatcher will be used.
dispatcher = ""
# Cleanup leaked publishers and subscribers when they are not used within a given
# deadline
subscription-timeout {
# when the subscription timeout is reached one of the following strategies is
# applied to the "stale" publisher:
# cancel - cancel it (via `onError` or subscribing to the publisher and
#          `cancel()`ing the subscription right away)
# warn   - log a warning statement about the stale element (then drop the
#          reference to it)
# noop   - do nothing (not recommended)
mode = cancel
# time after which a subscriber / publisher is considered stale and eligible
# for cancellation (see `akka.stream.subscription-timeout.mode`)
timeout = 5s
}