diff --git a/akka-docs-dev/rst/java/stream-index.rst b/akka-docs-dev/rst/java/stream-index.rst index 99eaf10f37..4eb06d1c7f 100644 --- a/akka-docs-dev/rst/java/stream-index.rst +++ b/akka-docs-dev/rst/java/stream-index.rst @@ -16,6 +16,7 @@ Streams stream-integrations stream-error stream-io + stream-parallelism stream-cookbook ../stream-configuration diff --git a/akka-docs-dev/rst/java/stream-parallelism.rst b/akka-docs-dev/rst/java/stream-parallelism.rst new file mode 100644 index 0000000000..812798895b --- /dev/null +++ b/akka-docs-dev/rst/java/stream-parallelism.rst @@ -0,0 +1,103 @@ +.. _stream-parallelism-java: + +########################## +Pipelining and Parallelism +########################## + +Akka Streams processing stages (be it simple operators on Flows and Sources or graph junctions) are executed +concurrently by default. This is realized by mapping each of the processing stages to a dedicated actor internally. + +We will illustrate through the example of pancake cooking how streams can be used for various processing patterns, +exploiting the available parallelism on modern computers. The setting is the following: both Patrik and Roland +like to make pancakes, but they need to produce sufficient amount in a cooking session to make all of the children +happy. To increase their pancake production throughput they use two frying pans. How they organize their pancake +processing is markedly different. + +Pipelining +---------- + +Roland uses the two frying pans in an asymmetric fashion. The first pan is only used to fry one side of the +pancake then the half-finished pancake is flipped into the second pan for the finishing fry on the other side. +Once the first frying pan becomes available it gets a new scoop of batter. As an effect, most of the time there +are two pancakes being cooked at the same time, one being cooked on its first side and the second being cooked to +completion. +This is how this setup would look like implemented as a stream: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowParallelismDocTest.java#pipelining + +The two ``map`` stages in sequence (encapsulated in the "frying pan" flows) will be executed in a pipelined way, +basically doing the same as Roland with his frying pans: + + 1. A ``ScoopOfBatter`` enters ``fryingPan1`` + 2. ``fryingPan1`` emits a HalfCookedPancake once ``fryingPan2`` becomes available + 3. ``fryingPan2`` takes the HalfCookedPancake + 4. at this point fryingPan1 already takes the next scoop, without waiting for fryingPan2 to finish + +The benefit of pipelining is that it can be applied to any sequence of processing steps that are otherwise not +parallelisable (for example because the result of a processing step depends on all the information from the previous +step). One drawback is that if the processing times of the stages are very different then some of the stages will not +be able to operate at full throughput because they will wait on a previous or subsequent stage most of the time. In the +pancake example frying the second half of the pancake is usually faster than frying the first half, ``fryingPan2`` will +not be able to operate at full capacity [#]_. + +Stream processing stages have internal buffers to make communication between them more efficient. For more details +about the behavior of these and how to add additional buffers refer to :ref:`stream-rate-scala`. + +Parallel processing +------------------- + +Patrik uses the two frying pans symmetrically. He uses both pans to fully fry a pancake on both sides, then puts +the results on a shared plate. Whenever a pan becomes empty, he takes the next scoop from the shared bowl of batter. +In essence he parallelizes the same process over multiple pans. This is how this setup will look like if implemented +using streams: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowParallelismDocTest.java#parallelism + +The benefit of parallelizing is that it is easy to scale. In the pancake example +it is easy to add a third frying pan with Patrik's method, but Roland cannot add a third frying pan, +since that would require a third processing step, which is not practically possible in the case of frying pancakes. + +One drawback of the example code above that it does not preserve the ordering of pancakes. This might be a problem +if children like to track their "own" pancakes. In those cases the ``Balance`` and ``Merge`` stages should be replaced +by strict-round robing balancing and merging stages that put in and take out pancakes in a strict order. + +A more detailed example of creating a worker pool can be found in the cookbook: :ref:`cookbook-balance-scala` + +Combining pipelining and parallel processing +-------------------------------------------- + +The two concurrency patterns that we demonstrated as means to increase throughput are not exclusive. +In fact, it is rather simple to combine the two approaches and streams provide +a nice unifying language to express and compose them. + +First, let's look at how we can parallelize pipelined processing stages. In the case of pancakes this means that we +will employ two chefs, each working using Roland's pipelining method, but we use the two chefs in parallel, just like +Patrik used the two frying pans. This is how it looks like if expressed as streams: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowParallelismDocTest.java#parallel-pipeline + +The above pattern works well if there are many independent jobs that do not depend on the results of each other, but +the jobs themselves need multiple processing steps where each step builds on the result of +the previous one. In our case individual pancakes do not depend on each other, they can be cooked in parallel, on the +other hand it is not possible to fry both sides of the same pancake at the same time, so the two sides have to be fried +in sequence. + +It is also possible to organize parallelized stages into pipelines. This would mean employing four chefs: + + - the first two chefs prepare half-cooked pancakes from batter, in parallel, then putting those on a large enough + flat surface. + - the second two chefs take these and fry their other side in their own pans, then they put the pancakes on a shared + plate. + +This is again straightforward to implement with the streams API: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowParallelismDocTest.java#pipelined-parallel + +This usage pattern is less common but might be usable if a certain step in the pipeline might take wildly different +times to finish different jobs. The reason is that there are more balance-merge steps in this pattern +compared to the parallel pipelines. This pattern rebalances after each step, while the previous pattern only balances +at the entry point of the pipeline. This only matters however if the processing time distribution has a large +deviation. + +.. [#] Roland's reason for this seemingly suboptimal procedure is that he prefers the temperature of the second pan + to be slightly lower than the first in order to achieve a more homogeneous result. \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowParallelismDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowParallelismDocSpec.scala new file mode 100644 index 0000000000..a19e939994 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowParallelismDocSpec.scala @@ -0,0 +1,106 @@ +package docs.stream + +import akka.stream.scaladsl.{ FlowGraph, Merge, Balance, Source, Flow } +import akka.stream.testkit.AkkaSpec + +class FlowParallelismDocSpec extends AkkaSpec { + + import FlowGraph.Implicits._ + + case class ScoopOfBatter() + case class HalfCookedPancake() + case class Pancake() + + //format: OFF + //#pipelining + // Takes a scoop of batter and creates a pancake with one side cooked + val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, Unit] = + Flow[ScoopOfBatter].map { batter => HalfCookedPancake() } + + // Finishes a half-cooked pancake + val fryingPan2: Flow[HalfCookedPancake, Pancake, Unit] = + Flow[HalfCookedPancake].map { halfCooked => Pancake() } + //#pipelining + //format: ON + + "Demonstrate pipelining" in { + //#pipelining + + // With the two frying pans we can fully cook pancakes + val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] = + Flow[ScoopOfBatter].via(fryingPan1).via(fryingPan2) + //#pipelining + } + + "Demonstrate parallel processing" in { + //#parallelism + val fryingPan: Flow[ScoopOfBatter, Pancake, Unit] = + Flow[ScoopOfBatter].map { batter => Pancake() } + + val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] = Flow() { implicit builder => + val dispatchBatter = builder.add(Balance[ScoopOfBatter](2)) + val mergePancakes = builder.add(Merge[Pancake](2)) + + // Using two frying pans in parallel, both fully cooking a pancake from the batter. + // We always put the next scoop of batter to the first frying pan that becomes available. + dispatchBatter.out(0) ~> fryingPan ~> mergePancakes.in(0) + // Notice that we used the "fryingPan" flow without importing it via builder.add(). + // Flows used this way are auto-imported, which in this case means that the two + // uses of "fryingPan" mean actually different stages in the graph. + dispatchBatter.out(1) ~> fryingPan ~> mergePancakes.in(1) + + (dispatchBatter.in, mergePancakes.out) + } + + //#parallelism + } + + "Demonstrate parallelized pipelines" in { + //#parallel-pipeline + val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] = Flow() { implicit builder => + + val dispatchBatter = builder.add(Balance[ScoopOfBatter](2)) + val mergePancakes = builder.add(Merge[Pancake](2)) + + // Using two pipelines, having two frying pans each, in total using + // four frying pans + dispatchBatter.out(0) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(0) + dispatchBatter.out(1) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(1) + + (dispatchBatter.in, mergePancakes.out) + } + //#parallel-pipeline + } + + "Demonstrate pipelined parallel processing" in { + //#pipelined-parallel + val pancakeChefs1: Flow[ScoopOfBatter, HalfCookedPancake, Unit] = Flow() { implicit builder => + val dispatchBatter = builder.add(Balance[ScoopOfBatter](2)) + val mergeHalfPancakes = builder.add(Merge[HalfCookedPancake](2)) + + // Two chefs work with one frying pan for each, half-frying the pancakes then putting + // them into a common pool + dispatchBatter.out(0) ~> fryingPan1 ~> mergeHalfPancakes.in(0) + dispatchBatter.out(1) ~> fryingPan1 ~> mergeHalfPancakes.in(1) + + (dispatchBatter.in, mergeHalfPancakes.out) + } + + val pancakeChefs2: Flow[HalfCookedPancake, Pancake, Unit] = Flow() { implicit builder => + val dispatchHalfPancakes = builder.add(Balance[HalfCookedPancake](2)) + val mergePancakes = builder.add(Merge[Pancake](2)) + + // Two chefs work with one frying pan for each, finishing the pancakes then putting + // them into a common pool + dispatchHalfPancakes.out(0) ~> fryingPan2 ~> mergePancakes.in(0) + dispatchHalfPancakes.out(1) ~> fryingPan2 ~> mergePancakes.in(1) + + (dispatchHalfPancakes.in, mergePancakes.out) + } + + val kitchen: Flow[ScoopOfBatter, Pancake, Unit] = pancakeChefs1.via(pancakeChefs2) + //#pipelined-parallel + + } + +} diff --git a/akka-docs-dev/rst/scala/stream-cookbook.rst b/akka-docs-dev/rst/scala/stream-cookbook.rst index 5832ba976b..736d69553a 100644 --- a/akka-docs-dev/rst/scala/stream-cookbook.rst +++ b/akka-docs-dev/rst/scala/stream-cookbook.rst @@ -192,6 +192,7 @@ element. If this function would return a pair of the two argument it would be ex .. includecode:: code/docs/stream/cookbook/RecipeManualTrigger.scala#manually-triggered-stream-zipwith +.. _cookbook-balance-scala: Balancing jobs to a fixed pool of workers ----------------------------------------- diff --git a/akka-docs-dev/rst/scala/stream-index.rst b/akka-docs-dev/rst/scala/stream-index.rst index 098ec75894..be0a7ac549 100644 --- a/akka-docs-dev/rst/scala/stream-index.rst +++ b/akka-docs-dev/rst/scala/stream-index.rst @@ -16,6 +16,7 @@ Streams stream-integrations stream-error stream-io + stream-parallelism stream-cookbook ../stream-configuration diff --git a/akka-docs-dev/rst/scala/stream-parallelism.rst b/akka-docs-dev/rst/scala/stream-parallelism.rst new file mode 100644 index 0000000000..84969dbec3 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-parallelism.rst @@ -0,0 +1,103 @@ +.. _stream-parallelism-scala: + +########################## +Pipelining and Parallelism +########################## + +Akka Streams processing stages (be it simple operators on Flows and Sources or graph junctions) are executed +concurrently by default. This is realized by mapping each of the processing stages to a dedicated actor internally. + +We will illustrate through the example of pancake cooking how streams can be used for various processing patterns, +exploiting the available parallelism on modern computers. The setting is the following: both Patrik and Roland +like to make pancakes, but they need to produce sufficient amount in a cooking session to make all of the children +happy. To increase their pancake production throughput they use two frying pans. How they organize their pancake +processing is markedly different. + +Pipelining +---------- + +Roland uses the two frying pans in an asymmetric fashion. The first pan is only used to fry one side of the +pancake then the half-finished pancake is flipped into the second pan for the finishing fry on the other side. +Once the first frying pan becomes available it gets a new scoop of batter. As an effect, most of the time there +are two pancakes being cooked at the same time, one being cooked on its first side and the second being cooked to +completion. +This is how this setup would look like implemented as a stream: + +.. includecode:: code/docs/stream/FlowParallelismDocSpec.scala#pipelining + +The two ``map`` stages in sequence (encapsulated in the "frying pan" flows) will be executed in a pipelined way, +basically doing the same as Roland with his frying pans: + + 1. A ``ScoopOfBatter`` enters ``fryingPan1`` + 2. ``fryingPan1`` emits a HalfCookedPancake once ``fryingPan2`` becomes available + 3. ``fryingPan2`` takes the HalfCookedPancake + 4. at this point fryingPan1 already takes the next scoop, without waiting for fryingPan2 to finish + +The benefit of pipelining is that it can be applied to any sequence of processing steps that are otherwise not +parallelisable (for example because the result of a processing step depends on all the information from the previous +step). One drawback is that if the processing times of the stages are very different then some of the stages will not +be able to operate at full throughput because they will wait on a previous or subsequent stage most of the time. In the +pancake example frying the second half of the pancake is usually faster than frying the first half, ``fryingPan2`` will +not be able to operate at full capacity [#]_. + +Stream processing stages have internal buffers to make communication between them more efficient. For more details +about the behavior of these and how to add additional buffers refer to :ref:`stream-rate-scala`. + +Parallel processing +------------------- + +Patrik uses the two frying pans symmetrically. He uses both pans to fully fry a pancake on both sides, then puts +the results on a shared plate. Whenever a pan becomes empty, he takes the next scoop from the shared bowl of batter. +In essence he parallelizes the same process over multiple pans. This is how this setup will look like if implemented +using streams: + +.. includecode:: code/docs/stream/FlowParallelismDocSpec.scala#parallelism + +The benefit of parallelizing is that it is easy to scale. In the pancake example +it is easy to add a third frying pan with Patrik's method, but Roland cannot add a third frying pan, +since that would require a third processing step, which is not practically possible in the case of frying pancakes. + +One drawback of the example code above that it does not preserve the ordering of pancakes. This might be a problem +if children like to track their "own" pancakes. In those cases the ``Balance`` and ``Merge`` stages should be replaced +by strict-round robing balancing and merging stages that put in and take out pancakes in a strict order. + +A more detailed example of creating a worker pool can be found in the cookbook: :ref:`cookbook-balance-scala` + +Combining pipelining and parallel processing +-------------------------------------------- + +The two concurrency patterns that we demonstrated as means to increase throughput are not exclusive. +In fact, it is rather simple to combine the two approaches and streams provide +a nice unifying language to express and compose them. + +First, let's look at how we can parallelize pipelined processing stages. In the case of pancakes this means that we +will employ two chefs, each working using Roland's pipelining method, but we use the two chefs in parallel, just like +Patrik used the two frying pans. This is how it looks like if expressed as streams: + +.. includecode:: code/docs/stream/FlowParallelismDocSpec.scala#parallel-pipeline + +The above pattern works well if there are many independent jobs that do not depend on the results of each other, but +the jobs themselves need multiple processing steps where each step builds on the result of +the previous one. In our case individual pancakes do not depend on each other, they can be cooked in parallel, on the +other hand it is not possible to fry both sides of the same pancake at the same time, so the two sides have to be fried +in sequence. + +It is also possible to organize parallelized stages into pipelines. This would mean employing four chefs: + + - the first two chefs prepare half-cooked pancakes from batter, in parallel, then putting those on a large enough + flat surface. + - the second two chefs take these and fry their other side in their own pans, then they put the pancakes on a shared + plate. + +This is again straightforward to implement with the streams API: + +.. includecode:: code/docs/stream/FlowParallelismDocSpec.scala#pipelined-parallel + +This usage pattern is less common but might be usable if a certain step in the pipeline might take wildly different +times to finish different jobs. The reason is that there are more balance-merge steps in this pattern +compared to the parallel pipelines. This pattern rebalances after each step, while the previous pattern only balances +at the entry point of the pipeline. This only matters however if the processing time distribution has a large +deviation. + +.. [#] Roland's reason for this seemingly suboptimal procedure is that he prefers the temperature of the second pan + to be slightly lower than the first in order to achieve a more homogeneous result. \ No newline at end of file