* doc: Broadcast Stream operator, #25468 * example intro text
This commit is contained in:
parent
35dbfd7883
commit
6185fbde18
3 changed files with 177 additions and 0 deletions
|
|
@ -4,10 +4,34 @@ Emit each incoming element each of `n` outputs.
|
|||
|
||||
@ref[Fan-out operators](index.md#fan-out-operators)
|
||||
|
||||
## Signature
|
||||
|
||||
@apidoc[akka.stream.*.Broadcast]
|
||||
|
||||
## Description
|
||||
|
||||
Emit each incoming element each of `n` outputs.
|
||||
|
||||
## Example
|
||||
|
||||
Here is an example that is using `Broadcast` to aggregate different values from a `Source` of integers.
|
||||
|
||||
Scala
|
||||
: @@snip [BroadcastDocExample.scala](/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala) { #broadcast }
|
||||
|
||||
Java
|
||||
: @@snip [BroadcastDocExample.java](/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java) { #import #broadcast }
|
||||
|
||||
Note that asynchronous boundary for the output streams must be added explicitly if it's desired to run them in parallel.
|
||||
|
||||
Scala
|
||||
: @@snip [BroadcastDocExample.scala](/akka-docs/src/test/scala/docs/stream/operators/BroadcastDocExample.scala) { #broadcast-async }
|
||||
|
||||
Java
|
||||
: @@snip [BroadcastDocExample.java](/akka-docs/src/test/java/jdocs/stream/operators/BroadcastDocExample.java) { #broadcast-async }
|
||||
|
||||
|
||||
|
||||
## Reactive Streams semantics
|
||||
|
||||
@@@div { .callout }
|
||||
|
|
|
|||
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.stream.operators;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
|
||||
// #import
|
||||
import akka.NotUsed;
|
||||
import akka.japi.tuple.Tuple3;
|
||||
import akka.stream.ClosedShape;
|
||||
import akka.stream.UniformFanOutShape;
|
||||
import akka.stream.javadsl.Broadcast;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.javadsl.GraphDSL;
|
||||
import akka.stream.javadsl.Keep;
|
||||
import akka.stream.javadsl.RunnableGraph;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
// #import
|
||||
|
||||
public class BroadcastDocExample {
|
||||
|
||||
private final ActorSystem system = ActorSystem.create("PartitionDocExample");
|
||||
|
||||
void partitionExample() {
|
||||
|
||||
// #broadcast
|
||||
|
||||
Source<Integer, NotUsed> source = Source.range(1, 10);
|
||||
|
||||
Sink<Integer, CompletionStage<Integer>> countSink =
|
||||
Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right());
|
||||
Sink<Integer, CompletionStage<Integer>> minSink =
|
||||
Flow.of(Integer.class).toMat(Sink.fold(0, Math::min), Keep.right());
|
||||
Sink<Integer, CompletionStage<Integer>> maxSink =
|
||||
Flow.of(Integer.class).toMat(Sink.fold(0, Math::max), Keep.right());
|
||||
|
||||
final Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>>
|
||||
result =
|
||||
RunnableGraph.fromGraph(
|
||||
GraphDSL.create3(
|
||||
countSink,
|
||||
minSink,
|
||||
maxSink,
|
||||
Tuple3::create,
|
||||
(builder, countS, minS, maxS) -> {
|
||||
final UniformFanOutShape<Integer, Integer> broadcast =
|
||||
builder.add(Broadcast.create(3));
|
||||
builder.from(builder.add(source)).viaFanOut(broadcast);
|
||||
builder.from(broadcast.out(0)).to(countS);
|
||||
builder.from(broadcast.out(1)).to(minS);
|
||||
builder.from(broadcast.out(2)).to(maxS);
|
||||
return ClosedShape.getInstance();
|
||||
}))
|
||||
.run(system);
|
||||
|
||||
// #broadcast
|
||||
|
||||
// #broadcast-async
|
||||
RunnableGraph.fromGraph(
|
||||
GraphDSL.create3(
|
||||
countSink,
|
||||
minSink,
|
||||
maxSink,
|
||||
Tuple3::create,
|
||||
(builder, countS, minS, maxS) -> {
|
||||
final UniformFanOutShape<Integer, Integer> broadcast =
|
||||
builder.add(Broadcast.create(3));
|
||||
builder.from(builder.add(source)).viaFanOut(broadcast);
|
||||
|
||||
builder
|
||||
.from(broadcast.out(0))
|
||||
.via(builder.add(Flow.of(Integer.class).async()))
|
||||
.to(countS);
|
||||
builder
|
||||
.from(broadcast.out(1))
|
||||
.via(builder.add(Flow.of(Integer.class).async()))
|
||||
.to(minS);
|
||||
builder
|
||||
.from(broadcast.out(2))
|
||||
.via(builder.add(Flow.of(Integer.class).async()))
|
||||
.to(maxS);
|
||||
return ClosedShape.getInstance();
|
||||
}));
|
||||
// #broadcast-async
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.stream.operators
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.scaladsl.Broadcast
|
||||
import akka.stream.scaladsl.Keep
|
||||
|
||||
object BroadcastDocExample {
|
||||
|
||||
implicit val system: ActorSystem = ???
|
||||
|
||||
//#broadcast
|
||||
import akka.NotUsed
|
||||
import akka.stream.ClosedShape
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.GraphDSL
|
||||
import akka.stream.scaladsl.RunnableGraph
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
|
||||
val source: Source[Int, NotUsed] =
|
||||
Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)
|
||||
|
||||
val countSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => acc + 1))(Keep.right)
|
||||
val minSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => math.min(acc, elem)))(Keep.right)
|
||||
val maxSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, elem) => math.max(acc, elem)))(Keep.right)
|
||||
|
||||
val (count: Future[Int], min: Future[Int], max: Future[Int]) =
|
||||
RunnableGraph
|
||||
.fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
|
||||
implicit builder => (countS, minS, maxS) =>
|
||||
import GraphDSL.Implicits._
|
||||
val broadcast = builder.add(Broadcast[Int](3))
|
||||
source ~> broadcast
|
||||
broadcast.out(0) ~> countS
|
||||
broadcast.out(0) ~> minS
|
||||
broadcast.out(0) ~> maxS
|
||||
ClosedShape
|
||||
})
|
||||
.run()
|
||||
//#broadcast
|
||||
|
||||
//#broadcast-async
|
||||
RunnableGraph.fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
|
||||
implicit builder => (countS, minS, maxS) =>
|
||||
import GraphDSL.Implicits._
|
||||
val broadcast = builder.add(Broadcast[Int](3))
|
||||
source ~> broadcast
|
||||
broadcast.out(0) ~> Flow[Int].async ~> countS
|
||||
broadcast.out(0) ~> Flow[Int].async ~> minS
|
||||
broadcast.out(0) ~> Flow[Int].async ~> maxS
|
||||
ClosedShape
|
||||
})
|
||||
//#broadcast-async
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue