diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeAll.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeAll.md new file mode 100644 index 0000000000..bd2d4257ca --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeAll.md @@ -0,0 +1,33 @@ +# mergeAll + +Merge multiple sources. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +## Signature + +@apidoc[Source.mergeAll](Source) { scala="#mergeAll[U>:Out,M](those:immutable.Seq[akka.stream.Graph[akka.stream.SourceShape[U],M]],eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergeAll(java.util.List,boolean)" } +@apidoc[Flow.mergeAll](Flow) { scala="#mergeAll[U>:Out,M](those:immutable.Seq[akka.stream.Graph[akka.stream.SourceShape[U],M]],eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergeAll(java.util.List,boolean)" } + +## Description + +Merge multiple sources. Picks elements randomly if all sources has elements ready. + +## Example +Scala +: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeAllSpec.scala) { #merge-all } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge-all } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when one of the inputs has an element available + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index a4170a4f46..81cadb5d2e 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -273,6 +273,7 @@ the inputs in different ways. |Source/Flow|@ref[interleave](Source-or-Flow/interleave.md)|Emits a specifiable number of elements from the original source, then from the provided source and repeats.| |Source/Flow|@ref[interleaveAll](Source-or-Flow/interleaveAll.md)|Emits a specifiable number of elements from the original source, then from the provided sources and repeats.| |Source/Flow|@ref[merge](Source-or-Flow/merge.md)|Merge multiple sources.| +|Source/Flow|@ref[mergeAll](Source-or-Flow/mergeAll.md)|Merge multiple sources.| |Source/Flow|@ref[mergeLatest](Source-or-Flow/mergeLatest.md)|Merge multiple sources.| |Source/Flow|@ref[mergePreferred](Source-or-Flow/mergePreferred.md)|Merge multiple sources.| |Source/Flow|@ref[mergePrioritized](Source-or-Flow/mergePrioritized.md)|Merge multiple sources.| @@ -514,6 +515,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [mapError](Source-or-Flow/mapError.md) * [maybe](Source/maybe.md) * [merge](Source-or-Flow/merge.md) +* [mergeAll](Source-or-Flow/mergeAll.md) * [mergeLatest](Source-or-Flow/mergeLatest.md) * [mergePreferred](Source-or-Flow/mergePreferred.md) * [mergePrioritized](Source-or-Flow/mergePrioritized.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 31109f5c8b..787e28ed6a 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -5,14 +5,16 @@ package jdocs.stream.operators; import akka.Done; +import akka.NotUsed; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.event.LogMarker; import akka.japi.Pair; +import akka.japi.function.Function2; import akka.japi.pf.PFBuilder; +import akka.stream.Attributes; import akka.stream.javadsl.Flow; -import akka.NotUsed; -import akka.japi.function.Function2; // #zip // #zip-with @@ -204,6 +206,17 @@ class SourceOrFlow { // #merge } + + void mergeAllExample() { + // #merge-all + Source sourceA = Source.from(Arrays.asList(1, 2, 3)); + Source sourceB = Source.from(Arrays.asList(4, 5, 6)); + Source sourceC = Source.from(Arrays.asList(7, 8, 9, 10)); + sourceA.mergeAll(Arrays.asList(sourceB, sourceC), false) + .runForeach(System.out::println, system); + // merging is not deterministic, can for example print 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + // #merge-all + } void mergePreferredExample() { // #mergePreferred diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 563c18d1ba..cf2e496ae1 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -10,6 +10,7 @@ import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Status; import akka.japi.Pair; +import akka.japi.Util; import akka.japi.function.*; import akka.japi.pf.PFBuilder; // #imports @@ -1125,6 +1126,20 @@ public class SourceTest extends StreamTest { probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); } + @Test + public void mustBeAbleToUseMerge3() { + final Source sourceA = Source.from(Arrays.asList(1, 2, 3)); + final Source sourceB = Source.from(Arrays.asList(4, 5, 6)); + final Source sourceC = Source.from(Arrays.asList(7, 8, 9)); + final TestSubscriber.Probe sub = + sourceA + .mergeAll(Arrays.asList(sourceB, sourceC), false) + .runWith(TestSink.probe(system), system); + sub.expectSubscription().request(9); + sub.expectNextUnorderedN(Util.immutableSeq(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9))) + .expectComplete(); + } + @Test public void mustBeAbleToUseInitialTimeout() { ExecutionException exception = diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeAllSpec.scala new file mode 100644 index 0000000000..d93a37a7ab --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeAllSpec.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink + +class FlowMergeAllSpec extends StreamSpec(""" + akka.stream.materializer.initial-input-buffer-size = 2 + """) with ScriptedTest { + + "Flow mergeAll" must { + "merge all upstream elements to its downstream" in { + val source1 = Source(1 to 3) + val source2 = Source(4 to 6) + val source3 = Source(7 to 10) + source1 + .mergeAll(List(source2, source3), eagerComplete = false) + .fold(Set.empty[Int])((set, i) => set + i) + .runWith(TestSink.probe) + .request(1) + .expectNext(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .expectComplete(); + } + + "merge all elements of the first completed source to its downstream " in { + val source1 = Source(1 to 2) + val source2 = Source(3 to 6) + val source3 = Source(7 to 10) + val result = + source1.mergeAll(List(source2, source3), eagerComplete = true).runFold(Set.empty[Int])((set, i) => set + i) + result.futureValue should contain allElementsOf (Set(1, 2)) + } + + "merge single upstream elements to its downstream" in { + Source(1 to 3) + .mergeAll(Nil, eagerComplete = false) + .runWith(TestSink.probe) + .request(3) + .expectNext(1, 2, 3) + .expectComplete() + } + + "works in merge numbers example" in { + // #merge-all + val sourceA = Source(1 to 3) + val sourceB = Source(4 to 6) + val sourceC = Source(7 to 10) + sourceA.mergeAll(List(sourceB, sourceC), eagerComplete = false).runForeach(println) + // merging is not deterministic, can for example print 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + // #merge-all + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 62e6586cb4..4ae65b1537 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -11,6 +11,7 @@ import java.util.function.BiFunction import java.util.function.Supplier import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.concurrent.duration.FiniteDuration @@ -2977,6 +2978,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr eagerComplete: Boolean): javadsl.Flow[In, Out, M2] = new Flow(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF))) + /** + * Merge the given [[Source]]s to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + */ + def mergeAll( + those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], + eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = { + val seq = if (those != null) Util.immutableSeq(those).collect { + case source: Source[Out @unchecked, _] => source.asScala + case other => other + } else immutable.Seq() + new javadsl.Flow(delegate.mergeAll(seq, eagerComplete)) + } + /** * MergeLatest joins elements from N input streams into stream of lists of size N. * i-th element in list is the latest emitted element from i-th input stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 4921d8ab96..51577809e3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -4,6 +4,22 @@ package akka.stream.javadsl +import java.util.Optional +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.function.BiFunction +import java.util.stream.Collector + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ +import scala.concurrent.ExecutionContext +import scala.util.Try + +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + import akka._ import akka.actor.ActorRef import akka.actor.ClassicActorSystemProvider @@ -16,20 +32,6 @@ import akka.stream.impl.LinearTraversalBuilder import akka.stream.javadsl import akka.stream.scaladsl import akka.stream.scaladsl.SinkToCompletionStage -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber - -import java.util.Optional -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionStage -import java.util.function.BiFunction -import java.util.stream.Collector -import scala.annotation.unchecked.uncheckedVariance -import scala.collection.immutable -import scala.compat.java8.FutureConverters._ -import scala.compat.java8.OptionConverters._ -import scala.concurrent.ExecutionContext -import scala.util.Try /** Java API */ object Sink { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 7ffa1ecee1..6d13086b30 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1725,6 +1725,28 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ eagerComplete: Boolean): javadsl.Source[Out, M2] = new Source(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF))) + /** + * Merge the given [[Source]]s to the current one, taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + */ + def mergeAll( + those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], + eagerComplete: Boolean): javadsl.Source[Out, Mat] = { + val seq = if (those != null) Util.immutableSeq(those).collect { + case source: Source[Out @unchecked, _] => source.asScala + case other => other + } else immutable.Seq() + new Source(delegate.mergeAll(seq, eagerComplete)) + } + /** * MergeLatest joins elements from N input streams into stream of lists of size N. * i-th element in list is the latest emitted element from i-th input stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 2b943e1ac0..ed905179ff 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -9,6 +9,7 @@ import java.util.concurrent.CompletionStage import java.util.function.Supplier import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.concurrent.duration.FiniteDuration @@ -1764,6 +1765,28 @@ class SubFlow[In, Out, Mat]( def merge(that: Graph[SourceShape[Out], _]): SubFlow[In, Out, Mat] = new SubFlow(delegate.merge(that)) + /** + * Merge the given [[Source]]s to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + */ + def mergeAll( + those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], + eagerComplete: Boolean): SubFlow[In, Out, Mat] = { + val seq = if (those != null) Util.immutableSeq(those).collect { + case source: Source[Out @unchecked, _] => source.asScala + case other => other + } else immutable.Seq() + new SubFlow(delegate.mergeAll(seq, eagerComplete)) + } + /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index ce667d0e66..498c4b9b64 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -9,6 +9,7 @@ import java.util.concurrent.CompletionStage import java.util.function.Supplier import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.concurrent.duration.FiniteDuration @@ -1740,6 +1741,28 @@ class SubSource[Out, Mat]( def merge(that: Graph[SourceShape[Out], _]): SubSource[Out, Mat] = new SubSource(delegate.merge(that)) + /** + * Merge the given [[Source]]s to the current one, taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + */ + def mergeAll( + those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], + eagerComplete: Boolean): SubSource[Out, Mat] = { + val seq = if (those != null) Util.immutableSeq(those).collect { + case source: Source[Out @unchecked, _] => source.asScala + case other => other + } else immutable.Seq() + new SubSource(delegate.mergeAll(seq, eagerComplete)) + } + /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Source]]. * It first emits `segmentSize` number of elements from this flow to downstream, then - same amount for `that` source, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 9aa126af80..763694b5d1 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2996,6 +2996,31 @@ trait FlowOps[+Out, +Mat] { FlowShape(merge.in(0), merge.out) } + /** + * Merge the given [[Source]]s to this [[Flow]], taking elements as they arrive from input streams, + * picking randomly when several elements ready. + * + * '''Emits when''' one of the inputs has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + */ + def mergeAll[U >: Out](those: immutable.Seq[Graph[SourceShape[U], _]], eagerComplete: Boolean): Repr[U] = + those match { + case those if those.isEmpty => this.asInstanceOf[Repr[U]] + case _ => + via(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + val merge = b.add(Merge[U](those.size + 1, eagerComplete)) + for ((that, idx) <- those.zipWithIndex) + that ~> merge.in(idx + 1) + FlowShape(merge.in(0), merge.out) + }) + } + /** * MergeLatest joins elements from N input streams into stream of lists of size N. * i-th element in list is the latest emitted element from i-th input stream.