Add scanAsync examples to akka-docs (#25468) (#27990)

* Add scanAsync examples to akka-docs (#25468)

* Sync scan with scanAsync in akka-docs (#25468)
This commit is contained in:
Lim Chee Hau 2019-11-26 10:57:12 +01:00 committed by Arnout Engelen
parent 2ee63f5386
commit 444a86291e
4 changed files with 77 additions and 3 deletions

View file

@ -18,7 +18,13 @@ Emit its current value, which starts at `zero`, and then apply the current and n
emitting the next current value. This means that `scan` emits one element downstream before, and upstream elements emitting the next current value. This means that `scan` emits one element downstream before, and upstream elements
will not be requested until, the second element is required from downstream. will not be requested until, the second element is required from downstream.
Note that the `zero` value must be immutable. @@@ warning
Note that the `zero` value must be immutable, because otherwise
the same mutable instance would be shared across different threads
when running the stream more than once.
@@@
## Reactive Streams semantics ## Reactive Streams semantics
@ -34,8 +40,10 @@ Note that the `zero` value must be immutable.
## Examples ## Examples
Below example demonstrates how `scan` is similar to `fold`, but it keeps value from every iteration.
Scala Scala
: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan } : @@snip [Scan.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan }
Java Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan } : @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan }

View file

@ -16,7 +16,13 @@ Just like `scan` but receives a function that results in a @scala[`Future`] @jav
Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.
Note that the `zero` value must be immutable. @@@ warning
Note that the `zero` value must be immutable, because otherwise
the same mutable instance would be shared across different threads
when running the stream more than once.
@@@
## Reactive Streams semantics ## Reactive Streams semantics
@ -30,3 +36,19 @@ Note that the `zero` value must be immutable.
@@@ @@@
## Examples
Below example demonstrates how `scanAsync` is similar to `fold`, but it keeps value from every iteration.
Scala
: @@snip [ScanAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala) { #scanAsync }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scanAsync }
@@@ warning
In an actual application the future would probably involve some external API that returns a @scala[`Future`]
@java[`CompletionStage`] rather than an immediately completed value.
@@@

View file

@ -42,6 +42,7 @@ import akka.stream.Attributes;
import java.time.Duration; import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
class SourceOrFlow { class SourceOrFlow {
private static ActorSystem system = null; private static ActorSystem system = null;
@ -186,6 +187,21 @@ class SourceOrFlow {
// #scan // #scan
} }
void scanAsyncExample() {
// #scanAsync
Source<Integer, NotUsed> source = Source.range(1, 5);
source
.scanAsync(0, (acc, x) -> CompletableFuture.completedFuture(acc + x))
.runForeach(System.out::println, materializer);
// 0 (= 0)
// 1 (= 0 + 1)
// 3 (= 0 + 1 + 2)
// 6 (= 0 + 1 + 2 + 3)
// 10 (= 0 + 1 + 2 + 3 + 4)
// 15 (= 0 + 1 + 2 + 3 + 4 + 5)
// #scanAsync
}
static // #conflateWithSeed-type static // #conflateWithSeed-type
class Summed { class Summed {

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.stream.scaladsl.Source
import scala.concurrent.Future
object ScanAsync {
def scanAsyncExample(): Unit = {
import akka.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem()
//#scanAsync
val source = Source(1 to 5)
source.scanAsync(0)((acc, x) => Future.successful(acc + x)).runForeach(println)
// 0 (= 0)
// 1 (= 0 + 1)
// 3 (= 0 + 1 + 2)
// 6 (= 0 + 1 + 2 + 3)
// 10 (= 0 + 1 + 2 + 3 + 4)
// 15 (= 0 + 1 + 2 + 3 + 4 + 5)
//#scanAsync
}
}