diff --git a/akka-docs/src/main/paradox/stream/operators/Partition.md b/akka-docs/src/main/paradox/stream/operators/Partition.md index d96d3c13d1..34683e2979 100644 --- a/akka-docs/src/main/paradox/stream/operators/Partition.md +++ b/akka-docs/src/main/paradox/stream/operators/Partition.md @@ -6,11 +6,24 @@ Fan-out the stream to several streams. ## Signature +@apidoc[Partition] + ## Description Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the partitioner function applied to the element. +## Example + +Here is an example of using `Partition` to split a `Source` of integers to one `Sink` for the even numbers and +another `Sink` for the odd numbers. + +Scala +: @@snip [PartitionDocExample.scala](/akka-docs/src/test/scala/docs/stream/operators/PartitionDocExample.scala) { #partition } + +Java +: @@snip [PartitionDocExample.java](/akka-docs/src/test/java/jdocs/stream/operators/PartitionDocExample.java) { #import #partition } + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/PartitionDocExample.java b/akka-docs/src/test/java/jdocs/stream/operators/PartitionDocExample.java new file mode 100644 index 0000000000..35f5e57416 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/PartitionDocExample.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.stream.operators; + +import akka.actor.ActorSystem; + +// #import +import akka.NotUsed; +import akka.stream.Attributes; +import akka.stream.ClosedShape; +import akka.stream.UniformFanOutShape; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.GraphDSL; +import akka.stream.javadsl.Partition; +import akka.stream.javadsl.RunnableGraph; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +// #import + +public class PartitionDocExample { + + private final ActorSystem system = ActorSystem.create("PartitionDocExample"); + + void partitionExample() { + + // #partition + + Source source = Source.range(1, 10); + + Sink even = + Flow.of(Integer.class) + .log("even") + .withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo())) + .to(Sink.ignore()); + Sink odd = + Flow.of(Integer.class) + .log("even") + .withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo())) + .to(Sink.ignore()); + + RunnableGraph.fromGraph( + GraphDSL.create( + builder -> { + UniformFanOutShape partition = + builder.add( + Partition.create( + Integer.class, 2, element -> (element % 2 == 0) ? 0 : 1)); + builder.from(builder.add(source)).viaFanOut(partition); + builder.from(partition.out(0)).to(builder.add(even)); + builder.from(partition.out(1)).to(builder.add(odd)); + return ClosedShape.getInstance(); + })) + .run(system); + + // #partition + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/PartitionDocExample.scala b/akka-docs/src/test/scala/docs/stream/operators/PartitionDocExample.scala new file mode 100644 index 0000000000..a9b6d8e60a --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/PartitionDocExample.scala @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators + +import akka.actor.ActorSystem + +object PartitionDocExample { + + implicit val system: ActorSystem = ??? + + //#partition + import akka.NotUsed + import akka.stream.Attributes + import akka.stream.Attributes.LogLevels + import akka.stream.ClosedShape + import akka.stream.scaladsl.Flow + import akka.stream.scaladsl.GraphDSL + import akka.stream.scaladsl.Partition + import akka.stream.scaladsl.RunnableGraph + import akka.stream.scaladsl.Sink + import akka.stream.scaladsl.Source + + val source: Source[Int, NotUsed] = Source(1 to 10) + + val even: Sink[Int, NotUsed] = + Flow[Int].log("even").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore) + val odd: Sink[Int, NotUsed] = + Flow[Int].log("odd").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore) + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit builder => + import GraphDSL.Implicits._ + val partition = builder.add(Partition[Int](2, element => if (element % 2 == 0) 0 else 1)) + source ~> partition.in + partition.out(0) ~> even + partition.out(0) ~> odd + ClosedShape + }) + .run() + + //#partition +}