diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md index 7b5b97a6f2..cbdd8f34b7 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md @@ -1,6 +1,6 @@ # fold -Start with current value `zero` and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream. +Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream. @ref[Simple operators](../index.md#simple-operators) @@ -14,8 +14,10 @@ Start with current value `zero` and then apply the current and next value to the ## Description -Start with current value `zero` and then apply the current and next value to the given function, when upstream -complete the current value is emitted downstream. +Start with current value `zero` and then apply the current and next value to the given function. When upstream +completes, the current value is emitted downstream. + +Note that the `zero` value must be immutable. @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md index 2178d3bf70..7d3f2a41d1 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md @@ -1,6 +1,6 @@ # foldAsync -Just like `fold` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. +Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. @ref[Simple operators](../index.md#simple-operators) @@ -14,7 +14,9 @@ Just like `fold` but receiving a function that results in a @scala[`Future`] @ja ## Description -Just like `fold` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. +Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. + +Note that the `zero` value must be immutable. @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md index 8f5a91d295..27809cc26b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md @@ -1,6 +1,6 @@ # scan -Emit its current value which starts at `zero` and then applies the current and next value to the given function emitting the next current value. +Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value. @ref[Simple operators](../index.md#simple-operators) @@ -14,11 +14,11 @@ Emit its current value which starts at `zero` and then applies the current and n ## Description -Emit its current value which starts at `zero` and then applies the current and next value to the given function -emitting the next current value. +Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, +emitting the next current value. This means that `scan` emits one element downstream before, and upstream elements +will not be requested until, the second element is required from downstream. -Note that this means that scan emits one element downstream before and upstream elements will not be requested until -the second element is required from downstream. +Note that the `zero` value must be immutable. @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md index 05834c223d..36e7495392 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md @@ -1,6 +1,6 @@ # scanAsync -Just like `scan` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. +Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. @ref[Simple operators](../index.md#simple-operators) @@ -14,7 +14,9 @@ Just like `scan` but receiving a function that results in a @scala[`Future`] @ja ## Description -Just like `scan` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. +Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. + +Note that the `zero` value must be immutable. @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 0f31d28c37..4783e38516 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -126,8 +126,8 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| |Source/Flow|@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.| |Source/Flow|@ref[filterNot](Source-or-Flow/filterNot.md)|Filter the incoming elements using a predicate.| -|Source/Flow|@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| -|Source/Flow|@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| +|Source/Flow|@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.| +|Source/Flow|@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| |Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.| |Source/Flow|@ref[intersperse](Source-or-Flow/intersperse.md)|Intersperse stream with provided element similar to `List.mkString`.| |Flow|@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument.| @@ -141,8 +141,8 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[recoverWith](Source-or-Flow/recoverWith.md)|Allow switching to alternative Source when a failure has happened upstream.| |Source/Flow|@ref[recoverWithRetries](Source-or-Flow/recoverWithRetries.md)|RecoverWithRetries allows to switch to alternative Source on flow failure.| |Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| -|Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value which starts at `zero` and then applies the current and next value to the given function emitting the next current value.| -|Source/Flow|@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like `scan` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| +|Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.| +|Source/Flow|@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| |Source/Flow|@ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.| |Source/Flow|@ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|@ref[take](Source-or-Flow/take.md)|Pass `n` incoming elements downstream and then complete| diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index d4a0a9bba3..e2e6c27db9 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -879,6 +879,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the function scanning the element returns a new element * * '''Backpressures when''' downstream backpressures @@ -906,6 +908,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the future returned by f` completes * * '''Backpressures when''' downstream backpressures @@ -930,6 +934,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * [[akka.stream.Supervision#restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures @@ -952,6 +958,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * [[akka.stream.Supervision.Restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index bb322a271f..2b0c94ea52 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1554,6 +1554,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the function scanning the element returns a new element * * '''Backpressures when''' downstream backpressures @@ -1581,6 +1583,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the future returned by f` completes * * '''Backpressures when''' downstream backpressures @@ -1593,6 +1597,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.scanAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `scan` but only emits its result when the upstream completes, * after which it also completes. Applies the given function `f` towards its current and next value, @@ -1604,6 +1609,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * [[akka.stream.Supervision#restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures @@ -1626,6 +1633,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * [[akka.stream.Supervision.Restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index b3d65aa8ed..fc293642b4 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -457,6 +457,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the function scanning the element returns a new element * * '''Backpressures when''' downstream backpressures @@ -484,6 +486,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the future returned by f` completes * * '''Backpressures when''' downstream backpressures @@ -508,6 +512,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * [[akka.stream.Supervision#restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures @@ -530,6 +536,8 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * [[akka.stream.Supervision.Restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 4cb0e7f51f..c0f7369dd9 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -447,6 +447,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the function scanning the element returns a new element * * '''Backpressures when''' downstream backpressures @@ -474,6 +476,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the future returned by f` completes * * '''Backpressures when''' downstream backpressures @@ -498,6 +502,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * [[akka.stream.Supervision#restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures @@ -518,6 +524,8 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * [[akka.stream.Supervision.Restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8ec87f0a4f..3e8c129bff 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1227,6 +1227,8 @@ trait FlowOps[+Out, +Mat] { * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the function scanning the element returns a new element * * '''Backpressures when''' downstream backpressures @@ -1255,6 +1257,8 @@ trait FlowOps[+Out, +Mat] { * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' the future returned by f` completes * * '''Backpressures when''' downstream backpressures @@ -1278,6 +1282,8 @@ trait FlowOps[+Out, +Mat] { * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures @@ -1301,6 +1307,8 @@ trait FlowOps[+Out, +Mat] { * [[akka.stream.Supervision.Restart]] current value starts at `zero` again * the stream will continue. * + * Note that the `zero` value must be immutable. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures