diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala new file mode 100644 index 0000000000..4e465ba822 --- /dev/null +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/BaseTwoStreamsSetup.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.testkit + +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Inlet, Outlet } +import akka.stream.scaladsl._ +import org.reactivestreams.Publisher +import scala.collection.immutable +import scala.util.control.NoStackTrace +import akka.stream.testkit.Utils._ + +abstract class BaseTwoStreamsSetup extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = ActorMaterializer(settings) + + val TestException = new RuntimeException("test") with NoStackTrace + + type Outputs + + def setup(p1: Publisher[Int], p2: Publisher[Int]): TestSubscriber.Probe[Outputs] + + def failedPublisher[T]: Publisher[T] = TestPublisher.error[T](TestException) + + def completedPublisher[T]: Publisher[T] = TestPublisher.empty[T] + + def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher) + + def soonToFailPublisher[T]: Publisher[T] = TestPublisher.lazyError[T](TestException) + + def soonToCompletePublisher[T]: Publisher[T] = TestPublisher.lazyEmpty[T] + + def commonTests() = { + "work with two immediately completed publishers" in assertAllStagesStopped { + val subscriber = setup(completedPublisher, completedPublisher) + subscriber.expectSubscriptionAndComplete() + } + + "work with two delayed completed publishers" in assertAllStagesStopped { + val subscriber = setup(soonToCompletePublisher, soonToCompletePublisher) + subscriber.expectSubscriptionAndComplete() + } + + "work with one immediately completed and one delayed completed publisher" in assertAllStagesStopped { + val subscriber = setup(completedPublisher, soonToCompletePublisher) + subscriber.expectSubscriptionAndComplete() + } + + "work with two immediately failed publishers" in assertAllStagesStopped { + val subscriber = setup(failedPublisher, failedPublisher) + subscriber.expectSubscriptionAndError(TestException) + } + + "work with two delayed failed publishers" in assertAllStagesStopped { + val subscriber = setup(soonToFailPublisher, soonToFailPublisher) + subscriber.expectSubscriptionAndError(TestException) + } + + // Warning: The two test cases below are somewhat implementation specific and might fail if the implementation + // is changed. They are here to be an early warning though. + "work with one immediately failed and one delayed failed publisher (case 1)" in assertAllStagesStopped { + val subscriber = setup(soonToFailPublisher, failedPublisher) + subscriber.expectSubscriptionAndError(TestException) + } + + "work with one immediately failed and one delayed failed publisher (case 2)" in assertAllStagesStopped { + val subscriber = setup(failedPublisher, soonToFailPublisher) + subscriber.expectSubscriptionAndError(TestException) + } + } + +} diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala index ecdc2f6e69..8cc084ac9f 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala @@ -7,16 +7,7 @@ import scala.collection.immutable import scala.util.control.NoStackTrace import akka.stream.testkit.Utils._ -abstract class TwoStreamsSetup extends AkkaSpec { - - val settings = ActorMaterializerSettings(system) - .withInputBuffer(initialSize = 2, maxSize = 2) - - implicit val materializer = ActorMaterializer(settings) - - val TestException = new RuntimeException("test") with NoStackTrace - - type Outputs +abstract class TwoStreamsSetup extends BaseTwoStreamsSetup { abstract class Fixture(b: FlowGraph.Builder[_]) { def left: Inlet[Int] @@ -26,7 +17,7 @@ abstract class TwoStreamsSetup extends AkkaSpec { def fixture(b: FlowGraph.Builder[_]): Fixture - def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { val subscriber = TestSubscriber.probe[Outputs]() FlowGraph.closed() { implicit b ⇒ import FlowGraph.Implicits._ @@ -41,53 +32,4 @@ abstract class TwoStreamsSetup extends AkkaSpec { subscriber } - def failedPublisher[T]: Publisher[T] = TestPublisher.error[T](TestException) - - def completedPublisher[T]: Publisher[T] = TestPublisher.empty[T] - - def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher) - - def soonToFailPublisher[T]: Publisher[T] = TestPublisher.lazyError[T](TestException) - - def soonToCompletePublisher[T]: Publisher[T] = TestPublisher.lazyEmpty[T] - - def commonTests() = { - "work with two immediately completed publishers" in assertAllStagesStopped { - val subscriber = setup(completedPublisher, completedPublisher) - subscriber.expectSubscriptionAndComplete() - } - - "work with two delayed completed publishers" in assertAllStagesStopped { - val subscriber = setup(soonToCompletePublisher, soonToCompletePublisher) - subscriber.expectSubscriptionAndComplete() - } - - "work with one immediately completed and one delayed completed publisher" in assertAllStagesStopped { - val subscriber = setup(completedPublisher, soonToCompletePublisher) - subscriber.expectSubscriptionAndComplete() - } - - "work with two immediately failed publishers" in assertAllStagesStopped { - val subscriber = setup(failedPublisher, failedPublisher) - subscriber.expectSubscriptionAndError(TestException) - } - - "work with two delayed failed publishers" in assertAllStagesStopped { - val subscriber = setup(soonToFailPublisher, soonToFailPublisher) - subscriber.expectSubscriptionAndError(TestException) - } - - // Warning: The two test cases below are somewhat implementation specific and might fail if the implementation - // is changed. They are here to be an early warning though. - "work with one immediately failed and one delayed failed publisher (case 1)" in assertAllStagesStopped { - val subscriber = setup(soonToFailPublisher, failedPublisher) - subscriber.expectSubscriptionAndError(TestException) - } - - "work with one immediately failed and one delayed failed publisher (case 2)" in assertAllStagesStopped { - val subscriber = setup(failedPublisher, soonToFailPublisher) - subscriber.expectSubscriptionAndError(TestException) - } - } - } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index ba770b2684..71e716159d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -613,4 +613,59 @@ public class FlowTest extends StreamTest { } + @Test + public void mustBeAbleToUseZipWith() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + Source.from(input1).via(Flow.of(String.class).zipWith(Source.from(input2), new Function2() { + public String apply(String s1, String s2) { + return s1 + "-" + s2; + } + })).runForeach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgEquals("A-D"); + probe.expectMsgEquals("B-E"); + probe.expectMsgEquals("C-F"); + } + + @Test + public void mustBeAbleToUseZip2() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + Source.from(input1).via(Flow.of(String.class).zip(Source.from(input2))) + .runForeach(new Procedure>() { + public void apply(Pair elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgEquals(new Pair("A", "D")); + probe.expectMsgEquals(new Pair("B", "E")); + probe.expectMsgEquals(new Pair("C", "F")); + } + + @Test + public void mustBeAbleToUseMerge2() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + Source.from(input1).via(Flow.of(String.class).merge(Source.from(input2))) + .runForeach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); + } + } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 2fc6a29cd3..6058e72db0 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -584,4 +584,72 @@ public class SourceTest extends StreamTest { Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); } + @Test + public void mustBeAbleToUseMerge() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + Source.from(input1).merge(Source.from(input2)).runForeach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); + } + + @Test + public void mustBeAbleToUseZipWith() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + Source.from(input1).zipWith(Source.from(input2),new Function2(){ + public String apply(String s1,String s2){ + return s1+"-"+s2; + } + }).runForeach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgEquals("A-D"); + probe.expectMsgEquals("B-E"); + probe.expectMsgEquals("C-F"); + } + + @Test + public void mustBeAbleToUseZip() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + Source.from(input1).zip(Source.from(input2)).runForeach(new Procedure>() { + public void apply(Pair elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgEquals(new Pair("A", "D")); + probe.expectMsgEquals(new Pair("B", "E")); + probe.expectMsgEquals(new Pair("C", "F")); + } + @Test + public void mustBeAbleToUseMerge2() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + Source.from(input1).merge(Source.from(input2)) + .runForeach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala new file mode 100644 index 0000000000..3c1d41d81e --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber } +import org.reactivestreams.Publisher + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Promise } + +class FlowConcatSpec extends BaseTwoStreamsSetup { + + override type Outputs = Int + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source(p1).concat(Source(p2)).runWith(Sink(subscriber)) + subscriber + } + + "A Concat for Flow " must { + + "be able to concat Flow with a Source" in { + val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s") + val s1: Source[Int, _] = Source(List(1, 2, 3)) + val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s") + + val subs = TestSubscriber.manualProbe[Any]() + val subSink = Sink.publisher[Any] + + val (_, res) = f1.concat(s2).runWith(s1, subSink) + + res.subscribe(subs) + val sub = subs.expectSubscription() + sub.request(9) + (1 to 6).foreach(e ⇒ subs.expectNext(e.toString + "-s")) + subs.expectComplete() + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(5) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(5) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(5) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(5) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one immediately failed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + + "work with one nonempty and one delayed failed publisher" in assertAllStagesStopped { + val subscriber = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) + subscriber.expectSubscription().request(5) + + val errorSignalled = (1 to 4).foldLeft(false)((errorSignalled, e) ⇒ + if (!errorSignalled) subscriber.expectNextOrError(1, TestException).isLeft else true) + if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) + } + + "work with one delayed failed and one nonempty publisher" in assertAllStagesStopped { + val subscriber = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) + subscriber.expectSubscription().request(5) + + val errorSignalled = (1 to 4).foldLeft(false)((errorSignalled, e) ⇒ + if (!errorSignalled) subscriber.expectNextOrError(1, TestException).isLeft else true) + if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) + } + + "correctly handle async errors in secondary upstream" in assertAllStagesStopped { + val promise = Promise[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + Source(List(1, 2, 3)).concat(Source(promise.future)).runWith(Sink(subscriber)) + + val subscription = subscriber.expectSubscription() + subscription.request(4) + (1 to 3).foreach(subscriber.expectNext) + promise.failure(TestException) + subscriber.expectError(TestException) + } + + "work with Source DSL" in { + val testSource = Source(1 to 5).concatMat(Source(6 to 10))(Keep.both).grouped(1000) + Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10) + + val runnable = testSource.toMat(Sink.ignore)(Keep.left) + val (m1, m2) = runnable.run() + m1.isInstanceOf[Unit] should be(true) + m2.isInstanceOf[Unit] should be(true) + + runnable.mapMaterializedValue((_) ⇒ "boo").run() should be("boo") + } + + "work with Flow DSL" in { + val testFlow = Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000) + Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) + + val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore) + val (m1, (m2, m3)) = runnable.run() + m1.isInstanceOf[Unit] should be(true) + m2.isInstanceOf[Unit] should be(true) + m3.isInstanceOf[Unit] should be(true) + + runnable.mapMaterializedValue((_) ⇒ "boo").run() should be("boo") + } + + "work with Flow DSL2" in { + val testFlow = Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000) + Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) + + val sink = testFlow.concatMat(Source(1 to 5))(Keep.both).to(Sink.ignore).mapMaterializedValue[String] { + case ((m1, m2), m3) ⇒ + m1.isInstanceOf[Unit] should be(true) + m2.isInstanceOf[Unit] should be(true) + m3.isInstanceOf[Unit] should be(true) + "boo" + } + Source(10 to 15).runWith(sink) should be("boo") + } + + "subscribe at once to initial source and to one that it's concat to" in { + val publisher1 = TestPublisher.probe[Int]() + val publisher2 = TestPublisher.probe[Int]() + val probeSink = Source(publisher1).concat(Source(publisher2)) + .runWith(TestSink.probe[Int]) + + val sub1 = publisher1.expectSubscription() + val sub2 = publisher2.expectSubscription() + val subSink = probeSink.expectSubscription() + + sub1.sendNext(1) + subSink.request(1) + probeSink.expectNext(1) + sub1.sendComplete() + + sub2.sendNext(2) + subSink.request(1) + probeSink.expectNext(2) + sub2.sendComplete() + + probeSink.expectComplete() + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala new file mode 100644 index 0000000000..4941e05222 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import org.reactivestreams.{ Subscriber, Publisher } + +class FlowMergeSpec extends BaseTwoStreamsSetup { + + override type Outputs = Int + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source(p1).merge(Source(p2)).runWith(Sink(subscriber)) + subscriber + } + + "A Merge for Flow " must { + + "work in the happy case" in assertAllStagesStopped { + // Different input sizes (4 and 6) + val source1 = Source(0 to 3) + val source2 = Source(4 to 9) + val source3 = Source(List[Int]()) + val probe = TestSubscriber.manualProbe[Int]() + + Source(0 to 3).merge(Source(List[Int]())).merge(Source(4 to 9)) + .map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe)) + + val subscription = probe.expectSubscription() + + var collected = Set.empty[Int] + for (_ ← 1 to 10) { + subscription.request(1) + collected += probe.expectNext() + } + + collected should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + probe.expectComplete() + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + val subscription1 = subscriber1.expectSubscription() + subscription1.request(4) + (1 to 4).foreach(subscriber1.expectNext) + subscriber1.expectComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + val subscription2 = subscriber2.expectSubscription() + subscription2.request(4) + (1 to 4).foreach(subscriber2.expectNext) + subscriber2.expectComplete() + } + + "work with one immediately failed and one nonempty publisher" in { + // This is nondeterministic, multiple scenarios can happen + pending + } + + "work with one delayed failed and one nonempty publisher" in { + // This is nondeterministic, multiple scenarios can happen + pending + } + + "pass along early cancellation" in assertAllStagesStopped { + val up1 = TestPublisher.manualProbe[Int]() + val up2 = TestPublisher.manualProbe[Int]() + val down = TestSubscriber.manualProbe[Int]() + + val (graphSubscriber1, graphSubscriber2) = Source.subscriber[Int] + .mergeMat(Source.subscriber[Int])((_, _)).toMat(Sink(down))(Keep.left).run + + val downstream = down.expectSubscription() + downstream.cancel() + up1.subscribe(graphSubscriber1) + up2.subscribe(graphSubscriber2) + up1.expectSubscription().expectCancellation() + up2.expectSubscription().expectCancellation() + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 6a76fd0b09..e64c550c49 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -289,81 +289,6 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit, _]), _] = Flow[String].map(_ ⇒ new Apple).prefixAndTail(1) } - "be able to concat with a Source" in { - val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s") - val s1: Source[Int, _] = Source(List(1, 2, 3)) - val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s") - - val subs = TestSubscriber.manualProbe[Any]() - val subSink = Sink.publisher[Any] - - val (_, res) = f1.concat(s2).runWith(s1, subSink) - - res.subscribe(subs) - val sub = subs.expectSubscription() - sub.request(9) - subs.expectNext("1-s") - subs.expectNext("2-s") - subs.expectNext("3-s") - subs.expectNext("4-s") - subs.expectNext("5-s") - subs.expectNext("6-s") - subs.expectComplete() - } - - "be able to concat with empty source" in { - val probe = Source.single(1).concat(Source.empty) - .runWith(TestSink.probe[Int]) - probe.request(1) - probe.expectNext(1) - probe.expectComplete() - } - - "be able to concat empty source" in { - val probe = Source.empty.concat(Source.single(1)) - .runWith(TestSink.probe[Int]) - probe.request(1) - probe.expectNext(1) - probe.expectComplete() - } - - "be able to concat two empty sources" in { - val probe = Source.empty.concat(Source.empty) - .runWith(TestSink.probe[Int]) - probe.expectSubscription() - probe.expectComplete() - } - - "be able to concat source with error" in { - val probe = Source.single(1).concat(Source.failed(TestException)) - .runWith(TestSink.probe[Int]) - probe.expectSubscription() - probe.expectError(TestException) - } - - "subscribe at once to initial source and to one that it's concat to" in { - val publisher1 = TestPublisher.probe[Int]() - val publisher2 = TestPublisher.probe[Int]() - val probeSink = Source.apply(publisher1).concat(Source.apply(publisher2)) - .runWith(TestSink.probe[Int]) - - val sub1 = publisher1.expectSubscription() - val sub2 = publisher2.expectSubscription() - val subSink = probeSink.expectSubscription() - - sub1.sendNext(1) - subSink.request(1) - probeSink.expectNext(1) - sub1.sendComplete() - - sub2.sendNext(2) - subSink.request(1) - probeSink.expectNext(2) - sub2.sendComplete() - - probeSink.expectComplete() - } - "be possible to convert to a processor, and should be able to take a Processor" in { val identity1 = Flow[Int].toProcessor val identity2 = Flow(() ⇒ identity1.run()) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala new file mode 100644 index 0000000000..345c4783b0 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.Utils._ +import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } +import org.reactivestreams.Publisher + +class FlowZipSpec extends BaseTwoStreamsSetup { + + override type Outputs = (Int, Int) + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source(p1).zip(Source(p2)).runWith(Sink(subscriber)) + subscriber + } + + "A Zip for Flow" must { + + "work in the happy case" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[(Int, String)]() + Source(1 to 4).zip(Source(List("A", "B", "C", "D", "E", "F"))).runWith(Sink(probe)) + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext((1, "A")) + probe.expectNext((2, "B")) + + subscription.request(1) + probe.expectNext((3, "C")) + subscription.request(1) + probe.expectNext((4, "D")) + + probe.expectComplete() + } + commonTests() + + "work with one immediately completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one delayed completed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one immediately failed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + + "work with one delayed failed and one nonempty publisher" in assertAllStagesStopped { + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala new file mode 100644 index 0000000000..0f0a246696 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } +import org.reactivestreams.Publisher +import scala.concurrent.duration._ + +class FlowZipWithSpec extends BaseTwoStreamsSetup { + + override type Outputs = Int + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source(p1).zipWith(Source(p2))(_ + _).runWith(Sink(subscriber)) + subscriber + } + + "A ZipWith for Flow " must { + + "work in the happy case" in { + val probe = TestSubscriber.manualProbe[Outputs]() + Source(1 to 4).zipWith(Source(10 to 40 by 10))((_: Int) + (_: Int)).runWith(Sink(probe)) + + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(11) + probe.expectNext(22) + + subscription.request(1) + probe.expectNext(33) + subscription.request(1) + probe.expectNext(44) + + probe.expectComplete() + } + + "work in the sad case" in { + val probe = TestSubscriber.manualProbe[Outputs]() + Source(1 to 4).zipWith(Source(-2 to 2))((_: Int) / (_: Int)).runWith(Sink(probe)) + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext(1 / -2) + probe.expectNext(2 / -1) + + subscription.request(2) + probe.expectError() match { + case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") + } + probe.expectNoMsg(200.millis) + } + + commonTests() + + "work with one immediately completed and one nonempty publisher" in { + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one delayed completed and one nonempty publisher" in { + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndComplete() + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) + subscriber2.expectSubscriptionAndComplete() + } + + "work with one immediately failed and one nonempty publisher" in { + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + + "work with one delayed failed and one nonempty publisher" in { + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) + subscriber1.expectSubscriptionAndError(TestException) + + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) + subscriber2.expectSubscriptionAndError(TestException) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index f444c395e7..77c615fdac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -155,48 +155,5 @@ class GraphConcatSpec extends TwoStreamsSetup { promise.failure(TestException) subscriber.expectError(TestException) } - - "work with Source DSL" in { - val testSource = Source(1 to 5).concat(Source(6 to 10)).grouped(1000) - Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10) - - val runnable = testSource.toMat(Sink.ignore)(Keep.left) - val (m1, m2) = runnable.run() - m1.isInstanceOf[Unit] should be(true) - m2.isInstanceOf[Unit] should be(true) - - runnable.mapMaterializedValue((_) ⇒ "boo").run() should be("boo") - - } - - "work with Flow DSL" in { - val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000) - Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) - - val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore) - val (m1, (m2, m3)) = runnable.run() - m1.isInstanceOf[Unit] should be(true) - m2.isInstanceOf[Unit] should be(true) - m3.isInstanceOf[Unit] should be(true) - - runnable.mapMaterializedValue((_) ⇒ "boo").run() should be("boo") - - } - - "work with Flow DSL2" in { - val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000) - Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10) - - val sink = testFlow.concat(Source(1 to 5)).toMat(Sink.ignore)(Keep.left).mapMaterializedValue[String] { - case ((m1, m2), m3) ⇒ - m1.isInstanceOf[Unit] should be(true) - m2.isInstanceOf[Unit] should be(true) - m3.isInstanceOf[Unit] should be(true) - "boo" - } - - Source(10 to 15).runWith(sink) should be("boo") - - } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 7e6e6a43e3..cdc5b0e3ac 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -4,17 +4,16 @@ package akka.stream.javadsl import akka.event.LoggingAdapter -import akka.stream._ -import akka.japi.Pair -import akka.japi.function -import akka.stream.scaladsl +import akka.japi.{ Pair, function } +import akka.stream.impl.StreamLayout +import akka.stream.{ scaladsl, _ } +import akka.stream.stage.Stage import org.reactivestreams.Processor + import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import akka.stream.stage.Stage -import akka.stream.impl.StreamLayout object Flow { @@ -782,25 +781,81 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated, - * the Source’s elements will be produced. Note that the Source is materialized - * together with this Flow and just kept from producing elements by asserting - * back-pressure until its time comes. + * the Source’s elements will be produced. * - * The resulting Flow’s materialized value is a Pair containing both materialized - * values (of this Flow and that Source). + * Note that the Source is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If this [[Flow]] gets upstream error - no elements from the source will be pulled. */ - def concat[M](source: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = - new Flow(delegate.concat(source).mapMaterializedValue(p ⇒ Pair(p._1, p._2))) + def concat[T >: Out, M](source: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.concat(source)) /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Flow’s input is exhausted and all result elements have been generated, - * the Source’s elements will be produced. Note that the Source is materialized - * together with this Flow and just kept from producing elements by asserting - * back-pressure until its time comes. + * the Source’s elements will be produced. + * + * Note that the Source is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If this [[Flow]] gets upstream error - no elements from the source will be pulled. */ - def concatMat[M, M2](source: Graph[SourceShape[Out @uncheckedVariance], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = - new Flow(delegate.concatMat(source)(combinerToScala(combine))) + def concatMat[T >: Out, M, M2](source: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.concatMat(source)(combinerToScala(matF))) + + /** + * Merge current [[Flow]] with the given [[Source]], taking elements as they arrive, + * picking randomly when several elements ready. + */ + def merge[T >: Out](source: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.merge(source)) + + /** + * Merge current [[Flow]] with the given [[Source]], taking elements as they arrive, + * picking randomly when several elements readt. + */ + def mergeMat[T >: Out, M, M2](source: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.mergeMat(source)(combinerToScala(matF))) + + /** + * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. + */ + def zip[T](source: Graph[SourceShape[T], _]): javadsl.Flow[In, Out @uncheckedVariance Pair T, Mat] = + zipMat(source, Keep.left) + + /** + * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. + */ + def zipMat[T, M, M2](source: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = { + //we need this only to have Flow of javadsl.Pair + def block(builder: FlowGraph.Builder[M], + source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = { + val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T]) + builder.from(source).to(zip.in1) + new Pair(zip.in0, zip.out) + } + this.viaMat(Flow.factory.create(source, combinerToJava(block)), matF) + } + + /** + * Put together elements of current [[Flow]] and the given [[Source]] + * into a stream of combined elements using a combiner function. + */ + def zipWith[Out2, Out3](source: Graph[SourceShape[Out2], _], + combine: function.Function2[Out, Out2, Out3]): javadsl.Flow[In, Out3, Mat] = + new Flow(delegate.zipWith[Out2, Out3](source)(combinerToScala(combine))) + + /** + * Put together elements of current [[Flow]] and the given [[Source]] + * into a stream of combined elements using a combiner function. + */ + def zipWithMat[Out2, Out3, M, M2](source: Graph[SourceShape[Out2], M], + combine: function.Function2[Out, Out2, Out3], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] = + new Flow(delegate.zipWithMat[Out2, Out3, M, M2](source)(combinerToScala(combine))(combinerToScala(matF))) override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 7790853d5e..940f3c796f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -5,7 +5,7 @@ package akka.stream.javadsl import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter -import akka.japi.{ Util, function } +import akka.japi.{ Pair, Util, function } import akka.stream._ import akka.stream.impl.StreamLayout import akka.stream.stage.Stage @@ -194,22 +194,6 @@ object Source { def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy)) - /** - * Concatenates two sources so that the first element - * emitted by the second source is emitted after the last element of the first - * source. - */ - def concat[T, M1, M2](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2]): Source[T, (M1, M2)] = - new Source(scaladsl.Source.concat(first, second)) - - /** - * Concatenates two sources so that the first element - * emitted by the second source is emitted after the last element of the first - * source. - */ - def concatMat[T, M1, M2, M3](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2], combine: function.Function2[M1, M2, M3]): Source[T, M3] = - new Source(scaladsl.Source.concatMat(first, second)(combinerToScala(combine))) - /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. @@ -320,20 +304,74 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour runWith(Sink.fold(zero, f), materializer) /** - * Concatenates a second source so that the first element - * emitted by that source is emitted after the last element of this - * source. + * Concatenate the second [[Source]] to current one, meaning that once current + * is exhausted and all result elements have been generated, + * the second Source’s elements will be produced. */ - def concat[Out2 >: Out, M2](second: Graph[SourceShape[Out2], M2]): javadsl.Source[Out2, (Mat, M2)] = - Source.concat(this, second) + def concat[T >: Out, M](second: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = + new Source(delegate.concat(second)) /** - * Concatenates a second source so that the first element - * emitted by that source is emitted after the last element of this - * source. + * Concatenate the second [[Source]] to current one, meaning that once current + * is exhausted and all result elements have been generated, + * the second Source’s elements will be produced. */ - def concatMat[M, M2](second: Graph[SourceShape[Out @uncheckedVariance], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = - new Source(delegate.concatMat(second)(combinerToScala(combine))) + def concatMat[T >: Out, M, M2](second: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.concatMat(second)(combinerToScala(matF))) + + /** + * Merge current source with the second one, taking elements as they arrive, + * picking randomly when several elements ready. + */ + def merge[T >: Out](second: Graph[SourceShape[T], _]): javadsl.Source[T, Mat] = + new Source(delegate.merge(second)) + + /** + * Merge current source with the second one, taking elements as they arrive, + * picking randomly when several elements ready. + */ + def mergeMat[T >: Out, M, M2](second: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.mergeMat(second)(combinerToScala(matF))) + + /** + * Combine the elements of current [[Source]] and the second one into a stream of tuples. + */ + def zip[T](second: Graph[SourceShape[T], _]): javadsl.Source[Out @uncheckedVariance Pair T, Mat] = + zipMat(second, combinerToJava((a: Mat, b: Any) ⇒ a)) + + /** + * Combine the elements of current [[Source]] and the second one into a stream of tuples. + */ + def zipMat[T, M, M2](second: Graph[SourceShape[T], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = { + //we need this only to have Flow of javadsl.Pair + def block(builder: FlowGraph.Builder[M], + source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = { + val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T]) + builder.from(source).to(zip.in1) + new Pair(zip.in0, zip.out) + } + this.viaMat(Flow.factory.create(second, combinerToJava(block)), matF) + } + + /** + * Put together elements of current [[Source]] and the second one + * into a stream of combined elements using a combiner function. + */ + def zipWith[Out2, Out3](second: Graph[SourceShape[Out2], _], + combine: function.Function2[Out, Out2, Out3]): javadsl.Source[Out3, Mat] = + new Source(delegate.zipWith[Out2, Out3](second)(combinerToScala(combine))) + + /** + * Put together elements of current [[Source]] and the second one + * into a stream of combined elements using a combiner function. + */ + def zipWithMat[Out2, Out3, M, M2](second: Graph[SourceShape[Out2], M], + combine: function.Function2[Out, Out2, Out3], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] = + new Source(delegate.zipWithMat[Out2, Out3, M, M2](second)(combinerToScala(combine))(combinerToScala(matF))) /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala index 6d38905c95..3b53a11122 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala @@ -20,4 +20,7 @@ package object javadsl { case other ⇒ other.apply _ } + def combinerToJava[M1, M2, M](f: (M1, M2) ⇒ M): akka.japi.function.Function2[M1, M2, M] = + new akka.japi.function.Function2[M1, M2, M] { def apply(m1: M1, m2: M2): M = f.apply(m1, m2) } + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1debdfa34f..c544f07332 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,10 +9,9 @@ import akka.stream._ import akka.stream.impl.SplitDecision._ import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } -import akka.stream.impl.fusing.{ DropWithin, TakeWithin, GroupedWithin } +import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin } import akka.stream.impl.{ Stages, StreamLayout } import akka.stream.stage._ -import akka.util.Collections.EmptyImmutableSeq import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance @@ -188,35 +187,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) .replaceShape(FlowShape(ins(1), outs.head))) } - /** - * Concatenate the given [[Source]] to this [[Flow]], meaning that once this - * Flow’s input is exhausted and all result elements have been generated, - * the Source’s elements will be produced. Note that the Source is materialized - * together with this Flow and just kept from producing elements by asserting - * back-pressure until its time comes. - * - * The resulting Flow’s materialized value is a Tuple2 containing both materialized - * values (of this Flow and that Source). - */ - def concat[Out2 >: Out, Mat2](source: Graph[SourceShape[Out2], Mat2]): Flow[In, Out2, (Mat, Mat2)] = - concatMat[Out2, Mat2, (Mat, Mat2)](source)(Keep.both) - - /** - * Concatenate the given [[Source]] to this [[Flow]], meaning that once this - * Flow’s input is exhausted and all result elements have been generated, - * the Source’s elements will be produced. Note that the Source is materialized - * together with this Flow and just kept from producing elements by asserting - * back-pressure until its time comes. - */ - def concatMat[Out2 >: Out, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, Out2, Mat3] = - this.viaMat(Flow(source) { implicit builder ⇒ - s ⇒ - import FlowGraph.Implicits._ - val concat = builder.add(Concat[Out2]()) - s.outlet ~> concat.in(1) - (concat.in(0), concat.out) - })(combine) - /** INTERNAL API */ override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = { //No need to copy here, op is a fresh instance @@ -985,6 +955,127 @@ trait FlowOps[+Out, +Mat] { def log(name: String, extract: Out ⇒ Any = _identity)(implicit log: LoggingAdapter = null): Repr[Out, Mat] = andThen(Stages.Log(name, extract.asInstanceOf[Any ⇒ Any], Option(log))) + /** + * Combine the elements of current flow and given [[Source]] into a stream of tuples. + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zip[U](source: Graph[SourceShape[U], _]): Repr[(Out, U), Mat] = zipMat(source)(Keep.left) + + /** + * Combine the elements of current flow and given [[Source]] into a stream of tuples. + */ + def zipMat[U, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[(Out, U), Mat3] = + this.viaMat(Flow(source) { implicit b ⇒ + r ⇒ + import FlowGraph.Implicits._ + val zip = b.add(Zip[Out, U]()) + r ~> zip.in1 + (zip.in0, zip.out) + })(matF) + + /** + * Put together the elements of current flow and given [[Source]] + * into a stream of combined elements using a combiner function. + * + * '''Emits when''' all of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' any upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipWith[Out2, Out3](source: Graph[SourceShape[Out2], _])(combine: (Out, Out2) ⇒ Out3): Repr[Out3, Mat] = + zipWithMat(source)(combine)(Keep.left) + + /** + * Put together the elements of current flow and given [[Source]] + * into a stream of combined elements using a combiner function. + */ + def zipWithMat[Out2, Out3, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) ⇒ Out3)(matF: (Mat, Mat2) ⇒ Mat3): Repr[Out3, Mat3] = + this.viaMat(Flow(source) { implicit b ⇒ + r ⇒ + import FlowGraph.Implicits._ + val zip = b.add(ZipWith[Out, Out2, Out3](combine)) + r ~> zip.in1 + (zip.in0, zip.out) + })(matF) + + /** + * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete + * + * '''Cancels when''' downstream cancels + */ + def merge[U >: Out](source: Graph[SourceShape[U], _]): Repr[U, Mat] = + mergeMat(source)(Keep.left) + + /** + * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + */ + def mergeMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = + this.viaMat(Flow(source) { implicit b ⇒ + r ⇒ + import FlowGraph.Implicits._ + val merge = b.add(Merge[U](2)) + r ~> merge.in(1) + (merge.in(0), merge.out) + })(matF) + + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the Source is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If this [[Flow]] gets upstream error - no elements from the source will be pulled. + * + * '''Emits when''' element is available from current stream or from second stream when current is completed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' second stream completes + * + * '''Cancels when''' downstream cancels + */ + def concat[U >: Out, Mat2](source: Graph[SourceShape[U], Mat2]): Repr[U, Mat] = + concatMat(source)(Keep.left) + + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. + * + * Note that the Source is materialized together with this Flow and just kept + * from producing elements by asserting back-pressure until its time comes. + * + * If this [[Flow]] gets upstream error - no elements from the source will be pulled. + */ + def concatMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[U, Mat3] = + this.viaMat(Flow(source) { implicit b ⇒ + r ⇒ + import FlowGraph.Implicits._ + val merge = b.add(Concat[U]()) + r ~> merge.in(1) + (merge.in(0), merge.out) + })(matF) + def withAttributes(attr: Attributes): Repr[Out, Mat] /** INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 124a002955..2b7f5c4eda 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -104,21 +104,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) */ def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f)) - /** - * Concatenates a second source so that the first element - * emitted by that source is emitted after the last element of this - * source. - */ - def concat[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both) - - /** - * Concatenates a second source so that the first element - * emitted by that source is emitted after the last element of this - * source. - */ - def concatMat[Out2 >: Out, Mat2, Mat3](second: Graph[SourceShape[Out2], Mat2])( - combine: (Mat, Mat2) ⇒ Mat3): Source[Out2, Mat3] = Source.concatMat(this, second)(combine) - /** * Concatenates a second source so that the first element * emitted by that source is emitted after the last element of this @@ -126,7 +111,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * * This is a shorthand for [[concat]] */ - def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concat(second) + def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, Mat] = concat(second) /** * Nests the current Source and returns a Source with the given Attributes @@ -301,30 +286,6 @@ object Source extends SourceApply { DefaultAttributes.failedSource, shape("FailedSource"))) - /** - * Concatenates two sources so that the first element - * emitted by the second source is emitted after the last element of the first - * source. - */ - def concat[T, Mat1, Mat2](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2]): Source[T, (Mat1, Mat2)] = - concatMat(source1, source2)(Keep.both).withAttributes(DefaultAttributes.concatSource) - - /** - * Concatenates two sources so that the first element - * emitted by the second source is emitted after the last element of the first - * source. - */ - def concatMat[T, Mat1, Mat2, Mat3](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2])( - combine: (Mat1, Mat2) ⇒ Mat3): Source[T, Mat3] = - wrap(FlowGraph.partial(source1, source2)(combine) { implicit b ⇒ - (s1, s2) ⇒ - import FlowGraph.Implicits._ - val c = b.add(Concat[T]()) - s1.outlet ~> c.in(0) - s2.outlet ~> c.in(1) - SourceShape(c.out) - }).withAttributes(DefaultAttributes.concatMatSource) - /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */