diff --git a/akka-docs/src/main/paradox/java/stream/stream-graphs.md b/akka-docs/src/main/paradox/java/stream/stream-graphs.md index 61e3211797..e94e3cba68 100644 --- a/akka-docs/src/main/paradox/java/stream/stream-graphs.md +++ b/akka-docs/src/main/paradox/java/stream/stream-graphs.md @@ -31,6 +31,7 @@ Akka Streams currently provide these junctions (for a detailed list see @ref:[st * `Merge` – *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output * `MergePreferred` – like `Merge` but if elements are available on `preferred` port, it picks from it, otherwise randomly from `others` + * `MergePrioritized` – like `Merge` but if elements are available on all input ports, it picks from them randomly based on their `priority` * `ZipWith` – *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element * `Zip` – *(2 inputs, 1 output)* is a `ZipWith` specialised to zipping input streams of `A` and `B` into a `Pair(A,B)` tuple stream * `Concat` – *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one) diff --git a/akka-docs/src/main/paradox/scala/stream/stages-overview.md b/akka-docs/src/main/paradox/scala/stream/stages-overview.md index b409c55b68..c2aeea459f 100644 --- a/akka-docs/src/main/paradox/scala/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/scala/stream/stages-overview.md @@ -1568,6 +1568,19 @@ Merge multiple sources. Prefer one source if all sources has elements ready. --------------------------------------------------------------- +### mergePrioritized + +Merge multiple sources. Prefer sources depending on priorities if all sources have elements ready. If a subset of all +sources has elements ready the relative priorities for those sources are used to prioritise. 
+ +**emits** when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available + +**backpressures** when downstream backpressures + +**completes** when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting `eagerComplete=true`.) + +--------------------------------------------------------------- + ### zip Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream. diff --git a/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md b/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md index 2ee9025ea6..d1a534b9fb 100644 --- a/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md +++ b/akka-docs/src/main/paradox/scala/stream/stream-flows-and-basics.md @@ -352,5 +352,5 @@ such as `Zip` however *do guarantee* their outputs order, as each output element been signalled already – thus the ordering in the case of zipping is defined by this property. If you find yourself in need of fine grained control over order of emitted elements in fan-in -scenarios consider using `MergePreferred` or `GraphStage` – which gives you full control over how the -merge is performed. +scenarios consider using `MergePreferred`, `MergePrioritized` or `GraphStage` – which gives you full control over how the +merge is performed. 
\ No newline at end of file diff --git a/akka-docs/src/main/paradox/scala/stream/stream-graphs.md b/akka-docs/src/main/paradox/scala/stream/stream-graphs.md index ec9a0c9451..6f5d2b0592 100644 --- a/akka-docs/src/main/paradox/scala/stream/stream-graphs.md +++ b/akka-docs/src/main/paradox/scala/stream/stream-graphs.md @@ -31,6 +31,7 @@ Akka Streams currently provide these junctions (for a detailed list see @ref:[st * `Merge[In]` – *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output * `MergePreferred[In]` – like `Merge` but if elements are available on `preferred` port, it picks from it, otherwise randomly from `others` + * `MergePrioritized[In]` – like `Merge` but if elements are available on all input ports, it picks from them randomly based on their `priority` * `ZipWith[A,B,...,Out]` – *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element * `Zip[A,B]` – *(2 inputs, 1 output)* is a `ZipWith` specialised to zipping input streams of `A` and `B` into an `(A,B)` tuple stream * `Concat[A]` – *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedSpec.scala new file mode 100644 index 0000000000..0829dbe3c0 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePrioritizedSpec.scala @@ -0,0 +1,144 @@ +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.testkit.TestSubscriber.ManualProbe +import akka.stream.{ ClosedShape, Inlet, Outlet } +import akka.stream.testkit.{ TestSubscriber, TwoStreamsSetup } + +class GraphMergePrioritizedSpec extends TwoStreamsSetup { + import GraphDSL.Implicits._ + + override type Outputs = Int + + override def fixture(b: GraphDSL.Builder[_]): Fixture = new Fixture(b) { + val mergePrioritized = 
b add MergePrioritized[Outputs](Seq(2, 8)) + + override def left: Inlet[Outputs] = mergePrioritized.in(0) + override def right: Inlet[Outputs] = mergePrioritized.in(1) + override def out: Outlet[Outputs] = mergePrioritized.out + } + + "merge prioritized" must { + commonTests() + + "stream data from all sources" in { + val source1 = Source.fromIterator(() ⇒ (1 to 3).iterator) + val source2 = Source.fromIterator(() ⇒ (4 to 6).iterator) + val source3 = Source.fromIterator(() ⇒ (7 to 9).iterator) + + val priorities = Seq(6, 3, 1) + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(source1, source2, source3, priorities, probe).run() + + val subscription = probe.expectSubscription() + + var collected = Vector.empty[Int] + for (_ ← 1 to 9) { + subscription.request(1) + collected :+= probe.expectNext() + } + + collected.toSet should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9)) + probe.expectComplete() + } + + "stream data with priority" in { + val elementCount = 20000 + val source1 = Source.fromIterator(() ⇒ Seq.fill(elementCount)(1).iterator) + val source2 = Source.fromIterator(() ⇒ Seq.fill(elementCount)(2).iterator) + val source3 = Source.fromIterator(() ⇒ Seq.fill(elementCount)(3).iterator) + + val priorities = Seq(6, 3, 1) + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(source1, source2, source3, priorities, probe).run() + + val subscription = probe.expectSubscription() + + var collected = Vector.empty[Int] + for (_ ← 1 to elementCount) { + subscription.request(1) + collected :+= probe.expectNext() + } + + val ones = collected.count(_ == 1).toDouble + val twos = collected.count(_ == 2).toDouble + val threes = collected.count(_ == 3).toDouble + + (ones / twos).round shouldEqual 2 + (ones / threes).round shouldEqual 6 + (twos / threes).round shouldEqual 3 + } + + "stream data when only one source produces" in { + val elementCount = 10 + val source1 = Source.fromIterator(() ⇒ Seq.fill(elementCount)(1).iterator) + val source2 = 
Source.fromIterator(() ⇒ Seq.empty[Int].iterator) + val source3 = Source.fromIterator(() ⇒ Seq.empty[Int].iterator) + + val priorities = Seq(6, 3, 1) + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(source1, source2, source3, priorities, probe).run() + + val subscription = probe.expectSubscription() + + var collected = Vector.empty[Int] + for (_ ← 1 to elementCount) { + subscription.request(1) + collected :+= probe.expectNext() + } + + val ones = collected.count(_ == 1) + val twos = collected.count(_ == 2) + val threes = collected.count(_ == 3) + + ones shouldEqual elementCount + twos shouldEqual 0 + threes shouldEqual 0 + } + + "stream data with priority when only two sources produce" in { + val elementCount = 20000 + val source1 = Source.fromIterator(() ⇒ Seq.fill(elementCount)(1).iterator) + val source2 = Source.fromIterator(() ⇒ Seq.fill(elementCount)(2).iterator) + val source3 = Source.fromIterator(() ⇒ Seq.empty[Int].iterator) + + val priorities = Seq(6, 3, 1) + + val probe = TestSubscriber.manualProbe[Int]() + + threeSourceMerge(source1, source2, source3, priorities, probe).run() + + val subscription = probe.expectSubscription() + + var collected = Vector.empty[Int] + for (_ ← 1 to elementCount) { + subscription.request(1) + collected :+= probe.expectNext() + } + + val ones = collected.count(_ == 1).toDouble + val twos = collected.count(_ == 2).toDouble + val threes = collected.count(_ == 3) + + threes shouldEqual 0 + (ones / twos).round shouldBe 2 + } + } + + private def threeSourceMerge[T](source1: Source[T, NotUsed], source2: Source[T, NotUsed], source3: Source[T, NotUsed], priorities: Seq[Int], probe: ManualProbe[T]) = { + RunnableGraph.fromGraph(GraphDSL.create(source1, source2, source3)((_, _, _)) { implicit b ⇒ (s1, s2, s3) ⇒ + val merge = b.add(MergePrioritized[T](priorities)) + s1.out ~> merge.in(0) + s2.out ~> merge.in(1) + s3.out ~> merge.in(2) + merge.out ~> Sink.fromSubscriber(probe) + ClosedShape + }) + } +} diff --git 
a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 0efaf4a46b..46811687fd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -70,6 +70,7 @@ import akka.stream._ val merge = name("merge") val mergePreferred = name("mergePreferred") + val mergePrioritized = name("mergePrioritized") val flattenMerge = name("flattenMerge") val recoverWith = name("recoverWith") val broadcast = name("broadcast") @@ -135,6 +136,4 @@ import akka.stream._ val fromJavaStream = name("fromJavaStream") } - import DefaultAttributes._ - } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index e009ef3398..d1bbb06aeb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -98,6 +98,56 @@ object MergePreferred { } +/** + * Merge several streams, taking elements as they arrive from input streams + * (picking at random, weighted by priority, when several have elements ready). + * + * A `MergePrioritized` has one `out` port and one or more input ports, each with its own priority. + * + * '''Emits when''' one of the inputs has an element available, preferring + * an input based on its priority if multiple have elements available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + * + * The number of input ports equals the number of priorities given; every priority must be a positive integer. + */ +object MergePrioritized { + /** + * Create a new `MergePrioritized` stage with the specified output type. 
+ */ + def create[T](priorities: Array[Int]): Graph[UniformFanInShape[T, T], NotUsed] = + scaladsl.MergePrioritized(priorities) + + /** + * Create a new `MergePrioritized` stage with the specified output type. + */ + def create[T](clazz: Class[T], priorities: Array[Int]): Graph[UniformFanInShape[T, T], NotUsed] = + create(priorities) + + /** + * Create a new `MergePrioritized` stage with the specified output type. + * + * @param eagerComplete set to true in order to make this stage eagerly + * finish as soon as one of its inputs completes + */ + def create[T](priorities: Array[Int], eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = + scaladsl.MergePrioritized(priorities, eagerComplete = eagerComplete) + + /** + * Create a new `MergePrioritized` stage with the specified output type. + * + * @param eagerComplete set to true in order to make this stage eagerly + * finish as soon as one of its inputs completes + */ + def create[T](clazz: Class[T], priorities: Array[Int], eagerComplete: Boolean): Graph[UniformFanInShape[T, T], NotUsed] = + create(priorities, eagerComplete) + +} + /** * Fan-out the stream to several streams. emitting each incoming upstream element to all downstream consumers. 
* It will not shutdown until the subscriptions for at least diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 06481014aa..c470cfef62 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -3,7 +3,10 @@ */ package akka.stream.scaladsl +import java.util.SplittableRandom + import akka.NotUsed +import akka.dispatch.forkjoin.ThreadLocalRandom import akka.stream._ import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages @@ -278,6 +281,121 @@ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolea } } +object MergePrioritized { + /** + * Create a new `MergePrioritized` with one input port per given priority. + * + * @param priorities priorities of the input ports + * @param eagerComplete if true, the merge will complete as soon as one of its inputs completes. + */ + def apply[T](priorities: Seq[Int], eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, T]] = new MergePrioritized(priorities, eagerComplete) +} + +/** + * Merge several streams, taking elements as they arrive from input streams + * (picking at random, weighted by priority, when several have elements ready). + * + * A `MergePrioritized` has one `out` port and one or more input ports, each with its own priority. + * + * '''Emits when''' one of the inputs has an element available, preferring + * an input based on its priority if multiple have elements available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false` + * + * '''Cancels when''' downstream cancels + * + * The number of input ports equals the number of priorities given; every priority must be a positive integer. 
+ */ +final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] { + private val inputPorts = priorities.size + require(inputPorts > 0, "A MergePrioritized must have one or more input ports") + require(priorities.forall(_ > 0), "Priorities should be positive integers") + + val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("MergePrioritized.in" + i)) + val out: Outlet[T] = Outlet[T]("MergePrioritized.out") + override def initialAttributes: Attributes = DefaultAttributes.mergePrioritized + override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + private val allBuffers = Vector.tabulate(priorities.size)(i ⇒ FixedSizeBuffer[Inlet[T]](priorities(i))) + private var runningUpstreams = inputPorts + private val randomGen = new SplittableRandom + + override def preStart(): Unit = in.foreach(tryPull) + + (in zip allBuffers).foreach { + case (inlet, buffer) ⇒ + setHandler(inlet, new InHandler { + override def onPush(): Unit = { + if (isAvailable(out) && !hasPending) { + push(out, grab(inlet)) + tryPull(inlet) + } else { + buffer.enqueue(inlet) + } + } + + override def onUpstreamFinish(): Unit = { + if (eagerComplete) { + in.foreach(cancel) + runningUpstreams = 0 + if (!hasPending) completeStage() + } else { + runningUpstreams -= 1 + if (upstreamsClosed && !hasPending) completeStage() + } + } + }) + } + + override def onPull(): Unit = { + if (hasPending) dequeueAndDispatch() + } + + setHandler(out, this) + + private def hasPending: Boolean = allBuffers.exists(_.nonEmpty) + + private def upstreamsClosed = runningUpstreams == 0 + + private def dequeueAndDispatch(): Unit = { + val in = selectNextElement() + push(out, grab(in)) + if (upstreamsClosed && !hasPending) completeStage() else tryPull(in) + } + + private def 
selectNextElement() 
= { + var tp = 0 + var ix = 0 + + while (ix < in.size) { + if (allBuffers(ix).nonEmpty) { + tp += priorities(ix) + } + ix += 1 + } + + var r = randomGen.nextInt(tp) + var next: Inlet[T] = null + ix = 0 + + while (ix < in.size && next == null) { + if (allBuffers(ix).nonEmpty) { + r -= priorities(ix) + if (r < 0) next = allBuffers(ix).dequeue() + } + ix += 1 + } + + next + } + } + + override def toString = "MergePrioritized" +} + object Interleave { /** * Create a new `Interleave` with the specified number of input ports and given size of elements