diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md index ac64695c1d..dd4a64a100 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md @@ -16,6 +16,12 @@ Combines elements from each of multiple sources into @scala[tuples] @java[*Pair* Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. +See also: + + * @ref:[zipAll](zipAll.md) + * @ref:[zipWith](zipWith.md) + * @ref:[zipWithIndex](zipWithIndex.md) + ## Examples Scala @@ -28,10 +34,10 @@ Java @@@div { .callout } -**emits** when all of the inputs have an element available +**emits** when both of the inputs have an element available -**backpressures** when downstream backpressures +**backpressures** both upstreams when downstream backpressures but also on an upstream that has emitted an element until the other upstream has emitted an element -**completes** when any upstream completes +**completes** when either upstream completes @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipAll.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipAll.md index 5ef66253ef..9effd50ca4 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipAll.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipAll.md @@ -1,6 +1,6 @@ # zipAll -Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. +Combines elements from two sources into @scala[tuples] @java[*Pair*] handling early completion of either source. @ref[Fan-in operators](../index.md#fan-in-operators) @@ -14,23 +14,33 @@ Combines all elements from each of multiple sources into @scala[tuples] @java[*P ## Description -Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. +Combines elements from two sources into @scala[tuples] @java[*Pair*] and passes downstream. +If either source completes, a default value is combined with each value from the other source until it completes. + +See also: + + * @ref:[zip](zip.md) + * @ref:[zipWith](zipWith.md) + * @ref:[zipWith](zipWith.md) + * @ref:[zipWithIndex](zipWithIndex.md) ## Example + Scala -: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip } +: @@snip [Zip.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala) { #zipAll-simple } Java -: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip } +: @@snip [Zip.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java) { #zipAll-simple } + ## Reactive Streams semantics @@@div { .callout } -**emits** at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input). +**emits** at first emits when both inputs emit, and then as long as any input emits (coupled to the default value of the completed input) -**backpressures** when downstream backpressures +**backpressures** both upstreams when downstream backpressures but also on an upstream that has emitted an element until the other upstream has emitted an element -**completes** when all upstream completes +**completes** when both upstream completes @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md index 5da9f6ebc8..f04ccd2dad 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md @@ -17,6 +17,12 @@ Combines elements from multiple sources through a `combine` function and passes Combines elements from multiple sources through a `combine` function and passes the returned value downstream. +See also: + + * @ref:[zip](zip.md) + * @ref:[zipAll](zipAll.md) + * @ref:[zipWithIndex](zipWithIndex.md) + ## Examples Scala @@ -31,7 +37,7 @@ Java **emits** when all of the inputs have an element available -**backpressures** when downstream backpressures +**backpressures** both upstreams when downstream backpressures but also on an upstream that has emitted an element until the other upstream has emitted an element **completes** when any upstream completes diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWithIndex.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWithIndex.md index cf024fb68c..6c0e0ff53a 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWithIndex.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWithIndex.md @@ -16,6 +16,12 @@ Zips elements of current flow with its indices. Zips elements of current flow with its indices. +See also: + + * @ref:[zip](zip.md) + * @ref:[zipAll](zipAll.md) + * @ref:[zipWith](zipWith.md) + ## Example Scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source/zipN.md b/akka-docs/src/main/paradox/stream/operators/Source/zipN.md index aa8ad9c1c1..335f163f8c 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/zipN.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/zipN.md @@ -1,20 +1,40 @@ # Source.zipN -Combine the elements of multiple streams into a stream of sequences. +Combine the elements of multiple sources into a source of sequences of value. @ref[Source operators](../index.md#source-operators) -@@@div { .group-scala } - ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #zipN } - -@@@ +@apidoc[Source.zipN](Source$) { scala="#zipN[T](sources:scala.collection.immutable.Seq[akka.stream.scaladsl.Source[T,_]]):akka.stream.scaladsl.Source[scala.collection.immutable.Seq[T],akka.NotUsed]" java="#zipN(java.util.List)" } ## Description -Combine the elements of multiple streams into a stream of sequences. +Collects one element for every upstream and when all upstreams has emitted one element all of them are emitted downstream as a collection. +The element order in the downstream collection will be the same order as the sources were listed. + +Since the sources are provided as a list the individual types are lost and @scala[the downstream sequences will end up containing the closest supertype shared by all sources]@java[you may have to make sure to have sources type casted to the same common supertype of all stream elements to use `zipN`]. + +See also: + + * @ref:[zipWithN](zipWithN.md) + * @ref:[zip](../Source-or-Flow/zip.md) + * @ref:[zipAll](../Source-or-Flow/zipAll.md) + * @ref:[zipWith](../Source-or-Flow/zipWith.md) + * @ref:[zipWithIndex](../Source-or-Flow/zipWithIndex.md) + +## Example + +In this sample we zip a stream of characters, a stream of numbers and a stream of colours. Into a single `Source` +where each element is a @scala[`Vector`]@java[`List`] of `[character, number, color]`: + +Scala +: @@snip [Zip.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala) { #zipN-simple } + +Java +: @@snip [Zip.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java) { #zipN-simple } + +Note how it stops as soon as any of the original sources reaches its end. ## Reactive Streams semantics @@ -24,5 +44,7 @@ Combine the elements of multiple streams into a stream of sequences. **completes** when any upstream completes +**backpressures** all upstreams when downstream backpressures but also on an upstream that has emitted an element until all other upstreams has emitted elements + @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source/zipWithN.md b/akka-docs/src/main/paradox/stream/operators/Source/zipWithN.md index a76e76c58f..64efeca65f 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/zipWithN.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/zipWithN.md @@ -4,18 +4,38 @@ Combine the elements of multiple streams into a stream of sequences using a comb @ref[Source operators](../index.md#source-operators) -@@@div { .group-scala } - ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #zipWithN } - -@@@ +@apidoc[Source.zipWithN](Source$) { scala="#zipWithN[T,O](zipper:scala.collection.immutable.Seq[T]=>O)(sources:scala.collection.immutable.Seq[akka.stream.scaladsl.Source[T,_]]):akka.stream.scaladsl.Source[O,akka.NotUsed]" java="#zipWithN(akka.japi.function.Function,java.util.List)" } ## Description Combine the elements of multiple streams into a stream of sequences using a combiner function. +This operator is essentially the same as using @ref:[zipN](zipN.md) followed by @ref[map](../Source-or-Flow/map.md) +to turn the zipped sequence into an arbitrary object to emit downstream. + +See also: + + * @ref:[zipN](zipN.md) + * @ref:[zip](../Source-or-Flow/zip.md) + * @ref:[zipAll](../Source-or-Flow/zipAll.md) + * @ref:[zipWith](../Source-or-Flow/zipWith.md) + * @ref:[zipWithIndex](../Source-or-Flow/zipWithIndex.md) + +## Example + +In this sample we zip three streams of integers and for each zipped sequence of numbers we calculate the max value +and send downstream: + +Scala +: @@snip [Zip.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala) { #zipWithN-simple } + +Java +: @@snip [Zip.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java) { #zipWithN-simple } + +Note how it stops as soon as any of the original sources reaches its end. + ## Reactive Streams semantics @@@div { .callout } @@ -24,6 +44,8 @@ Combine the elements of multiple streams into a stream of sequences using a comb **completes** when any upstream completes +**backpressures** all upstreams when downstream backpressures but also on an upstream that has emitted an element until all other upstreams has emitted elements + @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 551a532e16..ae90373ca9 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -44,7 +44,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].| |Source|@ref[unfoldResource](Source/unfoldResource.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.| |Source|@ref[unfoldResourceAsync](Source/unfoldResourceAsync.md)|Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.| -|Source|@ref[zipN](Source/zipN.md)|Combine the elements of multiple streams into a stream of sequences.| +|Source|@ref[zipN](Source/zipN.md)|Combine the elements of multiple sources into a source of sequences of value.| |Source|@ref[zipWithN](Source/zipWithN.md)|Combine the elements of multiple streams into a stream of sequences using a combiner function.| ## Sink operators @@ -275,7 +275,7 @@ the inputs in different ways. |Source/Flow|@ref[orElse](Source-or-Flow/orElse.md)|If the primary source completes without emitting any elements, the elements from the secondary source are emitted.| |Source/Flow|@ref[prepend](Source-or-Flow/prepend.md)|Prepends the given source to the flow, consuming it until completion before the original source is consumed.| |Source/Flow|@ref[zip](Source-or-Flow/zip.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.| -|Source/Flow|@ref[zipAll](Source-or-Flow/zipAll.md)|Combines all elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.| +|Source/Flow|@ref[zipAll](Source-or-Flow/zipAll.md)|Combines elements from two sources into @scala[tuples] @java[*Pair*] handling early completion of either source.| |Source/Flow|@ref[zipLatest](Source-or-Flow/zipLatest.md)|Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream, picking always the latest element of each.| |Source/Flow|@ref[zipLatestWith](Source-or-Flow/zipLatestWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream, picking always the latest element of each.| |Source/Flow|@ref[zipWith](Source-or-Flow/zipWith.md)|Combines elements from multiple sources through a `combine` function and passes the returned value downstream.| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java b/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java new file mode 100644 index 0000000000..91c421057f --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/Zip.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Arrays; +import java.util.List; + +public class Zip { + + void zipNSample() { + ActorSystem system = null; + + // #zipN-simple + Source chars = Source.from(Arrays.asList("a", "b", "c", "e", "f")); + Source numbers = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6)); + Source colors = + Source.from(Arrays.asList("red", "green", "blue", "yellow", "purple")); + + Source.zipN(Arrays.asList(chars, numbers, colors)).runForeach(System.out::println, system); + // prints: + // [a, 1, red] + // [b, 2, green] + // [c, 3, blue] + // [e, 4, yellow] + // [f, 5, purple] + + // #zipN-simple + } + + void zipWithNSample() { + ActorSystem system = null; + + // #zipWithN-simple + Source numbers = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6)); + Source otherNumbers = Source.from(Arrays.asList(5, 2, 1, 4, 10, 4)); + Source andSomeOtherNumbers = Source.from(Arrays.asList(3, 7, 2, 1, 1)); + + Source.zipWithN( + (List seq) -> seq.stream().mapToInt(i -> i).max().getAsInt(), + Arrays.asList(numbers, otherNumbers, andSomeOtherNumbers)) + .runForeach(System.out::println, system); + // prints: + // 5 + // 7 + // 3 + // 4 + // 10 + + // #zipWithN-simple + } + + void zipAllSample() { + ActorSystem system = null; + // #zipAll-simple + + Source numbers = Source.from(Arrays.asList(1, 2, 3, 4)); + Source letters = Source.from(Arrays.asList("a", "b", "c")); + + numbers.zipAll(letters, -1, "default").runForeach(System.out::println, system); + // prints: + // Pair(1,a) + // Pair(2,b) + // Pair(3,c) + // Pair(4,default) + // #zipAll-simple + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala b/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala new file mode 100644 index 0000000000..5cccb934b5 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/Zip.scala @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.NotUsed +import akka.actor.typed.ActorSystem +import akka.stream.scaladsl.Source + +object Zip { + + implicit val system: ActorSystem[_] = ??? + + def zipN(): Unit = { + // #zipN-simple + val chars = Source("a" :: "b" :: "c" :: "e" :: "f" :: Nil) + val numbers = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: Nil) + val colors = Source("red" :: "green" :: "blue" :: "yellow" :: "purple" :: Nil) + + Source.zipN(chars :: numbers :: colors :: Nil).runForeach(println) + // prints: + // Vector(a, 1, red) + // Vector(b, 2, green) + // Vector(c, 3, blue) + // Vector(e, 4, yellow) + // Vector(f, 5, purple) + + // #zipN-simple + } + + def zipNWith(): Unit = { + // #zipWithN-simple + val numbers = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: Nil) + val otherNumbers = Source(5 :: 2 :: 1 :: 4 :: 10 :: 4 :: Nil) + val andSomeOtherNumbers = Source(3 :: 7 :: 2 :: 1 :: 1 :: Nil) + + Source + .zipWithN((seq: Seq[Int]) => seq.max)(numbers :: otherNumbers :: andSomeOtherNumbers :: Nil) + .runForeach(println) + // prints: + // 5 + // 7 + // 3 + // 4 + // 10 + + // #zipWithN-simple + } + + def zipAll() { + // #zipAll-simple + val numbers = Source(1 :: 2 :: 3 :: 4 :: Nil) + val letters = Source("a" :: "b" :: "c" :: Nil) + + numbers.zipAll(letters, -1, "default").runForeach(println) + // prints: + // (1,a) + // (2,b) + // (3,c) + // (4,default) + // #zipAll-simple + } +}