Merge pull request #24928 from raboof/streamReference

Move stage docs to their own files
This commit is contained in:
Arnout Engelen 2018-04-23 14:27:33 +02:00 committed by GitHub
commit 4b4a012250
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
165 changed files with 4581 additions and 1994 deletions

View file

@ -0,0 +1,21 @@
Sources and sinks for integrating with `java.io.InputStream` and `java.io.OutputStream` can be found on
`StreamConverters`. As these are blocking APIs, the implementations of these stages are run on a separate
dispatcher configured through `akka.stream.blocking-io-dispatcher`.
@@@ warning
Be aware that `asInputStream` and `asOutputStream` materialize an `InputStream` and an `OutputStream` respectively, backed by
blocking implementations. They will block the thread until data is available from upstream.
Because of this blocking nature, these objects cannot be used inside `mapMaterializedValue`, as that causes a deadlock
of the stream materialization process.
For example, the following snippet will fail with a timeout exception:
```scala
...
.toMat(StreamConverters.asInputStream().mapMaterializedValue { inputStream ⇒
inputStream.read() // this could block forever
...
}).run()
```
@@@
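In contrast, materializing the `InputStream` first and reading from it only after the stream is running avoids this deadlock. A minimal sketch (element values are illustrative):
```scala
import java.io.InputStream

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Source, StreamConverters }
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Materialize the InputStream by running the stream first ...
val inputStream: InputStream =
  Source(List(ByteString("hello "), ByteString("world")))
    .runWith(StreamConverters.asInputStream())

// ... and only then read from it, outside of the materialization process.
val firstByte = inputStream.read()
```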

View file

@ -0,0 +1,2 @@
These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous
operation at the same time (usually handling the completion of a @scala[`Future`] @java[`CompletionStage`]).

View file

@ -0,0 +1 @@
These stages are aware of the backpressure provided by their downstreams and are able to adapt their behavior to that signal.

View file

@ -0,0 +1,2 @@
These stages take multiple streams as their input and provide a single output combining the elements from all of
the inputs in different ways.

View file

@ -0,0 +1,2 @@
These have one input and multiple outputs. They might route the elements between different outputs, or emit elements on
multiple outputs at the same time.

View file

@ -0,0 +1 @@
Sources and sinks for reading and writing files can be found on `FileIO`.

View file

@ -0,0 +1,4 @@
These stages either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains
nested streams and turn them into a stream of elements instead (flattening).
See the [Substreams](stream-substream.md) page for more detail and code samples.

View file

@ -0,0 +1,5 @@
These stages can transform the rate of incoming elements since there are stages that emit multiple elements for a
single input (e.g. `mapConcat`) or consume multiple elements before emitting one output (e.g. `filter`).
However, these rate transformations are data-driven, i.e. it is the incoming elements that define how the
rate is affected. This is in contrast with [detached stages](#backpressure-aware-stages) which can change their processing behavior
depending on being backpressured by downstream or not.

View file

@ -0,0 +1 @@
These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @java[`akka.stream.javadsl.Sink`]:

View file

@ -0,0 +1 @@
These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] @java[`akka.stream.javadsl.Source`]:

View file

@ -0,0 +1 @@
These stages take the passing of time into account when processing elements.

View file

@ -0,0 +1 @@
These stages process elements using timers, delaying, dropping or grouping elements for certain time durations.

View file

@ -1,2 +1,3 @@
RedirectMatch 301 ^(.*)/scala/(.*) $1/$2?language=scala
RedirectMatch 301 ^(.*)/java/(.*) $1/$2?language=java
RedirectMatch 301 ^(.*)/stream/stages-overview\.html.* $1/stream/operators/index.html

View file

@ -19,9 +19,9 @@
* [stream-refs](stream-refs.md)
* [stream-parallelism](stream-parallelism.md)
* [stream-testkit](stream-testkit.md)
* [stages-overview](stages-overview.md)
* [stream-substream](stream-substream.md)
* [stream-cookbook](stream-cookbook.md)
* [../general/stream/stream-configuration](../general/stream/stream-configuration.md)
* [operators](operators/index.md)
@@@

View file

@ -0,0 +1,18 @@
# FileIO.fromPath
Emit the contents of a file.
@ref[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources)
@@@div { .group-scala }
## Signature
@@signature [FileIO.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala) { #fromPath }
@@@
## Description
Emit the contents of a file, as `ByteString`s. Materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with
an `IOResult` upon reaching the end of the file, or if there is a failure.
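As an illustrative sketch (the file name is hypothetical), counting the bytes of a file:
```scala
import java.nio.file.Paths

import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Sink }
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Stream the file as ByteStrings and sum up the chunk sizes.
val byteCount: Future[Long] =
  FileIO.fromPath(Paths.get("example.csv"))
    .runWith(Sink.fold(0L)((acc: Long, chunk: ByteString) => acc + chunk.size))
```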

View file

@ -0,0 +1,14 @@
# FileIO.toPath
Create a sink which will write incoming `ByteString` s to a given file path.
@ref[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources)
@@@div { .group-scala }
## Signature
@@signature [FileIO.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala) { #toPath }
@@@

View file

@ -0,0 +1,22 @@
# Flow.fromSinkAndSource
Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sent to the `Sink` and the `Flow` 's output will come from the Source.
@ref[Flow stages composed of Sinks and Sources](../index.md#flow-stages-composed-of-sinks-and-sources)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #fromSinkAndSource }
@@@
## Description
Creates a `Flow` from a `Sink` and a `Source` where the `Flow`'s input will be sent to the `Sink`
and the `Flow`'s output will come from the `Source`.
Note that termination events, like completion and cancellation, are not automatically propagated to the "other side"
of such a composed `Flow`. Use `Flow.fromSinkAndSourceCoupled` if you want to couple termination of both ends,
which is for example most useful when handling websocket connections.
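A minimal sketch of such a composed `Flow` (element values are illustrative): incoming elements are printed by the `Sink` side while the `Source` side emits a single greeting:
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Input elements go to the foreach Sink, output elements come from the single-element Source.
val greeter = Flow.fromSinkAndSource(Sink.foreach[String](println), Source.single("hello"))

Source(List("Alice", "Bob"))
  .via(greeter)
  .runWith(Sink.foreach[String](println)) // downstream only ever sees "hello"
```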

View file

@ -0,0 +1,33 @@
# Flow.fromSinkAndSourceCoupled
Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow between them.
@ref[Flow stages composed of Sinks and Sources](../index.md#flow-stages-composed-of-sinks-and-sources)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #fromSinkAndSourceCoupled }
@@@
## Description
Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow between them.
Similar to `Flow.fromSinkAndSource`, however this one also couples the termination of these two stages.
For example, if the emitted `Flow` gets a cancellation, the `Source` is of course cancelled,
but the `Sink` will also be completed. The table below illustrates the effects in detail:
| Returned Flow | Sink (in) | Source (out) |
|-------------------------------------------------|-----------------------------|---------------------------------|
| cause: upstream (sink-side) receives completion | effect: receives completion | effect: receives cancel |
| cause: upstream (sink-side) receives error | effect: receives error | effect: receives cancel |
| cause: downstream (source-side) receives cancel | effect: completes | effect: receives cancel |
| effect: cancels upstream, completes downstream | effect: completes | cause: signals complete |
| effect: cancels upstream, errors downstream | effect: receives error | cause: signals error or throws |
| effect: cancels upstream, completes downstream | cause: cancels | effect: receives cancel |
The order in which the *in* and *out* sides receive their respective completion signals is not defined; do not rely on it.

View file

@ -0,0 +1,40 @@
# Flow.lazyInitAsync
Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #lazyInitAsync }
@@@
## Description
Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument.
Internal `Flow` will not be created if there are no elements, because of completion or error.
The materialized value of the `Flow` will be the materialized value of the created internal flow.
The materialized value of the `Flow` is a @scala[`Future[Option[M]]`]@java[`CompletionStage<Optional<M>>`] that is
completed with @scala[`Some(mat)`]@java[`Optional.of(mat)`] when the internal flow gets materialized, or with @scala[`None`]
@java[an empty optional] when there were no elements. If the flow materialization (including the call of the `flowFactory`)
fails, then the future is completed with a failure.
Adheres to the `ActorAttributes.SupervisionStrategy` attribute.
@@@div { .callout }
**emits** when the internal flow is successfully created and it emits
**backpressures** when the internal flow is successfully created and it backpressures
**completes** when upstream completes and all elements have been emitted from the internal flow
@@@

View file

@ -0,0 +1,26 @@
# actorRef
Send the elements from the stream to an `ActorRef`.
@ref[Sink stages](../index.md#sink-stages)
@@@ div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #actorRef }
@@@
## Description
Send the elements from the stream to an `ActorRef`. No backpressure so care must be taken to not overflow the inbox.
@@@div { .callout }
**cancels** when the actor terminates
**backpressures** never
@@@
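A minimal sketch (the `Printer` actor and the completion message are hypothetical):
```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// A hypothetical actor that just prints whatever it receives.
class Printer extends Actor {
  def receive = { case msg => println(msg) }
}
val printer = system.actorOf(Props[Printer], "printer")

// Stream elements are sent to the actor; "done" is sent once the stream completes.
Source(1 to 3).runWith(Sink.actorRef[Int](printer, onCompleteMessage = "done"))
```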

View file

@ -0,0 +1,20 @@
# actorRefWithAck
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, to provide backpressure onto the sink.
@ref[Sink stages](../index.md#sink-stages)
## Description
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message,
to provide backpressure onto the sink.
@@@div { .callout }
**cancels** when the actor terminates
**backpressures** when the actor acknowledgement has not arrived
@@@

View file

@ -0,0 +1,14 @@
# asPublisher
Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.
@ref[Sink stages](../index.md#sink-stages)
@@@ div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #asPublisher }
@@@

View file

@ -0,0 +1,26 @@
# cancelled
Immediately cancel the stream
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #cancelled }
@@@
## Description
Immediately cancel the stream
@@@div { .callout }
**cancels** immediately
@@@

View file

@ -0,0 +1,27 @@
# combine
Combine several sinks into one using a user specified strategy
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #combine }
@@@
## Description
Combine several sinks into one using a user specified strategy
@@@div { .callout }
**cancels** depends on the strategy
**backpressures** depends on the strategy
@@@

View file

@ -0,0 +1,32 @@
# fold
Fold over the emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #fold }
@@@
## Description
Fold over the emitted elements with a function, where each invocation will get the new element and the result from the
previous fold invocation. The first invocation will be provided the `zero` value.
Materializes into a @scala[`Future`] @java[`CompletionStage`] that will complete with the last state when the stream has completed.
This stage allows combining values into a result without a global mutable state by instead passing the state along
between invocations.
@@@div { .callout }
**cancels** never
**backpressures** when the previous fold function invocation has not yet completed
@@@
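For example, a minimal sketch of summing a stream of integers (values are illustrative):
```scala
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// The fold state (the running sum) is passed between invocations; the final state
// completes the materialized Future when the stream completes.
val sum: Future[Int] = Source(1 to 10).runWith(Sink.fold[Int, Int](0)(_ + _))
```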

View file

@ -0,0 +1,33 @@
# foreach
Invoke a given procedure for each element received.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #foreach }
@@@
## Description
Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure.
The sink materializes into a @scala[`Future[Done]`] @java[`CompletionStage<Done>`] which completes when the
stream completes, or fails if the stream fails.
@@@div { .callout }
**cancels** never
**backpressures** when the previous procedure invocation has not yet completed
@@@
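A minimal sketch (values are illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Print each element; the materialized value completes when the stream does.
val done = Source(1 to 3).runWith(Sink.foreach[Int](i => println(s"got $i")))
```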

View file

@ -0,0 +1,27 @@
# foreachParallel
Like `foreach` but allows up to `parallelism` procedure calls to happen in parallel.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #foreachParallel }
@@@
## Description
Like `foreach` but allows up to `parallelism` procedure calls to happen in parallel.
@@@div { .callout }
**cancels** never
**backpressures** when the previous parallel procedure invocations have not yet completed
@@@

View file

@ -0,0 +1,14 @@
# fromSubscriber
Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #fromSubscriber }
@@@

View file

@ -0,0 +1,28 @@
# head
Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving, after this the stream is canceled.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #head }
@@@
## Description
Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving,
after this the stream is canceled. If no element is emitted, the @scala[`Future`] @java[`CompletionStage`] is failed.
@@@div { .callout }
**cancels** after receiving one element
**backpressures** never
@@@

View file

@ -0,0 +1,29 @@
# Sink.headOption
Materializes into a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the first value arriving wrapped in @scala[`Some`] @java[`Optional`], or @scala[a `None`] @java[an empty Optional] if the stream completes without any elements emitted.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #headOption }
@@@
## Description
Materializes into a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the first value arriving wrapped in @scala[`Some`] @java[`Optional`],
or @scala[a `None`] @java[an empty Optional] if the stream completes without any elements emitted.
@@@div { .callout }
**cancels** after receiving one element
**backpressures** never
@@@

View file

@ -0,0 +1,29 @@
# Sink.ignore
Consume all elements but discard them.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #ignore }
@@@
## Description
Consume all elements but discard them. Useful when a stream has to be consumed but there is no need to actually
do anything with the elements.
@@@div { .callout }
**cancels** never
**backpressures** never
@@@

View file

@ -0,0 +1,29 @@
# Sink.last
Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream completes.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #last }
@@@
## Description
Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream
completes. If the stream completes with no elements the @scala[`Future`] @java[`CompletionStage`] is failed.
@@@div { .callout }
**cancels** never
**backpressures** never
@@@

View file

@ -0,0 +1,29 @@
# Sink.lastOption
Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the last value emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #lastOption }
@@@
## Description
Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the last value
emitted, wrapped in @scala[a `Some`] @java[an `Optional`], when the stream completes. If the stream completes with no elements the @scala[`Future`] @java[`CompletionStage`] is
completed with @scala[`None`] @java[an empty `Optional`].
@@@div { .callout }
**cancels** never
**backpressures** never
@@@

View file

@ -0,0 +1,33 @@
# Sink.lazyInitAsync
Creates a real `Sink` upon receiving the first element.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #lazyInitAsync }
@@@
## Description
Creates a real `Sink` upon receiving the first element. The internal `Sink` will not be created if there are no elements,
because of completion or error.
- If upstream completes before an element was received then the @scala[`Future`]@java[`CompletionStage`] is completed with @scala[`None`]@java[an empty `Optional`].
- If upstream fails before an element was received, `sinkFactory` throws an exception, or materialization of the internal
sink fails then the @scala[`Future`]@java[`CompletionStage`] is completed with the exception.
- Otherwise the @scala[`Future`]@java[`CompletionStage`] is completed with the materialized value of the internal sink.
@@@div { .callout }
**cancels** never
**backpressures** when initialized and the created sink backpressures
@@@

View file

@ -0,0 +1,28 @@
# Sink.onComplete
Invoke a callback when the stream has completed or failed.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #onComplete }
@@@
## Description
Invoke a callback when the stream has completed or failed.
@@@div { .callout }
**cancels** never
**backpressures** never
@@@

View file

@ -0,0 +1,18 @@
# Sink.preMaterialize
Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be used to consume elements 'into' the pre-materialized one.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #preMaterialize }
@@@
## Description
Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be used to consume elements 'into' the pre-materialized one. Useful when you need the materialized value of a Sink before handing it out to someone else to materialize it for you.

View file

@ -0,0 +1,28 @@
# Sink.queue
Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #queue }
@@@
## Description
Materialize a `SinkQueue` that can be pulled to trigger demand through the sink. The queue contains
a buffer in case the stream emits elements faster than the queue is pulled.
@@@div { .callout }
**cancels** when `SinkQueue.cancel` is called
**backpressures** when buffer has some space
@@@
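A minimal sketch (values are illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher

// Each pull() signals demand for one element and eventually yields Some(element),
// or None once the stream has completed.
val queue = Source(1 to 3).runWith(Sink.queue[Int]())
queue.pull().foreach(println) // Some(1)
```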

View file

@ -0,0 +1,29 @@
# Sink.reduce
Apply a reduction function on the incoming elements and pass the result to the next invocation.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #reduce }
@@@
## Description
Apply a reduction function on the incoming elements and pass the result to the next invocation. The first invocation
receives the first two elements of the flow.
Materializes into a @scala[`Future`] @java[`CompletionStage`] that will be completed by the last result of the reduction function.
@@@div { .callout }
**cancels** never
**backpressures** when the previous reduction function invocation has not yet completed
@@@
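A minimal sketch (values are illustrative):
```scala
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Computes the maximum of the streamed elements; fails if the stream is empty.
val max: Future[Int] = Source(List(3, 7, 2)).runWith(Sink.reduce[Int](_ max _))
```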

View file

@ -0,0 +1,28 @@
# Sink.seq
Collect values emitted from the stream into a collection.
@ref[Sink stages](../index.md#sink-stages)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #seq }
@@@
## Description
Collect values emitted from the stream into a collection. The collection is available through a @scala[`Future`] @java[`CompletionStage`]
which completes when the stream completes. Note that the collection is bounded to @scala[`Int.MaxValue`] @java[`Integer.MAX_VALUE`] elements;
if more elements are emitted the sink will cancel the stream.
@@@div { .callout }
**cancels** if too many values are collected
@@@
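A minimal sketch (values are illustrative):
```scala
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Collects all elements into a sequence once the stream completes.
val all: Future[Seq[Int]] = Source(1 to 5).runWith(Sink.seq)
```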

View file

@ -0,0 +1,30 @@
# alsoTo
Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@ div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #alsoTo }
@@@
## Description
Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.
@@@div { .callout }
**emits** when an element is available and demand exists both from the `Sink` and the downstream
**backpressures** when downstream or `Sink` backpressures
**completes** when upstream completes
**cancels** when downstream or `Sink` cancels
@@@
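A minimal sketch (the side-channel sink is illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Every element is sent both to the attached sink and further downstream.
Source(1 to 3)
  .alsoTo(Sink.foreach[Int](i => println(s"side channel: $i")))
  .runWith(Sink.foreach[Int](i => println(s"downstream: $i")))
```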

View file

@ -0,0 +1,24 @@
# apply
Stream the values of an `immutable.Seq`.
@ref[Source stages](../index.md#source-stages)
@@@ div { .group-scala }
## Signature
@@signature [Source.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #apply }
@@@
## Description
Stream the values of an `immutable.Seq`.
@@@div { .callout }
**emits** the next value of the seq
**completes** when the last element of the seq has been emitted
@@@

View file

@ -0,0 +1,37 @@
# ask
Use the `ask` pattern to send a request-reply message to the target `ref` actor.
@ref[Asynchronous processing stages](../index.md#asynchronous-processing-stages)
@@@ div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #ask }
@@@
## Description
Use the `ask` pattern to send a request-reply message to the target `ref` actor.
If any of the asks times out it will fail the stream with an `akka.pattern.AskTimeoutException`.
The `mapTo` class parameter is used to cast the incoming responses to the expected response type.
Similar to the plain ask pattern, the target actor is allowed to reply with an `akka.actor.Status` message.
An `akka.actor.Status.Failure` will cause the stage to fail with the cause carried in the `Failure` message.
Adheres to the `ActorAttributes.SupervisionStrategy` attribute.
@@@div { .callout }
**emits** when the ask @scala[`Future`] @java[`CompletionStage`] returned by the provided function finishes for the next element in sequence
**backpressures** when the number of ask @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures
**completes** when upstream completes and all ask @scala[`Future` s] @java[`CompletionStage` s] have been completed and all elements have been emitted
@@@
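A minimal sketch (the `Greeter` actor is hypothetical; replies are expected to be `String`s):
```scala
import scala.concurrent.duration._

import akka.actor.{ Actor, ActorSystem, Props }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.Timeout

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val askTimeout: Timeout = 3.seconds

// A hypothetical actor that replies to every name with a greeting.
class Greeter extends Actor {
  def receive = { case name: String => sender() ! s"Hello, $name" }
}
val greeter = system.actorOf(Props[Greeter], "greeter")

Source(List("Alice", "Bob"))
  .ask[String](parallelism = 4)(greeter) // each element becomes an ask to the actor
  .runWith(Sink.foreach[String](println))
```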

View file

@ -0,0 +1,32 @@
# backpressureTimeout
If the time between the emission of an element and the following downstream demand exceeds the provided timeout, the stream is failed with a `TimeoutException`.
@ref[Time aware stages](../index.md#time-aware-stages)
@@@ div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #backpressureTimeout }
@@@
## Description
If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
the stream is failed with a `TimeoutException`. The timeout is checked periodically, so the resolution of the
check is one period (equal to the timeout value).
@@@div { .callout }
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses between element emission and downstream demand.
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,23 @@
# balance
Fan-out the stream to several streams.
@ref[Fan-out stages](../index.md#fan-out-stages)
## Description
Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer.
@@@div { .callout }
**emits** when any of the outputs stops backpressuring; emits the element to the first available output
**backpressures** when all of the outputs backpressure
**completes** when upstream completes
**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
@@@

View file

@ -0,0 +1,36 @@
# batch
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum number of batched elements is not yet reached.
@ref[Backpressure aware stages](../index.md#backpressure-aware-stages)
@@@ div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #batch }
@@@
## Description
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there
is backpressure and a maximum number of batched elements is not yet reached. When the maximum number is reached and
the downstream still backpressures, batch will also backpressure.
When backpressure starts or there is no backpressure, the element is passed into a `seed` function to transform it
to the summary type.
Will eagerly pull elements; this behavior may result in a single pending (i.e. buffered) element which cannot be
aggregated into the batched value.
@@@div { .callout }
**emits** when downstream stops backpressuring and there is a batched element available
**backpressures** when batched elements have reached the maximum limit of allowed batched elements and downstream backpressures
**completes** when upstream completes and a "possibly pending" element was drained
@@@

View file

@ -0,0 +1,34 @@
# batchWeighted
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum weight of batched elements is not yet reached.
@ref[Backpressure aware stages](../index.md#backpressure-aware-stages)
@@@ div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #batchWeighted }
@@@
## Description
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there
is backpressure and a maximum weight of batched elements is not yet reached. The weight of each element is determined by
applying `costFn`. When the maximum total weight is reached and the downstream still backpressures, batch will also
backpressure.
Will eagerly pull elements; this behavior may result in a single pending (i.e. buffered) element which cannot be
aggregated into the batched value.
@@@div { .callout }
**emits** when downstream stops backpressuring and there is a batched element available
**backpressures** when batched elements have reached the maximum weight limit of allowed batched elements and downstream backpressures
**completes** when upstream completes and a "possibly pending" element was drained
@@@

View file

@ -0,0 +1,24 @@
# broadcast
Emit each incoming element to each of the `n` outputs.
@ref[Fan-out stages](../index.md#fan-out-stages)
## Description
Emit each incoming element to each of the `n` outputs.
@@@div { .callout }
**emits** when all of the outputs stop backpressuring and there is an input element available
**backpressures** when any of the outputs backpressures
**completes** when upstream completes
**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.
@@@

View file

@ -0,0 +1,38 @@
# buffer
Allow for a temporarily faster upstream by buffering `size` elements.
@ref[Backpressure aware stages](../index.md#backpressure-aware-stages)
@@@ div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #buffer }
@@@
## Description
Allow for a temporarily faster upstream by buffering `size` elements. When the buffer is full, a new element is
handled according to the specified `OverflowStrategy`:
* `backpressure` backpressure is applied upstream
* `dropHead` drops the oldest element in the buffer to make space for the new element
* `dropTail` drops the youngest element in the buffer to make space for the new element
* `dropBuffer` drops the entire buffer and buffers the new element
* `dropNew` drops the new element
* `fail` fails the flow with a `BufferOverflowException`
@@@div { .callout }
**emits** when downstream stops backpressuring and there is a pending element in the buffer
**backpressures** when `OverflowStrategy` is `backpressure` and buffer is full
**completes** when upstream completes and buffered elements have been drained, or when `OverflowStrategy` is `fail`, the buffer is full and a new element arrives
@@@
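A minimal sketch (buffer size and strategy are illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Keep at most 1000 elements buffered; once full, the oldest buffered element is dropped.
Source(1 to 100000)
  .buffer(1000, OverflowStrategy.dropHead)
  .runWith(Sink.ignore)
```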

View file

@ -0,0 +1,30 @@
# collect
Apply a partial function to each incoming element; if the partial function is defined for a value the returned value is passed downstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #collect }
@@@
## Description
Apply a partial function to each incoming element; if the partial function is defined for a value the returned
value is passed downstream. Can often replace `filter` followed by `map` to achieve the same in one single stage.
@@@div { .callout }
**emits** when the provided partial function is defined for the element
**backpressures** when the partial function is defined for the element and downstream backpressures
**completes** when upstream completes
@@@
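A minimal sketch (the mixed-type input is illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Keep only the strings and upper-case them; all other elements are dropped.
Source(List(1, "two", 3, "four"))
  .collect { case s: String => s.toUpperCase }
  .runWith(Sink.foreach[String](println)) // TWO, FOUR
```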

View file

@ -0,0 +1,16 @@
# collectType
Transform this stream by only passing through elements that are instances of the provided type as they pass through this processing step.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #collectType }
@@@

View file

@ -0,0 +1,32 @@
# completionTimeout
If the completion of the stream does not happen until the provided timeout, the stream is failed with a `TimeoutException`.
@ref[Time aware stages](../index.md#time-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #completionTimeout }
@@@
## Description
If the completion of the stream does not happen until the provided timeout, the stream is failed
with a `TimeoutException`.
@@@div { .callout }
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses before upstream completes
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,29 @@
# concat
After completion of the original upstream the elements of the given source will be emitted.
@ref[Fan-in stages](../index.md#fan-in-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #concat }
@@@
## Description
After completion of the original upstream the elements of the given source will be emitted.
@@@div { .callout }
**emits** when the current stream has an element available; if the current input completes, it tries the next one
**backpressures** when downstream backpressures
**completes** when all upstreams complete
@@@
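A minimal sketch (values are illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Emits 1, 2, 3 and only afterwards 10, 20, 30.
Source(1 to 3)
  .concat(Source(List(10, 20, 30)))
  .runWith(Sink.foreach[Int](println))
```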

View file

@ -0,0 +1,31 @@
# conflate
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.
@ref[Backpressure aware stages](../index.md#backpressure-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #conflate }
@@@
## Description
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as
there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or
average of incoming numbers. If the aggregation should lead to a different type, `conflateWithSeed` can be used instead.
@@@div { .callout }
**emits** when downstream stops backpressuring and there is a conflated element available
**backpressures** when the aggregate function cannot keep up with incoming elements
**completes** when upstream completes
@@@
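A minimal sketch (the throttled downstream stands in for any slow consumer):
```scala
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ThrottleMode }
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// The slow (throttled) downstream receives the sum of whatever arrived while it was
// backpressuring, instead of forcing the fast upstream to wait.
Source(1 to 1000)
  .conflate(_ + _)
  .throttle(1, 1.second, 1, ThrottleMode.Shaping)
  .runWith(Sink.foreach[Int](println))
```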

View file

@ -0,0 +1,32 @@
# conflateWithSeed
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.
@ref[Backpressure aware stages](../index.md#backpressure-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #conflateWithSeed }
@@@
## Description
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there
is backpressure. When backpressure starts or there is no backpressure, the element is passed into a `seed` function to
transform it to the summary type.
@@@div { .callout }
**emits** when downstream stops backpressuring and there is a conflated element available
**backpressures** when the aggregate or seed functions cannot keep up with incoming elements
**completes** when upstream completes
@@@

View file

@ -0,0 +1,30 @@
# delay
Delay every element passed through with a specific duration.
@ref[Timer driven stages](../index.md#timer-driven-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #delay }
@@@
## Description
Delay every element passed through with a specific duration.
@@@div { .callout }
**emits** when there is a pending element in the buffer and the configured time for this element has elapsed
**backpressures** differs, depending on the `OverflowStrategy` set
**completes** when upstream completes and buffered elements have been drained
@@@

View file

@ -0,0 +1,29 @@
# detach
Detach upstream demand from downstream demand without detaching the stream rates.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #detach }
@@@
## Description
Detach upstream demand from downstream demand without detaching the stream rates.
@@@div { .callout }
**emits** when the upstream stage has emitted and there is demand
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,31 @@
# divertTo
Each upstream element will either be diverted to the given sink or to the downstream consumer, according to the predicate function applied to the element.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #divertTo }
@@@
## Description
Each upstream element will either be diverted to the given sink or to the downstream consumer, according to the predicate function applied to the element.
@@@div { .callout }
**emits** when the chosen output stops backpressuring and there is an input element available
**backpressures** when the chosen output backpressures
**completes** when upstream completes and no output is pending
**cancels** when any of the downstreams cancel
@@@

View file

@ -0,0 +1,29 @@
# drop
Drop `n` elements and then pass any subsequent element downstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #drop }
@@@
## Description
Drop `n` elements and then pass any subsequent element downstream.
@@@div { .callout }
**emits** when the specified number of elements has been dropped already
**backpressures** when the specified number of elements has been dropped and downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,29 @@
# dropWhile
Drop elements as long as a predicate function returns true for the element.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #dropWhile }
@@@
## Description
Drop elements as long as a predicate function returns true for the element.
@@@div { .callout }
**emits** when the predicate returned false for an element, and for all following stream elements
**backpressures** when the predicate returned false and downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,29 @@
# dropWithin
Drop elements until a timeout has fired.
@ref[Timer driven stages](../index.md#timer-driven-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #dropWithin }
@@@
## Description
Drop elements until a timeout has fired.
@@@div { .callout }
**emits** after the timer fired and a new upstream element arrives
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,33 @@
# expand
Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original element, allowing for it to be rewritten and/or filtered.
@ref[Backpressure aware stages](../index.md#backpressure-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #expand }
@@@
## Description
Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original
element, allowing for it to be rewritten and/or filtered.
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
and examples.
@@@div { .callout }
**emits** when downstream stops backpressuring
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,38 @@
# extrapolate
Allow for a faster downstream by expanding the last emitted element to an `Iterator`.
@ref[Backpressure aware stages](../index.md#backpressure-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #extrapolate }
@@@
## Description
Allow for a faster downstream by expanding the last emitted element to an `Iterator`. For example, an
`Iterator.continually(element)` will cause `extrapolate` to keep repeating the last emitted element.
All original elements are always emitted unchanged - the `Iterator` is only used whenever there is downstream
demand before upstream emits a new element.
Includes an optional `initial` argument to prevent blocking the entire stream when there are multiple producers.
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
and examples.
@@@div { .callout }
**emits** when downstream stops backpressuring
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,30 @@
# filter
Filter the incoming elements using a predicate.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #filter }
@@@
## Description
Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if
it returns false the element is discarded.
@@@div { .callout }
**emits** when the given predicate returns true for the element
**backpressures** when the given predicate returns true for the element and downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,30 @@
# filterNot
Filter the incoming elements using a predicate.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #filterNot }
@@@
## Description
Filter the incoming elements using a predicate. If the predicate returns false the element is passed downstream, if
it returns true the element is discarded.
@@@div { .callout }
**emits** when the given predicate returns false for the element
**backpressures** when the given predicate returns false for the element and downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,30 @@
# flatMapConcat
Transform each input element into a `Source` whose elements are then flattened into the output stream through concatenation.
@ref[Nesting and flattening stages](../index.md#nesting-and-flattening-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapConcat }
@@@
## Description
Transform each input element into a `Source` whose elements are then flattened into the output stream through
concatenation. This means each source is fully consumed before consumption of the next source starts.
@@@div { .callout }
**emits** when the current consumed substream has an element available
**backpressures** when downstream backpressures
**completes** when upstream completes and all consumed substreams complete
@@@
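A minimal sketch (the generated sub-sources are illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Each number is expanded into its own, fully consumed sub-source: 1, 1, 2, 2, 3, 3
Source(1 to 3)
  .flatMapConcat(i => Source(List(i, i)))
  .runWith(Sink.foreach[Int](println))
```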

View file

@ -0,0 +1,31 @@
# flatMapMerge
Transform each input element into a `Source` whose elements are then flattened into the output stream through merging.
@ref[Nesting and flattening stages](../index.md#nesting-and-flattening-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapMerge }
@@@
## Description
Transform each input element into a `Source` whose elements are then flattened into the output stream through
merging. The maximum number of merged sources has to be specified.
@@@div { .callout }
**emits** when one of the currently consumed substreams has an element available
**backpressures** when downstream backpressures
**completes** when upstream completes and all consumed substreams complete
@@@

View file

@ -0,0 +1,30 @@
# fold
Start with the current value `zero` and then apply the current and next value to the given function; when upstream completes, the current value is emitted downstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #fold }
@@@
## Description
Start with the current value `zero` and then apply the current and next value to the given function; when upstream
completes, the current value is emitted downstream.
@@@div { .callout }
**emits** when upstream completes
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,29 @@
# foldAsync
Just like `fold` but receives a function that returns a @scala[`Future`] @java[`CompletionStage`] of the next value.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #foldAsync }
@@@
## Description
Just like `fold` but receives a function that returns a @scala[`Future`] @java[`CompletionStage`] of the next value.
@@@div { .callout }
**emits** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved
**backpressures** when downstream backpressures
**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved
@@@

View file

@ -0,0 +1,28 @@
# groupBy
Demultiplex the incoming stream into separate output streams.
@ref[Nesting and flattening stages](../index.md#nesting-and-flattening-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #groupBy }
@@@
## Description
Demultiplex the incoming stream into separate output streams.
@@@div { .callout }
**emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group
**backpressures** when there is an element pending for a group whose substream backpressures
**completes** when upstream completes (until the end of stream it is not possible to know whether new substreams will be needed or not)
@@@
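A minimal sketch (the even/odd key and the per-group fold are illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Split the numbers into an "even" and an "odd" substream (key = i % 2),
// sum each substream separately, and merge the two sums back into one stream.
Source(1 to 10)
  .groupBy(2, _ % 2)
  .fold(0)(_ + _)
  .mergeSubstreams
  .runWith(Sink.foreach[Int](println))
```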

View file

@ -0,0 +1,30 @@
# grouped
Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #grouped }
@@@
## Description
Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of
elements downstream.
@@@div { .callout }
**emits** when the specified number of elements has been accumulated or upstream completed
**backpressures** when a group has been assembled and downstream backpressures
**completes** when upstream completes
@@@
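A minimal sketch (values and group size are illustrative):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Emits Vector(1, 2, 3), Vector(4, 5, 6), Vector(7) — the last group may be smaller.
Source(1 to 7)
  .grouped(3)
  .runForeach(group => println(group))
```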

View file

@ -0,0 +1,32 @@
# groupedWeightedWithin
Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first.
@ref[Timer driven stages](../index.md#timer-driven-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #groupedWeightedWithin }
@@@
## Description
Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements,
whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.
@@@div { .callout }
**emits** when the configured time elapses since the last group has been emitted,
but not if no elements have been grouped (i.e. no empty groups), or when the weight limit has been reached.
**backpressures** when downstream backpressures, and the buffered group (+ pending element) weighs more than *maxWeight*
**completes** when upstream completes
@@@

View file

@ -0,0 +1,32 @@
# groupedWithin
Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, whatever happens first.
@ref[Timer driven stages](../index.md#timer-driven-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #groupedWithin }
@@@
## Description
Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements,
whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.
@@@div { .callout }
**emits** when the configured time elapses since the last group has been emitted,
but not if no elements have been grouped (i.e. no empty groups), or when the limit has been reached.
**backpressures** when downstream backpressures, and there are *n+1* buffered elements
**completes** when upstream completes
@@@

View file

@ -0,0 +1,33 @@
# idleTimeout
If the time between two processed elements exceeds the provided timeout, the stream is failed with a `TimeoutException`.
@ref[Time aware stages](../index.md#time-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #idleTimeout }
@@@
## Description
If the time between two processed elements exceeds the provided timeout, the stream is failed
with a `TimeoutException`. The timeout is checked periodically, so the resolution of the
check is one period (equal to the timeout value).
@@@div { .callout }
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses between two emitted elements
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,31 @@
# initialDelay
Delays the initial element by the specified duration.
@ref[Timer driven stages](../index.md#timer-driven-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #initialDelay }
@@@
## Description
Delays the initial element by the specified duration.
@@@div { .callout }
**emits** when upstream emits an element if the initial delay is already elapsed
**backpressures** when downstream backpressures or initial delay is not yet elapsed
**completes** when upstream completes
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,32 @@
# initialTimeout
If the first element has not passed through this stage before the provided timeout, the stream is failed with a `TimeoutException`.
@ref[Time aware stages](../index.md#time-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #initialTimeout }
@@@
## Description
If the first element has not passed through this stage before the provided timeout, the stream is failed
with a `TimeoutException`.
@@@div { .callout }
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses before first element arrives
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,30 @@
# interleave
Emits a specifiable number of elements from the original source, then from the provided source and repeats.
@ref[Fan-in stages](../index.md#fan-in-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #interleave }
@@@
## Description
Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one
source completes, the rest of the other stream will be emitted.
@@@div { .callout }
**emits** when element is available from the currently consumed upstream
**backpressures** when upstream backpressures
**completes** when both upstreams have completed
@@@

View file

@ -0,0 +1,29 @@
# intersperse
Intersperse stream with provided element similar to `List.mkString`.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #intersperse }
@@@
## Description
Intersperse stream with provided element, similar to `List.mkString`. It can also inject start and end marker elements into the stream.
@@@div { .callout }
**emits** when upstream emits an element or before with the *start* element if provided
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@
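A minimal sketch (the markers are illustrative; printing the emitted elements without separators yields `[a,b,c]`):
```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Emits the elements "[", "a", ",", "b", ",", "c", "]".
Source(List("a", "b", "c"))
  .intersperse("[", ",", "]")
  .runWith(Sink.foreach[String](print))
```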

View file

@ -0,0 +1,31 @@
# keepAlive
Injects additional (configured) elements if upstream does not emit for a configured amount of time.
@ref[Time aware stages](../index.md#time-aware-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #keepAlive }
@@@
## Description
Injects additional (configured) elements if upstream does not emit for a configured amount of time.
@@@div { .callout }
**emits** when upstream emits an element or if the upstream was idle for the configured period
**backpressures** when downstream backpressures
**completes** when upstream completes
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,29 @@
# limit
Limit the number of elements from upstream to a given `max` number.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #limit }
@@@
## Description
Limit the number of elements from upstream to a given `max` number.
@@@div { .callout }
**emits** when upstream emits and the number of emitted elements has not reached max
**backpressures** when downstream backpressures
**completes** when upstream completes and the number of emitted elements has not reached max
@@@

View file

@ -0,0 +1,30 @@
# limitWeighted
Ensure stream boundedness by evaluating the cost of incoming elements using a cost function.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #limitWeighted }
@@@
## Description
Ensure stream boundedness by evaluating the cost of incoming elements using a cost function.
The evaluated cost of each element defines how many elements will be allowed to travel downstream.
@@@div { .callout }
**emits** when upstream emits and the number of emitted elements has not reached max
**backpressures** when downstream backpressures
**completes** when upstream completes and the number of emitted elements has not reached max
@@@

View file

@ -0,0 +1,31 @@
# log
Log elements flowing through the stream as well as completion and erroring.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #log }
@@@
## Description
Log elements flowing through the stream as well as completion and erroring. By default element and
completion signals are logged on debug level, and errors are logged on error level.
This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow.
@@@div { .callout }
**emits** when upstream emits
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,30 @@
# map
Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #map }
@@@
## Description
Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.
@@@div { .callout }
**emits** when the mapping function returns an element
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@

View file

@ -0,0 +1,33 @@
# mapAsync
Pass incoming elements to a function that returns a @scala[`Future`] @java[`CompletionStage`] result.
@ref[Asynchronous processing stages](../index.md#asynchronous-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mapAsync }
@@@
## Description
Pass incoming elements to a function that returns a @scala[`Future`] @java[`CompletionStage`] result. When the @scala[`Future`] @java[`CompletionStage`] completes the result is passed
downstream. Up to `n` elements can be processed concurrently, but regardless of their completion time the incoming
order will be kept when results complete. For use cases where order does not matter `mapAsyncUnordered` can be used.
If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied).
@@@div { .callout }
**emits** when the @scala[`Future`] @java[`CompletionStage`] returned by the provided function finishes for the next element in sequence
**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures
**completes** when upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] have been completed and all elements have been emitted
@@@
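A minimal sketch, using a hypothetical `lookupUser` call as a stand-in for a real asynchronous operation such as a database or HTTP request:

```scala
import scala.concurrent.Future
import akka.stream.scaladsl.Source

// stand-in for a real asynchronous call
def lookupUser(id: Int): Future[String] = Future.successful(s"user-$id")

// run up to 4 lookups concurrently while preserving the order of the ids
val users = Source(1 to 100).mapAsync(parallelism = 4)(lookupUser)
```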

View file

@ -0,0 +1,32 @@
# mapAsyncUnordered
Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements that triggered them.
@ref[Asynchronous processing stages](../index.md#asynchronous-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mapAsyncUnordered }
@@@
## Description
Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements
that triggered them.
If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied).
@@@div { .callout }
**emits** when any of the @scala[`Future` s] @java[`CompletionStage` s] returned by the provided function completes
**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures
**completes** when upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] have been completed and all elements have been emitted
@@@
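A minimal sketch, again with a hypothetical `lookupUser` stand-in for a real asynchronous call:

```scala
import scala.concurrent.Future
import akka.stream.scaladsl.Source

// stand-in for a real asynchronous call whose results may be reordered
def lookupUser(id: Int): Future[String] = Future.successful(s"user-$id")

// up to 4 lookups in flight; results are emitted as soon as they complete
val users = Source(1 to 100).mapAsyncUnordered(parallelism = 4)(lookupUser)
```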

View file

@ -0,0 +1,29 @@
# mapConcat
Transform each element into zero or more elements that are individually passed downstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mapConcat }
@@@
## Description
Transform each element into zero or more elements that are individually passed downstream.
@@@div { .callout }
**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection
**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection
**completes** when upstream completes and all remaining elements have been emitted
@@@
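A minimal example:

```scala
import akka.stream.scaladsl.Source

// split each line into words; every word is emitted downstream individually,
// so this emits "the", "quick", "brown", "fox"
val words = Source(List("the quick", "brown fox"))
  .mapConcat(line => line.split(" ").toList)
```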

View file

@ -0,0 +1,34 @@
# mapError
While similar to `recover` this stage can be used to transform an error signal to a different one *without* logging it as an error in the process.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mapError }
@@@
## Description
While similar to `recover` this stage can be used to transform an error signal to a different one *without* logging
it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
would log the `t2` error.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Similarly to `recover`, throwing an exception inside `mapError` _will_ be logged on ERROR level automatically.
@@@div { .callout }
**emits** when an element is available from the upstream, or upstream failed and pf returns an element
**backpressures** when downstream backpressures
**completes** when upstream completes or upstream failed with an exception pf can handle
@@@
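A minimal sketch with an illustrative translation of an `ArithmeticException` into a domain-specific failure:

```scala
import akka.stream.scaladsl.Flow

// translate the low-level failure without the extra error logging that
// recover(t => throw t2) would produce
val safeDivide = Flow[Int]
  .map(n => 100 / n)
  .mapError { case _: ArithmeticException => new IllegalStateException("division by zero") }
```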

View file

@ -0,0 +1,29 @@
# merge
Merge multiple sources.
@ref[Fan-in stages](../index.md#fan-in-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #merge }
@@@
## Description
Merge multiple sources. Picks elements randomly if all sources have elements ready.
@@@div { .callout }
**emits** when one of the inputs has an element available
**backpressures** when downstream backpressures
**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
@@@
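A minimal example:

```scala
import akka.stream.scaladsl.Source

// interleave two sources; the order between them is not deterministic
val merged = Source(1 to 10).merge(Source(11 to 20))
```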

View file

@ -0,0 +1,23 @@
# mergePreferred
Merge multiple sources.
@ref[Fan-in stages](../index.md#fan-in-stages)
## Signature
## Description
Merge multiple sources. Prefers one source if all sources have elements ready.
@@@div { .callout }
**emits** when one of the inputs has an element available, preferring a defined input if multiple have elements available
**backpressures** when downstream backpressures
**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
@@@
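A minimal sketch using the `MergePreferred` junction in the graph DSL; sources and element types are illustrative only:

```scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, MergePreferred, RunnableGraph, Sink, Source }

// a junction with one preferred and one secondary input: whenever both have
// an element ready, the preferred input is drained first
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val merge = b.add(MergePreferred[Int](1))
  Source(1 to 100)   ~> merge.preferred
  Source(200 to 300) ~> merge.in(0)
  merge.out ~> Sink.foreach[Int](println)
  ClosedShape
})
```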

View file

@ -0,0 +1,24 @@
# mergePrioritized
Merge multiple sources.
@ref[Fan-in stages](../index.md#fan-in-stages)
## Signature
## Description
Merge multiple sources. Prefers sources depending on their priorities if all sources have elements ready. If only a subset of the
sources has elements ready, the relative priorities of those sources are used to prioritise.
@@@div { .callout }
**emits** when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available
**backpressures** when downstream backpressures
**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
@@@
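A rough sketch using the `MergePrioritized` junction in the graph DSL; the factory is assumed to take one priority per input port, and the sources are illustrative only:

```scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, MergePrioritized, RunnableGraph, Sink, Source }

// when both inputs have elements ready, the first input is picked with
// probability 9/10 and the second with probability 1/10
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val merge = b.add(MergePrioritized[Int](Seq(9, 1)))
  Source(1 to 100)   ~> merge.in(0)
  Source(101 to 200) ~> merge.in(1)
  merge.out ~> Sink.foreach[Int](println)
  ClosedShape
})
```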

View file

@ -0,0 +1,30 @@
# mergeSorted
Merge multiple sources.
@ref[Fan-in stages](../index.md#fan-in-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mergeSorted }
@@@
## Description
Merge multiple sources. Waits for one element to be ready from each input stream and emits the
smallest element.
@@@div { .callout }
**emits** when all of the inputs have an element available
**backpressures** when downstream backpressures
**completes** when all upstreams complete
@@@
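A minimal example; both inputs must already be sorted, and an implicit `Ordering` for the element type must be in scope:

```scala
import akka.stream.scaladsl.Source

// emits 1, 2, 3, 4, 5, 6 using the implicit Ordering[Int]
val sorted = Source(List(1, 3, 5)).mergeSorted(Source(List(2, 4, 6)))
```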

View file

@ -0,0 +1,31 @@
# monitor
Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage.
@ref[Watching status stages](../index.md#watching-status-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #monitor }
@@@
## Description
Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage. The stage otherwise
passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an
event, and may therefore affect performance.
@@@div { .callout }
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes
@@@
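A rough sketch, assuming the `monitor()(combine)` overload of this Akka version; the source and its element are illustrative only:

```scala
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, Sink, Source }

implicit val system = ActorSystem("monitor-example")
implicit val materializer = ActorMaterializer()

// keep the FlowMonitor as part of the materialized value
val (_, monitor) =
  Source.tick(1.second, 1.second, "tick")
    .monitor()(Keep.both)
    .to(Sink.ignore)
    .run()

// monitor.state reports the last observed event, e.g. Initialized or Received(elem)
println(monitor.state)
```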

View file

@ -0,0 +1,37 @@
# orElse
If the primary source completes without emitting any elements, the elements from the secondary source are emitted.
@ref[Fan-in stages](../index.md#fan-in-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #orElse }
@@@
## Description
If the primary source completes without emitting any elements, the elements from the secondary source
are emitted. If the primary source emits any elements the secondary source is cancelled.
Note that both sources are materialized directly and the secondary source is backpressured until it becomes
the source of elements or is cancelled.
Signals errors downstream, regardless of which of the two sources emitted the error.
@@@div { .callout }
**emits** when an element is available from the first stream, or when the first stream closed without emitting any elements and an element
is available from the second stream
**backpressures** when downstream backpressures
**completes** when the primary stream completes after emitting at least one element, when the primary stream completes
without emitting and the secondary stream already has completed, or when the secondary stream completes
@@@
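A minimal example:

```scala
import akka.stream.scaladsl.Source

// the primary source is empty, so the elements of the fallback are emitted: 1, 2, 3
val withFallback = Source.empty[Int].orElse(Source(List(1, 2, 3)))
```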

View file

@ -0,0 +1,26 @@
# partition
Fan-out the stream to several streams.
@ref[Fan-out stages](../index.md#fan-out-stages)
## Signature
## Description
Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the
partitioner function applied to the element.
@@@div { .callout }
**emits** when the chosen output stops backpressuring and there is an input element available
**backpressures** when the chosen output backpressures
**completes** when upstream completes and no output is pending
**cancels** depends on the `eagerCancel` flag: if true, when any downstream cancels; if false, when all downstreams cancel.
@@@
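A minimal sketch using the `Partition` junction in the graph DSL, routing by an illustrative even/odd partitioner:

```scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, Partition, RunnableGraph, Sink, Source }

// route even numbers to output 0 and odd numbers to output 1
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val partition = b.add(Partition[Int](2, i => if (i % 2 == 0) 0 else 1))
  Source(1 to 10)  ~> partition.in
  partition.out(0) ~> Sink.foreach[Int](n => println(s"even: $n"))
  partition.out(1) ~> Sink.foreach[Int](n => println(s"odd: $n"))
  ClosedShape
})
```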

View file

@ -0,0 +1,31 @@
# prefixAndTail
Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) and return a pair containing a strict sequence of the taken elements and a stream representing the remaining elements.
@ref[Nesting and flattening stages](../index.md#nesting-and-flattening-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #prefixAndTail }
@@@
## Description
Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements)
and return a pair containing a strict sequence of the taken elements and a stream representing the remaining elements.
@@@div { .callout }
**emits** when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream
**backpressures** when downstream backpressures or substream backpressures
**completes** when prefix elements have been consumed and substream has been consumed
@@@
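A minimal example:

```scala
import akka.stream.scaladsl.Source

// emits exactly one pair: the strict prefix Vector(1, 2, 3) together with
// a Source that will emit the remaining elements 4..10
val prefixed = Source(1 to 10).prefixAndTail(3)
```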

View file

@ -0,0 +1,31 @@
# prepend
Prepends the given source to the flow, consuming it until completion before the original source is consumed.
@ref[Fan-in stages](../index.md#fan-in-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #prepend }
@@@
## Description
Prepends the given source to the flow, consuming it until completion before the original source is consumed.
If the materialized value needs to be collected, `prependMat` is available.
@@@div { .callout }
**emits** when the given stream has an element available; if the given input completes, it tries the current one
**backpressures** when downstream backpressures
**completes** when all upstreams complete
@@@
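A minimal example:

```scala
import akka.stream.scaladsl.Source

// the prepended source is drained first, so this emits 1, 2, 3, 4
val numbers = Source(List(3, 4)).prepend(Source(List(1, 2)))
```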

View file

@ -0,0 +1,31 @@
# recover
Allow sending of one last element downstream when a failure has happened upstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #recover }
@@@
## Description
Allow sending of one last element downstream when a failure has happened upstream.
Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
@@@div { .callout }
**emits** when an element is available from the upstream, or upstream failed and pf returns an element
**backpressures** when downstream backpressures, not when failure happened
**completes** when upstream completes or upstream failed with an exception pf can handle
@@@
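A minimal sketch, emitting an illustrative fallback element when a division by zero fails the stream:

```scala
import akka.stream.scaladsl.Source

// emits 100, 50 and then, when the division by zero fails the stream,
// one final element -1 before completing
val withFallbackElement = Source(List(1, 2, 0))
  .map(n => 100 / n)
  .recover { case _: ArithmeticException => -1 }
```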

View file

@ -0,0 +1,31 @@
# recoverWith
Allow switching to an alternative Source when a failure has happened upstream.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #recoverWith }
@@@
## Description
Allow switching to an alternative Source when a failure has happened upstream.
Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
@@@div { .callout }
**emits** when the element is available from the upstream or upstream failed and pf returns an alternative Source
**backpressures** when downstream backpressures; after a failure it backpressures to the alternative Source
**completes** when upstream completes or upstream failed with an exception pf can handle
@@@
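A minimal sketch, switching to an illustrative fallback source when a division by zero fails the stream:

```scala
import akka.stream.scaladsl.Source

// instead of failing downstream, continue with the elements of the fallback source
val withFallbackSource = Source(List(1, 2, 0))
  .map(n => 100 / n)
  .recoverWith { case _: ArithmeticException => Source(List(-1, -2)) }
```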

View file

@ -0,0 +1,35 @@
# recoverWithRetries
`recoverWithRetries` allows switching to an alternative Source on flow failure.
@ref[Simple processing stages](../index.md#simple-processing-stages)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala]($akka$/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #recoverWithRetries }
@@@
## Description
`recoverWithRetries` allows switching to an alternative Source on flow failure. It will stay in effect after
a failure has been recovered, up to *attempts* number of times, so that each time there is a failure
it is fed into the *pf* and a new Source may be materialized. Note that if you pass in 0, this won't
attempt to recover at all. A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.
@@@div { .callout }
**emits** when an element is available from the upstream, or upstream failed and an element is available from the alternative Source
**backpressures** when downstream backpressures
**completes** when upstream completes or upstream failed with an exception pf can handle
@@@
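A minimal sketch, bounding how many times the illustrative fallback source may be used:

```scala
import akka.stream.scaladsl.Source

// switch to the fallback source on failure, but do so at most 3 times;
// a fourth failure would fail the stream
val withBoundedRecovery = Source(List(1, 2, 0))
  .map(n => 100 / n)
  .recoverWithRetries(3, { case _: ArithmeticException => Source(List(-1, -2)) })
```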

Some files were not shown because too many files have changed in this diff