diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md index 2ec906a18b..c300126634 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md @@ -26,6 +26,16 @@ when running the stream more than once. @@@ +## Examples + +Below example demonstrates how `scan` is similar to `fold`, but it keeps value from every iteration. + +Scala +: @@snip [Scan.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan } + ## Reactive Streams semantics @@@div { .callout } @@ -37,13 +47,3 @@ when running the stream more than once. **completes** when upstream completes @@@ - -## Examples - -Below example demonstrates how `scan` is similar to `fold`, but it keeps value from every iteration. - -Scala -: @@snip [Scan.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan } - -Java -: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md index 40b435760d..8b43b7865e 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md @@ -1,6 +1,6 @@ # scanAsync -Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. +Just like @ref[`scan`](./scan.md) but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. @ref[Simple operators](../index.md#simple-operators) @@ -24,6 +24,23 @@ when running the stream more than once. @@@ +## Example + +Below example demonstrates how `scanAsync` is similar to `fold`, but it keeps value from every iteration. + +Scala +: @@snip [ScanAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala) { #scan-async } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan-async } + +@@@ warning + +In an actual application the future would probably involve some external API that returns a @scala[`Future`] +@java[`CompletionStage`] rather than an immediately completed value. + +@@@ + ## Reactive Streams semantics @@@div { .callout } @@ -35,20 +52,3 @@ when running the stream more than once. **completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved @@@ - -## Examples - -Below example demonstrates how `scanAsync` is similar to `fold`, but it keeps value from every iteration. - -Scala -: @@snip [ScanAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala) { #scanAsync } - -Java -: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scanAsync } - -@@@ warning - -In an actual application the future would probably involve some external API that returns a @scala[`Future`] -@java[`CompletionStage`] rather than an immediately completed value. - -@@@ \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 0e7ae92a6a..a2b68c7e66 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -172,7 +172,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[recoverWithRetries](Source-or-Flow/recoverWithRetries.md)|RecoverWithRetries allows to switch to alternative Source on flow failure.| |Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| |Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.| -|Source/Flow|@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| +|Source/Flow|@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like @ref[`scan`](Source-or-Flow/./scan.md) but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| |Source/Flow|@ref[setup](Source-or-Flow/setup.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| |Source/Flow|@ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.| |Source/Flow|@ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 4fc7c75083..67a45ad040 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -43,6 +43,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.Comparator; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; class SourceOrFlow { private static ActorSystem system = null; @@ -187,19 +188,23 @@ class SourceOrFlow { // #scan } + // #scan-async + CompletionStage asyncFunction(int acc, int next) { + return CompletableFuture.supplyAsync(() -> acc + next); + } + // #scan-async + void scanAsyncExample() { - // #scanAsync + // #scan-async Source source = Source.range(1, 5); - source - .scanAsync(0, (acc, x) -> CompletableFuture.completedFuture(acc + x)) - .runForeach(System.out::println, materializer); + source.scanAsync(0, (acc, x) -> asyncFunction(acc, x)).runForeach(System.out::println, system); // 0 (= 0) // 1 (= 0 + 1) // 3 (= 0 + 1 + 2) // 6 (= 0 + 1 + 2 + 3) // 10 (= 0 + 1 + 2 + 3 + 4) // 15 (= 0 + 1 + 2 + 3 + 4 + 5) - // #scanAsync + // #scan-async } static // #conflateWithSeed-type diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala index 1fffa8e031..4c89decb85 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala @@ -3,26 +3,34 @@ */ package docs.stream.operators.sourceorflow + import akka.stream.scaladsl.Source +import scala.concurrent.ExecutionContext import scala.concurrent.Future object ScanAsync { + def scanAsyncExample(): Unit = { import akka.actor.ActorSystem implicit val system: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = system.dispatcher + + //#scan-async + def asyncFunction(acc: Int, next: Int): Future[Int] = Future { + acc + next + } - //#scanAsync val source = Source(1 to 5) - source.scanAsync(0)((acc, x) => Future.successful(acc + x)).runForeach(println) + source.scanAsync(0)((acc, x) => asyncFunction(acc, x)).runForeach(println) // 0 (= 0) // 1 (= 0 + 1) // 3 (= 0 + 1 + 2) // 6 (= 0 + 1 + 2 + 3) // 10 (= 0 + 1 + 2 + 3 + 4) // 15 (= 0 + 1 + 2 + 3 + 4 + 5) - //#scanAsync + //#scan-async } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 2115ec95b5..aebbc30a27 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1372,7 +1372,7 @@ trait FlowOps[+Out, +Mat] { def scan[T](zero: T)(f: (T, Out) => T): Repr[T] = via(Scan(zero, f)) /** - * Similar to `scan` but with a asynchronous function, + * Similar to `scan` but with an asynchronous function, * emits its current value which starts at `zero` and then * applies the current and next value to the given function `f`, * emitting a `Future` that resolves to the next current value. @@ -1389,7 +1389,7 @@ trait FlowOps[+Out, +Mat] { * * Note that the `zero` value must be immutable. * - * '''Emits when''' the future returned by f` completes + * '''Emits when''' the future returned by `f` completes * * '''Backpressures when''' downstream backpressures *