diff --git a/akka-docs/src/main/paradox/stream/operators/Source/combine.md b/akka-docs/src/main/paradox/stream/operators/Source/combine.md index 07ab65dbce..5792b0110b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/combine.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/combine.md @@ -14,22 +14,56 @@ Combine several sources, using a given strategy such as merge or concat, into on ## Description +Provides a way to create a "fan-in" of multiple sources without having to use the more advanced @ref:[GraphDSL](../../stream-graphs.md#constructing-graphs). + +The way the elements from the sources +are combined is pluggable through the `strategy` parameter which accepts a function +@scala[`Int => Graph[FanInShape]`]@java[`Integer -> Graph`] where the integer parameter specifies the number of sources +that the graph must accept. This makes it possible to use `combine` with the built-in `Concat` +and `Merge` by @scala[expanding their `apply` methods to functions]@java[using a method reference to their `create` methods], +but also to use an arbitrary strategy. + +Combine is most useful when you have more sources than 2 or want to use a custom operator, as there are more concise +operators for 2-source @ref:[concat](../Source-or-Flow/concat.md) and @ref:[merge](../Source-or-Flow/merge.md) + +Some of the built-in operators that can be used as strategy are: + + * @apidoc[akka.stream.*.Merge] + * @apidoc[akka.stream.(javadsl|scaladsl).Concat] + * @apidoc[MergePrioritized] + * @apidoc[MergeLatest] + * @apidoc[ZipN] + * @apidoc[ZipWithN] + +## Examples + +In this example we `Merge` three different +sources of integers. The three sources will immediately start contributing elements to the combined source. The individual +elements from each source will be in order but the order compared to elements from other sources is not deterministic: + +Scala +: @@snip [Combine.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Combine.scala) { #imports #source-combine-merge } + +Java +: @@snip [Combine.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Combine.java) { #imports #source-combine-merge } + + +If we instead use `Concat` the first source +will get to emit elements until it completes, then the second source until that completes and so on until all the sources has completed. + +Scala +: @@snip [Combine.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Combine.scala) { #source-combine-concat } + +Java +: @@snip [Combine.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Combine.java) { #source-combine-concat } + + ## Reactive Streams semantics @@@div { .callout } **emits** when there is demand, but depending on the strategy -**completes** when all sources has completed +**completes** depends on the strategy @@@ - - -## Examples - - -Scala -: @@snip [combine.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #imports #combine } - - - diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/Combine.java b/akka-docs/src/test/java/jdocs/stream/operators/source/Combine.java new file mode 100644 index 0000000000..816ee7a143 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/Combine.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.NotUsed; +import akka.actor.ActorSystem; +// #imports +import akka.stream.javadsl.Concat; +import akka.stream.javadsl.Merge; +import akka.stream.javadsl.Source; +// ... + +// #imports +import java.util.Collections; + +public class Combine { + + private static ActorSystem system; + + public void merge() throws Exception { + // #source-combine-merge + Source source1 = Source.range(1, 3); + Source source2 = Source.range(8, 10); + Source source3 = Source.range(12, 14); + + final Source combined = + Source.combine(source1, source2, Collections.singletonList(source3), Merge::create); + + combined.runForeach(System.out::println, system); + // could print (order between sources is not deterministic) + // 1 + // 12 + // 8 + // 9 + // 13 + // 14 + // 2 + // 10 + // 3 + // #source-combine-merge + } + + public void concat() throws Exception { + // #source-combine-concat + Source source1 = Source.range(1, 3); + Source source2 = Source.range(8, 10); + Source source3 = Source.range(12, 14); + + final Source sources = + Source.combine(source1, source2, Collections.singletonList(source3), Concat::create); + + sources.runForeach(System.out::println, system); + // prints (order is deterministic) + // 1 + // 2 + // 3 + // 8 + // 9 + // 10 + // 12 + // 13 + // 14 + // #source-combine-concat + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/Combine.scala b/akka-docs/src/test/scala/docs/stream/operators/source/Combine.scala new file mode 100644 index 0000000000..80b9d53ef0 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/Combine.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.actor.ActorSystem +// #imports +import akka.stream.scaladsl.{ Concat, Merge, Source } +// ... + +// #imports + +object Combine { + implicit val system: ActorSystem = null + + def merge(): Unit = { + // #source-combine-merge + val source1 = Source(1 to 3) + val source2 = Source(8 to 10) + val source3 = Source(12 to 14) + val combined = Source.combine(source1, source2, source3)(Merge(_)) + combined.runForeach(println) + // could print (order between sources is not deterministic) + // 1 + // 12 + // 8 + // 9 + // 13 + // 14 + // 2 + // 10 + // 3 + // #source-combine-merge + } + + @throws[Exception] + def concat(): Unit = { + // #source-combine-concat + val source1 = Source(1 to 3) + val source2 = Source(8 to 10) + val source3 = Source(12 to 14) + val sources = Source.combine(source1, source2, source3)(Concat(_)) + sources.runForeach(println) + // prints (order is deterministic) + // 1 + // 2 + // 3 + // 8 + // 9 + // 10 + // 12 + // 13 + // 14 + // #source-combine-concat + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index c13370ffb5..7df85d5306 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -162,16 +162,10 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } "combine using Concat strategy two inputs with simplified API" in { - //#combine val sources = immutable.Seq(Source(List(1, 2, 3)), Source(List(10, 20, 30))) - Source - .combine(sources(0), sources(1))(Concat(_)) - .runWith(Sink.seq) - // This will produce the Seq(1, 2, 3, 10, 20, 30) - //#combine - .futureValue should ===(immutable.Seq(1, 2, 3, 10, 20, 30)) - + Source.combine(sources(0), sources(1))(Concat(_)).runWith(Sink.seq).futureValue should ===( + immutable.Seq(1, 2, 3, 10, 20, 30)) } "combine from two inputs with combinedMat and take a materialized value" in { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1a6c243aa0..89a356f65d 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -630,7 +630,7 @@ object Source { scaladsl.Source.setup((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava /** - * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. + * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]]. */ def combine[T, U]( first: Source[T, _ <: Any], diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 1034f23220..28d8a4bcb0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -731,7 +731,7 @@ object Source { }, { case akka.actor.Status.Failure(cause) => cause }) /** - * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. + * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]]. */ def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)( strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] = @@ -751,7 +751,7 @@ object Source { }) /** - * Combines two sources with fan-in strategy like `Merge` or `Concat` and returns `Source` with a materialized value. + * Combines several sources with fan-in strategy like [[Merge]] or [[Concat]] into a single [[Source]] with a materialized value. */ def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])( strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] = {