+str #18411 add FlowOps.zip/zipWith/merge/concat operators

This commit is contained in:
Alexander Golubev 2015-09-21 08:10:45 -04:00
parent ecd6b9e825
commit 993e545e99
15 changed files with 901 additions and 294 deletions

View file

@ -0,0 +1,75 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Inlet, Outlet }
import akka.stream.scaladsl._
import org.reactivestreams.Publisher
import scala.collection.immutable
import scala.util.control.NoStackTrace
import akka.stream.testkit.Utils._
abstract class BaseTwoStreamsSetup extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
implicit val materializer = ActorMaterializer(settings)
val TestException = new RuntimeException("test") with NoStackTrace
type Outputs
def setup(p1: Publisher[Int], p2: Publisher[Int]): TestSubscriber.Probe[Outputs]
def failedPublisher[T]: Publisher[T] = TestPublisher.error[T](TestException)
def completedPublisher[T]: Publisher[T] = TestPublisher.empty[T]
def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher)
def soonToFailPublisher[T]: Publisher[T] = TestPublisher.lazyError[T](TestException)
def soonToCompletePublisher[T]: Publisher[T] = TestPublisher.lazyEmpty[T]
def commonTests() = {
"work with two immediately completed publishers" in assertAllStagesStopped {
val subscriber = setup(completedPublisher, completedPublisher)
subscriber.expectSubscriptionAndComplete()
}
"work with two delayed completed publishers" in assertAllStagesStopped {
val subscriber = setup(soonToCompletePublisher, soonToCompletePublisher)
subscriber.expectSubscriptionAndComplete()
}
"work with one immediately completed and one delayed completed publisher" in assertAllStagesStopped {
val subscriber = setup(completedPublisher, soonToCompletePublisher)
subscriber.expectSubscriptionAndComplete()
}
"work with two immediately failed publishers" in assertAllStagesStopped {
val subscriber = setup(failedPublisher, failedPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
"work with two delayed failed publishers" in assertAllStagesStopped {
val subscriber = setup(soonToFailPublisher, soonToFailPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
// Warning: The two test cases below are somewhat implementation specific and might fail if the implementation
// is changed. They are here to be an early warning though.
"work with one immediately failed and one delayed failed publisher (case 1)" in assertAllStagesStopped {
val subscriber = setup(soonToFailPublisher, failedPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
"work with one immediately failed and one delayed failed publisher (case 2)" in assertAllStagesStopped {
val subscriber = setup(failedPublisher, soonToFailPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
}
}

View file

@ -7,16 +7,7 @@ import scala.collection.immutable
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
abstract class TwoStreamsSetup extends AkkaSpec { abstract class TwoStreamsSetup extends BaseTwoStreamsSetup {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
implicit val materializer = ActorMaterializer(settings)
val TestException = new RuntimeException("test") with NoStackTrace
type Outputs
abstract class Fixture(b: FlowGraph.Builder[_]) { abstract class Fixture(b: FlowGraph.Builder[_]) {
def left: Inlet[Int] def left: Inlet[Int]
@ -26,7 +17,7 @@ abstract class TwoStreamsSetup extends AkkaSpec {
def fixture(b: FlowGraph.Builder[_]): Fixture def fixture(b: FlowGraph.Builder[_]): Fixture
def setup(p1: Publisher[Int], p2: Publisher[Int]) = { override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]() val subscriber = TestSubscriber.probe[Outputs]()
FlowGraph.closed() { implicit b FlowGraph.closed() { implicit b
import FlowGraph.Implicits._ import FlowGraph.Implicits._
@ -41,53 +32,4 @@ abstract class TwoStreamsSetup extends AkkaSpec {
subscriber subscriber
} }
def failedPublisher[T]: Publisher[T] = TestPublisher.error[T](TestException)
def completedPublisher[T]: Publisher[T] = TestPublisher.empty[T]
def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher)
def soonToFailPublisher[T]: Publisher[T] = TestPublisher.lazyError[T](TestException)
def soonToCompletePublisher[T]: Publisher[T] = TestPublisher.lazyEmpty[T]
def commonTests() = {
"work with two immediately completed publishers" in assertAllStagesStopped {
val subscriber = setup(completedPublisher, completedPublisher)
subscriber.expectSubscriptionAndComplete()
}
"work with two delayed completed publishers" in assertAllStagesStopped {
val subscriber = setup(soonToCompletePublisher, soonToCompletePublisher)
subscriber.expectSubscriptionAndComplete()
}
"work with one immediately completed and one delayed completed publisher" in assertAllStagesStopped {
val subscriber = setup(completedPublisher, soonToCompletePublisher)
subscriber.expectSubscriptionAndComplete()
}
"work with two immediately failed publishers" in assertAllStagesStopped {
val subscriber = setup(failedPublisher, failedPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
"work with two delayed failed publishers" in assertAllStagesStopped {
val subscriber = setup(soonToFailPublisher, soonToFailPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
// Warning: The two test cases below are somewhat implementation specific and might fail if the implementation
// is changed. They are here to be an early warning though.
"work with one immediately failed and one delayed failed publisher (case 1)" in assertAllStagesStopped {
val subscriber = setup(soonToFailPublisher, failedPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
"work with one immediately failed and one delayed failed publisher (case 2)" in assertAllStagesStopped {
val subscriber = setup(failedPublisher, soonToFailPublisher)
subscriber.expectSubscriptionAndError(TestException)
}
}
} }

View file

@ -613,4 +613,59 @@ public class FlowTest extends StreamTest {
} }
@Test
public void mustBeAbleToUseZipWith() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
Source.from(input1).via(Flow.of(String.class).zipWith(Source.from(input2), new Function2<String, String, String>() {
public String apply(String s1, String s2) {
return s1 + "-" + s2;
}
})).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgEquals("A-D");
probe.expectMsgEquals("B-E");
probe.expectMsgEquals("C-F");
}
@Test
public void mustBeAbleToUseZip2() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
Source.from(input1).via(Flow.of(String.class).zip(Source.from(input2)))
.runForeach(new Procedure<Pair<String, String>>() {
public void apply(Pair<String, String> elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgEquals(new Pair<String,String>("A", "D"));
probe.expectMsgEquals(new Pair<String,String>("B", "E"));
probe.expectMsgEquals(new Pair<String,String>("C", "F"));
}
@Test
public void mustBeAbleToUseMerge2() {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
Source.from(input1).via(Flow.of(String.class).merge(Source.from(input2)))
.runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
}
} }

View file

@ -584,4 +584,72 @@ public class SourceTest extends StreamTest {
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
} }
@Test
public void mustBeAbleToUseMerge() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
Source.from(input1).merge(Source.from(input2)).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
}
@Test
public void mustBeAbleToUseZipWith() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
Source.from(input1).zipWith(Source.from(input2),new Function2<String,String,String>(){
public String apply(String s1,String s2){
return s1+"-"+s2;
}
}).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgEquals("A-D");
probe.expectMsgEquals("B-E");
probe.expectMsgEquals("C-F");
}
@Test
public void mustBeAbleToUseZip() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
Source.from(input1).zip(Source.from(input2)).runForeach(new Procedure<Pair<String,String>>() {
public void apply(Pair<String,String> elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgEquals(new Pair<String,String>("A", "D"));
probe.expectMsgEquals(new Pair<String,String>("B", "E"));
probe.expectMsgEquals(new Pair<String,String>("C", "F"));
}
@Test
public void mustBeAbleToUseMerge2() {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<String> input2 = Arrays.asList("D", "E", "F");
Source.from(input1).merge(Source.from(input2))
.runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
}
} }

View file

@ -0,0 +1,173 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestPublisher, TestSubscriber }
import org.reactivestreams.Publisher
import scala.concurrent.duration._
import scala.concurrent.{ Await, Promise }
class FlowConcatSpec extends BaseTwoStreamsSetup {
override type Outputs = Int
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]()
Source(p1).concat(Source(p2)).runWith(Sink(subscriber))
subscriber
}
"A Concat for Flow " must {
"be able to concat Flow with a Source" in {
val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s")
val s1: Source[Int, _] = Source(List(1, 2, 3))
val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s")
val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.publisher[Any]
val (_, res) = f1.concat(s2).runWith(s1, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
sub.request(9)
(1 to 6).foreach(e subs.expectNext(e.toString + "-s"))
subs.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(5)
(1 to 4).foreach(subscriber1.expectNext)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(5)
(1 to 4).foreach(subscriber2.expectNext)
subscriber2.expectComplete()
}
"work with one delayed completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(5)
(1 to 4).foreach(subscriber1.expectNext)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(5)
(1 to 4).foreach(subscriber2.expectNext)
subscriber2.expectComplete()
}
"work with one immediately failed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one nonempty and one delayed failed publisher" in assertAllStagesStopped {
val subscriber = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
subscriber.expectSubscription().request(5)
val errorSignalled = (1 to 4).foldLeft(false)((errorSignalled, e)
if (!errorSignalled) subscriber.expectNextOrError(1, TestException).isLeft else true)
if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in assertAllStagesStopped {
val subscriber = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber.expectSubscription().request(5)
val errorSignalled = (1 to 4).foldLeft(false)((errorSignalled, e)
if (!errorSignalled) subscriber.expectNextOrError(1, TestException).isLeft else true)
if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException)
}
"correctly handle async errors in secondary upstream" in assertAllStagesStopped {
val promise = Promise[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source(List(1, 2, 3)).concat(Source(promise.future)).runWith(Sink(subscriber))
val subscription = subscriber.expectSubscription()
subscription.request(4)
(1 to 3).foreach(subscriber.expectNext)
promise.failure(TestException)
subscriber.expectError(TestException)
}
"work with Source DSL" in {
val testSource = Source(1 to 5).concatMat(Source(6 to 10))(Keep.both).grouped(1000)
Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = testSource.toMat(Sink.ignore)(Keep.left)
val (m1, m2) = runnable.run()
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
runnable.mapMaterializedValue((_) "boo").run() should be("boo")
}
"work with Flow DSL" in {
val testFlow = Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore)
val (m1, (m2, m3)) = runnable.run()
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
m3.isInstanceOf[Unit] should be(true)
runnable.mapMaterializedValue((_) "boo").run() should be("boo")
}
"work with Flow DSL2" in {
val testFlow = Flow[Int].concatMat(Source(6 to 10))(Keep.both).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val sink = testFlow.concatMat(Source(1 to 5))(Keep.both).to(Sink.ignore).mapMaterializedValue[String] {
case ((m1, m2), m3)
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
m3.isInstanceOf[Unit] should be(true)
"boo"
}
Source(10 to 15).runWith(sink) should be("boo")
}
"subscribe at once to initial source and to one that it's concat to" in {
val publisher1 = TestPublisher.probe[Int]()
val publisher2 = TestPublisher.probe[Int]()
val probeSink = Source(publisher1).concat(Source(publisher2))
.runWith(TestSink.probe[Int])
val sub1 = publisher1.expectSubscription()
val sub2 = publisher2.expectSubscription()
val subSink = probeSink.expectSubscription()
sub1.sendNext(1)
subSink.request(1)
probeSink.expectNext(1)
sub1.sendComplete()
sub2.sendNext(2)
subSink.request(1)
probeSink.expectNext(2)
sub2.sendComplete()
probeSink.expectComplete()
}
}
}

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.reactivestreams.{ Subscriber, Publisher }
class FlowMergeSpec extends BaseTwoStreamsSetup {
override type Outputs = Int
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]()
Source(p1).merge(Source(p2)).runWith(Sink(subscriber))
subscriber
}
"A Merge for Flow " must {
"work in the happy case" in assertAllStagesStopped {
// Different input sizes (4 and 6)
val source1 = Source(0 to 3)
val source2 = Source(4 to 9)
val source3 = Source(List[Int]())
val probe = TestSubscriber.manualProbe[Int]()
Source(0 to 3).merge(Source(List[Int]())).merge(Source(4 to 9))
.map(_ * 2).map(_ / 2).map(_ + 1).runWith(Sink(probe))
val subscription = probe.expectSubscription()
var collected = Set.empty[Int]
for (_ 1 to 10) {
subscription.request(1)
collected += probe.expectNext()
}
collected should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(4)
(1 to 4).foreach(subscriber1.expectNext)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(4)
(1 to 4).foreach(subscriber2.expectNext)
subscriber2.expectComplete()
}
"work with one delayed completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(4)
(1 to 4).foreach(subscriber1.expectNext)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(4)
(1 to 4).foreach(subscriber2.expectNext)
subscriber2.expectComplete()
}
"work with one immediately failed and one nonempty publisher" in {
// This is nondeterministic, multiple scenarios can happen
pending
}
"work with one delayed failed and one nonempty publisher" in {
// This is nondeterministic, multiple scenarios can happen
pending
}
"pass along early cancellation" in assertAllStagesStopped {
val up1 = TestPublisher.manualProbe[Int]()
val up2 = TestPublisher.manualProbe[Int]()
val down = TestSubscriber.manualProbe[Int]()
val (graphSubscriber1, graphSubscriber2) = Source.subscriber[Int]
.mergeMat(Source.subscriber[Int])((_, _)).toMat(Sink(down))(Keep.left).run
val downstream = down.expectSubscription()
downstream.cancel()
up1.subscribe(graphSubscriber1)
up2.subscribe(graphSubscriber2)
up1.expectSubscription().expectCancellation()
up2.expectSubscription().expectCancellation()
}
}
}

View file

@ -289,81 +289,6 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit, _]), _] = Flow[String].map(_ new Apple).prefixAndTail(1) val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit, _]), _] = Flow[String].map(_ new Apple).prefixAndTail(1)
} }
"be able to concat with a Source" in {
val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s")
val s1: Source[Int, _] = Source(List(1, 2, 3))
val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s")
val subs = TestSubscriber.manualProbe[Any]()
val subSink = Sink.publisher[Any]
val (_, res) = f1.concat(s2).runWith(s1, subSink)
res.subscribe(subs)
val sub = subs.expectSubscription()
sub.request(9)
subs.expectNext("1-s")
subs.expectNext("2-s")
subs.expectNext("3-s")
subs.expectNext("4-s")
subs.expectNext("5-s")
subs.expectNext("6-s")
subs.expectComplete()
}
"be able to concat with empty source" in {
val probe = Source.single(1).concat(Source.empty)
.runWith(TestSink.probe[Int])
probe.request(1)
probe.expectNext(1)
probe.expectComplete()
}
"be able to concat empty source" in {
val probe = Source.empty.concat(Source.single(1))
.runWith(TestSink.probe[Int])
probe.request(1)
probe.expectNext(1)
probe.expectComplete()
}
"be able to concat two empty sources" in {
val probe = Source.empty.concat(Source.empty)
.runWith(TestSink.probe[Int])
probe.expectSubscription()
probe.expectComplete()
}
"be able to concat source with error" in {
val probe = Source.single(1).concat(Source.failed(TestException))
.runWith(TestSink.probe[Int])
probe.expectSubscription()
probe.expectError(TestException)
}
"subscribe at once to initial source and to one that it's concat to" in {
val publisher1 = TestPublisher.probe[Int]()
val publisher2 = TestPublisher.probe[Int]()
val probeSink = Source.apply(publisher1).concat(Source.apply(publisher2))
.runWith(TestSink.probe[Int])
val sub1 = publisher1.expectSubscription()
val sub2 = publisher2.expectSubscription()
val subSink = probeSink.expectSubscription()
sub1.sendNext(1)
subSink.request(1)
probeSink.expectNext(1)
sub1.sendComplete()
sub2.sendNext(2)
subSink.request(1)
probeSink.expectNext(2)
sub2.sendComplete()
probeSink.expectComplete()
}
"be possible to convert to a processor, and should be able to take a Processor" in { "be possible to convert to a processor, and should be able to take a Processor" in {
val identity1 = Flow[Int].toProcessor val identity1 = Flow[Int].toProcessor
val identity2 = Flow(() identity1.run()) val identity2 = Flow(() identity1.run())

View file

@ -0,0 +1,72 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber }
import org.reactivestreams.Publisher
class FlowZipSpec extends BaseTwoStreamsSetup {
override type Outputs = (Int, Int)
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]()
Source(p1).zip(Source(p2)).runWith(Sink(subscriber))
subscriber
}
"A Zip for Flow" must {
"work in the happy case" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[(Int, String)]()
Source(1 to 4).zip(Source(List("A", "B", "C", "D", "E", "F"))).runWith(Sink(probe))
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext((1, "A"))
probe.expectNext((2, "B"))
subscription.request(1)
probe.expectNext((3, "C"))
subscription.request(1)
probe.expectNext((4, "D"))
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one delayed completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one immediately failed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
}
}

View file

@ -0,0 +1,92 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber }
import org.reactivestreams.Publisher
import scala.concurrent.duration._
class FlowZipWithSpec extends BaseTwoStreamsSetup {
override type Outputs = Int
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]()
Source(p1).zipWith(Source(p2))(_ + _).runWith(Sink(subscriber))
subscriber
}
"A ZipWith for Flow " must {
"work in the happy case" in {
val probe = TestSubscriber.manualProbe[Outputs]()
Source(1 to 4).zipWith(Source(10 to 40 by 10))((_: Int) + (_: Int)).runWith(Sink(probe))
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext(11)
probe.expectNext(22)
subscription.request(1)
probe.expectNext(33)
subscription.request(1)
probe.expectNext(44)
probe.expectComplete()
}
"work in the sad case" in {
val probe = TestSubscriber.manualProbe[Outputs]()
Source(1 to 4).zipWith(Source(-2 to 2))((_: Int) / (_: Int)).runWith(Sink(probe))
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext(1 / -2)
probe.expectNext(2 / -1)
subscription.request(2)
probe.expectError() match {
case a: java.lang.ArithmeticException a.getMessage should be("/ by zero")
}
probe.expectNoMsg(200.millis)
}
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
}
}

View file

@ -155,48 +155,5 @@ class GraphConcatSpec extends TwoStreamsSetup {
promise.failure(TestException) promise.failure(TestException)
subscriber.expectError(TestException) subscriber.expectError(TestException)
} }
"work with Source DSL" in {
val testSource = Source(1 to 5).concat(Source(6 to 10)).grouped(1000)
Await.result(testSource.runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = testSource.toMat(Sink.ignore)(Keep.left)
val (m1, m2) = runnable.run()
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
runnable.mapMaterializedValue((_) "boo").run() should be("boo")
}
"work with Flow DSL" in {
val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val runnable = Source(1 to 5).viaMat(testFlow)(Keep.both).to(Sink.ignore)
val (m1, (m2, m3)) = runnable.run()
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
m3.isInstanceOf[Unit] should be(true)
runnable.mapMaterializedValue((_) "boo").run() should be("boo")
}
"work with Flow DSL2" in {
val testFlow = Flow[Int].concat(Source(6 to 10)).grouped(1000)
Await.result(Source(1 to 5).viaMat(testFlow)(Keep.both).runWith(Sink.head), 3.seconds) should ===(1 to 10)
val sink = testFlow.concat(Source(1 to 5)).toMat(Sink.ignore)(Keep.left).mapMaterializedValue[String] {
case ((m1, m2), m3)
m1.isInstanceOf[Unit] should be(true)
m2.isInstanceOf[Unit] should be(true)
m3.isInstanceOf[Unit] should be(true)
"boo"
}
Source(10 to 15).runWith(sink) should be("boo")
}
} }
} }

View file

@ -4,17 +4,16 @@
package akka.stream.javadsl package akka.stream.javadsl
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream._ import akka.japi.{ Pair, function }
import akka.japi.Pair import akka.stream.impl.StreamLayout
import akka.japi.function import akka.stream.{ scaladsl, _ }
import akka.stream.scaladsl import akka.stream.stage.Stage
import org.reactivestreams.Processor import org.reactivestreams.Processor
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.stream.stage.Stage
import akka.stream.impl.StreamLayout
object Flow { object Flow {
@ -782,25 +781,81 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/** /**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated, * Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced. Note that the Source is materialized * the Sources elements will be produced.
* together with this Flow and just kept from producing elements by asserting
* back-pressure until its time comes.
* *
* The resulting Flows materialized value is a Pair containing both materialized * Note that the Source is materialized together with this Flow and just kept
* values (of this Flow and that Source). * from producing elements by asserting back-pressure until its time comes.
*
* If this [[Flow]] gets upstream error - no elements from the source will be pulled.
*/ */
def concat[M](source: Graph[SourceShape[Out @uncheckedVariance], M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = def concat[T >: Out, M](source: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.concat(source).mapMaterializedValue(p Pair(p._1, p._2))) new Flow(delegate.concat(source))
/** /**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated, * Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced. Note that the Source is materialized * the Sources elements will be produced.
* together with this Flow and just kept from producing elements by asserting *
* back-pressure until its time comes. * Note that the Source is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If this [[Flow]] gets upstream error - no elements from the source will be pulled.
*/ */
def concatMat[M, M2](source: Graph[SourceShape[Out @uncheckedVariance], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = def concatMat[T >: Out, M, M2](source: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.concatMat(source)(combinerToScala(combine))) new Flow(delegate.concatMat(source)(combinerToScala(matF)))
/**
* Merge current [[Flow]] with the given [[Source]], taking elements as they arrive,
* picking randomly when several elements ready.
*/
def merge[T >: Out](source: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.merge(source))
/**
* Merge current [[Flow]] with the given [[Source]], taking elements as they arrive,
* picking randomly when several elements readt.
*/
def mergeMat[T >: Out, M, M2](source: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.mergeMat(source)(combinerToScala(matF)))
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
*/
def zip[T](source: Graph[SourceShape[T], _]): javadsl.Flow[In, Out @uncheckedVariance Pair T, Mat] =
zipMat(source, Keep.left)
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
*/
def zipMat[T, M, M2](source: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = {
//we need this only to have Flow of javadsl.Pair
def block(builder: FlowGraph.Builder[M],
source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = {
val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T])
builder.from(source).to(zip.in1)
new Pair(zip.in0, zip.out)
}
this.viaMat(Flow.factory.create(source, combinerToJava(block)), matF)
}
/**
* Put together elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
*/
def zipWith[Out2, Out3](source: Graph[SourceShape[Out2], _],
combine: function.Function2[Out, Out2, Out3]): javadsl.Flow[In, Out3, Mat] =
new Flow(delegate.zipWith[Out2, Out3](source)(combinerToScala(combine)))
/**
* Put together elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
*/
def zipWithMat[Out2, Out3, M, M2](source: Graph[SourceShape[Out2], M],
combine: function.Function2[Out, Out2, Out3],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out3, M2] =
new Flow(delegate.zipWithMat[Out2, Out3, M, M2](source)(combinerToScala(combine))(combinerToScala(matF)))
override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr)) new Flow(delegate.withAttributes(attr))

View file

@ -5,7 +5,7 @@ package akka.stream.javadsl
import akka.actor.{ ActorRef, Cancellable, Props } import akka.actor.{ ActorRef, Cancellable, Props }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.japi.{ Util, function } import akka.japi.{ Pair, Util, function }
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout import akka.stream.impl.StreamLayout
import akka.stream.stage.Stage import akka.stream.stage.Stage
@ -194,22 +194,6 @@ object Source {
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy)) new Source(scaladsl.Source.actorRef(bufferSize, overflowStrategy))
/**
* Concatenates two sources so that the first element
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T, M1, M2](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2]): Source[T, (M1, M2)] =
new Source(scaladsl.Source.concat(first, second))
/**
* Concatenates two sources so that the first element
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concatMat[T, M1, M2, M3](first: Graph[SourceShape[T], M1], second: Graph[SourceShape[T], M2], combine: function.Function2[M1, M2, M3]): Source[T, M3] =
new Source(scaladsl.Source.concatMat(first, second)(combinerToScala(combine)))
/** /**
* A graph with the shape of a source logically is a source, this method makes * A graph with the shape of a source logically is a source, this method makes
* it so also in type. * it so also in type.
@ -320,20 +304,74 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
runWith(Sink.fold(zero, f), materializer) runWith(Sink.fold(zero, f), materializer)
/** /**
* Concatenates a second source so that the first element * Concatenate the second [[Source]] to current one, meaning that once current
* emitted by that source is emitted after the last element of this * is exhausted and all result elements have been generated,
* source. * the second Sources elements will be produced.
*/ */
def concat[Out2 >: Out, M2](second: Graph[SourceShape[Out2], M2]): javadsl.Source[Out2, (Mat, M2)] = def concat[T >: Out, M](second: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] =
Source.concat(this, second) new Source(delegate.concat(second))
/** /**
* Concatenates a second source so that the first element * Concatenate the second [[Source]] to current one, meaning that once current
* emitted by that source is emitted after the last element of this * is exhausted and all result elements have been generated,
* source. * the second Sources elements will be produced.
*/ */
def concatMat[M, M2](second: Graph[SourceShape[Out @uncheckedVariance], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = def concatMat[T >: Out, M, M2](second: Graph[SourceShape[T], M],
new Source(delegate.concatMat(second)(combinerToScala(combine))) matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.concatMat(second)(combinerToScala(matF)))
/**
* Merge current source with the second one, taking elements as they arrive,
* picking randomly when several elements ready.
*/
def merge[T >: Out](second: Graph[SourceShape[T], _]): javadsl.Source[T, Mat] =
new Source(delegate.merge(second))
/**
* Merge current source with the second one, taking elements as they arrive,
* picking randomly when several elements ready.
*/
def mergeMat[T >: Out, M, M2](second: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.mergeMat(second)(combinerToScala(matF)))
/**
* Combine the elements of current [[Source]] and the second one into a stream of tuples.
*/
def zip[T](second: Graph[SourceShape[T], _]): javadsl.Source[Out @uncheckedVariance Pair T, Mat] =
zipMat(second, combinerToJava((a: Mat, b: Any) a))
/**
* Combine the elements of current [[Source]] and the second one into a stream of tuples.
*/
def zipMat[T, M, M2](second: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = {
//we need this only to have Flow of javadsl.Pair
def block(builder: FlowGraph.Builder[M],
source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = {
val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T])
builder.from(source).to(zip.in1)
new Pair(zip.in0, zip.out)
}
this.viaMat(Flow.factory.create(second, combinerToJava(block)), matF)
}
/**
* Put together elements of current [[Source]] and the second one
* into a stream of combined elements using a combiner function.
*/
def zipWith[Out2, Out3](second: Graph[SourceShape[Out2], _],
combine: function.Function2[Out, Out2, Out3]): javadsl.Source[Out3, Mat] =
new Source(delegate.zipWith[Out2, Out3](second)(combinerToScala(combine)))
/**
* Put together elements of current [[Source]] and the second one
* into a stream of combined elements using a combiner function.
*/
def zipWithMat[Out2, Out3, M, M2](second: Graph[SourceShape[Out2], M],
combine: function.Function2[Out, Out2, Out3],
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out3, M2] =
new Source(delegate.zipWithMat[Out2, Out3, M, M2](second)(combinerToScala(combine))(combinerToScala(matF)))
/** /**
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked

View file

@ -20,4 +20,7 @@ package object javadsl {
case other other.apply _ case other other.apply _
} }
def combinerToJava[M1, M2, M](f: (M1, M2) M): akka.japi.function.Function2[M1, M2, M] =
new akka.japi.function.Function2[M1, M2, M] { def apply(m1: M1, m2: M2): M = f.apply(m1, m2) }
} }

View file

@ -9,10 +9,9 @@ import akka.stream._
import akka.stream.impl.SplitDecision._ import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.fusing.{ DropWithin, TakeWithin, GroupedWithin } import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin }
import akka.stream.impl.{ Stages, StreamLayout } import akka.stream.impl.{ Stages, StreamLayout }
import akka.stream.stage._ import akka.stream.stage._
import akka.util.Collections.EmptyImmutableSeq
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
@ -188,35 +187,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
.replaceShape(FlowShape(ins(1), outs.head))) .replaceShape(FlowShape(ins(1), outs.head)))
} }
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced. Note that the Source is materialized
* together with this Flow and just kept from producing elements by asserting
* back-pressure until its time comes.
*
* The resulting Flows materialized value is a Tuple2 containing both materialized
* values (of this Flow and that Source).
*/
def concat[Out2 >: Out, Mat2](source: Graph[SourceShape[Out2], Mat2]): Flow[In, Out2, (Mat, Mat2)] =
concatMat[Out2, Mat2, (Mat, Mat2)](source)(Keep.both)
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced. Note that the Source is materialized
* together with this Flow and just kept from producing elements by asserting
* back-pressure until its time comes.
*/
def concatMat[Out2 >: Out, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, Out2, Mat3] =
this.viaMat(Flow(source) { implicit builder
s
import FlowGraph.Implicits._
val concat = builder.add(Concat[Out2]())
s.outlet ~> concat.in(1)
(concat.in(0), concat.out)
})(combine)
/** INTERNAL API */ /** INTERNAL API */
override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = { override private[stream] def andThen[U](op: StageModule): Repr[U, Mat] = {
//No need to copy here, op is a fresh instance //No need to copy here, op is a fresh instance
@ -985,6 +955,127 @@ trait FlowOps[+Out, +Mat] {
def log(name: String, extract: Out Any = _identity)(implicit log: LoggingAdapter = null): Repr[Out, Mat] = def log(name: String, extract: Out Any = _identity)(implicit log: LoggingAdapter = null): Repr[Out, Mat] =
andThen(Stages.Log(name, extract.asInstanceOf[Any Any], Option(log))) andThen(Stages.Log(name, extract.asInstanceOf[Any Any], Option(log)))
/**
* Combine the elements of current flow and given [[Source]] into a stream of tuples.
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zip[U](source: Graph[SourceShape[U], _]): Repr[(Out, U), Mat] = zipMat(source)(Keep.left)
/**
* Combine the elements of current flow and given [[Source]] into a stream of tuples.
*/
def zipMat[U, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[(Out, U), Mat3] =
this.viaMat(Flow(source) { implicit b
r
import FlowGraph.Implicits._
val zip = b.add(Zip[Out, U]())
r ~> zip.in1
(zip.in0, zip.out)
})(matF)
/**
* Put together the elements of current flow and given [[Source]]
* into a stream of combined elements using a combiner function.
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipWith[Out2, Out3](source: Graph[SourceShape[Out2], _])(combine: (Out, Out2) Out3): Repr[Out3, Mat] =
zipWithMat(source)(combine)(Keep.left)
/**
* Put together the elements of current flow and given [[Source]]
* into a stream of combined elements using a combiner function.
*/
def zipWithMat[Out2, Out3, Mat2, Mat3](source: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): Repr[Out3, Mat3] =
this.viaMat(Flow(source) { implicit b
r
import FlowGraph.Implicits._
val zip = b.add(ZipWith[Out, Out2, Out3](combine))
r ~> zip.in1
(zip.in0, zip.out)
})(matF)
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
def merge[U >: Out](source: Graph[SourceShape[U], _]): Repr[U, Mat] =
mergeMat(source)(Keep.left)
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*/
def mergeMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[U, Mat3] =
this.viaMat(Flow(source) { implicit b
r
import FlowGraph.Implicits._
val merge = b.add(Merge[U](2))
r ~> merge.in(1)
(merge.in(0), merge.out)
})(matF)
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the Source is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If this [[Flow]] gets upstream error - no elements from the source will be pulled.
*
* '''Emits when''' element is available from current stream or from second stream when current is completed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' second stream completes
*
* '''Cancels when''' downstream cancels
*/
def concat[U >: Out, Mat2](source: Graph[SourceShape[U], Mat2]): Repr[U, Mat] =
concatMat(source)(Keep.left)
/**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
* Flows input is exhausted and all result elements have been generated,
* the Sources elements will be produced.
*
* Note that the Source is materialized together with this Flow and just kept
* from producing elements by asserting back-pressure until its time comes.
*
* If this [[Flow]] gets upstream error - no elements from the source will be pulled.
*/
def concatMat[U >: Out, Mat2, Mat3](source: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): Repr[U, Mat3] =
this.viaMat(Flow(source) { implicit b
r
import FlowGraph.Implicits._
val merge = b.add(Concat[U]())
r ~> merge.in(1)
(merge.in(0), merge.out)
})(matF)
def withAttributes(attr: Attributes): Repr[Out, Mat] def withAttributes(attr: Attributes): Repr[Out, Mat]
/** INTERNAL API */ /** INTERNAL API */

View file

@ -104,21 +104,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
*/ */
def runForeach(f: Out Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f)) def runForeach(f: Out Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f))
/**
* Concatenates a second source so that the first element
* emitted by that source is emitted after the last element of this
* source.
*/
def concat[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concatMat(second)(Keep.both)
/**
* Concatenates a second source so that the first element
* emitted by that source is emitted after the last element of this
* source.
*/
def concatMat[Out2 >: Out, Mat2, Mat3](second: Graph[SourceShape[Out2], Mat2])(
combine: (Mat, Mat2) Mat3): Source[Out2, Mat3] = Source.concatMat(this, second)(combine)
/** /**
* Concatenates a second source so that the first element * Concatenates a second source so that the first element
* emitted by that source is emitted after the last element of this * emitted by that source is emitted after the last element of this
@ -126,7 +111,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
* *
* This is a shorthand for [[concat]] * This is a shorthand for [[concat]]
*/ */
def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concat(second) def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, Mat] = concat(second)
/** /**
* Nests the current Source and returns a Source with the given Attributes * Nests the current Source and returns a Source with the given Attributes
@ -301,30 +286,6 @@ object Source extends SourceApply {
DefaultAttributes.failedSource, DefaultAttributes.failedSource,
shape("FailedSource"))) shape("FailedSource")))
/**
* Concatenates two sources so that the first element
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T, Mat1, Mat2](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2]): Source[T, (Mat1, Mat2)] =
concatMat(source1, source2)(Keep.both).withAttributes(DefaultAttributes.concatSource)
/**
* Concatenates two sources so that the first element
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concatMat[T, Mat1, Mat2, Mat3](source1: Graph[SourceShape[T], Mat1], source2: Graph[SourceShape[T], Mat2])(
combine: (Mat1, Mat2) Mat3): Source[T, Mat3] =
wrap(FlowGraph.partial(source1, source2)(combine) { implicit b
(s1, s2)
import FlowGraph.Implicits._
val c = b.add(Concat[T]())
s1.outlet ~> c.in(0)
s2.outlet ~> c.in(1)
SourceShape(c.out)
}).withAttributes(DefaultAttributes.concatMatSource)
/** /**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/ */