/** * Copyright (C) 2014 Typesafe Inc. */ package akka.stream.javadsl; import akka.actor.ActorRef; import akka.japi.*; import akka.stream.StreamTest; import akka.stream.javadsl.japi.Creator; import akka.stream.javadsl.japi.Function; import akka.stream.javadsl.japi.Function2; import akka.stream.javadsl.japi.Procedure; import akka.stream.stage.*; import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; import java.util.*; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; public class FlowGraphTest extends StreamTest { public FlowGraphTest() { super(actorSystemResource); } @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowGraphTest", AkkaSpec.testConf()); public Creator> op() { return new akka.stream.javadsl.japi.Creator>() { @Override public PushPullStage create() throws Exception { return new PushPullStage() { @Override public Directive onPush(T element, Context ctx) { return ctx.push(element); } @Override public Directive onPull(Context ctx) { return ctx.pull(); } }; } }; } @Test public void mustBeAbleToUseMerge() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final Flow f1 = Flow.of(String.class).section(OperationAttributes.name("f1"), new Function, Flow>() { @Override public Flow apply(Flow flow) { return flow.transform(FlowGraphTest.this.op()); } }); final Flow f2 = Flow.of(String.class).section(OperationAttributes.name("f2"), new Function, Flow>() { @Override public Flow apply(Flow flow) { return flow.transform(FlowGraphTest.this.op()); } }); final Flow f3 = Flow.of(String.class).section(OperationAttributes.name("f3"), new Function, Flow>() { @Override public Flow apply(Flow flow) { return flow.transform(FlowGraphTest.this.op()); } }); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); final KeyedSink> publisher = Sink.publisher(); final Merge merge = Merge.create(); MaterializedMap m = FlowGraph.builder().addEdge(in1, f1, merge).addEdge(in2, f2, merge) .addEdge(merge, f3, publisher).build().run(materializer); // collecting final Publisher pub = m.get(publisher); final Future> all = Source.from(pub).grouped(100).runWith(Sink.>head(), materializer); final List result = Await.result(all, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); } @Test public void mustBeAbleToUseZip() { final JavaTestKit probe = new JavaTestKit(system); final Iterable input1 = Arrays.asList("A", "B", "C"); final Iterable input2 = Arrays.asList(1, 2, 3); final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); final Zip2With> zip = Zip.create(); final KeyedSink, Future> out = Sink .foreach(new Procedure>() { @Override public void apply(Pair param) throws Exception { probe.getRef().tell(param, ActorRef.noSender()); } }); FlowGraph.builder().addEdge(in1, zip.left()).addEdge(in2, zip.right()).addEdge(zip.out(), out).run(materializer); List output = Arrays.asList(probe.receiveN(3)); @SuppressWarnings("unchecked") List> expected = Arrays.asList(new Pair("A", 1), new Pair( "B", 2), new Pair("C", 3)); assertEquals(expected, output); } @Test public void mustBeAbleToUseUnzip() { final JavaTestKit probe1 = new JavaTestKit(system); final JavaTestKit probe2 = new JavaTestKit(system); @SuppressWarnings("unchecked") final List> input = Arrays.asList(new Pair("A", 1), new Pair("B", 2), new Pair("C", 3)); final Iterable expected1 = Arrays.asList("A", "B", "C"); final Iterable expected2 = Arrays.asList(1, 2, 3); final Source> in = Source.from(input); final Unzip unzip = Unzip.create(); final KeyedSink> out1 = Sink.foreach(new Procedure() { @Override public void apply(String param) throws Exception { probe1.getRef().tell(param, ActorRef.noSender()); } }); final KeyedSink> out2 = Sink.foreach(new Procedure() { @Override public void apply(Integer param) throws Exception { probe2.getRef().tell(param, ActorRef.noSender()); } }); FlowGraph.builder().addEdge(in, unzip.in()).addEdge(unzip.left(), out1).addEdge(unzip.right(), out2) .run(materializer); List output1 = Arrays.asList(probe1.receiveN(3)); List output2 = Arrays.asList(probe2.receiveN(3)); assertEquals(expected1, output1); assertEquals(expected2, output2); } @Test public void mustBeAbleToUseZipWith() throws Exception { final Source in1 = Source.single(1); final Source in2 = Source.single(10); final Zip2With sumZip = ZipWith.create( new Function2() { @Override public Integer apply(Integer l, Integer r) throws Exception { return l + r; } }); final KeyedSink> out = Sink.head(); MaterializedMap mat = FlowGraph.builder() .addEdge(in1, sumZip.left()) .addEdge(in2, sumZip.right()) .addEdge(sumZip.out(), out) .run(materializer); final Integer result = Await.result(mat.get(out), Duration.create(300, TimeUnit.MILLISECONDS)); assertEquals(11, (int) result); } @Test public void mustBeAbleToUseZip4With() throws Exception { final Source in1 = Source.single(1); final Source in2 = Source.single(10); final Source in3 = Source.single(100); final Source in4 = Source.single(1000); Function, Integer> sum4 = new Function, Integer>() { @Override public Integer apply(ZipWith.Zip4WithInputs inputs) throws Exception { return inputs.t1() + inputs.t2() + inputs.t3() + inputs.t4(); } }; Zip4With sum4Zip = ZipWith.create(sum4); final KeyedSink> out = Sink.head(); MaterializedMap mat = FlowGraph.builder() .addEdge(in1, sum4Zip.input1()) .addEdge(in2, sum4Zip.input2()) .addEdge(in3, sum4Zip.input3()) .addEdge(in4, sum4Zip.input4()) .addEdge(sum4Zip.out(), out) .run(materializer); final Integer result = Await.result(mat.get(out), Duration.create(300, TimeUnit.MILLISECONDS)); assertEquals(1111, (int) result); } }