diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipAll.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipAll.md new file mode 100644 index 0000000000..239c8061d6 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipAll.md @@ -0,0 +1,35 @@ +# zipAll + +Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #zipAll } + +@@@ + +## Description + +Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. + + +@@@div { .callout } + +**emits** at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + +**backpressures** when downstream backpressures + +**completes** when all upstream completes + +@@@ + +## Example +Scala +: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 95016d0cbc..252cf0d601 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -249,6 +249,7 @@ the inputs in different ways. |Source/Flow|@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.| |Source/Flow|@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.| |Source/Flow|@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.| +|Source/Flow|@ref[zipAll](Source-or-Flow/zipAll.md)|Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.| |Source/Flow|@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.| |Source/Flow|@ref[zipLatestWith](Source-or-Flow/zipLatestWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each.| |Source/Flow|@ref[zipWith](Source-or-Flow/zipWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream.| @@ -346,6 +347,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [merge](Source-or-Flow/merge.md) * [mergeSorted](Source-or-Flow/mergeSorted.md) * [zip](Source-or-Flow/zip.md) +* [zipAll](Source-or-Flow/zipAll.md) * [zipLatest](Source-or-Flow/zipLatest.md) * [zipWith](Source-or-Flow/zipWith.md) * [zipLatestWith](Source-or-Flow/zipLatestWith.md) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 62aa01477d..463630dfcf 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -23,6 +23,7 @@ import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; import akka.testkit.AkkaJUnitActorSystemResource; +import scala.Tuple2; import java.util.*; import java.util.function.Supplier; @@ -548,6 +549,36 @@ public class FlowTest extends StreamTest { assertEquals(expected, output); } + @Test + public void mustBeAbleToUseZipAll() { + final TestKit probe = new TestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList(1, 2, 3, 4); + + Source src1 = Source.from(input1); + Source src2 = Source.from(input2); + Sink, CompletionStage> sink = + Sink.foreach( + new Procedure>() { + @Override + public void apply(Pair param) throws Exception { + probe.getRef().tell(param, ActorRef.noSender()); + } + }); + Flow, NotUsed> fl = + Flow.create().zipAll(src2, "MISSING", -1); + src1.via(fl).runWith(sink, materializer); + + List output = probe.receiveN(4); + List> expected = + Arrays.asList( + new Pair("A", 1), + new Pair("B", 2), + new Pair("C", 3), + new Pair("MISSING", 4)); + assertEquals(expected, output); + } + @Test public void mustBeAbleToUseConcat() { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index a9435e8f6e..beeefe3489 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -893,6 +893,38 @@ public class SourceTest extends StreamTest { future.toCompletableFuture().get(3, TimeUnit.SECONDS); } + @Test + public void mustBeAbleToZipAll() { + final TestKit probe = new TestKit(system); + final Iterable input1 = + Arrays.asList("A", "B", "C", "D", "new kid on the block1", "second newbie"); + final Iterable input2 = Arrays.asList(1, 2, 3, 4); + + Source src1 = Source.from(input1); + Source src2 = Source.from(input2); + Sink, CompletionStage> sink = + Sink.foreach( + new Procedure>() { + @Override + public void apply(Pair param) throws Exception { + probe.getRef().tell(param, ActorRef.noSender()); + } + }); + Source, NotUsed> zippedSrc = src1.zipAll(src2, "MISSING", -1); + zippedSrc.runWith(sink, materializer); + + List output = probe.receiveN(6); + List> expected = + Arrays.asList( + new Pair("A", 1), + new Pair("B", 2), + new Pair("C", 3), + new Pair("D", 4), + new Pair("new kid on the block1", -1), + new Pair("second newbie", -1)); + assertEquals(expected, output); + } + @Test public void createEmptySource() throws Exception { List actual = diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 841bc29fc5..0aa6a93ee4 100755 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -63,6 +63,7 @@ class DslConsistencySpec extends WordSpec with Matchers { "zipWithGraph", "zipLatestGraph", "zipLatestWithGraph", + "zipAllFlow", "mergeGraph", "mergeSortedGraph", "interleaveGraph", diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipAllSpec.scala new file mode 100644 index 0000000000..e8d9f269d0 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipAllSpec.scala @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } +import akka.stream.testkit.scaladsl.StreamTestKit._ +import org.reactivestreams.Publisher + +class FlowZipAllSpec extends BaseTwoStreamsSetup { + override type Outputs = (Int, Int) + + override def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val subscriber = TestSubscriber.probe[Outputs]() + Source.fromPublisher(p1).zip(Source.fromPublisher(p2)).runWith(Sink.fromSubscriber(subscriber)) + subscriber + } + + "A zipAll for Flow" must { + "work for shorter left side" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[(Int, String)]() + Source(1 to 4) + .zipAll(Source(List("A", "B", "C", "D", "E", "F")), -1, "MISSING") + .runWith(Sink.fromSubscriber(probe)) + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext((1, "A")) + probe.expectNext((2, "B")) + + subscription.request(1) + probe.expectNext((3, "C")) + subscription.request(1) + probe.expectNext((4, "D")) + + subscription.request(2) + probe.expectNext((-1, "E")) + probe.expectNext((-1, "F")) + + subscription.request(1) + probe.expectComplete() + } + + "work for shorter right side" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[(Int, String)]() + Source(1 to 8) + .zipAll(Source(List("A", "B", "C", "D", "E", "F")), -1, "MISSING") + .runWith(Sink.fromSubscriber(probe)) + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext((1, "A")) + probe.expectNext((2, "B")) + + subscription.request(1) + probe.expectNext((3, "C")) + subscription.request(1) + probe.expectNext((4, "D")) + + subscription.request(2) + probe.expectNext((5, "E")) + probe.expectNext((6, "F")) + + subscription.request(2) + probe.expectNext((7, "MISSING")) + probe.expectNext((8, "MISSING")) + + subscription.request(1) + probe.expectComplete() + } + + "work for equal lengths" in assertAllStagesStopped { + val probe = TestSubscriber.manualProbe[(Int, String)]() + Source(1 to 6) + .zipAll(Source(List("A", "B", "C", "D", "E", "F")), -1, "MISSING") + .runWith(Sink.fromSubscriber(probe)) + val subscription = probe.expectSubscription() + + subscription.request(2) + probe.expectNext((1, "A")) + probe.expectNext((2, "B")) + + subscription.request(1) + probe.expectNext((3, "C")) + subscription.request(1) + probe.expectNext((4, "D")) + + subscription.request(2) + probe.expectNext((5, "E")) + probe.expectNext((6, "F")) + + subscription.request(1) + probe.expectComplete() + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index c37d51a683..d56d248567 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -2548,6 +2548,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr })), matF) + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAll[U, A >: Out](that: Graph[SourceShape[U], _], thisElem: A, thatElem: U): Flow[In, Pair[A, U], Mat] = + new Flow(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) }) + + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * @see [[#zipAll]] + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)( + matF: (Mat, Mat2) => Mat3): Flow[In, Pair[A, U], Mat3] = + new Flow(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) }) + /** * Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index cf6dca9c70..6cc1a5f531 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -19,6 +19,7 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import akka.util.ccompat.JavaConverters._ + import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } @@ -1153,6 +1154,37 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = this.viaMat(Flow.create[Out].zipMat(that, Keep.right[NotUsed, M]), matF) + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAll[U, A >: Out](that: Graph[SourceShape[U], _], thisElem: A, thatElem: U): Source[Pair[A, U], Mat] = + new Source(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) }) + + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * @see [[#zipAll]] + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)( + matF: (Mat, Mat2) => Mat3): Source[Pair[A, U], Mat3] = + new Source(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) }) + /** * Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 0db2fdb5f3..c4cad3c7f1 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -6,15 +6,14 @@ package akka.stream.javadsl import akka.NotUsed import akka.event.LoggingAdapter -import akka.japi.function +import akka.japi.{ function, Pair, Util } import akka.stream._ import akka.util.ConstantFun import akka.util.JavaDurationConverters._ - import akka.util.ccompat.JavaConverters._ + import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration -import akka.japi.Util import java.util.Comparator import scala.compat.java8.FutureConverters._ @@ -1542,6 +1541,23 @@ class SubFlow[In, Out, Mat]( def zip[T](source: Graph[SourceShape[T], _]): SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, T], Mat] = new SubFlow(delegate.zip(source).map { case (o, t) => akka.japi.Pair.create(o, t) }) + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAll[U, A >: Out]( + that: Graph[SourceShape[U], _], + thisElem: A, + thatElem: U): SubFlow[In, akka.japi.Pair[A, U], Mat] = + new SubFlow(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) }) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index e7f800b1c7..30a2e33b86 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -6,12 +6,12 @@ package akka.stream.javadsl import akka.NotUsed import akka.event.LoggingAdapter -import akka.japi.{ function, Util } +import akka.japi.{ function, Pair, Util } import akka.stream._ import akka.util.ConstantFun import akka.util.JavaDurationConverters._ - import akka.util.ccompat.JavaConverters._ + import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration import java.util.Comparator @@ -1521,6 +1521,23 @@ class SubSource[Out, Mat]( def zip[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] = new SubSource(delegate.zip(source).map { case (o, t) => akka.japi.Pair.create(o, t) }) + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAll[U, A >: Out]( + that: Graph[SourceShape[U], _], + thisElem: A, + thatElem: U): SubSource[akka.japi.Pair[A, U], Mat] = + new SubSource(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) }) + /** * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 4f2494ca9b..4969c8f867 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2404,6 +2404,43 @@ trait FlowOps[+Out, +Mat] { */ def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)] = via(zipGraph(that)) + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAll[U, A >: Out](that: Graph[SourceShape[U], _], thisElem: A, thatElem: U): Repr[(A, U)] = { + via(zipAllFlow(that, thisElem, thatElem)) + } + + protected def zipAllFlow[U, A >: Out, Mat2]( + that: Graph[SourceShape[U], Mat2], + thisElem: A, + thatElem: U): Flow[Out @uncheckedVariance, (A, U), Mat2] = { + case object passedEnd + val passedEndSrc = Source.repeat(passedEnd) + val left: Flow[Out, Any, NotUsed] = Flow[A].concat(passedEndSrc) + val right: Source[Any, Mat2] = Source.fromGraph(that).concat(passedEndSrc) + val zipFlow: Flow[Out, (A, U), Mat2] = left + .zipMat(right)(Keep.right) + .takeWhile { + case (`passedEnd`, `passedEnd`) => false + case _ => true + } + .map { + case (`passedEnd`, r: U @unchecked) => (thisElem, r) + case (l: A @unchecked, `passedEnd`) => (l, thatElem) + case t: (A, U) @unchecked => t + } + zipFlow + } + protected def zipGraph[U, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] = GraphDSL.create(that) { implicit b => r => val zip = b.add(Zip[Out, U]()) @@ -2910,6 +2947,24 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[(Out, U), Mat3] = viaMat(zipGraph(that))(matF) + /** + * Combine the elements of current flow and the given [[Source]] into a stream of tuples. + * + * @see [[#zipAll]] + * + * '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstream completes + * + * '''Cancels when''' downstream cancels + */ + def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)( + matF: (Mat, Mat2) => Mat3): ReprMat[(A, U), Mat3] = { + viaMat(zipAllFlow(that, thisElem, thatElem))(matF) + } + /** * Put together the elements of current flow and the given [[Source]] * into a stream of combined elements using a combiner function.