diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/reduce.md b/akka-docs/src/main/paradox/stream/operators/Sink/reduce.md index 2f9f47fe75..0576236312 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/reduce.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/reduce.md @@ -27,3 +27,10 @@ Materializes into a @scala[`Future`] @java[`CompletionStage`] that will be compl @@@ +## Example + +Scala +: @@snip [SinkReduceSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala) { #reduce-operator-example } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #reduce-operator-example } \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java new file mode 100644 index 0000000000..759585d769 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.stream.operators; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.util.Arrays; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SinkDocExamples { + + static void reduceExample() throws InterruptedException, ExecutionException, TimeoutException { + + final ActorSystem system = ActorSystem.create("SourceFromExample"); + final Materializer materializer = ActorMaterializer.create(system); + //#reduce-operator-example + Source ints = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + CompletionStage sum = ints.runWith(Sink.reduce((a, b) -> a + b), materializer); + int result = sum.toCompletableFuture().get(3, TimeUnit.SECONDS); + System.out.println(result); + // 55 + //#reduce-operator-example + } +} + diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index df94676d3a..6e943fd00d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -324,6 +324,18 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "The reduce sink" must { + "sum up 1 to 10 correctly" in { + //#reduce-operator-example + val source = Source(1 to 10) + val result = source.runWith(Sink.reduce[Int]((a, b) ⇒ a + b)) + result.map(println)(system.dispatcher) + // 55 + //#reduce-operator-example + assert(result.futureValue == (1 to 10 sum)) + } + } + "Sink pre-materialization" must { "materialize the sink and wrap its exposed publisher in a Source" in { val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)