From a50df1c5753a3afcdffb35dcbbb7a1412b507467 Mon Sep 17 00:00:00 2001 From: Richard Imaoka Date: Thu, 2 Nov 2017 10:34:40 +0900 Subject: [PATCH] Add combinedMat method to Source (#23809) * Add combinedMat method to Source * Fix formatting --- .../java/akka/stream/javadsl/SourceTest.java | 24 +++++++++++ .../akka/stream/scaladsl/SourceSpec.scala | 41 +++++++++++++++++++ .../scala/akka/stream/javadsl/Source.scala | 9 ++++ .../scala/akka/stream/scaladsl/Source.scala | 13 ++++++ 4 files changed, 87 insertions(+) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 511d6342ec..b896d22f05 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -11,6 +11,8 @@ import akka.japi.Pair; import akka.japi.function.*; import akka.japi.pf.PFBuilder; import akka.stream.*; +import akka.stream.testkit.TestSubscriber; +import akka.stream.testkit.javadsl.TestSink; import akka.util.ConstantFun; import akka.stream.stage.*; import akka.testkit.AkkaSpec; @@ -655,6 +657,28 @@ public class SourceTest extends StreamTest { future.toCompletableFuture().get(3, TimeUnit.SECONDS); } + @Test + public void mustBeAbleToCombineMat() throws Exception { + final TestKit probe = new TestKit(system); + final Source> source1 = Source.queue(1, OverflowStrategy.dropNew()); + final Source source2 = Source.from(Arrays.asList(2, 3)); + + // compiler to check the correct materialized value of type = SourceQueueWithComplete available + final Source> combined = Source.combineMat( + source1, source2, width -> Concat. create(width), Keep.left()); //Keep.left() (i.e. preserve queueSource's materialized value) + + SourceQueueWithComplete queue = combined + .toMat(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), Keep.left()) + .run(materializer); + + queue.offer(0); + queue.offer(1); + queue.complete(); //complete queueSource so that combined with `Concat` pulls elements from queueSource + + // elements from source1 (i.e. first of combined source) come first, then source2 elements, due to `Concat` + probe.expectMsgAllOf(0, 1, 2, 3); + } + @Test public void mustBeAbleToZipN() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index bcdd23da51..185a7bbb32 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -18,6 +18,8 @@ import scala.collection.immutable import java.util import java.util.stream.BaseStream +import akka.stream.testkit.scaladsl.TestSink + class SourceSpec extends StreamSpec with DefaultTimeout { implicit val materializer = ActorMaterializer() @@ -141,6 +143,45 @@ class SourceSpec extends StreamSpec with DefaultTimeout { out.expectComplete() } + "combine from two inputs with combinedMat and take a materialized value" in { + val queueSource = Source.queue[Int](1, OverflowStrategy.dropBuffer) + val intSeqSource = Source(1 to 3) + + // compiler to check the correct materialized value of type = SourceQueueWithComplete[Int] available + val combined1: Source[Int, SourceQueueWithComplete[Int]] = + Source.combineMat(queueSource, intSeqSource)(Concat(_))(Keep.left) //Keep.left (i.e. preserve queueSource's materialized value) + + val (queue1, sinkProbe1) = combined1.toMat(TestSink.probe[Int])(Keep.both).run() + sinkProbe1.request(6) + queue1.offer(10) + queue1.offer(20) + queue1.offer(30) + queue1.complete() //complete queueSource so that combined1 with `Concat` then pulls elements from intSeqSource + sinkProbe1.expectNext(10) + sinkProbe1.expectNext(20) + sinkProbe1.expectNext(30) + sinkProbe1.expectNext(1) + sinkProbe1.expectNext(2) + sinkProbe1.expectNext(3) + + // compiler to check the correct materialized value of type = SourceQueueWithComplete[Int] available + val combined2: Source[Int, SourceQueueWithComplete[Int]] = + //queueSource to be the second of combined source + Source.combineMat(intSeqSource, queueSource)(Concat(_))(Keep.right) //Keep.right (i.e. preserve queueSource's materialized value) + + val (queue2, sinkProbe2) = combined2.toMat(TestSink.probe[Int])(Keep.both).run() + sinkProbe2.request(6) + queue2.offer(10) + queue2.offer(20) + queue2.offer(30) + queue2.complete() //complete queueSource so that combined1 with `Concat` then pulls elements from queueSource + sinkProbe2.expectNext(1) //as intSeqSource iss the first in combined source, elements from intSeqSource come first + sinkProbe2.expectNext(2) + sinkProbe2.expectNext(3) + sinkProbe2.expectNext(10) //after intSeqSource run out elements, queueSource elements come + sinkProbe2.expectNext(20) + sinkProbe2.expectNext(30) + } } "Repeat Source" must { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index dbaef24b21..68a933d00f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -317,6 +317,15 @@ object Source { new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num ⇒ strategy.apply(num))) } + /** + * Combines two sources with fan-in strategy like `Merge` or `Concat` and returns `Source` with a materialized value. + */ + def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2], + strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]], + combine: function.Function2[M1, M2, M]): Source[U, M] = { + new Source(scaladsl.Source.combineMat(first.asScala, second.asScala)(num ⇒ strategy.apply(num))(combinerToScala(combine))) + } + /** * Combine the elements of multiple streams into a stream of lists. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 3f0c0ea88e..d867e86891 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -448,6 +448,19 @@ object Source { combineRest(2, rest.iterator) }) + /** + * Combines two sources with fan-in strategy like `Merge` or `Concat` and returns `Source` with a materialized value. + */ + def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) ⇒ M): Source[U, M] = { + val secondPartiallyCombined = GraphDSL.create(second) { implicit b ⇒ secondShape ⇒ + import GraphDSL.Implicits._ + val c = b.add(strategy(2)) + secondShape ~> c.in(1) + FlowShape(c.in(0), c.out) + } + first.viaMat(secondPartiallyCombined)(matF) + } + /** * Combine the elements of multiple streams into a stream of sequences. */