/** * Copyright (C) 2015-2016 Lightbend Inc. */ package docs.stream; import static org.junit.Assert.assertEquals; import akka.NotUsed; import docs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import akka.actor.ActorSystem; import akka.stream.*; import akka.stream.javadsl.*; import akka.testkit.JavaTestKit; public class FlowParallelismDocTest extends AbstractJavaTest { static ActorSystem system; static Materializer mat; @BeforeClass public static void setup() { system = ActorSystem.create("FlowParallellismDocTest"); mat = ActorMaterializer.create(system); } @AfterClass public static void tearDown() { JavaTestKit.shutdownActorSystem(system); system = null; mat = null; } static class ScoopOfBatter {} static class HalfCookedPancake {} static class Pancake {} //#pipelining Flow fryingPan1 = Flow.of(ScoopOfBatter.class).map(batter -> new HalfCookedPancake()); Flow fryingPan2 = Flow.of(HalfCookedPancake.class).map(halfCooked -> new Pancake()); //#pipelining @Test public void demonstratePipelining() { //#pipelining // With the two frying pans we can fully cook pancakes Flow pancakeChef = fryingPan1.async().via(fryingPan2.async()); //#pipelining } @Test public void demonstrateParallelism() { //#parallelism Flow fryingPan = Flow.of(ScoopOfBatter.class).map(batter -> new Pancake()); Flow pancakeChef = Flow.fromGraph(GraphDSL.create(b -> { final UniformFanInShape mergePancakes = b.add(Merge.create(2)); final UniformFanOutShape dispatchBatter = b.add(Balance.create(2)); // Using two frying pans in parallel, both fully cooking a pancake from the batter. // We always put the next scoop of batter to the first frying pan that becomes available. b.from(dispatchBatter.out(0)).via(b.add(fryingPan.async())).toInlet(mergePancakes.in(0)); // Notice that we used the "fryingPan" flow without importing it via builder.add(). // Flows used this way are auto-imported, which in this case means that the two // uses of "fryingPan" mean actually different stages in the graph. b.from(dispatchBatter.out(1)).via(b.add(fryingPan.async())).toInlet(mergePancakes.in(1)); return FlowShape.of(dispatchBatter.in(), mergePancakes.out()); })); //#parallelism } @Test public void parallelPipeline() { //#parallel-pipeline Flow pancakeChef = Flow.fromGraph(GraphDSL.create(b -> { final UniformFanInShape mergePancakes = b.add(Merge.create(2)); final UniformFanOutShape dispatchBatter = b.add(Balance.create(2)); // Using two pipelines, having two frying pans each, in total using // four frying pans b.from(dispatchBatter.out(0)) .via(b.add(fryingPan1.async())) .via(b.add(fryingPan2.async())) .toInlet(mergePancakes.in(0)); b.from(dispatchBatter.out(1)) .via(b.add(fryingPan1.async())) .via(b.add(fryingPan2.async())) .toInlet(mergePancakes.in(1)); return FlowShape.of(dispatchBatter.in(), mergePancakes.out()); })); //#parallel-pipeline } @Test public void pipelinedParallel() { //#pipelined-parallel Flow pancakeChefs1 = Flow.fromGraph(GraphDSL.create(b -> { final UniformFanInShape mergeHalfCooked = b.add(Merge.create(2)); final UniformFanOutShape dispatchBatter = b.add(Balance.create(2)); // Two chefs work with one frying pan for each, half-frying the pancakes then putting // them into a common pool b.from(dispatchBatter.out(0)).via(b.add(fryingPan1.async())).toInlet(mergeHalfCooked.in(0)); b.from(dispatchBatter.out(1)).via(b.add(fryingPan1.async())).toInlet(mergeHalfCooked.in(1)); return FlowShape.of(dispatchBatter.in(), mergeHalfCooked.out()); })); Flow pancakeChefs2 = Flow.fromGraph(GraphDSL.create(b -> { final UniformFanInShape mergePancakes = b.add(Merge.create(2)); final UniformFanOutShape dispatchHalfCooked = b.add(Balance.create(2)); // Two chefs work with one frying pan for each, finishing the pancakes then putting // them into a common pool b.from(dispatchHalfCooked.out(0)).via(b.add(fryingPan2.async())).toInlet(mergePancakes.in(0)); b.from(dispatchHalfCooked.out(1)).via(b.add(fryingPan2.async())).toInlet(mergePancakes.in(1)); return FlowShape.of(dispatchHalfCooked.in(), mergePancakes.out()); })); Flow kitchen = pancakeChefs1.via(pancakeChefs2); //#pipelined-parallel } }