diff --git a/akka-docs/src/main/paradox/stream/operators/Source/mergePrioritizedN.md b/akka-docs/src/main/paradox/stream/operators/Source/mergePrioritizedN.md new file mode 100644 index 0000000000..5ae97289df --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/mergePrioritizedN.md @@ -0,0 +1,37 @@ +# mergePrioritizedN + +Merge multiple sources with priorities. + +@ref[Fan-in operators](../index.md#fan-in-operators) + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #mergePrioritized } + +## Description + +Merge multiple sources. Prefer sources depending on priorities if all sources have elements ready. If a subset of all +sources have elements ready the relative priorities for those sources are used to prioritize. For example, when used +with only three sources `sourceA`, `sourceB` and `sourceC`, the `sourceA` has a probability of `(priorityOfA) / (priorityOfA + priorityOfB + priorityOfC)` of being +prioritized and similarly for the rest of the sources. The priorities for each source must be positive integers. + +## Example +Scala +: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #mergePrioritizedN } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #mergePrioritizedN } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete (or when any upstream completes if `eagerComplete=true`.) + +**Cancels when** downstream cancels +@@@ + diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 92cb6dd0a3..d198d003fc 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -7,6 +7,7 @@ package jdocs.stream.operators; import akka.Done; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.japi.Pair; import akka.japi.pf.PFBuilder; import akka.stream.javadsl.Flow; @@ -202,6 +203,24 @@ class SourceOrFlow { // #mergePrioritized } + void mergePrioritizedNExample() { + // #mergePrioritizedN + Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); + Source sourceC = Source.from(Arrays.asList(100, 200, 300, 400)); + List,Integer>> sourcesAndPriorities = Arrays.asList( + new Pair<>(sourceA, 9900), + new Pair<>(sourceB, 99), + new Pair<>(sourceC, 1)); + Source.mergePrioritizedN(sourcesAndPriorities, false).runForeach(System.out::println, system); + // prints e.g. 1, 100, 2, 3, 4, 10, 20, 30, 40, 200, 300, 400 since both sources have their + // first element ready and + // the left sourceA has higher priority - if both sources have elements ready, sourceA has a 99% + // chance of being picked next + // while sourceB has a 0.99% chance and sourceC has a 0.01% chance + // #mergePrioritizedN + } + void mergeSortedExample() { // #merge-sorted Source sourceA = Source.from(Arrays.asList(1, 3, 5, 7)); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index eb3541fa0b..c0798a211e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -4,10 +4,9 @@ package akka.stream.scaladsl -import org.reactivestreams.Publisher - import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ +import org.reactivestreams.Publisher class FlowMergeSpec extends BaseTwoStreamsSetup { @@ -158,10 +157,26 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { //#mergePrioritized } + "works in number example for mergePrioritizedN" in { + //#mergePrioritizedN + import akka.stream.scaladsl.{ Sink, Source } + + val sourceA = Source(List(1, 2, 3, 4)) + val sourceB = Source(List(10, 20, 30, 40)) + val sourceC = Source(List(100, 200, 300, 400)) + + Source + .mergePrioritizedN(Seq((sourceA, 9900), (sourceB, 99), (sourceC, 1)), eagerComplete = false) + .runWith(Sink.foreach(println)) + // prints e.g. 1, 100, 2, 3, 4, 10, 20, 30, 40, 200, 300, 400 since both sources have their first element ready and + // the left sourceA has higher priority - if both sources have elements ready, sourceA has a 99% chance of being picked next + // while sourceB has a 0.99% chance and sourceC has a 0.01% chance + //#mergePrioritizedN + } + "works in number example for merge sorted" in { //#merge-sorted - import akka.stream.scaladsl.Sink - import akka.stream.scaladsl.Source + import akka.stream.scaladsl.{ Sink, Source } val sourceA = Source(List(1, 3, 5, 7)) val sourceB = Source(List(2, 4, 6, 8)) @@ -178,8 +193,7 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { "works in number example for merge" in { //#merge - import akka.stream.scaladsl.Sink - import akka.stream.scaladsl.Source + import akka.stream.scaladsl.{ Sink, Source } val sourceA = Source(List(1, 2, 3, 4)) val sourceB = Source(List(10, 20, 30, 40)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedNSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedNSpec.scala new file mode 100644 index 0000000000..ba18aeffe6 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedNSpec.scala @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2018-2022 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.testkit.TestSubscriber.ManualProbe +import akka.stream.testkit.{ StreamSpec, TestSubscriber } + +import scala.concurrent.duration._ + +class GraphMergePrioritizedNSpec extends StreamSpec { + + "merge prioritized" must { + + "stream data from all sources" in { + val source1 = Source.fromIterator(() => (1 to 3).iterator) + val source2 = Source.fromIterator(() => (4 to 6).iterator) + val source3 = Source.fromIterator(() => (7 to 9).iterator) + + val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1)); + val probe = TestSubscriber.manualProbe[Int]() + threeSourceMerge(sourcesAndPriorities, probe).run() + + val subscription = probe.expectSubscription() + + var collected = Seq.empty[Int] + for (_ <- 1 to 9) { + subscription.request(1) + collected :+= probe.expectNext() + } + + collected.toSet should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9)) + probe.expectComplete() + } + + "stream data with priority" in { + val elementCount = 20000 + val source1 = Source.fromIterator(() => Iterator.continually(1).take(elementCount)) + val source2 = Source.fromIterator(() => Iterator.continually(2).take(elementCount)) + val source3 = Source.fromIterator(() => Iterator.continually(3).take(elementCount)) + + val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1)); + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(sourcesAndPriorities, probe).run() + + val subscription = probe.expectSubscription() + + val builder = Seq.newBuilder[Int] + for (_ <- 1 to elementCount) { + subscription.request(1) + builder += probe.expectNext() + } + val collected = builder.result() + + val ones = collected.count(_ == 1).toDouble + val twos = collected.count(_ == 2).toDouble + val threes = collected.count(_ == 3).toDouble + + (ones / twos) should ===(2d +- 1) + (ones / threes) should ===(6d +- 1) + (twos / threes) should ===(3d +- 1) + } + + "stream data when only one source produces" in { + val elementCount = 10 + val source1 = Source.fromIterator(() => Iterator.continually(1).take(elementCount)) + val source2 = Source.fromIterator[Int](() => Iterator.empty) + val source3 = Source.fromIterator[Int](() => Iterator.empty) + + val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1)); + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(sourcesAndPriorities, probe).run() + + val subscription = probe.expectSubscription() + + var collected = Seq.empty[Int] + for (_ <- 1 to elementCount) { + subscription.request(1) + collected :+= probe.expectNext() + } + + val ones = collected.count(_ == 1) + val twos = collected.count(_ == 2) + val threes = collected.count(_ == 3) + + ones shouldEqual elementCount + twos shouldEqual 0 + threes shouldEqual 0 + } + + "stream data with priority when only two sources produce" in { + val elementCount = 20000 + val source1 = Source.fromIterator(() => Iterator.continually(1).take(elementCount)) + val source2 = Source.fromIterator(() => Iterator.continually(2).take(elementCount)) + val source3 = Source.fromIterator[Int](() => Iterator.empty) + + val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1)); + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(sourcesAndPriorities, probe).run() + + val subscription = probe.expectSubscription() + + val builder = Vector.newBuilder[Int] + for (_ <- 1 to elementCount) { + subscription.request(1) + builder += probe.expectNext() + } + val collected = builder.result() + + val ones = collected.count(_ == 1).toDouble + val twos = collected.count(_ == 2).toDouble + val threes = collected.count(_ == 3) + + threes shouldEqual 0 + (ones / twos) should ===(2d +- 1) + } + } + + private def threeSourceMerge[T](sourceAndPriorities: Seq[(Source[T, NotUsed], Int)], probe: ManualProbe[T]) = { + + Source + .mergePrioritizedN(sourceAndPriorities, eagerComplete = false) + .initialDelay(50.millis) + .to(Sink.fromSubscriber(probe)) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index d16845b551..486e913afb 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -879,6 +879,28 @@ object Source { def upcast[SuperOut, Out <: SuperOut, Mat](source: Source[Out, Mat]): Source[SuperOut, Mat] = source.asInstanceOf[Source[SuperOut, Mat]] + /** + * Merge multiple [[Source]]s. Prefer the sources depending on the 'priority' parameters. + * The provided sources and priorities must have the same size and order. + * + * '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + * + * '''Cancels when''' downstream cancels + */ + def mergePrioritizedN[T]( + sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]], + eagerComplete: Boolean): javadsl.Source[T, NotUsed] = { + val seq = + if (sourcesAndPriorities != null) + Util.immutableSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue())) + else + immutable.Seq() + new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete)) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index c285bac0d9..2923f18934 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -961,4 +961,27 @@ object Source { close: (S) => Future[Done]): Source[T, NotUsed] = Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close)) + /** + * Merge multiple [[Source]]s. Prefer the sources depending on the 'priority' parameters. + * The provided sources and priorities must have the same size and order. + * + * '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available + * + * '''backpressures''' when downstream backpressures + * + * '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + * + * '''Cancels when''' downstream cancels + */ + def mergePrioritizedN[T]( + sourcesAndPriorities: immutable.Seq[(Source[T, _], Int)], + eagerComplete: Boolean): Source[T, NotUsed] = { + sourcesAndPriorities match { + case immutable.Seq() => Source.empty + case immutable.Seq((source, _)) => source.mapMaterializedValue(_ => NotUsed) + case sourcesAndPriorities => + val (sources, priorities) = sourcesAndPriorities.unzip + combine(sources.head, sources(1), sources.drop(2): _*)(_ => MergePrioritized(priorities, eagerComplete)) + } + } }