fix Merge internal buffer wrap-around, fixes #19806

This commit is contained in:
Roland Kuhn 2016-02-16 20:17:24 +01:00
parent bfe8d3bb31
commit a1e7e87f38

View file

@ -52,25 +52,16 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var initialized = false
private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts)
private var pendingHead = 0
private var pendingTail = 0
private val pendingQueue = FixedSizeBuffer[Inlet[T]](inputPorts)
private def pending: Boolean = pendingQueue.nonEmpty
private var runningUpstreams = inputPorts
private def upstreamsClosed = runningUpstreams == 0
private def pending: Boolean = pendingHead != pendingTail
override def preStart(): Unit = in.foreach(tryPull)
private def enqueue(in: Inlet[T]): Unit = {
pendingQueue(pendingTail % inputPorts) = in
pendingTail += 1
}
private def dequeueAndDispatch(): Unit = {
val in = pendingQueue(pendingHead % inputPorts)
pendingHead += 1
val in = pendingQueue.dequeue()
push(out, grab(in))
if (upstreamsClosed && !pending) completeStage()
else tryPull(in)
@ -84,7 +75,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e
push(out, grab(i))
tryPull(i)
}
} else enqueue(i)
} else pendingQueue.enqueue(i)
}
override def onUpstreamFinish() =
@ -1009,7 +1000,7 @@ object GraphDSL extends GraphApply {
}
private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_])
extends PortOps[Out] {
extends PortOps[Out] {
override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported