diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index 27d9544116..70f4bc68be 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -172,6 +172,23 @@ object GraphFlexiMergeSpec { } } + class PreferringMerge extends FlexiMerge[Int] { + import FlexiMerge._ + val preferred = createInputPort[Int]() + val secondary1 = createInputPort[Int]() + val secondary2 = createInputPort[Int]() + + def createMergeLogic = new MergeLogic[Int] { + override def inputHandles(inputCount: Int) = Vector(preferred, secondary1, secondary2) + + override def initialState = State[Int](ReadPreferred(preferred)(secondary1, secondary2)) { + (ctx, input, element) ⇒ + ctx.emit(element) + SameState + } + } + } + class TestMerge extends FlexiMerge[String]("testMerge") { import FlexiMerge._ val input1 = createInputPort[String]() @@ -298,8 +315,9 @@ class GraphFlexiMergeSpec extends AkkaSpec { p.subscribe(s) val sub = s.expectSubscription() sub.request(100) - for (n ← 1 to 9) + for (n ← 1 to 9) { s.expectNext(n) + } s.expectComplete() } @@ -331,6 +349,94 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } + "build perferring merge" in { + val output = Sink.publisher[Int] + val m = FlowGraph { implicit b ⇒ + val merge = new PreferringMerge + Source(List(1, 2, 3)) ~> merge.preferred + Source(List(11, 12, 13)) ~> merge.secondary1 + Source(List(14, 15, 16)) ~> merge.secondary2 + merge.out ~> output + }.run() + + val s = SubscriberProbe[Int] + val p = m.get(output) + p.subscribe(s) + val sub = s.expectSubscription() + sub.request(100) + s.expectNext(1) + s.expectNext(2) + s.expectNext(3) + val secondaries = s.expectNext() :: + s.expectNext() :: + s.expectNext() :: + s.expectNext() :: + s.expectNext() :: + s.expectNext() :: Nil + + secondaries.toSet should equal(Set(11, 12, 13, 14, 15, 16)) + s.expectComplete() + } + "build perferring merge, manually driven" in { + val output = Sink.publisher[Int] + val preferredDriver = PublisherProbe[Int]() + val otherDriver1 = PublisherProbe[Int]() + val otherDriver2 = PublisherProbe[Int]() + + val m = FlowGraph { implicit b ⇒ + val merge = new PreferringMerge + Source(preferredDriver) ~> merge.preferred + Source(otherDriver1) ~> merge.secondary1 + Source(otherDriver2) ~> merge.secondary2 + merge.out ~> output + }.run() + + val s = SubscriberProbe[Int] + val p = m.get(output) + p.subscribe(s) + + val sub = s.expectSubscription() + val p1 = preferredDriver.expectSubscription() + val s1 = otherDriver1.expectSubscription() + val s2 = otherDriver2.expectSubscription() + + // just consume the preferred + p1.sendNext(1) + sub.request(1) + s.expectNext(1) + + // pick preferred over any of the secondaries + p1.sendNext(2) + s1.sendNext(10) + s2.sendNext(20) + sub.request(1) + s.expectNext(2) + + sub.request(2) + s.expectNext(10) + s.expectNext(20) + + p1.sendComplete() + + // continue with just secondaries when preferred has completed + s1.sendNext(11) + s2.sendNext(21) + sub.request(2) + val d1 = s.expectNext() + val d2 = s.expectNext() + Set(d1, d2) should equal(Set(11, 21)) + + // continue with just one secondary + s1.sendComplete() + s2.sendNext(4) + sub.request(1) + s.expectNext(4) + s2.sendComplete() + + // complete when all inputs have completed + s.expectComplete() + } + "support cancel of input" in { val publisher = PublisherProbe[String] val m = FlowGraph { implicit b ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index ab9972a259..90e1618c33 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -86,7 +86,7 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, private def precondition: TransferState = { behavior.condition match { - case _: ReadAny | _: Read ⇒ inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand + case _: ReadAny | _: ReadPreferred | _: Read ⇒ inputBunch.AnyOfMarkedInputs && primaryOutputs.NeedsDemand } } @@ -99,6 +99,9 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, behavior.condition match { case read: ReadAny ⇒ markInputs(read.inputs.toArray) + case ReadPreferred(preferred, secondaries) ⇒ + markInputs(secondaries.toArray) + inputBunch.markInput(preferred.portIndex) case Read(input) ⇒ require(inputMapping.contains(input.portIndex), s"Unknown input handle $input") require(!inputBunch.isCancelled(input.portIndex), s"Read not allowed from cancelled $input") @@ -119,7 +122,12 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, val inputHandle = inputMapping(id) changeBehavior(behavior.onInput(ctx, inputHandle, elem)) triggerCompletionAfterRead(inputHandle) - + case ReadPreferred(preferred, secondaries) ⇒ + val id = inputBunch.idToDequeue() + val elem = inputBunch.dequeueAndPrefer(id) + val inputHandle = inputMapping(id) + changeBehavior(behavior.onInput(ctx, inputHandle, elem)) + triggerCompletionAfterRead(inputHandle) case Read(inputHandle) ⇒ val elem = inputBunch.dequeue(inputHandle.portIndex) changeBehavior(behavior.onInput(ctx, inputHandle, elem)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala index e51a231544..6f94becdb4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala @@ -66,6 +66,18 @@ object FlexiMerge { */ class ReadAny(val inputs: JList[InputHandle]) extends ReadCondition + /** + * Read condition for the [[MergeLogic#State]] that will be + * fulfilled when there are elements for any of the given upstream + * inputs, however it differs from [[ReadAny]] in the case that both + * the `preferred` and at least one other `secondary` input have demand, + * the `preferred` input will always be consumed first. + * + * Cancelled and completed inputs are not used, i.e. it is allowed + * to specify them in the list of `inputs`. + */ + class ReadPreferred(val preferred: InputHandle, val secondaries: JList[InputHandle]) extends ReadCondition + /** * Context that is passed to the methods of [[State]] and [[CompletionHandling]]. * The context provides means for performing side effects, such as emitting elements @@ -159,6 +171,14 @@ object FlexiMerge { new ReadAny(inputs.asJava) } + /** + * Convenience to create a [[ReadPreferred]] condition. + */ + @varargs def readPreferred(preferred: InputHandle, secondaries: InputHandle*): ReadPreferred = { + import scala.collection.JavaConverters._ + new ReadPreferred(preferred, secondaries.asJava) + } + /** * Convenience to create a [[Read]] condition. */ @@ -254,11 +274,13 @@ object FlexiMerge { } - def convertReadCondition(condition: ReadCondition): scaladsl.FlexiMerge.ReadCondition = + def convertReadCondition(condition: ReadCondition): scaladsl.FlexiMerge.ReadCondition = { condition match { - case r: ReadAny ⇒ scaladsl.FlexiMerge.ReadAny(immutableIndexedSeq(r.inputs)) - case r: Read ⇒ scaladsl.FlexiMerge.Read(r.input) + case r: ReadAny ⇒ scaladsl.FlexiMerge.ReadAny(immutableIndexedSeq(r.inputs)) + case r: ReadPreferred ⇒ scaladsl.FlexiMerge.ReadPreferred(r.preferred, immutableIndexedSeq(r.secondaries)) + case r: Read ⇒ scaladsl.FlexiMerge.Read(r.input) } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index fb7261af60..1366a3c02a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import scala.annotation.varargs import scala.collection.immutable import akka.stream.impl.Ast import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory @@ -58,9 +59,28 @@ object FlexiMerge { */ final case class ReadAny(inputs: InputHandle*) extends ReadCondition + object ReadPreferred { + def apply(preferred: InputHandle)(secondaries: InputHandle*): ReadPreferred = + new ReadPreferred(preferred, secondaries.toArray) + + def apply(preferred: InputHandle, secondaries: immutable.Seq[InputHandle]): ReadPreferred = + new ReadPreferred(preferred, secondaries.toArray) + } /** - * The possibly stateful logic that reads from input via the defined [[State]] and - * handles completion and error via the defined [[CompletionHandling]]. + * Read condition for the [[MergeLogic#State]] that will be + * fulfilled when there are elements for any of the given upstream + * inputs, however it differs from [[ReadAny]] in the case that both + * the `preferred` and at least one other `secondary` input have demand, + * the `preferred` input will always be consumed first. + * + * Cancelled and completed inputs are not used, i.e. it is allowed + * to specify them in the list of `inputs`. + */ + final case class ReadPreferred(preferred: InputHandle, secondaries: Array[InputHandle]) extends ReadCondition + + /** + * The possibly stateful logic that reads from input via the defined [[MergeLogic#State]] and + * handles completion and error via the defined [[FlexiMerge#CompletionHandling]]. * * Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]]. */