+str Add Source#mergePrioritizedN. (#31189)

This commit is contained in:
kerr 2022-03-14 15:05:08 +08:00 committed by GitHub
parent 9f7f7027f6
commit cbfed3b1de
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 255 additions and 6 deletions

View file

@ -0,0 +1,37 @@
# mergePrioritizedN
Merge multiple sources with priorities.
@ref[Fan-in operators](../index.md#fan-in-operators)
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #mergePrioritized }
## Description
Merge multiple sources. Prefer sources depending on priorities if all sources have elements ready. If a subset of all
sources have elements ready the relative priorities for those sources are used to prioritize. For example, when used
with only three sources `sourceA`, `sourceB` and `sourceC`, the `sourceA` has a probability of `(priorityOfA) / (priorityOfA + priorityOfB + priorityOfC)` of being
prioritized and similarly for the rest of the sources. The priorities for each source must be positive integers.
## Example
Scala
: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #mergePrioritizedN }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #mergePrioritizedN }
## Reactive Streams semantics
@@@div { .callout }
**emits** when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available
**backpressures** when downstream backpressures
**completes** when all upstreams complete (or when any upstream completes if `eagerComplete=true`.)
**Cancels when** downstream cancels
@@@

View file

@ -7,6 +7,7 @@ package jdocs.stream.operators;
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Flow;
@ -202,6 +203,24 @@ class SourceOrFlow {
// #mergePrioritized
}
void mergePrioritizedNExample() {
// #mergePrioritizedN
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(100, 200, 300, 400));
List<Pair<Source<Integer, ?>,Integer>> sourcesAndPriorities = Arrays.asList(
new Pair<>(sourceA, 9900),
new Pair<>(sourceB, 99),
new Pair<>(sourceC, 1));
Source.mergePrioritizedN(sourcesAndPriorities, false).runForeach(System.out::println, system);
// prints e.g. 1, 100, 2, 3, 4, 10, 20, 30, 40, 200, 300, 400 since both sources have their
// first element ready and
// the left sourceA has higher priority - if both sources have elements ready, sourceA has a 99%
// chance of being picked next
// while sourceB has a 0.99% chance and sourceC has a 0.01% chance
// #mergePrioritizedN
}
void mergeSortedExample() {
// #merge-sorted
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7));

View file

@ -4,10 +4,9 @@
package akka.stream.scaladsl
import org.reactivestreams.Publisher
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import org.reactivestreams.Publisher
class FlowMergeSpec extends BaseTwoStreamsSetup {
@ -158,10 +157,26 @@ class FlowMergeSpec extends BaseTwoStreamsSetup {
//#mergePrioritized
}
"works in number example for mergePrioritizedN" in {
//#mergePrioritizedN
import akka.stream.scaladsl.{ Sink, Source }
val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))
val sourceC = Source(List(100, 200, 300, 400))
Source
.mergePrioritizedN(Seq((sourceA, 9900), (sourceB, 99), (sourceC, 1)), eagerComplete = false)
.runWith(Sink.foreach(println))
// prints e.g. 1, 100, 2, 3, 4, 10, 20, 30, 40, 200, 300, 400 since both sources have their first element ready and
// the left sourceA has higher priority - if both sources have elements ready, sourceA has a 99% chance of being picked next
// while sourceB has a 0.99% chance and sourceC has a 0.01% chance
//#mergePrioritizedN
}
"works in number example for merge sorted" in {
//#merge-sorted
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Sink, Source }
val sourceA = Source(List(1, 3, 5, 7))
val sourceB = Source(List(2, 4, 6, 8))
@ -178,8 +193,7 @@ class FlowMergeSpec extends BaseTwoStreamsSetup {
"works in number example for merge" in {
//#merge
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Sink, Source }
val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))

View file

@ -0,0 +1,134 @@
/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.testkit.TestSubscriber.ManualProbe
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
import scala.concurrent.duration._
class GraphMergePrioritizedNSpec extends StreamSpec {
"merge prioritized" must {
"stream data from all sources" in {
val source1 = Source.fromIterator(() => (1 to 3).iterator)
val source2 = Source.fromIterator(() => (4 to 6).iterator)
val source3 = Source.fromIterator(() => (7 to 9).iterator)
val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1));
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(sourcesAndPriorities, probe).run()
val subscription = probe.expectSubscription()
var collected = Seq.empty[Int]
for (_ <- 1 to 9) {
subscription.request(1)
collected :+= probe.expectNext()
}
collected.toSet should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9))
probe.expectComplete()
}
"stream data with priority" in {
val elementCount = 20000
val source1 = Source.fromIterator(() => Iterator.continually(1).take(elementCount))
val source2 = Source.fromIterator(() => Iterator.continually(2).take(elementCount))
val source3 = Source.fromIterator(() => Iterator.continually(3).take(elementCount))
val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1));
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(sourcesAndPriorities, probe).run()
val subscription = probe.expectSubscription()
val builder = Seq.newBuilder[Int]
for (_ <- 1 to elementCount) {
subscription.request(1)
builder += probe.expectNext()
}
val collected = builder.result()
val ones = collected.count(_ == 1).toDouble
val twos = collected.count(_ == 2).toDouble
val threes = collected.count(_ == 3).toDouble
(ones / twos) should ===(2d +- 1)
(ones / threes) should ===(6d +- 1)
(twos / threes) should ===(3d +- 1)
}
"stream data when only one source produces" in {
val elementCount = 10
val source1 = Source.fromIterator(() => Iterator.continually(1).take(elementCount))
val source2 = Source.fromIterator[Int](() => Iterator.empty)
val source3 = Source.fromIterator[Int](() => Iterator.empty)
val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1));
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(sourcesAndPriorities, probe).run()
val subscription = probe.expectSubscription()
var collected = Seq.empty[Int]
for (_ <- 1 to elementCount) {
subscription.request(1)
collected :+= probe.expectNext()
}
val ones = collected.count(_ == 1)
val twos = collected.count(_ == 2)
val threes = collected.count(_ == 3)
ones shouldEqual elementCount
twos shouldEqual 0
threes shouldEqual 0
}
"stream data with priority when only two sources produce" in {
val elementCount = 20000
val source1 = Source.fromIterator(() => Iterator.continually(1).take(elementCount))
val source2 = Source.fromIterator(() => Iterator.continually(2).take(elementCount))
val source3 = Source.fromIterator[Int](() => Iterator.empty)
val sourcesAndPriorities = Seq((source1, 6), (source2, 3), (source3, 1));
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(sourcesAndPriorities, probe).run()
val subscription = probe.expectSubscription()
val builder = Vector.newBuilder[Int]
for (_ <- 1 to elementCount) {
subscription.request(1)
builder += probe.expectNext()
}
val collected = builder.result()
val ones = collected.count(_ == 1).toDouble
val twos = collected.count(_ == 2).toDouble
val threes = collected.count(_ == 3)
threes shouldEqual 0
(ones / twos) should ===(2d +- 1)
}
}
private def threeSourceMerge[T](sourceAndPriorities: Seq[(Source[T, NotUsed], Int)], probe: ManualProbe[T]) = {
Source
.mergePrioritizedN(sourceAndPriorities, eagerComplete = false)
.initialDelay(50.millis)
.to(Sink.fromSubscriber(probe))
}
}

View file

@ -879,6 +879,28 @@ object Source {
def upcast[SuperOut, Out <: SuperOut, Mat](source: Source[Out, Mat]): Source[SuperOut, Mat] =
source.asInstanceOf[Source[SuperOut, Mat]]
/**
* Merge multiple [[Source]]s. Prefer the sources depending on the 'priority' parameters.
* The provided sources and priorities must have the same size and order.
*
* '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
*
* '''backpressures''' when downstream backpressures
*
* '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
*
* '''Cancels when''' downstream cancels
*/
def mergePrioritizedN[T](
sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
val seq =
if (sourcesAndPriorities != null)
Util.immutableSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue()))
else
immutable.Seq()
new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete))
}
}
/**

View file

@ -961,4 +961,27 @@ object Source {
close: (S) => Future[Done]): Source[T, NotUsed] =
Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))
/**
* Merge multiple [[Source]]s. Prefer the sources depending on the 'priority' parameters.
* The provided sources and priorities must have the same size and order.
*
* '''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if both have elements available
*
* '''backpressures''' when downstream backpressures
*
* '''completes''' when both upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
*
* '''Cancels when''' downstream cancels
*/
def mergePrioritizedN[T](
sourcesAndPriorities: immutable.Seq[(Source[T, _], Int)],
eagerComplete: Boolean): Source[T, NotUsed] = {
sourcesAndPriorities match {
case immutable.Seq() => Source.empty
case immutable.Seq((source, _)) => source.mapMaterializedValue(_ => NotUsed)
case sourcesAndPriorities =>
val (sources, priorities) = sourcesAndPriorities.unzip
combine(sources.head, sources(1), sources.drop(2): _*)(_ => MergePrioritized(priorities, eagerComplete))
}
}
}