From 548b3589f2cf541dc45c076820515343469a8f67 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 18 Nov 2019 18:02:44 +0200 Subject: [PATCH] doc: Flow.fold (#27850) --- .../stream/operators/Source-or-Flow/fold.md | 18 ++++++++- .../jdocs/stream/operators/SourceOrFlow.java | 38 +++++++++++++++++++ .../stream/operators/sourceorflow/Fold.scala | 27 +++++++++++++ 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md index a12b4ecf4b..12628fe8dc 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md @@ -17,7 +17,23 @@ Start with current value `zero` and then apply the current and next value to the Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream. -Note that the `zero` value must be immutable. +@@@ warning + +Note that the `zero` value must be immutable, because otherwise +the same mutable instance would be shared across different threads +when running the stream more than once. + +@@@ + +## Example + +`fold` is typically used to 'fold up' the incoming values into an aggregate. For example, you might want to summarize the incoming values into a histogram: + +Scala +: @@snip [Fold.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala) { #imports #histogram #fold } + +Java +: @@snip [Fold.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #fold } ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 945db072b5..833ada4070 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -4,6 +4,7 @@ package jdocs.stream.operators; +import akka.actor.ActorSystem; import akka.japi.pf.PFBuilder; import akka.stream.Materializer; import akka.stream.javadsl.Flow; @@ -45,6 +46,7 @@ import java.util.Comparator; class SourceOrFlow { private static Materializer materializer = null; + private static ActorSystem system = null; void logExample() { Flow.of(String.class) @@ -271,4 +273,40 @@ class SourceOrFlow { // 7 (= 7) // #grouped } + + static + // #fold + class Histogram { + final long low; + final long high; + + private Histogram(long low, long high) { + this.low = low; + this.high = high; + } + + // Immutable start value + public static Histogram INSTANCE = new Histogram(0L, 0L); + + public Histogram add(int number) { + if (number < 100) { + return new Histogram(low + 1L, high); + } else { + return new Histogram(low, high + 1L); + } + } + } + // #fold + + void foldExample() { + // #fold + + // Folding over the numbers from 1 to 150: + Source.range(1, 150) + .fold(Histogram.INSTANCE, (acc, n) -> acc.add(n)) + .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system); + + // Prints: Histogram(99, 51) + // #fold + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala new file mode 100644 index 0000000000..10fcb6e0ea --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Fold.scala @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +//#imports +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +//#imports +object Fold extends App { + + //#histogram + case class Histogram(low: Long = 0, high: Long = 0) { + def add(i: Int): Histogram = if (i < 100) copy(low = low + 1) else copy(high = high + 1) + } + //#histogram + + implicit val sys = ActorSystem() + + //#fold + Source(1 to 150).fold(Histogram())((acc, n) => acc.add(n)).runForeach(println) + + // Prints: Histogram(99,51) + //#fold +}