diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 1796a7023b..9c55de5dea 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -197,6 +197,22 @@ fromPublisher ^^^^^^^^^^^^^ Integration with Reactive Streams, subscribes to a ``org.reactivestreams.Publisher``. +zipN +^^^^ +Combine the elements of multiple streams into a stream of sequences. + +**emits** when all of the inputs has an element available + +**completes** when any upstream completes + +zipWithN +^^^^^^^^ +Combine the elements of multiple streams into a stream of sequences using a combiner function. + +**emits** when all of the inputs has an element available + +**completes** when any upstream completes + diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index ecd444106a..27bd060301 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -186,6 +186,22 @@ fromPublisher ^^^^^^^^^^^^^ Integration with Reactive Streams, subscribes to a ``org.reactivestreams.Publisher``. +zipN +^^^^ +Combine the elements of multiple streams into a stream of sequences. + +**emits** when all of the inputs has an element available + +**completes** when any upstream completes + +zipWithN +^^^^^^^^ +Combine the elements of multiple streams into a stream of sequences using a combiner function. + +**emits** when all of the inputs has an element available + +**completes** when any upstream completes + diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java index 41f5a71567..b0f66a8bd2 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java @@ -86,7 +86,7 @@ public class GraphDSLTest extends StreamTest { final Publisher pub = source.runWith(publisher, materializer); final CompletionStage> all = Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), materializer); - final List result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + final List result = all.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); } @@ -189,7 +189,7 @@ public class GraphDSLTest extends StreamTest { } )).run(materializer); - Duration d = Duration.create(300, TimeUnit.MILLISECONDS); + Duration d = Duration.create(3, TimeUnit.SECONDS); Object output1 = probe1.receiveOne(d); Object output2 = probe2.receiveOne(d); @@ -235,7 +235,7 @@ public class GraphDSLTest extends StreamTest { } })).run(materializer); - Duration d = Duration.create(300, TimeUnit.MILLISECONDS); + Duration d = Duration.create(3, TimeUnit.SECONDS); Object output1 = probe1.receiveOne(d); Object output2 = probe2.receiveOne(d); @@ -269,7 +269,59 @@ public class GraphDSLTest extends StreamTest { return ClosedShape.getInstance(); })).run(materializer); - final Integer result = future.toCompletableFuture().get(300, TimeUnit.MILLISECONDS); + final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(11, (int) result); + } + + @Test + public void mustBeAbleToUseZipN() throws Exception { + final Source in1 = Source.single(1); + final Source in2 = Source.single(10); + + final Graph>, NotUsed> sumZip = ZipN.create(2); + + final CompletionStage> future = RunnableGraph.fromGraph(GraphDSL.create(Sink.>head(), + (b, out) -> { + final UniformFanInShape> zip = b.add(sumZip); + b.from(b.add(in1)).toInlet(zip.in(0)); + b.from(b.add(in2)).toInlet(zip.in(1)); + b.from(zip.out()).to(out); + return ClosedShape.getInstance(); + })).run(materializer); + + final List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals(Arrays.asList(1, 10), result); + } + + @Test + public void mustBeAbleToUseZipWithN() throws Exception { + final Source in1 = Source.single(1); + final Source in2 = Source.single(10); + + final Graph, NotUsed> sumZip = ZipWithN.create( + new Function, Integer>() { + @Override public Integer apply(List list) throws Exception { + Integer sum = 0; + + for(Integer i : list) { + sum += i; + } + + return sum; + } + }, 2); + + final CompletionStage future = RunnableGraph.fromGraph(GraphDSL.create(Sink.head(), + (b, out) -> { + final UniformFanInShape zip = b.add(sumZip); + b.from(b.add(in1)).toInlet(zip.in(0)); + b.from(b.add(in2)).toInlet(zip.in(1)); + b.from(zip.out()).to(out); + return ClosedShape.getInstance(); + })).run(materializer); + + final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(11, (int) result); } @@ -298,7 +350,7 @@ public class GraphDSLTest extends StreamTest { return ClosedShape.getInstance(); })).run(materializer); - final Integer result = future.toCompletableFuture().get(300, TimeUnit.MILLISECONDS); + final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(1111, (int) result); } @@ -315,7 +367,7 @@ public class GraphDSLTest extends StreamTest { return ClosedShape.getInstance(); })).run(materializer); - final Integer result = future.toCompletableFuture().get(300, TimeUnit.MILLISECONDS); + final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(1, (int) result); probe.expectMsg(1); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 80aa817f82..35b7383516 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -554,7 +554,7 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals(","); probe.expectMsgEquals("3"); probe.expectMsgEquals("]"); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -574,7 +574,7 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals("2"); probe.expectMsgEquals(","); probe.expectMsgEquals("3"); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -591,7 +591,7 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals(2); probe.expectMsgEquals(3); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -612,7 +612,7 @@ public class SourceTest extends StreamTest { FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS); probe.expectNoMsg(duration); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -637,7 +637,7 @@ public class SourceTest extends StreamTest { s.sendNext(1); probe.expectMsgEquals(0); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -654,7 +654,41 @@ public class SourceTest extends StreamTest { probe.expectMsgAllOf(0, 1, 2, 3); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + } + + @Test + public void mustBeAbleToZipN() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source1 = Source.from(Arrays.asList(0, 1)); + final Source source2 = Source.from(Arrays.asList(2, 3)); + + final List> sources = Arrays.asList(source1, source2); + + final Source, ?> source = Source.zipN(sources); + + final CompletionStage future = source.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer); + + probe.expectMsgAllOf(Arrays.asList(0, 2), Arrays.asList(1, 3)); + + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + } + + @Test + public void mustBeAbleToZipWithN() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source1 = Source.from(Arrays.asList(0, 1)); + final Source source2 = Source.from(Arrays.asList(2, 3)); + + final List> sources = Arrays.asList(source1, source2); + + final Source source = Source.zipWithN(list -> new Boolean(list.contains(0)), sources); + + final CompletionStage future = source.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer); + + probe.expectMsgAllOf(Boolean.TRUE, Boolean.FALSE); + + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index c1376d6029..0d274b6624 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -30,6 +30,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { (classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) :: (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: (classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) :: + (classOf[scala.collection.immutable.Seq[_]], classOf[java.util.List[_]]) :: (classOf[scala.collection.immutable.Set[_]], classOf[java.util.Set[_]]) :: (classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) :: (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipNSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipNSpec.scala new file mode 100644 index 0000000000..e10399f3df --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipNSpec.scala @@ -0,0 +1,227 @@ +/** + * Copyright (C) 2014-2016 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit._ +import akka.stream.testkit.Utils._ +import akka.stream._ + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.collection.immutable + +class GraphZipNSpec extends TwoStreamsSetup { + import GraphDSL.Implicits._ + + override type Outputs = immutable.Seq[Int] + + override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) { + val zipN = b.add(ZipN[Int](2)) + + override def left: Inlet[Int] = zipN.in(0) + override def right: Inlet[Int] = zipN.in(1) + override def out: Outlet[immutable.Seq[Int]] = zipN.out + } + + "ZipN" must { + + "work in the happy case" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[immutable.Seq[Int]]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val zipN = b.add(ZipN[Int](2)) + + Source(1 to 4) ~> zipN.in(0) + Source(2 to 5) ~> zipN.in(1) + + zipN.out ~> Sink.fromSubscriber(probe) + + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(immutable.Seq(1, 2)) + probe.expectNext(immutable.Seq(2, 3)) + + subscription.request(1) + probe.expectNext(immutable.Seq(3, 4)) + subscription.request(1) + probe.expectNext(immutable.Seq(4, 5)) + + probe.expectComplete() + } + + "complete if one side is available but other already completed" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + + RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b ⇒ + out ⇒ + val zipN = b.add(ZipN[Int](2)) + + Source.fromPublisher(upstream1) ~> zipN.in(0) + Source.fromPublisher(upstream2) ~> zipN.in(1) + zipN.out ~> out + + ClosedShape + }).run() + + upstream1.sendNext(1) + upstream1.sendNext(2) + upstream2.sendNext(2) + upstream2.sendComplete() + + downstream.requestNext(immutable.Seq(1, 2)) + downstream.expectComplete() + upstream1.expectCancellation() + } + + "complete even if no pending demand" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + + RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b ⇒ + out ⇒ + val zipN = b.add(ZipN[Int](2)) + + Source.fromPublisher(upstream1) ~> zipN.in(0) + Source.fromPublisher(upstream2) ~> zipN.in(1) + zipN.out ~> out + + ClosedShape + }).run() + + downstream.request(1) + + upstream1.sendNext(1) + upstream2.sendNext(2) + downstream.expectNext(immutable.Seq(1, 2)) + + upstream2.sendComplete() + downstream.expectComplete() + upstream1.expectCancellation() + } + + "complete if both sides complete before requested with elements pending" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + + RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b ⇒ + out ⇒ + val zipN = b.add(ZipN[Int](2)) + + Source.fromPublisher(upstream1) ~> zipN.in(0) + Source.fromPublisher(upstream2) ~> zipN.in(1) + zipN.out ~> out + + ClosedShape + }).run() + + upstream1.sendNext(1) + upstream2.sendNext(2) + + upstream1.sendComplete() + upstream2.sendComplete() + + downstream.requestNext(immutable.Seq(1, 2)) + downstream.expectComplete() + } + + "complete if one side complete before requested with elements pending" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + + RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b ⇒ + out ⇒ + val zipN = b.add(ZipN[Int](2)) + + Source.fromPublisher(upstream1) ~> zipN.in(0) + Source.fromPublisher(upstream2) ~> zipN.in(1) + zipN.out ~> out + + ClosedShape + }).run() + + upstream1.sendNext(1) + upstream1.sendNext(2) + upstream2.sendNext(2) + + upstream1.sendComplete() + upstream2.sendComplete() + + downstream.requestNext(immutable.Seq(1, 2)) + downstream.expectComplete() + } + + "complete if one side complete before requested with elements pending 2" in { + val upstream1 = TestPublisher.probe[Int]() + val upstream2 = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[immutable.Seq[Int]]() + + RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b ⇒ + out ⇒ + val zipN = b.add(ZipN[Int](2)) + + Source.fromPublisher(upstream1) ~> zipN.in(0) + Source.fromPublisher(upstream2) ~> zipN.in(1) + zipN.out ~> out + + ClosedShape + }).run() + + downstream.ensureSubscription() + + upstream1.sendNext(1) + upstream1.sendComplete() + downstream.expectNoMsg(500.millis) + + upstream2.sendNext(2) + upstream2.sendComplete() + downstream.requestNext(immutable.Seq(1, 2)) + downstream.expectComplete() + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one immediately failed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + + "work with one delayed failed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) + val subscription2 = subscriber2.expectSubscriptionAndError(TestException) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithNSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithNSpec.scala new file mode 100644 index 0000000000..d740c6fdbd --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithNSpec.scala @@ -0,0 +1,162 @@ +package akka.stream.scaladsl + +import akka.stream.testkit._ +import scala.concurrent.duration._ +import akka.stream._ +import akka.testkit.EventFilter +import scala.collection.immutable + +class GraphZipWithNSpec extends TwoStreamsSetup { + import GraphDSL.Implicits._ + + override type Outputs = Int + + override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) { + val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(2)) + override def left: Inlet[Int] = zip.in(0) + override def right: Inlet[Int] = zip.in(1) + override def out: Outlet[Int] = zip.out + } + + "ZipWithN" must { + + "work in the happy case" in { + val probe = TestSubscriber.manualProbe[Outputs]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(2)) + Source(1 to 4) ~> zip.in(0) + Source(10 to 40 by 10) ~> zip.in(1) + + zip.out ~> Sink.fromSubscriber(probe) + + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(11) + probe.expectNext(22) + + subscription.request(1) + probe.expectNext(33) + subscription.request(1) + probe.expectNext(44) + + probe.expectComplete() + } + + "work in the sad case" in { + val probe = TestSubscriber.manualProbe[Outputs]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val zip = b.add(ZipWithN((_: immutable.Seq[Int]).foldLeft(1)(_ / _))(2)) + + Source(1 to 4) ~> zip.in(0) + Source(-2 to 2) ~> zip.in(1) + + zip.out ~> Sink.fromSubscriber(probe) + + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(1 / 1 / -2) + probe.expectNext(1 / 2 / -1) + + EventFilter[ArithmeticException](occurrences = 1).intercept { + subscription.request(2) + } + probe.expectError() match { + case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") + } + probe.expectNoMsg(200.millis) + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one delayed completed and one nonempty publisher" in { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one immediately failed and one nonempty publisher" in { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + + "work with one delayed failed and one nonempty publisher" in { + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) + val subscription2 = subscriber2.expectSubscriptionAndError(TestException) + } + + "work with 3 inputs" in { + val probe = TestSubscriber.manualProbe[Int]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(3)) + + Source.single(1) ~> zip.in(0) + Source.single(2) ~> zip.in(1) + Source.single(3) ~> zip.in(2) + + zip.out ~> Sink.fromSubscriber(probe) + + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + subscription.request(5) + probe.expectNext(6) + + probe.expectComplete() + } + + "work with 30 inputs" in { + val probe = TestSubscriber.manualProbe[Int]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(30)) + + (0 to 29).foreach { + n ⇒ Source.single(n) ~> zip.in(n) + } + + zip.out ~> Sink.fromSubscriber(probe) + + ClosedShape + }).run() + + val subscription = probe.expectSubscription() + + subscription.request(1) + probe.expectNext((0 to 29).sum) + + probe.expectComplete() + + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 2088d8e133..d04dc8642c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -15,6 +15,7 @@ import akka.stream.testkit._ import akka.NotUsed import akka.testkit.EventFilter import akka.testkit.AkkaSpec +import scala.collection.immutable class SourceSpec extends AkkaSpec with DefaultTimeout { @@ -93,7 +94,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { c.expectNoMsg(300.millis) subs.cancel() - Await.result(f.future, 500.millis) shouldEqual None + Await.result(f.future, 3.seconds) shouldEqual None } "allow external triggering of empty completion" in Utils.assertAllStagesStopped { @@ -105,7 +106,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { // external cancellation neverPromise.trySuccess(None) shouldEqual true - Await.result(counterFuture, 500.millis) shouldEqual 0 + Await.result(counterFuture, 3.seconds) shouldEqual 0 } "allow external triggering of non-empty completion" in Utils.assertAllStagesStopped { @@ -117,7 +118,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { // external cancellation neverPromise.trySuccess(Some(6)) shouldEqual true - Await.result(counterFuture, 500.millis) shouldEqual 6 + Await.result(counterFuture, 3.seconds) shouldEqual 6 } "allow external triggering of onError" in Utils.assertAllStagesStopped { @@ -129,7 +130,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { // external cancellation neverPromise.failure(new Exception("Boom") with NoStackTrace) - val ready = Await.ready(counterFuture, 500.millis) + val ready = Await.ready(counterFuture, 3.seconds) val Failure(ex) = ready.value.get ex.getMessage should include("Boom") } @@ -138,11 +139,11 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { "Composite Source" must { "merge from many inputs" in { - val probes = Seq.fill(5)(TestPublisher.manualProbe[Int]()) + val probes = immutable.Seq.fill(5)(TestPublisher.manualProbe[Int]()) val source = Source.asSubscriber[Int] val out = TestSubscriber.manualProbe[Int] - val s = Source.fromGraph(GraphDSL.create(source, source, source, source, source)(Seq(_, _, _, _, _)) { implicit b ⇒ + val s = Source.fromGraph(GraphDSL.create(source, source, source, source, source)(immutable.Seq(_, _, _, _, _)) { implicit b ⇒ (i0, i1, i2, i3, i4) ⇒ import GraphDSL.Implicits._ val m = b.add(Merge[Int](5)) @@ -171,7 +172,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { } "combine from many inputs with simplified API" in { - val probes = Seq.fill(3)(TestPublisher.manualProbe[Int]()) + val probes = immutable.Seq.fill(3)(TestPublisher.manualProbe[Int]()) val source = for (i ← 0 to 2) yield Source.fromPublisher(probes(i)) val out = TestSubscriber.manualProbe[Int] @@ -193,7 +194,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { } "combine from two inputs with simplified API" in { - val probes = Seq.fill(2)(TestPublisher.manualProbe[Int]()) + val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]()) val source = Source.fromPublisher(probes(0)) :: Source.fromPublisher(probes(1)) :: Nil val out = TestSubscriber.manualProbe[Int] @@ -268,7 +269,36 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { Source.fromIterator(() ⇒ Iterator.iterate(false)(!_)) .grouped(10) .runWith(Sink.head) - .futureValue should ===(Seq(false, true, false, true, false, true, false, true, false, true)) + .futureValue should ===(immutable.Seq(false, true, false, true, false, true, false, true, false, true)) + } + } + + "ZipN Source" must { + "properly zipN" in { + val sources = immutable.Seq( + Source(List(1, 2, 3)), + Source(List(10, 20, 30)), + Source(List(100, 200, 300))) + + Source.zipN(sources) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq( + immutable.Seq(1, 10, 100), + immutable.Seq(2, 20, 200), + immutable.Seq(3, 30, 300))) + } + } + + "ZipWithN Source" must { + "properly zipWithN" in { + val sources = immutable.Seq( + Source(List(1, 2, 3)), + Source(List(10, 20, 30)), + Source(List(100, 200, 300))) + + Source.zipWithN[Int, Int](_.sum)(sources) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq(111, 222, 333)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 472bd094c1..60968ad03f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -69,6 +69,8 @@ private[stream] object Stages { val broadcast = name("broadcast") val balance = name("balance") val zip = name("zip") + val zipN = name("zipN") + val zipWithN = name("zipWithN") val unzip = name("unzip") val concat = name("concat") val repeat = name("repeat") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index a8d11b31b9..7a70645f70 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -243,6 +243,45 @@ object Zip { new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } } +/** + * Combine the elements of multiple streams into a stream of lists. + * + * A `ZipN` has a `n` input ports and one `out` port + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ +object ZipN { + def create[A](n: Int): Graph[UniformFanInShape[A, java.util.List[A]], NotUsed] = { + ZipWithN.create(ConstantFun.javaIdentityFunction[java.util.List[A]], n) + } +} + +/** + * Combine the elements of multiple streams into a stream of lists using a combiner function. + * + * A `ZipWithN` has a `n` input ports and one `out` port + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ +object ZipWithN { + def create[A, O](zipper: function.Function[java.util.List[A], O], n: Int): Graph[UniformFanInShape[A, O], NotUsed] = { + import scala.collection.JavaConverters._ + scaladsl.ZipWithN[A, O](seq => zipper.apply(seq.asJava))(n) + } +} + /** * Takes a stream of pair elements and splits each pair to two output streams. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 4febe3a4d8..d9e06c548f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -272,11 +272,26 @@ object Source { */ def combine[T, U](first: Source[T, _ <: Any], second: Source[T, _ <: Any], rest: java.util.List[Source[T, _ <: Any]], strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]]): Source[U, NotUsed] = { - import scala.collection.JavaConverters._ - val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq() + val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else immutable.Seq() new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num ⇒ strategy.apply(num))) } + /** + * Combine the elements of multiple streams into a stream of lists. + */ + def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = { + val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq() + new Source(scaladsl.Source.zipN(seq).map(_.asJava)) + } + + /* + * Combine the elements of multiple streams into a stream of lists using a combiner function. + */ + def zipWithN[T, O](zipper: function.Function[java.util.List[T], O], sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = { + val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq() + new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq)) + } + /** * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index cbdc2d69dd..edd6419571 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -732,6 +732,102 @@ final class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](ConstantFun.scalaIden */ object UnzipWith extends UnzipWithApply +object ZipN { + /** + * Create a new `ZipN`. + */ + def apply[A](n: Int) = new ZipN[A](n) +} + +/** + * Combine the elements of multiple streams into a stream of sequences. + * + * A `ZipN` has a `n` input ports and one `out` port + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ +final class ZipN[A](n: Int) extends ZipWithN[A, immutable.Seq[A]](ConstantFun.scalaIdentityFunction)(n) { + override def initialAttributes = DefaultAttributes.zipN + override def toString = "ZipN" +} + +object ZipWithN { + /** + * Create a new `ZipWithN`. + */ + def apply[A, O](zipper: immutable.Seq[A] => O)(n: Int) = new ZipWithN[A, O](zipper)(n) +} + +/** + * Combine the elements of multiple streams into a stream of sequences using a combiner function. + * + * A `ZipWithN` has a `n` input ports and one `out` port + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ +class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] { + override def initialAttributes = DefaultAttributes.zipWithN + override val shape = new UniformFanInShape[A, O](n) + def out = shape.out + val inSeq = shape.inSeq + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + var pending = 0 + // Without this field the completion signalling would take one extra pull + var willShutDown = false + + val grabInlet = grab[A] _ + val pullInlet = pull[A] _ + + private def pushAll(): Unit = { + push(out, zipper(inSeq.map(grabInlet))) + if (willShutDown) completeStage() + else inSeq.foreach(pullInlet) + } + + override def preStart(): Unit = { + inSeq.foreach(pullInlet) + } + + inSeq.foreach(in => { + setHandler(in, new InHandler { + override def onPush(): Unit = { + pending -= 1 + if (pending == 0) pushAll() + } + + override def onUpstreamFinish(): Unit = { + if (!isAvailable(in)) completeStage() + willShutDown = true + } + + }) + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pending += n + if (pending == 0) pushAll() + } + }) + } + + override def toString = "ZipWithN" + +} + object Concat { /** * Create a new `Concat`. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 9348a9f7fc..0b5beab5fc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -413,6 +413,24 @@ object Source { combineRest(2, rest.iterator) }) + /** + * Combine the elements of multiple streams into a stream of sequences. + */ + def zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed] = zipWithN(ConstantFun.scalaIdentityFunction[immutable.Seq[T]])(sources).addAttributes(DefaultAttributes.zipN) + + /* + * Combine the elements of multiple streams into a stream of sequences using a combiner function. + */ + def zipWithN[T, O](zipper: immutable.Seq[T] ⇒ O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed] = { + val source = sources match { + case immutable.Seq() ⇒ empty[O] + case immutable.Seq(source) ⇒ source.map(t ⇒ zipper(immutable.Seq(t))).mapMaterializedValue(_ ⇒ NotUsed) + case s1 +: s2 +: ss ⇒ combine(s1, s2, ss: _*)(ZipWithN(zipper)) + } + + source.addAttributes(DefaultAttributes.zipWithN) + } + /** * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,