diff --git a/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java b/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java index e893e6a58a..2bb775d763 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java @@ -23,9 +23,9 @@ import java.util.concurrent.CompletionStage; public class BroadcastDocExample { - private final ActorSystem system = ActorSystem.create("PartitionDocExample"); + private final ActorSystem system = ActorSystem.create("BroadcastDocExample"); - void partitionExample() { + void broadcastExample() { // #broadcast @@ -62,27 +62,18 @@ public class BroadcastDocExample { // #broadcast-async RunnableGraph.fromGraph( GraphDSL.create3( - countSink, - minSink, - maxSink, + countSink.async(), + minSink.async(), + maxSink.async(), Tuple3::create, (builder, countS, minS, maxS) -> { final UniformFanOutShape broadcast = builder.add(Broadcast.create(3)); builder.from(builder.add(source)).viaFanOut(broadcast); - builder - .from(broadcast.out(0)) - .via(builder.add(Flow.of(Integer.class).async())) - .to(countS); - builder - .from(broadcast.out(1)) - .via(builder.add(Flow.of(Integer.class).async())) - .to(minS); - builder - .from(broadcast.out(2)) - .via(builder.add(Flow.of(Integer.class).async())) - .to(maxS); + builder.from(broadcast.out(0)).to(countS); + builder.from(broadcast.out(1)).to(minS); + builder.from(broadcast.out(2)).to(maxS); return ClosedShape.getInstance(); })); // #broadcast-async diff --git a/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala b/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala index c86efbd6e3..de9251fe40 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala @@ -10,16 +10,14 @@ import scala.concurrent.Future import akka.actor.ActorSystem import akka.stream.scaladsl.Broadcast -import akka.stream.scaladsl.Keep object BroadcastDocExample { - implicit val system: ActorSystem = ??? + implicit val system: ActorSystem = ActorSystem("BroadcastDocExample") //#broadcast import akka.NotUsed import akka.stream.ClosedShape - import akka.stream.scaladsl.Flow import akka.stream.scaladsl.GraphDSL import akka.stream.scaladsl.RunnableGraph import akka.stream.scaladsl.Sink @@ -28,9 +26,9 @@ object BroadcastDocExample { val source: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100) - val countSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => acc + 1))(Keep.right) - val minSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => math.min(acc, elem)))(Keep.right) - val maxSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => math.max(acc, elem)))(Keep.right) + val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => acc + 1) + val minSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.min(acc, elem)) + val maxSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.max(acc, elem)) val (count: Future[Int], min: Future[Int], max: Future[Int]) = RunnableGraph @@ -48,14 +46,14 @@ object BroadcastDocExample { //#broadcast //#broadcast-async - RunnableGraph.fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) { + RunnableGraph.fromGraph(GraphDSL.createGraph(countSink.async, minSink.async, maxSink.async)(Tuple3.apply) { implicit builder => (countS, minS, maxS) => import GraphDSL.Implicits._ val broadcast = builder.add(Broadcast[Int](3)) source ~> broadcast - broadcast.out(0) ~> Flow[Int].async ~> countS - broadcast.out(1) ~> Flow[Int].async ~> minS - broadcast.out(2) ~> Flow[Int].async ~> maxS + broadcast.out(0) ~> countS + broadcast.out(1) ~> minS + broadcast.out(2) ~> maxS ClosedShape }) //#broadcast-async