Akka 27103/streams zip all #27103

This commit is contained in:
eyalfa 2019-07-05 17:40:06 +03:00 committed by Johan Andrén
parent 98865d7fb6
commit 14c02302bc
11 changed files with 355 additions and 5 deletions

View file

@ -0,0 +1,35 @@
# zipAll
Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.
@ref[Fan-in operators](../index.md#fan-in-operators)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #zipAll }
@@@
## Description
Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.
@@@div { .callout }
**emits** at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
**backpressures** when downstream backpressures
**completes** when all upstream completes
@@@
## Example
Scala
: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip }

View file

@ -249,6 +249,7 @@ the inputs in different ways.
|Source/Flow|<a name="orelse"></a>@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.|
|Source/Flow|<a name="prepend"></a>@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|Source/Flow|<a name="zip"></a>@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.|
|Source/Flow|<a name="zipall"></a>@ref[zipAll](Source-or-Flow/zipAll.md)|Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.|
|Source/Flow|<a name="ziplatest"></a>@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.|
|Source/Flow|<a name="ziplatestwith"></a>@ref[zipLatestWith](Source-or-Flow/zipLatestWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each.|
|Source/Flow|<a name="zipwith"></a>@ref[zipWith](Source-or-Flow/zipWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream.|
@ -346,6 +347,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [merge](Source-or-Flow/merge.md)
* [mergeSorted](Source-or-Flow/mergeSorted.md)
* [zip](Source-or-Flow/zip.md)
* [zipAll](Source-or-Flow/zipAll.md)
* [zipLatest](Source-or-Flow/zipLatest.md)
* [zipWith](Source-or-Flow/zipWith.md)
* [zipLatestWith](Source-or-Flow/zipLatestWith.md)

View file

@ -23,6 +23,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import akka.testkit.AkkaJUnitActorSystemResource;
import scala.Tuple2;
import java.util.*;
import java.util.function.Supplier;
@ -548,6 +549,36 @@ public class FlowTest extends StreamTest {
assertEquals(expected, output);
}
@Test
public void mustBeAbleToUseZipAll() {
final TestKit probe = new TestKit(system);
final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<Integer> input2 = Arrays.asList(1, 2, 3, 4);
Source<String, NotUsed> src1 = Source.from(input1);
Source<Integer, NotUsed> src2 = Source.from(input2);
Sink<Pair<String, Integer>, CompletionStage<Done>> sink =
Sink.foreach(
new Procedure<Pair<String, Integer>>() {
@Override
public void apply(Pair<String, Integer> param) throws Exception {
probe.getRef().tell(param, ActorRef.noSender());
}
});
Flow<String, Pair<String, Integer>, NotUsed> fl =
Flow.<String>create().zipAll(src2, "MISSING", -1);
src1.via(fl).runWith(sink, materializer);
List<Object> output = probe.receiveN(4);
List<Pair<String, Integer>> expected =
Arrays.asList(
new Pair<String, Integer>("A", 1),
new Pair<String, Integer>("B", 2),
new Pair<String, Integer>("C", 3),
new Pair<String, Integer>("MISSING", 4));
assertEquals(expected, output);
}
@Test
public void mustBeAbleToUseConcat() {
final TestKit probe = new TestKit(system);

View file

@ -893,6 +893,38 @@ public class SourceTest extends StreamTest {
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToZipAll() {
final TestKit probe = new TestKit(system);
final Iterable<String> input1 =
Arrays.asList("A", "B", "C", "D", "new kid on the block1", "second newbie");
final Iterable<Integer> input2 = Arrays.asList(1, 2, 3, 4);
Source<String, NotUsed> src1 = Source.from(input1);
Source<Integer, NotUsed> src2 = Source.from(input2);
Sink<Pair<String, Integer>, CompletionStage<Done>> sink =
Sink.foreach(
new Procedure<Pair<String, Integer>>() {
@Override
public void apply(Pair<String, Integer> param) throws Exception {
probe.getRef().tell(param, ActorRef.noSender());
}
});
Source<Pair<String, Integer>, NotUsed> zippedSrc = src1.zipAll(src2, "MISSING", -1);
zippedSrc.runWith(sink, materializer);
List<Object> output = probe.receiveN(6);
List<Pair<String, Integer>> expected =
Arrays.asList(
new Pair<String, Integer>("A", 1),
new Pair<String, Integer>("B", 2),
new Pair<String, Integer>("C", 3),
new Pair<String, Integer>("D", 4),
new Pair<String, Integer>("new kid on the block1", -1),
new Pair<String, Integer>("second newbie", -1));
assertEquals(expected, output);
}
@Test
public void createEmptySource() throws Exception {
List<Integer> actual =

View file

@ -63,6 +63,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
"zipWithGraph",
"zipLatestGraph",
"zipLatestWithGraph",
"zipAllFlow",
"mergeGraph",
"mergeSortedGraph",
"interleaveGraph",

View file

@ -0,0 +1,98 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber }
import akka.stream.testkit.scaladsl.StreamTestKit._
import org.reactivestreams.Publisher
class FlowZipAllSpec extends BaseTwoStreamsSetup {
override type Outputs = (Int, Int)
override def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = TestSubscriber.probe[Outputs]()
Source.fromPublisher(p1).zip(Source.fromPublisher(p2)).runWith(Sink.fromSubscriber(subscriber))
subscriber
}
"A zipAll for Flow" must {
"work for shorter left side" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[(Int, String)]()
Source(1 to 4)
.zipAll(Source(List("A", "B", "C", "D", "E", "F")), -1, "MISSING")
.runWith(Sink.fromSubscriber(probe))
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext((1, "A"))
probe.expectNext((2, "B"))
subscription.request(1)
probe.expectNext((3, "C"))
subscription.request(1)
probe.expectNext((4, "D"))
subscription.request(2)
probe.expectNext((-1, "E"))
probe.expectNext((-1, "F"))
subscription.request(1)
probe.expectComplete()
}
"work for shorter right side" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[(Int, String)]()
Source(1 to 8)
.zipAll(Source(List("A", "B", "C", "D", "E", "F")), -1, "MISSING")
.runWith(Sink.fromSubscriber(probe))
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext((1, "A"))
probe.expectNext((2, "B"))
subscription.request(1)
probe.expectNext((3, "C"))
subscription.request(1)
probe.expectNext((4, "D"))
subscription.request(2)
probe.expectNext((5, "E"))
probe.expectNext((6, "F"))
subscription.request(2)
probe.expectNext((7, "MISSING"))
probe.expectNext((8, "MISSING"))
subscription.request(1)
probe.expectComplete()
}
"work for equal lengths" in assertAllStagesStopped {
val probe = TestSubscriber.manualProbe[(Int, String)]()
Source(1 to 6)
.zipAll(Source(List("A", "B", "C", "D", "E", "F")), -1, "MISSING")
.runWith(Sink.fromSubscriber(probe))
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext((1, "A"))
probe.expectNext((2, "B"))
subscription.request(1)
probe.expectNext((3, "C"))
subscription.request(1)
probe.expectNext((4, "D"))
subscription.request(2)
probe.expectNext((5, "E"))
probe.expectNext((6, "F"))
subscription.request(1)
probe.expectComplete()
}
}
}

View file

@ -2548,6 +2548,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
})),
matF)
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAll[U, A >: Out](that: Graph[SourceShape[U], _], thisElem: A, thatElem: U): Flow[In, Pair[A, U], Mat] =
new Flow(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) })
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* @see [[#zipAll]]
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
matF: (Mat, Mat2) => Mat3): Flow[In, Pair[A, U], Mat3] =
new Flow(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })
/**
* Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each.
*

View file

@ -19,6 +19,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
@ -1153,6 +1154,37 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] =
this.viaMat(Flow.create[Out].zipMat(that, Keep.right[NotUsed, M]), matF)
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAll[U, A >: Out](that: Graph[SourceShape[U], _], thisElem: A, thatElem: U): Source[Pair[A, U], Mat] =
new Source(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) })
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* @see [[#zipAll]]
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
matF: (Mat, Mat2) => Mat3): Source[Pair[A, U], Mat3] =
new Source(delegate.zipAllMat(that, thisElem, thatElem)(matF).map { case (a, u) => Pair.create(a, u) })
/**
* Combine the elements of 2 streams into a stream of tuples, picking always the latest element of each.
*

View file

@ -6,15 +6,14 @@ package akka.stream.javadsl
import akka.NotUsed
import akka.event.LoggingAdapter
import akka.japi.function
import akka.japi.{ function, Pair, Util }
import akka.stream._
import akka.util.ConstantFun
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util
import java.util.Comparator
import scala.compat.java8.FutureConverters._
@ -1542,6 +1541,23 @@ class SubFlow[In, Out, Mat](
def zip[T](source: Graph[SourceShape[T], _]): SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, T], Mat] =
new SubFlow(delegate.zip(source).map { case (o, t) => akka.japi.Pair.create(o, t) })
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAll[U, A >: Out](
that: Graph[SourceShape[U], _],
thisElem: A,
thatElem: U): SubFlow[In, akka.japi.Pair[A, U], Mat] =
new SubFlow(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) })
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each.
*

View file

@ -6,12 +6,12 @@ package akka.stream.javadsl
import akka.NotUsed
import akka.event.LoggingAdapter
import akka.japi.{ function, Util }
import akka.japi.{ function, Pair, Util }
import akka.stream._
import akka.util.ConstantFun
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import java.util.Comparator
@ -1521,6 +1521,23 @@ class SubSource[Out, Mat](
def zip[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] =
new SubSource(delegate.zip(source).map { case (o, t) => akka.japi.Pair.create(o, t) })
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAll[U, A >: Out](
that: Graph[SourceShape[U], _],
thisElem: A,
thatElem: U): SubSource[akka.japi.Pair[A, U], Mat] =
new SubSource(delegate.zipAll(that, thisElem, thatElem).map { case (a, u) => Pair.create(a, u) })
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples, picking always the latest element of each.
*

View file

@ -2404,6 +2404,43 @@ trait FlowOps[+Out, +Mat] {
*/
def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)] = via(zipGraph(that))
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAll[U, A >: Out](that: Graph[SourceShape[U], _], thisElem: A, thatElem: U): Repr[(A, U)] = {
via(zipAllFlow(that, thisElem, thatElem))
}
protected def zipAllFlow[U, A >: Out, Mat2](
that: Graph[SourceShape[U], Mat2],
thisElem: A,
thatElem: U): Flow[Out @uncheckedVariance, (A, U), Mat2] = {
case object passedEnd
val passedEndSrc = Source.repeat(passedEnd)
val left: Flow[Out, Any, NotUsed] = Flow[A].concat(passedEndSrc)
val right: Source[Any, Mat2] = Source.fromGraph(that).concat(passedEndSrc)
val zipFlow: Flow[Out, (A, U), Mat2] = left
.zipMat(right)(Keep.right)
.takeWhile {
case (`passedEnd`, `passedEnd`) => false
case _ => true
}
.map {
case (`passedEnd`, r: U @unchecked) => (thisElem, r)
case (l: A @unchecked, `passedEnd`) => (l, thatElem)
case t: (A, U) @unchecked => t
}
zipFlow
}
protected def zipGraph[U, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] =
GraphDSL.create(that) { implicit b => r =>
val zip = b.add(Zip[Out, U]())
@ -2910,6 +2947,24 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[(Out, U), Mat3] =
viaMat(zipGraph(that))(matF)
/**
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* @see [[#zipAll]]
*
* '''Emits when''' at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstream completes
*
* '''Cancels when''' downstream cancels
*/
def zipAllMat[U, Mat2, Mat3, A >: Out](that: Graph[SourceShape[U], Mat2], thisElem: A, thatElem: U)(
matF: (Mat, Mat2) => Mat3): ReprMat[(A, U), Mat3] = {
viaMat(zipAllFlow(that, thisElem, thatElem))(matF)
}
/**
* Put together the elements of current flow and the given [[Source]]
* into a stream of combined elements using a combiner function.