From 294661bde1549450f94456434b2bcac00cdfffab Mon Sep 17 00:00:00 2001 From: Nitika Agarwal <54163056+nitikagarw@users.noreply.github.com> Date: Wed, 13 Jan 2021 12:31:58 +0530 Subject: [PATCH] Add example for foldAsync (#29912) --- .../operators/Source-or-Flow/foldAsync.md | 19 +++++++++++- .../jdocs/stream/operators/SourceOrFlow.java | 31 +++++++++++++++++-- .../operators/sourceorflow/FoldAsync.scala | 29 +++++++++++++++++ 3 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FoldAsync.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md index 983fa349a8..9955b4518c 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/foldAsync.md @@ -13,7 +13,24 @@ Just like `fold` but receives a function that results in a @scala[`Future`] @jav Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. -Note that the `zero` value must be immutable. +@@@ warning + +Note that the `zero` value must be immutable, because otherwise +the same mutable instance would be shared across different threads +when running the stream more than once. + +@@@ + +## Example + +`foldAsync` is typically used to 'fold up' the incoming values into an aggregate asynchronously. +For example, you might want to summarize the incoming values into a histogram: + +Scala +: @@snip [FoldAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FoldAsync.scala) { #imports #foldAsync } + +Java +: @@snip [FoldAsync.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #foldAsync } ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 85bcaad57d..5c1a8b47cc 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -341,7 +341,7 @@ class SourceOrFlow { } static - // #fold + // #fold // #foldAsync class Histogram { final long low; final long high; @@ -354,6 +354,8 @@ class SourceOrFlow { // Immutable start value public static Histogram INSTANCE = new Histogram(0L, 0L); + // #foldAsync + public Histogram add(int number) { if (number < 100) { return new Histogram(low + 1L, high); @@ -361,21 +363,44 @@ class SourceOrFlow { return new Histogram(low, high + 1L); } } + // #fold + + // #foldAsync + public CompletionStage addAsync(Integer n) { + if (n < 100) { + return CompletableFuture.supplyAsync(() -> new Histogram(low + 1L, high)); + } else { + return CompletableFuture.supplyAsync(() -> new Histogram(low, high + 1L)); + } + } + // #fold } - // #fold + // #fold // #foldAsync void foldExample() { // #fold // Folding over the numbers from 1 to 150: Source.range(1, 150) - .fold(Histogram.INSTANCE, (acc, n) -> acc.add(n)) + .fold(Histogram.INSTANCE, Histogram::add) .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system); // Prints: Histogram(99, 51) // #fold } + void foldAsyncExample() { + // #foldAsync + + // Folding over the numbers from 1 to 150: + Source.range(1, 150) + .foldAsync(Histogram.INSTANCE, Histogram::addAsync) + .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system); + + // Prints: Histogram(99, 51) + // #foldAsync + } + void takeExample() { // #take Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system); diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FoldAsync.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FoldAsync.scala new file mode 100644 index 0000000000..1b35462cec --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FoldAsync.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +//#imports +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +import scala.concurrent.{ ExecutionContext, Future } +//#imports + +object FoldAsync extends App { + + implicit val system: ActorSystem = ActorSystem() + implicit val ec: ExecutionContext = system.dispatcher + + //#foldAsync + case class Histogram(low: Long = 0, high: Long = 0) { + def add(i: Int): Future[Histogram] = + if (i < 100) Future { copy(low = low + 1) } else Future { copy(high = high + 1) } + } + + Source(1 to 150).foldAsync(Histogram())((acc, n) => acc.add(n)).runForeach(println) + + // Prints: Histogram(99,51) + //#foldAsync +}