Merge pull request #17188 from akka/wip-17157-correct-input-in-ReadPreferred-∂π

=str #17157 indicate right input in ReadPreferred
This commit is contained in:
Roland Kuhn 2015-04-14 10:52:02 +02:00
commit 6d0ad317d0
3 changed files with 43 additions and 3 deletions

View file

@ -13,6 +13,8 @@ import scala.util.control.NoStackTrace
import scala.collection.immutable
import akka.actor.ActorRef
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._
object GraphFlexiMergeSpec {
@ -687,6 +689,42 @@ class GraphFlexiMergeSpec extends AkkaSpec {
s.expectComplete()
}
"have the correct value for input in ReadPreffered" in {
import akka.stream.FanInShape._
class MShape[T](_init: Init[T] = Name("mshape")) extends FanInShape[T](_init) {
val priority = newInlet[T]("priority")
val second = newInlet[T]("second")
protected override def construct(i: Init[T]) = new MShape(i)
}
class MyMerge[T] extends FlexiMerge[T, MShape[T]](
new MShape, OperationAttributes.name("cmerge")) {
import akka.stream.scaladsl.FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[T] {
override def initialState =
State[T](ReadPreferred(p.priority, p.second)) {
(ctx, input, element)
if (element == 1) assert(input == p.priority)
if (element == 2) assert(input == p.second)
ctx.emit(element)
SameState
}
}
}
val sink = Sink.fold[Int, Int](0)(_ + _)
val graph = FlowGraph.closed(sink) { implicit b
sink
import FlowGraph.Implicits._
val merge = b.add(new MyMerge[Int]())
Source.single(1) ~> merge.priority
Source.single(2) ~> merge.second
merge.out ~> sink.inlet
}
Await.result(graph.run(), 1.second) should equal(3)
}
}
}

View file

@ -68,6 +68,8 @@ private[akka] object FanIn {
| mark=$markCount pend=$markedPending depl=$markedDepleted pref=$preferredId""".stripMargin
private var preferredId = 0
private var _lastDequeuedId = 0
def lastDequeuedId = _lastDequeuedId
def cancel(): Unit =
if (!allCancelled) {
@ -143,6 +145,7 @@ private[akka] object FanIn {
def dequeue(id: Int): Any = {
require(!isDepleted(id), s"Can't dequeue from depleted $id")
require(isPending(id), s"No pending input at $id")
_lastDequeuedId = id
val input = inputs(id)
val elem = input.dequeueInputElement()
if (!input.inputsAvailable) {
@ -339,4 +342,3 @@ private[akka] final class Concat(_settings: ActorFlowMaterializerSettings) exten
nextPhase(drainFirst)
}

View file

@ -126,8 +126,8 @@ private[akka] class FlexiMergeImpl[T, S <: Shape](
callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle)
case r: ReadPreferred[t]
val id = indexOf(r.preferred)
val elem = inputBunch.dequeuePrefering(id)
val elem = inputBunch.dequeuePrefering(indexOf(r.preferred))
val id = inputBunch.lastDequeuedId
val inputHandle = inputMapping(id)
callOnInput(inputHandle, elem)
triggerCompletionAfterRead(inputHandle)