diff --git a/akka-docs/src/main/paradox/images/stream-substream-flatMapConcat1.png b/akka-docs/src/main/paradox/images/stream-substream-flatMapConcat1.png new file mode 100644 index 0000000000..138136c5d3 Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-flatMapConcat1.png differ diff --git a/akka-docs/src/main/paradox/images/stream-substream-flatMapConcat2.png b/akka-docs/src/main/paradox/images/stream-substream-flatMapConcat2.png new file mode 100644 index 0000000000..85b8db4fb4 Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-flatMapConcat2.png differ diff --git a/akka-docs/src/main/paradox/images/stream-substream-flatMapMerge.png b/akka-docs/src/main/paradox/images/stream-substream-flatMapMerge.png new file mode 100644 index 0000000000..3909dbddd2 Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-flatMapMerge.png differ diff --git a/akka-docs/src/main/paradox/images/stream-substream-groupBy1.png b/akka-docs/src/main/paradox/images/stream-substream-groupBy1.png new file mode 100644 index 0000000000..de8765b283 Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-groupBy1.png differ diff --git a/akka-docs/src/main/paradox/images/stream-substream-groupBy2.png b/akka-docs/src/main/paradox/images/stream-substream-groupBy2.png new file mode 100644 index 0000000000..05c88faab2 Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-groupBy2.png differ diff --git a/akka-docs/src/main/paradox/images/stream-substream-groupBy3.png b/akka-docs/src/main/paradox/images/stream-substream-groupBy3.png new file mode 100644 index 0000000000..0a2d8be5a6 Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-groupBy3.png differ diff --git a/akka-docs/src/main/paradox/images/stream-substream-groupBy4.png b/akka-docs/src/main/paradox/images/stream-substream-groupBy4.png new file mode 100644 index 0000000000..89d752bbe7 
Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-groupBy4.png differ diff --git a/akka-docs/src/main/paradox/images/stream-substream-splitWhen-splitAfter.png b/akka-docs/src/main/paradox/images/stream-substream-splitWhen-splitAfter.png new file mode 100644 index 0000000000..815fec4368 Binary files /dev/null and b/akka-docs/src/main/paradox/images/stream-substream-splitWhen-splitAfter.png differ diff --git a/akka-docs/src/main/paradox/stream/index.md b/akka-docs/src/main/paradox/stream/index.md index f1fa4daddd..93be480114 100644 --- a/akka-docs/src/main/paradox/stream/index.md +++ b/akka-docs/src/main/paradox/stream/index.md @@ -19,7 +19,8 @@ * [stream-parallelism](stream-parallelism.md) * [stream-testkit](stream-testkit.md) * [stages-overview](stages-overview.md) +* [stream-substream](stream-substream.md) * [stream-cookbook](stream-cookbook.md) * [../general/stream/stream-configuration](../general/stream/stream-configuration.md) -@@@ \ No newline at end of file +@@@ diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index b320f9a838..c4917a8fb3 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -1375,9 +1375,12 @@ the flow with a `BufferOverflowException`. ## Nesting and flattening stages + These stages either take a stream and turn it into a stream of streams (nesting) or they take a stream that contains nested streams and turn them into a stream of elements instead (flattening). +See the [Substreams](stream-substream.md) page for more detail and code samples. 
+ --------------------------------------------------------------- ### prefixAndTail diff --git a/akka-docs/src/main/paradox/stream/stream-substream.md b/akka-docs/src/main/paradox/stream/stream-substream.md new file mode 100644 index 0000000000..fd84bb467c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/stream-substream.md @@ -0,0 +1,145 @@ +# Substreams + +Substreams are represented as `SubSource` or `SubFlow` instances, which you obtain by multiplexing a single `Source` or `Flow` +into a stream of streams. + +SubFlows cannot contribute to the super-flow’s materialized value since they are materialized later, +during the runtime of the flow graph processing. + +Stages that create substreams are listed in @ref:[Nesting and flattening stages](stages-overview.md#nesting-and-flattening-stages). + +## Nesting stages + +### groupBy + +A typical operation that generates substreams is `groupBy`. + +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #groupBy1 } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #groupBy1 } + +![stream-substream-groupBy1.png](../../images/stream-substream-groupBy1.png) + +This operation splits the incoming stream into separate output +streams, one for each element key. The key is computed for each element +using the given function, which is `f` in the above diagram. When a new key is encountered for the first time, +a new substream is opened and subsequently fed with all elements belonging to that key. + +If you add a `Sink` or `Flow` right after the `groupBy` stage, +all transformations are applied to every encountered substream in the same fashion. +So, if you add the following `Sink`, it is attached to each of the substreams, as in the diagram below.
+ +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #groupBy2 } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #groupBy2 } + +![stream-substream-groupBy2.png](../../images/stream-substream-groupBy2.png) + +Substreams, more precisely `SubFlow` and `SubSource`, also have methods that allow you to +merge or concatenate the substreams back into the master stream. + +The `mergeSubstreams` method merges an unbounded number of substreams back into the master stream. + +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #groupBy3 } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #groupBy3 } + +![stream-substream-groupBy3.png](../../images/stream-substream-groupBy3.png) + +You can limit the number of substreams that run and are merged at any given time +with either the `mergeSubstreamsWithParallelism` or the `concatSubstreams` method. + +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #groupBy4 } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #groupBy4 } + +However, since the number of running (i.e. not yet completed) substreams is capped, +be careful that these methods do not cause a deadlock through backpressure, as in the diagram below. + +Elements one and two open two substreams, but since the number of substreams is capped at 2, +element three cannot open a new substream until one of the previous two completes, +so the stream deadlocks. + +![stream-substream-groupBy4.png](../../images/stream-substream-groupBy4.png) + +### splitWhen and splitAfter + +`splitWhen` and `splitAfter` are two other operations that generate substreams.
+ +The difference from `groupBy` is that a new substream is generated whenever the predicate of `splitWhen` or `splitAfter` +returns true, and the elements after the split flow into the new substream. + +`splitWhen` routes the element on which the predicate returned true into the new substream, +whereas `splitAfter` routes the next element after the one on which the predicate returned true into the new substream. + +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #splitWhenAfter } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #splitWhenAfter } + +These are useful when you have scanned over part of the stream and no longer need to look back at elements before the split. +A typical example is counting the number of characters of each line, as below. + +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #wordCount } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #wordCount } + +This prints the following output: + +``` +23 +16 +26 +``` + +![stream-substream-splitWhen-splitAfter.png](../../images/stream-substream-splitWhen-splitAfter.png) + +## Flattening stages + +### flatMapConcat + +`flatMapConcat` and `flatMapMerge` are substream operations that, unlike `groupBy` and `splitWhen`/`splitAfter`, +flatten a stream of streams instead of creating one. + +`flatMapConcat` takes a function, which is `f` in the following diagram. +The function `f` transforms each input element into a `Source` that is then flattened +into the output stream by concatenation. + +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #flatMapConcat } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #flatMapConcat } + +![stream-substream-flatMapConcat1.png](../../images/stream-substream-flatMapConcat1.png) + +Like the `concat` operation on `Flow`, it fully consumes one `Source` after the other.
+So there is only one substream actively running at any given time. + +Once the active substream is fully consumed, the next substream starts running. +Elements from all the substreams are concatenated into the sink. + +![stream-substream-flatMapConcat2.png](../../images/stream-substream-flatMapConcat2.png) + +### flatMapMerge + +`flatMapMerge` is similar to `flatMapConcat`, but it doesn't wait for one `Source` to be fully consumed. +Instead, up to `breadth` substreams emit elements at any given time. + +Scala +: @@snip [SubstreamDocSpec.scala]($code$/scala/docs/stream/SubstreamDocSpec.scala) { #flatMapMerge } + +Java +: @@snip [SubstreamDocTest.java]($code$/java/jdocs/stream/SubstreamDocTest.java) { #flatMapMerge } + +![stream-substream-flatMapMerge.png](../../images/stream-substream-flatMapMerge.png) diff --git a/akka-docs/src/test/java/jdocs/stream/SubstreamDocTest.java b/akka-docs/src/test/java/jdocs/stream/SubstreamDocTest.java new file mode 100644 index 0000000000..7e20e7ea16 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/SubstreamDocTest.java @@ -0,0 +1,115 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc.
+ */ +package jdocs.stream; + +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.javadsl.TestKit; +import jdocs.AbstractJavaTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class SubstreamDocTest extends AbstractJavaTest { + + static ActorSystem system; + static Materializer mat; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("FlowDocTest"); + mat = ActorMaterializer.create(system); + } + + @AfterClass + public static void tearDown() { + TestKit.shutdownActorSystem(system); + system = null; + mat = null; + } + + @Test + public void demonstrateGroupBy() throws Exception { + //#groupBy1 + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .groupBy(3, elem -> elem % 3); + //#groupBy1 + + //#groupBy2 + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .groupBy(3, elem -> elem % 3) + .to(Sink.ignore()) + .run(mat); + //#groupBy2 + + //#groupBy3 + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .groupBy(3, elem -> elem % 3) + .mergeSubstreams() + .runWith(Sink.ignore(), mat); + //#groupBy3 + + //#groupBy4 + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .groupBy(3, elem -> elem % 3) + .mergeSubstreamsWithParallelism(2) + .runWith(Sink.ignore(), mat); + //concatSubstreams is equivalent to mergeSubstreamsWithParallelism(1) + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .groupBy(3, elem -> elem % 3) + .concatSubstreams() + .runWith(Sink.ignore(), mat); + //#groupBy4 + } + + @Test + public void demonstrateSplitWhenAfter() throws Exception { + //#splitWhenAfter + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .splitWhen(elem -> elem == 3); + + Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) 
+ .splitAfter(elem -> elem == 3); + //#splitWhenAfter + + //#wordCount + String text = + "This is the first line.\n" + + "The second line.\n" + + "There is also the 3rd line\n"; + + Source.from(Arrays.asList(text.split(""))) + .map(x -> x.charAt(0)) + .splitAfter(x -> x == '\n') + .filter(x -> x != '\n') + .map(x -> 1) + .reduce((x,y) -> x + y) + .to(Sink.foreach(x -> System.out.println(x))) + .run(mat); + //#wordCount + Thread.sleep(1000); + } + + @Test + public void demonstrateflatMapConcatMerge() throws Exception { + //#flatMapConcat + Source.from(Arrays.asList(1, 2)) + .flatMapConcat(i -> Source.from(Arrays.asList(i, i, i))) + .runWith(Sink.ignore(), mat); + //#flatMapConcat + + //#flatMapMerge + Source.from(Arrays.asList(1, 2)) + .flatMapMerge(2, i -> Source.from(Arrays.asList(i, i, i))) + .runWith(Sink.ignore(), mat); + //#flatMapMerge + } +} diff --git a/akka-docs/src/test/scala/docs/stream/SubstreamDocSpec.scala b/akka-docs/src/test/scala/docs/stream/SubstreamDocSpec.scala new file mode 100644 index 0000000000..ec458e640f --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/SubstreamDocSpec.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. 
+ */ +package docs.stream + +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.{ ActorMaterializer, SubstreamCancelStrategy } +import akka.testkit.AkkaSpec + +class SubstreamDocSpec extends AkkaSpec { + implicit val materializer = ActorMaterializer() + + "generate substreams by groupBy" in { + //#groupBy1 + val source = Source(1 to 10).groupBy(3, _ % 3) + //#groupBy1 + + //#groupBy2 + Source(1 to 10).groupBy(3, _ % 3).to(Sink.ignore).run() + //#groupBy2 + + //#groupBy3 + Source(1 to 10) + .groupBy(3, _ % 3) + .mergeSubstreams + .runWith(Sink.ignore) + //#groupBy3 + + //#groupBy4 + Source(1 to 10) + .groupBy(3, _ % 3) + .mergeSubstreamsWithParallelism(2) + .runWith(Sink.ignore) + + //concatSubstreams is equivalent to mergeSubstreamsWithParallelism(1) + Source(1 to 10) + .groupBy(3, _ % 3) + .concatSubstreams + .runWith(Sink.ignore) + //#groupBy4 + } + + "generate substreams by splitWhen and splitAfter" in { + //#splitWhenAfter + Source(1 to 10).splitWhen(SubstreamCancelStrategy.drain)(_ == 3) + + Source(1 to 10).splitAfter(SubstreamCancelStrategy.drain)(_ == 3) + //#splitWhenAfter + + //#wordCount + val text = + "This is the first line.\n" + + "The second line.\n" + + "There is also the 3rd line\n" + + val charCount = Source(text.toList) + .splitAfter { _ == '\n' } + .filter(_ != '\n') + .map(_ ⇒ 1) + .reduce(_ + _) + .to(Sink.foreach(println)) + .run() + //#wordCount + } + + "generate substreams by flatMapConcat and flatMapMerge" in { + //#flatMapConcat + Source(1 to 2) + .flatMapConcat(i ⇒ Source(List.fill(3)(i))) + .runWith(Sink.ignore) + //#flatMapConcat + + //#flatMapMerge + Source(1 to 2) + .flatMapMerge(2, i ⇒ Source(List.fill(3)(i))) + .runWith(Sink.ignore) + //#flatMapMerge + } +}
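The `splitWhen`/`splitAfter` distinction documented in the new page can also be sketched without Akka, as plain list partitioning. `SplitSketch` and its `split` helper below are hypothetical names invented for this sketch, not part of the Akka API; the sketch only illustrates where each variant places the element that matched the predicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SplitSketch {

    // Hypothetical helper (not Akka API): partitions a list the way
    // splitWhen/splitAfter partition a stream. With after = false (splitWhen),
    // the element on which the predicate returns true starts the new sublist;
    // with after = true (splitAfter), it closes the current sublist and the
    // *next* element starts the new one.
    static <T> List<List<T>> split(List<T> in, Predicate<T> p, boolean after) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T elem : in) {
            if (!after && p.test(elem) && !current.isEmpty()) {
                out.add(current);            // splitWhen: close sublist before the match
                current = new ArrayList<>();
            }
            current.add(elem);
            if (after && p.test(elem)) {
                out.add(current);            // splitAfter: close sublist after the match
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            out.add(current);                // flush the trailing sublist
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> in = Arrays.asList(1, 2, 3, 4, 5);
        // splitWhen(_ == 3): [[1, 2], [3, 4, 5]] -- 3 starts the new substream
        System.out.println(split(in, x -> x == 3, false));
        // splitAfter(_ == 3): [[1, 2, 3], [4, 5]] -- the element after 3 starts it
        System.out.println(split(in, x -> x == 3, true));
    }
}
```

This also shows why the predicate-matching element ends up in a different substream for the two variants, which is the behavior the `#splitWhenAfter` snippets exercise against Akka's real operators.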