diff --git a/akka-docs/src/main/paradox/stream/operators/Broadcast.md b/akka-docs/src/main/paradox/stream/operators/Broadcast.md index 7629ab0208..3a3a1e3c47 100644 --- a/akka-docs/src/main/paradox/stream/operators/Broadcast.md +++ b/akka-docs/src/main/paradox/stream/operators/Broadcast.md @@ -4,10 +4,34 @@ Emit each incoming element each of `n` outputs. @ref[Fan-out operators](index.md#fan-out-operators) +## Signature + +@apidoc[akka.stream.*.Broadcast] + ## Description Emit each incoming element each of `n` outputs. +## Example + +Here is an example that is using `Broadcast` to aggregate different values from a `Source` of integers. + +Scala +: @@snip [BroadcastDocExample.scala](/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala) { #broadcast } + +Java +: @@snip [BroadcastDocExample.java](/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java) { #import #broadcast } + +Note that asynchronous boundary for the output streams must be added explicitly if it's desired to run them in parallel. + +Scala +: @@snip [BroadcastDocExample.scala](/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala) { #broadcast-async } + +Java +: @@snip [BroadcastDocExample.java](/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java) { #broadcast-async } + + + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java b/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java new file mode 100644 index 0000000000..e68308aef0 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.stream.operators; + +import akka.actor.ActorSystem; + +// #import +import akka.NotUsed; +import akka.japi.tuple.Tuple3; +import akka.stream.ClosedShape; +import akka.stream.UniformFanOutShape; +import akka.stream.javadsl.Broadcast; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.GraphDSL; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.RunnableGraph; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import java.util.concurrent.CompletionStage; +// #import + +public class BroadcastDocExample { + + private final ActorSystem system = ActorSystem.create("PartitionDocExample"); + + void partitionExample() { + + // #broadcast + + Source source = Source.range(1, 10); + + Sink> countSink = + Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right()); + Sink> minSink = + Flow.of(Integer.class).toMat(Sink.fold(0, Math::min), Keep.right()); + Sink> maxSink = + Flow.of(Integer.class).toMat(Sink.fold(0, Math::max), Keep.right()); + + final Tuple3, CompletionStage, CompletionStage> + result = + RunnableGraph.fromGraph( + GraphDSL.create3( + countSink, + minSink, + maxSink, + Tuple3::create, + (builder, countS, minS, maxS) -> { + final UniformFanOutShape broadcast = + builder.add(Broadcast.create(3)); + builder.from(builder.add(source)).viaFanOut(broadcast); + builder.from(broadcast.out(0)).to(countS); + builder.from(broadcast.out(1)).to(minS); + builder.from(broadcast.out(2)).to(maxS); + return ClosedShape.getInstance(); + })) + .run(system); + + // #broadcast + + // #broadcast-async + RunnableGraph.fromGraph( + GraphDSL.create3( + countSink, + minSink, + maxSink, + Tuple3::create, + (builder, countS, minS, maxS) -> { + final UniformFanOutShape broadcast = + builder.add(Broadcast.create(3)); + builder.from(builder.add(source)).viaFanOut(broadcast); + + builder + .from(broadcast.out(0)) + .via(builder.add(Flow.of(Integer.class).async())) + .to(countS); + builder + .from(broadcast.out(1)) + .via(builder.add(Flow.of(Integer.class).async())) + .to(minS); + builder + .from(broadcast.out(2)) + .via(builder.add(Flow.of(Integer.class).async())) + .to(maxS); + return ClosedShape.getInstance(); + })); + // #broadcast-async + + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala b/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala new file mode 100644 index 0000000000..5c1344f175 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators + +import java.util.concurrent.ThreadLocalRandom + +import scala.concurrent.Future + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Broadcast +import akka.stream.scaladsl.Keep + +object BroadcastDocExample { + + implicit val system: ActorSystem = ??? + + //#broadcast + import akka.NotUsed + import akka.stream.ClosedShape + import akka.stream.scaladsl.Flow + import akka.stream.scaladsl.GraphDSL + import akka.stream.scaladsl.RunnableGraph + import akka.stream.scaladsl.Sink + import akka.stream.scaladsl.Source + + val source: Source[Int, NotUsed] = + Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100) + + val countSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => acc + 1))(Keep.right) + val minSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => math.min(acc, elem)))(Keep.right) + val maxSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => math.max(acc, elem)))(Keep.right) + + val (count: Future[Int], min: Future[Int], max: Future[Int]) = + RunnableGraph + .fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) { + implicit builder => (countS, minS, maxS) => + import GraphDSL.Implicits._ + val broadcast = builder.add(Broadcast[Int](3)) + source ~> broadcast + broadcast.out(0) ~> countS + broadcast.out(0) ~> minS + broadcast.out(0) ~> maxS + ClosedShape + }) + .run() + //#broadcast + + //#broadcast-async + RunnableGraph.fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) { + implicit builder => (countS, minS, maxS) => + import GraphDSL.Implicits._ + val broadcast = builder.add(Broadcast[Int](3)) + source ~> broadcast + broadcast.out(0) ~> Flow[Int].async ~> countS + broadcast.out(0) ~> Flow[Int].async ~> minS + broadcast.out(0) ~> Flow[Int].async ~> maxS + ClosedShape + }) + //#broadcast-async +}