diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md index 864d56237d..2ec906a18b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md @@ -18,7 +18,13 @@ Emit its current value, which starts at `zero`, and then apply the current and n emitting the next current value. This means that `scan` emits one element downstream before, and upstream elements will not be requested until, the second element is required from downstream. -Note that the `zero` value must be immutable. +@@@ warning + +Note that the `zero` value must be immutable, because otherwise +the same mutable instance would be shared across different threads +when running the stream more than once. + +@@@ ## Reactive Streams semantics @@ -34,8 +40,10 @@ Note that the `zero` value must be immutable. ## Examples +Below example demonstrates how `scan` is similar to `fold`, but it keeps value from every iteration. + Scala -: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan } +: @@snip [Scan.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan } Java : @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md index 32913b644c..40b435760d 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scanAsync.md @@ -16,7 +16,13 @@ Just like `scan` but receives a function that results in a @scala[`Future`] @jav Just like `scan` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. -Note that the `zero` value must be immutable. +@@@ warning + +Note that the `zero` value must be immutable, because otherwise +the same mutable instance would be shared across different threads +when running the stream more than once. + +@@@ ## Reactive Streams semantics @@ -30,3 +36,19 @@ Note that the `zero` value must be immutable. @@@ +## Examples + +Below example demonstrates how `scanAsync` is similar to `fold`, but it keeps value from every iteration. + +Scala +: @@snip [ScanAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala) { #scanAsync } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scanAsync } + +@@@ warning + +In an actual application the future would probably involve some external API that returns a @scala[`Future`] +@java[`CompletionStage`] rather than an immediately completed value. + +@@@ \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 6791a95a26..4fc7c75083 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -42,6 +42,7 @@ import akka.stream.Attributes; import java.time.Duration; import java.util.Arrays; import java.util.Comparator; +import java.util.concurrent.CompletableFuture; class SourceOrFlow { private static ActorSystem system = null; @@ -186,6 +187,21 @@ class SourceOrFlow { // #scan } + void scanAsyncExample() { + // #scanAsync + Source source = Source.range(1, 5); + source + .scanAsync(0, (acc, x) -> CompletableFuture.completedFuture(acc + x)) + .runForeach(System.out::println, materializer); + // 0 (= 0) + // 1 (= 0 + 1) + // 3 (= 0 + 1 + 2) + // 6 (= 0 + 1 + 2 + 3) + // 10 (= 0 + 1 + 2 + 3 + 4) + // 15 (= 0 + 1 + 2 + 3 + 4 + 5) + // #scanAsync + } + static // #conflateWithSeed-type class Summed { diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala new file mode 100644 index 0000000000..1fffa8e031 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ScanAsync.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow +import akka.stream.scaladsl.Source + +import scala.concurrent.Future + +object ScanAsync { + def scanAsyncExample(): Unit = { + import akka.actor.ActorSystem + + implicit val system: ActorSystem = ActorSystem() + + //#scanAsync + val source = Source(1 to 5) + source.scanAsync(0)((acc, x) => Future.successful(acc + x)).runForeach(println) + // 0 (= 0) + // 1 (= 0 + 1) + // 3 (= 0 + 1 + 2) + // 6 (= 0 + 1 + 2 + 3) + // 10 (= 0 + 1 + 2 + 3 + 4) + // 15 (= 0 + 1 + 2 + 3 + 4 + 5) + //#scanAsync + } + +}