From bd7c5fee9a70c9f3cf612b61157592f29374215f Mon Sep 17 00:00:00 2001 From: Stefan Wachter Date: Tue, 16 Feb 2016 21:49:25 +0100 Subject: [PATCH] simplify InHandler in Merge junction: isAvailable(out) implies nothing is pending --- .../src/main/scala/akka/stream/scaladsl/Graph.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index aa478f4551..34a974c4ff 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -71,10 +71,10 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e setHandler(i, new InHandler { override def onPush(): Unit = { if (isAvailable(out)) { - if (!pending) { - push(out, grab(i)) - tryPull(i) - } + // isAvailable(out) implies !pending + // -> grab and push immediately + push(out, grab(i)) + tryPull(i) } else pendingQueue.enqueue(i) }