Add example for foldAsync (#29912)

This commit is contained in:
Nitika Agarwal 2021-01-13 12:31:58 +05:30 committed by GitHub
parent 7920694b81
commit 294661bde1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 4 deletions

View file

@ -13,7 +13,24 @@ Just like `fold` but receives a function that results in a @scala[`Future`] @jav
Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value. Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.
Note that the `zero` value must be immutable. @@@ warning
Note that the `zero` value must be immutable, because otherwise
the same mutable instance would be shared across different threads
when running the stream more than once.
@@@
## Example
`foldAsync` is typically used to 'fold up' the incoming values into an aggregate asynchronously.
For example, you might want to summarize the incoming values into a histogram:
Scala
: @@snip [FoldAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FoldAsync.scala) { #imports #foldAsync }
Java
: @@snip [FoldAsync.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #foldAsync }
## Reactive Streams semantics ## Reactive Streams semantics

View file

@ -341,7 +341,7 @@ class SourceOrFlow {
} }
static static
// #fold // #fold // #foldAsync
class Histogram { class Histogram {
final long low; final long low;
final long high; final long high;
@ -354,6 +354,8 @@ class SourceOrFlow {
// Immutable start value // Immutable start value
public static Histogram INSTANCE = new Histogram(0L, 0L); public static Histogram INSTANCE = new Histogram(0L, 0L);
// #foldAsync
public Histogram add(int number) { public Histogram add(int number) {
if (number < 100) { if (number < 100) {
return new Histogram(low + 1L, high); return new Histogram(low + 1L, high);
@ -361,21 +363,44 @@ class SourceOrFlow {
return new Histogram(low, high + 1L); return new Histogram(low, high + 1L);
} }
} }
// #fold
// #foldAsync
public CompletionStage<Histogram> addAsync(Integer n) {
if (n < 100) {
return CompletableFuture.supplyAsync(() -> new Histogram(low + 1L, high));
} else {
return CompletableFuture.supplyAsync(() -> new Histogram(low, high + 1L));
}
}
// #fold
} }
// #fold // #fold // #foldAsync
void foldExample() { void foldExample() {
// #fold // #fold
// Folding over the numbers from 1 to 150: // Folding over the numbers from 1 to 150:
Source.range(1, 150) Source.range(1, 150)
.fold(Histogram.INSTANCE, (acc, n) -> acc.add(n)) .fold(Histogram.INSTANCE, Histogram::add)
.runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system); .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system);
// Prints: Histogram(99, 51) // Prints: Histogram(99, 51)
// #fold // #fold
} }
void foldAsyncExample() {
// #foldAsync
// Folding over the numbers from 1 to 150:
Source.range(1, 150)
.foldAsync(Histogram.INSTANCE, Histogram::addAsync)
.runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system);
// Prints: Histogram(99, 51)
// #foldAsync
}
void takeExample() { void takeExample() {
// #take // #take
Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system); Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system);

View file

@ -0,0 +1,29 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
//#imports
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.{ ExecutionContext, Future }
//#imports
object FoldAsync extends App {
implicit val system: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher
//#foldAsync
case class Histogram(low: Long = 0, high: Long = 0) {
def add(i: Int): Future[Histogram] =
if (i < 100) Future { copy(low = low + 1) } else Future { copy(high = high + 1) }
}
Source(1 to 150).foldAsync(Histogram())((acc, n) => acc.add(n)).runForeach(println)
// Prints: Histogram(99,51)
//#foldAsync
}