diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md index c1ef3737cb..7882e61bf7 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePreferred.md @@ -6,9 +6,23 @@ Merge multiple sources. ## Signature +@apidoc[Source.mergePreferred](Source) { scala="#mergePreferred[U>:Out,M](that:akka.stream.Graph[akka.stream.SourceShape[U],M],preferred:Boolean,eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergePreferred(akka.stream.Graph,boolean,boolean)" } +@apidoc[Flow.mergePreferred](Flow) { scala="#mergePreferred[U>:Out,M](that:akka.stream.Graph[akka.stream.SourceShape[U],M],preferred:Boolean,eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergePreferred(akka.stream.Graph,boolean,boolean)" } + ## Description -Merge multiple sources. Prefer one source if all sources have elements ready. +Merge multiple sources. If all sources have elements ready, emit the preferred source first. Then emit the +preferred source again if another element is pushed. Otherwise, emit all the secondary sources. Repeat until streams +are empty. For the case with two sources, when `preferred` is set to true then prefer the right source, otherwise +prefer the left source (see examples). + +## Example +Scala +: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #mergePreferred } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #mergePreferred } + ## Reactive Streams semantics @@ -21,4 +35,3 @@ Merge multiple sources. Prefer one source if all sources have elements ready. **completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) @@@ - diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePrioritized.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePrioritized.md index 378b967158..ed9df82abb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePrioritized.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergePrioritized.md @@ -6,10 +6,22 @@ Merge multiple sources. ## Signature +@apidoc[Source.mergePrioritized](Source) { scala="#mergePrioritized[U>:Out,M](that:akka.stream.Graph[akka.stream.SourceShape[U],M],leftPriority:Int,rightPriority:Int,eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergePrioritized(akka.stream.Graph,int,int,boolean)" } +@apidoc[Flow.mergePrioritized](Flow) { scala="#mergePrioritized[U>:Out,M](that:akka.stream.Graph[akka.stream.SourceShape[U],M],leftPriority:Int,rightPriority:Int,eagerComplete:Boolean):FlowOps.this.Repr[U]" java="#mergePrioritized(akka.stream.Graph,int,int,boolean)" } + ## Description -Merge multiple sources. Prefer sources depending on priorities if all sources has elements ready. If a subset of all -sources has elements ready the relative priorities for those sources are used to prioritise. +Merge multiple sources. Prefer sources depending on priorities if all sources have elements ready. If a subset of all +sources have elements ready the relative priorities for those sources are used to prioritize. For example, when used +with only two sources, the left source has a probability of `(leftPriority) / (leftPriority + rightPriority)` of being +prioritized and similarly for the right source. The priorities for each source must be positive integers. + +## Example +Scala +: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #mergePrioritized } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #mergePrioritized } ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 91e5d605d1..97df80130d 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -150,6 +150,35 @@ class SourceOrFlow { // #merge } + void mergePreferredExample() { + // #mergePreferred + Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); + + sourceA.mergePreferred(sourceB, false, false).runWith(Sink.foreach(System.out::print), system); + // prints 1, 10, ... since both sources have their first element ready and the left source is + // preferred + + sourceA.mergePreferred(sourceB, true, false).runWith(Sink.foreach(System.out::print), system); + // prints 10, 1, ... since both sources have their first element ready and the right source is + // preferred + // #mergePreferred + } + + void mergePrioritizedExample() { + // #mergePrioritized + Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); + + sourceA + .mergePrioritized(sourceB, 99, 1, false) + .runWith(Sink.foreach(System.out::print), system); + // prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and + // the left source has higher priority – if both sources have elements ready, sourceA has a + // 99% chance of being picked next while sourceB has a 1% chance + // #mergePrioritized + } + void mergeSortedExample() { // #merge-sorted Source sourceA = Source.from(Arrays.asList(1, 3, 5, 7)); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index ed7274658c..6a6fda7a7f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -129,6 +129,35 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { } + "works in number example for mergePreferred" in { + //#mergePreferred + import akka.stream.scaladsl.{ Sink, Source } + + val sourceA = Source(List(1, 2, 3, 4)) + val sourceB = Source(List(10, 20, 30, 40)) + + sourceA.mergePreferred(sourceB, false).runWith(Sink.foreach(println)) + // prints 1, 10, ... since both sources have their first element ready and the left source is preferred + + sourceA.mergePreferred(sourceB, true).runWith(Sink.foreach(println)) + // prints 10, 1, ... since both sources have their first element ready and the right source is preferred + //#mergePreferred + } + + "works in number example for mergePrioritized" in { + //#mergePrioritized + import akka.stream.scaladsl.{ Sink, Source } + + val sourceA = Source(List(1, 2, 3, 4)) + val sourceB = Source(List(10, 20, 30, 40)) + + sourceA.mergePrioritized(sourceB, 99, 1).runWith(Sink.foreach(println)) + // prints e.g. 1, 10, 2, 3, 4, 20, 30, 40 since both sources have their first element ready and the left source + // has higher priority – if both sources have elements ready, sourceA has a 99% chance of being picked next + // while sourceB has a 1% chance + //#mergePrioritized + } + "works in number example for merge sorted" in { //#merge-sorted import akka.stream.scaladsl.Sink