doc: Partition Streams operator, #25468 (#28090)

* doc: Partition Streams operator, #25468

* example intro text
This commit is contained in:
Patrik Nordwall 2019-11-26 14:11:43 +01:00 committed by Christopher Batey
parent 4d99440e00
commit 38d41e11fb
3 changed files with 116 additions and 0 deletions

View file

@ -6,11 +6,24 @@ Fan-out the stream to several streams.
## Signature
@apidoc[Partition]
## Description
Fan-out the stream to several streams. Each upstream element is emitted to one downstream consumer according to the
partitioner function applied to the element.
## Example
Here is an example of using `Partition` to split a `Source` of integers to one `Sink` for the even numbers and
another `Sink` for the odd numbers.
Scala
: @@snip [PartitionDocExample.scala](/akka-docs/src/test/scala/docs/stream/operators/PartitionDocExample.scala) { #partition }
Java
: @@snip [PartitionDocExample.java](/akka-docs/src/test/java/jdocs/stream/operators/PartitionDocExample.java) { #import #partition }
## Reactive Streams semantics
@@@div { .callout }

View file

@ -0,0 +1,59 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators;
import akka.actor.ActorSystem;
// #import
import akka.NotUsed;
import akka.stream.Attributes;
import akka.stream.ClosedShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
// #import
public class PartitionDocExample {
private final ActorSystem system = ActorSystem.create("PartitionDocExample");
void partitionExample() {
// #partition
Source<Integer, NotUsed> source = Source.range(1, 10);
Sink<Integer, NotUsed> even =
Flow.of(Integer.class)
.log("even")
.withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo()))
.to(Sink.ignore());
Sink<Integer, NotUsed> odd =
Flow.of(Integer.class)
.log("even")
.withAttributes(Attributes.createLogLevels(Attributes.logLevelInfo()))
.to(Sink.ignore());
RunnableGraph.fromGraph(
GraphDSL.create(
builder -> {
UniformFanOutShape<Integer, Integer> partition =
builder.add(
Partition.create(
Integer.class, 2, element -> (element % 2 == 0) ? 0 : 1));
builder.from(builder.add(source)).viaFanOut(partition);
builder.from(partition.out(0)).to(builder.add(even));
builder.from(partition.out(1)).to(builder.add(odd));
return ClosedShape.getInstance();
}))
.run(system);
// #partition
}
}

View file

@ -0,0 +1,44 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators
import akka.actor.ActorSystem
object PartitionDocExample {
implicit val system: ActorSystem = ???
//#partition
import akka.NotUsed
import akka.stream.Attributes
import akka.stream.Attributes.LogLevels
import akka.stream.ClosedShape
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.GraphDSL
import akka.stream.scaladsl.Partition
import akka.stream.scaladsl.RunnableGraph
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
val source: Source[Int, NotUsed] = Source(1 to 10)
val even: Sink[Int, NotUsed] =
Flow[Int].log("even").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore)
val odd: Sink[Int, NotUsed] =
Flow[Int].log("odd").withAttributes(Attributes.logLevels(onElement = LogLevels.Info)).to(Sink.ignore)
RunnableGraph
.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partition = builder.add(Partition[Int](2, element => if (element % 2 == 0) 0 else 1))
source ~> partition.in
partition.out(0) ~> even
partition.out(0) ~> odd
ClosedShape
})
.run()
//#partition
}