Examples for zipN, zipWithN and zipAll (#28510)

This commit is contained in:
Johan Andrén 2020-01-27 09:50:34 +01:00 committed by GitHub
parent a614f0bee7
commit 8d8fa29f47
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 234 additions and 25 deletions

View file

@ -16,6 +16,12 @@ Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*
Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.
See also:
* @ref:[zipAll](zipAll.md)
* @ref:[zipWith](zipWith.md)
* @ref:[zipWithIndex](zipWithIndex.md)
## Examples
Scala
@ -28,10 +34,10 @@ Java
@@@div { .callout }
**emits** when all of the inputs have an element available
**emits** when both of the inputs have an element available
**backpressures** when downstream backpressures
**backpressures** both upstreams when downstream backpressures but also on an upstream that has emitted an element until the other upstream has emitted an element
**completes** when any upstream completes
**completes** when either upstream completes
@@@

View file

@ -1,6 +1,6 @@
# zipAll
Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.
Combines elements from two sources into @scala[tuples] @java[*Pair*] handling early completion of either source.
@ref[Fan-in operators](../index.md#fan-in-operators)
@ -14,23 +14,33 @@ Combines all elements from each of multiple sources into @scala[tuples] @java[*P
## Description
Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.
Combines elements from two sources into @scala[tuples] @java[*Pair*] and passes downstream.
If either source completes, a default value is combined with each value from the other source until it completes.
See also:
* @ref:[zip](zip.md)
* @ref:[zipWith](zipWith.md)
* @ref:[zipWith](zipWith.md)
* @ref:[zipWithIndex](zipWithIndex.md)
## Example
Scala
: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip }
: @@snip [Zip.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala) { #zipAll-simple }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip }
: @@snip [Zip.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java) { #zipAll-simple }
## Reactive Streams semantics
@@@div { .callout }
**emits** at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input).
**emits** at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input)
**backpressures** when downstream backpressures
**backpressures** both upstreams when downstream backpressures but also on an upstream that has emitted an element until the other upstream has emitted an element
**completes** when all upstream completes
**completes** when both upstream completes
@@@

View file

@ -17,6 +17,12 @@ Combines elements from multiple sources through a `combine` function and passes
Combines elements from multiple sources through a `combine` function and passes the
returned value downstream.
See also:
* @ref:[zip](zip.md)
* @ref:[zipAll](zipAll.md)
* @ref:[zipWithIndex](zipWithIndex.md)
## Examples
Scala
@ -31,7 +37,7 @@ Java
**emits** when all of the inputs have an element available
**backpressures** when downstream backpressures
**backpressures** both upstreams when downstream backpressures but also on an upstream that has emitted an element until the other upstream has emitted an element
**completes** when any upstream completes

View file

@ -16,6 +16,12 @@ Zips elements of current flow with its indices.
Zips elements of current flow with its indices.
See also:
* @ref:[zip](zip.md)
* @ref:[zipAll](zipAll.md)
* @ref:[zipWith](zipWith.md)
## Example
Scala

View file

@ -1,20 +1,40 @@
# Source.zipN
Combine the elements of multiple streams into a stream of sequences.
Combine the elements of multiple sources into a source of sequences of value.
@ref[Source operators](../index.md#source-operators)
@@@div { .group-scala }
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #zipN }
@@@
@apidoc[Source.zipN](Source$) { scala="#zipN[T](sources:scala.collection.immutable.Seq[akka.stream.scaladsl.Source[T,_]]):akka.stream.scaladsl.Source[scala.collection.immutable.Seq[T],akka.NotUsed]" java="#zipN(java.util.List)" }
## Description
Combine the elements of multiple streams into a stream of sequences.
Collects one element for every upstream and when all upstreams has emitted one element all of them are emitted downstream as a collection.
The element order in the downstream collection will be the same order as the sources were listed.
Since the sources are provided as a list the individual types are lost and @scala[the downstream sequences will end up containing the closest supertype shared by all sources]@java[you may have to make sure to have sources type casted to the same common supertype of all stream elements to use `zipN`].
See also:
* @ref:[zipWithN](zipWithN.md)
* @ref:[zip](../Source-or-Flow/zip.md)
* @ref:[zipAll](../Source-or-Flow/zipAll.md)
* @ref:[zipWith](../Source-or-Flow/zipWith.md)
* @ref:[zipWithIndex](../Source-or-Flow/zipWithIndex.md)
## Example
In this sample we zip a stream of characters, a stream of numbers and a stream of colours. Into a single `Source`
where each element is a @scala[`Vector`]@java[`List`] of `[character, number, color]`:
Scala
: @@snip [Zip.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala) { #zipN-simple }
Java
: @@snip [Zip.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java) { #zipN-simple }
Note how it stops as soon as any of the original sources reaches its end.
## Reactive Streams semantics
@ -24,5 +44,7 @@ Combine the elements of multiple streams into a stream of sequences.
**completes** when any upstream completes
**backpressures** all upstreams when downstream backpressures but also on an upstream that has emitted an element until all other upstreams has emitted elements
@@@

View file

@ -4,18 +4,38 @@ Combine the elements of multiple streams into a stream of sequences using a comb
@ref[Source operators](../index.md#source-operators)
@@@div { .group-scala }
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #zipWithN }
@@@
@apidoc[Source.zipWithN](Source$) { scala="#zipWithN[T,O](zipper:scala.collection.immutable.Seq[T]=>O)(sources:scala.collection.immutable.Seq[akka.stream.scaladsl.Source[T,_]]):akka.stream.scaladsl.Source[O,akka.NotUsed]" java="#zipWithN(akka.japi.function.Function,java.util.List)" }
## Description
Combine the elements of multiple streams into a stream of sequences using a combiner function.
This operator is essentially the same as using @ref:[zipN](zipN.md) followed by @ref[map](../Source-or-Flow/map.md)
to turn the zipped sequence into an arbitrary object to emit downstream.
See also:
* @ref:[zipN](zipN.md)
* @ref:[zip](../Source-or-Flow/zip.md)
* @ref:[zipAll](../Source-or-Flow/zipAll.md)
* @ref:[zipWith](../Source-or-Flow/zipWith.md)
* @ref:[zipWithIndex](../Source-or-Flow/zipWithIndex.md)
## Example
In this sample we zip three streams of integers and for each zipped sequence of numbers we calculate the max value
and send downstream:
Scala
: @@snip [Zip.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala) { #zipWithN-simple }
Java
: @@snip [Zip.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java) { #zipWithN-simple }
Note how it stops as soon as any of the original sources reaches its end.
## Reactive Streams semantics
@@@div { .callout }
@ -24,6 +44,8 @@ Combine the elements of multiple streams into a stream of sequences using a comb
**completes** when any upstream completes
**backpressures** all upstreams when downstream backpressures but also on an upstream that has emitted an element until all other upstreams has emitted elements
@@@

View file

@ -44,7 +44,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|<a name="unfoldasync"></a>@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].|
|Source|<a name="unfoldresource"></a>@ref[unfoldResource](Source/unfoldResource.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.|
|Source|<a name="unfoldresourceasync"></a>@ref[unfoldResourceAsync](Source/unfoldResourceAsync.md)|Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.|
|Source|<a name="zipn"></a>@ref[zipN](Source/zipN.md)|Combine the elements of multiple streams into a stream of sequences.|
|Source|<a name="zipn"></a>@ref[zipN](Source/zipN.md)|Combine the elements of multiple sources into a source of sequences of value.|
|Source|<a name="zipwithn"></a>@ref[zipWithN](Source/zipWithN.md)|Combine the elements of multiple streams into a stream of sequences using a combiner function.|
## Sink operators
@ -275,7 +275,7 @@ the inputs in different ways.
|Source/Flow|<a name="orelse"></a>@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.|
|Source/Flow|<a name="prepend"></a>@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.|
|Source/Flow|<a name="zip"></a>@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.|
|Source/Flow|<a name="zipall"></a>@ref[zipAll](Source-or-Flow/zipAll.md)|Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.|
|Source/Flow|<a name="zipall"></a>@ref[zipAll](Source-or-Flow/zipAll.md)|Combines elements from two sources into @scala[tuples] @java[*Pair*] handling early completion of either source.|
|Source/Flow|<a name="ziplatest"></a>@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.|
|Source/Flow|<a name="ziplatestwith"></a>@ref[zipLatestWith](Source-or-Flow/zipLatestWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each.|
|Source/Flow|<a name="zipwith"></a>@ref[zipWith](Source-or-Flow/zipWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream.|

View file

@ -0,0 +1,73 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.source;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
public class Zip {
void zipNSample() {
ActorSystem system = null;
// #zipN-simple
Source<Object, NotUsed> chars = Source.from(Arrays.asList("a", "b", "c", "e", "f"));
Source<Object, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6));
Source<Object, NotUsed> colors =
Source.from(Arrays.asList("red", "green", "blue", "yellow", "purple"));
Source.zipN(Arrays.asList(chars, numbers, colors)).runForeach(System.out::println, system);
// prints:
// [a, 1, red]
// [b, 2, green]
// [c, 3, blue]
// [e, 4, yellow]
// [f, 5, purple]
// #zipN-simple
}
void zipWithNSample() {
ActorSystem system = null;
// #zipWithN-simple
Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6));
Source<Integer, NotUsed> otherNumbers = Source.from(Arrays.asList(5, 2, 1, 4, 10, 4));
Source<Integer, NotUsed> andSomeOtherNumbers = Source.from(Arrays.asList(3, 7, 2, 1, 1));
Source.zipWithN(
(List<Integer> seq) -> seq.stream().mapToInt(i -> i).max().getAsInt(),
Arrays.asList(numbers, otherNumbers, andSomeOtherNumbers))
.runForeach(System.out::println, system);
// prints:
// 5
// 7
// 3
// 4
// 10
// #zipWithN-simple
}
void zipAllSample() {
ActorSystem system = null;
// #zipAll-simple
Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3, 4));
Source<String, NotUsed> letters = Source.from(Arrays.asList("a", "b", "c"));
numbers.zipAll(letters, -1, "default").runForeach(System.out::println, system);
// prints:
// Pair(1,a)
// Pair(2,b)
// Pair(3,c)
// Pair(4,default)
// #zipAll-simple
}
}

View file

@ -0,0 +1,64 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.source
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.stream.scaladsl.Source
object Zip {
implicit val system: ActorSystem[_] = ???
def zipN(): Unit = {
// #zipN-simple
val chars = Source("a" :: "b" :: "c" :: "e" :: "f" :: Nil)
val numbers = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: Nil)
val colors = Source("red" :: "green" :: "blue" :: "yellow" :: "purple" :: Nil)
Source.zipN(chars :: numbers :: colors :: Nil).runForeach(println)
// prints:
// Vector(a, 1, red)
// Vector(b, 2, green)
// Vector(c, 3, blue)
// Vector(e, 4, yellow)
// Vector(f, 5, purple)
// #zipN-simple
}
def zipNWith(): Unit = {
// #zipWithN-simple
val numbers = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: Nil)
val otherNumbers = Source(5 :: 2 :: 1 :: 4 :: 10 :: 4 :: Nil)
val andSomeOtherNumbers = Source(3 :: 7 :: 2 :: 1 :: 1 :: Nil)
Source
.zipWithN((seq: Seq[Int]) => seq.max)(numbers :: otherNumbers :: andSomeOtherNumbers :: Nil)
.runForeach(println)
// prints:
// 5
// 7
// 3
// 4
// 10
// #zipWithN-simple
}
def zipAll() {
// #zipAll-simple
val numbers = Source(1 :: 2 :: 3 :: 4 :: Nil)
val letters = Source("a" :: "b" :: "c" :: Nil)
numbers.zipAll(letters, -1, "default").runForeach(println)
// prints:
// (1,a)
// (2,b)
// (3,c)
// (4,default)
// #zipAll-simple
}
}