Can merging of multiple sources be prioritized? (#22864)

* Adding MergePrioritized graph stage

* Adding MergePrioritized in akka docs

* Adding suggested documentation of MergePrioritized in akka docs

* Adding thread safe random number generator ThreadLocalRandom

* fixing documentation for MergePrioritized

* Refactoring selectNextElement() in MergePrioritized for less memory allocations

* Removing extra parameter in MergePrioritized and using SplittableRandom

* Refactoring GraphMergePrioritizedSpec

* Changes for paradox migration

* Optimized a few methods in MergePrioritized (#22864)

* increased no of elements in source in GraphMergePrioritizedSpec (#22864)
This commit is contained in:
Arpan Chaudhury 2017-06-28 17:45:46 +07:00 committed by Patrik Nordwall
parent c636f3540f
commit bfb8f168f4
8 changed files with 330 additions and 4 deletions

View file

@ -31,6 +31,7 @@ Akka Streams currently provide these junctions (for a detailed list see @ref:[st
* `Merge<In>` *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output
* `MergePreferred<In>` like `Merge` but if elements are available on `preferred` port, it picks from it, otherwise randomly from `others`
* `MergePrioritized<In>` like `Merge` but if elements are available on all input ports, it picks from them randomly based on their `priority`
* `ZipWith<A,B,...,Out>` *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element
* `Zip<A,B>` *(2 inputs, 1 output)* is a `ZipWith` specialised to zipping input streams of `A` and `B` into a `Pair(A,B)` tuple stream
* `Concat<A>` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one)

View file

@ -1568,6 +1568,19 @@ Merge multiple sources. Prefer one source if all sources has elements ready.
---------------------------------------------------------------
### mergePrioritized
Merge multiple sources. Prefer sources depending on priorities if all sources has elements ready. If a subset of all
sources has elements ready the relative priorities for those sources are used to prioritise.
**emits** when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available
**backpressures** when downstream backpressures
**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.)
---------------------------------------------------------------
### zip
Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.

View file

@ -352,5 +352,5 @@ such as `Zip` however *do guarantee* their outputs order, as each output element
been signalled already thus the ordering in the case of zipping is defined by this property.
If you find yourself in need of fine grained control over order of emitted elements in fan-in
scenarios consider using `MergePreferred` or `GraphStage` which gives you full control over how the
merge is performed.
scenarios consider using `MergePreferred`, `MergePrioritized` or `GraphStage` which gives you full control over how the
merge is performed.

View file

@ -31,6 +31,7 @@ Akka Streams currently provide these junctions (for a detailed list see @ref:[st
* `Merge[In]` *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output
* `MergePreferred[In]` like `Merge` but if elements are available on `preferred` port, it picks from it, otherwise randomly from `others`
* `MergePrioritized[In]` like `Merge` but if elements are available on all input ports, it picks from them randomly based on their `priority`
* `ZipWith[A,B,...,Out]` *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element
* `Zip[A,B]` *(2 inputs, 1 output)* is a `ZipWith` specialised to zipping input streams of `A` and `B` into an `(A,B)` tuple stream
* `Concat[A]` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one)

View file

@ -0,0 +1,144 @@
package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.testkit.TestSubscriber.ManualProbe
import akka.stream.{ ClosedShape, Inlet, Outlet }
import akka.stream.testkit.{ TestSubscriber, TwoStreamsSetup }
class GraphMergePrioritizedSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._
override type Outputs = Int
override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) {
val mergePrioritized = b add MergePrioritized[Outputs](Seq(2, 8))
override def left: Inlet[Outputs] = mergePrioritized.in(0)
override def right: Inlet[Outputs] = mergePrioritized.in(1)
override def out: Outlet[Outputs] = mergePrioritized.out
}
"merge prioritized" must {
commonTests()
"stream data from all sources" in {
val source1 = Source.fromIterator(() (1 to 3).iterator)
val source2 = Source.fromIterator(() (4 to 6).iterator)
val source3 = Source.fromIterator(() (7 to 9).iterator)
val priorities = Seq(6, 3, 1)
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(source1, source2, source3, priorities, probe).run()
val subscription = probe.expectSubscription()
var collected = Seq.empty[Int]
for (_ 1 to 9) {
subscription.request(1)
collected :+= probe.expectNext()
}
collected.toSet should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9))
probe.expectComplete()
}
"stream data with priority" in {
val elementCount = 20000
val source1 = Source.fromIterator(() Seq.fill(elementCount)(1).iterator)
val source2 = Source.fromIterator(() Seq.fill(elementCount)(2).iterator)
val source3 = Source.fromIterator(() Seq.fill(elementCount)(3).iterator)
val priorities = Seq(6, 3, 1)
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(source1, source2, source3, priorities, probe).run()
val subscription = probe.expectSubscription()
var collected = Seq.empty[Int]
for (_ 1 to elementCount) {
subscription.request(1)
collected :+= probe.expectNext()
}
val ones = collected.count(_ == 1).toDouble
val twos = collected.count(_ == 2).toDouble
val threes = collected.count(_ == 3).toDouble
(ones / twos).round shouldEqual 2
(ones / threes).round shouldEqual 6
(twos / threes).round shouldEqual 3
}
"stream data when only one source produces" in {
val elementCount = 10
val source1 = Source.fromIterator(() Seq.fill(elementCount)(1).iterator)
val source2 = Source.fromIterator(() Seq.empty[Int].iterator)
val source3 = Source.fromIterator(() Seq.empty[Int].iterator)
val priorities = Seq(6, 3, 1)
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(source1, source2, source3, priorities, probe).run()
val subscription = probe.expectSubscription()
var collected = Seq.empty[Int]
for (_ 1 to elementCount) {
subscription.request(1)
collected :+= probe.expectNext()
}
val ones = collected.count(_ == 1)
val twos = collected.count(_ == 2)
val threes = collected.count(_ == 3)
ones shouldEqual elementCount
twos shouldEqual 0
threes shouldEqual 0
}
"stream data with priority when only two sources produce" in {
val elementCount = 20000
val source1 = Source.fromIterator(() Seq.fill(elementCount)(1).iterator)
val source2 = Source.fromIterator(() Seq.fill(elementCount)(2).iterator)
val source3 = Source.fromIterator(() Seq.empty[Int].iterator)
val priorities = Seq(6, 3, 1)
val probe = TestSubscriber.manualProbe[Int]()
threeSourceMerge(source1, source2, source3, priorities, probe).run()
val subscription = probe.expectSubscription()
var collected = Seq.empty[Int]
for (_ 1 to elementCount) {
subscription.request(1)
collected :+= probe.expectNext()
}
val ones = collected.count(_ == 1).toDouble
val twos = collected.count(_ == 2).toDouble
val threes = collected.count(_ == 3)
threes shouldEqual 0
(ones / twos).round shouldBe 2
}
}
private def threeSourceMerge[T](source1: Source[T, NotUsed], source2: Source[T, NotUsed], source3: Source[T, NotUsed], priorities: Seq[Int], probe: ManualProbe[T]) = {
RunnableGraph.fromGraph(GraphDSL.create(source1, source2, source3)((_, _, _)) { implicit b (s1, s2, s3)
val merge = b.add(MergePrioritized[T](priorities))
s1.out ~> merge.in(0)
s2.out ~> merge.in(1)
s3.out ~> merge.in(2)
merge.out ~> Sink.fromSubscriber(probe)
ClosedShape
})
}
}

View file

@ -70,6 +70,7 @@ import akka.stream._
val merge = name("merge")
val mergePreferred = name("mergePreferred")
val mergePrioritized = name("mergePrioritized")
val flattenMerge = name("flattenMerge")
val recoverWith = name("recoverWith")
val broadcast = name("broadcast")
@ -135,6 +136,4 @@ import akka.stream._
val fromJavaStream = name("fromJavaStream")
}
import DefaultAttributes._
}

View file

@ -98,6 +98,56 @@ object MergePreferred {
}
/**
* Merge several streams, taking elements as they arrive from input streams
* (picking from prioritized once when several have elements ready).
*
* A `MergePrioritized` has one `out` port, one or more input port with their priorities.
*
* '''Emits when''' one of the inputs has an element available, preferring
* a input based on its priority if multiple have elements available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
object MergePrioritized {
/**
* Create a new `MergePrioritized` stage with the specified output type.
*/
def create[T](priorities: Array[Int]): Graph[UniformFanInShape[T, T], NotUsed] =
scaladsl.MergePrioritized(priorities)
/**
* Create a new `MergePrioritized` stage with the specified output type.
*/
def create[T](clazz: Class[T], priorities: Array[Int]): Graph[UniformFanInShape[T, T], NotUsed] =
create(priorities)
/**
* Create a new `MergePrioritized` stage with the specified output type.
*
* @param eagerComplete set to true in order to make this stage eagerly
* finish as soon as one of its inputs completes
*/
def create[T](priorities: Array[Int], eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] =
scaladsl.MergePrioritized(priorities, eagerComplete = eagerComplete)
/**
* Create a new `MergePrioritized` stage with the specified output type.
*
* @param eagerComplete set to true in order to make this stage eagerly
* finish as soon as one of its inputs completes
*/
def create[T](clazz: Class[T], priorities: Array[Int], eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] =
create(priorities, eagerComplete)
}
/**
* Fan-out the stream to several streams. emitting each incoming upstream element to all downstream consumers.
* It will not shutdown until the subscriptions for at least

View file

@ -3,7 +3,10 @@
*/
package akka.stream.scaladsl
import java.util.SplittableRandom
import akka.NotUsed
import akka.dispatch.forkjoin.ThreadLocalRandom
import akka.stream._
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages
@ -278,6 +281,121 @@ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolea
}
}
object MergePrioritized {
/**
* Create a new `MergePrioritized` with specified number of input ports.
*
* @param priorities priorities of the input ports
* @param eagerComplete if true, the merge will complete as soon as one of its inputs completes.
*/
def apply[T](priorities: Seq[Int], eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, T]] = new MergePrioritized(priorities, eagerComplete)
}
/**
* Merge several streams, taking elements as they arrive from input streams
* (picking from prioritized once when several have elements ready).
*
* A `MergePrioritized` has one `out` port, one or more input port with their priorities.
*
* '''Emits when''' one of the inputs has an element available, preferring
* a input based on its priority if multiple have elements available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
private val inputPorts = priorities.size
require(inputPorts > 0, "A Merge must have one or more input ports")
require(priorities.forall(_ > 0), "Priorities should be positive integers")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("MergePrioritized.in" + i))
val out: Outlet[T] = Outlet[T]("MergePrioritized.out")
override def initialAttributes: Attributes = DefaultAttributes.mergePrioritized
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
private val allBuffers = Vector.tabulate(priorities.size)(i FixedSizeBuffer[Inlet[T]](priorities(i)))
private var runningUpstreams = inputPorts
private val randomGen = new SplittableRandom
override def preStart(): Unit = in.foreach(tryPull)
(in zip allBuffers).foreach {
case (inlet, buffer)
setHandler(inlet, new InHandler {
override def onPush(): Unit = {
if (isAvailable(out) && !hasPending) {
push(out, grab(inlet))
tryPull(inlet)
} else {
buffer.enqueue(inlet)
}
}
override def onUpstreamFinish(): Unit = {
if (eagerComplete) {
in.foreach(cancel)
runningUpstreams = 0
if (!hasPending) completeStage()
} else {
runningUpstreams -= 1
if (upstreamsClosed && !hasPending) completeStage()
}
}
})
}
override def onPull(): Unit = {
if (hasPending) dequeueAndDispatch()
}
setHandler(out, this)
private def hasPending: Boolean = allBuffers.exists(_.nonEmpty)
private def upstreamsClosed = runningUpstreams == 0
private def dequeueAndDispatch(): Unit = {
val in = selectNextElement()
push(out, grab(in))
if (upstreamsClosed && !hasPending) completeStage() else tryPull(in)
}
private def selectNextElement() = {
var tp = 0
var ix = 0
while (ix < in.size) {
if (allBuffers(ix).nonEmpty) {
tp += priorities(ix)
}
ix += 1
}
var r = randomGen.nextInt(tp)
var next: Inlet[T] = null
ix = 0
while (ix < in.size && next == null) {
if (allBuffers(ix).nonEmpty) {
r -= priorities(ix)
if (r < 0) next = allBuffers(ix).dequeue()
}
ix += 1
}
next
}
}
override def toString = "MergePrioritized"
}
object Interleave {
/**
* Create a new `Interleave` with the specified number of input ports and given size of elements