diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java new file mode 100644 index 0000000000..90a3a05991 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -0,0 +1,154 @@ +package akka.stream.javadsl; + +import akka.actor.ActorRef; +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.stream.stage.*; +import akka.stream.javadsl.japi.*; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; + +import org.junit.ClassRule; +import org.junit.Test; +import org.reactivestreams.Publisher; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class FlowGraphTest extends StreamTest { + public FlowGraphTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowGraphTest", + AkkaSpec.testConf()); + + public Creator> op() { + return new akka.stream.javadsl.japi.Creator>() { + @Override + public PushPullStage create() throws Exception { + return new PushPullStage() { + @Override + public Directive onPush(T element, Context ctx) { + return ctx.push(element); + } + + @Override + public Directive onPull(Context ctx) { + return ctx.pull(); + } + }; + } + }; + } + + @Test + public void mustBeAbleToUseMerge() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Flow f1 = Flow.of(String.class).section(OperationAttributes.name("f1"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowGraphTest.this.op()); + } + }); + final Flow f2 = Flow.of(String.class).section(OperationAttributes.name("f2"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowGraphTest.this.op()); + } + }); + final Flow f3 = Flow.of(String.class).section(OperationAttributes.name("f3"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowGraphTest.this.op()); + } + }); + + final Source in1 = Source.from(Arrays.asList("a", "b", "c")); + final Source in2 = Source.from(Arrays.asList("d", "e", "f")); + + final KeyedSink> publisher = Sink.publisher(); + + final Merge merge = Merge.create(); + MaterializedMap m = FlowGraph.builder().addEdge(in1, f1, merge).addEdge(in2, f2, merge) + .addEdge(merge, f3, publisher).build().run(materializer); + + // collecting + final Publisher pub = m.get(publisher); + final Future> all = Source.from(pub).grouped(100).runWith(Sink.>head(), materializer); + + final List result = Await.result(all, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); + } + + @Test + public void mustBeAbleToUseZip() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList(1, 2, 3); + + final Source in1 = Source.from(input1); + final Source in2 = Source.from(input2); + final Zip zip = Zip.create(); + final KeyedSink, Future> out = Sink + .foreach(new Procedure>() { + @Override + public void apply(Pair param) throws Exception { + probe.getRef().tell(param, ActorRef.noSender()); + } + }); + + FlowGraph.builder().addEdge(in1, zip.left()).addEdge(in2, zip.right()).addEdge(zip.out(), out).run(materializer); + + List output = Arrays.asList(probe.receiveN(3)); + @SuppressWarnings("unchecked") + List> expected = Arrays.asList(new Pair("A", 1), new Pair( + "B", 2), new Pair("C", 3)); + assertEquals(expected, output); + } + + @Test + public void mustBeAbleToUseUnzip() { + final JavaTestKit probe1 = new JavaTestKit(system); + final JavaTestKit probe2 = new JavaTestKit(system); + + @SuppressWarnings("unchecked") + final List> input = Arrays.asList(new Pair("A", 1), + new Pair("B", 2), new Pair("C", 3)); + + final Iterable expected1 = Arrays.asList("A", "B", "C"); + final Iterable expected2 = Arrays.asList(1, 2, 3); + + final Source> in = Source.from(input); + final Unzip unzip = Unzip.create(); + + final KeyedSink> out1 = Sink.foreach(new Procedure() { + @Override + public void apply(String param) throws Exception { + probe1.getRef().tell(param, ActorRef.noSender()); + } + }); + final KeyedSink> out2 = Sink.foreach(new Procedure() { + @Override + public void apply(Integer param) throws Exception { + probe2.getRef().tell(param, ActorRef.noSender()); + } + }); + + FlowGraph.builder().addEdge(in, unzip.in()).addEdge(unzip.left(), out1).addEdge(unzip.right(), out2) + .run(materializer); + + List output1 = Arrays.asList(probe1.receiveN(3)); + List output2 = Arrays.asList(probe2.receiveN(3)); + assertEquals(expected1, output1); + assertEquals(expected2, output2); + } + +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 185410a07b..8d1dceb6fb 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -3,7 +3,6 @@ package akka.stream.javadsl; import akka.actor.ActorRef; import akka.dispatch.Foreach; import akka.dispatch.Futures; -import akka.dispatch.OnSuccess; import akka.japi.Pair; import akka.stream.OverflowStrategy; import akka.stream.StreamTest; @@ -14,15 +13,10 @@ import akka.testkit.JavaTestKit; import org.junit.ClassRule; import org.junit.Test; -import org.reactivestreams.Publisher; import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import scala.runtime.BoxedUnit; -import scala.util.Try; import java.util.*; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -41,8 +35,9 @@ public class FlowTest extends StreamTest { final String[] lookup = { "a", "b", "c", "d", "e", "f" }; final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); final Source ints = Source.from(input); - - ints.drop(2).take(3).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS)).map(new Function() { + final Flow flow1 = Flow.of(Integer.class).drop(2).take(3 + ).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS + )).map(new Function() { public String apply(Integer elem) { return lookup[elem]; } @@ -50,20 +45,25 @@ public class FlowTest extends StreamTest { public boolean test(String elem) { return !elem.equals("c"); } - }).grouped(2).mapConcat(new Function, java.util.List>() { + }); + final Flow flow2 = Flow.of(String.class).grouped(2 + ).mapConcat(new Function, java.util.List>() { public java.util.List apply(java.util.List elem) { return elem; } - }).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS)) - .mapConcat(new Function, java.util.List>() { + }).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS) + ).mapConcat(new Function, java.util.List>() { public java.util.List apply(java.util.List elem) { return elem; } - }).fold("", new Function2() { + }); + + ints.via(flow1.via(flow2)).fold("", new Function2() { public String apply(String acc, String elem) { return acc + elem; } - }, materializer).foreach(new Foreach() { // Scala Future + }, materializer + ).foreach(new Foreach() { // Scala Future public void each(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -72,37 +72,12 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("de"); } - @Test - public void mustBeAbleToUseVoidTypeInForeach() { - final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList("a", "b", "c"); - Source ints = Source.from(input); - - Future completion = ints.foreach(new Procedure() { - public void apply(String elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }, materializer); - - completion.onSuccess(new OnSuccess() { - @Override - public void onSuccess(BoxedUnit elem) throws Throwable { - probe.getRef().tell(String.valueOf(elem), ActorRef.noSender()); - } - }, system.dispatcher()); - - probe.expectMsgEquals("a"); - probe.expectMsgEquals("b"); - probe.expectMsgEquals("c"); - probe.expectMsgEquals("()"); - } - @Test public void mustBeAbleToUseTransform() { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // duplicate each element, stop after 4 elements, and emit sum to the end - Source.from(input).transform(new Creator>() { + final Flow flow = Flow.of(Integer.class).transform(new Creator>() { @Override public PushPullStage create() throws Exception { return new StatefulStage() { @@ -133,7 +108,8 @@ public class FlowTest extends StreamTest { }; } - }).foreach(new Procedure() { + }); + Source.from(input).via(flow).foreach(new Procedure() { public void apply(Integer elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -154,11 +130,12 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseGroupBy() { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee"); - Source.from(input).groupBy(new Function() { + final Flow>> slsFlow= Flow.of(String.class).groupBy(new Function() { public String apply(String elem) { return elem.substring(0, 1); } - }).foreach(new Procedure>>() { + }); + Source.from(input).via(slsFlow).foreach(new Procedure>>() { @Override public void apply(final Pair> pair) throws Exception { pair.second().foreach(new Procedure() { @@ -190,11 +167,12 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseSplitWhen() { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList("A", "B", "C", ".", "D", ".", "E", "F"); - Source.from(input).splitWhen(new Predicate() { + final Flow> flow = Flow.of(String.class).splitWhen(new Predicate() { public boolean test(String elem) { return elem.equals("."); } - }).foreach(new Procedure>() { + }); + Source.from(input).via(flow).foreach(new Procedure>() { @Override public void apply(Source subStream) throws Exception { subStream.filter(new Predicate() { @@ -227,127 +205,6 @@ public class FlowTest extends StreamTest { } - public Creator> op() { - return new akka.stream.javadsl.japi.Creator>() { - @Override - public PushPullStage create() throws Exception { - return new PushPullStage() { - @Override - public Directive onPush(T element, Context ctx) { - return ctx.push(element); - } - - @Override - public Directive onPull(Context ctx) { - return ctx.pull(); - } - }; - } - }; - } - - @Test - public void mustBeAbleToUseMerge() throws Exception { - final Flow f1 = Flow.of(String.class).section(OperationAttributes.name("f1"), new Function, Flow>() { - @Override - public Flow apply(Flow flow) { - return flow.transform(FlowTest.this.op()); - } - }); - final Flow f2 = Flow.of(String.class).section(OperationAttributes.name("f2"), new Function, Flow>() { - @Override - public Flow apply(Flow flow) { - return flow.transform(FlowTest.this.op()); - } - }); - final Flow f3 = Flow.of(String.class).section(OperationAttributes.name("f3"), new Function, Flow>() { - @Override - public Flow apply(Flow flow) { - return flow.transform(FlowTest.this.op()); - } - }); - - final Source in1 = Source.from(Arrays.asList("a", "b", "c")); - final Source in2 = Source.from(Arrays.asList("d", "e", "f")); - - final KeyedSink> publisher = Sink.publisher(); - - // this is red in intellij, but actually valid, scalac generates bridge methods for Java, so inference *works* - final Merge merge = Merge. create(); - MaterializedMap m = FlowGraph.builder().addEdge(in1, f1, merge).addEdge(in2, f2, merge) - .addEdge(merge, f3, publisher).build().run(materializer); - - // collecting - final Publisher pub = m.get(publisher); - final Future> all = Source.from(pub).grouped(100).runWith(Sink.>head(), materializer); - - final List result = Await.result(all, Duration.apply(200, TimeUnit.MILLISECONDS)); - assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); - } - - @Test - public void mustBeAbleToUseZip() { - final JavaTestKit probe = new JavaTestKit(system); - final Iterable input1 = Arrays.asList("A", "B", "C"); - final Iterable input2 = Arrays.asList(1, 2, 3); - - final Source in1 = Source.from(input1); - final Source in2 = Source.from(input2); - final Zip zip = Zip.create(); - final KeyedSink, Future> out = Sink - .foreach(new Procedure>() { - @Override - public void apply(Pair param) throws Exception { - probe.getRef().tell(param, ActorRef.noSender()); - } - }); - - FlowGraph.builder().addEdge(in1, zip.left()).addEdge(in2, zip.right()).addEdge(zip.out(), out).run(materializer); - - List output = Arrays.asList(probe.receiveN(3)); - @SuppressWarnings("unchecked") - List> expected = Arrays.asList(new Pair("A", 1), new Pair( - "B", 2), new Pair("C", 3)); - assertEquals(expected, output); - } - - @Test - public void mustBeAbleToUseUnzip() { - final JavaTestKit probe1 = new JavaTestKit(system); - final JavaTestKit probe2 = new JavaTestKit(system); - - @SuppressWarnings("unchecked") - final List> input = Arrays.asList(new Pair("A", 1), - new Pair("B", 2), new Pair("C", 3)); - - final Iterable expected1 = Arrays.asList("A", "B", "C"); - final Iterable expected2 = Arrays.asList(1, 2, 3); - - final Source> in = Source.from(input); - final Unzip unzip = Unzip.create(); - - final KeyedSink> out1 = Sink.foreach(new Procedure() { - @Override - public void apply(String param) throws Exception { - probe1.getRef().tell(param, ActorRef.noSender()); - } - }); - final KeyedSink> out2 = Sink.foreach(new Procedure() { - @Override - public void apply(Integer param) throws Exception { - probe2.getRef().tell(param, ActorRef.noSender()); - } - }); - - FlowGraph.builder().addEdge(in, unzip.in()).addEdge(unzip.left(), out1).addEdge(unzip.right(), out2) - .run(materializer); - - List output1 = Arrays.asList(probe1.receiveN(3)); - List output2 = Arrays.asList(probe2.receiveN(3)); - assertEquals(expected1, output1); - assertEquals(expected2, output2); - } - @Test public void mustBeAbleToUseConcat() { final JavaTestKit probe = new JavaTestKit(system); @@ -356,8 +213,8 @@ public class FlowTest extends StreamTest { final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); - - in1.concat(in2).foreach(new Procedure() { + final Flow flow = Flow.of(String.class); + in1.via(flow.concat(in2)).foreach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -367,80 +224,13 @@ public class FlowTest extends StreamTest { assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); } - @Test - public void mustBeAbleToUseCallableInput() { - final JavaTestKit probe = new JavaTestKit(system); - final Iterable input1 = Arrays.asList(4,3,2,1,0); - final akka.stream.javadsl.japi.Creator> input = new akka.stream.javadsl.japi.Creator>() { - @Override - public Iterator create() { - return input1.iterator(); - } - }; - Source.from(input).foreach(new Procedure() { - public void apply(Integer elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }, materializer); - - List output = Arrays.asList(probe.receiveN(5)); - assertEquals(Arrays.asList(4, 3, 2, 1, 0), output); - probe.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS)); - } - - @Test - public void mustBeAbleToUseOnCompleteSuccess() { - final JavaTestKit probe = new JavaTestKit(system); - final Iterable input = Arrays.asList("A", "B", "C"); - - Source.from(input).runWith(Sink. onComplete(new Procedure() { - @Override - public void apply(BoxedUnit param) throws Exception { - probe.getRef().tell(param, ActorRef.noSender()); - } - }), materializer); - - probe.expectMsgClass(BoxedUnit.class); - } - - @Test - public void mustBeAbleToUseOnCompleteError() { - final JavaTestKit probe = new JavaTestKit(system); - final Iterable input = Arrays.asList("A", "B", "C"); - - Source.from(input).map(new Function() { - public String apply(String arg0) throws Exception { - throw new RuntimeException("simulated err"); - } - }).runWith(Sink.head(), materializer).onComplete(new OnSuccess>() { - @Override - public void onSuccess(Try e) throws Throwable { - if (e == null) { - probe.getRef().tell("done", ActorRef.noSender()); - } else { - probe.getRef().tell(e.failed().get().getMessage(), ActorRef.noSender()); - } - } - }, system.dispatcher()); - - probe.expectMsgEquals("simulated err"); - } - - @Test - public void mustBeAbleToUseToFuture() throws Exception { - final JavaTestKit probe = new JavaTestKit(system); - final Iterable input = Arrays.asList("A", "B", "C"); - Future future = Source.from(input).runWith(Sink.head(), materializer); - String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); - assertEquals("A", result); - } - @Test public void mustBeAbleToUsePrefixAndTail() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); - Future, Source>> future = Source.from(input).prefixAndTail(3) - .runWith(Sink., Source>>head(), materializer); + final Flow, Source>> flow = Flow.of(Integer.class).prefixAndTail(3); + Future, Source>> future = + flow.runWith(Source.from(input), Sink., Source>>head(), materializer); Pair, Source> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(Arrays.asList(1, 2, 3), result.first()); @@ -458,8 +248,9 @@ public class FlowTest extends StreamTest { final List> mainInputs = Arrays.asList(Source.from(input1), Source.from(input2)); - Future> future = Source.from(mainInputs) - .flatten(akka.stream.javadsl.FlattenStrategy. concat()).grouped(6) + final Flow, List> flow = Flow.>create(). + flatten(akka.stream.javadsl.FlattenStrategy. concat()).grouped(6); + Future> future = Source.from(mainInputs).via(flow) .runWith(Sink.>head(), materializer); List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); @@ -471,7 +262,8 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseBuffer() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); - Future> future = Source.from(input).buffer(2, OverflowStrategy.backpressure()).grouped(4) + final Flow> flow = Flow.of(String.class).buffer(2, OverflowStrategy.backpressure()).grouped(4); + Future> future = Source.from(input).via(flow) .runWith(Sink.>head(), materializer); List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); @@ -481,33 +273,33 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToUseConflate() throws Exception { final JavaTestKit probe = new JavaTestKit(system); - // final List input = Arrays.asList("A", "B", "C"); // test was fleaky // TODO FIXME, test was fleaky! - final List input = Arrays.asList("C"); - Future future = Source.from(input).conflate(new Function() { + final List input = Arrays.asList("A", "B", "C"); + final Flow flow = Flow.of(String.class).conflate(new Function() { @Override public String apply(String s) throws Exception { return s; } }, new Function2() { @Override - public String apply(String in, String aggr) throws Exception { - return in; + public String apply(String aggr, String in) throws Exception { + return aggr + in; } - }).fold("", new Function2() { + }); + Future future = Source.from(input).via(flow).fold("", new Function2() { @Override public String apply(String aggr, String in) throws Exception { - return in; + return aggr + in; } }, materializer); String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); - assertEquals("C", result); + assertEquals("ABC", result); } @Test public void mustBeAbleToUseExpand() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); - Future future = Source.from(input).expand(new Function() { + final Flow flow = Flow.of(String.class).expand(new Function() { @Override public String apply(String in) throws Exception { return in; @@ -517,45 +309,24 @@ public class FlowTest extends StreamTest { public Pair apply(String in) throws Exception { return new Pair(in, in); } - }).runWith(Sink.head(), materializer); + }); + final KeyedSink> sink = Sink.head(); + MaterializedMap map = Source.from(input).to(flow.to(sink)).run(materializer); + Future future = map.get(sink); String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals("A", result); } @Test - public void mustProduceTicks() throws Exception { - final JavaTestKit probe = new JavaTestKit(system); - final Callable tick = new Callable() { - private int count = 1; - - @Override - public String call() { - return "tick-" + (count++); - } - }; - Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick) - .foreach(new Procedure() { - public void apply(String elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }, materializer); - probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); - probe.expectMsgEquals("tick-1"); - probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); - probe.expectMsgEquals("tick-2"); - probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); - - } - - @Test - public void mustBeAbleToUseMapFuture() throws Exception { + public void mustBeAbleToUseMapAsync() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList("a", "b", "c"); - Source.from(input).mapAsync(new Function>() { + final Flow flow = Flow.of(String.class).mapAsync(new Function>() { public Future apply(String elem) { return Futures.successful(elem.toUpperCase()); } - }).foreach(new Procedure() { + }); + Source.from(input).via(flow).foreach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -564,5 +335,4 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("B"); probe.expectMsgEquals("C"); } - } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java new file mode 100644 index 0000000000..16149a8618 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -0,0 +1,455 @@ +package akka.stream.javadsl; + +import akka.actor.ActorRef; +import akka.dispatch.Foreach; +import akka.dispatch.Futures; +import akka.dispatch.OnSuccess; +import akka.japi.Pair; +import akka.stream.OverflowStrategy; +import akka.stream.StreamTest; +import akka.stream.stage.*; +import akka.stream.javadsl.japi.*; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; + +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; +import scala.util.Try; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class SourceTest extends StreamTest { + public SourceTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("SourceTest", + AkkaSpec.testConf()); + + @Test + public void mustBeAbleToUseSimpleOperators() { + final JavaTestKit probe = new JavaTestKit(system); + final String[] lookup = {"a", "b", "c", "d", "e", "f"}; + final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); + final Source ints = Source.from(input); + + ints.drop(2).take(3).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS)).map(new Function() { + public String apply(Integer elem) { + return lookup[elem]; + } + }).filter(new Predicate() { + public boolean test(String elem) { + return !elem.equals("c"); + } + }).grouped(2).mapConcat(new Function, java.util.List>() { + public java.util.List apply(java.util.List elem) { + return elem; + } + }).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS)) + .mapConcat(new Function, java.util.List>() { + public java.util.List apply(java.util.List elem) { + return elem; + } + }).fold("", new Function2() { + public String apply(String acc, String elem) { + return acc + elem; + } + }, materializer).foreach(new Foreach() { // Scala Future + public void each(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, system.dispatcher()); + + probe.expectMsgEquals("de"); + } + + @Test + public void mustBeAbleToUseVoidTypeInForeach() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList("a", "b", "c"); + Source ints = Source.from(input); + + Future completion = ints.foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + completion.onSuccess(new OnSuccess() { + @Override + public void onSuccess(BoxedUnit elem) throws Throwable { + probe.getRef().tell(String.valueOf(elem), ActorRef.noSender()); + } + }, system.dispatcher()); + + probe.expectMsgEquals("a"); + probe.expectMsgEquals("b"); + probe.expectMsgEquals("c"); + probe.expectMsgEquals("()"); + } + + @Test + public void mustBeAbleToUseTransform() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); + // duplicate each element, stop after 4 elements, and emit sum to the end + Source.from(input).transform(new Creator>() { + @Override + public PushPullStage create() throws Exception { + return new StatefulStage() { + int sum = 0; + int count = 0; + + @Override + public StageState initial() { + return new StageState() { + @Override + public Directive onPush(Integer element, Context ctx) { + sum += element; + count += 1; + if (count == 4) { + return emitAndFinish(Arrays.asList(element, element, sum).iterator(), ctx); + } else { + return emit(Arrays.asList(element, element).iterator(), ctx); + } + } + + }; + } + + @Override + public TerminationDirective onUpstreamFinish(Context ctx) { + return terminationEmit(Collections.singletonList(sum).iterator(), ctx); + } + + }; + } + }).foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgEquals(0); + probe.expectMsgEquals(0); + probe.expectMsgEquals(1); + probe.expectMsgEquals(1); + probe.expectMsgEquals(2); + probe.expectMsgEquals(2); + probe.expectMsgEquals(3); + probe.expectMsgEquals(3); + probe.expectMsgEquals(6); + } + + @Test + public void mustBeAbleToUseGroupBy() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee"); + Source.from(input).groupBy(new Function() { + public String apply(String elem) { + return elem.substring(0, 1); + } + }).foreach(new Procedure>>() { + @Override + public void apply(final Pair> pair) throws Exception { + pair.second().foreach(new Procedure() { + @Override + public void apply(String elem) throws Exception { + probe.getRef().tell(new Pair(pair.first(), elem), ActorRef.noSender()); + } + }, materializer); + } + }, materializer); + + Map> grouped = new HashMap>(); + for (Object o : probe.receiveN(5)) { + @SuppressWarnings("unchecked") + Pair p = (Pair) o; + List g = grouped.get(p.first()); + if (g == null) { + g = new ArrayList(); + } + g.add(p.second()); + grouped.put(p.first(), g); + } + + assertEquals(Arrays.asList("Aaa", "Abb"), grouped.get("A")); + + } + + @Test + public void mustBeAbleToUseSplitWhen() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList("A", "B", "C", ".", "D", ".", "E", "F"); + Source.from(input).splitWhen(new Predicate() { + public boolean test(String elem) { + return elem.equals("."); + } + }).foreach(new Procedure>() { + @Override + public void apply(Source subStream) throws Exception { + subStream.filter(new Predicate() { + @Override + public boolean test(String elem) { + return !elem.equals("."); + } + }).grouped(10).foreach(new Procedure>() { + @Override + public void apply(List chunk) throws Exception { + probe.getRef().tell(chunk, ActorRef.noSender()); + } + }, materializer); + } + }, materializer); + + for (Object o : probe.receiveN(3)) { + @SuppressWarnings("unchecked") + List chunk = (List) o; + if (chunk.get(0).equals("A")) { + assertEquals(Arrays.asList("A", "B", "C"), chunk); + } else if (chunk.get(0).equals("D")) { + assertEquals(Arrays.asList("D"), chunk); + } else if (chunk.get(0).equals("E")) { + assertEquals(Arrays.asList("E", "F"), chunk); + } else { + assertEquals("[A, B, C] or [D] or [E, F]", chunk); + } + } + + } + + @Test + public void mustBeAbleToUseConcat() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + final Source in1 = Source.from(input1); + final Source in2 = Source.from(input2); + + in1.concat(in2).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + List output = Arrays.asList(probe.receiveN(6)); + assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); + } + + @Test + public void mustBeAbleToUseCallableInput() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList(4, 3, 2, 1, 0); + final akka.stream.javadsl.japi.Creator> input = new akka.stream.javadsl.japi.Creator>() { + @Override + public Iterator create() { + return input1.iterator(); + } + }; + Source.from(input).foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + List output = Arrays.asList(probe.receiveN(5)); + assertEquals(Arrays.asList(4, 3, 2, 1, 0), output); + probe.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS)); + } + + @Test + public void mustBeAbleToUseOnCompleteSuccess() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList("A", "B", "C"); + + Source.from(input).runWith(Sink.onComplete(new Procedure() { + @Override + public void apply(BoxedUnit param) throws Exception { + probe.getRef().tell(param, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgClass(BoxedUnit.class); + } + + @Test + public void mustBeAbleToUseOnCompleteError() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList("A", "B", "C"); + + Source.from(input).map(new Function() { + public String apply(String arg0) throws Exception { + throw new RuntimeException("simulated err"); + } + }).runWith(Sink.head(), materializer).onComplete(new OnSuccess>() { + @Override + public void onSuccess(Try e) throws Throwable { + if (e == null) { + probe.getRef().tell("done", ActorRef.noSender()); + } else { + probe.getRef().tell(e.failed().get().getMessage(), ActorRef.noSender()); + } + } + }, system.dispatcher()); + + probe.expectMsgEquals("simulated err"); + } + + @Test + public void mustBeAbleToUseToFuture() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList("A", "B", "C"); + Future future = Source.from(input).runWith(Sink.head(), materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("A", result); + } + + @Test + public void mustBeAbleToUsePrefixAndTail() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); + Future, Source>> future = Source.from(input).prefixAndTail(3) + .runWith(Sink., Source>>head(), materializer); + Pair, Source> result = Await.result(future, + probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(Arrays.asList(1, 2, 3), result.first()); + + Future> tailFuture = result.second().grouped(4).runWith(Sink.>head(), materializer); + List tailResult = Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(Arrays.asList(4, 5, 6), tailResult); + } + + @Test + public void mustBeAbleToUseConcatAllWithSources() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList(1, 2, 3); + final Iterable input2 = Arrays.asList(4, 5); + + final List> mainInputs = Arrays.asList(Source.from(input1), Source.from(input2)); + + Future> future = Source.from(mainInputs) + .flatten(akka.stream.javadsl.FlattenStrategy.concat()).grouped(6) + .runWith(Sink.>head(), materializer); + + List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + + assertEquals(Arrays.asList(1, 2, 3, 4, 5), result); + } + + @Test + public void mustBeAbleToUseBuffer() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future> future = Source.from(input).buffer(2, OverflowStrategy.backpressure()).grouped(4) + .runWith(Sink.>head(), materializer); + + List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(input, result); + } + + @Test + public void mustBeAbleToUseConflate() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future future = Source.from(input).conflate(new Function() { + @Override + public String apply(String s) throws Exception { + return s; + } + }, new Function2() { + @Override + public String apply(String aggr, String in) throws Exception { + return aggr + in; + } + }).fold("", new Function2() { + @Override + public String apply(String aggr, String in) throws Exception { + return aggr + in; + } + }, materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("ABC", result); + } + + @Test + public void mustBeAbleToUseExpand() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future future = Source.from(input).expand(new Function() { + @Override + public String apply(String in) throws Exception { + return in; + } + }, new Function>() { + @Override + public Pair apply(String in) throws Exception { + return new Pair(in, in); + } + }).runWith(Sink.head(), materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("A", result); + } + + @Test + public void mustProduceTicks() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Callable tick = new Callable() { + private int count = 1; + + @Override + public String call() { + return "tick-" + (count++); + } + }; + Source.from(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick) + .foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); + probe.expectMsgEquals("tick-1"); + probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + probe.expectMsgEquals("tick-2"); + probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + + } + + @Test + public void mustBeAbleToUseMapFuture() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList("a", "b", "c"); + Source.from(input).mapAsync(new Function>() { + public Future apply(String elem) { + return Futures.successful(elem.toUpperCase()); + } + }).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + probe.expectMsgEquals("A"); + probe.expectMsgEquals("B"); + probe.expectMsgEquals("C"); + } + + @Test + public void mustWorkFromFuture() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input = Arrays.asList("A", "B", "C"); + Future future1 = Source.from(input).runWith(Sink.head(), materializer); + Future future2 = Source.from(future1).runWith(Sink.head(), materializer); + String result = Await.result(future2, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("A", result); + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index e400013866..f4699229a3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.util.control.NoStackTrace -import akka.stream.FlowMaterializer +import akka.stream.{ OverflowStrategy, FlowMaterializer } import akka.stream.testkit.AkkaSpec import akka.testkit.DefaultTimeout diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index e792cb2c2a..ed568c6d57 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -111,7 +111,7 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * @tparam T materialized type of given KeyedSink */ def runWith[T](source: javadsl.Source[In], sink: javadsl.KeyedSink[Out, T], materializer: FlowMaterializer): T = - delegate.runWith(source.asScala, sink.asScala)(materializer).asInstanceOf[T] + delegate.runWith(source.asScala, sink.asScala)(materializer)._2.asInstanceOf[T] /** * Connect the `KeyedSource` to this `Flow` and then connect it to the `Sink` and run it. @@ -121,7 +121,7 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * @tparam T materialized type of given KeyedSource */ def runWith[T](source: javadsl.KeyedSource[In, T], sink: javadsl.Sink[Out], materializer: FlowMaterializer): T = - delegate.runWith(source.asScala, sink.asScala)(materializer).asInstanceOf[T] + delegate.runWith(source.asScala, sink.asScala)(materializer)._1.asInstanceOf[T] /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 59c5530fa7..2db9a4bc16 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -208,7 +208,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * source. */ def concat[Out2 >: Out](second: Source[Out2]): Source[Out2] = - delegate.concat(second.asScala).asJava + Source.concat(this, second) /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked