=doc #20017 Update the stream-rate docs to current streams behavior

This commit is contained in:
Johan Andrén 2016-03-16 11:59:52 +01:00
parent efab2c9252
commit d72848f22f
8 changed files with 124 additions and 87 deletions

View file

@@ -45,9 +45,9 @@ public class StreamBuffersRateDocTest extends AbstractJavaTest {
public void demonstratePipelining() {
//#pipelining
Source.from(Arrays.asList(1, 2, 3))
.map(i -> {System.out.println("A: " + i); return i;})
.map(i -> {System.out.println("B: " + i); return i;})
.map(i -> {System.out.println("C: " + i); return i;})
.map(i -> {System.out.println("A: " + i); return i;}).async()
.map(i -> {System.out.println("B: " + i); return i;}).async()
.map(i -> {System.out.println("C: " + i); return i;}).async()
.runWith(Sink.ignore(), mat);
//#pipelining
}
@@ -64,12 +64,12 @@ public class StreamBuffersRateDocTest extends AbstractJavaTest {
//#section-buffer
final Flow<Integer, Integer, NotUsed> flow1 =
Flow.of(Integer.class)
.map(elem -> elem * 2) // the buffer size of this map is 1
.withAttributes(Attributes.inputBuffer(1, 1));
.map(elem -> elem * 2).async()
.withAttributes(Attributes.inputBuffer(1, 1)); // the buffer size of this map is 1
final Flow<Integer, Integer, NotUsed> flow2 =
flow1.via(
Flow.of(Integer.class)
.map(elem -> elem / 2)); // the buffer size of this map is the default
.map(elem -> elem / 2)).async(); // the buffer size of this map is the default
//#section-buffer
}
@@ -87,8 +87,9 @@ public class StreamBuffersRateDocTest extends AbstractJavaTest {
first -> 1, (count, elem) -> count + 1);
RunnableGraph.fromGraph(GraphDSL.create(b -> {
// this is the asynchronous stage in this graph
final FanInShape2<String, Integer, Integer> zipper =
b.add(ZipWith.create((String tick, Integer count) -> count));
b.add(ZipWith.create((String tick, Integer count) -> count).async());
b.from(b.add(msgSource)).via(b.add(conflate)).toInlet(zipper.in1());
b.from(b.add(tickSource)).toInlet(zipper.in0());
b.from(zipper.out()).to(b.add(Sink.foreach(elem -> System.out.println(elem))));

View file

@@ -285,9 +285,9 @@ Attributes
We have seen that we can use ``named()`` to introduce a nesting level in the fluid DSL (and also explicit nesting by using
``create()`` from :class:`GraphDSL`). Apart from having the effect of adding a nesting level, ``named()`` is actually
a shorthand for calling ``withAttributes(Attributes.name("someName"))``. Attributes provide a way to fine-tune certain
aspects of the materialized running entity. For example buffer sizes can be controlled via attributes (see
:ref:`stream-buffers-scala`). When it comes to hierarchic composition, attributes are inherited by nested modules,
unless they override them with a custom value.
aspects of the materialized running entity. For example buffer sizes for asynchronous stages can be controlled via
attributes (see :ref:`async-stream-buffers-java`). When it comes to hierarchic composition, attributes are inherited
by nested modules, unless they override them with a custom value.
The code below, a modification of an earlier example, sets the ``inputBuffer`` attribute on certain modules, but not
on others:

View file

@@ -168,7 +168,7 @@ which will be responsible for materializing and running the streams we are about
.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#materializer-setup
The :class:`ActorMaterializer` can optionally take :class:`ActorMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-java`), the dispatcher to
materialization properties, such as default buffer sizes (see also :ref:`async-stream-buffers-java`), the dispatcher to
be used by the pipeline etc. These can be overridden with ``withAttributes`` on :class:`Flow`, :class:`Source`, :class:`Sink` and :class:`Graph`.
Let's assume we have a stream of tweets readily available. In Akka this is expressed as a :class:`Source<Out, M>`:

View file

@@ -4,9 +4,20 @@
Buffers and working with rate
#############################
Akka Streams processing stages are asynchronous and pipelined by default which means that a stage, after handing out
an element to its downstream consumer is able to immediately process the next message. To demonstrate what we mean
by this, let's take a look at the following example:
When upstream and downstream rates differ, especially when the throughput has spikes, it can be useful to introduce
buffers in a stream. In this chapter we cover how buffers are used in Akka Streams.
.. _async-stream-buffers-java:
Buffers for asynchronous stages
===============================
In this section we will discuss internal buffers that are introduced as an optimization when using asynchronous stages.
To run a stage asynchronously it has to be marked explicitly as such using the ``.async()`` method. Being run
asynchronously means that a stage, after handing out an element to its downstream consumer, is able to immediately
process the next element. To demonstrate what we mean by this, let's take a look at the following example:
.. includecode:: ../code/docs/stream/StreamBuffersRateDocTest.java#pipelining
@@ -24,9 +35,9 @@ Running the above example, one of the possible outputs looks like this:
C: 2
C: 3
Note that the order is *not* ``A:1, B:1, C:1, A:2, B:2, C:2,`` which would correspond to a synchronous execution model
where an element completely flows through the processing pipeline before the next element enters the flow. The next
element is processed by a stage as soon as it is emitted the previous one.
Note that the order is *not* ``A:1, B:1, C:1, A:2, B:2, C:2,`` which would correspond to the normal fused synchronous
execution model of flows where an element completely passes through the processing pipeline before the next element
enters the flow. The next element is processed by an asynchronous stage as soon as it has emitted the previous one.
While pipelining in general increases throughput, in practice there is a significant cost to passing an element
through the asynchronous (and therefore thread-crossing) boundary. To amortize this cost Akka Streams uses
@@ -45,18 +56,14 @@ These situations are exactly those where the internal batching buffering strateg
.. _Stop-And-Wait: https://en.wikipedia.org/wiki/Stop-and-wait_ARQ
.. _stream-buffers-java:
Buffers in Akka Streams
=======================
Internal buffers and their effect
---------------------------------
As we have explained, for performance reasons Akka Streams introduces a buffer for every processing stage. The purpose
of these buffers is solely optimization, in fact the size of 1 would be the most natural choice if there would be no
need for throughput improvements. Therefore it is recommended to keep these buffer sizes small, and increase them only
to a level suitable for the throughput requirements of the application. Default buffer sizes can be set through configuration:
As we have explained, for performance reasons Akka Streams introduces a buffer for every asynchronous processing stage.
The purpose of these buffers is solely optimization; in fact, a size of 1 would be the most natural choice if there
were no need for throughput improvements. Therefore it is recommended to keep these buffer sizes small,
and increase them only to a level suitable for the throughput requirements of the application. Default buffer sizes
can be set through configuration:
::
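
   # A sketch of the relevant settings; the key names are taken from Akka's
   # reference configuration and should be verified against your Akka version:
   akka.stream.materializer.initial-input-buffer-size = 4
   akka.stream.materializer.max-input-buffer-size = 16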
@@ -75,22 +82,21 @@ Here is an example of a code that demonstrate some of the issues caused by inter
.. includecode:: ../code/docs/stream/StreamBuffersRateDocTest.java#buffering-abstraction-leak
Running the above example one would expect the number *3* to be printed in every 3 seconds (the ``conflate`` step here
is configured so that it counts the number of elements received before the downstream ``ZipWith`` consumes them). What
is being printed is different though, we will see the number *1*. The reason for this is the internal buffer which is
by default 16 elements large, and prefetches elements before the ``ZipWith`` starts consuming them. It is possible
to fix this issue by changing the buffer size of ``ZipWith`` (or the whole graph) to 1. We will still see a leading
1 though which is caused by an initial prefetch of the ``ZipWith`` element.
Running the above example, one would expect the number *3* to be printed every 3 seconds (the ``conflateWithSeed``
step here is configured so that it counts the number of elements received before the downstream ``ZipWith`` consumes
them). What is printed is different though: we will see the number *1*. The reason for this is the internal
buffer, which is by default 16 elements large and prefetches elements before the ``ZipWith`` starts consuming them.
It is possible to fix this issue by changing the buffer size of ``ZipWith`` (or the whole graph) to 1. We will still see
a leading *1* though, which is caused by an initial prefetch of the ``ZipWith`` element.
.. note::
In general, when time- or rate-driven processing stages exhibit strange behavior, one of the first solutions to try
should be to decrease the input buffer of the affected elements to 1.
Explicit user defined buffers
-----------------------------
The previous section explained the internal buffers of Akka Streams used to reduce the cost of crossing elements through
the asynchronous boundary. These are internal buffers which will be very likely automatically tuned in future versions.
Buffers in Akka Streams
=======================
In this section we will discuss *explicit* user-defined buffers that are part of the domain logic of the stream
processing pipeline of an application.
@@ -141,29 +147,41 @@ Rate transformation
Understanding conflate
----------------------
When a fast producer can not be informed to slow down by backpressure or some other signal, ``conflate`` might be useful to combine elements from a producer until a demand signal comes from a consumer.
When a fast producer cannot be informed to slow down by backpressure or some other signal, ``conflate`` might be
useful to combine elements from a producer until a demand signal comes from a consumer.
Below is an example snippet that summarizes fast stream of elements to a standart deviation, mean and count of elements that have arrived while the stats have been calculated.
Below is an example snippet that summarizes a fast stream of elements to a standard deviation, mean and count of
elements that have arrived while the stats have been calculated.
.. includecode:: ../code/docs/stream/RateTransformationDocTest.java#conflate-summarize
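To build intuition for how ``conflate`` combines elements while the downstream is busy, here is a hypothetical,
single-threaded model of its bookkeeping (the ``ConflateModel`` class and its ``onPush``/``onPull`` methods are
illustrative only, not part of the Akka API):

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative model only -- not the actual Akka implementation. While the
// consumer signals no demand, every pushed element is folded into a single
// pending value; a pull hands over whatever has accumulated so far.
class ConflateModel<In, Agg> {
  private final Function<In, Agg> seed;
  private final BiFunction<Agg, In, Agg> aggregate;
  private Agg pending; // null means "nothing buffered"

  ConflateModel(Function<In, Agg> seed, BiFunction<Agg, In, Agg> aggregate) {
    this.seed = seed;
    this.aggregate = aggregate;
  }

  // Called for every element the fast producer emits, regardless of demand.
  void onPush(In elem) {
    pending = (pending == null) ? seed.apply(elem) : aggregate.apply(pending, elem);
  }

  // Called when the slow consumer finally signals demand.
  Agg onPull() {
    Agg result = pending;
    pending = null;
    return result;
  }
}
```

With ``seed = e -> 1`` and ``aggregate = (count, e) -> count + 1``, three pushes followed by one pull emit a
single ``3``, mirroring the counting behavior discussed earlier in this chapter.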
This example demonstrates that such flow's rate is decoupled. The element rate at the start of the flow can be much higher that the element rate at the end of the flow.
This example demonstrates that the rate of such a flow is decoupled. The element rate at the start of the flow
can be much higher than the element rate at the end of the flow.
Another possible use of ``conflate`` is to not consider all elements for summary when producer starts getting too fast. Example below demonstrates how ``conflate`` can be used to implement random drop of elements when consumer is not able to keep up with the producer.
Another possible use of ``conflate`` is to not consider all elements for a summary when the producer starts getting
too fast. The example below demonstrates how ``conflate`` can be used to implement a random drop of elements when
the consumer is not able to keep up with the producer.
.. includecode:: ../code/docs/stream/RateTransformationDocTest.java#conflate-sample
Understanding expand
--------------------
Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand allows to extrapolate a value to be sent as an element to a consumer.
Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand
allows extrapolating a value to be sent as an element to a consumer.
As a simple use of ``expand`` here is a flow that sends the same element to consumer when producer does not send any new elements.
As a simple use of ``expand``, here is a flow that sends the same element to the consumer when the producer does not
send any new elements.
.. includecode:: ../code/docs/stream/RateTransformationDocTest.java#expand-last
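To sketch what ``expand`` does between upstream elements, here is a hypothetical, single-threaded model
(``ExpandModel`` is an illustrative name, not an Akka class): each element from the slow producer installs an
iterator of extrapolated values, and every pull from the fast consumer drains that iterator.

```java
import java.util.Iterator;
import java.util.function.Function;

// Illustrative model only -- not the actual Akka implementation. The slow
// producer replaces the current iterator on every push; the fast consumer
// keeps pulling extrapolated values from it in the meantime.
class ExpandModel<In, Out> {
  private final Function<In, Iterator<Out>> extrapolate;
  private Iterator<Out> current;

  ExpandModel(Function<In, Iterator<Out>> extrapolate) {
    this.extrapolate = extrapolate;
  }

  // A new upstream element arrives and resets the extrapolation.
  void onPush(In elem) {
    current = extrapolate.apply(elem);
  }

  // Downstream demand is served from the current extrapolation.
  Out onPull() {
    return current.next();
  }
}
```

An extrapolation of ``e -> Stream.iterate(e, x -> x).iterator()`` repeats the last seen element indefinitely,
which matches the "send the same element" use case above.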
Expand also allows to keep some state between demand requests from the downstream. Leveraging this, here is a flow that tracks and reports a drift between fast consumer and slow producer.
Expand also allows keeping some state between demand requests from the downstream. Leveraging this, here is a flow
that tracks and reports a drift between a fast consumer and a slow producer.
.. includecode:: ../code/docs/stream/RateTransformationDocTest.java#expand-drift
Note that all of the elements coming from upstream will go through ``expand`` at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.
Note that all of the elements coming from upstream will go through ``expand`` at least once. This means that the
output of this flow is going to report a drift of zero if the producer is fast enough, or a larger drift otherwise.

View file

@@ -12,9 +12,9 @@ class StreamBuffersRateSpec extends AkkaSpec {
def println(s: Any) = ()
//#pipelining
Source(1 to 3)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.map { i => println(s"A: $i"); i }.async
.map { i => println(s"B: $i"); i }.async
.map { i => println(s"C: $i"); i }.async
.runWith(Sink.ignore)
//#pipelining
}
@@ -29,9 +29,9 @@ class StreamBuffersRateSpec extends AkkaSpec {
//#materializer-buffer
//#section-buffer
val section = Flow[Int].map(_ * 2)
.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
val flow = section.via(Flow[Int].map(_ / 2)) // the buffer size of this map is the default
val section = Flow[Int].map(_ * 2).async
.withAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
val flow = section.via(Flow[Int].map(_ / 2)).async // the buffer size of this map is the default
//#section-buffer
}
@@ -43,7 +43,8 @@ class StreamBuffersRateSpec extends AkkaSpec {
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count))
// this is the asynchronous stage in this graph
val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count).async)
Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0

View file

@@ -286,9 +286,9 @@ Attributes
We have seen that we can use ``named()`` to introduce a nesting level in the fluid DSL (and also explicit nesting by using
``create()`` from :class:`GraphDSL`). Apart from having the effect of adding a nesting level, ``named()`` is actually
a shorthand for calling ``withAttributes(Attributes.name("someName"))``. Attributes provide a way to fine-tune certain
aspects of the materialized running entity. For example buffer sizes can be controlled via attributes (see
:ref:`stream-buffers-scala`). When it comes to hierarchic composition, attributes are inherited by nested modules,
unless they override them with a custom value.
aspects of the materialized running entity. For example buffer sizes for asynchronous stages can be controlled via
attributes (see :ref:`async-stream-buffers-scala`). When it comes to hierarchic composition, attributes are inherited
by nested modules, unless they override them with a custom value.
The code below, a modification of an earlier example, sets the ``inputBuffer`` attribute on certain modules, but not
on others:

View file

@@ -167,7 +167,7 @@ which will be responsible for materializing and running the streams we are about
.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup
The :class:`ActorMaterializer` can optionally take :class:`ActorMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-scala`), the dispatcher to
materialization properties, such as default buffer sizes (see also :ref:`async-stream-buffers-scala`), the dispatcher to
be used by the pipeline etc. These can be overridden with ``withAttributes`` on :class:`Flow`, :class:`Source`, :class:`Sink` and :class:`Graph`.
Let's assume we have a stream of tweets readily available. In Akka this is expressed as a :class:`Source[Out, M]`:

View file

@@ -4,9 +4,19 @@
Buffers and working with rate
#############################
Akka Streams processing stages are asynchronous and pipelined by default which means that a stage, after handing out
an element to its downstream consumer is able to immediately process the next message. To demonstrate what we mean
by this, let's take a look at the following example:
When upstream and downstream rates differ, especially when the throughput has spikes, it can be useful to introduce
buffers in a stream. In this chapter we cover how buffers are used in Akka Streams.
.. _async-stream-buffers-scala:
Buffers for asynchronous stages
===============================
In this section we will discuss internal buffers that are introduced as an optimization when using asynchronous stages.
To run a stage asynchronously it has to be marked explicitly as such using the ``.async`` method. Being run
asynchronously means that a stage, after handing out an element to its downstream consumer, is able to immediately
process the next element. To demonstrate what we mean by this, let's take a look at the following example:
.. includecode:: ../code/docs/stream/StreamBuffersRateSpec.scala#pipelining
@@ -24,9 +34,9 @@ Running the above example, one of the possible outputs looks like this:
C: 2
C: 3
Note that the order is *not* ``A:1, B:1, C:1, A:2, B:2, C:2,`` which would correspond to a synchronous execution model
where an element completely flows through the processing pipeline before the next element enters the flow. The next
element is processed by a stage as soon as it is emitted the previous one.
Note that the order is *not* ``A:1, B:1, C:1, A:2, B:2, C:2,`` which would correspond to the normal fused synchronous
execution model of flows where an element completely passes through the processing pipeline before the next element
enters the flow. The next element is processed by an asynchronous stage as soon as it has emitted the previous one.
While pipelining in general increases throughput, in practice there is a significant cost to passing an element
through the asynchronous (and therefore thread-crossing) boundary. To amortize this cost Akka Streams uses
@@ -45,18 +55,15 @@ These situations are exactly those where the internal batching buffering strateg
.. _Stop-And-Wait: https://en.wikipedia.org/wiki/Stop-and-wait_ARQ
.. _stream-buffers-scala:
Buffers in Akka Streams
=======================
Internal buffers and their effect
---------------------------------
As we have explained, for performance reasons Akka Streams introduces a buffer for every processing stage. The purpose
of these buffers is solely optimization, in fact the size of 1 would be the most natural choice if there would be no
need for throughput improvements. Therefore it is recommended to keep these buffer sizes small, and increase them only
to a level suitable for the throughput requirements of the application. Default buffer sizes can be set through configuration:
As we have explained, for performance reasons Akka Streams introduces a buffer for every asynchronous processing stage.
The purpose of these buffers is solely optimization; in fact, a size of 1 would be the most natural choice if there
were no need for throughput improvements. Therefore it is recommended to keep these buffer sizes small,
and increase them only to a level suitable for the throughput requirements of the application. Default buffer sizes
can be set through configuration:
::
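
   # A sketch of the relevant settings; the key names are taken from Akka's
   # reference configuration and should be verified against your Akka version:
   akka.stream.materializer.initial-input-buffer-size = 4
   akka.stream.materializer.max-input-buffer-size = 16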
@@ -75,22 +82,21 @@ Here is an example of a code that demonstrate some of the issues caused by inter
.. includecode:: ../code/docs/stream/StreamBuffersRateSpec.scala#buffering-abstraction-leak
Running the above example one would expect the number *3* to be printed in every 3 seconds (the ``conflate`` step here
is configured so that it counts the number of elements received before the downstream ``ZipWith`` consumes them). What
is being printed is different though, we will see the number *1*. The reason for this is the internal buffer which is
by default 16 elements large, and prefetches elements before the ``ZipWith`` starts consuming them. It is possible
to fix this issue by changing the buffer size of ``ZipWith`` (or the whole graph) to 1. We will still see a leading
1 though which is caused by an initial prefetch of the ``ZipWith`` element.
Running the above example, one would expect the number *3* to be printed every 3 seconds (the ``conflateWithSeed``
step here is configured so that it counts the number of elements received before the downstream ``ZipWith`` consumes
them). What is printed is different though: we will see the number *1*. The reason for this is the internal
buffer, which is by default 16 elements large and prefetches elements before the ``ZipWith`` starts consuming them.
It is possible to fix this issue by changing the buffer size of ``ZipWith`` (or the whole graph) to 1. We will still see
a leading *1* though, which is caused by an initial prefetch of the ``ZipWith`` element.
.. note::
In general, when time- or rate-driven processing stages exhibit strange behavior, one of the first solutions to try
should be to decrease the input buffer of the affected elements to 1.
Explicit user defined buffers
-----------------------------
The previous section explained the internal buffers of Akka Streams used to reduce the cost of crossing elements through
the asynchronous boundary. These are internal buffers which will be very likely automatically tuned in future versions.
Buffers in Akka Streams
=======================
In this section we will discuss *explicit* user-defined buffers that are part of the domain logic of the stream
processing pipeline of an application.
@@ -141,29 +147,40 @@ Rate transformation
Understanding conflate
----------------------
When a fast producer can not be informed to slow down by backpressure or some other signal, ``conflate`` might be useful to combine elements from a producer until a demand signal comes from a consumer.
When a fast producer cannot be informed to slow down by backpressure or some other signal, ``conflate`` might be
useful to combine elements from a producer until a demand signal comes from a consumer.
Below is an example snippet that summarizes fast stream of elements to a standart deviation, mean and count of elements that have arrived while the stats have been calculated.
Below is an example snippet that summarizes a fast stream of elements to a standard deviation, mean and count of
elements that have arrived while the stats have been calculated.
.. includecode:: ../code/docs/stream/RateTransformationDocSpec.scala#conflate-summarize
This example demonstrates that such flow's rate is decoupled. The element rate at the start of the flow can be much higher that the element rate at the end of the flow.
This example demonstrates that the rate of such a flow is decoupled. The element rate at the start of the flow can be
much higher than the element rate at the end of the flow.
Another possible use of ``conflate`` is to not consider all elements for summary when producer starts getting too fast. Example below demonstrates how ``conflate`` can be used to implement random drop of elements when consumer is not able to keep up with the producer.
Another possible use of ``conflate`` is to not consider all elements for a summary when the producer starts getting
too fast. The example below demonstrates how ``conflate`` can be used to implement a random drop of elements when
the consumer is not able to keep up with the producer.
.. includecode:: ../code/docs/stream/RateTransformationDocSpec.scala#conflate-sample
Understanding expand
--------------------
Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand allows to extrapolate a value to be sent as an element to a consumer.
Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers.
Expand allows extrapolating a value to be sent as an element to a consumer.
As a simple use of ``expand`` here is a flow that sends the same element to consumer when producer does not send any new elements.
As a simple use of ``expand``, here is a flow that sends the same element to the consumer when the producer does not
send any new elements.
.. includecode:: ../code/docs/stream/RateTransformationDocSpec.scala#expand-last
Expand also allows to keep some state between demand requests from the downstream. Leveraging this, here is a flow that tracks and reports a drift between fast consumer and slow producer.
Expand also allows keeping some state between demand requests from the downstream. Leveraging this, here is a flow
that tracks and reports a drift between a fast consumer and a slow producer.
.. includecode:: ../code/docs/stream/RateTransformationDocSpec.scala#expand-drift
Note that all of the elements coming from upstream will go through ``expand`` at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.
Note that all of the elements coming from upstream will go through ``expand`` at least once. This means that the
output of this flow is going to report a drift of zero if the producer is fast enough, or a larger drift otherwise.