From 5e5bedb956e2d7b284d65787ed773046025ef474 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Wed, 16 Mar 2016 16:04:22 +0100 Subject: [PATCH] =doc #20051 Parallellism docs updated for async and fusing --- .../docs/stream/FlowParallelismDocTest.java | 23 ++++++++++--------- .../javadsl/cookbook/RecipeWorkerPool.java | 2 +- akka-docs/rst/java/stream/stream-cookbook.rst | 2 ++ .../rst/java/stream/stream-parallelism.rst | 14 +++++++---- .../docs/stream/FlowParallelismDocSpec.scala | 18 +++++++-------- .../stream/cookbook/RecipeWorkerPool.scala | 2 +- .../rst/scala/stream/stream-cookbook.rst | 2 ++ .../rst/scala/stream/stream-parallelism.rst | 14 +++++++---- 8 files changed, 47 insertions(+), 30 deletions(-) diff --git a/akka-docs/rst/java/code/docs/stream/FlowParallelismDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowParallelismDocTest.java index f5bc342349..87b93d8781 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowParallelismDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/FlowParallelismDocTest.java @@ -51,7 +51,8 @@ public class FlowParallelismDocTest extends AbstractJavaTest { //#pipelining // With the two frying pans we can fully cook pancakes - Flow pancakeChef = fryingPan1.via(fryingPan2); + Flow pancakeChef = + fryingPan1.async().via(fryingPan2.async()); //#pipelining } @@ -70,11 +71,11 @@ public class FlowParallelismDocTest extends AbstractJavaTest { // Using two frying pans in parallel, both fully cooking a pancake from the batter. // We always put the next scoop of batter to the first frying pan that becomes available. - b.from(dispatchBatter.out(0)).via(b.add(fryingPan)).toInlet(mergePancakes.in(0)); + b.from(dispatchBatter.out(0)).via(b.add(fryingPan.async())).toInlet(mergePancakes.in(0)); // Notice that we used the "fryingPan" flow without importing it via builder.add(). // Flows used this way are auto-imported, which in this case means that the two // uses of "fryingPan" mean actually different stages in the graph. - b.from(dispatchBatter.out(1)).via(b.add(fryingPan)).toInlet(mergePancakes.in(1)); + b.from(dispatchBatter.out(1)).via(b.add(fryingPan.async())).toInlet(mergePancakes.in(1)); return FlowShape.of(dispatchBatter.in(), mergePancakes.out()); })); @@ -94,13 +95,13 @@ public class FlowParallelismDocTest extends AbstractJavaTest { // Using two pipelines, having two frying pans each, in total using // four frying pans b.from(dispatchBatter.out(0)) - .via(b.add(fryingPan1)) - .via(b.add(fryingPan2)) + .via(b.add(fryingPan1.async())) + .via(b.add(fryingPan2.async())) .toInlet(mergePancakes.in(0)); b.from(dispatchBatter.out(1)) - .via(b.add(fryingPan1)) - .via(b.add(fryingPan2)) + .via(b.add(fryingPan1.async())) + .via(b.add(fryingPan2.async())) .toInlet(mergePancakes.in(1)); return FlowShape.of(dispatchBatter.in(), mergePancakes.out()); @@ -120,8 +121,8 @@ public class FlowParallelismDocTest extends AbstractJavaTest { // Two chefs work with one frying pan for each, half-frying the pancakes then putting // them into a common pool - b.from(dispatchBatter.out(0)).via(b.add(fryingPan1)).toInlet(mergeHalfCooked.in(0)); - b.from(dispatchBatter.out(1)).via(b.add(fryingPan1)).toInlet(mergeHalfCooked.in(1)); + b.from(dispatchBatter.out(0)).via(b.add(fryingPan1.async())).toInlet(mergeHalfCooked.in(0)); + b.from(dispatchBatter.out(1)).via(b.add(fryingPan1.async())).toInlet(mergeHalfCooked.in(1)); return FlowShape.of(dispatchBatter.in(), mergeHalfCooked.out()); })); @@ -135,8 +136,8 @@ public class FlowParallelismDocTest extends AbstractJavaTest { // Two chefs work with one frying pan for each, finishing the pancakes then putting // them into a common pool - b.from(dispatchHalfCooked.out(0)).via(b.add(fryingPan2)).toInlet(mergePancakes.in(0)); - b.from(dispatchHalfCooked.out(1)).via(b.add(fryingPan2)).toInlet(mergePancakes.in(1)); + b.from(dispatchHalfCooked.out(0)).via(b.add(fryingPan2.async())).toInlet(mergePancakes.in(0)); + b.from(dispatchHalfCooked.out(1)).via(b.add(fryingPan2.async())).toInlet(mergePancakes.in(1)); return FlowShape.of(dispatchHalfCooked.in(), mergePancakes.out()); })); diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java index 162cee2417..cdf463b449 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java @@ -50,7 +50,7 @@ public class RecipeWorkerPool extends RecipeTest { b.add(Merge.create(workerCount)); for (int i = 0; i < workerCount; i++) { - b.from(balance.out(i)).via(b.add(worker)).toInlet(merge.in(i)); + b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i)); } return FlowShape.of(balance.in(), merge.out()); diff --git a/akka-docs/rst/java/stream/stream-cookbook.rst b/akka-docs/rst/java/stream/stream-cookbook.rst index bdd138d62e..c8e0a78780 100644 --- a/akka-docs/rst/java/stream/stream-cookbook.rst +++ b/akka-docs/rst/java/stream/stream-cookbook.rst @@ -204,6 +204,8 @@ The graph consists of a ``Balance`` node which is a special fan-out operation th downstream consumers. In a ``for`` loop we wire all of our desired workers as outputs of this balancer element, then we wire the outputs of these workers to a ``Merge`` element that will collect the results from the workers. +To make the worker stages run in parallel we mark them as asynchronous with `async()`. + .. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java#worker-pool .. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java#worker-pool2 diff --git a/akka-docs/rst/java/stream/stream-parallelism.rst b/akka-docs/rst/java/stream/stream-parallelism.rst index f8a4b07647..c505d77e30 100644 --- a/akka-docs/rst/java/stream/stream-parallelism.rst +++ b/akka-docs/rst/java/stream/stream-parallelism.rst @@ -4,8 +4,13 @@ Pipelining and Parallelism ########################## -Akka Streams processing stages (be it simple operators on Flows and Sources or graph junctions) are executed -concurrently by default. This is realized by mapping each of the processing stages to a dedicated actor internally. +Akka Streams processing stages (be it simple operators on Flows and Sources or graph junctions) are "fused" together +and executed sequentially by default. This avoids the overhead of events crossing asynchronous boundaries but +limits the flow to execute at most one stage at any given time. + +In many cases it is useful to be able to concurrently execute the stages of a flow, this is done by explicitly marking +them as asynchronous using the ``async()`` method. Each processing stage marked as asynchronous will run in a +dedicated actor internally, while all stages not marked asynchronous will run in one single actor. We will illustrate through the example of pancake cooking how streams can be used for various processing patterns, exploiting the available parallelism on modern computers. The setting is the following: both Patrik and Roland @@ -40,8 +45,9 @@ be able to operate at full throughput because they will wait on a previous or su pancake example frying the second half of the pancake is usually faster than frying the first half, ``fryingPan2`` will not be able to operate at full capacity [#]_. -Stream processing stages have internal buffers to make communication between them more efficient. For more details -about the behavior of these and how to add additional buffers refer to :ref:`stream-rate-scala`. +note:: + Asynchronous stream processing stages have internal buffers to make communication between them more efficient. + For more details about the behavior of these and how to add additional buffers refer to :ref:`stream-rate-scala`. Parallel processing ------------------- diff --git a/akka-docs/rst/scala/code/docs/stream/FlowParallelismDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/FlowParallelismDocSpec.scala index 1af724e372..2c4e6daec2 100644 --- a/akka-docs/rst/scala/code/docs/stream/FlowParallelismDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/FlowParallelismDocSpec.scala @@ -30,7 +30,7 @@ class FlowParallelismDocSpec extends AkkaSpec { // With the two frying pans we can fully cook pancakes val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] = - Flow[ScoopOfBatter].via(fryingPan1).via(fryingPan2) + Flow[ScoopOfBatter].via(fryingPan1.async).via(fryingPan2.async) //#pipelining } @@ -45,11 +45,11 @@ class FlowParallelismDocSpec extends AkkaSpec { // Using two frying pans in parallel, both fully cooking a pancake from the batter. // We always put the next scoop of batter to the first frying pan that becomes available. - dispatchBatter.out(0) ~> fryingPan ~> mergePancakes.in(0) + dispatchBatter.out(0) ~> fryingPan.async ~> mergePancakes.in(0) // Notice that we used the "fryingPan" flow without importing it via builder.add(). // Flows used this way are auto-imported, which in this case means that the two // uses of "fryingPan" mean actually different stages in the graph. - dispatchBatter.out(1) ~> fryingPan ~> mergePancakes.in(1) + dispatchBatter.out(1) ~> fryingPan.async ~> mergePancakes.in(1) FlowShape(dispatchBatter.in, mergePancakes.out) }) @@ -67,8 +67,8 @@ class FlowParallelismDocSpec extends AkkaSpec { // Using two pipelines, having two frying pans each, in total using // four frying pans - dispatchBatter.out(0) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(0) - dispatchBatter.out(1) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(1) + dispatchBatter.out(0) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(0) + dispatchBatter.out(1) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(1) FlowShape(dispatchBatter.in, mergePancakes.out) }) @@ -84,8 +84,8 @@ class FlowParallelismDocSpec extends AkkaSpec { // Two chefs work with one frying pan for each, half-frying the pancakes then putting // them into a common pool - dispatchBatter.out(0) ~> fryingPan1 ~> mergeHalfPancakes.in(0) - dispatchBatter.out(1) ~> fryingPan1 ~> mergeHalfPancakes.in(1) + dispatchBatter.out(0) ~> fryingPan1.async ~> mergeHalfPancakes.in(0) + dispatchBatter.out(1) ~> fryingPan1.async ~> mergeHalfPancakes.in(1) FlowShape(dispatchBatter.in, mergeHalfPancakes.out) }) @@ -97,8 +97,8 @@ class FlowParallelismDocSpec extends AkkaSpec { // Two chefs work with one frying pan for each, finishing the pancakes then putting // them into a common pool - dispatchHalfPancakes.out(0) ~> fryingPan2 ~> mergePancakes.in(0) - dispatchHalfPancakes.out(1) ~> fryingPan2 ~> mergePancakes.in(1) + dispatchHalfPancakes.out(0) ~> fryingPan2.async ~> mergePancakes.in(0) + dispatchHalfPancakes.out(1) ~> fryingPan2.async ~> mergePancakes.in(1) FlowShape(dispatchHalfPancakes.in, mergePancakes.out) }) diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala index afaae9a438..4ccb3d2f1a 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala @@ -29,7 +29,7 @@ class RecipeWorkerPool extends RecipeSpec { for (_ <- 1 to workerCount) { // for each worker, add an edge from the balancer to the worker, then wire // it to the merge element - balancer ~> worker ~> merge + balancer ~> worker.async ~> merge } FlowShape(balancer.in, merge.out) diff --git a/akka-docs/rst/scala/stream/stream-cookbook.rst b/akka-docs/rst/scala/stream/stream-cookbook.rst index 30abe02653..2d2970912b 100644 --- a/akka-docs/rst/scala/stream/stream-cookbook.rst +++ b/akka-docs/rst/scala/stream/stream-cookbook.rst @@ -201,6 +201,8 @@ The graph consists of a ``Balance`` node which is a special fan-out operation th downstream consumers. In a ``for`` loop we wire all of our desired workers as outputs of this balancer element, then we wire the outputs of these workers to a ``Merge`` element that will collect the results from the workers. +To make the worker stages run in parallel we mark them as asynchronous with `async`. + .. includecode:: ../code/docs/stream/cookbook/RecipeWorkerPool.scala#worker-pool Working with rate diff --git a/akka-docs/rst/scala/stream/stream-parallelism.rst b/akka-docs/rst/scala/stream/stream-parallelism.rst index c48de591f7..57d46533b0 100644 --- a/akka-docs/rst/scala/stream/stream-parallelism.rst +++ b/akka-docs/rst/scala/stream/stream-parallelism.rst @@ -4,8 +4,13 @@ Pipelining and Parallelism ########################## -Akka Streams processing stages (be it simple operators on Flows and Sources or graph junctions) are executed -concurrently by default. This is realized by mapping each of the processing stages to a dedicated actor internally. +Akka Streams processing stages (be it simple operators on Flows and Sources or graph junctions) are "fused" together +and executed sequentially by default. This avoids the overhead of events crossing asynchronous boundaries but +limits the flow to execute at most one stage at any given time. + +In many cases it is useful to be able to concurrently execute the stages of a flow, this is done by explicitly marking +them as asynchronous using the ``async`` method. Each processing stage marked as asynchronous will run in a +dedicated actor internally, while all stages not marked asynchronous will run in one single actor. We will illustrate through the example of pancake cooking how streams can be used for various processing patterns, exploiting the available parallelism on modern computers. The setting is the following: both Patrik and Roland @@ -40,8 +45,9 @@ be able to operate at full throughput because they will wait on a previous or su pancake example frying the second half of the pancake is usually faster than frying the first half, ``fryingPan2`` will not be able to operate at full capacity [#]_. -Stream processing stages have internal buffers to make communication between them more efficient. For more details -about the behavior of these and how to add additional buffers refer to :ref:`stream-rate-scala`. +.. note:: + Asynchronous stream processing stages have internal buffers to make communication between them more efficient. + For more details about the behavior of these and how to add additional buffers refer to :ref:`stream-rate-scala`. Parallel processing -------------------