diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/reduce.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/reduce.md index 7e2ba0b9a4..a5e35d1a65 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/reduce.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/reduce.md @@ -15,6 +15,18 @@ Start with first element and then apply the current and next value to the given Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream. Similar to `fold`. +## Example + +`reduce` will take a function and apply it on the incoming elements in the Stream and only emits its result when upstream completes. +Here, it will add the incoming elements. + +Scala +: @@snip [Reduce.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Reduce.scala) { #reduceExample } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #reduceExample } + + ## Reactive Streams semantics @@@div { .callout } @@ -26,4 +38,3 @@ complete the current value is emitted downstream. Similar to `fold`. **completes** when upstream completes @@@ - diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 97df80130d..3198ca4488 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -456,6 +456,15 @@ class SourceOrFlow { // #dropWhile } + static void reduceExample() { + // #reduceExample + Source source = Source.range(1, 100).reduce((acc, element) -> acc + element); + CompletionStage result = source.runWith(Sink.head(), system); + result.thenAccept(System.out::println); + // 5050 + // #reduceExample + } + void watchExample() { // #watch final ActorRef ref = someActor(); diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Reduce.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Reduce.scala new file mode 100644 index 0000000000..93517f22e1 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Reduce.scala @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.stream.operators.sink + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{ Sink, Source } + +import scala.concurrent.{ ExecutionContextExecutor, Future } + +object Reduce { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + def reduceExample: Future[Unit] = { + //#reduceExample + val source = Source(1 to 100).reduce((acc, element) => acc + element) + val result: Future[Int] = source.runWith(Sink.head) + result.map(println) + //5050 + //#reduceExample + } +}