diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index e8086ac2c0..9270e591d5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -52,25 +52,16 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private var initialized = false - private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts) - private var pendingHead = 0 - private var pendingTail = 0 + private val pendingQueue = FixedSizeBuffer[Inlet[T]](inputPorts) + private def pending: Boolean = pendingQueue.nonEmpty private var runningUpstreams = inputPorts private def upstreamsClosed = runningUpstreams == 0 - private def pending: Boolean = pendingHead != pendingTail - override def preStart(): Unit = in.foreach(tryPull) - private def enqueue(in: Inlet[T]): Unit = { - pendingQueue(pendingTail % inputPorts) = in - pendingTail += 1 - } - private def dequeueAndDispatch(): Unit = { - val in = pendingQueue(pendingHead % inputPorts) - pendingHead += 1 + val in = pendingQueue.dequeue() push(out, grab(in)) if (upstreamsClosed && !pending) completeStage() else tryPull(in) @@ -84,7 +75,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e push(out, grab(i)) tryPull(i) } - } else enqueue(i) + } else pendingQueue.enqueue(i) } override def onUpstreamFinish() = @@ -1009,7 +1000,7 @@ object GraphDSL extends GraphApply { } private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_]) - extends PortOps[Out] { + extends PortOps[Out] { override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported