diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala
index 8e4acd4764..793d9d0a77 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala
@@ -57,4 +57,31 @@ class StreamBuffersRateSpec extends AkkaSpec {
     //#buffering-abstraction-leak
   }
 
+  "explicit buffers" in {
+    trait Job
+    def inboundJobsConnector(): Source[Job] = Source.empty()
+    //#explicit-buffers-backpressure
+    // Getting a stream of jobs from an imaginary external system as a Source
+    val jobs: Source[Job] = inboundJobsConnector()
+    jobs.buffer(1000, OverflowStrategy.backpressure)
+    //#explicit-buffers-backpressure
+
+    //#explicit-buffers-droptail
+    jobs.buffer(1000, OverflowStrategy.dropTail)
+    //#explicit-buffers-droptail
+
+    //#explicit-buffers-drophead
+    jobs.buffer(1000, OverflowStrategy.dropHead)
+    //#explicit-buffers-drophead
+
+    //#explicit-buffers-dropbuffer
+    jobs.buffer(1000, OverflowStrategy.dropBuffer)
+    //#explicit-buffers-dropbuffer
+
+    //#explicit-buffers-error
+    jobs.buffer(1000, OverflowStrategy.error)
+    //#explicit-buffers-error
+
+  }
+
 }
diff --git a/akka-docs-dev/rst/scala/stream-rate.rst b/akka-docs-dev/rst/scala/stream-rate.rst
index a81ff2b23b..5402dcb4d1 100644
--- a/akka-docs-dev/rst/scala/stream-rate.rst
+++ b/akka-docs-dev/rst/scala/stream-rate.rst
@@ -44,7 +44,6 @@ of a processing chain to be "detached" or to define the maximum throughput of th
 These situations are exactly those where the internal batching buffering strategy suddenly becomes non-transparent.
 
 .. _Stop-And-Wait: https://en.wikipedia.org/wiki/Stop-and-wait_ARQ
-.. _Reactive Streams: http://reactive-streams.org/
 
 .. _stream-buffers-scala:
@@ -83,13 +82,52 @@ to fix this issue by changing the buffer size of ``ZipWith`` (or the whole graph
 1 though which is caused by an initial prefetch of the ``ZipWith`` element.
 
 .. note::
-  In general, when time or rate driven processing stages exhibit strange behavior, one of the first solution to try
+  In general, when time or rate driven processing stages exhibit strange behavior, one of the first solutions to try
   should be to decrease the input buffer of the affected elements to 1.
 
 Explicit user defined buffers
 -----------------------------
 
-*TODO*
+The previous section explained the internal buffers of Akka Streams, which are used to reduce the cost of passing
+elements across asynchronous boundaries. These internal buffers will very likely be tuned automatically in future
+versions. In this section we discuss *explicit*, user defined buffers that are part of the domain logic of the
+stream processing pipeline of an application.
+
+The example below ensures that 1000 jobs (but not more) are dequeued from an external (imaginary) system and
+stored locally in memory, relieving the external system:
+
+.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-backpressure
+
+The next example also queues up 1000 jobs locally, but if more jobs are waiting in the imaginary external system,
+it makes space for the new element by dropping one element from the *tail* of the buffer. Dropping from the tail
+is a very common strategy, but note that it drops the *youngest* waiting job. If some "fairness" is desired, in
+the sense that we want to be nice to jobs that have been waiting for long, then this option can be useful.
+
+.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-droptail
+
+Here is another example with a queue of 1000 jobs, but it makes space for the new element by dropping one element
+from the *head* of the buffer, that is, the *oldest* waiting job. This is the preferred strategy if jobs are
+expected to be resent when not processed within a certain period. The oldest element will be retransmitted soon
+(in fact, a retransmitted duplicate might already be in the queue!), so it makes sense to drop it first.
+
+.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-drophead
+
+Compared to the dropping strategies above, ``dropBuffer`` drops all 1000 jobs it has enqueued once the buffer
+gets full. This aggressive strategy is useful when dropping jobs is preferable to delaying them.
+
+.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-dropbuffer
+
+If our imaginary external job provider is a client using our API, we might want to enforce that the client cannot
+have more than 1000 queued jobs, otherwise we consider it flooding and terminate the connection. This is easily
+achievable with the ``error`` strategy, which simply fails the stream once the buffer gets full.
+
+.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#explicit-buffers-error
 
 Rate transformation
 ===================
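To make the difference between the dropping strategies described in the patched documentation concrete, here is a minimal plain-Scala sketch (not Akka Streams code; the buffer is modeled as an immutable `Vector` and the object and method names are illustrative, not part of any Akka API) showing which element each strategy discards when a full buffer receives a new element:

```scala
// Plain-Scala sketch of buffer overflow behavior. `buf` is the current
// buffer contents (oldest element first), `cap` the capacity, `next` the
// newly arriving element.
object BufferStrategies {
  // dropTail: discard the youngest waiting element to make room.
  def dropTail(buf: Vector[Int], cap: Int, next: Int): Vector[Int] =
    if (buf.size < cap) buf :+ next else buf.init :+ next

  // dropHead: discard the oldest waiting element to make room.
  def dropHead(buf: Vector[Int], cap: Int, next: Int): Vector[Int] =
    if (buf.size < cap) buf :+ next else buf.tail :+ next

  // dropBuffer: discard everything enqueued so far and keep only `next`.
  def dropBuffer(buf: Vector[Int], cap: Int, next: Int): Vector[Int] =
    if (buf.size < cap) buf :+ next else Vector(next)
}
```

With a full buffer `Vector(1, 2, 3)` of capacity 3, offering 4 yields `Vector(1, 2, 4)` under `dropTail`, `Vector(2, 3, 4)` under `dropHead`, and `Vector(4)` under `dropBuffer`. The `backpressure` and `error` strategies have no such local sketch: the former slows the upstream down instead of dropping, and the latter fails the stream outright.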