diff --git a/akka-docs/src/main/paradox/stream/reference/++.md b/akka-docs/src/main/paradox/stream/reference/++.md new file mode 100644 index 0000000000..b075b34d04 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/++.md @@ -0,0 +1,23 @@ +# ++ + +++ + +## Signature + +## Description + +Just a shorthand for concat + + +@@@div { .callout } + +**emits** when the current stream has an element available; if the current input completes, it tries the next one + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/Flow.fromSinkAndSource.md b/akka-docs/src/main/paradox/stream/reference/Flow.fromSinkAndSource.md new file mode 100644 index 0000000000..2ae391bf98 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/Flow.fromSinkAndSource.md @@ -0,0 +1,17 @@ +# Flow.fromSinkAndSource + +Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sent to the `Sink` +and the `Flow` 's output will come from the Source. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/Flow.fromSinkAndSourceCoupled.md b/akka-docs/src/main/paradox/stream/reference/Flow.fromSinkAndSourceCoupled.md new file mode 100644 index 0000000000..5a6bce528d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/Flow.fromSinkAndSourceCoupled.md @@ -0,0 +1,16 @@ +# Flow.fromSinkAndSourceCoupled + +Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow between them. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/actorRef.md b/akka-docs/src/main/paradox/stream/reference/actorRef.md new file mode 100644 index 0000000000..180a495c94 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/actorRef.md @@ -0,0 +1,21 @@ +# actorRef + +Send the elements from the stream to an `ActorRef`. + +## Signature + +## Description + +Send the elements from the stream to an `ActorRef`. No backpressure so care must be taken to not overflow the inbox. + + +@@@div { .callout } + +**cancels** when the actor terminates + +**backpressures** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/actorRefWithAck.md b/akka-docs/src/main/paradox/stream/reference/actorRefWithAck.md new file mode 100644 index 0000000000..3743a56c46 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/actorRefWithAck.md @@ -0,0 +1,23 @@ +# actorRefWithAck + +Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, +to provide back pressure onto the sink. + +## Signature + +## Description + +Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message, +to provide back pressure onto the sink. + + +@@@div { .callout } + +**cancels** when the actor terminates + +**backpressures** when the actor acknowledgement has not arrived + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/alsoTo.md b/akka-docs/src/main/paradox/stream/reference/alsoTo.md new file mode 100644 index 0000000000..dfb12efbfb --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/alsoTo.md @@ -0,0 +1,25 @@ +# alsoTo + +Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`. + +## Signature + +## Description + +Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`. + + +@@@div { .callout } + +**emits** when an element is available and demand exists both from the `Sink` and the downstream + +**backpressures** when downstream or `Sink` backpressures + +**completes** when upstream completes + +**cancels** when downstream or `Sink` cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/apply.md b/akka-docs/src/main/paradox/stream/reference/apply.md new file mode 100644 index 0000000000..f260dee7cd --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/apply.md @@ -0,0 +1,32 @@ +# apply + +Stream the values of an `immutable. + +## Signature + +## Description + +Stream the values of an `immutable.Seq`. + + +@@@div { .callout } + +**emits** the next value of the seq + +**completes** when the last element of the seq has been emitted + +@@@ + +@@@ div { .group-java } + +### from + +Stream the values of an `Iterable`. Make sure the `Iterable` is immutable or at least not modified after being used +as a source. + +@@@ + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/asInputStream.md b/akka-docs/src/main/paradox/stream/reference/asInputStream.md new file mode 100644 index 0000000000..5fa4698f9d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/asInputStream.md @@ -0,0 +1,16 @@ +# asInputStream + +Create a sink which materializes into an `InputStream` that can be read to trigger demand through the sink. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/asJavaStream.md b/akka-docs/src/main/paradox/stream/reference/asJavaStream.md new file mode 100644 index 0000000000..3b9dec9af8 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/asJavaStream.md @@ -0,0 +1,16 @@ +# asJavaStream + +Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/asOutputStream.md b/akka-docs/src/main/paradox/stream/reference/asOutputStream.md new file mode 100644 index 0000000000..a553460bdd --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/asOutputStream.md @@ -0,0 +1,16 @@ +# asOutputStream + +Create a source that materializes into an `OutputStream`. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/asPublisher.md b/akka-docs/src/main/paradox/stream/reference/asPublisher.md new file mode 100644 index 0000000000..7beeca46b9 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/asPublisher.md @@ -0,0 +1,16 @@ +# asPublisher + +Integration with Reactive Streams, materializes into a `org. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/asSubscriber.md b/akka-docs/src/main/paradox/stream/reference/asSubscriber.md new file mode 100644 index 0000000000..326ef41ee1 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/asSubscriber.md @@ -0,0 +1,16 @@ +# asSubscriber + +Integration with Reactive Streams, materializes into a `org. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/ask.md b/akka-docs/src/main/paradox/stream/reference/ask.md new file mode 100644 index 0000000000..76378f30ee --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/ask.md @@ -0,0 +1,33 @@ +# ask + +Use the `ask` pattern to send a request-reply message to the target `ref` actor. + +## Signature + +## Description + +Use the `ask` pattern to send a request-reply message to the target `ref` actor. +If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]]. + +The `mapTo` class parameter is used to cast the incoming responses to the expected response type. + +Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`. +An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message. + +Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + + +@@@div { .callout } + +**emits** when the ask @scala[`Future`] @java[`CompletionStage`] returned by the provided function finishes for the next element in sequence + + +**backpressures** when the number of ask @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures + +**completes** when upstream completes and all ask @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted + + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/backpressureTimeout.md b/akka-docs/src/main/paradox/stream/reference/backpressureTimeout.md new file mode 100644 index 0000000000..2dc196e79a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/backpressureTimeout.md @@ -0,0 +1,28 @@ +# backpressureTimeout + +If the time between the emission of an element and the following downstream demand exceeds the provided timeout, +the stream is failed with a `TimeoutException`. + +## Signature + +## Description + +If the time between the emission of an element and the following downstream demand exceeds the provided timeout, +the stream is failed with a `TimeoutException`. The timeout is checked periodically, so the resolution of the +check is one period (equals to timeout value). + + +@@@div { .callout } + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses between element emission and downstream demand. + +**cancels** when downstream cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/balance.md b/akka-docs/src/main/paradox/stream/reference/balance.md new file mode 100644 index 0000000000..b66fcf309f --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/balance.md @@ -0,0 +1,25 @@ +# balance + +Fan-out the stream to several streams. + +## Signature + +## Description + +Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer. + + +@@@div { .callout } + +**emits** when any of the outputs stops backpressuring; emits the element to the first available output + +**backpressures** when all of the outputs backpressure + +**completes** when upstream completes + +**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel. + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/batch.md b/akka-docs/src/main/paradox/stream/reference/batch.md new file mode 100644 index 0000000000..29eb5ebdff --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/batch.md @@ -0,0 +1,32 @@ +# batch + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure and a maximum number of batched elements is not yet reached. + +## Signature + +## Description + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure and a maximum number of batched elements is not yet reached. When the maximum number is reached and +downstream still backpressures batch will also backpressure. + +When backpressure starts or there is no backpressure element is passed into a `seed` function to transform it +to the summary type. + +Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be +aggregated to the batched value. + + +@@@div { .callout } + +**emits** when downstream stops backpressuring and there is a batched element available + +**backpressures** when batched elements reached the max limit of allowed batched elements & downstream backpressures + +**completes** when upstream completes and a "possibly pending" element was drained + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/batchWeighted.md b/akka-docs/src/main/paradox/stream/reference/batchWeighted.md new file mode 100644 index 0000000000..d1bfd5d560 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/batchWeighted.md @@ -0,0 +1,30 @@ +# batchWeighted + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure and a maximum weight batched elements is not yet reached. + +## Signature + +## Description + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure and a maximum weight batched elements is not yet reached. The weight of each element is determined by +applying `costFn`. When the maximum total weight is reached and downstream still backpressures batch will also +backpressure. + +Will eagerly pull elements, this behavior may result in a single pending (i.e. buffered) element which cannot be +aggregated to the batched value. + + +@@@div { .callout } + +**emits** downstream stops backpressuring and there is a batched element available + +**backpressures** batched elements reached the max weight limit of allowed batched elements & downstream backpressures + +**completes** upstream completes and a "possibly pending" element was drained + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/broadcast.md b/akka-docs/src/main/paradox/stream/reference/broadcast.md new file mode 100644 index 0000000000..85a157ef47 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/broadcast.md @@ -0,0 +1,25 @@ +# broadcast + +Emit each incoming element each of `n` outputs. + +## Signature + +## Description + +Emit each incoming element each of `n` outputs. + + +@@@div { .callout } + +**emits** when all of the outputs stops backpressuring and there is an input element available + +**backpressures** when any of the outputs backpressures + +**completes** when upstream completes + +**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel. + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/bufferBackpressure.md b/akka-docs/src/main/paradox/stream/reference/bufferBackpressure.md new file mode 100644 index 0000000000..f005ffba41 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/bufferBackpressure.md @@ -0,0 +1,24 @@ +# buffer (Backpressure) + +Allow for a temporarily faster upstream events by buffering `size` elements. + +## Signature + +## Description + +Allow for a temporarily faster upstream events by buffering `size` elements. When the buffer is full backpressure +is applied. + + +@@@div { .callout } + +**emits** when downstream stops backpressuring and there is a pending element in the buffer + +**backpressures** when buffer is full + +**completes** when upstream completes and buffered elements has been drained + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/bufferDrop.md b/akka-docs/src/main/paradox/stream/reference/bufferDrop.md new file mode 100644 index 0000000000..b8cbba484b --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/bufferDrop.md @@ -0,0 +1,29 @@ +# buffer (Drop) + +Allow for a temporarily faster upstream events by buffering `size` elements. + +## Signature + +## Description + +Allow for a temporarily faster upstream events by buffering `size` elements. When the buffer is full elements are +dropped according to the specified `OverflowStrategy`: + + * `dropHead` drops the oldest element in the buffer to make space for the new element + * `dropTail` drops the youngest element in the buffer to make space for the new element + * `dropBuffer` drops the entire buffer and buffers the new element + * `dropNew` drops the new element + + +@@@div { .callout } + +**emits** when downstream stops backpressuring and there is a pending element in the buffer + +**backpressures** never (when dropping cannot keep up with incoming elements) + +**completes** upstream completes and buffered elements has been drained + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/bufferFail.md b/akka-docs/src/main/paradox/stream/reference/bufferFail.md new file mode 100644 index 0000000000..9f309daec3 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/bufferFail.md @@ -0,0 +1,24 @@ +# buffer (Fail) + +Allow for a temporarily faster upstream events by buffering `size` elements. + +## Signature + +## Description + +Allow for a temporarily faster upstream events by buffering `size` elements. When the buffer is full the stage fails +the flow with a `BufferOverflowException`. + + +@@@div { .callout } + +**emits** when downstream stops backpressuring and there is a pending element in the buffer + +**backpressures** never, fails the stream instead of backpressuring when buffer is full + +**completes** when upstream completes and buffered elements has been drained + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/cancelled.md b/akka-docs/src/main/paradox/stream/reference/cancelled.md new file mode 100644 index 0000000000..35243bbbea --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/cancelled.md @@ -0,0 +1,19 @@ +# cancelled + +cancelled + +## Signature + +## Description + +Immediately cancel the stream + + +@@@div { .callout } + +**cancels** immediately + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/collect.md b/akka-docs/src/main/paradox/stream/reference/collect.md new file mode 100644 index 0000000000..372556182a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/collect.md @@ -0,0 +1,25 @@ +# collect + +Apply a partial function to each incoming element, if the partial function is defined for a value the returned +value is passed downstream. + +## Signature + +## Description + +Apply a partial function to each incoming element, if the partial function is defined for a value the returned +value is passed downstream. Can often replace `filter` followed by `map` to achieve the same in one single stage. + + +@@@div { .callout } + +**emits** when the provided partial function is defined for the element + +**backpressures** the partial function is defined for the element and downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/collectType.md b/akka-docs/src/main/paradox/stream/reference/collectType.md new file mode 100644 index 0000000000..eba512458a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/collectType.md @@ -0,0 +1,17 @@ +# collectType + +Transform this stream by testing the type of each of the elements on which the element is an instance of +the provided type as they pass through this processing step. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/combine.md b/akka-docs/src/main/paradox/stream/reference/combine.md new file mode 100644 index 0000000000..28a73896f9 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/combine.md @@ -0,0 +1,21 @@ +# combine + +combine + +## Signature + +## Description + +Combine several sinks into one using a user specified strategy + + +@@@div { .callout } + +**cancels** depends on the strategy + +**backpressures** depends on the strategy + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/completionTimeout.md b/akka-docs/src/main/paradox/stream/reference/completionTimeout.md new file mode 100644 index 0000000000..eced4103bc --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/completionTimeout.md @@ -0,0 +1,27 @@ +# completionTimeout + +If the completion of the stream does not happen until the provided timeout, the stream is failed +with a `TimeoutException`. + +## Signature + +## Description + +If the completion of the stream does not happen until the provided timeout, the stream is failed +with a `TimeoutException`. + + +@@@div { .callout } + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses before upstream completes + +**cancels** when downstream cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/concat.md b/akka-docs/src/main/paradox/stream/reference/concat.md new file mode 100644 index 0000000000..4028d1f295 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/concat.md @@ -0,0 +1,23 @@ +# concat + +After completion of the original upstream the elements of the given source will be emitted. + +## Signature + +## Description + +After completion of the original upstream the elements of the given source will be emitted. + + +@@@div { .callout } + +**emits** when the current stream has an element available; if the current input completes, it tries the next one + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/conflate.md b/akka-docs/src/main/paradox/stream/reference/conflate.md new file mode 100644 index 0000000000..8bd56fad2d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/conflate.md @@ -0,0 +1,26 @@ +# conflate + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as +there is backpressure. + +## Signature + +## Description + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as +there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or +average of incoming numbers, if aggregation should lead to a different type `conflateWithSeed` can be used: + + +@@@div { .callout } + +**emits** when downstream stops backpressuring and there is a conflated element available + +**backpressures** when the aggregate function cannot keep up with incoming elements + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/conflateWithSeed.md b/akka-docs/src/main/paradox/stream/reference/conflateWithSeed.md new file mode 100644 index 0000000000..84a852bc04 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/conflateWithSeed.md @@ -0,0 +1,26 @@ +# conflateWithSeed + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure. + +## Signature + +## Description + +Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there +is backpressure. When backpressure starts or there is no backpressure element is passed into a `seed` function to +transform it to the summary type. + + +@@@div { .callout } + +**emits** when downstream stops backpressuring and there is a conflated element available + +**backpressures** when the aggregate or seed functions cannot keep up with incoming elements + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/cycle.md b/akka-docs/src/main/paradox/stream/reference/cycle.md new file mode 100644 index 0000000000..8103a2eaa5 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/cycle.md @@ -0,0 +1,24 @@ +# cycle + +Stream iterator in cycled manner. + +## Signature + +## Description + +Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning +when original iterator runs out of elements process will start all over again from the beginning of the iterator +provided by the evaluation of provided parameter. If method argument provides empty iterator stream will be terminated with +exception. + + +@@@div { .callout } + +**emits** the next value returned from cycled iterator + +**completes** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/delay.md b/akka-docs/src/main/paradox/stream/reference/delay.md new file mode 100644 index 0000000000..dd004e905e --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/delay.md @@ -0,0 +1,24 @@ +# delay + +Delay every element passed through with a specific duration. + +## Signature + +## Description + +Delay every element passed through with a specific duration. + + +@@@div { .callout } + +**emits** there is a pending element in the buffer and configured time for this element elapsed + +**backpressures** differs, depends on `OverflowStrategy` set + +**completes** when upstream completes and buffered elements has been drained + + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/detach.md b/akka-docs/src/main/paradox/stream/reference/detach.md new file mode 100644 index 0000000000..80467e2c56 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/detach.md @@ -0,0 +1,23 @@ +# detach + +Detach upstream demand from downstream demand without detaching the stream rates. + +## Signature + +## Description + +Detach upstream demand from downstream demand without detaching the stream rates. + + +@@@div { .callout } + +**emits** when the upstream stage has emitted and there is demand + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/divertTo.md b/akka-docs/src/main/paradox/stream/reference/divertTo.md new file mode 100644 index 0000000000..0dca817312 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/divertTo.md @@ -0,0 +1,25 @@ +# divertTo + +Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element. + +## Signature + +## Description + +Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element. + + +@@@div { .callout } + +**emits** when the chosen output stops backpressuring and there is an input element available + +**backpressures** when the chosen output backpressures + +**completes** when upstream completes and no output is pending + +**cancels** when any of the downstreams cancel + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/drop.md b/akka-docs/src/main/paradox/stream/reference/drop.md new file mode 100644 index 0000000000..f653ae32d8 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/drop.md @@ -0,0 +1,23 @@ +# drop + +Drop `n` elements and then pass any subsequent element downstream. + +## Signature + +## Description + +Drop `n` elements and then pass any subsequent element downstream. + + +@@@div { .callout } + +**emits** when the specified number of elements has been dropped already + +**backpressures** when the specified number of elements has been dropped and downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/dropWhile.md b/akka-docs/src/main/paradox/stream/reference/dropWhile.md new file mode 100644 index 0000000000..559ddbd3b0 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/dropWhile.md @@ -0,0 +1,23 @@ +# dropWhile + +dropWhile + +## Signature + +## Description + +Drop elements as long as a predicate function return true for the element + + +@@@div { .callout } + +**emits** when the predicate returned false and for all following stream elements + +**backpressures** predicate returned false and downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/dropWithin.md b/akka-docs/src/main/paradox/stream/reference/dropWithin.md new file mode 100644 index 0000000000..1efb305e84 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/dropWithin.md @@ -0,0 +1,23 @@ +# dropWithin + +dropWithin + +## Signature + +## Description + +Drop elements until a timeout has fired + + +@@@div { .callout } + +**emits** after the timer fired and a new upstream element arrives + +**backpressures** when downstream backpressures + +**completes** upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/empty.md b/akka-docs/src/main/paradox/stream/reference/empty.md new file mode 100644 index 0000000000..4d3ab25a2f --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/empty.md @@ -0,0 +1,22 @@ +# empty + +Complete right away without ever emitting any elements. + +## Signature + +## Description + +Complete right away without ever emitting any elements. Useful when you have to provide a source to +an API but there are no elements to emit. + + +@@@div { .callout } + +**emits** never + +**completes** directly + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/expand.md b/akka-docs/src/main/paradox/stream/reference/expand.md new file mode 100644 index 0000000000..c36fd2131d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/expand.md @@ -0,0 +1,28 @@ +# expand + +Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original +element, allowing for it to be rewritten and/or filtered. + +## Signature + +## Description + +Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original +element, allowing for it to be rewritten and/or filtered. + +See @ref:[Understanding extrapolate and expand](../stream-rate.md#understanding-extrapolate-and-expand) for more information +and examples. + + +@@@div { .callout } + +**emits** when downstream stops backpressuring + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/extrapolate.md b/akka-docs/src/main/paradox/stream/reference/extrapolate.md new file mode 100644 index 0000000000..8c9a815ae5 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/extrapolate.md @@ -0,0 +1,32 @@ +# extrapolate + +Allow for a faster downstream by expanding the last emitted element to an `Iterator`. + +## Signature + +## Description + +Allow for a faster downstream by expanding the last emitted element to an `Iterator`. For example, an +`Iterator.continually(element)` will cause `extrapolate` to keep repeating the last emitted element. + +All original elements are always emitted unchanged - the `Iterator` is only used whenever there is downstream + demand before upstream emits a new element. + +Includes an optional `initial` argument to prevent blocking the entire stream when there are multiple producers. + +See @ref:[Understanding extrapolate and expand](../stream-rate.md#understanding-extrapolate-and-expand) for more information +and examples. + + +@@@div { .callout } + +**emits** when downstream stops backpressuring + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/failed.md b/akka-docs/src/main/paradox/stream/reference/failed.md new file mode 100644 index 0000000000..fe8cffa887 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/failed.md @@ -0,0 +1,21 @@ +# failed + +Fail directly with a user specified exception. + +## Signature + +## Description + +Fail directly with a user specified exception. + + +@@@div { .callout } + +**emits** never + +**completes** fails the stream directly with the given exception + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/filter.md b/akka-docs/src/main/paradox/stream/reference/filter.md new file mode 100644 index 0000000000..0f277fd7c7 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/filter.md @@ -0,0 +1,24 @@ +# filter + +Filter the incoming elements using a predicate. + +## Signature + +## Description + +Filter the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if +it returns false the element is discarded. + + +@@@div { .callout } + +**emits** when the given predicate returns true for the element + +**backpressures** when the given predicate returns true for the element and downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/filterNot.md b/akka-docs/src/main/paradox/stream/reference/filterNot.md new file mode 100644 index 0000000000..60b43230e7 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/filterNot.md @@ -0,0 +1,24 @@ +# filterNot + +Filter the incoming elements using a predicate. + +## Signature + +## Description + +Filter the incoming elements using a predicate. If the predicate returns false the element is passed downstream, if +it returns true the element is discarded. + + +@@@div { .callout } + +**emits** when the given predicate returns false for the element + +**backpressures** when the given predicate returns false for the element and downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/flatMapConcat.md b/akka-docs/src/main/paradox/stream/reference/flatMapConcat.md new file mode 100644 index 0000000000..0558f6c761 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/flatMapConcat.md @@ -0,0 +1,25 @@ +# flatMapConcat + +Transform each input element into a `Source` whose elements are then flattened into the output stream through +concatenation. + +## Signature + +## Description + +Transform each input element into a `Source` whose elements are then flattened into the output stream through +concatenation. This means each source is fully consumed before consumption of the next source starts. + + +@@@div { .callout } + +**emits** when the current consumed substream has an element available + +**backpressures** when downstream backpressures + +**completes** when upstream completes and all consumed substreams complete + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/flatMapMerge.md b/akka-docs/src/main/paradox/stream/reference/flatMapMerge.md new file mode 100644 index 0000000000..e9a7a9f073 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/flatMapMerge.md @@ -0,0 +1,25 @@ +# flatMapMerge + +Transform each input element into a `Source` whose elements are then flattened into the output stream through +merging. + +## Signature + +## Description + +Transform each input element into a `Source` whose elements are then flattened into the output stream through +merging. The maximum number of merged sources has to be specified. + + +@@@div { .callout } + +**emits** when one of the currently consumed substreams has an element available + +**backpressures** when downstream backpressures + +**completes** when upstream completes and all consumed substreams complete + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fold.md b/akka-docs/src/main/paradox/stream/reference/fold.md new file mode 100644 index 0000000000..83919e145f --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fold.md @@ -0,0 +1,25 @@ +# fold + +Start with current value `zero` and then apply the current and next value to the given function, when upstream +complete the current value is emitted downstream. + +## Signature + +## Description + +Start with current value `zero` and then apply the current and next value to the given function, when upstream +complete the current value is emitted downstream. + + +@@@div { .callout } + +**emits** when upstream completes + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/foldAsync.md b/akka-docs/src/main/paradox/stream/reference/foldAsync.md new file mode 100644 index 0000000000..01a71677c4 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/foldAsync.md @@ -0,0 +1,23 @@ +# foldAsync + +Just like `fold` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. + +## Signature + +## Description + +Just like `fold` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. + + +@@@div { .callout } + +**emits** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved + +**backpressures** when downstream backpressures + +**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/foreach.md b/akka-docs/src/main/paradox/stream/reference/foreach.md new file mode 100644 index 0000000000..b3940afc56 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/foreach.md @@ -0,0 +1,26 @@ +# foreach + +Invoke a given procedure for each element received. + +## Signature + +## Description + +Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure. + +The sink materializes into a @scala[`Future[Option[Done]]`] @java[`CompletionStage`] which completes when the +stream completes, or fails if the stream fails. + +Note that it is not safe to mutate state from the procedure. + + +@@@div { .callout } + +**cancels** never + +**backpressures** when the previous procedure invocation has not yet completed + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/foreachParallel.md b/akka-docs/src/main/paradox/stream/reference/foreachParallel.md new file mode 100644 index 0000000000..761ab385ce --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/foreachParallel.md @@ -0,0 +1,21 @@ +# foreachParallel + +Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel. + +## Signature + +## Description + +Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel. + + +@@@div { .callout } + +**cancels** never + +**backpressures** when the previous parallel procedure invocations has not yet completed + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromCompletionStage.md b/akka-docs/src/main/paradox/stream/reference/fromCompletionStage.md new file mode 100644 index 0000000000..9ebacb8774 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromCompletionStage.md @@ -0,0 +1,22 @@ +# fromCompletionStage + +Send the single value of the `CompletionStage` when it completes and there is demand. + +## Signature + +## Description + +Send the single value of the `CompletionStage` when it completes and there is demand. +If the future fails the stream is failed with that exception. + + +@@@div { .callout } + +**emits** the future completes + +**completes** after the future has completed + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromFuture.md b/akka-docs/src/main/paradox/stream/reference/fromFuture.md new file mode 100644 index 0000000000..b0300c364a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromFuture.md @@ -0,0 +1,22 @@ +# fromFuture + +Send the single value of the `Future` when it completes and there is demand. + +## Signature + +## Description + +Send the single value of the `Future` when it completes and there is demand. +If the future fails the stream is failed with that exception. + + +@@@div { .callout } + +**emits** the future completes + +**completes** after the future has completed + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromFutureSource.md b/akka-docs/src/main/paradox/stream/reference/fromFutureSource.md new file mode 100644 index 0000000000..ef16e7fb30 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromFutureSource.md @@ -0,0 +1,22 @@ +# fromFutureSource + +Streams the elements of the given future source once it successfully completes. + +## Signature + +## Description + +Streams the elements of the given future source once it successfully completes. +If the future fails the stream is failed. + + +@@@div { .callout } + +**emits** the next value from the *future* source, once it has completed + +**completes** after the *future* source completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromInputStream.md b/akka-docs/src/main/paradox/stream/reference/fromInputStream.md new file mode 100644 index 0000000000..02143cbf2f --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromInputStream.md @@ -0,0 +1,16 @@ +# fromInputStream + +Create a source that wraps an `InputStream`. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromIterator.md b/akka-docs/src/main/paradox/stream/reference/fromIterator.md new file mode 100644 index 0000000000..1cf60bc5e4 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromIterator.md @@ -0,0 +1,24 @@ +# fromIterator + +Stream the values from an `Iterator`, requesting the next value when there is demand. + +## Signature + +## Description + +Stream the values from an `Iterator`, requesting the next value when there is demand. The iterator will be created anew +for each materialization, which is the reason the @scala[`method`] @java[`factory`] takes a @scala[`function`] @java[`Creator`] rather than an `Iterator` directly. + +If the iterator perform blocking operations, make sure to run it on a separate dispatcher. + + +@@@div { .callout } + +**emits** the next value returned from the iterator + +**completes** when the iterator reaches its end + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromJavaStream.md b/akka-docs/src/main/paradox/stream/reference/fromJavaStream.md new file mode 100644 index 0000000000..9abb17f28a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromJavaStream.md @@ -0,0 +1,16 @@ +# fromJavaStream + +Create a source that wraps a Java 8 `Stream`. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromOutputStream.md b/akka-docs/src/main/paradox/stream/reference/fromOutputStream.md new file mode 100644 index 0000000000..256ab3aaa9 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromOutputStream.md @@ -0,0 +1,16 @@ +# fromOutputStream + +Create a sink that wraps an `OutputStream`. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromPath.md b/akka-docs/src/main/paradox/stream/reference/fromPath.md new file mode 100644 index 0000000000..4f796a0bf8 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromPath.md @@ -0,0 +1,17 @@ +# fromPath + +Emit the contents of a file, as `ByteString` s, materializes into a @scala[`Future`] @java[`CompletionStage`]` which will be completed with +a `IOResult` upon reaching the end of the file or if there is a failure. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromPublisher.md b/akka-docs/src/main/paradox/stream/reference/fromPublisher.md new file mode 100644 index 0000000000..3695dc3f96 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromPublisher.md @@ -0,0 +1,16 @@ +# fromPublisher + +Integration with Reactive Streams, subscribes to a `org. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromSourceCompletionStage.md b/akka-docs/src/main/paradox/stream/reference/fromSourceCompletionStage.md new file mode 100644 index 0000000000..03e0a08ceb --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromSourceCompletionStage.md @@ -0,0 +1,22 @@ +# fromSourceCompletionStage + +Streams the elements of an asynchronous source once its given *completion* stage completes. + +## Signature + +## Description + +Streams the elements of an asynchronous source once its given *completion* stage completes. +If the *completion* fails the stream is failed with that exception. + + +@@@div { .callout } + +**emits** the next value from the asynchronous source, once its *completion stage* has completed + +**completes** after the asynchronous source completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/fromSubscriber.md b/akka-docs/src/main/paradox/stream/reference/fromSubscriber.md new file mode 100644 index 0000000000..1cd1a98e0f --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/fromSubscriber.md @@ -0,0 +1,16 @@ +# fromSubscriber + +Integration with Reactive Streams, wraps a `org. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/groupBy.md b/akka-docs/src/main/paradox/stream/reference/groupBy.md new file mode 100644 index 0000000000..138adad59a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/groupBy.md @@ -0,0 +1,22 @@ +# groupBy + +Demultiplex the incoming stream into separate output streams. + +## Signature + +## Description + +Demultiplex the incoming stream into separate output streams. + + +@@@div { .callout } + +**emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group +there is an element pending for a group whose substream backpressures + +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/grouped.md b/akka-docs/src/main/paradox/stream/reference/grouped.md new file mode 100644 index 0000000000..ca87173faf --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/grouped.md @@ -0,0 +1,25 @@ +# grouped + +Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of +elements downstream. + +## Signature + +## Description + +Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of +elements downstream. + + +@@@div { .callout } + +**emits** when the specified number of elements has been accumulated or upstream completed + +**backpressures** when a group has been assembled and downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/groupedWeightedWithin.md b/akka-docs/src/main/paradox/stream/reference/groupedWeightedWithin.md new file mode 100644 index 0000000000..a86cc46455 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/groupedWeightedWithin.md @@ -0,0 +1,27 @@ +# groupedWeightedWithin + +Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, +whatever happens first. + +## Signature + +## Description + +Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, +whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +The last group before end-of-stream will contain the buffered elements since the previously emitted group. + + +@@@div { .callout } + +**emits** when the configured time elapses since the last group has been emitted, +but not if no elements has been grouped (i.e: no empty groups), or when weight limit has been reached. + +**backpressures** downstream backpressures, and buffered group (+ pending element) weighs more than *maxWeight* + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/groupedWithin.md b/akka-docs/src/main/paradox/stream/reference/groupedWithin.md new file mode 100644 index 0000000000..917cab2ca0 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/groupedWithin.md @@ -0,0 +1,27 @@ +# groupedWithin + +Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, +whatever happens first. + +## Signature + +## Description + +Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, +whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +The last group before end-of-stream will contain the buffered elements since the previously emitted group. + + +@@@div { .callout } + +**emits** when the configured time elapses since the last group has been emitted, +but not if no elements has been grouped (i.e: no empty groups), or when limit has been reached. + +**backpressures** downstream backpressures, and there are *n+1* buffered elements + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/head.md b/akka-docs/src/main/paradox/stream/reference/head.md new file mode 100644 index 0000000000..54ea990e8e --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/head.md @@ -0,0 +1,23 @@ +# head + +Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving, +after this the stream is canceled. + +## Signature + +## Description + +Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving, +after this the stream is canceled. If no element is emitted, the @scala[`Future`] @java[`CompletionStage`] is failed. + + +@@@div { .callout } + +**cancels** after receiving one element + +**backpressures** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/headOption.md b/akka-docs/src/main/paradox/stream/reference/headOption.md new file mode 100644 index 0000000000..6e6a902b06 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/headOption.md @@ -0,0 +1,23 @@ +# headOption + +Materializes into a @scala[`Future[Option[T]]`] @java[`CompletionStage>`] which completes with the first value arriving wrapped in @scala[`Some`] @java[`Optional`], +or @scala[a `None`] @java[an empty Optional] if the stream completes without any elements emitted. + +## Signature + +## Description + +Materializes into a @scala[`Future[Option[T]]`] @java[`CompletionStage>`] which completes with the first value arriving wrapped in @scala[`Some`] @java[`Optional`], +or @scala[a `None`] @java[an empty Optional] if the stream completes without any elements emitted. + + +@@@div { .callout } + +**cancels** after receiving one element + +**backpressures** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/idleTimeout.md b/akka-docs/src/main/paradox/stream/reference/idleTimeout.md new file mode 100644 index 0000000000..3eda0c5f8a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/idleTimeout.md @@ -0,0 +1,28 @@ +# idleTimeout + +If the time between two processed elements exceeds the provided timeout, the stream is failed +with a `TimeoutException`. + +## Signature + +## Description + +If the time between two processed elements exceeds the provided timeout, the stream is failed +with a `TimeoutException`. The timeout is checked periodically, so the resolution of the +check is one period (equals to timeout value). + + +@@@div { .callout } + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses between two emitted elements + +**cancels** when downstream cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/ignore.md b/akka-docs/src/main/paradox/stream/reference/ignore.md new file mode 100644 index 0000000000..d4bf99a89a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/ignore.md @@ -0,0 +1,22 @@ +# ignore + +Consume all elements but discards them. + +## Signature + +## Description + +Consume all elements but discards them. Useful when a stream has to be consumed but there is no use to actually +do anything with the elements. + + +@@@div { .callout } + +**cancels** never + +**backpressures** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/initialDelay.md b/akka-docs/src/main/paradox/stream/reference/initialDelay.md new file mode 100644 index 0000000000..fe616a93fe --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/initialDelay.md @@ -0,0 +1,25 @@ +# initialDelay + +Delays the initial element by the specified duration. + +## Signature + +## Description + +Delays the initial element by the specified duration. + + +@@@div { .callout } + +**emits** when upstream emits an element if the initial delay is already elapsed + +**backpressures** when downstream backpressures or initial delay is not yet elapsed + +**completes** when upstream completes + +**cancels** when downstream cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/initialTimeout.md b/akka-docs/src/main/paradox/stream/reference/initialTimeout.md new file mode 100644 index 0000000000..2d0fe4f34c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/initialTimeout.md @@ -0,0 +1,27 @@ +# initialTimeout + +If the first element has not passed through this stage before the provided timeout, the stream is failed +with a `TimeoutException`. + +## Signature + +## Description + +If the first element has not passed through this stage before the provided timeout, the stream is failed +with a `TimeoutException`. + + +@@@div { .callout } + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses before first element arrives + +**cancels** when downstream cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/interleave.md b/akka-docs/src/main/paradox/stream/reference/interleave.md new file mode 100644 index 0000000000..05c6df0c29 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/interleave.md @@ -0,0 +1,24 @@ +# interleave + +Emits a specifiable number of elements from the original source, then from the provided source and repeats. + +## Signature + +## Description + +Emits a specifiable number of elements from the original source, then from the provided source and repeats. If one +source completes the rest of the other stream will be emitted. + + +@@@div { .callout } + +**emits** when element is available from the currently consumed upstream + +**backpressures** when upstream backpressures + +**completes** when both upstreams have completed + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/intersperse.md b/akka-docs/src/main/paradox/stream/reference/intersperse.md new file mode 100644 index 0000000000..a69ad181e6 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/intersperse.md @@ -0,0 +1,23 @@ +# intersperse + +Intersperse stream with provided element similar to `List. + +## Signature + +## Description + +Intersperse stream with provided element similar to `List.mkString`. It can inject start and end marker elements to stream. + + +@@@div { .callout } + +**emits** when upstream emits an element or before with the *start* element if provided + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/javaCollector.md b/akka-docs/src/main/paradox/stream/reference/javaCollector.md new file mode 100644 index 0000000000..c4e48124b2 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/javaCollector.md @@ -0,0 +1,17 @@ +# javaCollector + +Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` +transformation and reduction operations. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/javaCollectorParallelUnordered.md b/akka-docs/src/main/paradox/stream/reference/javaCollectorParallelUnordered.md new file mode 100644 index 0000000000..ae4d6abd41 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/javaCollectorParallelUnordered.md @@ -0,0 +1,17 @@ +# javaCollectorParallelUnordered + +Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` +transformation and reduction operations. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/keepAlive.md b/akka-docs/src/main/paradox/stream/reference/keepAlive.md new file mode 100644 index 0000000000..f67151b6f7 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/keepAlive.md @@ -0,0 +1,25 @@ +# keepAlive + +Injects additional (configured) elements if upstream does not emit for a configured amount of time. + +## Signature + +## Description + +Injects additional (configured) elements if upstream does not emit for a configured amount of time. + + +@@@div { .callout } + +**emits** when upstream emits an element or if the upstream was idle for the configured period + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +**cancels** when downstream cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/last.md b/akka-docs/src/main/paradox/stream/reference/last.md new file mode 100644 index 0000000000..fe7f83026b --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/last.md @@ -0,0 +1,23 @@ +# last + +Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream +completes. + +## Signature + +## Description + +Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream +completes. If the stream completes with no elements the @scala[`Future`] @java[`CompletionStage`] is failed. + + +@@@div { .callout } + +**cancels** never + +**backpressures** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/lastOption.md b/akka-docs/src/main/paradox/stream/reference/lastOption.md new file mode 100644 index 0000000000..b872abc6c4 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/lastOption.md @@ -0,0 +1,24 @@ +# lastOption + +Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage>`] which completes with the last value +emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes. + +## Signature + +## Description + +Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage>`] which completes with the last value +emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes. if the stream completes with no elements the `CompletionStage` is +completed with @scala[`None`] @java[an empty `Optional`]. + + +@@@div { .callout } + +**cancels** never + +**backpressures** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/lazily.md b/akka-docs/src/main/paradox/stream/reference/lazily.md new file mode 100644 index 0000000000..29cd0eed86 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/lazily.md @@ -0,0 +1,21 @@ +# lazily + +Defers creation and materialization of a `Source` until there is demand. + +## Signature + +## Description + +Defers creation and materialization of a `Source` until there is demand. + + +@@@div { .callout } + +**emits** depends on the wrapped `Source` + +**completes** depends on the wrapped `Source` + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/lazilyAsync.md b/akka-docs/src/main/paradox/stream/reference/lazilyAsync.md new file mode 100644 index 0000000000..2a6317bffe --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/lazilyAsync.md @@ -0,0 +1,21 @@ +# lazilyAsync + +Defers creation and materialization of a `CompletionStage` until there is demand. + +## Signature + +## Description + +Defers creation and materialization of a `CompletionStage` until there is demand. + + +@@@div { .callout } + +**emits** the future completes + +**completes** after the future has completed + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/lazyInitAsync.md b/akka-docs/src/main/paradox/stream/reference/lazyInitAsync.md new file mode 100644 index 0000000000..6cbb2e1649 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/lazyInitAsync.md @@ -0,0 +1,34 @@ +# lazyInitAsync + +Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument. + +## Signature + +## Description + +Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument. +Internal `Flow` will not be created if there are no elements, because of completion or error. +The materialized value of the `Flow` will be the materialized value of the created internal flow. + +The materialized value of the `Flow` is a @scala[`Future[Option[M]]`]@java[`CompletionStage>`] that is +completed with @scala[`Some(mat)`]@java[`Optional.of(mat)`] when the internal flow gets materialized or with @scala[`None`] +@java[an empty optional] when there where no elements. If the flow materialization (including the call of the `flowFactory`) +fails then the future is completed with a failure. + +Adheres to the `ActorAttributes.SupervisionStrategy` attribute. + + +@@@div { .callout } + +**emits** when the internal flow is successfully created and it emits + +**backpressures** when the internal flow is successfully created and it backpressures + +**completes** when upstream completes and all elements have been emitted from the internal flow + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/limit.md b/akka-docs/src/main/paradox/stream/reference/limit.md new file mode 100644 index 0000000000..2d48e770e4 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/limit.md @@ -0,0 +1,23 @@ +# limit + +Limit number of element from upstream to given `max` number. + +## Signature + +## Description + +Limit number of element from upstream to given `max` number. + + +@@@div { .callout } + +**emits** when upstream emits and the number of emitted elements has not reached max + +**backpressures** when downstream backpressures + +**completes** when upstream completes and the number of emitted elements has not reached max + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/limitWeighted.md b/akka-docs/src/main/paradox/stream/reference/limitWeighted.md new file mode 100644 index 0000000000..d74327cb47 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/limitWeighted.md @@ -0,0 +1,24 @@ +# limitWeighted + +Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. + +## Signature + +## Description + +Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. +Evaluated cost of each element defines how many elements will be allowed to travel downstream. + + +@@@div { .callout } + +**emits** when upstream emits and the number of emitted elements has not reached max + +**backpressures** when downstream backpressures + +**completes** when upstream completes and the number of emitted elements has not reached max + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/log.md b/akka-docs/src/main/paradox/stream/reference/log.md new file mode 100644 index 0000000000..7c95341258 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/log.md @@ -0,0 +1,25 @@ +# log + +Log elements flowing through the stream as well as completion and erroring. + +## Signature + +## Description + +Log elements flowing through the stream as well as completion and erroring. By default element and +completion signals are logged on debug level, and errors are logged on Error level. +This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow. + + +@@@div { .callout } + +**emits** when upstream emits + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/map.md b/akka-docs/src/main/paradox/stream/reference/map.md new file mode 100644 index 0000000000..2bd0dbf33d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/map.md @@ -0,0 +1,23 @@ +# map + +Transform each element in the stream by calling a mapping function with it and passing the returned value downstream. + +## Signature + +## Description + +Transform each element in the stream by calling a mapping function with it and passing the returned value downstream. + + +@@@div { .callout } + +**emits** when the mapping function returns an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/mapAsync.md b/akka-docs/src/main/paradox/stream/reference/mapAsync.md new file mode 100644 index 0000000000..a542a3dd12 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/mapAsync.md @@ -0,0 +1,27 @@ +# mapAsync + +Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result. + +## Signature + +## Description + +Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result. When the @scala[`Future`] @java[`CompletionStage`] arrives the result is passed +downstream. Up to `n` elements can be processed concurrently, but regardless of their completion time the incoming +order will be kept when results complete. For use cases where order does not matter `mapAsyncUnordered` can be used. + +If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied) + + +@@@div { .callout } + +**emits** when the @scala[`Future`] @java[`CompletionStage`] returned by the provided function finishes for the next element in sequence + +**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures + +**completes** when upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/mapAsyncUnordered.md b/akka-docs/src/main/paradox/stream/reference/mapAsyncUnordered.md new file mode 100644 index 0000000000..7db9bdad7c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/mapAsyncUnordered.md @@ -0,0 +1,27 @@ +# mapAsyncUnordered + +Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements +that triggered them. + +## Signature + +## Description + +Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements +that triggered them. + +If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied) + + +@@@div { .callout } + +**emits** any of the @scala[`Future` s] @java[`CompletionStage` s] returned by the provided function complete + +**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures + +**completes** upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/mapConcat.md b/akka-docs/src/main/paradox/stream/reference/mapConcat.md new file mode 100644 index 0000000000..bf172daf3b --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/mapConcat.md @@ -0,0 +1,23 @@ +# mapConcat + +Transform each element into zero or more elements that are individually passed downstream. + +## Signature + +## Description + +Transform each element into zero or more elements that are individually passed downstream. + + +@@@div { .callout } + +**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection + +**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection + +**completes** when upstream completes and all remaining elements has been emitted + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/mapError.md b/akka-docs/src/main/paradox/stream/reference/mapError.md new file mode 100644 index 0000000000..2db954b6da --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/mapError.md @@ -0,0 +1,29 @@ +# mapError + +While similar to `recover` this stage can be used to transform an error signal to a different one *without* logging +it as an error in the process. + +## Signature + +## Description + +While similar to `recover` this stage can be used to transform an error signal to a different one *without* logging +it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover +would log the `t2` error. + +Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. +This stage can recover the failure signal, but not the skipped elements, which will be dropped. + +Similarily to `recover` throwing an exception inside `mapError` _will_ be logged on ERROR level automatically. + + +@@@div { .callout } + +**emits** when element is available from the upstream or upstream is failed and pf returns an element +**backpressures** when downstream backpressures +**completes** when upstream completes or upstream failed with exception pf can handle + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/maybe.md b/akka-docs/src/main/paradox/stream/reference/maybe.md new file mode 100644 index 0000000000..fb33c1b41a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/maybe.md @@ -0,0 +1,23 @@ +# maybe + +Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] +will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away. + +## Signature + +## Description + +Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] +will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away. + + +@@@div { .callout } + +**emits** when the returned promise is completed with some value + +**completes** after emitting some value, or directly if the promise is completed with no value + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/merge.md b/akka-docs/src/main/paradox/stream/reference/merge.md new file mode 100644 index 0000000000..1bf5c5802d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/merge.md @@ -0,0 +1,23 @@ +# merge + +Merge multiple sources. + +## Signature + +## Description + +Merge multiple sources. Picks elements randomly if all sources has elements ready. + + +@@@div { .callout } + +**emits** when one of the inputs has an element available + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/mergePreferred.md b/akka-docs/src/main/paradox/stream/reference/mergePreferred.md new file mode 100644 index 0000000000..938a9aa672 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/mergePreferred.md @@ -0,0 +1,23 @@ +# mergePreferred + +Merge multiple sources. + +## Signature + +## Description + +Merge multiple sources. Prefer one source if all sources has elements ready. + + +@@@div { .callout } + +**emits** when one of the inputs has an element available, preferring a defined input if multiple have elements available + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/mergePrioritized.md b/akka-docs/src/main/paradox/stream/reference/mergePrioritized.md new file mode 100644 index 0000000000..5374f9f67f --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/mergePrioritized.md @@ -0,0 +1,24 @@ +# mergePrioritized + +Merge multiple sources. + +## Signature + +## Description + +Merge multiple sources. Prefer sources depending on priorities if all sources has elements ready. If a subset of all +sources has elements ready the relative priorities for those sources are used to prioritise. + + +@@@div { .callout } + +**emits** when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/mergeSorted.md b/akka-docs/src/main/paradox/stream/reference/mergeSorted.md new file mode 100644 index 0000000000..d1448bc239 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/mergeSorted.md @@ -0,0 +1,24 @@ +# mergeSorted + +Merge multiple sources. + +## Signature + +## Description + +Merge multiple sources. Waits for one element to be ready from each input stream and emits the +smallest element. + + +@@@div { .callout } + +**emits** when all of the inputs have an element available + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/monitor.md b/akka-docs/src/main/paradox/stream/reference/monitor.md new file mode 100644 index 0000000000..3ab9efe14e --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/monitor.md @@ -0,0 +1,25 @@ +# monitor + +Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage. + +## Signature + +## Description + +Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage. The stage otherwise +passes through elements unchanged. Note that the `FlowMonitor` inserts a memory barrier every time it processes an +event, and may therefore affect performance. + + +@@@div { .callout } + +**emits** when upstream emits an element + +**backpressures** when downstream **backpressures** + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/onComplete.md b/akka-docs/src/main/paradox/stream/reference/onComplete.md new file mode 100644 index 0000000000..3742d13624 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/onComplete.md @@ -0,0 +1,21 @@ +# onComplete + +Invoke a callback when the stream has completed or failed. + +## Signature + +## Description + +Invoke a callback when the stream has completed or failed. + + +@@@div { .callout } + +**cancels** never + +**backpressures** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/orElse.md b/akka-docs/src/main/paradox/stream/reference/orElse.md new file mode 100644 index 0000000000..f986adeae7 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/orElse.md @@ -0,0 +1,32 @@ +# orElse + +If the primary source completes without emitting any elements, the elements from the secondary source +are emitted. + +## Signature + +## Description + +If the primary source completes without emitting any elements, the elements from the secondary source +are emitted. If the primary source emits any elements the secondary source is cancelled. + +Note that both sources are materialized directly and the secondary source is backpressured until it becomes +the source of elements or is cancelled. + +Signal errors downstream, regardless which of the two sources emitted the error. + + +@@@div { .callout } + +**emits** when an element is available from first stream or first stream closed without emitting any elements and an element +is available from the second stream + +**backpressures** when downstream backpressures + +**completes** the primary stream completes after emitting at least one element, when the primary stream completes +without emitting and the secondary stream already has completed or when the secondary stream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/partition.md b/akka-docs/src/main/paradox/stream/reference/partition.md new file mode 100644 index 0000000000..82a8ffcdda --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/partition.md @@ -0,0 +1,26 @@ +# partition + +Fan-out the stream to several streams. + +## Signature + +## Description + +Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the +partitioner function applied to the element. + + +@@@div { .callout } + +**emits** when the chosen output stops backpressuring and there is an input element available + +**backpressures** when the chosen output backpressures + +**completes** when upstream completes and no output is pending + +**cancels** depends on the `eagerCancel` flag. If it is true, when any downstream cancels, if false, when all downstreams cancel. + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/preMaterialize.md b/akka-docs/src/main/paradox/stream/reference/preMaterialize.md new file mode 100644 index 0000000000..8a2e3e05b1 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/preMaterialize.md @@ -0,0 +1,16 @@ +# preMaterialize + +Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/prefixAndTail.md b/akka-docs/src/main/paradox/stream/reference/prefixAndTail.md new file mode 100644 index 0000000000..544a641dec --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/prefixAndTail.md @@ -0,0 +1,25 @@ +# prefixAndTail + +Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) +and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements. + +## Signature + +## Description + +Take up to *n* elements from the stream (less than *n* only if the upstream completes before emitting *n* elements) +and returns a pair containing a strict sequence of the taken element and a stream representing the remaining elements. + + +@@@div { .callout } + +**emits** when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream + +**backpressures** when downstream backpressures or substream backpressures + +**completes** when prefix elements has been consumed and substream has been consumed + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/prepend.md b/akka-docs/src/main/paradox/stream/reference/prepend.md new file mode 100644 index 0000000000..2fde0f208b --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/prepend.md @@ -0,0 +1,25 @@ +# prepend + +Prepends the given source to the flow, consuming it until completion before the original source is consumed. + +## Signature + +## Description + +Prepends the given source to the flow, consuming it until completion before the original source is consumed. + +If materialized values needs to be collected `prependMat` is available. + + +@@@div { .callout } + +**emits** when the given stream has an element available; if the given input completes, it tries the current one + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/queue.md b/akka-docs/src/main/paradox/stream/reference/queue.md new file mode 100644 index 0000000000..d105de4355 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/queue.md @@ -0,0 +1,22 @@ +# queue + +Materialize a `SinkQueue` that can be pulled to trigger demand through the sink. + +## Signature + +## Description + +Materialize a `SinkQueue` that can be pulled to trigger demand through the sink. The queue contains +a buffer in case stream emitting elements faster than queue pulling them. + + +@@@div { .callout } + +**cancels** when `SinkQueue.cancel` is called + +**backpressures** when buffer has some space + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/range.md b/akka-docs/src/main/paradox/stream/reference/range.md new file mode 100644 index 0000000000..13a8555521 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/range.md @@ -0,0 +1,21 @@ +# range + +Emit each integer in a range, with an option to take bigger steps than 1. + +## Signature + +## Description + +Emit each integer in a range, with an option to take bigger steps than 1. + + +@@@div { .callout } + +**emits** when there is demand, the next value + +**completes** when the end of the range has been reached + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/recover.md b/akka-docs/src/main/paradox/stream/reference/recover.md new file mode 100644 index 0000000000..17218ad26f --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/recover.md @@ -0,0 +1,25 @@ +# recover + +Allow sending of one last element downstream when a failure has happened upstream. + +## Signature + +## Description + +Allow sending of one last element downstream when a failure has happened upstream. + +Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + + +@@@div { .callout } + +**emits** when the element is available from the upstream or upstream is failed and pf returns an element + +**backpressures** when downstream backpressures, not when failure happened + +**completes** when upstream completes or upstream failed with exception pf can handle + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/recoverWith.md b/akka-docs/src/main/paradox/stream/reference/recoverWith.md new file mode 100644 index 0000000000..80ab9a67c4 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/recoverWith.md @@ -0,0 +1,25 @@ +# recoverWith + +Allow switching to alternative Source when a failure has happened upstream. + +## Signature + +## Description + +Allow switching to alternative Source when a failure has happened upstream. + +Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. + + +@@@div { .callout } + +**emits** the element is available from the upstream or upstream is failed and pf returns alternative Source + +**backpressures** downstream backpressures, after failure happened it backprssures to alternative Source + +**completes** upstream completes or upstream failed with exception pf can handle + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/recoverWithRetries.md b/akka-docs/src/main/paradox/stream/reference/recoverWithRetries.md new file mode 100644 index 0000000000..3149bed382 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/recoverWithRetries.md @@ -0,0 +1,29 @@ +# recoverWithRetries + +RecoverWithRetries allows to switch to alternative Source on flow failure. + +## Signature + +## Description + +RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after +a failure has been recovered up to *attempts* number of times so that each time there is a failure +it is fed into the *pf* and a new Source may be materialized. Note that if you pass in 0, this won't +attempt to recover at all. A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. + +Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. +This stage can recover the failure signal, but not the skipped elements, which will be dropped. + + +@@@div { .callout } + +**emits** when element is available from the upstream or upstream is failed and element is available from alternative Source + +**backpressures** when downstream backpressures + +**completes** when upstream completes or upstream failed with exception pf can handle + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/reduce.md b/akka-docs/src/main/paradox/stream/reference/reduce.md new file mode 100644 index 0000000000..1ba9f88975 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/reduce.md @@ -0,0 +1,25 @@ +# reduce + +Start with first element and then apply the current and next value to the given function, when upstream +complete the current value is emitted downstream. + +## Signature + +## Description + +Start with first element and then apply the current and next value to the given function, when upstream +complete the current value is emitted downstream. Similar to `fold`. + + +@@@div { .callout } + +**emits** when upstream completes + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/repeat.md b/akka-docs/src/main/paradox/stream/reference/repeat.md new file mode 100644 index 0000000000..0ab5fbb129 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/repeat.md @@ -0,0 +1,21 @@ +# repeat + +repeat + +## Signature + +## Description + +Stream a single object repeatedly + + +@@@div { .callout } + +**emits** the same value repeatedly when there is demand + +**completes** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/scan.md b/akka-docs/src/main/paradox/stream/reference/scan.md new file mode 100644 index 0000000000..ed12432038 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/scan.md @@ -0,0 +1,28 @@ +# scan + +Emit its current value which starts at `zero` and then applies the current and next value to the given function +emitting the next current value. + +## Signature + +## Description + +Emit its current value which starts at `zero` and then applies the current and next value to the given function +emitting the next current value. + +Note that this means that scan emits one element downstream before and upstream elements will not be requested until +the second element is required from downstream. + + +@@@div { .callout } + +**emits** when the function scanning the element returns a new element + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/scanAsync.md b/akka-docs/src/main/paradox/stream/reference/scanAsync.md new file mode 100644 index 0000000000..bfe31cbc0d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/scanAsync.md @@ -0,0 +1,23 @@ +# scanAsync + +Just like `scan` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. + +## Signature + +## Description + +Just like `scan` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. + + +@@@div { .callout } + +**emits** when the @scala[`Future`] @java[`CompletionStage`] resulting from the function scanning the element resolves to the next value + +**backpressures** when downstream backpressures + +**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/seq.md b/akka-docs/src/main/paradox/stream/reference/seq.md new file mode 100644 index 0000000000..a53dc586a6 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/seq.md @@ -0,0 +1,22 @@ +# seq + +Collect values emitted from the stream into a collection, the collection is available through a @scala[`Future`] @java[`CompletionStage`] or +which completes when the stream completes. + +## Signature + +## Description + +Collect values emitted from the stream into a collection, the collection is available through a @scala[`Future`] @java[`CompletionStage`] or +which completes when the stream completes. Note that the collection is bounded to @scala[`Int.MaxValue`] @java[`Integer.MAX_VALUE`], +if more element are emitted the sink will cancel the stream + + +@@@div { .callout } + +**cancels** If too many values are collected + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/single.md b/akka-docs/src/main/paradox/stream/reference/single.md new file mode 100644 index 0000000000..c04d9a3e13 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/single.md @@ -0,0 +1,21 @@ +# single + +single + +## Signature + +## Description + +Stream a single object + + +@@@div { .callout } + +**emits** the value once + +**completes** when the single value has been emitted + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/sliding.md b/akka-docs/src/main/paradox/stream/reference/sliding.md new file mode 100644 index 0000000000..2458bed5e8 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/sliding.md @@ -0,0 +1,25 @@ +# sliding + +Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream. + +## Signature + +## Description + +Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream. + +Note: the last window might be smaller than the requested size due to end of stream. + + +@@@div { .callout } + +**emits** the specified number of elements has been accumulated or upstream completed + +**backpressures** when a group has been assembled and downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/splitAfter.md b/akka-docs/src/main/paradox/stream/reference/splitAfter.md new file mode 100644 index 0000000000..f80ce69e86 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/splitAfter.md @@ -0,0 +1,23 @@ +# splitAfter + +End the current substream whenever a predicate returns `true`, starting a new substream for the next element. + +## Signature + +## Description + +End the current substream whenever a predicate returns `true`, starting a new substream for the next element. + + +@@@div { .callout } + +**emits** when an element passes through. When the provided predicate is true it emits the element * and opens a new substream for subsequent element + +**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures + +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/splitWhen.md b/akka-docs/src/main/paradox/stream/reference/splitWhen.md new file mode 100644 index 0000000000..ab9d3f8150 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/splitWhen.md @@ -0,0 +1,23 @@ +# splitWhen + +Split off elements into a new substream whenever a predicate function return `true`. + +## Signature + +## Description + +Split off elements into a new substream whenever a predicate function return `true`. + + +@@@div { .callout } + +**emits** an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements + +**backpressures** when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures + +**completes** when upstream completes (Until the end of stream it is not possible to know whether new substreams will be needed or not) + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/statefulMapConcat.md b/akka-docs/src/main/paradox/stream/reference/statefulMapConcat.md new file mode 100644 index 0000000000..40a0a34758 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/statefulMapConcat.md @@ -0,0 +1,24 @@ +# statefulMapConcat + +Transform each element into zero or more elements that are individually passed downstream. + +## Signature + +## Description + +Transform each element into zero or more elements that are individually passed downstream. The difference to `mapConcat` is that +the transformation function is created from a factory for every materialization of the flow. + + +@@@div { .callout } + +**emits** when the mapping function returns an element or there are still remaining elements from the previously calculated collection + +**backpressures** when downstream backpressures or there are still available elements from the previously calculated collection + +**completes** when upstream completes and all remaining elements has been emitted + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/take.md b/akka-docs/src/main/paradox/stream/reference/take.md new file mode 100644 index 0000000000..9a3420c0b0 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/take.md @@ -0,0 +1,23 @@ +# take + +take + +## Signature + +## Description + +Pass `n` incoming elements downstream and then complete + + +@@@div { .callout } + +**emits** while the specified number of elements to take has not yet been reached + +**backpressures** when downstream backpressures + +**completes** when the defined number of elements has been taken or upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/takeWhile.md b/akka-docs/src/main/paradox/stream/reference/takeWhile.md new file mode 100644 index 0000000000..216093ad78 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/takeWhile.md @@ -0,0 +1,25 @@ +# takeWhile + +Pass elements downstream as long as a predicate function return true for the element include the element +when the predicate first return false and then complete. + +## Signature + +## Description + +Pass elements downstream as long as a predicate function return true for the element include the element +when the predicate first return false and then complete. + + +@@@div { .callout } + +**emits** while the predicate is true and until the first false result + +**backpressures** when downstream backpressures + +**completes** when predicate returned false or upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/takeWithin.md b/akka-docs/src/main/paradox/stream/reference/takeWithin.md new file mode 100644 index 0000000000..da22f4d2e1 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/takeWithin.md @@ -0,0 +1,23 @@ +# takeWithin + +Pass elements downstream within a timeout and then complete. + +## Signature + +## Description + +Pass elements downstream within a timeout and then complete. + + +@@@div { .callout } + +**emits** when an upstream element arrives + +**backpressures** downstream backpressures + +**completes** upstream completes or timer fires + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/throttle.md b/akka-docs/src/main/paradox/stream/reference/throttle.md new file mode 100644 index 0000000000..093fdbbd87 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/throttle.md @@ -0,0 +1,25 @@ +# throttle + +Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where +a function has to be provided to calculate the individual cost of each element. + +## Signature + +## Description + +Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where +a function has to be provided to calculate the individual cost of each element. + + +@@@div { .callout } + +**emits** when upstream emits an element and configured time per each element elapsed + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/tick.md b/akka-docs/src/main/paradox/stream/reference/tick.md new file mode 100644 index 0000000000..c9f35b5355 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/tick.md @@ -0,0 +1,22 @@ +# tick + +A periodical repetition of an arbitrary object. + +## Signature + +## Description + +A periodical repetition of an arbitrary object. Delay of first tick is specified +separately from interval of the following ticks. + + +@@@div { .callout } + +**emits** periodically, if there is downstream backpressure ticks are skipped + +**completes** never + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/toPath.md b/akka-docs/src/main/paradox/stream/reference/toPath.md new file mode 100644 index 0000000000..9a0936b2b1 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/toPath.md @@ -0,0 +1,16 @@ +# toPath + +Create a sink which will write incoming `ByteString` s to a given file path. + +## Signature + +## Description + + + +@@@div { .callout } + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/unfold.md b/akka-docs/src/main/paradox/stream/reference/unfold.md new file mode 100644 index 0000000000..4b4a7cef53 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/unfold.md @@ -0,0 +1,27 @@ +# unfold + +Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`], the value inside the option +consists of a @scala[tuple] @java[pair] where the first value is a state passed back into the next call to the function allowing +to pass a state. + +## Signature + +## Description + +Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`], the value inside the option +consists of a @scala[tuple] @java[pair] where the first value is a state passed back into the next call to the function allowing +to pass a state. The first invocation of the provided fold function will receive the `zero` state. + +Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API. + + +@@@div { .callout } + +**emits** when there is demand and the unfold function over the previous state returns non empty value + +**completes** when the unfold function returns an empty value + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/unfoldAsync.md b/akka-docs/src/main/paradox/stream/reference/unfoldAsync.md new file mode 100644 index 0000000000..1d223b689a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/unfoldAsync.md @@ -0,0 +1,25 @@ +# unfoldAsync + +Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`] which will cause the source to +complete or emit when it completes. + +## Signature + +## Description + +Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`] which will cause the source to +complete or emit when it completes. + +Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API. + + +@@@div { .callout } + +**emits** when there is demand and unfold state returned future completes with some value + +**completes** when the @scala[future] @java[CompletionStage] returned by the unfold function completes with an empty value + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/unfoldResource.md b/akka-docs/src/main/paradox/stream/reference/unfoldResource.md new file mode 100644 index 0000000000..5ee8561bd7 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/unfoldResource.md @@ -0,0 +1,21 @@ +# unfoldResource + +Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. + +## Signature + +## Description + +Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. + + +@@@div { .callout } + +**emits** when there is demand and read @scala[function] @java[method] returns value + +**completes** when read function returns `None` + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/unfoldResourceAsync.md b/akka-docs/src/main/paradox/stream/reference/unfoldResourceAsync.md new file mode 100644 index 0000000000..f3dc9618d9 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/unfoldResourceAsync.md @@ -0,0 +1,22 @@ +# unfoldResourceAsync + +Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. + +## Signature + +## Description + +Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. +Functions return @scala[`Future`] @java[`CompletionStage`] to achieve asynchronous processing + + +@@@div { .callout } + +**emits** when there is demand and @scala[`Future`] @java[`CompletionStage`] from read function returns value + +**completes** when @scala[`Future`] @java[`CompletionStage`] from read function returns `None` + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/unzip.md b/akka-docs/src/main/paradox/stream/reference/unzip.md new file mode 100644 index 0000000000..bd081dabe1 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/unzip.md @@ -0,0 +1,23 @@ +# unzip + +Takes a stream of two element tuples and unzips the two elements ino two different downstreams. + +## Signature + +## Description + +Takes a stream of two element tuples and unzips the two elements ino two different downstreams. + + +@@@div { .callout } + +**emits** when all of the outputs stops backpressuring and there is an input element available + +**backpressures** when any of the outputs backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/unzipWith.md b/akka-docs/src/main/paradox/stream/reference/unzipWith.md new file mode 100644 index 0000000000..b25c7ee43c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/unzipWith.md @@ -0,0 +1,23 @@ +# unzipWith + +unzipWith + +## Signature + +## Description + +Splits each element of input into multiple downstreams using a function + + +@@@div { .callout } + +**emits** when all of the outputs stops backpressuring and there is an input element available + +**backpressures** when any of the outputs backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/watch.md b/akka-docs/src/main/paradox/stream/reference/watch.md new file mode 100644 index 0000000000..f33ed90340 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/watch.md @@ -0,0 +1,25 @@ +# watch + +Watch a specific `ActorRef` and signal a failure downstream once the actor terminates. + +## Signature + +## Description + +Watch a specific `ActorRef` and signal a failure downstream once the actor terminates. +The signaled failure will be an @java[@javadoc:[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)] +@scala[@scaladoc[WatchedActorTerminatedException](akka.stream.WatchedActorTerminatedException)]. + + +@@@div { .callout } + +**emits** when upstream emits + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/watchTermination.md b/akka-docs/src/main/paradox/stream/reference/watchTermination.md new file mode 100644 index 0000000000..774a679a61 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/watchTermination.md @@ -0,0 +1,24 @@ +# watchTermination + +Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. + +## Signature + +## Description + +Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. +The stage otherwise passes through elements unchanged. + + +@@@div { .callout } + +**emits** when input has an element available + +**backpressures** when output backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/wireTap.md b/akka-docs/src/main/paradox/stream/reference/wireTap.md new file mode 100644 index 0000000000..6c7bd13dec --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/wireTap.md @@ -0,0 +1,29 @@ +# wireTap + +Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass +through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow. + +## Signature + +## Description + +Attaches the given `Sink` to this `Flow` as a wire tap, meaning that elements that pass +through will also be sent to the wire-tap `Sink`, without the latter affecting the mainline flow. +If the wire-tap `Sink` backpressures, elements that would've been sent to it will be dropped instead. + + +@@@div { .callout } + +**emits** element is available and demand exists from the downstream; the element will +also be sent to the wire-tap `Sink` if there is demand. + +**backpressures** downstream backpressures + +**completes** upstream completes + +**cancels** downstream cancels + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/zip.md b/akka-docs/src/main/paradox/stream/reference/zip.md new file mode 100644 index 0000000000..0e7bc4ef62 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/zip.md @@ -0,0 +1,23 @@ +# zip + +Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. + +## Signature + +## Description + +Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. + + +@@@div { .callout } + +**emits** when all of the inputs have an element available + +**backpressures** when downstream backpressures + +**completes** when any upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/zipN.md b/akka-docs/src/main/paradox/stream/reference/zipN.md new file mode 100644 index 0000000000..2a25f593c8 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/zipN.md @@ -0,0 +1,21 @@ +# zipN + +Combine the elements of multiple streams into a stream of sequences. + +## Signature + +## Description + +Combine the elements of multiple streams into a stream of sequences. + + +@@@div { .callout } + +**emits** when all of the inputs has an element available + +**completes** when any upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/zipWith.md b/akka-docs/src/main/paradox/stream/reference/zipWith.md new file mode 100644 index 0000000000..fc1fe01595 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/zipWith.md @@ -0,0 +1,25 @@ +# zipWith + +Combines elements from multiple sources through a `combine` function and passes the +returned value downstream. + +## Signature + +## Description + +Combines elements from multiple sources through a `combine` function and passes the +returned value downstream. + + +@@@div { .callout } + +**emits** when all of the inputs have an element available + +**backpressures** when downstream backpressures + +**completes** when any upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/zipWithIndex.md b/akka-docs/src/main/paradox/stream/reference/zipWithIndex.md new file mode 100644 index 0000000000..e94cd99a6d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/zipWithIndex.md @@ -0,0 +1,23 @@ +# zipWithIndex + +Zips elements of current flow with its indices. + +## Signature + +## Description + +Zips elements of current flow with its indices. + + +@@@div { .callout } + +**emits** upstream emits an element and is paired with their index + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/reference/zipWithN.md b/akka-docs/src/main/paradox/stream/reference/zipWithN.md new file mode 100644 index 0000000000..7c6ce3c2f2 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/reference/zipWithN.md @@ -0,0 +1,21 @@ +# zipWithN + +Combine the elements of multiple streams into a stream of sequences using a combiner function. + +## Signature + +## Description + +Combine the elements of multiple streams into a stream of sequences using a combiner function. + + +@@@div { .callout } + +**emits** when all of the inputs has an element available + +**completes** when any upstream completes + +@@@ + +## Example + diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index 02b78be4c4..8effc11954 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -1983,3 +1983,149 @@ event, and may therefore affect performance. --------------------------------------------------------------- +@@@ index + +* [fromIterator](reference/fromIterator.md) +* [apply](reference/apply.md) +* [single](reference/single.md) +* [repeat](reference/repeat.md) +* [cycle](reference/cycle.md) +* [tick](reference/tick.md) +* [fromFuture](reference/fromFuture.md) +* [fromCompletionStage](reference/fromCompletionStage.md) +* [fromFutureSource](reference/fromFutureSource.md) +* [fromSourceCompletionStage](reference/fromSourceCompletionStage.md) +* [unfold](reference/unfold.md) +* [unfoldAsync](reference/unfoldAsync.md) +* [empty](reference/empty.md) +* [maybe](reference/maybe.md) +* [failed](reference/failed.md) +* [lazily](reference/lazily.md) +* [lazilyAsync](reference/lazilyAsync.md) +* [actorRef](reference/actorRef.md) +* [range](reference/range.md) +* [combine](reference/combine.md) +* [unfoldResource](reference/unfoldResource.md) +* [unfoldResourceAsync](reference/unfoldResourceAsync.md) +* [queue](reference/queue.md) +* [asSubscriber](reference/asSubscriber.md) +* [fromPublisher](reference/fromPublisher.md) +* [zipN](reference/zipN.md) +* [zipWithN](reference/zipWithN.md) +* [head](reference/head.md) +* [headOption](reference/headOption.md) +* [last](reference/last.md) +* [lastOption](reference/lastOption.md) +* [ignore](reference/ignore.md) +* [cancelled](reference/cancelled.md) +* [seq](reference/seq.md) +* [foreach](reference/foreach.md) +* [foreachParallel](reference/foreachParallel.md) +* [onComplete](reference/onComplete.md) +* [lazyInitAsync](reference/lazyInitAsync.md) +* [queue](reference/queue.md) +* [fold](reference/fold.md) +* [reduce](reference/reduce.md) +* [combine](reference/combine.md) +* [actorRef](reference/actorRef.md) +* [actorRefWithAck](reference/actorRefWithAck.md) +* [asPublisher](reference/asPublisher.md) +* [fromSubscriber](reference/fromSubscriber.md) +* [preMaterialize](reference/preMaterialize.md) +* [fromOutputStream](reference/fromOutputStream.md) +* [asInputStream](reference/asInputStream.md) +* [fromInputStream](reference/fromInputStream.md) +* [asOutputStream](reference/asOutputStream.md) +* [asJavaStream](reference/asJavaStream.md) +* [fromJavaStream](reference/fromJavaStream.md) +* [javaCollector](reference/javaCollector.md) +* [javaCollectorParallelUnordered](reference/javaCollectorParallelUnordered.md) +* [fromPath](reference/fromPath.md) +* [toPath](reference/toPath.md) +* [alsoTo](reference/alsoTo.md) +* [map](reference/map.md) +* [mapConcat](reference/mapConcat.md) +* [statefulMapConcat](reference/statefulMapConcat.md) +* [filter](reference/filter.md) +* [filterNot](reference/filterNot.md) +* [collect](reference/collect.md) +* [collectType ](reference/collectType.md) +* [grouped](reference/grouped.md) +* [sliding](reference/sliding.md) +* [scan](reference/scan.md) +* [scanAsync](reference/scanAsync.md) +* [fold](reference/fold.md) +* [foldAsync](reference/foldAsync.md) +* [reduce](reference/reduce.md) +* [drop](reference/drop.md) +* [take](reference/take.md) +* [takeWhile](reference/takeWhile.md) +* [dropWhile](reference/dropWhile.md) +* [recover](reference/recover.md) +* [recoverWith](reference/recoverWith.md) +* [recoverWithRetries](reference/recoverWithRetries.md) +* [mapError](reference/mapError.md) +* [detach](reference/detach.md) +* [throttle](reference/throttle.md) +* [intersperse](reference/intersperse.md) +* [limit](reference/limit.md) +* [limitWeighted](reference/limitWeighted.md) +* [log](reference/log.md) +* [divertTo](reference/divertTo.md) +* [wireTap](reference/wireTap.md) +* [lazyInitAsync](reference/lazyInitAsync.md) +* [watch](reference/watch.md) +* [ask](reference/ask.md) +* [Flow.fromSinkAndSource](reference/Flow.fromSinkAndSource.md) +* [Flow.fromSinkAndSourceCoupled](reference/Flow.fromSinkAndSourceCoupled.md) +* [mapAsync](reference/mapAsync.md) +* [mapAsyncUnordered](reference/mapAsyncUnordered.md) +* [ask](reference/ask.md) +* [takeWithin](reference/takeWithin.md) +* [dropWithin](reference/dropWithin.md) +* [groupedWithin](reference/groupedWithin.md) +* [groupedWeightedWithin](reference/groupedWeightedWithin.md) +* [initialDelay](reference/initialDelay.md) +* [delay](reference/delay.md) +* [conflate](reference/conflate.md) +* [conflateWithSeed](reference/conflateWithSeed.md) +* [batch](reference/batch.md) +* [batchWeighted](reference/batchWeighted.md) +* [extrapolate](reference/extrapolate.md) +* [expand](reference/expand.md) +* [buffer (Backpressure)](reference/bufferBackpressure.md) +* [buffer (Drop)](reference/bufferDrop.md) +* [buffer (Fail)](reference/bufferFail.md) +* [prefixAndTail](reference/prefixAndTail.md) +* [groupBy](reference/groupBy.md) +* [splitWhen](reference/splitWhen.md) +* [splitAfter](reference/splitAfter.md) +* [flatMapConcat](reference/flatMapConcat.md) +* [flatMapMerge](reference/flatMapMerge.md) +* [initialTimeout](reference/initialTimeout.md) +* [completionTimeout](reference/completionTimeout.md) +* [idleTimeout](reference/idleTimeout.md) +* [backpressureTimeout](reference/backpressureTimeout.md) +* [keepAlive](reference/keepAlive.md) +* [initialDelay](reference/initialDelay.md) +* [merge](reference/merge.md) +* [mergeSorted](reference/mergeSorted.md) +* [mergePreferred](reference/mergePreferred.md) +* [mergePrioritized](reference/mergePrioritized.md) +* [zip](reference/zip.md) +* [zipWith](reference/zipWith.md) +* [zipWithIndex](reference/zipWithIndex.md) +* [concat](reference/concat.md) +* [++](reference/++.md) +* [prepend](reference/prepend.md) +* [orElse](reference/orElse.md) +* [interleave](reference/interleave.md) +* [unzip](reference/unzip.md) +* [unzipWith](reference/unzipWith.md) +* [broadcast](reference/broadcast.md) +* [balance](reference/balance.md) +* [partition](reference/partition.md) +* [watchTermination](reference/watchTermination.md) +* [monitor](reference/monitor.md) + +@@@ diff --git a/akka-stream-tests/src/test/scala/akka/stream/DocsStageCoverageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DocsStageCoverageSpec.scala new file mode 100644 index 0000000000..9dc48fffb6 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/DocsStageCoverageSpec.scala @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream + +import java.io.File +import java.lang.reflect.{Method, Modifier} +import java.nio.file.{Path, Paths} + +import akka.event.Logging +import org.scalatest.{Matchers, WordSpec} + +class DocsStageCoverageSpec extends WordSpec with Matchers { + + val sFlowClass: Class[_] = classOf[akka.stream.scaladsl.Flow[_, _, _]] + val jFlowClass: Class[_] = classOf[akka.stream.javadsl.Flow[_, _, _]] + + val sSubFlowClass: Class[_] = classOf[DslConsistencySpec.ScalaSubFlow[_, _, _]] + val jSubFlowClass: Class[_] = classOf[akka.stream.javadsl.SubFlow[_, _, _]] + + val sSourceClass: Class[_] = classOf[akka.stream.scaladsl.Source[_, _]] + val jSourceClass: Class[_] = classOf[akka.stream.javadsl.Source[_, _]] + + val sSubSourceClass: Class[_] = classOf[DslConsistencySpec.ScalaSubSource[_, _]] + val jSubSourceClass: Class[_] = classOf[akka.stream.javadsl.SubSource[_, _]] + + val sSinkClass: Class[_] = classOf[akka.stream.scaladsl.Sink[_, _]] + val jSinkClass: Class[_] = classOf[akka.stream.javadsl.Sink[_, _]] + + val jRunnableGraphClass: Class[_] = classOf[akka.stream.javadsl.RunnableGraph[_]] + val sRunnableGraphClass: Class[_] = classOf[akka.stream.scaladsl.RunnableGraph[_]] + + val ignore: Set[String] = + Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ + Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++ + Set("create", "apply", "ops", "appendJava", "andThen", "andThenMat", "isIdentity", "withAttributes", "transformMaterializing") ++ + Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") + + val graphHelpers = Set("zipGraph", "zipWithGraph", "mergeGraph", "mergeSortedGraph", "interleaveGraph", "concatGraph", "prependGraph", "alsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph") + + val allowMissing: Map[Class[_], Set[String]] = Map( + jFlowClass → graphHelpers, + jSourceClass → (graphHelpers ++ Set("watch", "ask")), + // Java subflows can only be nested using .via and .to (due to type system restrictions) + jSubFlowClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")), + jSubSourceClass → (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")), + + sFlowClass → Set("of"), + sSourceClass → Set("adapt", "from", "watch"), + sSinkClass → Set("adapt"), + sSubFlowClass → Set(), + sSubSourceClass → Set(), + + sRunnableGraphClass → Set("builder")) + + def materializing(m: Method): Boolean = m.getParameterTypes.contains(classOf[ActorMaterializer]) + + val DocsRoot = Paths.get("akka-docs/src/main/paradox/stream") + + def assertPageExists(c: Class[_], name: String): Unit = { + val operatorDocFile: Path = operatorDocFilePath(c, name) + assert(operatorDocFile.toFile.exists(), s"Expected [$operatorDocFile] to exist and document [$name] operator on [$c]") + } + + private def operatorDocFilePath(c: Class[_], name: String): Path = + DocsRoot.resolve(Logging.simpleName(c)).resolve(name + ".md") + + "Java and Scala DSLs" must { + + ("Source" → List[Class[_]](sSourceClass, jSourceClass)) :: + // ("SubSource" → List[Class[_]](sSubSourceClass, jSubSourceClass)) :: + ("Flow" → List[Class[_]](sFlowClass, jFlowClass)) :: + // ("SubFlow" → List[Class[_]](sSubFlowClass, jSubFlowClass)) :: + ("Sink" → List[Class[_]](sSinkClass, jSinkClass)) :: + ("RunnableFlow" → List[Class[_]](sRunnableGraphClass, jRunnableGraphClass)) :: + Nil foreach { + case (element, classes) ⇒ + + val allOps = + (for { + c ← classes + m ← c.getMethods + if !Modifier.isStatic(m.getModifiers) + if !ignore(m.getName) + if !m.getName.contains("$") + if !materializing(m) + } yield m.getName).toSet + + allOps foreach { opName ⇒ + s"document [$element] operation [$opName]" in { + assertPageExists(classes.head, opName) + } + } + } + } +}