Merge pull request #19880 from johanandren/wip-pr-19808-for-merge

Merge: isAvailable(out) implies nothing is pending
This commit is contained in:
Johan Andrén 2016-02-25 15:18:35 +01:00
commit b71d8add7d

View file

@ -71,10 +71,10 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e
setHandler(i, new InHandler {
override def onPush(): Unit = {
if (isAvailable(out)) {
if (!pending) {
push(out, grab(i))
tryPull(i)
}
// isAvailable(out) implies !pending
// -> grab and push immediately
push(out, grab(i))
tryPull(i)
} else pendingQueue.enqueue(i)
}