+str #16073 readPreferred for FlexiMerge

This commit is contained in:
Konrad 'ktoso' Malawski 2014-11-06 14:57:17 +01:00 committed by Konrad Malawski
parent fc941b891c
commit 293be804e4
4 changed files with 164 additions and 8 deletions

View file

@ -172,6 +172,23 @@ object GraphFlexiMergeSpec {
}
}
class PreferringMerge extends FlexiMerge[Int] {
import FlexiMerge._
val preferred = createInputPort[Int]()
val secondary1 = createInputPort[Int]()
val secondary2 = createInputPort[Int]()
def createMergeLogic = new MergeLogic[Int] {
override def inputHandles(inputCount: Int) = Vector(preferred, secondary1, secondary2)
override def initialState = State[Int](ReadPreferred(preferred)(secondary1, secondary2)) {
(ctx, input, element)
ctx.emit(element)
SameState
}
}
}
class TestMerge extends FlexiMerge[String]("testMerge") {
import FlexiMerge._
val input1 = createInputPort[String]()
@ -298,8 +315,9 @@ class GraphFlexiMergeSpec extends AkkaSpec {
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(100)
for (n 1 to 9)
for (n 1 to 9) {
s.expectNext(n)
}
s.expectComplete()
}
@ -331,6 +349,94 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectComplete()
}
"build perferring merge" in {
val output = Sink.publisher[Int]
val m = FlowGraph { implicit b
val merge = new PreferringMerge
Source(List(1, 2, 3)) ~> merge.preferred
Source(List(11, 12, 13)) ~> merge.secondary1
Source(List(14, 15, 16)) ~> merge.secondary2
merge.out ~> output
}.run()
val s = SubscriberProbe[Int]
val p = m.get(output)
p.subscribe(s)
val sub = s.expectSubscription()
sub.request(100)
s.expectNext(1)
s.expectNext(2)
s.expectNext(3)
val secondaries = s.expectNext() ::
s.expectNext() ::
s.expectNext() ::
s.expectNext() ::
s.expectNext() ::
s.expectNext() :: Nil
secondaries.toSet should equal(Set(11, 12, 13, 14, 15, 16))
s.expectComplete()
}
"build perferring merge, manually driven" in {
val output = Sink.publisher[Int]
val preferredDriver = PublisherProbe[Int]()
val otherDriver1 = PublisherProbe[Int]()
val otherDriver2 = PublisherProbe[Int]()
val m = FlowGraph { implicit b
val merge = new PreferringMerge
Source(preferredDriver) ~> merge.preferred
Source(otherDriver1) ~> merge.secondary1
Source(otherDriver2) ~> merge.secondary2
merge.out ~> output
}.run()
val s = SubscriberProbe[Int]
val p = m.get(output)
p.subscribe(s)
val sub = s.expectSubscription()
val p1 = preferredDriver.expectSubscription()
val s1 = otherDriver1.expectSubscription()
val s2 = otherDriver2.expectSubscription()
// just consume the preferred
p1.sendNext(1)
sub.request(1)
s.expectNext(1)
// pick preferred over any of the secondaries
p1.sendNext(2)
s1.sendNext(10)
s2.sendNext(20)
sub.request(1)
s.expectNext(2)
sub.request(2)
s.expectNext(10)
s.expectNext(20)
p1.sendComplete()
// continue with just secondaries when preferred has completed
s1.sendNext(11)
s2.sendNext(21)
sub.request(2)
val d1 = s.expectNext()
val d2 = s.expectNext()
Set(d1, d2) should equal(Set(11, 21))
// continue with just one secondary
s1.sendComplete()
s2.sendNext(4)
sub.request(1)
s.expectNext(4)
s2.sendComplete()
// complete when all inputs have completed
s.expectComplete()
}
"support cancel of input" in {
val publisher = PublisherProbe[String]
val m = FlowGraph { implicit b

View file

@ -86,7 +86,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
private def precondition: TransferState = {
behavior.condition match {
case _: ReadAny | _: Read inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand
case _: ReadAny | _: ReadPreferred | _: Read inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand
}
}
@ -99,6 +99,9 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
behavior.condition match {
case read: ReadAny
markInputs(read.inputs.toArray)
case ReadPreferred(preferred, secondaries)
markInputs(secondaries.toArray)
inputBunch.markInput(preferred.portIndex)
case Read(input)
require(inputMapping.contains(input.portIndex), s"Unknown input handle $input")
require(!inputBunch.isCancelled(input.portIndex), s"Read not allowed from cancelled $input")
@ -119,7 +122,12 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
val inputHandle = inputMapping(id)
changeBehavior(behavior.onInput(ctx, inputHandle, elem))
triggerCompletionAfterRead(inputHandle)
case ReadPreferred(preferred, secondaries)
val id = inputBunch.idToDequeue()
val elem = inputBunch.dequeueAndPrefer(id)
val inputHandle = inputMapping(id)
changeBehavior(behavior.onInput(ctx, inputHandle, elem))
triggerCompletionAfterRead(inputHandle)
case Read(inputHandle)
val elem = inputBunch.dequeue(inputHandle.portIndex)
changeBehavior(behavior.onInput(ctx, inputHandle, elem))

View file

@ -66,6 +66,18 @@ object FlexiMerge {
*/
class ReadAny(val inputs: JList[InputHandle]) extends ReadCondition
/**
* Read condition for the [[MergeLogic#State]] that will be
* fulfilled when there are elements for any of the given upstream
* inputs, however it differs from [[ReadAny]] in the case that both
* the `preferred` and at least one other `secondary` input have demand,
* the `preferred` input will always be consumed first.
*
* Cancelled and completed inputs are not used, i.e. it is allowed
* to specify them in the list of `inputs`.
*/
class ReadPreferred(val preferred: InputHandle, val secondaries: JList[InputHandle]) extends ReadCondition
/**
* Context that is passed to the methods of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as emitting elements
@ -159,6 +171,14 @@ object FlexiMerge {
new ReadAny(inputs.asJava)
}
/**
* Convenience to create a [[ReadPreferred]] condition.
*/
@varargs def readPreferred(preferred: InputHandle, secondaries: InputHandle*): ReadPreferred = {
import scala.collection.JavaConverters._
new ReadPreferred(preferred, secondaries.asJava)
}
/**
* Convenience to create a [[Read]] condition.
*/
@ -254,11 +274,13 @@ object FlexiMerge {
}
def convertReadCondition(condition: ReadCondition): scaladsl.FlexiMerge.ReadCondition =
def convertReadCondition(condition: ReadCondition): scaladsl.FlexiMerge.ReadCondition = {
condition match {
case r: ReadAny scaladsl.FlexiMerge.ReadAny(immutableIndexedSeq(r.inputs))
case r: Read scaladsl.FlexiMerge.Read(r.input)
case r: ReadAny scaladsl.FlexiMerge.ReadAny(immutableIndexedSeq(r.inputs))
case r: ReadPreferred scaladsl.FlexiMerge.ReadPreferred(r.preferred, immutableIndexedSeq(r.secondaries))
case r: Read scaladsl.FlexiMerge.Read(r.input)
}
}
}
}

View file

@ -3,6 +3,7 @@
*/
package akka.stream.scaladsl
import scala.annotation.varargs
import scala.collection.immutable
import akka.stream.impl.Ast
import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory
@ -58,9 +59,28 @@ object FlexiMerge {
*/
final case class ReadAny(inputs: InputHandle*) extends ReadCondition
object ReadPreferred {
def apply(preferred: InputHandle)(secondaries: InputHandle*): ReadPreferred =
new ReadPreferred(preferred, secondaries.toArray)
def apply(preferred: InputHandle, secondaries: immutable.Seq[InputHandle]): ReadPreferred =
new ReadPreferred(preferred, secondaries.toArray)
}
/**
* The possibly stateful logic that reads from input via the defined [[State]] and
* handles completion and error via the defined [[CompletionHandling]].
* Read condition for the [[MergeLogic#State]] that will be
* fulfilled when there are elements for any of the given upstream
* inputs, however it differs from [[ReadAny]] in the case that both
* the `preferred` and at least one other `secondary` input have demand,
* the `preferred` input will always be consumed first.
*
* Cancelled and completed inputs are not used, i.e. it is allowed
* to specify them in the list of `inputs`.
*/
final case class ReadPreferred(preferred: InputHandle, secondaries: Array[InputHandle]) extends ReadCondition
/**
* The possibly stateful logic that reads from input via the defined [[MergeLogic#State]] and
* handles completion and error via the defined [[FlexiMerge#CompletionHandling]].
*
* Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]].
*/