Add combinedMat method to Source (#23809)

* Add combinedMat method to Source

* Fix formatting
This commit is contained in:
Richard Imaoka 2017-11-02 10:34:40 +09:00 committed by Konrad `ktoso` Malawski
parent 0988933fac
commit a50df1c575
4 changed files with 87 additions and 0 deletions

View file

@ -11,6 +11,8 @@ import akka.japi.Pair;
import akka.japi.function.*;
import akka.japi.pf.PFBuilder;
import akka.stream.*;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.util.ConstantFun;
import akka.stream.stage.*;
import akka.testkit.AkkaSpec;
@ -655,6 +657,28 @@ public class SourceTest extends StreamTest {
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToCombineMat() throws Exception {
final TestKit probe = new TestKit(system);
final Source<Integer, SourceQueueWithComplete<Integer>> source1 = Source.queue(1, OverflowStrategy.dropNew());
final Source<Integer, NotUsed> source2 = Source.from(Arrays.asList(2, 3));
// compiler to check the correct materialized value of type = SourceQueueWithComplete<Integer> available
final Source<Integer, SourceQueueWithComplete<Integer>> combined = Source.combineMat(
source1, source2, width -> Concat.<Integer> create(width), Keep.left()); //Keep.left() (i.e. preserve queueSource's materialized value)
SourceQueueWithComplete<Integer> queue = combined
.toMat(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), Keep.left())
.run(materializer);
queue.offer(0);
queue.offer(1);
queue.complete(); //complete queueSource so that combined with `Concat` pulls elements from queueSource
// elements from source1 (i.e. first of combined source) come first, then source2 elements, due to `Concat`
probe.expectMsgAllOf(0, 1, 2, 3);
}
@Test
public void mustBeAbleToZipN() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -18,6 +18,8 @@ import scala.collection.immutable
import java.util
import java.util.stream.BaseStream
import akka.stream.testkit.scaladsl.TestSink
class SourceSpec extends StreamSpec with DefaultTimeout {
implicit val materializer = ActorMaterializer()
@ -141,6 +143,45 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
out.expectComplete()
}
"combine from two inputs with combinedMat and take a materialized value" in {
val queueSource = Source.queue[Int](1, OverflowStrategy.dropBuffer)
val intSeqSource = Source(1 to 3)
// compiler to check the correct materialized value of type = SourceQueueWithComplete[Int] available
val combined1: Source[Int, SourceQueueWithComplete[Int]] =
Source.combineMat(queueSource, intSeqSource)(Concat(_))(Keep.left) //Keep.left (i.e. preserve queueSource's materialized value)
val (queue1, sinkProbe1) = combined1.toMat(TestSink.probe[Int])(Keep.both).run()
sinkProbe1.request(6)
queue1.offer(10)
queue1.offer(20)
queue1.offer(30)
queue1.complete() //complete queueSource so that combined1 with `Concat` then pulls elements from intSeqSource
sinkProbe1.expectNext(10)
sinkProbe1.expectNext(20)
sinkProbe1.expectNext(30)
sinkProbe1.expectNext(1)
sinkProbe1.expectNext(2)
sinkProbe1.expectNext(3)
// compiler to check the correct materialized value of type = SourceQueueWithComplete[Int] available
val combined2: Source[Int, SourceQueueWithComplete[Int]] =
//queueSource to be the second of combined source
Source.combineMat(intSeqSource, queueSource)(Concat(_))(Keep.right) //Keep.right (i.e. preserve queueSource's materialized value)
val (queue2, sinkProbe2) = combined2.toMat(TestSink.probe[Int])(Keep.both).run()
sinkProbe2.request(6)
queue2.offer(10)
queue2.offer(20)
queue2.offer(30)
queue2.complete() //complete queueSource so that combined1 with `Concat` then pulls elements from queueSource
sinkProbe2.expectNext(1) //as intSeqSource iss the first in combined source, elements from intSeqSource come first
sinkProbe2.expectNext(2)
sinkProbe2.expectNext(3)
sinkProbe2.expectNext(10) //after intSeqSource run out elements, queueSource elements come
sinkProbe2.expectNext(20)
sinkProbe2.expectNext(30)
}
}
"Repeat Source" must {

View file

@ -317,6 +317,15 @@ object Source {
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num strategy.apply(num)))
}
/**
* Combines two sources with fan-in strategy like `Merge` or `Concat` and returns `Source` with a materialized value.
*/
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2],
strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]],
combine: function.Function2[M1, M2, M]): Source[U, M] = {
new Source(scaladsl.Source.combineMat(first.asScala, second.asScala)(num strategy.apply(num))(combinerToScala(combine)))
}
/**
* Combine the elements of multiple streams into a stream of lists.
*/

View file

@ -448,6 +448,19 @@ object Source {
combineRest(2, rest.iterator)
})
/**
* Combines two sources with fan-in strategy like `Merge` or `Concat` and returns `Source` with a materialized value.
*/
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(strategy: Int Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) M): Source[U, M] = {
val secondPartiallyCombined = GraphDSL.create(second) { implicit b secondShape
import GraphDSL.Implicits._
val c = b.add(strategy(2))
secondShape ~> c.in(1)
FlowShape(c.in(0), c.out)
}
first.viaMat(secondPartiallyCombined)(matF)
}
/**
* Combine the elements of multiple streams into a stream of sequences.
*/