diff --git a/akka-docs/rst/general/stream/stages-overview.rst b/akka-docs/rst/general/stream/stages-overview.rst index 4f5695f98f..e68a4ef6c8 100644 --- a/akka-docs/rst/general/stream/stages-overview.rst +++ b/akka-docs/rst/general/stream/stages-overview.rst @@ -12,51 +12,58 @@ These built-in sources are available from `akka.stream.scaladsl.Source` and `akk fromIterator ^^^^^^^^^^^^ -Streams the values from an iterator, requesting the next value when there is demand. If the iterator -performs blocking operations, make sure to run it on a separate dispatcher. +Stream the values from an iterator, requesting the next value when there is demand. + +If the iterator perform blocking operations, make sure to run it on a separate dispatcher. *emits* when the next value returned from the iterator + *completes* when the iterator reaches it's end single ^^^^^^ -Streams a single object +Stream a single object *emits* the value once + *completes* when the single value has been emitted repeat ^^^^^^ -Streams a single object repeatedly +Stream a single object repeatedly *emits* the same value repeatedly when there is demand + *completes* never tick ^^^^ -A periodical repeated stream of an arbitrary object. Delay of first tick is specified +A periodical repetition of an arbitrary object. Delay of first tick is specified separately from interval of the following ticks. *emits* periodically, if there is downstream backpressure ticks are skipped + *completes* never fromFuture ^^^^^^^^^^ -The value of the future is sent when the future completes and there is demand. +Send the single value of the ``Future`` when it completes and there is demand. If the future fails the stream is failed with that exception. *emits* the future completes + *completes* after the future has completed unfold ^^^^^^ -Streams the result of a function as long as it returns a ``Some`` or non-empty ``Optional``, the value inside the optional +Stream the result of a function as long as it returns a ``Some`` or non-empty ``Optional``, the value inside the optional consists of a tuple (or ``Pair``) where the first value is a state passed back into the next call to the function allowing to pass a state. The first invocation of the provided fold function will receive the ``zero`` state. Can be used to implement many stateful sources without having to touch the more low level ``GraphStage`` API. *emits* when there is demand and the unfold function over the previous state returns non empty value + *completes* when the unfold function returns an empty value unfoldAsync @@ -67,62 +74,70 @@ complete or emit when it completes. Can be used to implement many stateful sources without having to touch the more low level ``GraphStage`` API. *emits* when there is demand and unfold state returned future completes with some value + *completes* when the future returned by the unfold function completes with an empty value empty ^^^^^ -A source that completes right away without ever emitting any elements. Useful when you have to provide a source to +Complete right away without ever emitting any elements. Useful when you have to provide a source to an API but there are no elements to emit. *emits* never + *completes* directly maybe ^^^^^ -A source that will either emit one value if the ``Option`` or ``Optional`` contains a value, or complete directly +Either emit one value if the ``Option`` or ``Optional`` contains a value, or complete directly if the optional value is empty. *emits* when the returned promise is completed with some value + *completes* after emitting some value, or directly if the promise is completed with no value failed ^^^^^^ -A source that fails with a user specified exception +Fail directly with a user specified exception. *emits* never + *completes* fails the stream directly with the given exception actorPublisher ^^^^^^^^^^^^^^ -Wraps an actor extending ``ActorPublisher`` as a source +Wrap an actor extending ``ActorPublisher`` as a source. *emits* depends on the actor implementation -*completes* when the actor stops (TODO double check) + +*completes* when the actor stops actorRef ^^^^^^^^ -Materializes into an ``ActorRef``, sending messages to the actor will emit them on the stream. The actor contains +Materialize an ``ActorRef``, sending messages to it will emit them on the stream. The actor contain a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping elements or failing the stream, the strategy is chosen by the user. *emits* when there is demand and there are messages in the buffer or a message is sent to the actorref -*completes* When the actorref is sent ``akka.actor.Status.Success`` or ``PoisonPill`` + +*completes* when the actorref is sent ``akka.actor.Status.Success`` or ``PoisonPill`` combine ^^^^^^^ -Combines several sources, using a given strategy such as merge or concat, into one source. +Combine several sources, using a given strategy such as merge or concat, into one source. *emits* when there is demand, but depending on the strategy -*completes* + +*completes* when all sources has completed queue ^^^^^ -Materializes into a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains +Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains a buffer, if elements are pushed onto the queue faster than the source is consumed the overflow will be handled with a strategy specified by the user. Functionality for tracking when an element has been emitted is available through ``SourceQueue.offer``. *emits* when there is demand and the queue contains elements + *completes* when downstream completes asSubscriber @@ -137,7 +152,6 @@ Integration with Reactive Streams, subscribes to a ``org.reactivestreams.Publish - Sink stages ----------- These built-in sinks are available from ``akka.stream.scaladsl.Sink`` and ``akka.stream.javadsl.Sink``: @@ -149,6 +163,7 @@ Materializes into a ``Future`` or ``CompletionStage`` which completes with the f after this the stream is canceled. If no element is emitted, the future is be failed. *cancels* after receiving one element + *backpressures* never headOption @@ -157,51 +172,54 @@ Materializes into a ``Future[Option[T]]`` or ``CompletionStage>`` wh arriving wrapped in the optional, or an empty optional if the stream completes without any elements emitted. *cancels* after receiving one element + *backpressures* never last ^^^^ -Materializes into a ``Future`` or ``CompletionStage`` which will complete with the last value emitted when the stream +Materializes into a ``Future`` which will complete with the last value emitted when the stream completes. If the stream completes with no elements the future is failed. *cancels* never + *backpressures* never lastOption ^^^^^^^^^^ -Materializes into a ``Future[Option[T]]`` or ``CompletionStage>`` which completes with the last value +Materialize a ``Future[Option[T]]`` which completes with the last value emitted wrapped in an optional when the stream completes. if the stream completes with no elements the future is completed with an empty optional. *cancels* never + *backpressures* never ignore ^^^^^^ -Keeps consuming elements but discards them +Consume all elements but discards them. Useful when a stream has to be consumed but there is no use to actually +do anything with the elements. *cancels* never + *backpressures* never cancelled ^^^^^^^^^ -Immediately cancels the stream +Immediately cancel the stream *cancels* immediately -seq_ -^^^^ -TODO three letter header not allowed - -Collects values emitted from the stream into a collection, the collection is available through a ``Future`` or -``CompletionStage`` which completes when the stream completes. Note that the collection is bounded to ``Int.MaxValue``, +seq +^^^ +Collect values emitted from the stream into a collection, the collection is available through a ``Future`` or +which completes when the stream completes. Note that the collection is bounded to ``Int.MaxValue``, if more element are emitted the sink will cancel the stream *cancels* If too many values are collected foreach ^^^^^^^ -Invokes a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure. +Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure. The sink materializes into a ``Future[Option[Done]]`` or ``CompletionStage>`` which completes when the stream completes, or fails if the stream fails. @@ -209,6 +227,7 @@ stream completes, or fails if the stream fails. Note that it is not safe to mutate state from the procedure. *cancels* never + *backpressures* when the previous procedure invocation has not yet completed @@ -217,14 +236,16 @@ foreachParallel Like ``foreach`` but allows up to ``parallellism`` procedure calls to happen in parallel. *cancels* never + *backpressures* when the previous parallel procedure invocations has not yet completed onComplete ^^^^^^^^^^ -A sink that calls a callback when the stream has completed or failed. +Invoke a callback when the stream has completed or failed. *cancels* never + *backpressures* never @@ -239,51 +260,58 @@ This stage allows combining values into a result without a global mutable state between invocations. *cancels* never + *backpressures* when the previous fold function invocation has not yet completed reduce ^^^^^^ -Applies a reduction function on the incoming elements and passes the result to the next invocation. The first invocation +Apply a reduction function on the incoming elements and pass the result to the next invocation. The first invocation receives the two first elements of the flow. Materializes into a future that will be completed by the last result of the reduction function. *cancels* never + *backpressures* when the previous reduction function invocation has not yet completed combine ^^^^^^^ -Combines several sinks into one using a user specified strategy +Combine several sinks into one using a user specified strategy *cancels* depends on the strategy + *backpressures* depends on the strategy actorRef ^^^^^^^^ -Sends the elements from the stream to an ``ActorRef``. No backpressure so care must be taken to not overflow the inbox. +Send the elements from the stream to an ``ActorRef``. No backpressure so care must be taken to not overflow the inbox. *cancels* when the actor terminates + *backpressures* never actorRefWithAck ^^^^^^^^^^^^^^^ -Sends the elements from the stream to an ``ActorRef`` which must then acknowledge reception after completing a message, +Send the elements from the stream to an ``ActorRef`` which must then acknowledge reception after completing a message, to provide back pressure onto the sink. *cancels* when the actor terminates + *backpressures* when the actor acknowledgement has not arrived actorSubscriber ^^^^^^^^^^^^^^^ -Creates an actor from a ``Props`` upon materialization, where the actor implements ``ActorSubscriber``. +Create an actor from a ``Props`` upon materialization, where the actor implements ``ActorSubscriber``, which will +receive the elements from the stream. Materializes into an ``ActorRef`` to the created actor. *cancels* when the actor terminates + *backpressures* depends on the actor implementation @@ -307,7 +335,7 @@ dispatcher configured through the ``akka.stream.blocking-io-dispatcher``. fromOutputStream ^^^^^^^^^^^^^^^^ -Creates a sink that wraps an ``OutputStream``. Takes a function that produces an ``OutputStream``, when the sink is +Create a sink that wraps an ``OutputStream``. Takes a function that produces an ``OutputStream``, when the sink is materialized the function will be called and bytes sent to the sink will be written to the returned ``OutputStream``. Materializes into a ``Future`` or ``CompletionStage`` which will complete with a ``IOResult`` when the stream @@ -318,12 +346,12 @@ to handle multiple invocations. asInputStream ^^^^^^^^^^^^^ -Creates a sink which materializes into an ``InputStream`` that can be read to trigger demand through the sink. +Create a sink which materializes into an ``InputStream`` that can be read to trigger demand through the sink. Bytes emitted through the stream will be available for reading through the ``InputStream`` fromInputStream ^^^^^^^^^^^^^^^ -Creates a source that wraps an ``InputStream``. Takes a function that produces an ``InputStream``, when the source is +Create a source that wraps an ``InputStream``. Takes a function that produces an ``InputStream``, when the source is materialized the function will be called and bytes from the ``InputStream`` will be emitted into the stream. Materializes into a ``Future`` or ``CompletionStage`` which will complete with a ``IOResult`` when the stream @@ -334,7 +362,7 @@ to handle multiple invocations. asOutputStream ^^^^^^^^^^^^^^ -Creates a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they +Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they are emitted from the source @@ -345,12 +373,12 @@ Sources and sinks for reading and writing files can be found on ``FileIO``. fromFile ^^^^^^^^ -Emits the contents of a file, as ``ByteString`` s, materializes into a ``Future`` or ``CompletionStage`` which will be completed with +Emit the contents of a file, as ``ByteString`` s, materializes into a ``Future`` or ``CompletionStage`` which will be completed with a ``IOResult`` upon reaching the end of the file or if there is a failure. toFile ^^^^^^ -Creates a sink which will write incoming ``ByteString`` s to a given file. +Create a sink which will write incoming ``ByteString`` s to a given file. @@ -370,154 +398,577 @@ states (for example ``Try`` in Scala). Simple processing stages -^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------ -These stages are all expressible as a ``GraphStage``. These stages can transform the rate of incoming elements -since there are stages that emit multiple elements for a single input (e.g. `mapConcat') or consume -multiple elements before emitting one output (e.g. ``filter``). However, these rate transformations are data-driven, i.e. it is -the incoming elements that define how the rate is affected. This is in contrast with :ref:`detached-stages-overview` -which can change their processing behavior depending on being backpressured by downstream or not. +These stages can transform the rate of incoming elements since there are stages that emit multiple elements for a +single input (e.g. `mapConcat') or consume multiple elements before emitting one output (e.g. ``filter``). +However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the +rate is affected. This is in contrast with :ref:`detached-stages-overview` which can change their processing behavior +depending on being backpressured by downstream or not. + +map +^^^ +Transform each element in the stream by calling a mapping function with it and passing the returned value downstream. + +*emits* when the mapping function returns an element + +*backpressures* when downstream backpressures + +*completes* when upstream completes + +mapConcat +^^^^^^^^^ +Transform each element into zero or more elements that are individually passed downstream. + +*emits* when the mapping function returns an element or there are still remaining elements from the previously calculated collection + +*backpressures* when downstream backpressures or there are still available elements from the previously calculated collection + +*completes* when upstream completes and all remaining elements has been emitted + +filter +^^^^^^ +Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if +it returns false the element is discarded. + +*emits* when the given predicate returns true for the element + +*backpressures* when the given predicate returns true for the element and downstream backpressures + +*completes* when upstream completes + +collect +^^^^^^^ +Apply a partial function to each incoming element, if the partial function is defined for a value the returned +value is passed downstream. Can often replace ``filter`` followed by ``map`` to achieve the same in one single stage. + +*emits* when the provided partial function is defined for the element + +*backpressures* the partial function is defined for the element and downstream backpressures + +*completes* when upstream completes + +grouped +^^^^^^^ +Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of +elements downstream. + +*emits* when the specified number of elements has been accumulated or upstream completed + +*backpressures* when a group has been assembled and downstream backpressures + +*completes* when upstream completes + +sliding +^^^^^^^ +Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream. + +Note: the last window might be smaller than the requested size due to end of stream. + +*emits* the specified number of elements has been accumulated or upstream completed + +*backpressures* when a group has been assembled and downstream backpressures + +*completes* when upstream completes + + +scan +^^^^ +Emit its current value which starts at ``zero`` and then applies the current and next value to the given function +emitting the next current value. + +Note that this means that scan emits one element downstream before and upstream elements will not be requested until +the second element is required from downstream. + +*emits* when the function scanning the element returns a new element + +*backpressures* when downstream backpressures + +*completes* when upstream completes + +fold +^^^^ +Start with current value ``zero`` and then apply the current and next value to the given function, when upstream +complete the current value is emitted downstream. + +*emits* when upstream completes + +*backpressures* when downstream backpressures + +*completes* when upstream completes + +drop +^^^^ +Drop ``n`` elements and then pass any subsequent element downstream. + +*emits* when the specified number of elements has been dropped already + +*backpressures* when the specified number of elements has been dropped and downstream backpressures + +*completes* when upstream completes + +take +^^^^ +Pass ``n`` incoming elements downstream and then complete + +*emits* while the specified number of elements to take has not yet been reached + +*backpressures* when downstream backpressures + +*completes* when the defined number of elements has been taken or upstream completes + + +takeWhile +^^^^^^^^^ +Pass elements downstream as long as a predicate function return true for the element include the element +when the predicate first return false and then complete. + +*emits* while the predicate is true and until the first false result + +*backpressures* when downstream backpressures + +*completes* when predicate returned false or upstream completes + +dropWhile +^^^^^^^^^ +Drop elements as long as a predicate function return true for the element + +*emits* when the predicate returned false and for all following stream elements + +*backpressures* predicate returned false and downstream backpressures + +*completes* when upstream completes + +recover +^^^^^^^ +Allow sending of one last element downstream when a failure has happened upstream. + +*emits* when the element is available from the upstream or upstream is failed and pf returns an element + +*backpressures* when downstream backpressures, not when failure happened + +*completes* when upstream completes or upstream failed with exception pf can handle + +detach +^^^^^^ +Detach upstream demand from downstream demand without detaching the stream rates. + +*emits* when the upstream stage has emitted and there is demand + +*backpressures* when downstream backpressures + +*completes* when upstream completes -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -map the mapping function returns an element downstream backpressures upstream completes -mapConcat the mapping function returns an element or there are still remaining elements from the previously calculated collection downstream backpressures or there are still available elements from the previously calculated collection upstream completes and all remaining elements has been emitted -filter the given predicate returns true for the element the given predicate returns true for the element and downstream backpressures upstream completes -collect the provided partial function is defined for the element the partial function is defined for the element and downstream backpressures upstream completes -grouped the specified number of elements has been accumulated or upstream completed a group has been assembled and downstream backpressures upstream completes -sliding the specified number of elements has been accumulated or upstream completed a group has been assembled and downstream backpressures upstream completes -scan the function scanning the element returns a new element downstream backpressures upstream completes -fold upstream completes downstream backpressures upstream completes -drop the specified number of elements has been dropped already the specified number of elements has been dropped and downstream backpressures upstream completes -take the specified number of elements to take has not yet been reached downstream backpressures the defined number of elements has been taken or upstream completes -takeWhile the predicate is true and until the first false result downstream backpressures predicate returned false or upstream completes -dropWhile the predicate returned false and for all following stream elements predicate returned false and downstream backpressures upstream completes -recover the element is available from the upstream or upstream is failed and pf returns an element downstream backpressures, not when failure happened upstream completes or upstream failed with exception pf can handle -detach the upstream stage has emitted and there is demand downstream backpressures upstream completes -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== Asynchronous processing stages -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------ These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous operation at the same time (usually handling the completion of a Future). -===================== ========================================================================================================================= ============================================================================================================================== ============================================================================================= -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================= ============================================================================================================================== ============================================================================================= -mapAsync the Future returned by the provided function finishes for the next element in sequence the number of futures reaches the configured parallelism and the downstream backpressures upstream completes and all futures has been completed and all elements has been emitted [1]_ -mapAsyncUnordered any of the Futures returned by the provided function complete the number of futures reaches the configured parallelism and the downstream backpressures upstream completes and all futures has been completed and all elements has been emitted [1]_ -===================== ========================================================================================================================= ============================================================================================================================== ============================================================================================= + +mapAsync +^^^^^^^^ +Pass incoming elements to a function that return a ``Future`` result. When the future arrives the result is passed +downstream. Up to ``n`` elements can be processed concurrently, but regardless of their completion time the incoming +order will be kept when results complete. For use cases where order does not mather ``mapAsyncUnordered`` can be used. + +If a Future fails, the stream also fails (unless a different supervision strategy is applied) + +*emits* when the Future returned by the provided function finishes for the next element in sequence + +*backpressures* when the number of futures reaches the configured parallelism and the downstream backpressures + +*completes* when upstream completes and all futures has been completed and all elements has been emitted + +mapAsyncUnordered +^^^^^^^^^^^^^^^^^ +Like ``mapAsync`` but ``Future`` results are passed downstream as they arrive regardless of the order of the elements +that triggered them. + +If a Future fails, the stream also fails (unless a different supervision strategy is applied) + +*emits* any of the Futures returned by the provided function complete + +*backpressures* when the number of futures reaches the configured parallelism and the downstream backpressures + +*completes* upstream completes and all futures has been completed and all elements has been emitted + Timer driven stages -^^^^^^^^^^^^^^^^^^^ +------------------- These stages process elements using timers, delaying, dropping or grouping elements for certain time durations. -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -takeWithin an upstream element arrives downstream backpressures upstream completes or timer fires -dropWithin after the timer fired and a new upstream element arrives downstream backpressures upstream completes -groupedWithin the configured time elapses since the last group has been emitted the group has been assembled (the duration elapsed) and downstream backpressures upstream completes -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== +takeWithin +^^^^^^^^^^ +Pass elements downstream within a timeout and then complete. + +*emits* when an upstream element arrives + +*backpressures* downstream backpressures + +*completes* upstream completes or timer fires + + +dropWithin +^^^^^^^^^^ +Drop elements until a timeout has fired + +*emits* after the timer fired and a new upstream element arrives + +*backpressures* when downstream backpressures + +*completes* upstream completes + +groupedWithin +^^^^^^^^^^^^^ +Chunk up the stream into groups of elements received within a time window, or limited by the given number of elements, +whichever happens first. + +*emits* when the configured time elapses since the last group has been emitted + +*backpressures* when the group has been assembled (the duration elapsed) and downstream backpressures + +*completes* when upstream completes -**It is currently not possible to build custom timer driven stages** .. _detached-stages-overview: Backpressure aware stages -^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------- -These stages are all expressible as a ``DetachedStage``. These stages are aware of the backpressure provided by their -downstreams and able to adapt their behavior to that signal. +These stages are aware of the backpressure provided by their downstreams and able to adapt their behavior to that signal. + +conflate +^^^^^^^^ +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as +there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or +average of incoming numbers, if aggregation should lead to a different type ``conflateWithSeed`` can be used: + +*emits* when downstream stops backpressuring and there is a conflated element available + +*backpressures* when the aggregate function cannot keep up with incoming elements + +*completes* when upstream completes + +conflateWithSeed +^^^^^^^^^^^^^^^^ +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure. When backpressure starts or there is no backpressure element is passed into a ``seed`` function to +transform it to the summary type. + +*emits* when downstream stops backpressuring and there is a conflated element available + +*backpressures* when the aggregate or seed functions cannot keep up with incoming elements + +*completes* when upstream completes + +batch +^^^^^ +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure and a maximum number of batched elements is not yet reached. When the maximum number is reached and +downstream still backpressures batch will also backpressure. + +When backpressure starts or there is no backpressure element is passed into a ``seed`` function to transform it +to the summary type. + +Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be +aggregated to the batched value. + +*emits* when downstream stops backpressuring and there is a batched element available + +*backpressures* when batched elements reached the max limit of allowed batched elements & downstream backpressures + +*completes* when upstream completes and a "possibly pending" element was drained + + +batchWeighted +^^^^^^^^^^^^^ +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure and a maximum weight batched elements is not yet reached. The weight of each element is determined by +applying ``costFn``. When the maximum total weight is reached and downstream still backpressures batch will also +backpressure. + +Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be +aggregated to the batched value. + +*emits* downstream stops backpressuring and there is a batched element available + +*backpressures* batched elements reached the max weight limit of allowed batched elements & downstream backpressures + +*completes* upstream completes and a "possibly pending" element was drained + +expand +^^^^^^ +Allow for a faster downstream by expanding the last incoming element to an ``Iterator``. For example +``Iterator.continually(element)`` to keep repating the last incoming element. + +*emits* when downstream stops backpressuring + +*backpressures* when downstream backpressures + +*completes* when upstream completes + +buffer (Backpressure) +^^^^^^^^^^^^^^^^^^^^^ +Allow for a temporarily faster upstream events by buffering ``size`` elements. When the buffer is full backpressure +is applied. + +*emits* when downstream stops backpressuring and there is a pending element in the buffer + +*backpressures* when buffer is full + +*completes* when upstream completes and buffered elements has been drained + +buffer (Drop) +^^^^^^^^^^^^^ +Allow for a temporarily faster upstream events by buffering ``size`` elements. When the buffer is full elements are +dropped according to the specified ``OverflowStrategy``: + +* ``dropHead`` drops the oldest element in the buffer to make space for the new element +* ``dropTail`` drops the youngest element in the buffer to make space for the new element +* ``dropBuffer`` drops the entire buffer and buffers the new element +* ``dropNew`` drops the new element + +*emits* when downstream stops backpressuring and there is a pending element in the buffer + +*backpressures* never (when dropping cannot keep up with incoming elements) + +*completes* upstream completes and buffered elements has been drained + +buffer (Fail) +^^^^^^^^^^^^^ +Allow for a temporarily faster upstream events by buffering ``size`` elements. When the buffer is full the stage fails +the flow with a ``BufferOverflowException``. + +*emits* when downstream stops backpressuring and there is a pending element in the buffer + +*backpressures* never, fails the stream instead of backpressuring when buffer is full + +*completes* when upstream completes and buffered elements has been drained -===================== ========================================================================================================================= ==================================================================================================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================= ==================================================================================================================================== ===================================================================================== -conflate downstream stops backpressuring and there is a conflated element available never [2]_ upstream completes -conflateWithSeed downstream stops backpressuring and there is a conflated element available never [2]_ upstream completes -batch downstream stops backpressuring and there is a batched element available batched elements reached the max limit of allowed batched elements & downstream backpressures upstream completes and a "possibly pending" element was drained [3]_ -batchWeighted downstream stops backpressuring and there is a batched element available batched elements reached the max weight limit of allowed batched elements (plus a pending element [3]_ ) & downstream backpressures upstream completes and a "possibly pending" element was drained [3]_ -expand downstream stops backpressuring downstream backpressures upstream completes -buffer (Backpressure) downstream stops backpressuring and there is a pending element in the buffer buffer is full upstream completes and buffered elements has been drained -buffer (DropX) downstream stops backpressuring and there is a pending element in the buffer never [2]_ upstream completes and buffered elements has been drained -buffer (Fail) downstream stops backpressuring and there is a pending element in the buffer fails the stream instead of backpressuring when buffer is full upstream completes and buffered elements has been drained -===================== ========================================================================================================================= ==================================================================================================================================== ===================================================================================== Nesting and flattening stages -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +----------------------------- These stages either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains nested streams and turn them into a stream of elements instead (flattening). -**It is currently not possible to build custom nesting or flattening stages** +prefixAndTail +^^^^^^^^^^^^^ +Take up to `n` elements from the stream (less than `n` only if the upstream completes before emitting `n` elements) +and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements. + +*emits* when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream + +*backpressures* when downstream backpressures or substream backpressures + +*completes* when prefix elements has been consumed and substream has been consumed + + +groupBy +^^^^^^^ +Demultiplex the incoming stream into separate output streams. + +*emits* an element for which the grouping function returns a group that has not yet been created. Emits the new group +there is an element pending for a group whose substream backpressures + +*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) + +splitWhen +^^^^^^^^^ +Split off elements into a new substream whenever a predicate function return ``true``. + +*emits* an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements + +*backpressures* when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures + +*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) + +splitAfter +^^^^^^^^^^ +End the current substream whenever a predicate returns ``true``, starting a new substream for the next element. + +*emits* when an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element + +*backpressures* when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures + +*completes* when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) + +flatMapConcat +^^^^^^^^^^^^^ +Transform each input element into a ``Source`` whose elements are then flattened into the output stream through +concatenation. This means each source is fully consumed before consumption of the next source starts. + +*emits* when the current consumed substream has an element available + +*backpressures* when downstream backpressures + +*completes* when upstream completes and all consumed substreams complete + + +flatMapMerge +^^^^^^^^^^^^ +Transform each input element into a ``Source`` whose elements are then flattened into the output stream through +merging. The maximum number of merged sources has to be specified. + +*emits* when one of the currently consumed substreams has an element available + +*backpressures* when downstream backpressures + +*completes* when upstream completes and all consumed substreams complete -===================== ========================================================================================================================================= ============================================================================================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================================= ============================================================================================================================== ===================================================================================== -prefixAndTail the configured number of prefix elements are available. Emits this prefix, and the rest as a substream downstream backpressures or substream backpressures prefix elements has been consumed and substream has been consumed -groupBy an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures upstream completes [4]_ -splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [4]_ -splitAfter an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [4]_ -flatMapConcat the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete -flatMapMerge one of the currently consumed substreams has an element available downstream backpressures upstream completes and all consumed substreams complete -===================== ========================================================================================================================================= ============================================================================================================================== ===================================================================================== Fan-in stages -^^^^^^^^^^^^^ +------------- -Most of these stages can be expressible as a ``GraphStage``. These stages take multiple streams as their input and provide -a single output combining the elements from all of the inputs in different ways. +These stages take multiple streams as their input and provide a single output combining the elements from all of +the inputs in different ways. -**The custom fan-in stages that can be built currently are limited** +merge +^^^^^ +Merge multiple sources. Picks elements randomly if all sources has elements ready. -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -merge one of the inputs has an element available downstream backpressures all upstreams complete (*) -mergeSorted all of the inputs have an element available downstream backpressures all upstreams complete -mergePreferred one of the inputs has an element available, preferring a defined input if multiple have elements available downstream backpressures all upstreams complete (*) -zip all of the inputs have an element available downstream backpressures any upstream completes -zipWith all of the inputs have an element available downstream backpressures any upstream completes -concat the current stream has an element available; if the current input completes, it tries the next one downstream backpressures all upstreams complete -prepend the given stream has an element available; if the given input completes, it tries the current one downstream backpressures all upstreams complete -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== +*emits* when one of the inputs has an element available -(*) This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``. +*backpressures* when downstream backpressures + +*completes* when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) + +mergeSorted +^^^^^^^^^^^ +Merge multiple sources. Waits for one element to be ready from each input stream and emits the +smallest element. + +*emits* when all of the inputs have an element available + +*backpressures* when downstream backpressures + +*completes* when all upstreams complete + +mergePreferred +^^^^^^^^^^^^^^ +Merge multiple sources. Prefer one source if all sources has elements ready. + +*emits* when one of the inputs has an element available, preferring a defined input if multiple have elements available + +*backpressures* when downstream backpressures + +*completes* when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.) + +zip +^^^ +Combines elements from each of multiple sources into tuples and passes the tuples downstream. + +*emits* when all of the inputs have an element available + +*backpressures* when downstream backpressures + +*completes* when any upstream completes + +zipWith +^^^^^^^ +Combines elements from multiple sources through a ``combine`` function and passes the +returned value downstream. + +*emits* when all of the inputs have an element available + +*backpressures* when downstream backpressures + +*completes* when any upstream completes + +concat +^^^^^^ +After completion of the original upstream the elements of the given source will be emitted. + +*emits* when the current stream has an element available; if the current input completes, it tries the next one + +*backpressures* when downstream backpressures + +*completes* when all upstreams complete + +prepend +^^^^^^^ +Prepends the given source to the flow, consuming it until completion before the original source is consumed. + +If materialized values needs to be collected ``prependMat`` is available. + +*emits* when the given stream has an element available; if the given input completes, it tries the current one + +*backpressures* when downstream backpressures + +*completes* when all upstreams complete + +interleave +^^^^^^^^^^ +Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one +source completes the rest of the other stream will be emitted. + +*emits* when element is available from the currently consumed upstream + +*backpressures* when upstream backpressures + +*completes* when both upstreams have completed Fan-out stages -^^^^^^^^^^^^^^ +-------------- -Most of these stages can be expressible as a ``GraphStage``. These have one input and multiple outputs. They might -route the elements between different outputs, or emit elements on multiple outputs at the same time. +These have one input and multiple outputs. They might route the elements between different outputs, or emit elements on +multiple outputs at the same time. -**The custom fan-out stages that can be built currently are limited** +unzip +^^^^^ +Takes a stream of two element tuples and unzips the two elements ino two different downstreams. + +*emits* when all of the outputs stops backpressuring and there is an input element available + +*backpressures* when any of the outputs backpressures + +*completes* when upstream completes + +unzipWith +^^^^^^^^^ +Splits each element of input into multiple downstreams using a function + +*emits* when all of the outputs stops backpressuring and there is an input element available + +*backpressures* when any of the outputs backpressures + +*completes* when upstream completes + +broadcast +^^^^^^^^^ +Emit each incoming element each of ``n`` outputs. + +*emits* when all of the outputs stops backpressuring and there is an input element available + +*backpressures* when any of the outputs backpressures + +*completes* when upstream completes + +balance +^^^^^^^ +Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer. + +*emits* when any of the outputs stops backpressuring; emits the element to the first available output + +*backpressures* when all of the outputs backpressure + +*completes* when upstream completes -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== -unzip all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes -unzipWith all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes -broadcast all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes -balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes -===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== Watching status stages -^^^^^^^^^^^^^^^^^^^^^^ +---------------------- -Materializes to a Future that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. +watchTermination +^^^^^^^^^^^^^^^^ +Materializes to a ``Future`` that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. The stage otherwise passes through elements unchanged. -===================== ======================================================================== ========================================================== ===================================================================================== -Stage Emits when Backpressures when Completes when -===================== ======================================================================== ========================================================== ===================================================================================== -watchTermination input has an element available output backpressures upstream completes -===================== ======================================================================== ========================================================== ===================================================================================== +*emits* when input has an element available +*backpressures* when output backpressures + +*completes* when upstream completes -.. [1] If a Future fails, the stream also fails (unless a different supervision strategy is applied) -.. [2] Except if the encapsulated computation is not fast enough -.. [3] Batch & BatchWeighted stages eagerly pulling elements, and this behavior may result in a single pending (i.e. buffered) element which cannot be aggregated to the batched value -.. [4] Until the end of stream it is not possible to know whether new substreams will be needed or not diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index fa2956f215..d8d4acee6c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -970,7 +970,7 @@ trait FlowOps[+Out, +Mat] { * until the subscriber is ready to accept them. For example a batch step might store received elements in * an array up to the allowed max limit if the upstream publisher is faster. * - * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * This only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * * '''Emits when''' downstream stops backpressuring and there is an aggregated element available