+str #19782 Transpose a sources sequence (#19795)

This commit is contained in:
Rodolphe BELOUIN 2016-04-22 12:04:28 +02:00 committed by Konrad Malawski
parent fd89f36940
commit 936c97051a
13 changed files with 731 additions and 23 deletions

View file

@ -197,6 +197,22 @@ fromPublisher
^^^^^^^^^^^^^
Integration with Reactive Streams, subscribes to a ``org.reactivestreams.Publisher``.
zipN
^^^^
Combine the elements of multiple streams into a stream of sequences.
**emits** when all of the inputs has an element available
**completes** when any upstream completes
zipWithN
^^^^^^^^
Combine the elements of multiple streams into a stream of sequences using a combiner function.
**emits** when all of the inputs has an element available
**completes** when any upstream completes

View file

@ -186,6 +186,22 @@ fromPublisher
^^^^^^^^^^^^^
Integration with Reactive Streams, subscribes to a ``org.reactivestreams.Publisher``.
zipN
^^^^
Combine the elements of multiple streams into a stream of sequences.
**emits** when all of the inputs has an element available
**completes** when any upstream completes
zipWithN
^^^^^^^^
Combine the elements of multiple streams into a stream of sequences using a combiner function.
**emits** when all of the inputs has an element available
**completes** when any upstream completes

View file

@ -86,7 +86,7 @@ public class GraphDSLTest extends StreamTest {
final Publisher<String> pub = source.runWith(publisher, materializer);
final CompletionStage<List<String>> all = Source.fromPublisher(pub).limit(100).runWith(Sink.<String>seq(), materializer);
final List<String> result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
final List<String> result = all.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));
}
@ -189,7 +189,7 @@ public class GraphDSLTest extends StreamTest {
}
)).run(materializer);
Duration d = Duration.create(300, TimeUnit.MILLISECONDS);
Duration d = Duration.create(3, TimeUnit.SECONDS);
Object output1 = probe1.receiveOne(d);
Object output2 = probe2.receiveOne(d);
@ -235,7 +235,7 @@ public class GraphDSLTest extends StreamTest {
}
})).run(materializer);
Duration d = Duration.create(300, TimeUnit.MILLISECONDS);
Duration d = Duration.create(3, TimeUnit.SECONDS);
Object output1 = probe1.receiveOne(d);
Object output2 = probe2.receiveOne(d);
@ -269,7 +269,59 @@ public class GraphDSLTest extends StreamTest {
return ClosedShape.getInstance();
})).run(materializer);
final Integer result = future.toCompletableFuture().get(300, TimeUnit.MILLISECONDS);
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(11, (int) result);
}
@Test
public void mustBeAbleToUseZipN() throws Exception {
final Source<Integer, NotUsed> in1 = Source.single(1);
final Source<Integer, NotUsed> in2 = Source.single(10);
final Graph<UniformFanInShape<Integer, List<Integer>>, NotUsed> sumZip = ZipN.create(2);
final CompletionStage<List<Integer>> future = RunnableGraph.fromGraph(GraphDSL.create(Sink.<List<Integer>>head(),
(b, out) -> {
final UniformFanInShape<Integer, List<Integer>> zip = b.add(sumZip);
b.from(b.add(in1)).toInlet(zip.in(0));
b.from(b.add(in2)).toInlet(zip.in(1));
b.from(zip.out()).to(out);
return ClosedShape.getInstance();
})).run(materializer);
final List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 10), result);
}
@Test
public void mustBeAbleToUseZipWithN() throws Exception {
final Source<Integer, NotUsed> in1 = Source.single(1);
final Source<Integer, NotUsed> in2 = Source.single(10);
final Graph<UniformFanInShape<Integer, Integer>, NotUsed> sumZip = ZipWithN.create(
new Function<List<Integer>, Integer>() {
@Override public Integer apply(List<Integer> list) throws Exception {
Integer sum = 0;
for(Integer i : list) {
sum += i;
}
return sum;
}
}, 2);
final CompletionStage<Integer> future = RunnableGraph.fromGraph(GraphDSL.create(Sink.<Integer>head(),
(b, out) -> {
final UniformFanInShape<Integer, Integer> zip = b.add(sumZip);
b.from(b.add(in1)).toInlet(zip.in(0));
b.from(b.add(in2)).toInlet(zip.in(1));
b.from(zip.out()).to(out);
return ClosedShape.getInstance();
})).run(materializer);
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(11, (int) result);
}
@ -298,7 +350,7 @@ public class GraphDSLTest extends StreamTest {
return ClosedShape.getInstance();
})).run(materializer);
final Integer result = future.toCompletableFuture().get(300, TimeUnit.MILLISECONDS);
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(1111, (int) result);
}
@ -315,7 +367,7 @@ public class GraphDSLTest extends StreamTest {
return ClosedShape.getInstance();
})).run(materializer);
final Integer result = future.toCompletableFuture().get(300, TimeUnit.MILLISECONDS);
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(1, (int) result);
probe.expectMsg(1);

View file

@ -554,7 +554,7 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
probe.expectMsgEquals("]");
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -574,7 +574,7 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals("2");
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -591,7 +591,7 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals(2);
probe.expectMsgEquals(3);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -612,7 +612,7 @@ public class SourceTest extends StreamTest {
FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS);
probe.expectNoMsg(duration);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -637,7 +637,7 @@ public class SourceTest extends StreamTest {
s.sendNext(1);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -654,7 +654,41 @@ public class SourceTest extends StreamTest {
probe.expectMsgAllOf(0, 1, 2, 3);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToZipN() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<Integer, NotUsed> source1 = Source.from(Arrays.asList(0, 1));
final Source<Integer, NotUsed> source2 = Source.from(Arrays.asList(2, 3));
final List<Source<Integer, ?>> sources = Arrays.asList(source1, source2);
final Source<List<Integer>, ?> source = Source.zipN(sources);
final CompletionStage<Done> future = source.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
probe.expectMsgAllOf(Arrays.asList(0, 2), Arrays.asList(1, 3));
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToZipWithN() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<Integer, NotUsed> source1 = Source.from(Arrays.asList(0, 1));
final Source<Integer, NotUsed> source2 = Source.from(Arrays.asList(2, 3));
final List<Source<Integer, ?>> sources = Arrays.asList(source1, source2);
final Source<Boolean, ?> source = Source.zipWithN(list -> new Boolean(list.contains(0)), sources);
final CompletionStage<Done> future = source.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
probe.expectMsgAllOf(Boolean.TRUE, Boolean.FALSE);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test

View file

@ -30,6 +30,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
(classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) ::
(classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) ::
(classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) ::
(classOf[scala.collection.immutable.Seq[_]], classOf[java.util.List[_]]) ::
(classOf[scala.collection.immutable.Set[_]], classOf[java.util.Set[_]]) ::
(classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) ::
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::

View file

@ -0,0 +1,227 @@
/**
* Copyright (C) 2014-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.collection.immutable
class GraphZipNSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._
override type Outputs = immutable.Seq[Int]
override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) {
val zipN = b.add(ZipN[Int](2))
override def left: Inlet[Int] = zipN.in(0)
override def right: Inlet[Int] = zipN.in(1)
override def out: Outlet[immutable.Seq[Int]] = zipN.out
}
"ZipN" must {
"work in the happy case" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[immutable.Seq[Int]]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val zipN = b.add(ZipN[Int](2))
Source(1 to 4) ~> zipN.in(0)
Source(2 to 5) ~> zipN.in(1)
zipN.out ~> Sink.fromSubscriber(probe)
ClosedShape
}).run()
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext(immutable.Seq(1, 2))
probe.expectNext(immutable.Seq(2, 3))
subscription.request(1)
probe.expectNext(immutable.Seq(3, 4))
subscription.request(1)
probe.expectNext(immutable.Seq(4, 5))
probe.expectComplete()
}
"complete if one side is available but other already completed" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[immutable.Seq[Int]]()
RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b
out
val zipN = b.add(ZipN[Int](2))
Source.fromPublisher(upstream1) ~> zipN.in(0)
Source.fromPublisher(upstream2) ~> zipN.in(1)
zipN.out ~> out
ClosedShape
}).run()
upstream1.sendNext(1)
upstream1.sendNext(2)
upstream2.sendNext(2)
upstream2.sendComplete()
downstream.requestNext(immutable.Seq(1, 2))
downstream.expectComplete()
upstream1.expectCancellation()
}
"complete even if no pending demand" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[immutable.Seq[Int]]()
RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b
out
val zipN = b.add(ZipN[Int](2))
Source.fromPublisher(upstream1) ~> zipN.in(0)
Source.fromPublisher(upstream2) ~> zipN.in(1)
zipN.out ~> out
ClosedShape
}).run()
downstream.request(1)
upstream1.sendNext(1)
upstream2.sendNext(2)
downstream.expectNext(immutable.Seq(1, 2))
upstream2.sendComplete()
downstream.expectComplete()
upstream1.expectCancellation()
}
"complete if both sides complete before requested with elements pending" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[immutable.Seq[Int]]()
RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b
out
val zipN = b.add(ZipN[Int](2))
Source.fromPublisher(upstream1) ~> zipN.in(0)
Source.fromPublisher(upstream2) ~> zipN.in(1)
zipN.out ~> out
ClosedShape
}).run()
upstream1.sendNext(1)
upstream2.sendNext(2)
upstream1.sendComplete()
upstream2.sendComplete()
downstream.requestNext(immutable.Seq(1, 2))
downstream.expectComplete()
}
"complete if one side complete before requested with elements pending" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[immutable.Seq[Int]]()
RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b
out
val zipN = b.add(ZipN[Int](2))
Source.fromPublisher(upstream1) ~> zipN.in(0)
Source.fromPublisher(upstream2) ~> zipN.in(1)
zipN.out ~> out
ClosedShape
}).run()
upstream1.sendNext(1)
upstream1.sendNext(2)
upstream2.sendNext(2)
upstream1.sendComplete()
upstream2.sendComplete()
downstream.requestNext(immutable.Seq(1, 2))
downstream.expectComplete()
}
"complete if one side complete before requested with elements pending 2" in {
val upstream1 = TestPublisher.probe[Int]()
val upstream2 = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[immutable.Seq[Int]]()
RunnableGraph.fromGraph(GraphDSL.create(Sink.fromSubscriber(downstream)) { implicit b
out
val zipN = b.add(ZipN[Int](2))
Source.fromPublisher(upstream1) ~> zipN.in(0)
Source.fromPublisher(upstream2) ~> zipN.in(1)
zipN.out ~> out
ClosedShape
}).run()
downstream.ensureSubscription()
upstream1.sendNext(1)
upstream1.sendComplete()
downstream.expectNoMsg(500.millis)
upstream2.sendNext(2)
upstream2.sendComplete()
downstream.requestNext(immutable.Seq(1, 2))
downstream.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one delayed completed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one immediately failed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in assertAllStagesStopped {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
val subscription2 = subscriber2.expectSubscriptionAndError(TestException)
}
}
}

View file

@ -0,0 +1,162 @@
package akka.stream.scaladsl
import akka.stream.testkit._
import scala.concurrent.duration._
import akka.stream._
import akka.testkit.EventFilter
import scala.collection.immutable
class GraphZipWithNSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._
override type Outputs = Int
override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) {
val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(2))
override def left: Inlet[Int] = zip.in(0)
override def right: Inlet[Int] = zip.in(1)
override def out: Outlet[Int] = zip.out
}
"ZipWithN" must {
"work in the happy case" in {
val probe = TestSubscriber.manualProbe[Outputs]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(2))
Source(1 to 4) ~> zip.in(0)
Source(10 to 40 by 10) ~> zip.in(1)
zip.out ~> Sink.fromSubscriber(probe)
ClosedShape
}).run()
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext(11)
probe.expectNext(22)
subscription.request(1)
probe.expectNext(33)
subscription.request(1)
probe.expectNext(44)
probe.expectComplete()
}
"work in the sad case" in {
val probe = TestSubscriber.manualProbe[Outputs]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val zip = b.add(ZipWithN((_: immutable.Seq[Int]).foldLeft(1)(_ / _))(2))
Source(1 to 4) ~> zip.in(0)
Source(-2 to 2) ~> zip.in(1)
zip.out ~> Sink.fromSubscriber(probe)
ClosedShape
}).run()
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext(1 / 1 / -2)
probe.expectNext(1 / 2 / -1)
EventFilter[ArithmeticException](occurrences = 1).intercept {
subscription.request(2)
}
probe.expectError() match {
case a: java.lang.ArithmeticException a.getMessage should be("/ by zero")
}
probe.expectNoMsg(200.millis)
}
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndComplete()
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectSubscriptionAndComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectSubscriptionAndError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectSubscriptionAndError(TestException)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
val subscription2 = subscriber2.expectSubscriptionAndError(TestException)
}
"work with 3 inputs" in {
val probe = TestSubscriber.manualProbe[Int]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(3))
Source.single(1) ~> zip.in(0)
Source.single(2) ~> zip.in(1)
Source.single(3) ~> zip.in(2)
zip.out ~> Sink.fromSubscriber(probe)
ClosedShape
}).run()
val subscription = probe.expectSubscription()
subscription.request(5)
probe.expectNext(6)
probe.expectComplete()
}
"work with 30 inputs" in {
val probe = TestSubscriber.manualProbe[Int]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val zip = b.add(ZipWithN((_: immutable.Seq[Int]).sum)(30))
(0 to 29).foreach {
n Source.single(n) ~> zip.in(n)
}
zip.out ~> Sink.fromSubscriber(probe)
ClosedShape
}).run()
val subscription = probe.expectSubscription()
subscription.request(1)
probe.expectNext((0 to 29).sum)
probe.expectComplete()
}
}
}

View file

@ -15,6 +15,7 @@ import akka.stream.testkit._
import akka.NotUsed
import akka.testkit.EventFilter
import akka.testkit.AkkaSpec
import scala.collection.immutable
class SourceSpec extends AkkaSpec with DefaultTimeout {
@ -93,7 +94,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
c.expectNoMsg(300.millis)
subs.cancel()
Await.result(f.future, 500.millis) shouldEqual None
Await.result(f.future, 3.seconds) shouldEqual None
}
"allow external triggering of empty completion" in Utils.assertAllStagesStopped {
@ -105,7 +106,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
// external cancellation
neverPromise.trySuccess(None) shouldEqual true
Await.result(counterFuture, 500.millis) shouldEqual 0
Await.result(counterFuture, 3.seconds) shouldEqual 0
}
"allow external triggering of non-empty completion" in Utils.assertAllStagesStopped {
@ -117,7 +118,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
// external cancellation
neverPromise.trySuccess(Some(6)) shouldEqual true
Await.result(counterFuture, 500.millis) shouldEqual 6
Await.result(counterFuture, 3.seconds) shouldEqual 6
}
"allow external triggering of onError" in Utils.assertAllStagesStopped {
@ -129,7 +130,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
// external cancellation
neverPromise.failure(new Exception("Boom") with NoStackTrace)
val ready = Await.ready(counterFuture, 500.millis)
val ready = Await.ready(counterFuture, 3.seconds)
val Failure(ex) = ready.value.get
ex.getMessage should include("Boom")
}
@ -138,11 +139,11 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
"Composite Source" must {
"merge from many inputs" in {
val probes = Seq.fill(5)(TestPublisher.manualProbe[Int]())
val probes = immutable.Seq.fill(5)(TestPublisher.manualProbe[Int]())
val source = Source.asSubscriber[Int]
val out = TestSubscriber.manualProbe[Int]
val s = Source.fromGraph(GraphDSL.create(source, source, source, source, source)(Seq(_, _, _, _, _)) { implicit b
val s = Source.fromGraph(GraphDSL.create(source, source, source, source, source)(immutable.Seq(_, _, _, _, _)) { implicit b
(i0, i1, i2, i3, i4)
import GraphDSL.Implicits._
val m = b.add(Merge[Int](5))
@ -171,7 +172,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
}
"combine from many inputs with simplified API" in {
val probes = Seq.fill(3)(TestPublisher.manualProbe[Int]())
val probes = immutable.Seq.fill(3)(TestPublisher.manualProbe[Int]())
val source = for (i 0 to 2) yield Source.fromPublisher(probes(i))
val out = TestSubscriber.manualProbe[Int]
@ -193,7 +194,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
}
"combine from two inputs with simplified API" in {
val probes = Seq.fill(2)(TestPublisher.manualProbe[Int]())
val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]())
val source = Source.fromPublisher(probes(0)) :: Source.fromPublisher(probes(1)) :: Nil
val out = TestSubscriber.manualProbe[Int]
@ -268,7 +269,36 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
Source.fromIterator(() Iterator.iterate(false)(!_))
.grouped(10)
.runWith(Sink.head)
.futureValue should ===(Seq(false, true, false, true, false, true, false, true, false, true))
.futureValue should ===(immutable.Seq(false, true, false, true, false, true, false, true, false, true))
}
}
"ZipN Source" must {
"properly zipN" in {
val sources = immutable.Seq(
Source(List(1, 2, 3)),
Source(List(10, 20, 30)),
Source(List(100, 200, 300)))
Source.zipN(sources)
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(
immutable.Seq(1, 10, 100),
immutable.Seq(2, 20, 200),
immutable.Seq(3, 30, 300)))
}
}
"ZipWithN Source" must {
"properly zipWithN" in {
val sources = immutable.Seq(
Source(List(1, 2, 3)),
Source(List(10, 20, 30)),
Source(List(100, 200, 300)))
Source.zipWithN[Int, Int](_.sum)(sources)
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(111, 222, 333))
}
}

View file

@ -69,6 +69,8 @@ private[stream] object Stages {
val broadcast = name("broadcast")
val balance = name("balance")
val zip = name("zip")
val zipN = name("zipN")
val zipWithN = name("zipWithN")
val unzip = name("unzip")
val concat = name("concat")
val repeat = name("repeat")

View file

@ -243,6 +243,45 @@ object Zip {
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
}
/**
* Combine the elements of multiple streams into a stream of lists.
*
* A `ZipN` has a `n` input ports and one `out` port
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
object ZipN {
def create[A](n: Int): Graph[UniformFanInShape[A, java.util.List[A]], NotUsed] = {
ZipWithN.create(ConstantFun.javaIdentityFunction[java.util.List[A]], n)
}
}
/**
* Combine the elements of multiple streams into a stream of lists using a combiner function.
*
* A `ZipWithN` has a `n` input ports and one `out` port
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
object ZipWithN {
def create[A, O](zipper: function.Function[java.util.List[A], O], n: Int): Graph[UniformFanInShape[A, O], NotUsed] = {
import scala.collection.JavaConverters._
scaladsl.ZipWithN[A, O](seq => zipper.apply(seq.asJava))(n)
}
}
/**
* Takes a stream of pair elements and splits each pair to two output streams.
*

View file

@ -272,11 +272,26 @@ object Source {
*/
def combine[T, U](first: Source[T, _ <: Any], second: Source[T, _ <: Any], rest: java.util.List[Source[T, _ <: Any]],
strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]]): Source[U, NotUsed] = {
import scala.collection.JavaConverters._
val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq()
val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num strategy.apply(num)))
}
/**
* Combine the elements of multiple streams into a stream of lists.
*/
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
}
/*
* Combine the elements of multiple streams into a stream of lists using a combiner function.
*/
def zipWithN[T, O](zipper: function.Function[java.util.List[T], O], sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,

View file

@ -732,6 +732,102 @@ final class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](ConstantFun.scalaIden
*/
object UnzipWith extends UnzipWithApply
object ZipN {
/**
* Create a new `ZipN`.
*/
def apply[A](n: Int) = new ZipN[A](n)
}
/**
* Combine the elements of multiple streams into a stream of sequences.
*
* A `ZipN` has a `n` input ports and one `out` port
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
final class ZipN[A](n: Int) extends ZipWithN[A, immutable.Seq[A]](ConstantFun.scalaIdentityFunction)(n) {
override def initialAttributes = DefaultAttributes.zipN
override def toString = "ZipN"
}
object ZipWithN {
/**
* Create a new `ZipWithN`.
*/
def apply[A, O](zipper: immutable.Seq[A] => O)(n: Int) = new ZipWithN[A, O](zipper)(n)
}
/**
* Combine the elements of multiple streams into a stream of sequences using a combiner function.
*
* A `ZipWithN` has a `n` input ports and one `out` port
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*/
class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] {
override def initialAttributes = DefaultAttributes.zipWithN
override val shape = new UniformFanInShape[A, O](n)
def out = shape.out
val inSeq = shape.inSeq
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var pending = 0
// Without this field the completion signalling would take one extra pull
var willShutDown = false
val grabInlet = grab[A] _
val pullInlet = pull[A] _
private def pushAll(): Unit = {
push(out, zipper(inSeq.map(grabInlet)))
if (willShutDown) completeStage()
else inSeq.foreach(pullInlet)
}
override def preStart(): Unit = {
inSeq.foreach(pullInlet)
}
inSeq.foreach(in => {
setHandler(in, new InHandler {
override def onPush(): Unit = {
pending -= 1
if (pending == 0) pushAll()
}
override def onUpstreamFinish(): Unit = {
if (!isAvailable(in)) completeStage()
willShutDown = true
}
})
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pending += n
if (pending == 0) pushAll()
}
})
}
override def toString = "ZipWithN"
}
object Concat {
/**
* Create a new `Concat`.

View file

@ -413,6 +413,24 @@ object Source {
combineRest(2, rest.iterator)
})
/**
* Combine the elements of multiple streams into a stream of sequences.
*/
def zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed] = zipWithN(ConstantFun.scalaIdentityFunction[immutable.Seq[T]])(sources).addAttributes(DefaultAttributes.zipN)
/*
* Combine the elements of multiple streams into a stream of sequences using a combiner function.
*/
def zipWithN[T, O](zipper: immutable.Seq[T] O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed] = {
val source = sources match {
case immutable.Seq() empty[O]
case immutable.Seq(source) source.map(t zipper(immutable.Seq(t))).mapMaterializedValue(_ NotUsed)
case s1 +: s2 +: ss combine(s1, s2, ss: _*)(ZipWithN(zipper))
}
source.addAttributes(DefaultAttributes.zipWithN)
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,