merge content of stream-rate.md (#23218)

Parent: 6536553e00
Commit: 8865691231
2 changed files with 75 additions and 189 deletions
@@ -1,172 +0,0 @@
# Buffers and working with rate

When upstream and downstream rates differ, especially when the throughput has spikes, it can be useful to introduce
buffers in a stream. In this chapter we cover how buffers are used in Akka Streams.

<a id="async-stream-buffers"></a>
## Buffers for asynchronous stages

In this section we will discuss internal buffers that are introduced as an optimization when using asynchronous stages.

To run a stage asynchronously it has to be marked explicitly as such using the `.async()` method. Being run
asynchronously means that a stage, after handing out an element to its downstream consumer, is able to immediately
process the next message. To demonstrate what we mean by this, let's take a look at the following example:

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #pipelining }

Running the above example, one of the possible outputs looks like this:

```
A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3
```
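The exact interleaving varies from run to run, but not every interleaving is possible. A schedule is valid as long as each stage emits its elements in order and a stage only handles an element after the previous stage has emitted it. Here is a small stdlib-only sketch of that invariant (this is an illustration of the execution model, not Akka code; the `"stage:element"` event encoding is an assumption for the example):

```java
import java.util.List;

// Checks whether an interleaved log of "stage:element" events could have been
// produced by the pipelined A -> B -> C example: each stage must emit its
// elements in ascending order, and stage B may only handle element i after
// stage A emitted it (likewise C after B).
class ScheduleCheck {
  static final List<String> STAGES = List.of("A", "B", "C");

  static boolean validSchedule(List<String> events) {
    for (int idx = 0; idx < events.size(); idx++) {
      String[] parts = events.get(idx).split(":");
      String stage = parts[0];
      int element = Integer.parseInt(parts[1]);
      int pos = STAGES.indexOf(stage);
      // causality: the previous stage must already have emitted this element
      if (pos > 0 && !events.subList(0, idx).contains(STAGES.get(pos - 1) + ":" + element)) {
        return false;
      }
      // per-stage ordering: no earlier event of this stage with a larger element
      for (int j = 0; j < idx; j++) {
        if (events.get(j).startsWith(stage + ":")
            && Integer.parseInt(events.get(j).split(":")[1]) > element) {
          return false;
        }
      }
    }
    return true;
  }
}
```

Both the interleaved output above and the fully sequential `A:1, B:1, C:1, …` order satisfy this check; pipelining merely allows more schedules.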

Note that the order is *not* `A:1, B:1, C:1, A:2, B:2, C:2,` which would correspond to the normal fused synchronous
execution model of flows where an element completely passes through the processing pipeline before the next element
enters the flow. An asynchronous stage processes the next element as soon as it has emitted the previous one.

While pipelining in general increases throughput, in practice there is a significant cost of passing an element through the
asynchronous (and therefore thread-crossing) boundary. To amortize this cost Akka Streams uses
a *windowed*, *batching* backpressure strategy internally. It is windowed because, as opposed to a [Stop-And-Wait](https://en.wikipedia.org/wiki/Stop-and-wait_ARQ)
protocol, multiple elements might be "in-flight" concurrently with requests for elements. It is also batching because
a new element is not immediately requested once an element has been drained from the window-buffer; instead, multiple elements
are requested after multiple elements have been drained. This batching strategy reduces the communication cost of
propagating the backpressure signal through the asynchronous boundary.
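To get a feel for why batching matters, here is a toy model comparing how many demand signals must cross the boundary. This is a sketch, not Akka's actual protocol; the "request another half-window once half has drained" policy is an assumption for illustration:

```java
// Toy model of backpressure signalling across an asynchronous boundary.
class SignalModel {
  // Stop-and-wait: one request crosses the boundary per element.
  static int stopAndWaitSignals(int elements) {
    return elements;
  }

  // Windowed batching: one request for a full window up front, then one
  // further request per half-window drained.
  static int batchedSignals(int elements, int window) {
    int remaining = Math.max(elements - window, 0);
    int batch = window / 2;
    return 1 + (remaining + batch - 1) / batch; // ceiling division
  }
}
```

In this model, moving 100 elements across the boundary with a window of 16 takes 12 batched requests instead of 100 stop-and-wait round trips.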

While this internal protocol is mostly invisible to the user (apart from its throughput-increasing effects) there are
situations when these details get exposed. In all of our previous examples we always assumed that the rate of the
processing chain is strictly coordinated through the backpressure signal, causing all stages to process no faster than
the throughput of the connected chain. There are tools in Akka Streams, however, that enable the rates of different segments
of a processing chain to be "detached", or that define the maximum throughput of the stream through external timing sources.
These situations are exactly those where the internal batching buffering strategy suddenly becomes non-transparent.

### Internal buffers and their effect

As we have explained, for performance reasons Akka Streams introduces a buffer for every asynchronous processing stage.
The purpose of these buffers is solely optimization; in fact a size of 1 would be the most natural choice if there
were no need for throughput improvements. Therefore it is recommended to keep these buffer sizes small,
and increase them only to a level suitable for the throughput requirements of the application. Default buffer sizes
can be set through configuration:

```
akka.stream.materializer.max-input-buffer-size = 16
```

Alternatively they can be set by passing an `ActorMaterializerSettings` to the materializer:

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #materializer-buffer }

If the buffer size needs to be set for segments of a `Flow` only, it is possible by defining a separate
`Flow` with these attributes:

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #section-buffer }

Here is an example of code that demonstrates some of the issues caused by internal buffers:

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #buffering-abstraction-leak }

Running the above example one would expect the number *3* to be printed every 3 seconds (the `conflateWithSeed`
step here is configured so that it counts the number of elements received before the downstream `ZipWith` consumes
them). What is printed is different, though: we will see the number *1*. The reason for this is the internal
buffer, which is by default 16 elements large and prefetches elements before the `ZipWith` starts consuming them.
It is possible to fix this issue by changing the buffer size of `ZipWith` (or the whole graph) to 1. We will still see
a leading 1, though, which is caused by an initial prefetch of the `ZipWith` element.

@@@ note

In general, when time- or rate-driven processing stages exhibit strange behavior, one of the first solutions to try
should be to decrease the input buffer of the affected elements to 1.

@@@

## Buffers in Akka Streams

In this section we will discuss *explicit*, user-defined buffers that are part of the domain logic of the stream processing
pipeline of an application.

The example below will ensure that 1000 jobs (but not more) are dequeued from an external (imaginary) system and
stored locally in memory - relieving the external system:

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-backpressure }

The next example will also queue up 1000 jobs locally, but if there are more jobs waiting
in the imaginary external system, it makes space for the new element by
dropping one element from the *tail* of the buffer. Dropping from the tail is a very common strategy, but
it must be noted that this will drop the *youngest* waiting job. If some "fairness" is desired, in the sense that
we want to be nice to jobs that have been waiting long, then this option can be useful.

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-droptail }

Instead of dropping the youngest element from the tail of the buffer, a new element can be dropped without
enqueueing it to the buffer at all.

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-dropnew }

Here is another example with a queue of 1000 jobs, but it makes space for the new element by
dropping one element from the *head* of the buffer. This is the *oldest*
waiting job. This is the preferred strategy if jobs are expected to be
resent if not processed in a certain period. The oldest element will be
retransmitted soon (in fact a retransmitted duplicate might already be in the queue!),
so it makes sense to drop it first.

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-drophead }

Compared to the dropping strategies above, `dropBuffer` drops all the 1000
jobs it has enqueued once the buffer gets full. This aggressive strategy
is useful when dropping jobs is preferred to delaying jobs.

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-dropbuffer }

If our imaginary external job provider is a client using our API, we might
want to enforce that the client cannot have more than 1000 queued jobs;
otherwise we consider it flooding and terminate the connection. This is
easily achievable by the error strategy, which simply fails the stream
once the buffer gets full.

@@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-fail }
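The overflow behaviors above can be summarized on a plain deque. This is only a sketch of the strategies' semantics; the names mirror the strategies described above, but none of this is Akka code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of buffer overflow strategies: what happens when a new job arrives
// and the buffer is already at capacity.
class BoundedBuffer {
  enum Overflow { DROP_TAIL, DROP_NEW, DROP_HEAD, DROP_BUFFER, FAIL }

  static <T> void offer(Deque<T> buffer, int capacity, T element, Overflow strategy) {
    if (buffer.size() < capacity) {
      buffer.addLast(element);
      return;
    }
    switch (strategy) {
      case DROP_TAIL:   buffer.removeLast(); buffer.addLast(element); break;  // drop youngest job
      case DROP_NEW:    break;                                                // drop the incoming job
      case DROP_HEAD:   buffer.removeFirst(); buffer.addLast(element); break; // drop oldest job
      case DROP_BUFFER: buffer.clear(); buffer.addLast(element); break;       // drop all queued jobs
      case FAIL:        throw new IllegalStateException("client is flooding us");
    }
  }
}
```

With `capacity = 1000` this mirrors the job-queue examples: which job survives an overflow depends entirely on the chosen strategy.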

## Rate transformation

### Understanding conflate

When a fast producer cannot be informed to slow down by backpressure or some other signal, `conflate` might be
useful to combine elements from a producer until a demand signal comes from a consumer.

Below is an example snippet that summarizes a fast stream of elements to a standard deviation, mean and count of
elements that have arrived while the stats have been calculated.

@@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #conflate-summarize }

This example demonstrates that such a flow's rate is decoupled. The element rate at the start of the flow
can be much higher than the element rate at the end of the flow.
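The essence of `conflate` can be pictured with an ordinary fold. This sketch (assumed signatures, not Akka's API) aggregates whatever arrived while the consumer was busy into a single element:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// While downstream signals no demand, pending upstream elements are folded
// into one summary value; when demand arrives, the aggregate is emitted as
// a single element.
class Conflate {
  static <A, S> Optional<S> conflate(List<A> pending, Function<A, S> seed,
                                     BiFunction<S, A, S> aggregate) {
    if (pending.isEmpty()) {
      return Optional.empty(); // nothing arrived: emit nothing, keep waiting
    }
    S acc = seed.apply(pending.get(0));
    for (A element : pending.subList(1, pending.size())) {
      acc = aggregate.apply(acc, element);
    }
    return Optional.of(acc);
  }
}
```

Seeding with `x -> 1` and aggregating with `(count, x) -> count + 1` gives exactly the element-counting `conflateWithSeed` behavior used in the buffering example earlier in this chapter.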

Another possible use of `conflate` is to not consider all elements for summary when the producer starts getting too fast.
The example below demonstrates how `conflate` can be used to implement a random drop of elements when the consumer is not
able to keep up with the producer.

@@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #conflate-sample }

### Understanding expand

Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand
allows extrapolating a value to be sent as an element to a consumer.

As a simple use of `expand`, here is a flow that sends the same element to the consumer when the producer does not send
any new elements.

@@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #expand-last }

Expand also allows keeping some state between demand requests from the downstream. Leveraging this, here is a flow
that tracks and reports a drift between a fast consumer and a slow producer.

@@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #expand-drift }

Note that all of the elements coming from upstream will go through `expand` at least once. This means that the
output of this flow is going to report a drift of zero if the producer is fast enough, or a larger drift otherwise.
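The repeat-the-last-element behavior can be pictured as padding: if downstream demanded more elements than upstream produced, the last produced value fills the gap, while every produced element is still emitted at least once. A stdlib sketch (not Akka's API):

```java
import java.util.ArrayList;
import java.util.List;

// Given the elements a producer managed to emit and the total downstream
// demand, repeat the last element to fill the gap; never drop produced elements.
class Expand {
  static <A> List<A> expandLast(List<A> produced, int demand) {
    List<A> out = new ArrayList<>(produced);
    if (produced.isEmpty()) {
      return out; // no element seen yet: nothing to extrapolate from
    }
    A last = produced.get(produced.size() - 1);
    while (out.size() < demand) {
      out.add(last); // extrapolate by repeating the freshest element
    }
    return out;
  }
}
```

Note that when the producer outpaces demand (`produced.size() > demand`) nothing is dropped here, matching the "at least once" property stated above.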

akka-docs/src/main/paradox/java/stream/stream-rate.md (symbolic link)
@@ -0,0 +1 @@
+../../scala/stream/stream-rate.md
@@ -8,11 +8,15 @@ buffers in a stream. In this chapter we cover how buffers are used in Akka Strea
 
 In this section we will discuss internal buffers that are introduced as an optimization when using asynchronous stages.
 
-To run a stage asynchronously it has to be marked explicitly as such using the `.async` method. Being run
+To run a stage asynchronously it has to be marked explicitly as such using the @scala[`.async`]@java[`.async()`] method. Being run
 asynchronously means that a stage, after handing out an element to its downstream consumer is able to immediately
 process the next message. To demonstrate what we mean by this, let's take a look at the following example:
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #pipelining }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #pipelining }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #pipelining }
 
 Running the above example, one of the possible outputs looks like this:
 
@@ -61,16 +65,28 @@ akka.stream.materializer.max-input-buffer-size = 16
 
 Alternatively they can be set by passing a `ActorMaterializerSettings` to the materializer:
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #materializer-buffer }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #materializer-buffer }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #materializer-buffer }
 
 If the buffer size needs to be set for segments of a `Flow` only, it is possible by defining a separate
 `Flow` with these attributes:
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #section-buffer }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #section-buffer }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #section-buffer }
 
 Here is an example of a code that demonstrate some of the issues caused by internal buffers:
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #buffering-abstraction-leak }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #buffering-abstraction-leak }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #buffering-abstraction-leak }
 
 Running the above example one would expect the number *3* to be printed in every 3 seconds (the `conflateWithSeed`
 step here is configured so that it counts the number of elements received before the downstream `ZipWith` consumes
@@ -94,7 +110,12 @@ pipeline of an application.
 The example below will ensure that 1000 jobs (but not more) are dequeued from an external (imaginary) system and
 stored locally in memory - relieving the external system:
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-backpressure }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-backpressure }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-backpressure }
 
 The next example will also queue up 1000 jobs locally, but if there are more jobs waiting
 in the imaginary external systems, it makes space for the new element by
@@ -102,12 +123,20 @@ dropping one element from the *tail* of the buffer. Dropping from the tail is a
 it must be noted that this will drop the *youngest* waiting job. If some "fairness" is desired in the sense that
 we want to be nice to jobs that has been waiting for long, then this option can be useful.
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-droptail }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-droptail }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-droptail }
 
 Instead of dropping the youngest element from the tail of the buffer a new element can be dropped without
 enqueueing it to the buffer at all.
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-dropnew }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-dropnew }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-dropnew }
 
 Here is another example with a queue of 1000 jobs, but it makes space for the new element by
 dropping one element from the *head* of the buffer. This is the *oldest*
@@ -116,13 +145,21 @@ resent if not processed in a certain period. The oldest element will be
 retransmitted soon, (in fact a retransmitted duplicate might be already in the queue!)
 so it makes sense to drop it first.
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-drophead }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-drophead }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-drophead }
 
 Compared to the dropping strategies above, dropBuffer drops all the 1000
 jobs it has enqueued once the buffer gets full. This aggressive strategy
 is useful when dropping jobs is preferred to delaying jobs.
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-dropbuffer }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-dropbuffer }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-dropbuffer }
 
 If our imaginary external job provider is a client using our API, we might
 want to enforce that the client cannot have more than 1000 queued jobs
@@ -130,7 +167,11 @@ otherwise we consider it flooding and terminate the connection. This is
 easily achievable by the error strategy which simply fails the stream
 once the buffer gets full.
 
-@@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-fail }
+Scala
+: @@snip [StreamBuffersRateSpec.scala]($code$/scala/docs/stream/StreamBuffersRateSpec.scala) { #explicit-buffers-fail }
+
+Java
+: @@snip [StreamBuffersRateDocTest.java]($code$/java/jdocs/stream/StreamBuffersRateDocTest.java) { #explicit-buffers-fail }
 
 ## Rate transformation
 
@@ -139,10 +180,14 @@ once the buffer gets full.
 When a fast producer can not be informed to slow down by backpressure or some other signal, `conflate` might be
 useful to combine elements from a producer until a demand signal comes from a consumer.
 
-Below is an example snippet that summarizes fast stream of elements to a standart deviation, mean and count of elements
-that have arrived while the stats have been calculated.
+Below is an example snippet that summarizes fast stream of elements to a standard deviation, mean and count of elements
+that have arrived while the stats have been calculated.
 
-@@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #conflate-summarize }
+Scala
+: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #conflate-summarize }
+
+Java
+: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #conflate-summarize }
 
 This example demonstrates that such flow's rate is decoupled. The element rate at the start of the flow can be much
 higher that the element rate at the end of the flow.
@@ -151,7 +196,11 @@ Another possible use of `conflate` is to not consider all elements for summary w
 Example below demonstrates how `conflate` can be used to implement random drop of elements when consumer is not able
 to keep up with the producer.
 
-@@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #conflate-sample }
+Scala
+: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #conflate-sample }
+
+Java
+: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #conflate-sample }
 
 ### Understanding expand
 
@@ -161,12 +210,20 @@ Expand allows to extrapolate a value to be sent as an element to a consumer.
 As a simple use of `expand` here is a flow that sends the same element to consumer when producer does not send
 any new elements.
 
-@@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #expand-last }
+Scala
+: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #expand-last }
+
+Java
+: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #expand-last }
 
 Expand also allows to keep some state between demand requests from the downstream. Leveraging this, here is a flow
 that tracks and reports a drift between fast consumer and slow producer.
 
-@@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #expand-drift }
+Scala
+: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #expand-drift }
+
+Java
+: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #expand-drift }
 
 Note that all of the elements coming from upstream will go through `expand` at least once. This means that the
 output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.