Document scanAsync (#28239)

* Document scanAsync

* Include async function in scanAsync example
This commit is contained in:
Christopher Batey 2019-11-26 11:25:20 +00:00 committed by Arnout Engelen
parent 82db446bb7
commit 09a97ce1c9
6 changed files with 52 additions and 39 deletions

View file

@ -26,6 +26,16 @@ when running the stream more than once.
@@@ @@@
## Examples
Below example demonstrates how `scan` is similar to `fold`, but it keeps value from every iteration.
Scala
: @@snip [Scan.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan }
## Reactive Streams semantics ## Reactive Streams semantics
@@@div { .callout } @@@div { .callout }
@ -37,13 +47,3 @@ when running the stream more than once.
**completes** when upstream completes **completes** when upstream completes
@@@ @@@
## Examples
Below example demonstrates how `scan` is similar to `fold`, but it keeps value from every iteration.
Scala
: @@snip [Scan.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan }

View file

@ -1,6 +1,6 @@
# scanAsync # scanAsync
Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. Just like @ref[`scan`](./scan.md) but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.
@ref[Simple operators](../index.md#simple-operators) @ref[Simple operators](../index.md#simple-operators)
@ -24,6 +24,23 @@ when running the stream more than once.
@@@ @@@
## Example
Below example demonstrates how `scanAsync` is similar to `fold`, but it keeps value from every iteration.
Scala
: @@snip [ScanAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala) { #scan-async }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan-async }
@@@ warning
In an actual application the future would probably involve some external API that returns a @scala[`Future`]
@java[`CompletionStage`] rather than an immediately completed value.
@@@
## Reactive Streams semantics ## Reactive Streams semantics
@@@div { .callout } @@@div { .callout }
@ -35,20 +52,3 @@ when running the stream more than once.
**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved **completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved
@@@ @@@
## Examples
Below example demonstrates how `scanAsync` is similar to `fold`, but it keeps value from every iteration.
Scala
: @@snip [ScanAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala) { #scanAsync }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scanAsync }
@@@ warning
In an actual application the future would probably involve some external API that returns a @scala[`Future`]
@java[`CompletionStage`] rather than an immediately completed value.
@@@

View file

@ -172,7 +172,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="recoverwithretries"></a>@ref[recoverWithRetries](Source-or-Flow/recoverWithRetries.md)|RecoverWithRetries allows to switch to alternative Source on flow failure.| |Source/Flow|<a name="recoverwithretries"></a>@ref[recoverWithRetries](Source-or-Flow/recoverWithRetries.md)|RecoverWithRetries allows to switch to alternative Source on flow failure.|
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| |Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.| |Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
|Source/Flow|<a name="scanasync"></a>@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| |Source/Flow|<a name="scanasync"></a>@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like @ref[`scan`](Source-or-Flow/./scan.md) but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
|Source/Flow|<a name="setup"></a>@ref[setup](Source-or-Flow/setup.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| |Source/Flow|<a name="setup"></a>@ref[setup](Source-or-Flow/setup.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Source/Flow|<a name="sliding"></a>@ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.| |Source/Flow|<a name="sliding"></a>@ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.|
|Source/Flow|<a name="statefulmapconcat"></a>@ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|<a name="statefulmapconcat"></a>@ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|

View file

@ -43,6 +43,7 @@ import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
class SourceOrFlow { class SourceOrFlow {
private static ActorSystem system = null; private static ActorSystem system = null;
@ -187,19 +188,23 @@ class SourceOrFlow {
// #scan // #scan
} }
// #scan-async
CompletionStage<Integer> asyncFunction(int acc, int next) {
return CompletableFuture.supplyAsync(() -> acc + next);
}
// #scan-async
void scanAsyncExample() { void scanAsyncExample() {
// #scanAsync // #scan-async
Source<Integer, NotUsed> source = Source.range(1, 5); Source<Integer, NotUsed> source = Source.range(1, 5);
source source.scanAsync(0, (acc, x) -> asyncFunction(acc, x)).runForeach(System.out::println, system);
.scanAsync(0, (acc, x) -> CompletableFuture.completedFuture(acc + x))
.runForeach(System.out::println, materializer);
// 0 (= 0) // 0 (= 0)
// 1 (= 0 + 1) // 1 (= 0 + 1)
// 3 (= 0 + 1 + 2) // 3 (= 0 + 1 + 2)
// 6 (= 0 + 1 + 2 + 3) // 6 (= 0 + 1 + 2 + 3)
// 10 (= 0 + 1 + 2 + 3 + 4) // 10 (= 0 + 1 + 2 + 3 + 4)
// 15 (= 0 + 1 + 2 + 3 + 4 + 5) // 15 (= 0 + 1 + 2 + 3 + 4 + 5)
// #scanAsync // #scan-async
} }
static // #conflateWithSeed-type static // #conflateWithSeed-type

View file

@ -3,26 +3,34 @@
*/ */
package docs.stream.operators.sourceorflow package docs.stream.operators.sourceorflow
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext
import scala.concurrent.Future import scala.concurrent.Future
object ScanAsync { object ScanAsync {
def scanAsyncExample(): Unit = { def scanAsyncExample(): Unit = {
import akka.actor.ActorSystem import akka.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem() implicit val system: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher
//#scan-async
def asyncFunction(acc: Int, next: Int): Future[Int] = Future {
acc + next
}
//#scanAsync
val source = Source(1 to 5) val source = Source(1 to 5)
source.scanAsync(0)((acc, x) => Future.successful(acc + x)).runForeach(println) source.scanAsync(0)((acc, x) => asyncFunction(acc, x)).runForeach(println)
// 0 (= 0) // 0 (= 0)
// 1 (= 0 + 1) // 1 (= 0 + 1)
// 3 (= 0 + 1 + 2) // 3 (= 0 + 1 + 2)
// 6 (= 0 + 1 + 2 + 3) // 6 (= 0 + 1 + 2 + 3)
// 10 (= 0 + 1 + 2 + 3 + 4) // 10 (= 0 + 1 + 2 + 3 + 4)
// 15 (= 0 + 1 + 2 + 3 + 4 + 5) // 15 (= 0 + 1 + 2 + 3 + 4 + 5)
//#scanAsync //#scan-async
} }
} }

View file

@ -1372,7 +1372,7 @@ trait FlowOps[+Out, +Mat] {
def scan[T](zero: T)(f: (T, Out) => T): Repr[T] = via(Scan(zero, f)) def scan[T](zero: T)(f: (T, Out) => T): Repr[T] = via(Scan(zero, f))
/** /**
* Similar to `scan` but with a asynchronous function, * Similar to `scan` but with an asynchronous function,
* emits its current value which starts at `zero` and then * emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`, * applies the current and next value to the given function `f`,
* emitting a `Future` that resolves to the next current value. * emitting a `Future` that resolves to the next current value.
@ -1389,7 +1389,7 @@ trait FlowOps[+Out, +Mat] {
* *
* Note that the `zero` value must be immutable. * Note that the `zero` value must be immutable.
* *
* '''Emits when''' the future returned by f` completes * '''Emits when''' the future returned by `f` completes
* *
* '''Backpressures when''' downstream backpressures * '''Backpressures when''' downstream backpressures
* *