diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index d93ed94027..b0899713d4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -13,6 +13,8 @@ import scala.util.control.NoStackTrace import scala.collection.immutable import akka.actor.ActorRef import akka.testkit.TestProbe +import scala.concurrent.Await +import scala.concurrent.duration._ object GraphFlexiMergeSpec { @@ -687,6 +689,42 @@ class GraphFlexiMergeSpec extends AkkaSpec { s.expectComplete() } + "have the correct value for input in ReadPreffered" in { + import akka.stream.FanInShape._ + class MShape[T](_init: Init[T] = Name("mshape")) extends FanInShape[T](_init) { + val priority = newInlet[T]("priority") + val second = newInlet[T]("second") + protected override def construct(i: Init[T]) = new MShape(i) + } + class MyMerge[T] extends FlexiMerge[T, MShape[T]]( + new MShape, OperationAttributes.name("cmerge")) { + import akka.stream.scaladsl.FlexiMerge._ + override def createMergeLogic(p: PortT) = new MergeLogic[T] { + override def initialState = + State[T](ReadPreferred(p.priority, p.second)) { + (ctx, input, element) ⇒ + if (element == 1) assert(input == p.priority) + if (element == 2) assert(input == p.second) + ctx.emit(element) + SameState + } + } + } + + val sink = Sink.fold[Int, Int](0)(_ + _) + val graph = FlowGraph.closed(sink) { implicit b ⇒ + sink ⇒ + import FlowGraph.Implicits._ + + val merge = b.add(new MyMerge[Int]()) + + Source.single(1) ~> merge.priority + Source.single(2) ~> merge.second + + merge.out ~> sink.inlet + } + Await.result(graph.run(), 1.second) should equal(3) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 2bfab90e87..6980fbb279 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -68,6 +68,8 @@ private[akka] object FanIn { | mark=$markCount pend=$markedPending depl=$markedDepleted pref=$preferredId""".stripMargin private var preferredId = 0 + private var _lastDequeuedId = 0 + def lastDequeuedId = _lastDequeuedId def cancel(): Unit = if (!allCancelled) { @@ -143,6 +145,7 @@ private[akka] object FanIn { def dequeue(id: Int): Any = { require(!isDepleted(id), s"Can't dequeue from depleted $id") require(isPending(id), s"No pending input at $id") + _lastDequeuedId = id val input = inputs(id) val elem = input.dequeueInputElement() if (!input.inputsAvailable) { @@ -339,4 +342,3 @@ private[akka] final class Concat(_settings: ActorFlowMaterializerSettings) exten nextPhase(drainFirst) } - diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index ac8b9fd3a9..2a43b078fa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -126,8 +126,8 @@ private[akka] class FlexiMergeImpl[T, S <: Shape]( callOnInput(inputHandle, elem) triggerCompletionAfterRead(inputHandle) case r: ReadPreferred[t] ⇒ - val id = indexOf(r.preferred) - val elem = inputBunch.dequeuePrefering(id) + val elem = inputBunch.dequeuePrefering(indexOf(r.preferred)) + val id = inputBunch.lastDequeuedId val inputHandle = inputMapping(id) callOnInput(inputHandle, elem) triggerCompletionAfterRead(inputHandle)