+doc, str #16716: Table summarizing built-in processing stages

This commit is contained in:
Endre Sándor Varga 2015-04-09 16:11:16 +02:00 committed by Endre Sándor Varga
parent e15891b7c9
commit e82b89e285
15 changed files with 658 additions and 17 deletions

View file

@ -30,6 +30,7 @@ Processing Stage
The common name for all building blocks that build up a Flow or FlowGraph.
Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like
:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``.
For the full list of built-in processing stages see :ref:`stages-overview`
Defining and running streams
----------------------------

View file

@ -23,7 +23,7 @@ Flow graphs are built from simple Flows which serve as the linear connections wi
which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour
and making them explicit elements these elements should be rather straightforward to use.
Akka Streams currently provide these junctions:
Akka Streams currently provide these junctions (for a detailed list see :ref:`stages-overview`):
* **Fan-out**

View file

@ -18,5 +18,6 @@ Streams
stream-io
stream-parallelism
stream-testkit
../stages-overview
stream-cookbook
../stream-configuration

View file

@ -78,6 +78,8 @@ and for best results we recommend the following approach:
point.
* The bottom-up learners may feel more at home rummaging through the
:ref:`stream-cookbook-java`.
* For a complete overview of the built-in processing stages you can look at the
table in :ref:`stages-overview`
* The other sections can be read sequentially or as needed during the previous
steps, each digging deeper into specific topics.

View file

@ -16,6 +16,8 @@ This part also serves as supplementary material for the main body of documentati
open while reading the manual and look for examples demonstrating various streaming concepts
as they appear in the main body of documentation.
If you need a quick reference of the available processing stages used in the recipes see :ref:`stages-overview`.
Working with Flows
==================

View file

@ -30,6 +30,7 @@ Processing Stage
The common name for all building blocks that build up a Flow or FlowGraph.
Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like
:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``.
For the full list of built-in processing stages see :ref:`stages-overview`
Defining and running streams
----------------------------

View file

@ -24,7 +24,7 @@ Flow graphs are built from simple Flows which serve as the linear connections wi
which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour
and making them explicit elements these elements should be rather straightforward to use.
Akka Streams currently provide these junctions:
Akka Streams currently provide these junctions (for a detailed list see :ref:`stages-overview`):
* **Fan-out**

View file

@ -18,5 +18,6 @@ Streams
stream-io
stream-parallelism
stream-testkit
../stages-overview
stream-cookbook
../stream-configuration

View file

@ -78,6 +78,8 @@ and for best results we recommend the following approach:
point.
* The bottom-up learners may feel more at home rummaging through the
:ref:`stream-cookbook-scala`.
* For a complete overview of the built-in processing stages you can look at the
table in :ref:`stages-overview`
* The other sections can be read sequentially or as needed during the previous
steps, each digging deeper into specific topics.

View file

@ -0,0 +1,142 @@
.. _stages-overview:
###############################################
Overview of built-in stages and their semantics
###############################################
All stages by default backpressure if the computation they encapsulate is not fast enough to keep up with the rate of
incoming elements from the preceding stage. There are differences though how the different stages handle when some of
their downstream stages backpressure them. This table provides a summary of all built-in stages and their semantics.
All stages stop and propagate the failure downstream as soon as any of their upstreams emit a failure unless supervision
is used. This happens to ensure reliable teardown of streams and cleanup when failures happen. Failures are meant to
be to model unrecoverable conditions, therefore they are always eagerly propagated.
For in-band error handling of normal errors (dropping elements if a map fails for example) you should use the
upervision support, or explicitly wrap your element types in a proper container that can express error or success
states (for example ``Try`` in Scala).
Custom components are not covered by this table since their semantics are defined by the user.
Simple processing stages
^^^^^^^^^^^^^^^^^^^^^^^^
These stages are all expressible as a ``PushPullStage``. These stages can transform the rate of incoming elements
since there are stages that emit multiple elements for a single input (e.g. `mapConcat') or consume
multiple elements before emitting one output (e.g. ``filter``). However, these rate transformations are data-driven, i.e. it is
the incoming elements that define how the rate is affected. This is in contrast with :ref:`detached-stages-overview`
which can change their processing behavior depending on being backpressured by downstream or not.
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
map the mapping function returns an element downstream backpressures upstream completes
mapConcat the mapping function returns an element or there are still remaining elements from the previously calculated collection downstream backpressures or there are still available elements from the previously calculated collection upstream completes and all remaining elements has been emitted
filter the given predicate returns true for the element the given predicate returns true for the element and downstream backpressures upstream completes
collect the provided partial function is defined for the element the partial function is defined for the element and downstream backpressures upstream completes
grouped the specified number of elements has been accumulated or upstream completed a group has been assembled and downstream backpressures upstream completes
scan the function scanning the element returns a new element downstream backpressures upstream completes
drop the specified number of elements has been dropped already the specified number of elements has been dropped and downstream backpressures upstream completes
take the specified number of elements to take has not yet been reached downstream backpressures the defined number of elements has been taken or upstream completes
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Asynchronous processing stages
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous
operation at the same time (usually handling the completion of a Future).
**It is currently not possible to build custom asynchronous processing stages**
===================== ========================================================================================================================= ============================================================================================================================== =============================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =============================================================================================
mapAsync the Future returned by the provided function finishes for the next element in sequence the number of futures reaches the configured parallelism and the downstream backpressures upstream completes and all futures has been completed and all elements has been emitted [1]_
mapAsyncUnordered any of the Futures returned by the provided function complete the number of futures reaches the configured parallelism and the downstream backpressures upstream completes and all futures has been completed and all elements has been emitted [1]_
===================== ========================================================================================================================= ============================================================================================================================== =============================================================================================
Timer driven stages
^^^^^^^^^^^^^^^^^^^
These stages process elements using timers, delaying, dropping or grouping elements for certain time durations.
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
takeWithin an upstream element arrives downstream backpressures upstream completes or timer fires
dropWithin after the timer fired and a new upstream element arrives downstream backpressures upstream completes
groupedWithin the configured time elapses since the last group has been emitted the group has been assembled (the duration elapsed) and downstream backpressures upstream completes
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
**It is currently not possible to build custom timer driven stages**
.. _detached-stages-overview:
Backpressure aware stages
^^^^^^^^^^^^^^^^^^^^^^^^^
These stages are all expressible as a ``DetachedStage``. These stages are aware of the backpressure provided by their
downstreams and able to adapt their behavior to that signal.
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
conflate downstream stops backpressuring and there is a conflated element available never [2]_ upstream completes
expand downstream stops backpressuring downstream backpressures upstream completes
buffer (Backpressure) downstream stops backpressuring and there is a pending element in the buffer buffer is full upstream completes and buffered elements has been drained
buffer (DropX) downstream stops backpressuring and there is a pending element in the buffer never [2]_ upstream completes and buffered elements has been drained
buffer (Fail) downstream stops backpressuring and there is a pending element in the buffer fails the stream instead of backpressuring when buffer is full upstream completes and buffered elements has been drained
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Nesting and flattening stages
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
These stages either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains
nested streams and turn them into a stream of elements instead (flattening).
**It is currently not possible to build custom nesting or flattening stages**
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
prefixAndTail the configured number of prefix elements are available. Emits this prefix, and the rest as a substream downstream backpressures or substream backpressures prefix elements has been consumed and substream has been consumed
groupBy an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures upstream completes [3]_
splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_
flatten (Concat) the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Fan-in stages
^^^^^^^^^^^^^
Most of these stages can be expressible as a ``FlexiMerge``. These stages take multiple streams as their input and provide
a single output combining the elements from all of the inputs in different ways.
**The custom fan-in stages that can be built currently are limited**
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
merge one of the inputs has an element available downstream backpressures all upstreams complete
mergePreferred one of the inputs has an element available, preferring a defined input if multiple have elements available downstream backpressures all upstreams complete
zip all of the inputs has an element available downstream backpressures any upstream completes
zipWith all of the inputs has an element available downstream backpressures any upstream completes
concat the current stream has an element available; if the current input completes, it tries the next one downstream backpressures all upstreams complete
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Fan-out stages
^^^^^^^^^^^^^^
Most of these stages can be expressible as a ``FlexiRoute``. These have one input and multiple outputs. They might
route the elements between different outputs, or emit elements on multiple outputs at the same time.
**The custom fan-out stages that can be built currently are limited**
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
unzip all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
broadcast all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
.. [1] If a Future fails, the stream also fails (unless a different supervision strategy is applied)
.. [2] Except if the encapsulated computation is not fast enough
.. [3] Until the end of stream it is not possible to know whether new substreams will be needed or not

View file

@ -6,6 +6,17 @@ package akka.stream.javadsl
import akka.stream._
import akka.stream.scaladsl
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function.
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
object ZipWith {
/**

View file

@ -147,6 +147,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*
*/
def map[T](f: japi.Function[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.map(f.apply))
@ -157,6 +167,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*
* The returned list MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapConcat(elem Util.immutableSeq(f.apply(elem))))
@ -176,6 +196,15 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
* [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures or the first future is not completed
*
* '''Completes when''' upstream completes and all futures has been completed and all elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
@ -197,6 +226,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
* [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
*
* '''Emits when''' any of the Futures returned by the provided function complete
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
*
* '''Completes when''' upstream completes and all futures has been completed and all elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
@ -204,6 +241,15 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/**
* Only pass on those elements that satisfy the given predicate.
*
* '''Emits when''' the given predicate returns true for the element
*
* '''Backpressures when''' the given predicate returns true for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def filter(p: japi.Predicate[Out]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.filter(p.test))
@ -212,6 +258,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*
* '''Emits when''' the provided partial function is defined for the element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.collect(pf))
@ -221,6 +275,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* possibly smaller than requested due to end-of-stream.
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
*
* '''Emits when''' the specified number of elements has been accumulated or upstream completed
*
* '''Backpressures when''' a group has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step
@ -234,6 +296,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision#restart]] current value starts at `zero` again
* the stream will continue.
*
* '''Emits when''' the function scanning the element returns a new element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.scan(zero)(f.apply))
@ -245,6 +315,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* '''Emits when''' the configured time elapses since the last group has been emitted
*
* '''Backpressures when''' the configured time elapses since the last group has been emitted
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
@ -254,12 +332,28 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*
* '''Emits when''' the specified number of elements has been dropped already
*
* '''Backpressures when''' the specified number of elements has been dropped and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def drop(n: Long): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.drop(n))
/**
* Discard the elements received within the given duration at beginning of the stream.
*
* '''Emits when''' the specified time elapsed and a new upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.dropWithin(d))
@ -272,6 +366,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*
* '''Emits when''' the specified number of elements to take has not yet been reached
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the defined number of elements has been taken or upstream completes
*
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
*/
def take(n: Long): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.take(n))
@ -284,6 +386,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*
* Note that this can be combined with [[#take]] to limit the number of elements
* within the duration.
*
* '''Emits when''' an upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or timer fires
*
* '''Cancels when''' downstream cancels or timer fires
*/
def takeWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.takeWithin(d))
@ -296,8 +406,17 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] =
new Flow(delegate.conflate(seed.apply)(aggregate.apply))
@ -314,6 +433,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]].
* Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure.
*
* '''Emits when''' downstream stops backpressuring
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
@ -329,6 +456,17 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available
*
* '''Emits when''' downstream stops backpressuring and there is a pending element in the buffer
*
* '''Backpressures when''' depending on OverflowStrategy
* * Backpressure - backpressures when buffer is full
* * DropHead, DropTail, DropBuffer - never backpressures
* * Fail - fails the stream if buffer gets full
*
* '''Completes when''' upstream completes and buffered elements has been drained
*
* '''Cancels when''' downstream cancels
*
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
@ -347,6 +485,15 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*
* '''Emits when''' the configured number of prefix elements are available. Emits this prefix, and the rest
* as a substream
*
* '''Backpressures when''' downstream backpressures or substream backpressures
*
* '''Completes when''' prefix elements has been consumed and substream has been consumed
*
* '''Cancels when''' downstream cancels or substream cancels
*/
def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
new Flow(delegate.prefixAndTail(n).map { case (taken, tail) akka.japi.Pair(taken.asJava, tail.asJava) })
@ -369,6 +516,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element for which the grouping function returns a group that has not yet been created.
* Emits the new group
*
* '''Backpressures when''' there is an element pending for a group whose substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and all substreams cancel
*
*/
def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
new Flow(delegate.groupBy(f.apply).map { case (k, p) akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step
@ -393,6 +550,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element for which the provided predicate is true, opening and emitting
* a new substream for subsequent element
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*/
def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] =
new Flow(delegate.splitWhen(p.test).map(_.asJava))
@ -400,6 +567,15 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Source]].
*
* '''Emits when''' (Concat) the current consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*
*/
def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] =
new Flow(delegate.flatten(strategy))

View file

@ -17,6 +17,14 @@ import akka.japi.Pair
* that multiple flows can be attached to; if you want to have multiple independent
* junctions within the same `FlowGraph` then you will have to create multiple such
* instances.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
object Merge {
@ -44,6 +52,15 @@ object Merge {
* that multiple flows can be attached to; if you want to have multiple independent
* junctions within the same `FlowGraph` then you will have to create multiple such
* instances.
*
* '''Emits when''' one of the inputs has an element available, preferring
* a specified input if multiple have elements available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
object MergePreferred {
/**
@ -60,14 +77,22 @@ object MergePreferred {
}
/**
* Fan-out the stream to several streams. Each element is produced to
* the other streams. It will not shutdown until the subscriptions for at least
* Fan-out the stream to several streams. emitting each incoming upstream element to all downstream consumers.
* It will not shutdown until the subscriptions for at least
* two downstream subscribers have been established.
*
* Note that a junction instance describes exactly one place (vertex) in the `FlowGraph`
* that multiple flows can be attached to; if you want to have multiple independent
* junctions within the same `FlowGraph` then you will have to create multiple such
* instances.
*
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
*
* '''Backpressures when''' any of the outputs backpressure
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
*/
object Broadcast {
/**
@ -84,14 +109,22 @@ object Broadcast {
}
/**
* Fan-out the stream to several streams. Each element is produced to
* one of the other streams. It will not shutdown until the subscriptions for at least
* Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer.
* It will not shutdown until the subscriptions for at least
* two downstream subscribers have been established.
*
* Note that a junction instance describes exactly one place (vertex) in the `FlowGraph`
* that multiple flows can be attached to; if you want to have multiple independent
* junctions within the same `FlowGraph` then you will have to create multiple such
* instances.
*
* '''Emits when''' any of the outputs stops backpressuring; emits the element to the first available output
*
* '''Backpressures when''' all of the outputs backpressure
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
*/
object Balance {
/**
@ -123,6 +156,19 @@ object Balance {
create(outputCount, waitForAllDownstreams)
}
/**
* Combine the elements of 2 streams into a stream of tuples.
*
* A `Zip` has a `left` and a `right` input port and one `out` port
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
object Zip {
import akka.stream.javadsl.japi.Function2
import akka.japi.Pair
@ -139,10 +185,17 @@ object Zip {
}
/**
* Note that a junction instance describes exactly one place (vertex) in the `FlowGraph`
* that multiple flows can be attached to; if you want to have multiple independent
* junctions within the same `FlowGraph` then you will have to create multiple such
* instances.
* Takes a stream of pair elements and splits each pair to two output streams.
*
* An `Unzip` has one `in` port and one `left` and one `right` output port.
*
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
*
* '''Backpressures when''' any of the outputs backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' any downstream cancels
*/
object Unzip {
@ -173,6 +226,14 @@ object Unzip {
* that multiple flows can be attached to; if you want to have multiple independent
* junctions within the same `FlowGraph` then you will have to create multiple such
* instances.
*
* '''Emits when''' the current stream has an element available; if the current input completes, it tries the next one
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
object Concat {
/**

View file

@ -343,6 +343,15 @@ trait FlowOps[+Out, +Mat] {
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def map[T](f: Out T): Repr[T, Mat] = andThen(Map(f.asInstanceOf[Any Any]))
@ -352,6 +361,17 @@ trait FlowOps[+Out, +Mat] {
*
* The returned sequence MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*
*/
def mapConcat[T](f: Out immutable.Seq[T]): Repr[T, Mat] = andThen(MapConcat(f.asInstanceOf[Any immutable.Seq[Any]]))
@ -371,6 +391,15 @@ trait FlowOps[+Out, +Mat] {
* with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
* [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures or the first future is not completed
*
* '''Completes when''' upstream completes and all futures has been completed and all elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int, f: Out Future[T]): Repr[T, Mat] =
@ -392,6 +421,14 @@ trait FlowOps[+Out, +Mat] {
* with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
* [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
*
* '''Emits when''' any of the Futures returned by the provided function complete
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
*
* '''Completes when''' upstream completes and all futures has been completed and all elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int, f: Out Future[T]): Repr[T, Mat] =
@ -399,6 +436,14 @@ trait FlowOps[+Out, +Mat] {
/**
* Only pass on those elements that satisfy the given predicate.
*
* '''Emits when''' the given predicate returns true for the element
*
* '''Backpressures when''' the given predicate returns true for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def filter(p: Out Boolean): Repr[Out, Mat] = andThen(Filter(p.asInstanceOf[Any Boolean]))
@ -406,6 +451,14 @@ trait FlowOps[+Out, +Mat] {
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*
* '''Emits when''' the provided partial function is defined for the element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[T, Mat] = andThen(Collect(pf.asInstanceOf[PartialFunction[Any, Any]]))
@ -414,6 +467,14 @@ trait FlowOps[+Out, +Mat] {
* possibly smaller than requested due to end-of-stream.
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
*
* '''Emits when''' the specified number of elements has been accumulated or upstream completed
*
* '''Backpressures when''' a group has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def grouped(n: Int): Repr[immutable.Seq[Out], Mat] = andThen(Grouped(n))
@ -426,6 +487,14 @@ trait FlowOps[+Out, +Mat] {
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* '''Emits when''' the function scanning the element returns a new element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def scan[T](zero: T)(f: (T, Out) T): Repr[T, Mat] = andThen(Scan(zero, f.asInstanceOf[(Any, Any) Any]))
@ -438,6 +507,14 @@ trait FlowOps[+Out, +Mat] {
*
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*
* '''Emits when''' the configured time elapses since the last group has been emitted
*
* '''Backpressures when''' the configured time elapses since the last group has been emitted
*
* '''Completes when''' upstream completes (emits last group)
*
* '''Cancels when''' downstream completes
*/
def groupedWithin(n: Int, d: FiniteDuration): Repr[Out, Mat]#Repr[immutable.Seq[Out], Mat] = {
require(n > 0, "n must be greater than 0")
@ -469,11 +546,27 @@ trait FlowOps[+Out, +Mat] {
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*
* '''Emits when''' the specified number of elements has been dropped already
*
* '''Backpressures when''' the specified number of elements has been dropped and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def drop(n: Long): Repr[Out, Mat] = andThen(Drop(n))
/**
* Discard the elements received within the given duration at beginning of the stream.
*
* '''Emits when''' the specified time elapsed and a new upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def dropWithin(d: FiniteDuration): Repr[Out, Mat]#Repr[Out, Mat] =
withAttributes(name("dropWithin")).timerTransform(() new TimerTransformer[Out, Out] {
@ -499,6 +592,14 @@ trait FlowOps[+Out, +Mat] {
*
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*
* '''Emits when''' the specified number of elements to take has not yet been reached
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' the defined number of elements has been taken or upstream completes
*
* '''Cancels when''' the defined number of elements has been taken or downstream cancels
*/
def take(n: Long): Repr[Out, Mat] = andThen(Take(n))
@ -510,6 +611,14 @@ trait FlowOps[+Out, +Mat] {
*
* Note that this can be combined with [[#take]] to limit the number of elements
* within the duration.
*
* '''Emits when''' an upstream element arrives
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or timer fires
*
* '''Cancels when''' downstream cancels or timer fires
*/
def takeWithin(d: FiniteDuration): Repr[Out, Mat]#Repr[Out, Mat] =
withAttributes(name("takeWithin")).timerTransform(() new TimerTransformer[Out, Out] {
@ -533,6 +642,14 @@ trait FlowOps[+Out, +Mat] {
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
@ -551,6 +668,14 @@ trait FlowOps[+Out, +Mat] {
* Expand does not support [[akka.stream.Supervision.Restart]] and [[akka.stream.Supervision.Resume]].
* Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure.
*
* '''Emits when''' downstream stops backpressuring
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
@ -563,6 +688,17 @@ trait FlowOps[+Out, +Mat] {
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available
*
* '''Emits when''' downstream stops backpressuring and there is a pending element in the buffer
*
* '''Backpressures when''' depending on OverflowStrategy
* * Backpressure - backpressures when buffer is full
* * DropHead, DropTail, DropBuffer - never backpressures
* * Fail - fails the stream if buffer gets full
*
* '''Completes when''' upstream completes and buffered elements has been drained
*
* '''Cancels when''' downstream cancels
*
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
@ -584,6 +720,16 @@ trait FlowOps[+Out, +Mat] {
* Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*
* '''Emits when''' the configured number of prefix elements are available. Emits this prefix, and the rest
* as a substream
*
* '''Backpressures when''' downstream backpressures or substream backpressures
*
* '''Completes when''' prefix elements has been consumed and substream has been consumed
*
* '''Cancels when''' downstream cancels or substream cancels
*
*/
def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, Unit]), Mat] =
andThen(PrefixAndTail(n))
@ -606,6 +752,16 @@ trait FlowOps[+Out, +Mat] {
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element for which the grouping function returns a group that has not yet been created.
* Emits the new group
*
* '''Backpressures when''' there is an element pending for a group whose substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and all substreams cancel
*
*/
def groupBy[K, U >: Out](f: Out K): Repr[(K, Source[U, Unit]), Mat] =
andThen(GroupBy(f.asInstanceOf[Any Any]))
@ -630,6 +786,17 @@ trait FlowOps[+Out, +Mat] {
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*
* '''Emits when''' an element for which the provided predicate is true, opening and emitting
* a new substream for subsequent element
*
* '''Backpressures when''' there is an element pending for the next substream, but the previous
* is not fully consumed yet, or the substream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
*/
def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U, Unit], Mat] =
andThen(SplitWhen(p.asInstanceOf[Any Boolean]))
@ -637,6 +804,15 @@ trait FlowOps[+Out, +Mat] {
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]].
*
* '''Emits when''' (Concat) the current consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*
*/
def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match {
case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out, _] andThen(ConcatAll())

View file

@ -31,7 +31,13 @@ object Merge {
* Merge several streams, taking elements as they arrive from input streams
* (picking randomly when several have elements ready).
*
* A `Merge` has one `out` port and one or more `in` ports.
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
class Merge[T] private (inputPorts: Int,
override val shape: UniformFanInShape[T, T],
@ -70,6 +76,17 @@ object MergePreferred {
* (picking from preferred when several have elements ready).
*
* A `MergePreferred` has one `out` port, one `preferred` input port and 0 or more secondary `in` ports.
*
* '''Emits when''' one of the inputs has an element available, preferring
* a specified input if multiple have elements available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
class MergePreferred[T] private (secondaryPorts: Int,
override val shape: MergePreferred.MergePreferredShape[T],
@ -95,11 +112,16 @@ object Broadcast {
}
/**
* Fan-out the stream to several streams. Each element is produced to
* the other streams. It will not shut down until the subscriptions
* for at least two downstream subscribers have been established.
* Fan-out the stream to several streams emitting each incoming upstream element to all downstream consumers.
* It will not shut down until the subscriptions for at least two downstream subscribers have been established.
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
*
* '''Backpressures when''' any of the outputs backpressure
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
*/
class Broadcast[T] private (outputPorts: Int,
override val shape: UniformFanOutShape[T, T],
@ -129,11 +151,19 @@ object Balance {
}
/**
* Fan-out the stream to several streams. Each element is produced to
* one of the other streams. It will not shut down until the subscriptions
* Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer.
* It will not shut down until the subscriptions
* for at least two downstream subscribers have been established.
*
* A `Balance` has one `in` port and 2 or more `out` ports.
*
* '''Emits when''' any of the outputs stops backpressuring; emits the element to the first available output
*
* '''Backpressures when''' all of the outputs backpressure
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' all downstreams cancel
*/
class Balance[T] private (outputPorts: Int,
waitForAllDownstreams: Boolean,
@ -161,6 +191,14 @@ object Zip {
* Combine the elements of 2 streams into a stream of tuples.
*
* A `Zip` has a `left` and a `right` input port and one `out` port
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)],
private[stream] override val module: StreamLayout.Module)
@ -172,12 +210,31 @@ class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)],
override def named(name: String): Zip[A, B] = withAttributes(OperationAttributes.name(name))
}
/**
* Combine the elements of multiple streams into a stream of combined elements using a combiner function.
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
object ZipWith extends ZipWithApply
/**
* Takes a stream of pair elements and splits each pair to two output streams.
*
* An `Unzip` has one `in` port and one `left` and one `right` output port.
*
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
*
* '''Backpressures when''' any of the outputs backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' any downstream cancels
*/
object Unzip {
/**
@ -218,6 +275,14 @@ object Concat {
* all of the elements from the second stream.
*
* A `Concat` has one `first` port, one `second` port and one `out` port.
*
* '''Emits when''' the current stream has an element available; if the current input completes, it tries the next one
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
class Concat[T] private (override val shape: UniformFanInShape[T, T],
private[stream] override val module: StreamLayout.Module)