diff --git a/akka-docs-dev/rst/java/stream-rate.rst b/akka-docs-dev/rst/java/stream-rate.rst
new file mode 100644
index 0000000000..914c2d4c7a
--- /dev/null
+++ b/akka-docs-dev/rst/java/stream-rate.rst
@@ -0,0 +1,143 @@
+.. _stream-rate-java:
+
+#############################
+Buffers and working with rate
+#############################
+
+Akka Streams processing stages are asynchronous and pipelined by default, which means that a stage, after handing
+an element over to its downstream consumer, is able to immediately process the next message. To demonstrate what we mean
+by this, let's take a look at the following example:
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#pipelining
+
+Running the above example, one of the possible outputs looks like this:
+
+::
+
+  A: 1
+  A: 2
+  B: 1
+  A: 3
+  B: 2
+  C: 1
+  B: 3
+  C: 2
+  C: 3
+
+Note that the order is *not* ``A:1, B:1, C:1, A:2, B:2, C:2``, which would correspond to a synchronous execution model
+where an element completely flows through the processing pipeline before the next element enters the flow. The next
+element is processed by a stage as soon as it has emitted the previous one.
+
+While pipelining in general increases throughput, in practice there is a significant cost of passing an element through the
+asynchronous (and therefore thread-crossing) boundary. To amortize this cost Akka Streams uses
+a *windowed*, *batching* backpressure strategy internally. It is windowed because, as opposed to a `Stop-And-Wait`_
+protocol, multiple elements might be "in-flight" concurrently with requests for elements. It is also batching because
+a new element is not immediately requested once an element has been drained from the window-buffer; instead, multiple elements
+are requested after multiple elements have been drained. This batching strategy reduces the communication cost of
+propagating the backpressure signal through the asynchronous boundary. 
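To make the windowed, batching idea concrete, here is a minimal plain-Java sketch of such a demand protocol. This is *not* the actual Akka Streams implementation; the class, its fields, and the "re-request after half the window has been drained" rule are illustrative assumptions only:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a windowed, batching demand protocol (illustrative only,
// NOT the Akka Streams implementation). The consumer keeps a window of
// `windowSize` elements "in flight" and, instead of requesting one
// element per dequeue (stop-and-wait), re-requests a whole batch once
// half of the window has been drained.
class BatchingDemand {
  final int windowSize;
  int inFlight;            // elements requested but not yet drained
  int drainedSinceRequest; // elements drained since the last batch request
  final List<Integer> requests = new ArrayList<>(); // log of request sizes

  BatchingDemand(int windowSize) {
    this.windowSize = windowSize;
    this.inFlight = windowSize;
    requests.add(windowSize); // the initial request fills the whole window
  }

  // Called when the consumer takes one element out of the window-buffer.
  void drainOne() {
    inFlight--;
    drainedSinceRequest++;
    // Batching: one backpressure message covers several drained elements.
    if (drainedSinceRequest >= windowSize / 2) {
      requests.add(drainedSinceRequest);
      inFlight += drainedSinceRequest;
      drainedSinceRequest = 0;
    }
  }
}
```

With a window of 16, draining 32 elements produces one initial request of 16 followed by four batched requests of 8 each, i.e. five backpressure messages instead of the 32 a stop-and-wait protocol would send.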
+
+While this internal protocol is mostly invisible to the user (apart from its throughput-increasing effects) there are
+situations when these details get exposed. In all of our previous examples we always assumed that the rate of the
+processing chain is strictly coordinated through the backpressure signal, causing all stages to process no faster than
+the throughput of the connected chain. There are tools in Akka Streams, however, that enable the rates of different segments
+of a processing chain to be "detached", or that define the maximum throughput of the stream through external timing sources.
+These situations are exactly those where the internal batching buffering strategy suddenly becomes non-transparent.
+
+.. _Stop-And-Wait: https://en.wikipedia.org/wiki/Stop-and-wait_ARQ
+
+.. _stream-buffers-java:
+
+Buffers in Akka Streams
+=======================
+
+Internal buffers and their effect
+---------------------------------
+
+As we have explained, for performance reasons Akka Streams introduces a buffer for every processing stage. The purpose
+of these buffers is solely optimization; in fact a size of 1 would be the most natural choice if there were no
+need for throughput improvements. It is therefore recommended to keep these buffer sizes small, and increase them only
+to a level required by the throughput requirements of the application. Default buffer sizes can be set through configuration:
+
+::
+
+  akka.stream.materializer.max-input-buffer-size = 16
+
+Alternatively they can be set by passing an :class:`ActorFlowMaterializerSettings` to the materializer:
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#materializer-buffer
+
+If the buffer size needs to be set for segments of a ``Flow`` only, it is possible by defining a ``section()``:
+
+.. 
includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#section-buffer
+
+Here is example code that demonstrates some of the issues caused by internal buffers:
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#buffering-abstraction-leak
+
+Running the above example one would expect the number *3* to be printed every 3 seconds (the ``conflate`` step here
+is configured so that it counts the number of elements received before the downstream ``ZipWith`` consumes them). What
+is printed is different though: we will see the number *1*. The reason for this is the internal buffer, which is
+by default 16 elements large and prefetches elements before the ``ZipWith`` starts consuming them. It is possible
+to fix this issue by changing the buffer size of ``ZipWith`` (or the whole graph) to 1. We will still see a leading
+*1* though, which is caused by an initial prefetch of the ``ZipWith`` element.
+
+.. note::
+   In general, when time- or rate-driven processing stages exhibit strange behavior, one of the first solutions to try
+   should be to decrease the input buffer of the affected elements to 1.
+
+Explicit user defined buffers
+-----------------------------
+
+The previous section explained the internal buffers of Akka Streams, used to reduce the cost of passing elements through
+the asynchronous boundary. These are internal buffers which will very likely be automatically tuned in future versions.
+In this section we will discuss *explicit* user defined buffers that are part of the domain logic of the stream processing
+pipeline of an application.
+
+The example below will ensure that 1000 jobs (but not more) are dequeued from an external (imaginary) system and
+stored locally in memory - relieving the external system:
+
+.. 
includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#explicit-buffers-backpressure
+
+The next example will also queue up 1000 jobs locally, but if there are more jobs waiting
+in the imaginary external system, it makes space for the new element by
+dropping one element from the *tail* of the buffer. Dropping from the tail is a very common strategy, but
+it must be noted that this will drop the *youngest* waiting job. If some "fairness" is desired, in the sense that
+we want to be nice to jobs that have been waiting long, then this option can be useful.
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#explicit-buffers-droptail
+
+Here is another example with a queue of 1000 jobs, but this time it makes space for the new element by
+dropping one element from the *head* of the buffer, i.e. the *oldest*
+waiting job. This is the preferred strategy if jobs are expected to be
+resent if not processed in a certain period. The oldest element will be
+retransmitted soon (in fact a retransmitted duplicate might already be in the queue!),
+so it makes sense to drop it first.
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#explicit-buffers-drophead
+
+Compared to the dropping strategies above, dropBuffer drops all the 1000
+jobs it has enqueued once the buffer gets full. This aggressive strategy
+is useful when dropping jobs is preferred to delaying jobs.
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#explicit-buffers-dropbuffer
+
+If our imaginary external job provider is a client using our API, we might
+want to enforce that the client cannot have more than 1000 queued jobs;
+otherwise we consider it flooding and terminate the connection. 
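The dropping and failing strategies discussed in this section can be illustrated with a minimal plain-Java sketch of a bounded queue. The class, enum, and method names here are hypothetical; this shows the *semantics* of the strategies, not the Akka API or its buffer implementation:

```java
import java.util.ArrayDeque;

// Plain-Java sketch of the buffer overflow strategies described in the
// text: drop the tail (youngest), drop the head (oldest), drop the whole
// buffer, or fail. Illustrative only - not Akka's implementation.
class BoundedBuffer<T> {
  enum Overflow { DROP_TAIL, DROP_HEAD, DROP_BUFFER, FAIL }

  private final ArrayDeque<T> queue = new ArrayDeque<>();
  private final int capacity;
  private final Overflow strategy;

  BoundedBuffer(int capacity, Overflow strategy) {
    this.capacity = capacity;
    this.strategy = strategy;
  }

  void offer(T elem) {
    if (queue.size() < capacity) {
      queue.addLast(elem);
      return;
    }
    switch (strategy) {
      case DROP_TAIL:   // drop the youngest waiting job
        queue.removeLast();
        queue.addLast(elem);
        break;
      case DROP_HEAD:   // drop the oldest waiting job
        queue.removeFirst();
        queue.addLast(elem);
        break;
      case DROP_BUFFER: // drop everything enqueued so far
        queue.clear();
        queue.addLast(elem);
        break;
      case FAIL:        // treat overflow as flooding: fail the stream
        throw new IllegalStateException("buffer overflow");
    }
  }

  int size()  { return queue.size(); }
  T head()    { return queue.peekFirst(); }
  T tail()    { return queue.peekLast(); }
}
```

For example, with a capacity of 3, offering the elements 1 through 5 to a ``DROP_TAIL`` buffer leaves 1, 2, 5 in the queue, while a ``DROP_HEAD`` buffer ends up with 3, 4, 5.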
This is
+easily achievable with the error strategy, which simply fails the stream
+once the buffer gets full.
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#explicit-buffers-fail
+
+Rate transformation
+===================
+
+Understanding conflate
+----------------------
+
+*TODO*
+
+Understanding expand
+--------------------
+
+*TODO*
diff --git a/akka-docs-dev/rst/scala/stream-rate.rst b/akka-docs-dev/rst/scala/stream-rate.rst
index 1c400d319d..02a9a4197e 100644
--- a/akka-docs-dev/rst/scala/stream-rate.rst
+++ b/akka-docs-dev/rst/scala/stream-rate.rst
@@ -117,7 +117,7 @@ so it makes sense to drop it first.
 
 Compared to the dropping strategies above, dropBuffer drops all the 1000
 jobs it has enqueued once the buffer gets full. This aggressive strategy
-is useful when dropped jobs are preferred to delayed jobs.
+is useful when dropping jobs is preferred to delaying jobs.
 
 .. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-dropbuffer